You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2014/12/06 12:22:33 UTC

[2/2] qpid-proton git commit: pool deliveries at the connection level; improved delivery refcounting semantics

pool deliveries at the connection level; improved delivery refcounting semantics


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d549ec38
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d549ec38
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d549ec38

Branch: refs/heads/master
Commit: d549ec388dfff3abc262c2ca226a36f42b37ee16
Parents: 3b8de5b
Author: Rafael Schloming <rh...@alum.mit.edu>
Authored: Sat Dec 6 06:20:15 2014 -0500
Committer: Rafael Schloming <rh...@alum.mit.edu>
Committed: Sat Dec 6 06:20:15 2014 -0500

----------------------------------------------------------------------
 proton-c/src/engine/engine-internal.h |   7 +-
 proton-c/src/engine/engine.c          | 142 +++++++++++++++++++----------
 proton-c/src/tests/refcount.c         | 110 ++++++++++++++++++++++
 proton-c/src/transport/transport.c    |  10 +-
 4 files changed, 214 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d549ec38/proton-c/src/engine/engine-internal.h
----------------------------------------------------------------------
diff --git a/proton-c/src/engine/engine-internal.h b/proton-c/src/engine/engine-internal.h
index 16ce6a4..204ef01 100644
--- a/proton-c/src/engine/engine-internal.h
+++ b/proton-c/src/engine/engine-internal.h
@@ -201,6 +201,7 @@ struct pn_connection_t {
   pn_data_t *properties;
   pn_collector_t *collector;
   pn_record_t *context;
+  pn_list_t *delivery_pool;
 };
 
 struct pn_session_t {
@@ -242,8 +243,6 @@ struct pn_link_t {
   pn_delivery_t *unsettled_head;
   pn_delivery_t *unsettled_tail;
   pn_delivery_t *current;
-  pn_delivery_t *settled_head;
-  pn_delivery_t *settled_tail;
   pn_record_t *context;
   size_t unsettled_count;
   pn_sequence_t available;
@@ -278,8 +277,6 @@ struct pn_delivery_t {
   pn_buffer_t *tag;
   pn_delivery_t *unsettled_next;
   pn_delivery_t *unsettled_prev;
-  pn_delivery_t *settled_next;
-  pn_delivery_t *settled_prev;
   pn_delivery_t *work_next;
   pn_delivery_t *work_prev;
   pn_delivery_t *tpwork_next;
@@ -292,6 +289,7 @@ struct pn_delivery_t {
   bool work;
   bool tpwork;
   bool done;
+  bool referenced;
 };
 
 #define PN_SET_LOCAL(OLD, NEW)                                          \
@@ -312,6 +310,7 @@ void pn_real_settle(pn_delivery_t *delivery);  // will free delivery if link is
 void pn_clear_tpwork(pn_delivery_t *delivery);
 void pn_work_update(pn_connection_t *connection, pn_delivery_t *delivery);
 void pn_clear_modified(pn_connection_t *connection, pn_endpoint_t *endpoint);
+void pn_connection_bound(pn_connection_t *conn);
 void pn_connection_unbound(pn_connection_t *conn);
 int pn_do_error(pn_transport_t *transport, const char *condition, const char *fmt, ...);
 void pn_session_unbound(pn_session_t* ssn);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d549ec38/proton-c/src/engine/engine.c
----------------------------------------------------------------------
diff --git a/proton-c/src/engine/engine.c b/proton-c/src/engine/engine.c
index 96c601f..1f2768a 100644
--- a/proton-c/src/engine/engine.c
+++ b/proton-c/src/engine/engine.c
@@ -134,6 +134,11 @@ void pn_connection_free(pn_connection_t *connection)
   pn_decref(connection);
 }
 
+void pn_connection_bound(pn_connection_t *connection)
+{
+  pn_collector_put(connection->collector, PN_OBJECT, connection, PN_CONNECTION_BOUND);
+}
+
 // invoked when transport has been removed:
 void pn_connection_unbound(pn_connection_t *connection)
 {
@@ -146,7 +151,6 @@ void pn_connection_unbound(pn_connection_t *connection)
         pn_clear_modified(connection, connection->transport_head);
     }
     while (connection->tpwork_head) {
-      pn_real_settle(connection->tpwork_head);
       pn_clear_tpwork(connection->tpwork_head);
     }
   }
@@ -306,11 +310,6 @@ void pn_link_free(pn_link_t *link)
     pn_delivery_settle(delivery);
     delivery = next;
   }
-  while (link->settled_head) {
-    delivery = link->settled_head;
-    LL_POP(link, settled, pn_delivery_t);
-    pn_decref(delivery);
-  }
   link->endpoint.freed = true;
   pn_decref(link);
 }
@@ -402,6 +401,7 @@ static void pn_connection_finalize(void *object)
   pn_free(conn->desired_capabilities);
   pn_free(conn->properties);
   pn_endpoint_tini(endpoint);
+  pn_free(conn->delivery_pool);
 }
 
 #define pn_connection_initialize NULL
@@ -433,6 +433,7 @@ pn_connection_t *pn_connection(void)
   conn->properties = pn_data(0);
   conn->collector = NULL;
   conn->context = pn_record();
+  conn->delivery_pool = pn_list(PN_OBJECT, 0);
 
   return conn;
 }
@@ -600,7 +601,6 @@ void pn_add_tpwork(pn_delivery_t *delivery)
   {
     LL_ADD(connection, tpwork, delivery);
     delivery->tpwork = true;
-    pn_incref(delivery);
   }
   pn_modified(connection, &connection->endpoint, true);
 }
@@ -612,7 +612,6 @@ void pn_clear_tpwork(pn_delivery_t *delivery)
   {
     LL_REMOVE(connection, tpwork, delivery);
     delivery->tpwork = false;
-    pn_decref(delivery);  // may free delivery!
   }
 }
 
@@ -634,7 +633,6 @@ void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint, bool emit
   if (!endpoint->modified) {
     LL_ADD(connection, transport, endpoint);
     endpoint->modified = true;
-    pn_incref(endpoint);
   }
 
   if (emit && connection->transport) {
@@ -650,7 +648,6 @@ void pn_clear_modified(pn_connection_t *connection, pn_endpoint_t *endpoint)
     endpoint->transport_next = NULL;
     endpoint->transport_prev = NULL;
     endpoint->modified = false;
-    pn_decref(endpoint);  // may free endpoint!
   }
 }
 
@@ -739,12 +736,14 @@ static void pn_session_incref(void *object)
 
 static bool pni_preserve_child(pn_endpoint_t *endpoint, pn_endpoint_t *parent)
 {
-  if (!endpoint->freed && endpoint->referenced) {
+  pn_connection_t *conn = pn_ep_get_connection(endpoint);
+  if ((!endpoint->freed || (conn->transport && endpoint->modified)) && endpoint->referenced) {
     pn_object_incref(endpoint);
     endpoint->referenced = false;
     pn_decref(parent);
     return true;
   } else {
+    LL_REMOVE(conn, transport, endpoint);
     return false;
   }
 }
@@ -912,10 +911,6 @@ static void pn_link_finalize(void *object)
   pn_link_t *link = (pn_link_t *) object;
   pn_endpoint_t *endpoint = &link->endpoint;
 
-  // assumptions: all deliveries freed
-  assert(link->settled_head == NULL);
-  assert(link->unsettled_head == NULL);
-
   if (pni_post_final(endpoint, PN_LINK_FINAL)) {
     return;
   }
@@ -925,6 +920,11 @@ static void pn_link_finalize(void *object)
     return;
   }
 
+  while (link->unsettled_head) {
+    assert(!link->unsettled_head->referenced);
+    pn_free(link->unsettled_head);
+  }
+
   pn_free(link->context);
   pn_terminus_free(&link->source);
   pn_terminus_free(&link->target);
@@ -963,7 +963,6 @@ pn_link_t *pn_link_new(int type, pn_session_t *session, const char *name)
   pn_terminus_init(&link->target, PN_TARGET);
   pn_terminus_init(&link->remote_source, PN_UNSPECIFIED);
   pn_terminus_init(&link->remote_target, PN_UNSPECIFIED);
-  link->settled_head = link->settled_tail = NULL;
   link->unsettled_head = link->unsettled_tail = link->current = NULL;
   link->unsettled_count = 0;
   link->available = 0;
@@ -1197,17 +1196,76 @@ static void pn_disposition_finalize(pn_disposition_t *ds)
   pn_condition_tini(&ds->condition);
 }
 
+static bool pni_link_live(pn_link_t *link)
+{
+  return pni_session_live(link->session) || pn_refcount(link) > 1;
+}
+
+static void pn_delivery_incref(void *object)
+{
+  pn_delivery_t *delivery = (pn_delivery_t *) object;
+  if (delivery->link && !delivery->referenced) {
+    delivery->referenced = true;
+    pn_incref(delivery->link);
+  } else {
+    pn_object_incref(object);
+  }
+}
+
+static bool pni_preserve_delivery(pn_delivery_t *delivery)
+{
+  pn_connection_t *conn = delivery->link->session->connection;
+  return !delivery->local.settled || (conn->transport && delivery->tpwork);
+}
+
 static void pn_delivery_finalize(void *object)
 {
   pn_delivery_t *delivery = (pn_delivery_t *) object;
-  assert(delivery->settled);
-  assert(!delivery->state.init);  // no longer in session delivery map
-  pn_free(delivery->context);
-  pn_buffer_free(delivery->tag);
-  pn_buffer_free(delivery->bytes);
-  pn_disposition_finalize(&delivery->local);
-  pn_disposition_finalize(&delivery->remote);
-  pn_decref(delivery->link);
+  pn_link_t *link = delivery->link;
+
+  bool pooled = false;
+  bool referenced = true;
+  if (link) {
+    if (pni_link_live(link) && pni_preserve_delivery(delivery) && delivery->referenced) {
+      delivery->referenced = false;
+      pn_object_incref(delivery);
+      pn_decref(link);
+      return;
+    }
+    referenced = delivery->referenced;
+
+    pn_clear_tpwork(delivery);
+    LL_REMOVE(link, unsettled, delivery);
+    pn_delivery_map_del(pn_link_is_sender(link)
+                        ? &link->session->state.outgoing
+                        : &link->session->state.incoming,
+                        delivery);
+    pn_buffer_clear(delivery->tag);
+    pn_buffer_clear(delivery->bytes);
+    pn_record_clear(delivery->context);
+    delivery->settled = true;
+    pn_connection_t *conn = link->session->connection;
+    assert(pn_refcount(delivery) == 0);
+    if (pni_connection_live(conn)) {
+      pn_list_t *pool = link->session->connection->delivery_pool;
+      delivery->link = NULL;
+      pn_list_add(pool, delivery);
+      pooled = true;
+      assert(pn_refcount(delivery) == 1);
+    }
+  }
+
+  if (!pooled) {
+    pn_free(delivery->context);
+    pn_buffer_free(delivery->tag);
+    pn_buffer_free(delivery->bytes);
+    pn_disposition_finalize(&delivery->local);
+    pn_disposition_finalize(&delivery->remote);
+  }
+
+  if (referenced) {
+    pn_decref(link);
+  }
 }
 
 static void pn_disposition_init(pn_disposition_t *ds)
@@ -1230,6 +1288,11 @@ static void pn_disposition_clear(pn_disposition_t *ds)
   pn_condition_clear(&ds->condition);
 }
 
+#define pn_delivery_new pn_object_new
+#define pn_delivery_refcount pn_object_refcount
+#define pn_delivery_decref pn_object_decref
+#define pn_delivery_free pn_object_free
+#define pn_delivery_reify pn_object_reify
 #define pn_delivery_initialize NULL
 #define pn_delivery_hashcode NULL
 #define pn_delivery_compare NULL
@@ -1238,14 +1301,12 @@ static void pn_disposition_clear(pn_disposition_t *ds)
 pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag)
 {
   assert(link);
-  pn_delivery_t *delivery = link->settled_head;
-  LL_POP(link, settled, pn_delivery_t);
+  pn_list_t *pool = link->session->connection->delivery_pool;
+  pn_delivery_t *delivery = (pn_delivery_t *) pn_list_pop(pool);
   if (!delivery) {
-    static const pn_class_t clazz = PN_CLASS(pn_delivery);
+    static const pn_class_t clazz = PN_METACLASS(pn_delivery);
     delivery = (pn_delivery_t *) pn_class_new(&clazz, sizeof(pn_delivery_t));
     if (!delivery) return NULL;
-    delivery->link = link;
-    pn_incref(delivery->link);  // keep link until finalized
     delivery->tag = pn_buffer(16);
     delivery->bytes = pn_buffer(64);
     pn_disposition_init(&delivery->local);
@@ -1254,6 +1315,8 @@ pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag)
   } else {
     assert(!delivery->tpwork);
   }
+  delivery->link = link;
+  pn_incref(delivery->link);  // keep link until finalized
   pn_buffer_clear(delivery->tag);
   pn_buffer_append(delivery->tag, tag.bytes, tag.size);
   pn_disposition_clear(&delivery->local);
@@ -1261,6 +1324,7 @@ pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag)
   delivery->updated = false;
   delivery->settled = false;
   LL_ADD(link, unsettled, delivery);
+  delivery->referenced = true;
   delivery->work_next = NULL;
   delivery->work_prev = NULL;
   delivery->work = false;
@@ -1556,25 +1620,6 @@ void pn_link_set_rcv_settle_mode(pn_link_t *link, pn_rcv_settle_mode_t mode)
     link->rcv_settle_mode = (uint8_t)mode;
 }
 
-void pn_real_settle(pn_delivery_t *delivery)
-{
-  pn_link_t *link = delivery->link;
-  LL_REMOVE(link, unsettled, delivery);
-  pn_delivery_map_del(pn_link_is_sender(link)
-                      ? &link->session->state.outgoing
-                      : &link->session->state.incoming,
-                      delivery);
-  pn_buffer_clear(delivery->tag);
-  pn_buffer_clear(delivery->bytes);
-  pn_record_clear(delivery->context);
-  delivery->settled = true;
-  if (link->endpoint.freed) {
-    pn_decref(delivery);
-  } else {
-    LL_ADD(link, settled, delivery);
-  }
-}
-
 void pn_delivery_settle(pn_delivery_t *delivery)
 {
   assert(delivery);
@@ -1588,6 +1633,7 @@ void pn_delivery_settle(pn_delivery_t *delivery)
     delivery->local.settled = true;
     pn_add_tpwork(delivery);
     pn_work_update(delivery->link->session->connection, delivery);
+    pn_decref(delivery);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d549ec38/proton-c/src/tests/refcount.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/refcount.c b/proton-c/src/tests/refcount.c
index 2c1cd0c..87ce488 100644
--- a/proton-c/src/tests/refcount.c
+++ b/proton-c/src/tests/refcount.c
@@ -22,6 +22,7 @@
 #include <proton/connection.h>
 #include <proton/session.h>
 #include <proton/link.h>
+#include <proton/delivery.h>
 #include <stdio.h>
 #include <stdlib.h>
 
@@ -161,6 +162,113 @@ static void test_incref_order_ls(void) {
   pn_decref(lnk);
 }
 
+static void swap(int array[], int i, int j) {
+  int a = array[i];
+  int b = array[j];
+  array[j] = a;
+  array[i] = b;
+}
+
+static void setup(void **objects) {
+  pn_connection_t *conn = pn_connection();
+  pn_session_t *ssn = pn_session(conn);
+  pn_link_t *lnk = pn_sender(ssn, "sender");
+  pn_delivery_t *dlv = pn_delivery(lnk, pn_dtag("dtag", 4));
+
+  assert(pn_refcount(conn) == 2);
+  assert(pn_refcount(ssn) == 2);
+  assert(pn_refcount(lnk) == 2);
+  assert(pn_refcount(dlv) == 1);
+
+  objects[0] = conn;
+  objects[1] = ssn;
+  objects[2] = lnk;
+  objects[3] = dlv;
+}
+
+static bool decreffed(int *indexes, void **objects, int step, void *object) {
+  for (int i = 0; i <= step; i++) {
+    if (object == objects[indexes[i]]) {
+      return true;
+    }
+  }
+  return false;
+}
+
+static bool live_descendent(int *indexes, void **objects, int step, int objidx) {
+  for (int i = objidx + 1; i < 4; i++) {
+    if (!decreffed(indexes, objects, step, objects[i])) {
+      return true;
+    }
+  }
+
+  return false;
+}
+
+static void assert_refcount(void *object, int expected) {
+  int rc = pn_refcount(object);
+  //printf("pn_refcount(%s) = %d\n", pn_object_reify(object)->name, rc);
+  assert(rc == expected);
+}
+
+static void test_decref_order(int *indexes, void **objects) {
+  setup(objects);
+
+  //printf("-----------\n");
+  for (int i = 0; i < 3; i++) {
+    int idx = indexes[i];
+    void *obj = objects[idx];
+    //printf("decreffing %s\n", pn_object_reify(obj)->name);
+    pn_decref(obj);
+    for (int j = 0; j <= i; j++) {
+      // everything we've decreffed already should have a refcount of
+      // 1 because it has been preserved by its parent
+      assert_refcount(objects[indexes[j]], 1);
+    }
+    for (int j = i+1; j < 4; j++) {
+      // everything we haven't decreffed yet should have a refcount of
+      // 2 unless it has a descendent that has not been decrefed (or
+      // it has no child) in which case it should have a refcount of 1
+      int idx = indexes[j];
+      void *obj = objects[idx];
+      assert(!decreffed(indexes, objects, i, obj));
+      if (live_descendent(indexes, objects, i, idx)) {
+        assert_refcount(obj, 2);
+      } else {
+        assert_refcount(obj, 1);
+      }
+    }
+  }
+
+  void *last = objects[indexes[3]];
+  //printf("decreffing %s\n", pn_object_reify(last)->name);
+  pn_decref(last);
+  // all should be gone now, need to run with valgrind to check
+}
+
+static void permute(int n, int *indexes, void **objects) {
+  int j;
+  if (n == 1) {
+    test_decref_order(indexes, objects);
+  } else {
+    for (int i = 1; i <= n; i++) {
+      permute(n-1, indexes, objects);
+      if ((n % 2) == 1) {
+        j = 1;
+      } else {
+        j = i;
+      }
+      swap(indexes, j-1, n-1);
+    }
+  }
+}
+
+static void test_decref_permutations(void) {
+  void *objects[4];
+  int indexes[4] = {0, 1, 2, 3};
+  permute(4, indexes, objects);
+}
+
 int main(int argc, char **argv)
 {
   test_decref_order_csl();
@@ -172,5 +280,7 @@ int main(int argc, char **argv)
 
   test_incref_order_sl();
   test_incref_order_ls();
+
+  test_decref_permutations();
   return 0;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d549ec38/proton-c/src/transport/transport.c
----------------------------------------------------------------------
diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c
index 08072fe..ec0ec3e 100644
--- a/proton-c/src/transport/transport.c
+++ b/proton-c/src/transport/transport.c
@@ -377,6 +377,7 @@ void pni_transport_unbind_handles(pn_hash_t *handles, bool reset_state);
 static void pni_unmap_remote_channel(pn_session_t *ssn)
 {
   // XXX: should really update link state also
+  pn_delivery_map_clear(&ssn->state.incoming);
   pni_transport_unbind_handles(ssn->state.remote_handles, false);
   pn_transport_t *transport = ssn->connection->transport;
   uint16_t channel = ssn->state.remote_channel;
@@ -463,9 +464,10 @@ int pn_transport_bind(pn_transport_t *transport, pn_connection_t *connection)
   transport->connection = connection;
   connection->transport = transport;
 
-  pn_collector_put(connection->collector, PN_OBJECT, connection, PN_CONNECTION_BOUND);
-
   pn_incref(connection);
+
+  pn_connection_bound(connection);
+
   if (transport->open_rcvd) {
     PN_SET_REMOTE(connection->endpoint.state, PN_REMOTE_ACTIVE);
     pn_collector_put(connection->collector, PN_OBJECT, connection, PN_CONNECTION_REMOTE_OPEN);
@@ -493,6 +495,8 @@ void pni_transport_unbind_channels(pn_hash_t *channels)
   for (pn_handle_t h = pn_hash_head(channels); h; h = pn_hash_next(channels, h)) {
     uintptr_t key = pn_hash_key(channels, h);
     pn_session_t *ssn = (pn_session_t *) pn_hash_value(channels, h);
+    pn_delivery_map_clear(&ssn->state.incoming);
+    pn_delivery_map_clear(&ssn->state.outgoing);
     pni_transport_unbind_handles(ssn->state.local_handles, true);
     pni_transport_unbind_handles(ssn->state.remote_handles, true);
     pn_session_unbound(ssn);
@@ -952,7 +956,6 @@ static void pn_full_settle(pn_delivery_map_t *db, pn_delivery_t *delivery)
 {
   assert(!delivery->work);
   pn_clear_tpwork(delivery);
-  pn_real_settle(delivery);
 }
 
 int pn_do_transfer(pn_dispatcher_t *disp)
@@ -1901,6 +1904,7 @@ bool pn_pointful_buffering(pn_transport_t *transport, pn_session_t *session)
 
 static void pni_unmap_local_channel(pn_session_t *ssn) {
   // XXX: should really update link state also
+  pn_delivery_map_clear(&ssn->state.outgoing);
   pni_transport_unbind_handles(ssn->state.local_handles, false);
   pn_transport_t *transport = ssn->connection->transport;
   pn_session_state_t *state = &ssn->state;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org