You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2014/09/12 02:27:09 UTC

svn commit: r1624429 - /qpid/proton/trunk/proton-c/src/windows/driver.c

Author: cliffjansen
Date: Fri Sep 12 00:27:09 2014
New Revision: 1624429

URL: http://svn.apache.org/r1624429
Log:
QPID-658 Windows driver.c with selectors and selectables

Modified:
    qpid/proton/trunk/proton-c/src/windows/driver.c

Modified: qpid/proton/trunk/proton-c/src/windows/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/windows/driver.c?rev=1624429&r1=1624428&r2=1624429&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/windows/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/windows/driver.c Fri Sep 12 00:27:09 2014
@@ -19,59 +19,39 @@
  *
  */
 
-/*
- * Copy of posix poll-based driver with minimal changes to use
- * select().  TODO: fully native implementaton with I/O completion
- * ports.
- *
- * This implementation comments out the posix max_fds arg to select
- * which has no meaning on windows.  The number of fd_set slots are
- * configured at compile time via FD_SETSIZE, chosen "large enough"
- * for the limited scalability of select() at the expense of
- * 2*N*sizeof(unsigned int) bytes per driver instance.  select (and
- * associated macros like FD_ZERO) are otherwise unaffected
- * performance-wise by increasing FD_SETSIZE.
- */
-
-#define FD_SETSIZE 2048
-#ifndef _WIN32_WINNT
-#define _WIN32_WINNT 0x0501
-#endif
-#if _WIN32_WINNT < 0x0501
-#error "Proton requires Windows API support for XP or later."
-#endif
-#include <winsock2.h>
-#include <Ws2tcpip.h>
-#define PN_WINAPI
-
 #include <assert.h>
 #include <stdio.h>
 #include <ctype.h>
 #include <sys/types.h>
 #include <fcntl.h>
 
-#include "../platform.h"
-#include <proton/io.h>
 #include <proton/driver.h>
 #include <proton/driver_extras.h>
 #include <proton/error.h>
+#include <proton/io.h>
 #include <proton/sasl.h>
 #include <proton/ssl.h>
 #include <proton/util.h>
+#include <proton/object.h>
+#include <proton/selector.h>
+#include <proton/types.h>
+#include "../selectable.h"
 #include "../util.h"
+#include "../platform.h"
 #include "../ssl/ssl-internal.h"
 
-#include <proton/types.h>
-
-/* Posix compatibility helpers */
-
-static int pn_socket_pair(SOCKET sv[2]);
-#define close(sock) closesocket(sock)
-static int pn_i_error_from_errno_wrap(pn_error_t *error, const char *msg) {
-  errno = WSAGetLastError();
-  return pn_i_error_from_errno(error, msg);
-}
-#define pn_i_error_from_errno(e,m) pn_i_error_from_errno_wrap(e,m)
+/*
+ * This driver provides limited thread safety for some operations on pn_connector_t objects.
+ *
+ * These calls are: pn_connector_process(), pn_connector_activate(), pn_connector_activated(),
+ * pn_connector_close(), and others that only touch the connector object, e.g.
+ * pn_connector_context().  The safety is limited: simultaneous calls on the same
+ * pn_connector_t object are not allowed.
+ *
+ * The application must call pn_driver_wakeup() and resume its wait loop logic if a call to
+ * pn_driver_wait() may have overlapped with any of the above calls that could affect the
+ * outcome of that wait.
+ */
 
 /* Decls */
 
@@ -81,82 +61,137 @@ static int pn_i_error_from_errno_wrap(pn
 struct pn_driver_t {
   pn_error_t *error;
   pn_io_t *io;
+  pn_selector_t *selector;
   pn_listener_t *listener_head;
   pn_listener_t *listener_tail;
   pn_listener_t *listener_next;
   pn_connector_t *connector_head;
   pn_connector_t *connector_tail;
-  pn_connector_t *connector_next;
+  pn_listener_t *ready_listener_head;
+  pn_listener_t *ready_listener_tail;
+  pn_connector_t *ready_connector_head;
+  pn_connector_t *ready_connector_tail;
+  pn_selectable_t *ctrl_selectable;
   size_t listener_count;
   size_t connector_count;
-  size_t closed_count;
-  fd_set readfds;
-  fd_set writefds;
-  fd_set exceptfds;
-  // int max_fds;
-  bool overflow;
   pn_socket_t ctrl[2]; //pipe for updating selectable status
-
   pn_trace_t trace;
-  pn_timestamp_t wakeup;
 };
 
+typedef enum {LISTENER, CONNECTOR} sel_type_t;  // tag kept as the first member of pn_listener_t and pn_connector_t; get_new_events() reads it through the selectable context
+
 struct pn_listener_t {
+  sel_type_t type;
   pn_driver_t *driver;
   pn_listener_t *listener_next;
   pn_listener_t *listener_prev;
-  int idx;
+  pn_listener_t *ready_listener_next;
+  pn_listener_t *ready_listener_prev;
+  void *context;
+  pn_selectable_t *selectable;
   bool pending;
-  pn_socket_t fd;
   bool closed;
-  void *context;
 };
 
 #define PN_NAME_MAX (256)
 
 struct pn_connector_t {
+  sel_type_t type;
   pn_driver_t *driver;
   pn_connector_t *connector_next;
   pn_connector_t *connector_prev;
+  pn_connector_t *ready_connector_next;
+  pn_connector_t *ready_connector_prev;
   char name[PN_NAME_MAX];
+  pn_timestamp_t wakeup;
+  pn_timestamp_t posted_wakeup;
+  pn_connection_t *connection;
+  pn_transport_t *transport;
+  pn_sasl_t *sasl;
+  pn_listener_t *listener;
+  void *context;
+  pn_selectable_t *selectable;
   int idx;
+  int status;
+  int posted_status;
+  pn_trace_t trace;
   bool pending_tick;
   bool pending_read;
   bool pending_write;
-  pn_socket_t fd;
-  int status;
-  pn_trace_t trace;
   bool closed;
-  pn_timestamp_t wakeup;
-  pn_connection_t *connection;
-  pn_transport_t *transport;
-  pn_sasl_t *sasl;
   bool input_done;
   bool output_done;
-  pn_listener_t *listener;
-  void *context;
 };
 
+static void get_new_events(pn_driver_t *);
+
 /* Impls */
 
 // listener
 
+static void driver_listener_readable(pn_selectable_t *sel)  // "readable" callback passed to pni_selectable() for listeners and the ctrl selectable
+{
+  // do nothing: this driver collects events via pn_selector_next() in get_new_events()
+}
+
+static void driver_listener_writable(pn_selectable_t *sel)  // "writable" callback passed to pni_selectable() for listeners and the ctrl selectable
+{
+  // do nothing: get_new_events() only looks at PN_READABLE for listeners
+}
+
+static void driver_listener_expired(pn_selectable_t *sel)  // "expired" callback passed to pni_selectable() for listeners and the ctrl selectable
+{
+  // do nothing: driver_listener_deadline() always returns 0, so no timer handling is done here
+}
+
+static ssize_t driver_listener_capacity(pn_selectable_t *sel)
+{
+  return 1;  // always non-zero; with pending fixed at 0 this presumably asks for read events only -- TODO confirm against the selector
+}
+
+static ssize_t driver_listener_pending(pn_selectable_t *sel)
+{
+  return 0;  // always zero; presumably means no write interest -- TODO confirm against the selector
+}
+
+static pn_timestamp_t driver_listener_deadline(pn_selectable_t *sel)
+{
+  return 0;  // constant; presumably 0 means "no deadline" -- TODO confirm against the selector
+}
+
+static void driver_listener_finalize(pn_selectable_t *sel)  // "finalize" callback passed to pni_selectable() for listeners and the ctrl selectable
+{
+  // do nothing: the pn_listener_t itself is released with free() in pn_listener_free()
+}
+
+
 static void pn_driver_add_listener(pn_driver_t *d, pn_listener_t *l)  // links l into d's listener list and registers its selectable
 {
   if (!l->driver) return;  // skips listeners with no driver set; pn_listener_fd() assigns l->driver before calling this
   LL_ADD(d, listener, l);
   l->driver = d;
   d->listener_count++;
+  pn_selector_add(d->selector, l->selectable);  // l->selectable must already exist; pn_listener_fd() creates it before calling this
+}
+
+static void ready_listener_list_remove(pn_driver_t *d, pn_listener_t *l)  // unlinks l from d's ready-listener list
+{
+  LL_REMOVE(d, ready_listener, l);
+  l->ready_listener_next = NULL;  // links are reset so pn_driver_remove_listener()'s "is head or has prev" membership test stays meaningful
+  l->ready_listener_prev = NULL;
 }
 
 static void pn_driver_remove_listener(pn_driver_t *d, pn_listener_t *l)
 {
   if (!l->driver) return;
 
+  pn_selector_remove(d->selector, l->selectable);
+  if (l == d->ready_listener_head || l->ready_listener_prev)
+    ready_listener_list_remove(d, l);
+
   if (l == d->listener_next) {
     d->listener_next = l->listener_next;
   }
-
   LL_REMOVE(d, listener, l);
   l->driver = NULL;
   d->listener_count--;
@@ -169,7 +204,7 @@ pn_listener_t *pn_listener(pn_driver_t *
 
   pn_socket_t sock = pn_listen(driver->io, host, port);
 
-  if (sock == INVALID_SOCKET) {
+  if (sock == PN_INVALID_SOCKET) {
     return NULL;
   } else {
     pn_listener_t *l = pn_listener_fd(driver, sock, context);
@@ -186,15 +221,24 @@ pn_listener_t *pn_listener_fd(pn_driver_
 
   pn_listener_t *l = (pn_listener_t *) malloc(sizeof(pn_listener_t));
   if (!l) return NULL;
+  l->type = LISTENER;
   l->driver = driver;
   l->listener_next = NULL;
   l->listener_prev = NULL;
-  l->idx = 0;
+  l->ready_listener_next = NULL;
+  l->ready_listener_prev = NULL;
   l->pending = false;
-  l->fd = fd;
   l->closed = false;
   l->context = context;
-
+  l->selectable = pni_selectable(driver_listener_capacity,
+                                 driver_listener_pending,
+                                 driver_listener_deadline,
+                                 driver_listener_readable,
+                                 driver_listener_writable,
+                                 driver_listener_expired,
+                                 driver_listener_finalize);
+  pni_selectable_set_fd(l->selectable, fd);
+  pni_selectable_set_context(l->selectable, l);
   pn_driver_add_listener(driver, l);
   return l;
 }
@@ -202,7 +246,7 @@ pn_listener_t *pn_listener_fd(pn_driver_
 pn_socket_t pn_listener_get_fd(pn_listener_t *listener)
 {
   assert(listener);
-  return listener->fd;
+  return pn_selectable_fd(listener->selectable);
 }
 
 pn_listener_t *pn_listener_head(pn_driver_t *driver)
@@ -234,8 +278,8 @@ pn_connector_t *pn_listener_accept(pn_li
   if (!l || !l->pending) return NULL;
   char name[PN_NAME_MAX];
 
-  pn_socket_t sock = pn_accept(l->driver->io, l->fd, name, PN_NAME_MAX);
-  if (sock == INVALID_SOCKET) {
+  pn_socket_t sock = pn_accept(l->driver->io, pn_selectable_fd(l->selectable), name, PN_NAME_MAX);
+  if (sock == PN_INVALID_SOCKET) {
     return NULL;
   } else {
     if (l->driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
@@ -252,8 +296,7 @@ void pn_listener_close(pn_listener_t *l)
   if (!l) return;
   if (l->closed) return;
 
-  if (close(l->fd) == -1)
-    perror("close");
+  pn_close(l->driver->io, pn_selectable_fd(l->selectable));
   l->closed = true;
 }
 
@@ -262,45 +305,85 @@ void pn_listener_free(pn_listener_t *l)
   if (!l) return;
 
   if (l->driver) pn_driver_remove_listener(l->driver, l);
+  pn_selectable_free(l->selectable);
   free(l);
 }
 
 // connector
 
+static ssize_t driver_connection_capacity(pn_selectable_t *sel)
+{
+  pn_connector_t *c = (pn_connector_t *) pni_selectable_get_context(sel);  // context is set to the connector in pn_connector_fd()
+  return c->posted_status & PN_SEL_RD ? 1 : 0;  // uses the posted_status snapshot written by pn_driver_wait_2(), not the live c->status
+}
+
+static ssize_t driver_connection_pending(pn_selectable_t *sel)
+{
+  pn_connector_t *c = (pn_connector_t *) pni_selectable_get_context(sel);  // context is set to the connector in pn_connector_fd()
+  return c->posted_status & PN_SEL_WR ? 1 : 0;  // uses the posted_status snapshot written by pn_driver_wait_2(), not the live c->status
+}
+
+static pn_timestamp_t driver_connection_deadline(pn_selectable_t *sel)
+{
+  pn_connector_t *c = (pn_connector_t *) pni_selectable_get_context(sel);  // context is set to the connector in pn_connector_fd()
+  return c->posted_wakeup;  // snapshot of c->wakeup copied by pn_driver_wait_2(); initialised to 0 in pn_connector_fd()
+}
+
+static void driver_connection_readable(pn_selectable_t *sel)  // "readable" callback passed to pni_selectable() for connectors
+{
+  // do nothing: get_new_events() records PN_READABLE in c->pending_read instead
+}
+
+static void driver_connection_writable(pn_selectable_t *sel)  // "writable" callback passed to pni_selectable() for connectors
+{
+  // do nothing: get_new_events() records PN_WRITABLE in c->pending_write instead
+}
+
+static void driver_connection_expired(pn_selectable_t *sel)  // "expired" callback passed to pni_selectable() for connectors
+{
+  // do nothing: get_new_events() records PN_EXPIRED in c->pending_tick instead
+}
+
+static void driver_connection_finalize(pn_selectable_t *sel)  // "finalize" callback passed to pni_selectable() for connectors
+{
+  // do nothing: the pn_connector_t itself is released with free() in pn_connector_free()
+}
+
 static void pn_driver_add_connector(pn_driver_t *d, pn_connector_t *c)  // links c into d's connector list and registers its selectable
 {
   if (!c->driver) return;  // skips connectors with no driver set; pn_connector_fd() assigns c->driver before calling this
   LL_ADD(d, connector, c);
   c->driver = d;
   d->connector_count++;
+  pn_selector_add(d->selector, c->selectable);  // c->selectable must already exist; pn_connector_fd() creates it before calling this
+}
+
+static void ready_connector_list_remove(pn_driver_t *d, pn_connector_t *c)  // unlinks c from d's ready-connector list
+{
+  LL_REMOVE(d, ready_connector, c);
+  c->ready_connector_next = NULL;  // links are reset so pn_driver_remove_connector()'s "is head or has prev" membership test stays meaningful
+  c->ready_connector_prev = NULL;
 }
 
 static void pn_driver_remove_connector(pn_driver_t *d, pn_connector_t *c)
 {
   if (!c->driver) return;
 
-  if (c == d->connector_next) {
-    d->connector_next = c->connector_next;
-  }
+  pn_selector_remove(d->selector, c->selectable);
+  if (c == d->ready_connector_head || c->ready_connector_prev)
+    ready_connector_list_remove(d, c);
 
   LL_REMOVE(d, connector, c);
   c->driver = NULL;
   d->connector_count--;
-  if (c->closed) {
-    d->closed_count--;
-  }
 }
 
-pn_connector_t *pn_connector(pn_driver_t *driver, const char *hostarg,
+pn_connector_t *pn_connector(pn_driver_t *driver, const char *host,
                              const char *port, void *context)
 {
   if (!driver) return NULL;
 
-  // convert "0.0.0.0" to "127.0.0.1" on Windows for outgoing sockets
-  const char *host = strcmp("0.0.0.0", hostarg) ? hostarg : "127.0.0.1";
-
   pn_socket_t sock = pn_connect(driver->io, host, port);
-
   pn_connector_t *c = pn_connector_fd(driver, sock, context);
   snprintf(c->name, PN_NAME_MAX, "%s:%s", host, port);
   if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
@@ -308,28 +391,28 @@ pn_connector_t *pn_connector(pn_driver_t
   return c;
 }
 
-static void pn_connector_read(pn_connector_t *ctor);
-static void pn_connector_write(pn_connector_t *ctor);
-
 pn_connector_t *pn_connector_fd(pn_driver_t *driver, pn_socket_t fd, void *context)
 {
   if (!driver) return NULL;
 
   pn_connector_t *c = (pn_connector_t *) malloc(sizeof(pn_connector_t));
   if (!c) return NULL;
+  c->type = CONNECTOR;
   c->driver = driver;
   c->connector_next = NULL;
   c->connector_prev = NULL;
+  c->ready_connector_next = NULL;
+  c->ready_connector_prev = NULL;
   c->pending_tick = false;
   c->pending_read = false;
   c->pending_write = false;
   c->name[0] = '\0';
-  c->idx = 0;
-  c->fd = fd;
   c->status = PN_SEL_RD | PN_SEL_WR;
+  c->posted_status = -1;
   c->trace = driver->trace;
   c->closed = false;
   c->wakeup = 0;
+  c->posted_wakeup = 0;
   c->connection = NULL;
   c->transport = pn_transport();
   c->sasl = pn_sasl(c->transport);
@@ -337,7 +420,15 @@ pn_connector_t *pn_connector_fd(pn_drive
   c->output_done = false;
   c->context = context;
   c->listener = NULL;
-
+  c->selectable = pni_selectable(driver_connection_capacity,
+                                 driver_connection_pending,
+                                 driver_connection_deadline,
+                                 driver_connection_readable,
+                                 driver_connection_writable,
+                                 driver_connection_expired,
+                                 driver_connection_finalize);
+  pni_selectable_set_fd(c->selectable, fd);
+  pni_selectable_set_context(c->selectable, c);
   pn_connector_trace(c, driver->trace);
 
   pn_driver_add_connector(driver, c);
@@ -347,7 +438,7 @@ pn_connector_t *pn_connector_fd(pn_drive
 pn_socket_t pn_connector_get_fd(pn_connector_t *connector)
 {
   assert(connector);
-  return connector->fd;
+  return pn_selectable_fd(connector->selectable);
 }
 
 pn_connector_t *pn_connector_head(pn_driver_t *driver)
@@ -380,8 +471,15 @@ pn_transport_t *pn_connector_transport(p
 void pn_connector_set_connection(pn_connector_t *ctor, pn_connection_t *connection)
 {
   if (!ctor) return;
+  if (ctor->connection) {
+    pn_decref(ctor->connection);
+    pn_transport_unbind(ctor->transport);
+  }
   ctor->connection = connection;
-  pn_transport_bind(ctor->transport, connection);
+  if (ctor->connection) {
+    pn_incref(ctor->connection);
+    pn_transport_bind(ctor->transport, connection);
+  }
   if (ctor->transport) pn_transport_trace(ctor->transport, ctor->trace);
 }
 
@@ -418,10 +516,8 @@ void pn_connector_close(pn_connector_t *
   if (!ctor) return;
 
   ctor->status = 0;
-  if (close(ctor->fd) == -1)
-    perror("close");
+  pn_close(ctor->driver->io, pn_selectable_fd(ctor->selectable));
   ctor->closed = true;
-  ctor->driver->closed_count++;
 }
 
 bool pn_connector_closed(pn_connector_t *ctor)
@@ -434,9 +530,11 @@ void pn_connector_free(pn_connector_t *c
   if (!ctor) return;
 
   if (ctor->driver) pn_driver_remove_connector(ctor->driver, ctor);
-  ctor->connection = NULL;
   pn_transport_free(ctor->transport);
   ctor->transport = NULL;
+  if (ctor->connection) pn_decref(ctor->connection);
+  ctor->connection = NULL;
+  pn_selectable_free(ctor->selectable);
   free(ctor);
 }
 
@@ -487,6 +585,7 @@ void pn_connector_process(pn_connector_t
     if (c->closed) return;
 
     pn_transport_t *transport = c->transport;
+    pn_socket_t sock = pn_selectable_fd(c->selectable);
 
     ///
     /// Socket read
@@ -497,7 +596,7 @@ void pn_connector_process(pn_connector_t
         c->status |= PN_SEL_RD;
         if (c->pending_read) {
           c->pending_read = false;
-          ssize_t n =  pn_recv(c->driver->io, c->fd, pn_transport_tail(transport), capacity);
+          ssize_t n =  pn_recv(c->driver->io, sock, pn_transport_tail(transport), capacity);
           if (n < 0) {
             if (errno != EAGAIN) {
               perror("read");
@@ -540,7 +639,7 @@ void pn_connector_process(pn_connector_t
         c->status |= PN_SEL_WR;
         if (c->pending_write) {
           c->pending_write = false;
-          ssize_t n = pn_send(c->driver->io, c->fd, pn_transport_head(transport), pending);
+          ssize_t n = pn_send(c->driver->io, sock, pn_transport_head(transport), pending);
           if (n < 0) {
             // XXX
             if (errno != EAGAIN) {
@@ -574,35 +673,41 @@ void pn_connector_process(pn_connector_t
 
 // driver
 
+static pn_selectable_t *create_ctrl_selectable(pn_socket_t fd);
+
 pn_driver_t *pn_driver()
 {
   pn_driver_t *d = (pn_driver_t *) malloc(sizeof(pn_driver_t));
   if (!d) return NULL;
+
   d->error = pn_error();
   d->io = pn_io();
+  d->selector = pn_io_selector(d->io);
   d->listener_head = NULL;
   d->listener_tail = NULL;
   d->listener_next = NULL;
+  d->ready_listener_head = NULL;
+  d->ready_listener_tail = NULL;
   d->connector_head = NULL;
   d->connector_tail = NULL;
-  d->connector_next = NULL;
+  d->ready_connector_head = NULL;
+  d->ready_connector_tail = NULL;
   d->listener_count = 0;
   d->connector_count = 0;
-  d->closed_count = 0;
-  // d->max_fds = 0;
   d->ctrl[0] = 0;
   d->ctrl[1] = 0;
   d->trace = ((pn_env_bool("PN_TRACE_RAW") ? PN_TRACE_RAW : PN_TRACE_OFF) |
               (pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF) |
               (pn_env_bool("PN_TRACE_DRV") ? PN_TRACE_DRV : PN_TRACE_OFF));
-  d->wakeup = 0;
 
   // XXX
-  if (pn_socket_pair(d->ctrl)) {
+  if (pn_pipe(d->io, d->ctrl)) {
     perror("Can't create control pipe");
     free(d);
     return NULL;
   }
+  d->ctrl_selectable = create_ctrl_selectable(d->ctrl[0]);
+  pn_selector_add(d->selector, d->ctrl_selectable);
 
   return d;
 }
@@ -626,8 +731,9 @@ void pn_driver_free(pn_driver_t *d)
 {
   if (!d) return;
 
-  close(d->ctrl[0]);
-  close(d->ctrl[1]);
+  pn_selectable_free(d->ctrl_selectable);
+  pn_close(d->io, d->ctrl[0]);
+  pn_close(d->io, d->ctrl[1]);
   while (d->connector_head)
     pn_connector_free(d->connector_head);
   while (d->listener_head)
@@ -640,7 +746,7 @@ void pn_driver_free(pn_driver_t *d)
 int pn_driver_wakeup(pn_driver_t *d)
 {
   if (d) {
-      ssize_t count = send(d->ctrl[1], "x", 1, 0);
+    ssize_t count = pn_write(d->io, d->ctrl[1], "x", 1);
     if (count <= 0) {
       return count;
     } else {
@@ -651,158 +757,57 @@ int pn_driver_wakeup(pn_driver_t *d)
   }
 }
 
-static void pn_driver_rebuild(pn_driver_t *d)
+void pn_driver_wait_1(pn_driver_t *d)
 {
-  d->wakeup = 0;
-  d->overflow = false;
-  int r_avail = FD_SETSIZE;
-  int w_avail = FD_SETSIZE;
-  // d->max_fds = -1;
-  FD_ZERO(&d->readfds);
-  FD_ZERO(&d->writefds);
-  FD_ZERO(&d->exceptfds);
-
-  FD_SET(d->ctrl[0], &d->readfds);
-  // if (d->ctrl[0] > d->max_fds) d->max_fds = d->ctrl[0];
-
-  pn_listener_t *l = d->listener_head;
-  for (unsigned i = 0; i < d->listener_count; i++) {
-    if (r_avail) {
-      FD_SET(l->fd, &d->readfds);
-      // if (l->fd > d->max_fds) d->max_fds = l->fd;
-      r_avail--;
-      l = l->listener_next;
-    }
-    else {
-      d->overflow = true;
-      break;
-    }
-  }
+}
 
+int pn_driver_wait_2(pn_driver_t *d, int timeout)
+{
+  // These lists will normally be empty
+  while (d->ready_listener_head)
+    ready_listener_list_remove(d, d->ready_listener_head);
+  while (d->ready_connector_head)
+    ready_connector_list_remove(d, d->ready_connector_head);
   pn_connector_t *c = d->connector_head;
   for (unsigned i = 0; i < d->connector_count; i++)
   {
-    if (!c->closed) {
-      FD_SET(c->fd, &d->exceptfds);
-      d->wakeup = pn_timestamp_min(d->wakeup, c->wakeup);
-      if (c->status & PN_SEL_RD) {
-        if (r_avail) {
-          FD_SET(c->fd, &d->readfds);
-          r_avail--;
-        }
-        else {
-          d->overflow = true;
-          break;
-        }
-      }
-      if (c->status & PN_SEL_WR) {
-        if (w_avail) {
-          FD_SET(c->fd, &d->writefds);
-          w_avail--;
-        }
-        else {
-          d->overflow = true;
-          break;
-        }
-      }
-      // if (c->fd > d->max_fds) d->max_fds = c->fd;
+    // Optimistically use a snapshot of the non-threadsafe vars.
+    // If they are in flux, the app will guarantee progress with a pn_driver_wakeup().
+    int current_status = c->status;
+    pn_timestamp_t current_wakeup = c->wakeup;
+    if (c->posted_status != current_status || c->posted_wakeup != current_wakeup) {
+      c->posted_status = current_status;
+      c->posted_wakeup = current_wakeup;
+      pn_selector_update(c->driver->selector, c->selectable);
+    }
+    if (c->closed) {
+      c->pending_read = false;
+      c->pending_write = false;
+      c->pending_tick = false;
+      LL_ADD(d, ready_connector, c);
     }
     c = c->connector_next;
   }
-}
 
-void pn_driver_wait_1(pn_driver_t *d)
-{
-  pn_driver_rebuild(d);
-}
+  if (d->ready_connector_head)
+    timeout = 0;   // We found closed connections
 
-int pn_driver_wait_2(pn_driver_t *d, int timeout)
-{
-  if (d->overflow)
-      return pn_error_set(d->error, PN_ERR, "maximum driver sockets exceeded");
-  if (d->wakeup) {
-    pn_timestamp_t now = pn_i_now();
-    if (now >= d->wakeup)
-      timeout = 0;
-    else
-      timeout = (timeout < 0) ? d->wakeup-now : pn_min(timeout, d->wakeup - now);
-  }
-
-  struct timeval to = {0};
-  struct timeval *to_arg = &to;
-  // block only if (timeout == 0) and (closed_count == 0)
-  if (d->closed_count == 0) {
-    if (timeout > 0) {
-      // convert millisecs to sec and usec:
-      to.tv_sec = timeout/1000;
-      to.tv_usec = (timeout - (to.tv_sec * 1000)) * 1000;
-    }
-    else if (timeout < 0) {
-        to_arg = NULL;
-    }
-  }
-  int nfds = select(/* d->max_fds */ 0, &d->readfds, &d->writefds, &d->exceptfds, to_arg);
-  if (nfds == SOCKET_ERROR) {
-    errno = WSAGetLastError();
-    pn_i_error_from_errno(d->error, "select");
+  int code = pn_selector_select(d->selector, timeout);
+  if (code) {
+    pn_error_set(d->error, code, "select");
     return -1;
   }
+  get_new_events(d);
   return 0;
 }
 
 int pn_driver_wait_3(pn_driver_t *d)
 {
-  bool woken = false;
-  if (FD_ISSET(d->ctrl[0], &d->readfds)) {
-    woken = true;
-    //clear the pipe
-    char buffer[512];
-    while (recv(d->ctrl[0], buffer, 512, 0) == 512);
-  }
-
-  pn_listener_t *l = d->listener_head;
-  while (l) {
-    l->pending = (FD_ISSET(l->fd, &d->readfds));
-    l = l->listener_next;
-  }
-
-  pn_timestamp_t now = pn_i_now();
-  pn_connector_t *c = d->connector_head;
-  while (c) {
-    if (c->closed) {
-      c->pending_read = false;
-      c->pending_write = false;
-      c->pending_tick = false;
-    } else {
-      c->pending_read = FD_ISSET(c->fd, &d->readfds);
-      c->pending_write = FD_ISSET(c->fd, &d->writefds);
-      c->pending_tick = (c->wakeup &&  c->wakeup <= now);
-// Unlike Posix no distinction of POLLERR and POLLHUP
-//      if (idx && d->fds[idx].revents & POLLERR)
-//          pn_connector_close(c);
-//      else if (idx && (d->fds[idx].revents & POLLHUP)) {
-//          [...]
-// Strategy, defer error to a recv or send if read or write pending.
-// Otherwise proclaim the connection dead.
-      if (!c->pending_read && !c->pending_write) {
-        if (FD_ISSET(c->fd, &d->exceptfds)) {
-          // can't defer error to a read or write, close now.
-          // How to get WSAlastError() equivalent info?
-          fprintf(stderr, "connector cleanup on unknown error %s\n", c->name);
-          pn_connector_close(c);
-        }
-      }
-    }
-    c = c->connector_next;
-  }
-
-  d->listener_next = d->listener_head;
-  d->connector_next = d->connector_head;
-
-  return woken ? PN_INTR : 0;
+  //  no-op with new selector/selectables
+  return 0;
 }
 
-//
+
 // XXX - pn_driver_wait has been divided into three internal functions as a
 //       temporary workaround for a multi-threading problem.  A multi-threaded
 //       application must hold a lock on parts 1 and 3, but not on part 2.
@@ -821,103 +826,75 @@ int pn_driver_wait(pn_driver_t *d, int t
     return pn_driver_wait_3(d);
 }
 
+static void get_new_events(pn_driver_t *d)  // called by pn_driver_wait_2() after pn_selector_select(); fills the ready lists and pending flags
+{
+  bool woken = false;  // NOTE(review): assigned below but never read or returned in this function
+  int events;
+  pn_selectable_t *sel;
+  while ((sel = pn_selector_next(d->selector, &events)) != NULL) {  // loop until the selector reports no further selectable
+    if (sel == d->ctrl_selectable) {  // wakeup pipe: matched by pointer before the context is touched (create_ctrl_selectable() sets none)
+      woken = true;
+      //clear the pipe
+      char buffer[512];
+      while (pn_read(d->io, d->ctrl[0], buffer, 512) == 512);  // keep reading while full 512-byte buffers come back
+      continue;
+    }
+
+    void *ctx = pni_selectable_get_context(sel);
+    sel_type_t *type = (sel_type_t *) ctx;  // relies on `type` being the first member of both pn_listener_t and pn_connector_t
+    if (*type == CONNECTOR) {
+      pn_connector_t *c = (pn_connector_t *) ctx;
+      if (!c->closed) {  // closed connectors are queued on the ready list by pn_driver_wait_2() instead
+        LL_ADD(d, ready_connector, c);
+        c->pending_read = events & PN_READABLE;
+        c->pending_write = events & PN_WRITABLE;
+        c->pending_tick = events & PN_EXPIRED;
+      }
+    } else {
+      pn_listener_t *l = (pn_listener_t *) ctx;
+      LL_ADD(d, ready_listener, l);  // queued regardless of event type; pn_driver_listener() later skips entries whose pending flag is false
+      l->pending = events & PN_READABLE;
+    }
+  }
+}
+
 pn_listener_t *pn_driver_listener(pn_driver_t *d) {
   if (!d) return NULL;
 
-  while (d->listener_next) {
-    pn_listener_t *l = d->listener_next;
-    d->listener_next = l->listener_next;
-
-    if (l->pending) {
+  pn_listener_t *l = d->ready_listener_head;
+  while (l) {
+    ready_listener_list_remove(d, l);
+    if (l->pending)
       return l;
-    }
+    l = d->ready_listener_head;
   }
-
   return NULL;
 }
 
 pn_connector_t *pn_driver_connector(pn_driver_t *d) {
   if (!d) return NULL;
 
-  while (d->connector_next) {
-    pn_connector_t *c = d->connector_next;
-    d->connector_next = c->connector_next;
-
+  pn_connector_t *c = d->ready_connector_head;
+  while (c) {
+    ready_connector_list_remove(d, c);
     if (c->closed || c->pending_read || c->pending_write || c->pending_tick) {
       return c;
     }
+    c = d->ready_connector_head;
   }
-
   return NULL;
 }
 
-static int pn_socket_pair (SOCKET sv[2]) {
-  // no socketpair on windows.  provide pipe() semantics using sockets
-
-  SOCKET sock = socket(AF_INET, SOCK_STREAM, getprotobyname("tcp")->p_proto);
-  if (sock == INVALID_SOCKET) {
-    perror("socket");
-    return -1;
-  }
-
-  BOOL b = 1;
-  if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char *) &b, sizeof(b)) == -1) {
-    perror("setsockopt");
-    closesocket(sock);
-    return -1;
-  }
-  else {
-    struct sockaddr_in addr = {0};
-    addr.sin_family = AF_INET;
-    addr.sin_port = 0;
-    addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
-
-    if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
-      perror("bind");
-      closesocket(sock);
-      return -1;
-    }
-  }
-
-  if (listen(sock, 50) == -1) {
-    perror("listen");
-    closesocket(sock);
-    return -1;
-  }
-
-  if ((sv[1] = socket(AF_INET, SOCK_STREAM, getprotobyname("tcp")->p_proto)) == INVALID_SOCKET) {
-    perror("sock1");
-    closesocket(sock);
-    return -1;
-  }
-  else {
-    struct sockaddr addr = {0};
-    int l = sizeof(addr);
-    if (getsockname(sock, &addr, &l) == -1) {
-      perror("getsockname");
-      closesocket(sock);
-      return -1;
-    }
-
-    if (connect(sv[1], &addr, sizeof(addr)) == -1) {
-      int err = WSAGetLastError();
-      fprintf(stderr, "connect wsaerrr %d\n", err);
-      closesocket(sock);
-      closesocket(sv[1]);
-      return -1;
-    }
-
-    if ((sv[0] = accept(sock, &addr, &l)) == INVALID_SOCKET) {
-      perror("accept");
-      closesocket(sock);
-      closesocket(sv[1]);
-      return -1;
-    }
-  }
-
-  u_long v = 1;
-  ioctlsocket (sv[0], FIONBIO, &v);
-  ioctlsocket (sv[1], FIONBIO, &v);
-  closesocket(sock);
-  return 0;
+static pn_selectable_t *create_ctrl_selectable(pn_socket_t fd)  // builds the selectable for the read end of the wakeup pipe (pn_driver() passes d->ctrl[0])
+{
+  // ctrl input only needs to know about read events, just like a listener.
+  pn_selectable_t *sel = pni_selectable(driver_listener_capacity,
+                                        driver_listener_pending,
+                                        driver_listener_deadline,
+                                        driver_listener_readable,
+                                        driver_listener_writable,
+                                        driver_listener_expired,
+                                        driver_listener_finalize);
+  pni_selectable_set_fd(sel, fd);  // no context is attached here; get_new_events() recognises this selectable by pointer comparison
+  return sel;
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org