You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2017/06/20 17:31:40 UTC

qpid-dispatch git commit: DISPATCH-788 - First attempt at adding a core thread API around handling of peers. The next step is to introduce a list to hold more than one peer

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master e22091b3f -> db96fd8f1


DISPATCH-788 - First attempt at adding a core thread API around handling of peers. The next step is to introduce a list to hold more than one peer

(cherry picked from commit b196ebb3159ed7670f1f295d06fed97ae0bedda1)


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/db96fd8f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/db96fd8f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/db96fd8f

Branch: refs/heads/master
Commit: db96fd8f1748b4c7faeefeb8580567143ad5350a
Parents: e22091b
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Mon Jun 19 10:57:26 2017 -0400
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Tue Jun 20 13:04:11 2017 -0400

----------------------------------------------------------------------
 src/router_core/connections.c         | 21 +++++-------
 src/router_core/forwarder.c           | 37 +++++++++-----------
 src/router_core/router_core_private.h | 24 +++++++++++++
 src/router_core/transfer.c            | 54 ++++++++++++++++++++++++++----
 4 files changed, 94 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/db96fd8f/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index f0b8d8e..df2f69e 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -673,13 +673,11 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
     qdr_delivery_t *peer;
     while (dlv) {
         DEQ_REMOVE_HEAD(undelivered);
-        peer = dlv->peer;
-        if (peer) {
-            dlv->peer  = 0;
-            peer->peer = 0;
+        peer = qdr_delivery_first_peer_CT(dlv);
+        while (peer) {
             qdr_delivery_release_CT(core, peer);
-            qdr_delivery_decref_CT(core, peer);
-            qdr_delivery_decref_CT(core, dlv);
+            qdr_delivery_unlink_peers_CT(core, dlv, peer);
+            peer = qdr_delivery_next_peer_CT(dlv);
         }
 
         //
@@ -715,15 +713,12 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
             dlv->tracking_addr = 0;
         }
 
-        peer = dlv->peer;
-        if (peer) {
-            dlv->peer  = 0;
-            peer->peer = 0;
+        peer = qdr_delivery_first_peer_CT(dlv);
+        while (peer) {
             if (link->link_direction == QD_OUTGOING)
                 qdr_delivery_failed_CT(core, peer);
-
-            qdr_delivery_decref_CT(core, peer);
-            qdr_delivery_decref_CT(core, dlv);
+            qdr_delivery_unlink_peers_CT(core, dlv, peer);
+            peer = qdr_delivery_next_peer_CT(dlv);
         }
 
         //

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/db96fd8f/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 8cfcb03..7828c6a 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -101,33 +101,26 @@ static void qdr_forward_find_closest_remotes_CT(qdr_core_t *core, qdr_address_t
 
 qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, qdr_link_t *link, qd_message_t *msg)
 {
-    qdr_delivery_t *dlv = new_qdr_delivery_t();
-    uint64_t       *tag = (uint64_t*) dlv->tag;
-
-    ZERO(dlv);
-    sys_atomic_init(&dlv->ref_count, 0);
-    dlv->link       = link;
-    dlv->msg        = qd_message_copy(msg);
-    dlv->settled    = !in_dlv || in_dlv->settled;
-    dlv->presettled = dlv->settled;
+    qdr_delivery_t *out_dlv = new_qdr_delivery_t();
+    uint64_t       *tag = (uint64_t*) out_dlv->tag;
+
+    ZERO(out_dlv);
+    sys_atomic_init(&out_dlv->ref_count, 0);
+    out_dlv->link       = link;
+    out_dlv->msg        = qd_message_copy(msg);
+    out_dlv->settled    = !in_dlv || in_dlv->settled;
+    out_dlv->presettled = out_dlv->settled;
     *tag            = core->next_tag++;
-    dlv->tag_length = 8;
-    dlv->error      = 0;
+    out_dlv->tag_length = 8;
+    out_dlv->error      = 0;
 
     //
-    // Create peer linkage only if the delivery is not settled
+    // Create peer linkage only if the outgoing delivery is not settled
     //
-    if (!dlv->settled) {
-        if (in_dlv && in_dlv->peer == 0) {
-            dlv->peer = in_dlv;
-            in_dlv->peer = dlv;
+    if (!out_dlv->settled && in_dlv)
+        qdr_delivery_link_peers_CT(in_dlv, out_dlv);
 
-            qdr_delivery_incref(dlv);
-            qdr_delivery_incref(in_dlv);
-        }
-    }
-
-    return dlv;
+    return out_dlv;
 }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/db96fd8f/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 07d832b..f33aeda 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -309,6 +309,7 @@ struct qdr_delivery_t {
     sys_atomic_t         ref_count;
     qdr_link_t          *link;
     qdr_delivery_t      *peer;
+    qdr_delivery_t      *next_peer;
     qd_message_t        *msg;
     qd_iterator_t       *to_addr;
     qd_iterator_t       *origin;
@@ -693,6 +694,29 @@ void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery);
 void qdr_delivery_failed_CT(qdr_core_t *core, qdr_delivery_t *delivery);
 bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *delivery);
 void qdr_delivery_decref_CT(qdr_core_t *core, qdr_delivery_t *delivery);
+
+/**
+ * Links in_dlv and out_dlv to each other as peers and increments the ref count of both deliveries.
+ */
+void qdr_delivery_link_peers_CT(qdr_delivery_t *in_dlv, qdr_delivery_t *out_dlv);
+
+/**
+ * Zeroes out the peer references of both deliveries and decrements the ref count of each.
+ */
+void qdr_delivery_unlink_peers_CT(qdr_core_t *core, qdr_delivery_t *dlv, qdr_delivery_t *peer);
+
+/**
+ * Returns the first peer of the delivery, or 0 if the delivery has no peer.
+ * @see qdr_delivery_next_peer_CT
+ */
+qdr_delivery_t *qdr_delivery_first_peer_CT(qdr_delivery_t *dlv);
+
+/**
+ * Returns the next peer of the passed-in delivery, or 0 when there are no more peers.
+ */
+qdr_delivery_t *qdr_delivery_next_peer_CT(qdr_delivery_t *dlv);
+
+
 void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query);
 
 void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/db96fd8f/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 40d4c45..7dc4970 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -413,6 +413,51 @@ static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *de
 }
 
 
+void qdr_delivery_link_peers_CT(qdr_delivery_t *in_dlv, qdr_delivery_t *out_dlv)
+{
+    assert(!in_dlv->peer);
+    assert(!out_dlv->peer);
+
+    out_dlv->peer = in_dlv;
+    in_dlv->peer = out_dlv;
+
+    qdr_delivery_incref(out_dlv);
+    qdr_delivery_incref(in_dlv);
+}
+
+
+void qdr_delivery_unlink_peers_CT(qdr_core_t *core, qdr_delivery_t *dlv, qdr_delivery_t *peer)
+{
+    //
+    // Make sure that the passed in deliveries are indeed peers.
+    //
+
+    assert(dlv->peer == peer);
+    assert(peer->peer == dlv);
+
+    dlv->peer  = 0;
+    peer->peer = 0;
+
+    qdr_delivery_decref_CT(core, dlv);
+    qdr_delivery_decref_CT(core, peer);
+}
+
+
+qdr_delivery_t *qdr_delivery_first_peer_CT(qdr_delivery_t *dlv)
+{
+    dlv->next_peer = dlv->peer;
+    return dlv->next_peer;
+}
+
+qdr_delivery_t *qdr_delivery_next_peer_CT(qdr_delivery_t *dlv)
+{
+    // Get the next peer. A delivery currently has at most one peer, so there is never a next peer.
+    // When a peer list is introduced, this function is expected to return the next peer from that list.
+    dlv->next_peer = 0;
+    return dlv->next_peer;
+}
+
+
 void qdr_delivery_decref_CT(qdr_core_t *core, qdr_delivery_t *dlv)
 {
     uint32_t ref_count = sys_atomic_dec(&dlv->ref_count);
@@ -671,7 +716,7 @@ static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
     qdr_delivery_t *dlv        = action->args.delivery.delivery;
-    qdr_delivery_t *peer       = dlv->peer;
+    qdr_delivery_t *peer       = qdr_delivery_first_peer_CT(dlv);
     bool            push       = false;
     bool            peer_moved = false;
     bool            dlv_moved  = false;
@@ -705,17 +750,12 @@ static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool
     if (settled) {
         if (peer) {
             peer->settled = true;
-            peer->peer = 0;
-            dlv->peer  = 0;
-
             if (peer->link) {
                 peer_moved = qdr_delivery_settled_CT(core, peer);
                 if (peer_moved)
                     push = true;
             }
-
-            qdr_delivery_decref_CT(core, dlv);
-            qdr_delivery_decref_CT(core, peer);
+            qdr_delivery_unlink_peers_CT(core, dlv, peer);
         }
 
         if (dlv->link)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org