You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2014/12/06 12:22:33 UTC
[2/2] qpid-proton git commit: pool deliveries at the connection level;
improved delivery refcounting semantics
pool deliveries at the connection level; improved delivery refcounting semantics
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d549ec38
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d549ec38
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d549ec38
Branch: refs/heads/master
Commit: d549ec388dfff3abc262c2ca226a36f42b37ee16
Parents: 3b8de5b
Author: Rafael Schloming <rh...@alum.mit.edu>
Authored: Sat Dec 6 06:20:15 2014 -0500
Committer: Rafael Schloming <rh...@alum.mit.edu>
Committed: Sat Dec 6 06:20:15 2014 -0500
----------------------------------------------------------------------
proton-c/src/engine/engine-internal.h | 7 +-
proton-c/src/engine/engine.c | 142 +++++++++++++++++++----------
proton-c/src/tests/refcount.c | 110 ++++++++++++++++++++++
proton-c/src/transport/transport.c | 10 +-
4 files changed, 214 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d549ec38/proton-c/src/engine/engine-internal.h
----------------------------------------------------------------------
diff --git a/proton-c/src/engine/engine-internal.h b/proton-c/src/engine/engine-internal.h
index 16ce6a4..204ef01 100644
--- a/proton-c/src/engine/engine-internal.h
+++ b/proton-c/src/engine/engine-internal.h
@@ -201,6 +201,7 @@ struct pn_connection_t {
pn_data_t *properties;
pn_collector_t *collector;
pn_record_t *context;
+ pn_list_t *delivery_pool;
};
struct pn_session_t {
@@ -242,8 +243,6 @@ struct pn_link_t {
pn_delivery_t *unsettled_head;
pn_delivery_t *unsettled_tail;
pn_delivery_t *current;
- pn_delivery_t *settled_head;
- pn_delivery_t *settled_tail;
pn_record_t *context;
size_t unsettled_count;
pn_sequence_t available;
@@ -278,8 +277,6 @@ struct pn_delivery_t {
pn_buffer_t *tag;
pn_delivery_t *unsettled_next;
pn_delivery_t *unsettled_prev;
- pn_delivery_t *settled_next;
- pn_delivery_t *settled_prev;
pn_delivery_t *work_next;
pn_delivery_t *work_prev;
pn_delivery_t *tpwork_next;
@@ -292,6 +289,7 @@ struct pn_delivery_t {
bool work;
bool tpwork;
bool done;
+ bool referenced;
};
#define PN_SET_LOCAL(OLD, NEW) \
@@ -312,6 +310,7 @@ void pn_real_settle(pn_delivery_t *delivery); // will free delivery if link is
void pn_clear_tpwork(pn_delivery_t *delivery);
void pn_work_update(pn_connection_t *connection, pn_delivery_t *delivery);
void pn_clear_modified(pn_connection_t *connection, pn_endpoint_t *endpoint);
+void pn_connection_bound(pn_connection_t *conn);
void pn_connection_unbound(pn_connection_t *conn);
int pn_do_error(pn_transport_t *transport, const char *condition, const char *fmt, ...);
void pn_session_unbound(pn_session_t* ssn);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d549ec38/proton-c/src/engine/engine.c
----------------------------------------------------------------------
diff --git a/proton-c/src/engine/engine.c b/proton-c/src/engine/engine.c
index 96c601f..1f2768a 100644
--- a/proton-c/src/engine/engine.c
+++ b/proton-c/src/engine/engine.c
@@ -134,6 +134,11 @@ void pn_connection_free(pn_connection_t *connection)
pn_decref(connection);
}
+void pn_connection_bound(pn_connection_t *connection)
+{
+ pn_collector_put(connection->collector, PN_OBJECT, connection, PN_CONNECTION_BOUND);
+}
+
// invoked when transport has been removed:
void pn_connection_unbound(pn_connection_t *connection)
{
@@ -146,7 +151,6 @@ void pn_connection_unbound(pn_connection_t *connection)
pn_clear_modified(connection, connection->transport_head);
}
while (connection->tpwork_head) {
- pn_real_settle(connection->tpwork_head);
pn_clear_tpwork(connection->tpwork_head);
}
}
@@ -306,11 +310,6 @@ void pn_link_free(pn_link_t *link)
pn_delivery_settle(delivery);
delivery = next;
}
- while (link->settled_head) {
- delivery = link->settled_head;
- LL_POP(link, settled, pn_delivery_t);
- pn_decref(delivery);
- }
link->endpoint.freed = true;
pn_decref(link);
}
@@ -402,6 +401,7 @@ static void pn_connection_finalize(void *object)
pn_free(conn->desired_capabilities);
pn_free(conn->properties);
pn_endpoint_tini(endpoint);
+ pn_free(conn->delivery_pool);
}
#define pn_connection_initialize NULL
@@ -433,6 +433,7 @@ pn_connection_t *pn_connection(void)
conn->properties = pn_data(0);
conn->collector = NULL;
conn->context = pn_record();
+ conn->delivery_pool = pn_list(PN_OBJECT, 0);
return conn;
}
@@ -600,7 +601,6 @@ void pn_add_tpwork(pn_delivery_t *delivery)
{
LL_ADD(connection, tpwork, delivery);
delivery->tpwork = true;
- pn_incref(delivery);
}
pn_modified(connection, &connection->endpoint, true);
}
@@ -612,7 +612,6 @@ void pn_clear_tpwork(pn_delivery_t *delivery)
{
LL_REMOVE(connection, tpwork, delivery);
delivery->tpwork = false;
- pn_decref(delivery); // may free delivery!
}
}
@@ -634,7 +633,6 @@ void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint, bool emit
if (!endpoint->modified) {
LL_ADD(connection, transport, endpoint);
endpoint->modified = true;
- pn_incref(endpoint);
}
if (emit && connection->transport) {
@@ -650,7 +648,6 @@ void pn_clear_modified(pn_connection_t *connection, pn_endpoint_t *endpoint)
endpoint->transport_next = NULL;
endpoint->transport_prev = NULL;
endpoint->modified = false;
- pn_decref(endpoint); // may free endpoint!
}
}
@@ -739,12 +736,14 @@ static void pn_session_incref(void *object)
static bool pni_preserve_child(pn_endpoint_t *endpoint, pn_endpoint_t *parent)
{
- if (!endpoint->freed && endpoint->referenced) {
+ pn_connection_t *conn = pn_ep_get_connection(endpoint);
+ if ((!endpoint->freed || (conn->transport && endpoint->modified)) && endpoint->referenced) {
pn_object_incref(endpoint);
endpoint->referenced = false;
pn_decref(parent);
return true;
} else {
+ LL_REMOVE(conn, transport, endpoint);
return false;
}
}
@@ -912,10 +911,6 @@ static void pn_link_finalize(void *object)
pn_link_t *link = (pn_link_t *) object;
pn_endpoint_t *endpoint = &link->endpoint;
- // assumptions: all deliveries freed
- assert(link->settled_head == NULL);
- assert(link->unsettled_head == NULL);
-
if (pni_post_final(endpoint, PN_LINK_FINAL)) {
return;
}
@@ -925,6 +920,11 @@ static void pn_link_finalize(void *object)
return;
}
+ while (link->unsettled_head) {
+ assert(!link->unsettled_head->referenced);
+ pn_free(link->unsettled_head);
+ }
+
pn_free(link->context);
pn_terminus_free(&link->source);
pn_terminus_free(&link->target);
@@ -963,7 +963,6 @@ pn_link_t *pn_link_new(int type, pn_session_t *session, const char *name)
pn_terminus_init(&link->target, PN_TARGET);
pn_terminus_init(&link->remote_source, PN_UNSPECIFIED);
pn_terminus_init(&link->remote_target, PN_UNSPECIFIED);
- link->settled_head = link->settled_tail = NULL;
link->unsettled_head = link->unsettled_tail = link->current = NULL;
link->unsettled_count = 0;
link->available = 0;
@@ -1197,17 +1196,76 @@ static void pn_disposition_finalize(pn_disposition_t *ds)
pn_condition_tini(&ds->condition);
}
+static bool pni_link_live(pn_link_t *link)
+{
+ return pni_session_live(link->session) || pn_refcount(link) > 1;
+}
+
+static void pn_delivery_incref(void *object)
+{
+ pn_delivery_t *delivery = (pn_delivery_t *) object;
+ if (delivery->link && !delivery->referenced) {
+ delivery->referenced = true;
+ pn_incref(delivery->link);
+ } else {
+ pn_object_incref(object);
+ }
+}
+
+static bool pni_preserve_delivery(pn_delivery_t *delivery)
+{
+ pn_connection_t *conn = delivery->link->session->connection;
+ return !delivery->local.settled || (conn->transport && delivery->tpwork);
+}
+
static void pn_delivery_finalize(void *object)
{
pn_delivery_t *delivery = (pn_delivery_t *) object;
- assert(delivery->settled);
- assert(!delivery->state.init); // no longer in session delivery map
- pn_free(delivery->context);
- pn_buffer_free(delivery->tag);
- pn_buffer_free(delivery->bytes);
- pn_disposition_finalize(&delivery->local);
- pn_disposition_finalize(&delivery->remote);
- pn_decref(delivery->link);
+ pn_link_t *link = delivery->link;
+
+ bool pooled = false;
+ bool referenced = true;
+ if (link) {
+ if (pni_link_live(link) && pni_preserve_delivery(delivery) && delivery->referenced) {
+ delivery->referenced = false;
+ pn_object_incref(delivery);
+ pn_decref(link);
+ return;
+ }
+ referenced = delivery->referenced;
+
+ pn_clear_tpwork(delivery);
+ LL_REMOVE(link, unsettled, delivery);
+ pn_delivery_map_del(pn_link_is_sender(link)
+ ? &link->session->state.outgoing
+ : &link->session->state.incoming,
+ delivery);
+ pn_buffer_clear(delivery->tag);
+ pn_buffer_clear(delivery->bytes);
+ pn_record_clear(delivery->context);
+ delivery->settled = true;
+ pn_connection_t *conn = link->session->connection;
+ assert(pn_refcount(delivery) == 0);
+ if (pni_connection_live(conn)) {
+ pn_list_t *pool = link->session->connection->delivery_pool;
+ delivery->link = NULL;
+ pn_list_add(pool, delivery);
+ pooled = true;
+ assert(pn_refcount(delivery) == 1);
+ }
+ }
+
+ if (!pooled) {
+ pn_free(delivery->context);
+ pn_buffer_free(delivery->tag);
+ pn_buffer_free(delivery->bytes);
+ pn_disposition_finalize(&delivery->local);
+ pn_disposition_finalize(&delivery->remote);
+ }
+
+ if (referenced) {
+ pn_decref(link);
+ }
}
static void pn_disposition_init(pn_disposition_t *ds)
@@ -1230,6 +1288,11 @@ static void pn_disposition_clear(pn_disposition_t *ds)
pn_condition_clear(&ds->condition);
}
+#define pn_delivery_new pn_object_new
+#define pn_delivery_refcount pn_object_refcount
+#define pn_delivery_decref pn_object_decref
+#define pn_delivery_free pn_object_free
+#define pn_delivery_reify pn_object_reify
#define pn_delivery_initialize NULL
#define pn_delivery_hashcode NULL
#define pn_delivery_compare NULL
@@ -1238,14 +1301,12 @@ static void pn_disposition_clear(pn_disposition_t *ds)
pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag)
{
assert(link);
- pn_delivery_t *delivery = link->settled_head;
- LL_POP(link, settled, pn_delivery_t);
+ pn_list_t *pool = link->session->connection->delivery_pool;
+ pn_delivery_t *delivery = (pn_delivery_t *) pn_list_pop(pool);
if (!delivery) {
- static const pn_class_t clazz = PN_CLASS(pn_delivery);
+ static const pn_class_t clazz = PN_METACLASS(pn_delivery);
delivery = (pn_delivery_t *) pn_class_new(&clazz, sizeof(pn_delivery_t));
if (!delivery) return NULL;
- delivery->link = link;
- pn_incref(delivery->link); // keep link until finalized
delivery->tag = pn_buffer(16);
delivery->bytes = pn_buffer(64);
pn_disposition_init(&delivery->local);
@@ -1254,6 +1315,8 @@ pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag)
} else {
assert(!delivery->tpwork);
}
+ delivery->link = link;
+ pn_incref(delivery->link); // keep link until finalized
pn_buffer_clear(delivery->tag);
pn_buffer_append(delivery->tag, tag.bytes, tag.size);
pn_disposition_clear(&delivery->local);
@@ -1261,6 +1324,7 @@ pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag)
delivery->updated = false;
delivery->settled = false;
LL_ADD(link, unsettled, delivery);
+ delivery->referenced = true;
delivery->work_next = NULL;
delivery->work_prev = NULL;
delivery->work = false;
@@ -1556,25 +1620,6 @@ void pn_link_set_rcv_settle_mode(pn_link_t *link, pn_rcv_settle_mode_t mode)
link->rcv_settle_mode = (uint8_t)mode;
}
-void pn_real_settle(pn_delivery_t *delivery)
-{
- pn_link_t *link = delivery->link;
- LL_REMOVE(link, unsettled, delivery);
- pn_delivery_map_del(pn_link_is_sender(link)
- ? &link->session->state.outgoing
- : &link->session->state.incoming,
- delivery);
- pn_buffer_clear(delivery->tag);
- pn_buffer_clear(delivery->bytes);
- pn_record_clear(delivery->context);
- delivery->settled = true;
- if (link->endpoint.freed) {
- pn_decref(delivery);
- } else {
- LL_ADD(link, settled, delivery);
- }
-}
-
void pn_delivery_settle(pn_delivery_t *delivery)
{
assert(delivery);
@@ -1588,6 +1633,7 @@ void pn_delivery_settle(pn_delivery_t *delivery)
delivery->local.settled = true;
pn_add_tpwork(delivery);
pn_work_update(delivery->link->session->connection, delivery);
+ pn_decref(delivery);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d549ec38/proton-c/src/tests/refcount.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/refcount.c b/proton-c/src/tests/refcount.c
index 2c1cd0c..87ce488 100644
--- a/proton-c/src/tests/refcount.c
+++ b/proton-c/src/tests/refcount.c
@@ -22,6 +22,7 @@
#include <proton/connection.h>
#include <proton/session.h>
#include <proton/link.h>
+#include <proton/delivery.h>
#include <stdio.h>
#include <stdlib.h>
@@ -161,6 +162,113 @@ static void test_incref_order_ls(void) {
pn_decref(lnk);
}
+static void swap(int array[], int i, int j) {
+ int a = array[i];
+ int b = array[j];
+ array[j] = a;
+ array[i] = b;
+}
+
+static void setup(void **objects) {
+ pn_connection_t *conn = pn_connection();
+ pn_session_t *ssn = pn_session(conn);
+ pn_link_t *lnk = pn_sender(ssn, "sender");
+ pn_delivery_t *dlv = pn_delivery(lnk, pn_dtag("dtag", 4));
+
+ assert(pn_refcount(conn) == 2);
+ assert(pn_refcount(ssn) == 2);
+ assert(pn_refcount(lnk) == 2);
+ assert(pn_refcount(dlv) == 1);
+
+ objects[0] = conn;
+ objects[1] = ssn;
+ objects[2] = lnk;
+ objects[3] = dlv;
+}
+
+static bool decreffed(int *indexes, void **objects, int step, void *object) {
+ for (int i = 0; i <= step; i++) {
+ if (object == objects[indexes[i]]) {
+ return true;
+ }
+ }
+ return false;
+}
+
+static bool live_descendent(int *indexes, void **objects, int step, int objidx) {
+ for (int i = objidx + 1; i < 4; i++) {
+ if (!decreffed(indexes, objects, step, objects[i])) {
+ return true;
+ }
+ }
+
+ return false;
+}
+
+static void assert_refcount(void *object, int expected) {
+ int rc = pn_refcount(object);
+ //printf("pn_refcount(%s) = %d\n", pn_object_reify(object)->name, rc);
+ assert(rc == expected);
+}
+
+static void test_decref_order(int *indexes, void **objects) {
+ setup(objects);
+
+ //printf("-----------\n");
+ for (int i = 0; i < 3; i++) {
+ int idx = indexes[i];
+ void *obj = objects[idx];
+ //printf("decreffing %s\n", pn_object_reify(obj)->name);
+ pn_decref(obj);
+ for (int j = 0; j <= i; j++) {
+ // everything we've decreffed already should have a refcount of
+ // 1 because it has been preserved by its parent
+ assert_refcount(objects[indexes[j]], 1);
+ }
+ for (int j = i+1; j < 4; j++) {
+ // everything we haven't decreffed yet should have a refcount of
+ // 2 unless all of its descendents have been decreffed (or it
+ // has no child) in which case it should have a refcount of 1
+ int idx = indexes[j];
+ void *obj = objects[idx];
+ assert(!decreffed(indexes, objects, i, obj));
+ if (live_descendent(indexes, objects, i, idx)) {
+ assert_refcount(obj, 2);
+ } else {
+ assert_refcount(obj, 1);
+ }
+ }
+ }
+
+ void *last = objects[indexes[3]];
+ //printf("decreffing %s\n", pn_object_reify(last)->name);
+ pn_decref(last);
+ // all should be gone now, need to run with valgrind to check
+}
+
+static void permute(int n, int *indexes, void **objects) {
+ int j;
+ if (n == 1) {
+ test_decref_order(indexes, objects);
+ } else {
+ for (int i = 1; i <= n; i++) {
+ permute(n-1, indexes, objects);
+ if ((n % 2) == 1) {
+ j = 1;
+ } else {
+ j = i;
+ }
+ swap(indexes, j-1, n-1);
+ }
+ }
+}
+
+static void test_decref_permutations(void) {
+ void *objects[4];
+ int indexes[4] = {0, 1, 2, 3};
+ permute(4, indexes, objects);
+}
+
int main(int argc, char **argv)
{
test_decref_order_csl();
@@ -172,5 +280,7 @@ int main(int argc, char **argv)
test_incref_order_sl();
test_incref_order_ls();
+
+ test_decref_permutations();
return 0;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d549ec38/proton-c/src/transport/transport.c
----------------------------------------------------------------------
diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c
index 08072fe..ec0ec3e 100644
--- a/proton-c/src/transport/transport.c
+++ b/proton-c/src/transport/transport.c
@@ -377,6 +377,7 @@ void pni_transport_unbind_handles(pn_hash_t *handles, bool reset_state);
static void pni_unmap_remote_channel(pn_session_t *ssn)
{
// XXX: should really update link state also
+ pn_delivery_map_clear(&ssn->state.incoming);
pni_transport_unbind_handles(ssn->state.remote_handles, false);
pn_transport_t *transport = ssn->connection->transport;
uint16_t channel = ssn->state.remote_channel;
@@ -463,9 +464,10 @@ int pn_transport_bind(pn_transport_t *transport, pn_connection_t *connection)
transport->connection = connection;
connection->transport = transport;
- pn_collector_put(connection->collector, PN_OBJECT, connection, PN_CONNECTION_BOUND);
-
pn_incref(connection);
+
+ pn_connection_bound(connection);
+
if (transport->open_rcvd) {
PN_SET_REMOTE(connection->endpoint.state, PN_REMOTE_ACTIVE);
pn_collector_put(connection->collector, PN_OBJECT, connection, PN_CONNECTION_REMOTE_OPEN);
@@ -493,6 +495,8 @@ void pni_transport_unbind_channels(pn_hash_t *channels)
for (pn_handle_t h = pn_hash_head(channels); h; h = pn_hash_next(channels, h)) {
uintptr_t key = pn_hash_key(channels, h);
pn_session_t *ssn = (pn_session_t *) pn_hash_value(channels, h);
+ pn_delivery_map_clear(&ssn->state.incoming);
+ pn_delivery_map_clear(&ssn->state.outgoing);
pni_transport_unbind_handles(ssn->state.local_handles, true);
pni_transport_unbind_handles(ssn->state.remote_handles, true);
pn_session_unbound(ssn);
@@ -952,7 +956,6 @@ static void pn_full_settle(pn_delivery_map_t *db, pn_delivery_t *delivery)
{
assert(!delivery->work);
pn_clear_tpwork(delivery);
- pn_real_settle(delivery);
}
int pn_do_transfer(pn_dispatcher_t *disp)
@@ -1901,6 +1904,7 @@ bool pn_pointful_buffering(pn_transport_t *transport, pn_session_t *session)
static void pni_unmap_local_channel(pn_session_t *ssn) {
// XXX: should really update link state also
+ pn_delivery_map_clear(&ssn->state.outgoing);
pni_transport_unbind_handles(ssn->state.local_handles, false);
pn_transport_t *transport = ssn->connection->transport;
pn_session_state_t *state = &ssn->state;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org