You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/09/18 12:48:03 UTC

[2/2] qpid-dispatch git commit: DISPATCH-1096 - store priority in links

DISPATCH-1096 - store priority in links


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/a9ace970
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/a9ace970
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/a9ace970

Branch: refs/heads/master
Commit: a9ace9708ec8c48ca1868eb5998befe81640bfb0
Parents: 94bef70
Author: Michael Goulish <mg...@redhat.com>
Authored: Thu Sep 13 10:33:02 2018 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue Sep 18 08:46:43 2018 -0400

----------------------------------------------------------------------
 src/router_core/connections.c         | 18 ++++++------------
 src/router_core/forwarder.c           | 14 +++++++-------
 src/router_core/router_core_private.h |  3 ++-
 src/router_core/transfer.c            |  3 +--
 4 files changed, 16 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a9ace970/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 2723c6c..32fbf48 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -817,9 +817,7 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
         if (link->link_type == QD_LINK_CONTROL)
             core->control_links_by_mask_bit[conn->mask_bit] = 0;
         if (link->link_type == QD_LINK_ROUTER)
-            for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority)
-                if (link == core->data_links_by_mask_bit[conn->mask_bit].links[priority])
-                    core->data_links_by_mask_bit[conn->mask_bit].links[priority] = 0;
+            core->data_links_by_mask_bit[conn->mask_bit].links[link->priority] = 0;
     }
 
     //
@@ -852,9 +850,7 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
     //
     qdr_del_link_ref(&conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION);
     sys_mutex_lock(conn->work_lock);
-    for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
-        qdr_del_link_ref(conn->links_with_work + priority, link, QDR_LINK_LIST_CLASS_WORK);
-    }
+    qdr_del_link_ref(conn->links_with_work + link->priority, link, QDR_LINK_LIST_CLASS_WORK);
     sys_mutex_unlock(conn->work_lock);
 
     //
@@ -1438,11 +1434,14 @@ static void qdr_detach_link_control_CT(qdr_core_t *core, qdr_connection_t *conn,
 static void qdr_attach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link)
 {
     if (conn->role == QDR_ROLE_INTER_ROUTER) {
+        // As inter-router links are attached to this connection, they
+        // are assigned priorities in the order in which they are attached.
         int next_slot = core->data_links_by_mask_bit[conn->mask_bit].count ++;
         if (next_slot >= QDR_N_PRIORITIES) {
             qd_log(core->log, QD_LOG_ERROR, "Attempt to attach too many inter-router links for priority sheaf.");
             return;
         }
+        link->priority = next_slot;
         core->data_links_by_mask_bit[conn->mask_bit].links[next_slot] = link;
     }
 
@@ -1483,12 +1482,7 @@ static void qdr_attach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qd
 static void qdr_detach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link)
 {
     if (conn->role == QDR_ROLE_INTER_ROUTER)
-        for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
-            if (link == core->data_links_by_mask_bit[conn->mask_bit].links[priority]) {
-                core->data_links_by_mask_bit[conn->mask_bit].links[priority] = 0;
-                break;
-            }
-        }
+        core->data_links_by_mask_bit[conn->mask_bit].links[link->priority] = 0;
     //
     // TODO - This needs to be refactored in terms of a non-inter-router link type
     //

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a9ace970/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 4364c47..8ccb049 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -175,7 +175,7 @@ static void qdr_forward_drop_presettled_CT_LH(qdr_core_t *core, qdr_link_t *link
 }
 
 
-void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery_t *out_dlv, int priority)
+void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery_t *out_dlv)
 {
     sys_mutex_lock(out_link->conn->work_lock);
 
@@ -208,7 +208,7 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery
         work->value     = 1;
         DEQ_INSERT_TAIL(out_link->work_list, work);
     }
-    qdr_add_link_ref(out_link->conn->links_with_work + priority, out_link, QDR_LINK_LIST_CLASS_WORK);
+    qdr_add_link_ref(out_link->conn->links_with_work + out_link->priority, out_link, QDR_LINK_LIST_CLASS_WORK);
 
     out_dlv->link_work = work;
     sys_mutex_unlock(out_link->conn->work_lock);
@@ -285,7 +285,7 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
         while (link_ref) {
             qdr_link_t     *out_link     = link_ref->link;
             qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
-            qdr_forward_deliver_CT(core, out_link, out_delivery, priority);
+            qdr_forward_deliver_CT(core, out_link, out_delivery);
             fanout++;
             if (out_link->link_type != QD_LINK_CONTROL && out_link->link_type != QD_LINK_ROUTER) {
                 addr->deliveries_egress++;
@@ -360,7 +360,7 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
                 core->data_links_by_mask_bit[link_bit].links[priority];
             if (dest_link && (!link_exclusion || qd_bitmask_value(link_exclusion, link_bit) == 0)) {
                 qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, dest_link, msg);
-                qdr_forward_deliver_CT(core, dest_link, out_delivery, priority);
+                qdr_forward_deliver_CT(core, dest_link, out_delivery);
                 fanout++;
                 addr->deliveries_transit++;
                 if (dest_link->link_type == QD_LINK_ROUTER)
@@ -485,7 +485,7 @@ int qdr_forward_closest_CT(qdr_core_t      *core,
     if (link_ref) {
         out_link     = link_ref->link;
         out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
-        qdr_forward_deliver_CT(core, out_link, out_delivery, qd_message_get_priority(msg));
+        qdr_forward_deliver_CT(core, out_link, out_delivery);
 
         //
         // If there are multiple local subscribers, rotate the list of link references
@@ -536,7 +536,7 @@ int qdr_forward_closest_CT(qdr_core_t      *core,
             out_link = control ? PEER_CONTROL_LINK(core, next_node) : peer_data_link(core, next_node, priority);
             if (out_link) {
                 out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
-                qdr_forward_deliver_CT(core, out_link, out_delivery, priority);
+                qdr_forward_deliver_CT(core, out_link, out_delivery);
                 addr->deliveries_transit++;
                 if (out_link->link_type == QD_LINK_ROUTER)
                     core->deliveries_transit++;
@@ -685,7 +685,7 @@ int qdr_forward_balanced_CT(qdr_core_t      *core,
 
     if (chosen_link) {
         qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, chosen_link, msg);
-        qdr_forward_deliver_CT(core, chosen_link, out_delivery, qd_message_get_priority(msg));
+        qdr_forward_deliver_CT(core, chosen_link, out_delivery);
 
         //
         // If the delivery is unsettled and the link is inter-router, account for the outstanding delivery.

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a9ace970/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 7d9b7e1..ee8930c 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -422,6 +422,7 @@ struct qdr_link_t {
     uint64_t  released_deliveries;
     uint64_t  modified_deliveries;
     uint64_t *ingress_histogram;
+    uint8_t   priority;
 };
 
 ALLOC_DECLARE(qdr_link_t);
@@ -846,7 +847,7 @@ void qdr_post_general_work_CT(qdr_core_t *core, qdr_general_work_t *work);
 void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local);
 bool qdr_is_addr_treatment_multicast(qdr_address_t *addr);
 qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *peer, qdr_link_t *link, qd_message_t *msg);
-void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv, int priority);
+void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv);
 void qdr_connection_free(qdr_connection_t *conn);
 void qdr_connection_activate_CT(qdr_core_t *core, qdr_connection_t *conn);
 qd_address_treatment_t qdr_treatment_for_address_CT(qdr_core_t *core, qdr_connection_t *conn, qd_iterator_t *iter, int *in_phase, int *out_phase);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a9ace970/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index ae1dbd9..68645ea 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -1001,8 +1001,7 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
         peer->tag_length = action->args.connection.tag_length;
         memcpy(peer->tag, action->args.connection.tag, peer->tag_length);
 
-        // Adding this work at priority 0.
-        qdr_forward_deliver_CT(core, link->connected_link, peer, 0);
+        qdr_forward_deliver_CT(core, link->connected_link, peer);
 
         link->total_deliveries++;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org