You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2015/01/21 10:24:53 UTC

qpid-proton git commit: PROTON-800: proton-c reactor fixes for Windows, destructor ordering and selectables handling

Repository: qpid-proton
Updated Branches:
  refs/heads/master e2be27cef -> 6df8ad351


PROTON-800: proton-c reactor fixes for Windows, destructor ordering and selectables handling


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/6df8ad35
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/6df8ad35
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/6df8ad35

Branch: refs/heads/master
Commit: 6df8ad3511e81b3b674e857632af35dd2e3e5883
Parents: e2be27c
Author: Clifford Jansen <cl...@apache.org>
Authored: Wed Jan 21 01:23:47 2015 -0800
Committer: Clifford Jansen <cl...@apache.org>
Committed: Wed Jan 21 01:23:47 2015 -0800

----------------------------------------------------------------------
 proton-c/src/reactor/reactor.c  |  4 +-
 proton-c/src/windows/iocp.c     | 11 +++++-
 proton-c/src/windows/iocp.h     |  2 +
 proton-c/src/windows/selector.c | 72 ++++++++++++++++++++----------------
 4 files changed, 55 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6df8ad35/proton-c/src/reactor/reactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/reactor.c b/proton-c/src/reactor/reactor.c
index bdee2de..b9346d2 100644
--- a/proton-c/src/reactor/reactor.c
+++ b/proton-c/src/reactor/reactor.c
@@ -76,12 +76,12 @@ static void pn_reactor_initialize(pn_reactor_t *reactor) {
 
 static void pn_reactor_finalize(pn_reactor_t *reactor) {
   pn_decref(reactor->attachments);
-  pn_decref(reactor->selector);
-  pn_decref(reactor->io);
   pn_decref(reactor->collector);
   pn_decref(reactor->handler);
   pn_decref(reactor->children);
   pn_decref(reactor->timer);
+  pn_decref(reactor->selector);
+  pn_decref(reactor->io);
 }
 
 #define pn_reactor_hashcode NULL

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6df8ad35/proton-c/src/windows/iocp.c
----------------------------------------------------------------------
diff --git a/proton-c/src/windows/iocp.c b/proton-c/src/windows/iocp.c
index 6acff41..3feec01 100644
--- a/proton-c/src/windows/iocp.c
+++ b/proton-c/src/windows/iocp.c
@@ -733,7 +733,6 @@ static iocpdesc_t *pni_iocpdesc(pn_socket_t s)
 {
   static const pn_cid_t CID_pni_iocpdesc = CID_pn_void;
   static pn_class_t clazz = PN_CLASS(pni_iocpdesc);
-  assert (s != INVALID_SOCKET);
   iocpdesc_t *iocpd = (iocpdesc_t *) pn_class_new(&clazz, sizeof(iocpdesc_t));
   assert(iocpd);
   iocpd->socket = s;
@@ -749,6 +748,7 @@ static bool is_listener_socket(pn_socket_t s)
 }
 
 iocpdesc_t *pni_iocpdesc_create(iocp_t *iocp, pn_socket_t s, bool external) {
+  assert (s != INVALID_SOCKET);
   assert(!pni_iocpdesc_map_get(iocp, s));
   bool listening = is_listener_socket(s);
   iocpdesc_t *iocpd = pni_iocpdesc(s);
@@ -767,6 +767,15 @@ iocpdesc_t *pni_iocpdesc_create(iocp_t *iocp, pn_socket_t s, bool external) {
   return iocpd;
 }
 
+iocpdesc_t *pni_deadline_desc(iocp_t *iocp) {
+  // Non IO descriptor for selector deadlines.  Do not add to iocpdesc map or
+  // zombie list.  Selector responsible to free/decref object.
+  iocpdesc_t *iocpd = pni_iocpdesc(PN_INVALID_SOCKET);
+  iocpd->iocp = iocp;
+  iocpd->deadline_desc = true;
+  return iocpd;
+}
+
 // === Fast lookup of a socket's iocpdesc_t
 
 iocpdesc_t *pni_iocpdesc_map_get(iocp_t *iocp, pn_socket_t s) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6df8ad35/proton-c/src/windows/iocp.h
----------------------------------------------------------------------
diff --git a/proton-c/src/windows/iocp.h b/proton-c/src/windows/iocp.h
index 91ded50..adccaab 100644
--- a/proton-c/src/windows/iocp.h
+++ b/proton-c/src/windows/iocp.h
@@ -74,6 +74,7 @@ struct iocpdesc_t {
   bool closing;        // pn_close called
   bool read_closed;    // EOF or read error
   bool write_closed;   // shutdown sent or write error
+  bool deadline_desc;  // Socket-less deadline descriptor for selectors
   pn_selector_t *selector;
   pn_selectable_t *selectable;
   int events;
@@ -104,6 +105,7 @@ struct write_result_t {
 
 iocpdesc_t *pni_iocpdesc_create(iocp_t *, pn_socket_t s, bool external);
 iocpdesc_t *pni_iocpdesc_map_get(iocp_t *, pn_socket_t s);
+iocpdesc_t *pni_deadline_desc(iocp_t *);
 void pni_iocpdesc_map_del(iocp_t *, pn_socket_t s);
 void pni_iocpdesc_map_push(iocpdesc_t *iocpd);
 void pni_iocpdesc_start(iocpdesc_t *iocpd);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6df8ad35/proton-c/src/windows/selector.c
----------------------------------------------------------------------
diff --git a/proton-c/src/windows/selector.c b/proton-c/src/windows/selector.c
index a59dc9c..6bd8d8c 100644
--- a/proton-c/src/windows/selector.c
+++ b/proton-c/src/windows/selector.c
@@ -44,8 +44,6 @@ static void deadlines_update(iocpdesc_t *iocpd, pn_timestamp_t t);
 
 struct pn_selector_t {
   iocp_t *iocp;
-  pn_timestamp_t *deadlines;
-  size_t capacity;
   pn_list_t *selectables;
   pn_list_t *iocp_descriptors;
   size_t current;
@@ -62,8 +60,6 @@ void pn_selector_initialize(void *obj)
 {
   pn_selector_t *selector = (pn_selector_t *) obj;
   selector->iocp = NULL;
-  selector->deadlines = NULL;
-  selector->capacity = 0;
   selector->selectables = pn_list(PN_WEAKREF, 0);
   selector->iocp_descriptors = pn_list(PN_OBJECT, 0);
   selector->current = 0;
@@ -79,7 +75,6 @@ void pn_selector_initialize(void *obj)
 void pn_selector_finalize(void *obj)
 {
   pn_selector_t *selector = (pn_selector_t *) obj;
-  free(selector->deadlines);
   pn_free(selector->selectables);
   pn_free(selector->iocp_descriptors);
   pn_error_free(selector->error);
@@ -110,34 +105,13 @@ void pn_selector_add(pn_selector_t *selector, pn_selectable_t *selectable)
   assert(selectable);
   assert(pni_selectable_get_index(selectable) < 0);
   pn_socket_t sock = pn_selectable_get_fd(selectable);
-
   iocpdesc_t *iocpd = NULL;
-  if (sock != INVALID_SOCKET) {
-    iocpd = pni_iocpdesc_map_get(selector->iocp, sock);
-    if (!iocpd) {
-      // Socket created outside proton.  Hook it up to iocp.
-      iocpd = pni_iocpdesc_create(selector->iocp, sock, true);
-      pni_iocpdesc_start(iocpd);
-    } else {
-      assert(iocpd->iocp == selector->iocp);
-    }
-  }
 
   if (pni_selectable_get_index(selectable) < 0) {
     pn_list_add(selector->selectables, selectable);
-    pn_list_add(selector->iocp_descriptors, iocpd);
+    pn_list_add(selector->iocp_descriptors, NULL);
     size_t size = pn_list_size(selector->selectables);
-
-    if (selector->capacity < size) {
-      selector->deadlines = (pn_timestamp_t *) realloc(selector->deadlines, size*sizeof(pn_timestamp_t));
-      selector->capacity = size;
-    }
-
     pni_selectable_set_index(selectable, size - 1);
-    if (iocpd) {
-      iocpd->selector = selector;
-      iocpd->selectable = selectable;
-    }
   }
 
   pn_selector_update(selector, selectable);
@@ -145,12 +119,48 @@ void pn_selector_add(pn_selector_t *selector, pn_selectable_t *selectable)
 
 void pn_selector_update(pn_selector_t *selector, pn_selectable_t *selectable)
 {
+  // A selectable's fd may switch from PN_INVALID_SCOKET to a working socket between
+  // update calls.  If a selectable without a valid socket has a deadline, we need
+  // a dummy iocpdesc_t to participate in the deadlines list.
   int idx = pni_selectable_get_index(selectable);
   assert(idx >= 0);
-  selector->deadlines[idx] = pn_selectable_get_deadline(selectable);
-
+  pn_timestamp_t deadline = pn_selectable_get_deadline(selectable);
   pn_socket_t sock = pn_selectable_get_fd(selectable);
   iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(selector->iocp_descriptors, idx);
+
+  if (!iocpd && deadline && sock == PN_INVALID_SOCKET) {
+    iocpd = pni_deadline_desc(selector->iocp);
+    assert(iocpd);
+    pn_list_set(selector->iocp_descriptors, idx, iocpd);
+    pn_decref(iocpd);  // life is solely tied to iocp_descriptors list
+    iocpd->selector = selector;
+    iocpd->selectable = selectable;
+  }
+  else if (iocpd && iocpd->deadline_desc && sock != PN_INVALID_SOCKET) {
+    // Switching to a real socket.  Stop using a deadline descriptor.
+    deadlines_update(iocpd, 0);
+    // decref descriptor in list and pick up a real iocpd below
+    pn_list_set(selector->iocp_descriptors, idx, NULL);
+    iocpd = NULL;
+  }
+
+  // The selectables socket may be set long after it has been added
+  if (!iocpd && sock != PN_INVALID_SOCKET) {
+    iocpd = pni_iocpdesc_map_get(selector->iocp, sock);
+    if (!iocpd) {
+      // Socket created outside proton.  Hook it up to iocp.
+      iocpd = pni_iocpdesc_create(selector->iocp, sock, true);
+      assert(iocpd);
+      if (iocpd)
+        pni_iocpdesc_start(iocpd);
+    }
+    if (iocpd) {
+      pn_list_set(selector->iocp_descriptors, idx, iocpd);
+      iocpd->selector = selector;
+      iocpd->selectable = selectable;
+    }
+  }
+
   if (iocpd) {
     assert(sock == iocpd->socket || iocpd->closing);
     int interests = 0;
@@ -160,11 +170,11 @@ void pn_selector_update(pn_selector_t *selector, pn_selectable_t *selectable)
     if (pn_selectable_is_writing(selectable)) {
       interests |= PN_WRITABLE;
     }
-    if (selector->deadlines[idx]) {
+    if (deadline) {
       interests |= PN_EXPIRED;
     }
     interests_update(iocpd, interests);
-    deadlines_update(iocpd, selector->deadlines[idx]);
+    deadlines_update(iocpd, deadline);
   }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org