You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2012/09/19 04:31:17 UTC

svn commit: r1387437 - in /qpid/proton/branches/driver_abstraction/proton-c: bindings/python/ include/proton/ src/ src/engine/ src/message/ src/pollers/

Author: kgiusti
Date: Wed Sep 19 02:31:16 2012
New Revision: 1387437

URL: http://svn.apache.org/viewvc?rev=1387437&view=rev
Log:
PROTON-2: merge latest trunk into branch

Added:
    qpid/proton/branches/driver_abstraction/proton-c/bindings/python/proton.py
      - copied unchanged from r1387433, qpid/proton/trunk/proton-c/bindings/python/proton.py
Modified:
    qpid/proton/branches/driver_abstraction/proton-c/bindings/python/CMakeLists.txt
    qpid/proton/branches/driver_abstraction/proton-c/include/proton/engine.h
    qpid/proton/branches/driver_abstraction/proton-c/src/driver-internal.h
    qpid/proton/branches/driver_abstraction/proton-c/src/driver.c
    qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine-internal.h
    qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine.c
    qpid/proton/branches/driver_abstraction/proton-c/src/message/message.c
    qpid/proton/branches/driver_abstraction/proton-c/src/pollers/poll.c
    qpid/proton/branches/driver_abstraction/proton-c/src/pollers/select.c

Modified: qpid/proton/branches/driver_abstraction/proton-c/bindings/python/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/bindings/python/CMakeLists.txt?rev=1387437&r1=1387436&r2=1387437&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/bindings/python/CMakeLists.txt (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/bindings/python/CMakeLists.txt Wed Sep 19 02:31:16 2012
@@ -15,9 +15,16 @@ install(CODE "execute_process(COMMAND ${
                               WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})")
 install(CODE "execute_process(COMMAND ${PYTHON_EXECUTABLE} -O -m py_compile cproton.py
                               WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})")
+install(CODE "execute_process(COMMAND ${PYTHON_EXECUTABLE} -m py_compile proton.py
+                              WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})")
+install(CODE "execute_process(COMMAND ${PYTHON_EXECUTABLE} -O -m py_compile proton.py
+                              WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})")
 install(FILES ${CMAKE_CURRENT_BINARY_DIR}/cproton.py
               ${CMAKE_CURRENT_BINARY_DIR}/cproton.pyc
               ${CMAKE_CURRENT_BINARY_DIR}/cproton.pyo
+              ${CMAKE_CURRENT_SOURCE_DIR}/proton.py
+              ${CMAKE_CURRENT_SOURCE_DIR}/proton.pyc
+              ${CMAKE_CURRENT_SOURCE_DIR}/proton.pyo
         DESTINATION ${PYTHON_SITEARCH_PACKAGES}
         COMPONENT Python)
 install(FILES ${CMAKE_CURRENT_BINARY_DIR}/_cproton.so

Modified: qpid/proton/branches/driver_abstraction/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/include/proton/engine.h?rev=1387437&r1=1387436&r2=1387437&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/include/proton/engine.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/include/proton/engine.h Wed Sep 19 02:31:16 2012
@@ -264,6 +264,8 @@ pn_connection_t *pn_get_connection(pn_se
 void pn_session_open(pn_session_t *session);
 void pn_session_close(pn_session_t *session);
 void pn_session_free(pn_session_t *session);
+void *pn_session_context(pn_session_t *session);
+void pn_session_set_context(pn_session_t *session, void *context);
 
 // link
 const char *pn_link_name(pn_link_t *link);
@@ -291,6 +293,8 @@ pn_delivery_t *pn_unsettled_next(pn_deli
 void pn_link_open(pn_link_t *sender);
 void pn_link_close(pn_link_t *sender);
 void pn_link_free(pn_link_t *sender);
+void *pn_link_context(pn_link_t *link);
+void pn_link_set_context(pn_link_t *link, void *context);
 
 // sender
 //void pn_offer(pn_sender_t *sender, int credits);
@@ -319,6 +323,8 @@ void pn_disposition(pn_delivery_t *deliv
 //int pn_format(pn_delivery_t *delivery);
 void pn_settle(pn_delivery_t *delivery);
 void pn_delivery_dump(pn_delivery_t *delivery);
+void *pn_delivery_context(pn_delivery_t *delivery);
+void pn_delivery_set_context(pn_delivery_t *delivery, void *context);
 
 #ifdef __cplusplus
 }

Modified: qpid/proton/branches/driver_abstraction/proton-c/src/driver-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/driver-internal.h?rev=1387437&r1=1387436&r2=1387437&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/driver-internal.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/driver-internal.h Wed Sep 19 02:31:16 2012
@@ -103,7 +103,10 @@ struct pn_connector_t {
 
 int pn_connector_poller_init( struct pn_connector_t *);
 void pn_connector_poller_destroy( struct pn_connector_t *);
-void pn_driver_poller_wait(struct pn_driver_t *, int timeout_ms);
+//void pn_driver_poller_wait(struct pn_driver_t *, int timeout_ms);
+void pn_driver_poller_wait_1(pn_driver_t *);
+void pn_driver_poller_wait_2(pn_driver_t *, int);
+void pn_driver_poller_wait_3(pn_driver_t *);
 int pn_io_handler(pn_connector_t *);
 int pn_null_io_handler(pn_connector_t *);
 void pn_connector_process_output(pn_connector_t *);

Modified: qpid/proton/branches/driver_abstraction/proton-c/src/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/driver.c?rev=1387437&r1=1387436&r2=1387437&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/driver.c (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/driver.c Wed Sep 19 02:31:16 2012
@@ -587,11 +587,36 @@ void pn_driver_wakeup(pn_driver_t *d)
 }
 
 
+//
+// XXX - pn_driver_wait has been divided into three internal functions as a
+//       temporary workaround for a multi-threading problem.  A multi-threaded
+//       application must hold a lock on parts 1 and 3, but not on part 2.
+//       This temporary change, which is not reflected in the driver's API, allows
+//       a multi-threaded application to use the three parts separately.
+//
+//       This workaround will eventually be replaced by a more elegant solution
+//       to the problem.
+//
+void pn_driver_wait_1(pn_driver_t *d)
+{
+  pn_driver_poller_wait_1(d);
+}
+
+void pn_driver_wait_2(pn_driver_t *d, int timeout)
+{
+  pn_driver_poller_wait_2(d, timeout);
+}
+
+void pn_driver_wait_3(pn_driver_t *d)
+{
+  pn_driver_poller_wait_3(d);
+}
+
 void pn_driver_wait(pn_driver_t *d, int timeout)
 {
-  pn_driver_poller_wait(d, timeout);
-  d->listener_next = d->listener_head;
-  d->connector_next = d->connector_head;
+  pn_driver_wait_1(d);
+  pn_driver_wait_2(d, timeout);
+  pn_driver_wait_3(d);
 }
 
 pn_listener_t *pn_driver_listener(pn_driver_t *d) {

Modified: qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine-internal.h?rev=1387437&r1=1387436&r2=1387437&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine-internal.h Wed Sep 19 02:31:16 2012
@@ -138,6 +138,7 @@ struct pn_session_t {
   size_t link_capacity;
   size_t link_count;
   size_t id;
+  void *context;
 };
 
 struct pn_link_t {
@@ -159,6 +160,7 @@ struct pn_link_t {
   bool drain;
   bool drained; // sender only
   size_t id;
+  void *context;
 };
 
 struct pn_delivery_t {
@@ -182,6 +184,7 @@ struct pn_delivery_t {
   bool tpwork;
   pn_buffer_t *bytes;
   bool done;
+  void *transport_context;
   void *context;
 };
 

Modified: qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine.c?rev=1387437&r1=1387436&r2=1387437&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine.c (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine.c Wed Sep 19 02:31:16 2012
@@ -296,6 +296,18 @@ void pn_session_free(pn_session_t *sessi
   free(session);
 }
 
+void *pn_session_context(pn_session_t *session)
+{
+    return session ? session->context : 0;
+}
+
+void pn_session_set_context(pn_session_t *session, void *context)
+{
+    if (session)
+        session->context = context;
+}
+
+
 void pn_add_link(pn_session_t *ssn, pn_link_t *link)
 {
   PN_ENSURE(ssn->links, ssn->link_capacity, ssn->link_count + 1);
@@ -361,6 +373,17 @@ void pn_link_free(pn_link_t *link)
   free(link);
 }
 
+void *pn_link_context(pn_link_t *link)
+{
+    return link ? link->context : 0;
+}
+
+void pn_link_set_context(pn_link_t *link, void *context)
+{
+    if (link)
+        link->context = context;
+}
+
 void pn_endpoint_init(pn_endpoint_t *endpoint, int type, pn_connection_t *conn)
 {
   endpoint->type = type;
@@ -641,6 +664,7 @@ pn_session_t *pn_session(pn_connection_t
   ssn->links = NULL;
   ssn->link_capacity = 0;
   ssn->link_count = 0;
+  ssn->context = 0;
 
   return ssn;
 }
@@ -793,6 +817,7 @@ void pn_link_init(pn_link_t *link, int t
   link->queued = 0;
   link->drain = false;
   link->drained = false;
+  link->context = 0;
 }
 
 const char *pn_source(pn_link_t *link)
@@ -933,6 +958,7 @@ pn_delivery_t *pn_delivery(pn_link_t *li
   delivery->tpwork = false;
   pn_buffer_clear(delivery->bytes);
   delivery->done = false;
+  delivery->transport_context = NULL;
   delivery->context = NULL;
 
   if (!link->current)
@@ -979,6 +1005,17 @@ void pn_delivery_dump(pn_delivery_t *d)
          pn_readable(d), d->work);
 }
 
+void *pn_delivery_context(pn_delivery_t *delivery)
+{
+    return delivery ? delivery->context : NULL;
+}
+
+void pn_delivery_set_context(pn_delivery_t *delivery, void *context)
+{
+    if (delivery)
+        delivery->context = context;
+}
+
 pn_delivery_tag_t pn_delivery_tag(pn_delivery_t *delivery)
 {
   if (delivery) {
@@ -1052,8 +1089,8 @@ void pn_real_settle(pn_delivery_t *deliv
 
 void pn_full_settle(pn_delivery_buffer_t *db, pn_delivery_t *delivery)
 {
-  pn_delivery_state_t *state = delivery->context;
-  delivery->context = NULL;
+  pn_delivery_state_t *state = delivery->transport_context;
+  delivery->transport_context = NULL;
   if (state) state->delivery = NULL;
   pn_real_settle(delivery);
   if (state) pn_delivery_buffer_gc(db);
@@ -1247,7 +1284,7 @@ int pn_do_transfer(pn_dispatcher_t *disp
 
     delivery = pn_delivery(link, pn_dtag(tag.start, tag.size));
     pn_delivery_state_t *state = pn_delivery_buffer_push(incoming, delivery);
-    delivery->context = state;
+    delivery->transport_context = state;
     if (id != state->id) {
       int err = pn_do_error(transport, "amqp:session:invalid-field",
                             "sequencing error, expected delivery-id %u, got %u",
@@ -1540,7 +1577,7 @@ bool pn_delivery_buffered(pn_delivery_t 
 {
   if (delivery->settled) return false;
   if (pn_is_sender(delivery->link)) {
-    pn_delivery_state_t *state = delivery->context;
+    pn_delivery_state_t *state = delivery->transport_context;
     if (state) {
       return (delivery->done && !state->sent) || pn_buffer_size(delivery->bytes) > 0;
     } else {
@@ -1656,7 +1693,7 @@ int pn_post_disp(pn_transport_t *transpo
   pn_link_t *link = delivery->link;
   pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
   // XXX: check for null state
-  pn_delivery_state_t *state = delivery->context;
+  pn_delivery_state_t *state = delivery->transport_context;
   uint64_t code;
   switch(delivery->local_state) {
   case PN_ACCEPTED:
@@ -1684,10 +1721,10 @@ int pn_process_tpwork_sender(pn_transpor
   pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
   pn_link_state_t *link_state = pn_link_get_state(ssn_state, link);
   if ((int16_t) ssn_state->local_channel >= 0 && (int32_t) link_state->local_handle >= 0) {
-    pn_delivery_state_t *state = delivery->context;
+    pn_delivery_state_t *state = delivery->transport_context;
     if (!state && pn_delivery_buffer_available(&ssn_state->outgoing)) {
       state = pn_delivery_buffer_push(&ssn_state->outgoing, delivery);
-      delivery->context = state;
+      delivery->transport_context = state;
     }
 
     if (state && !state->sent && (delivery->done || pn_buffer_size(delivery->bytes) > 0) &&
@@ -1712,7 +1749,7 @@ int pn_process_tpwork_sender(pn_transpor
     }
   }
 
-  pn_delivery_state_t *state = delivery->context;
+  pn_delivery_state_t *state = delivery->transport_context;
   // XXX: need to prevent duplicate disposition sending
   if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote_settled
       && state && state->sent) {
@@ -1732,7 +1769,7 @@ int pn_process_tpwork_receiver(pn_transp
   pn_link_t *link = delivery->link;
   // XXX: need to prevent duplicate disposition sending
   pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
-  if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote_settled && delivery->context) {
+  if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote_settled && delivery->transport_context) {
     int err = pn_post_disp(transport, delivery);
     if (err) return err;
   }

Modified: qpid/proton/branches/driver_abstraction/proton-c/src/message/message.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/message/message.c?rev=1387437&r1=1387436&r2=1387437&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/message/message.c (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/message/message.c Wed Sep 19 02:31:16 2012
@@ -445,7 +445,7 @@ int pn_message_decode(pn_message_t *msg,
       {
         pn_bytes_t user_id, address, subject, reply_to, ctype, cencoding,
           group_id, reply_to_group_id;
-        err = pn_data_scan(msg->data, "D.[.zSSS.ssLLSiS]", &user_id, &address,
+        err = pn_data_scan(msg->data, "D.[.zSSS.ssLLSIS]", &user_id, &address,
                            &subject, &reply_to, &ctype, &cencoding,
                            &msg->expiry_time, &msg->creation_time, &group_id,
                            &msg->group_sequence, &reply_to_group_id);

Modified: qpid/proton/branches/driver_abstraction/proton-c/src/pollers/poll.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/pollers/poll.c?rev=1387437&r1=1387436&r2=1387437&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/pollers/poll.c (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/pollers/poll.c Wed Sep 19 02:31:16 2012
@@ -96,7 +96,7 @@ void pn_connector_poller_destroy( struct
     c->poller = NULL;
 }
 
-
+#if 0   // save for now
 void pn_driver_poller_wait(pn_driver_t *d, int timeout)
 {
   pn_driver_poller_t *poller = d->poller;
@@ -166,3 +166,102 @@ void pn_driver_poller_wait(pn_driver_t *
     c = c->connector_next;
   }
 }
+#endif
+
+//
+// XXX - pn_driver_wait has been divided into three internal functions as a
+//       temporary workaround for a multi-threading problem.  A multi-threaded
+//       application must hold a lock on parts 1 and 3, but not on part 2.
+//       This temporary change, which is not reflected in the driver's API, allows
+//       a multi-threaded application to use the three parts separately.
+//
+//       This workaround will eventually be replaced by a more elegant solution
+//       to the problem.
+//
+
+static void pn_driver_poller_rebuild(pn_driver_t *d)
+{
+  pn_driver_poller_t *poller = d->poller;
+  size_t size = d->listener_count + d->connector_count;
+  while (poller->capacity < size + 1) {
+    poller->capacity = poller->capacity ? 2*poller->capacity : 16;
+    poller->fds = realloc(poller->fds, poller->capacity*sizeof(struct pollfd));
+  }
+
+  poller->nfds = 0;
+
+  poller->fds[poller->nfds].fd = d->ctrl[0];
+  poller->fds[poller->nfds].events = POLLIN;
+  poller->fds[poller->nfds].revents = 0;
+  poller->nfds++;
+
+  pn_listener_t *l = d->listener_head;
+  for (int i = 0; i < d->listener_count; i++) {
+    poller->fds[poller->nfds].fd = l->fd;
+    poller->fds[poller->nfds].events = POLLIN;
+    poller->fds[poller->nfds].revents = 0;
+    l->poller->idx = poller->nfds;
+    poller->nfds++;
+    l = l->listener_next;
+  }
+
+  pn_connector_t *c = d->connector_head;
+  for (int i = 0; i < d->connector_count; i++)
+  {
+    if (!c->closed) {
+      poller->fds[poller->nfds].fd = c->fd;
+      poller->fds[poller->nfds].events = (c->status & PN_SEL_RD ? POLLIN : 0) |
+        (c->status & PN_SEL_WR ? POLLOUT : 0);
+      poller->fds[poller->nfds].revents = 0;
+      c->poller->idx = poller->nfds;
+      poller->nfds++;
+    }
+    c = c->connector_next;
+  }
+}
+
+void pn_driver_poller_wait_1(pn_driver_t *d)
+{
+  pn_driver_poller_rebuild(d);
+}
+
+void pn_driver_poller_wait_2(pn_driver_t *d, int timeout)
+{
+  pn_driver_poller_t *poller = d->poller;
+  DIE_IFE(poll(poller->fds, poller->nfds, d->closed_count > 0 ? 0 : timeout));
+}
+
+void pn_driver_poller_wait_3(pn_driver_t *d)
+{
+  pn_driver_poller_t *poller = d->poller;
+
+  if (poller->fds[0].revents & POLLIN) {
+    //clear the pipe
+    char buffer[512];
+    while (read(d->ctrl[0], buffer, 512) == 512);
+  }
+
+  pn_listener_t *l = d->listener_head;
+  while (l) {
+    int idx = l->poller->idx;
+    l->pending = (idx && poller->fds[idx].revents & POLLIN);
+    l = l->listener_next;
+  }
+
+  pn_connector_t *c = d->connector_head;
+  while (c) {
+    if (c->closed) {
+      c->pending_read = false;
+      c->pending_write = false;
+      c->pending_tick = false;
+    } else {
+      int idx = c->poller->idx;
+      c->pending_read = (idx && poller->fds[idx].revents & POLLIN);
+      c->pending_write = (idx && poller->fds[idx].revents & POLLOUT);
+    }
+    c = c->connector_next;
+  }
+
+  d->listener_next = d->listener_head;
+  d->connector_next = d->connector_head;
+}

Modified: qpid/proton/branches/driver_abstraction/proton-c/src/pollers/select.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/pollers/select.c?rev=1387437&r1=1387436&r2=1387437&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/pollers/select.c (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/pollers/select.c Wed Sep 19 02:31:16 2012
@@ -78,7 +78,7 @@ void pn_connector_poller_destroy( struct
 {
 }
 
-
+#if 0 // save it for now
 void pn_driver_poller_wait(pn_driver_t *d, int timeout)
 {
   pn_driver_poller_t *poller = d->poller;
@@ -147,3 +147,83 @@ void pn_driver_poller_wait(pn_driver_t *
       }
   }
 }
+#endif
+
+void pn_driver_poller_wait_1(pn_driver_t *d)
+{
+  pn_driver_poller_t *poller = d->poller;
+
+  // setup the select
+  FD_ZERO(&poller->readfds);
+  FD_ZERO(&poller->writefds);
+
+  FD_SET(d->ctrl[0], &poller->readfds);
+  poller->max_fds = d->ctrl[0];
+
+  pn_listener_t *l = d->listener_head;
+  for (int i = 0; i < d->listener_count; i++) {
+      FD_SET(l->fd, &poller->readfds);
+      if (l->fd > poller->max_fds) poller->max_fds = l->fd;
+      l = l->listener_next;
+  }
+
+  pn_connector_t *c = d->connector_head;
+  for (int i = 0; i < d->connector_count; i++) {
+      if (!c->closed && (c->status & (PN_SEL_RD|PN_SEL_WR))) {
+          if (c->status & PN_SEL_RD)
+              FD_SET(c->fd, &poller->readfds);
+          if (c->status & PN_SEL_WR)
+              FD_SET(c->fd, &poller->writefds);
+          if (c->fd > poller->max_fds) poller->max_fds = c->fd;
+      }
+      c = c->connector_next;
+  }
+}
+
+void pn_driver_poller_wait_2(pn_driver_t *d, int timeout)
+{
+  pn_driver_poller_t *poller = d->poller;
+
+  struct timeval to = {0};
+  if (timeout > 0) {
+      // convert millisecs to sec and usec:
+      to.tv_sec = timeout/1000;
+      to.tv_usec = (timeout - (to.tv_sec * 1000)) * 1000;
+  }
+
+  int nfds = select(poller->max_fds + 1, &poller->readfds, &poller->writefds, NULL, timeout < 0 ? NULL : &to);
+  DIE_IFE(nfds);
+}
+
+void pn_driver_poller_wait_3(pn_driver_t *d)
+{
+  pn_driver_poller_t *poller = d->poller;
+
+  if (FD_ISSET(d->ctrl[0], &poller->readfds)) {
+    //clear the pipe
+    char buffer[512];
+    while (read(d->ctrl[0], buffer, 512) == 512);
+  }
+
+  pn_listener_t *l = d->listener_head;
+  while (l) {
+    l->pending = FD_ISSET(l->fd, &poller->readfds);
+    l = l->listener_next;
+  }
+
+  pn_connector_t *c = d->connector_head;
+  while (c) {
+    if (c->closed) {
+      c->pending_read = false;
+      c->pending_write = false;
+      c->pending_tick = false;
+    } else {
+      c->pending_read = FD_ISSET(c->fd, &poller->readfds);
+      c->pending_write = FD_ISSET(c->fd, &poller->writefds);
+    }
+    c = c->connector_next;
+  }
+
+  d->listener_next = d->listener_head;
+  d->connector_next = d->connector_head;
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org