You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2017/06/20 17:31:40 UTC
qpid-dispatch git commit: DISPATCH-788 - First attempt at adding a
core thread API around handling of peers. The next step is to introduce a
list to hold more than one peer
Repository: qpid-dispatch
Updated Branches:
refs/heads/master e22091b3f -> db96fd8f1
DISPATCH-788 - First attempt at adding a core thread API around handling of peers. The next step is to introduce a list to hold more than one peer
(cherry picked from commit b196ebb3159ed7670f1f295d06fed97ae0bedda1)
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/db96fd8f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/db96fd8f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/db96fd8f
Branch: refs/heads/master
Commit: db96fd8f1748b4c7faeefeb8580567143ad5350a
Parents: e22091b
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Mon Jun 19 10:57:26 2017 -0400
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Tue Jun 20 13:04:11 2017 -0400
----------------------------------------------------------------------
src/router_core/connections.c | 21 +++++-------
src/router_core/forwarder.c | 37 +++++++++-----------
src/router_core/router_core_private.h | 24 +++++++++++++
src/router_core/transfer.c | 54 ++++++++++++++++++++++++++----
4 files changed, 94 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/db96fd8f/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index f0b8d8e..df2f69e 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -673,13 +673,11 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
qdr_delivery_t *peer;
while (dlv) {
DEQ_REMOVE_HEAD(undelivered);
- peer = dlv->peer;
- if (peer) {
- dlv->peer = 0;
- peer->peer = 0;
+ peer = qdr_delivery_first_peer_CT(dlv);
+ while (peer) {
qdr_delivery_release_CT(core, peer);
- qdr_delivery_decref_CT(core, peer);
- qdr_delivery_decref_CT(core, dlv);
+ qdr_delivery_unlink_peers_CT(core, dlv, peer);
+ peer = qdr_delivery_next_peer_CT(dlv);
}
//
@@ -715,15 +713,12 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
dlv->tracking_addr = 0;
}
- peer = dlv->peer;
- if (peer) {
- dlv->peer = 0;
- peer->peer = 0;
+ peer = qdr_delivery_first_peer_CT(dlv);
+ while (peer) {
if (link->link_direction == QD_OUTGOING)
qdr_delivery_failed_CT(core, peer);
-
- qdr_delivery_decref_CT(core, peer);
- qdr_delivery_decref_CT(core, dlv);
+ qdr_delivery_unlink_peers_CT(core, dlv, peer);
+ peer = qdr_delivery_next_peer_CT(dlv);
}
//
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/db96fd8f/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 8cfcb03..7828c6a 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -101,33 +101,26 @@ static void qdr_forward_find_closest_remotes_CT(qdr_core_t *core, qdr_address_t
qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, qdr_link_t *link, qd_message_t *msg)
{
- qdr_delivery_t *dlv = new_qdr_delivery_t();
- uint64_t *tag = (uint64_t*) dlv->tag;
-
- ZERO(dlv);
- sys_atomic_init(&dlv->ref_count, 0);
- dlv->link = link;
- dlv->msg = qd_message_copy(msg);
- dlv->settled = !in_dlv || in_dlv->settled;
- dlv->presettled = dlv->settled;
+ qdr_delivery_t *out_dlv = new_qdr_delivery_t();
+ uint64_t *tag = (uint64_t*) out_dlv->tag;
+
+ ZERO(out_dlv);
+ sys_atomic_init(&out_dlv->ref_count, 0);
+ out_dlv->link = link;
+ out_dlv->msg = qd_message_copy(msg);
+ out_dlv->settled = !in_dlv || in_dlv->settled;
+ out_dlv->presettled = out_dlv->settled;
*tag = core->next_tag++;
- dlv->tag_length = 8;
- dlv->error = 0;
+ out_dlv->tag_length = 8;
+ out_dlv->error = 0;
//
- // Create peer linkage only if the delivery is not settled
+ // Create peer linkage only if the outgoing delivery is not settled
//
- if (!dlv->settled) {
- if (in_dlv && in_dlv->peer == 0) {
- dlv->peer = in_dlv;
- in_dlv->peer = dlv;
+ if (!out_dlv->settled && in_dlv)
+ qdr_delivery_link_peers_CT(in_dlv, out_dlv);
- qdr_delivery_incref(dlv);
- qdr_delivery_incref(in_dlv);
- }
- }
-
- return dlv;
+ return out_dlv;
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/db96fd8f/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 07d832b..f33aeda 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -309,6 +309,7 @@ struct qdr_delivery_t {
sys_atomic_t ref_count;
qdr_link_t *link;
qdr_delivery_t *peer;
+ qdr_delivery_t *next_peer;
qd_message_t *msg;
qd_iterator_t *to_addr;
qd_iterator_t *origin;
@@ -693,6 +694,29 @@ void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery);
void qdr_delivery_failed_CT(qdr_core_t *core, qdr_delivery_t *delivery);
bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *delivery);
void qdr_delivery_decref_CT(qdr_core_t *core, qdr_delivery_t *delivery);
+
+/**
+ * Links the in_dlv to the out_dlv and increments ref counts of both deliveries
+ */
+void qdr_delivery_link_peers_CT(qdr_delivery_t *in_dlv, qdr_delivery_t *out_dlv);
+
+/**
+ * Zeroes out peer references from both peers and decrefs ref counts.
+ */
+void qdr_delivery_unlink_peers_CT(qdr_core_t *core, qdr_delivery_t *dlv, qdr_delivery_t *peer);
+
+/**
+ * Returns the first peer of the delivery.
+ * @see qdr_delivery_next_peer_CT
+ */
+qdr_delivery_t *qdr_delivery_first_peer_CT(qdr_delivery_t *dlv);
+
+/**
+ * Returns the next peer of the passed in delivery.
+ */
+qdr_delivery_t *qdr_delivery_next_peer_CT(qdr_delivery_t *dlv);
+
+
void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query);
void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/db96fd8f/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 40d4c45..7dc4970 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -413,6 +413,51 @@ static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *de
}
+void qdr_delivery_link_peers_CT(qdr_delivery_t *in_dlv, qdr_delivery_t *out_dlv)
+{
+ assert(!in_dlv->peer);
+ assert(!out_dlv->peer);
+
+ out_dlv->peer = in_dlv;
+ in_dlv->peer = out_dlv;
+
+ qdr_delivery_incref(out_dlv);
+ qdr_delivery_incref(in_dlv);
+}
+
+
+void qdr_delivery_unlink_peers_CT(qdr_core_t *core, qdr_delivery_t *dlv, qdr_delivery_t *peer)
+{
+ //
+ // Make sure that the passed in deliveries are indeed peers.
+ //
+
+ assert(dlv->peer == peer);
+ assert(peer->peer == dlv);
+
+ dlv->peer = 0;
+ peer->peer = 0;
+
+ qdr_delivery_decref_CT(core, dlv);
+ qdr_delivery_decref_CT(core, peer);
+}
+
+
+qdr_delivery_t *qdr_delivery_first_peer_CT(qdr_delivery_t *dlv)
+{
+ dlv->next_peer = dlv->peer;
+ return dlv->next_peer;
+}
+
+qdr_delivery_t *qdr_delivery_next_peer_CT(qdr_delivery_t *dlv)
+{
+ //Get the next peer. In the current case we have no next peer.
+ // When a peer list is introduced, this function might return something based on the content of the peer list.
+ dlv->next_peer = 0;
+ return dlv->next_peer;
+}
+
+
void qdr_delivery_decref_CT(qdr_core_t *core, qdr_delivery_t *dlv)
{
uint32_t ref_count = sys_atomic_dec(&dlv->ref_count);
@@ -671,7 +716,7 @@ static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
qdr_delivery_t *dlv = action->args.delivery.delivery;
- qdr_delivery_t *peer = dlv->peer;
+ qdr_delivery_t *peer = qdr_delivery_first_peer_CT(dlv);
bool push = false;
bool peer_moved = false;
bool dlv_moved = false;
@@ -705,17 +750,12 @@ static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool
if (settled) {
if (peer) {
peer->settled = true;
- peer->peer = 0;
- dlv->peer = 0;
-
if (peer->link) {
peer_moved = qdr_delivery_settled_CT(core, peer);
if (peer_moved)
push = true;
}
-
- qdr_delivery_decref_CT(core, dlv);
- qdr_delivery_decref_CT(core, peer);
+ qdr_delivery_unlink_peers_CT(core, dlv, peer);
}
if (dlv->link)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org