You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2021/09/24 14:01:20 UTC

[qpid-dispatch] branch main updated: DISPATCH-2219: fix inter-router link priorities

This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
     new 8d5d3e4  DISPATCH-2219: fix inter-router link priorities
8d5d3e4 is described below

commit 8d5d3e41cda09cdc56fceab3bb7f52add87eac6a
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Fri Sep 10 13:18:07 2021 -0400

    DISPATCH-2219: fix inter-router link priorities
    
    This closes #1367
---
 include/qpid/dispatch/amqp.h                     |  7 +++
 src/message_private.h                            |  4 --
 src/router_core/connections.c                    | 60 ++++++++++++++++--------
 src/router_core/core_link_endpoint.c             |  3 +-
 src/router_core/forwarder.c                      |  3 +-
 src/router_core/modules/edge_router/addr_proxy.c | 11 +++--
 src/router_core/route_control.c                  |  2 +-
 src/router_core/router_core_private.h            |  6 ++-
 8 files changed, 63 insertions(+), 33 deletions(-)

diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h
index 9b8665a..7c3ef08 100644
--- a/include/qpid/dispatch/amqp.h
+++ b/include/qpid/dispatch/amqp.h
@@ -218,4 +218,11 @@ extern const char * const QD_AMQP_COND_MESSAGE_SIZE_EXCEEDED;
 #define QD_AMQP_LINK_ROLE_RECEIVER true
 /// @};
 
+/** @name AMQP Message priority. */
+/// @{
+#define QDR_N_PRIORITIES     10
+#define QDR_MAX_PRIORITY     (QDR_N_PRIORITIES - 1)
+#define QDR_DEFAULT_PRIORITY  4
+/// @};
+
 #endif
diff --git a/src/message_private.h b/src/message_private.h
index e7217e8..944612e 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -179,10 +179,6 @@ ALLOC_DECLARE(qd_message_content_t);
 /** Initialize logging */
 void qd_message_initialize();
 
-#define QDR_N_PRIORITIES     10
-#define QDR_MAX_PRIORITY     (QDR_N_PRIORITIES - 1)
-#define QDR_DEFAULT_PRIORITY  4
-
 // These expect content->lock to be locked.
 bool _Q2_holdoff_should_block_LH(const qd_message_content_t *content);
 bool _Q2_holdoff_should_unblock_LH(const qd_message_content_t *content);
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 4f28d48..12812ad 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -618,6 +618,7 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
     link->zero_credit_time = conn->core->uptime_ticks;
     link->terminus_survives_disconnect = qdr_terminus_survives_disconnect(local_terminus);
     link->no_route = no_route;
+    link->priority = QDR_DEFAULT_PRIORITY;
 
     link->strip_annotations_in  = conn->strip_annotations_in;
     link->strip_annotations_out = conn->strip_annotations_out;
@@ -630,9 +631,10 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
         tsan_reset_delivery_ids(initial_delivery, link->conn->identity, link->identity);
     }
 
-    if      (qdr_terminus_has_capability(local_terminus, QD_CAPABILITY_ROUTER_CONTROL))
+    if      (qdr_terminus_has_capability(local_terminus, QD_CAPABILITY_ROUTER_CONTROL)) {
         link->link_type = QD_LINK_CONTROL;
-    else if (qdr_terminus_has_capability(local_terminus, QD_CAPABILITY_ROUTER_DATA))
+        link->priority = QDR_MAX_PRIORITY;
+    } else if (qdr_terminus_has_capability(local_terminus, QD_CAPABILITY_ROUTER_DATA))
         link->link_type = QD_LINK_ROUTER;
     else if (qdr_terminus_has_capability(local_terminus, QD_CAPABILITY_EDGE_DOWNLINK)) {
         if (conn->core->router_mode == QD_ROUTER_MODE_INTERIOR &&
@@ -1120,7 +1122,8 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t        *core,
                                qd_direction_t     dir,
                                qdr_terminus_t    *source,
                                qdr_terminus_t    *target,
-                               qd_session_class_t ssn_class)
+                               qd_session_class_t ssn_class,
+                               uint8_t            priority)
 {
     //
     // Create a new link, initiated by the router core.  This will involve issuing a first-attach outbound.
@@ -1148,6 +1151,7 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t        *core,
     link->attach_count   = 1;
     link->core_ticks     = core->uptime_ticks;
     link->zero_credit_time = core->uptime_ticks;
+    link->priority       = priority;
 
     link->strip_annotations_in  = conn->strip_annotations_in;
     link->strip_annotations_out = conn->strip_annotations_out;
@@ -1407,15 +1411,23 @@ static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, boo
                 // inter-router links:  Two (in and out) for control, 2 * QDR_N_PRIORITIES for
                 // routed-message transfer.
                 //
-                (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_INCOMING, qdr_terminus_router_control(), qdr_terminus_router_control(), QD_SSN_ROUTER_CONTROL);
-                (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_OUTGOING, qdr_terminus_router_control(), qdr_terminus_router_control(), QD_SSN_ROUTER_CONTROL);
+                (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_INCOMING,
+                                          qdr_terminus_router_control(), qdr_terminus_router_control(),
+                                          QD_SSN_ROUTER_CONTROL, QDR_MAX_PRIORITY);
+                (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_OUTGOING,
+                                          qdr_terminus_router_control(), qdr_terminus_router_control(),
+                                          QD_SSN_ROUTER_CONTROL, QDR_MAX_PRIORITY);
                 STATIC_ASSERT((QD_SSN_ROUTER_DATA_PRI_9 - QD_SSN_ROUTER_DATA_PRI_0 + 1) == QDR_N_PRIORITIES, PRIORITY_SESSION_NOT_SAME);
 
                 for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
                     // a session is reserved for each priority link
                     qd_session_class_t sc = (qd_session_class_t)(QD_SSN_ROUTER_DATA_PRI_0 + priority);
-                    (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER,  QD_INCOMING, qdr_terminus_router_data(), qdr_terminus_router_data(), sc);
-                    (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER,  QD_OUTGOING, qdr_terminus_router_data(), qdr_terminus_router_data(), sc);
+                    (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_INCOMING,
+                                              qdr_terminus_router_data(), qdr_terminus_router_data(),
+                                              sc, priority);
+                    (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER,  QD_OUTGOING,
+                                              qdr_terminus_router_data(), qdr_terminus_router_data(),
+                                              sc, priority);
                 }
             }
         }
@@ -1462,12 +1474,12 @@ qdr_link_t *qdr_connection_new_streaming_link_CT(qdr_core_t *core, qdr_connectio
     case QDR_ROLE_INTER_ROUTER:
         out_link = qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_OUTGOING,
                                       qdr_terminus_router_data(), qdr_terminus_router_data(),
-                                      QD_SSN_LINK_STREAMING);
+                                      QD_SSN_LINK_STREAMING, QDR_DEFAULT_PRIORITY);
         break;
     case QDR_ROLE_EDGE_CONNECTION:
         out_link = qdr_create_link_CT(core, conn, QD_LINK_ENDPOINT, QD_OUTGOING,
                                       qdr_terminus(0), qdr_terminus(0),
-                                      QD_SSN_LINK_STREAMING);
+                                      QD_SSN_LINK_STREAMING, QDR_DEFAULT_PRIORITY);
         break;
     default:
         assert(false);
@@ -1599,14 +1611,20 @@ static void qdr_detach_link_control_CT(qdr_core_t *core, qdr_connection_t *conn,
 static void qdr_attach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link)
 {
     assert(link->link_type == QD_LINK_ROUTER);
-    // The first QDR_N_PRIORITIES (10) QDR_LINK_ROUTER links to attach over the
-    // connection are the shared priority links.  These links are attached in
-    // priority order starting at zero.
-    int next_pri = core->data_links_by_mask_bit[conn->mask_bit].count;
-    if (next_pri < QDR_N_PRIORITIES) {
-        link->priority = next_pri;
-        core->data_links_by_mask_bit[conn->mask_bit].links[next_pri] = link;
-        core->data_links_by_mask_bit[conn->mask_bit].count += 1;
+    // The first 2 x QDR_N_PRIORITIES (10) QDR_LINK_ROUTER links to attach over
+    // the inter-router connection are the shared priority links.  These links
+    // are attached in priority order starting at zero.
+    if (link->link_direction == QD_OUTGOING) {
+        int next_pri = core->data_links_by_mask_bit[conn->mask_bit].count;
+        if (next_pri < QDR_N_PRIORITIES) {
+            link->priority = next_pri;
+            core->data_links_by_mask_bit[conn->mask_bit].links[next_pri] = link;
+            core->data_links_by_mask_bit[conn->mask_bit].count += 1;
+        }
+    } else {
+        if (conn->next_pri < QDR_N_PRIORITIES) {
+            link->priority = conn->next_pri++;
+        }
     }
 }
 
@@ -1792,8 +1810,10 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
             break;
         }
 
-        case QD_LINK_CONTROL:
         case QD_LINK_ROUTER:
+            qdr_attach_link_data_CT(core, conn, link);
+            // fall-through:
+        case QD_LINK_CONTROL:
             qdr_link_outbound_second_attach_CT(core, link, source, target);
             qdr_link_issue_credit_CT(core, link, link->capacity, false);
             break;
@@ -1918,8 +1938,10 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac
                 qdr_link_issue_credit_CT(core, link, link->capacity, false);
             break;
 
-        case QD_LINK_CONTROL:
         case QD_LINK_ROUTER:
+            qdr_attach_link_data_CT(core, conn, link);
+            // fall-through
+        case QD_LINK_CONTROL:
             qdr_link_issue_credit_CT(core, link, link->capacity, false);
             break;
 
diff --git a/src/router_core/core_link_endpoint.c b/src/router_core/core_link_endpoint.c
index fdecf92..b65ac6a 100644
--- a/src/router_core/core_link_endpoint.c
+++ b/src/router_core/core_link_endpoint.c
@@ -76,8 +76,7 @@ qdrc_endpoint_t *qdrc_endpoint_create_link_CT(qdr_core_t           *core,
     ep->desc         = desc;
     ep->link_context = link_context;
     ep->link         = qdr_create_link_CT(core, conn, QD_LINK_ENDPOINT, dir, source, target,
-                                          QD_SSN_CORE_ENDPOINT);
-
+                                          QD_SSN_CORE_ENDPOINT, QDR_DEFAULT_PRIORITY);
     ep->link->core_endpoint = ep;
     return ep;
 }
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 642b4cd..3bc3a8f 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -1037,7 +1037,8 @@ void qdr_forward_link_direct_CT(qdr_core_t       *core,
     out_link->zero_credit_time = core->uptime_ticks;
     out_link->strip_annotations_in  = conn->strip_annotations_in;
     out_link->strip_annotations_out = conn->strip_annotations_out;
-    
+    out_link->priority       = in_link->priority;
+
     if (strip) {
         out_link->strip_prefix = strip;
     }
diff --git a/src/router_core/modules/edge_router/addr_proxy.c b/src/router_core/modules/edge_router/addr_proxy.c
index 0e76626..53ead53 100644
--- a/src/router_core/modules/edge_router/addr_proxy.c
+++ b/src/router_core/modules/edge_router/addr_proxy.c
@@ -159,7 +159,8 @@ static void add_inlink(qcm_edge_addr_proxy_t *ap, const char *key, qdr_address_t
         }
 
         qdr_link_t *link = qdr_create_link_CT(ap->core, ap->edge_conn, QD_LINK_ENDPOINT, QD_INCOMING,
-                                              term, qdr_terminus_normal(0), QD_SSN_ENDPOINT);
+                                              term, qdr_terminus_normal(0), QD_SSN_ENDPOINT,
+                                              QDR_DEFAULT_PRIORITY);
         qdr_core_bind_address_link_CT(ap->core, addr, link);
         addr->edge_inlink = link;
     }
@@ -202,7 +203,8 @@ static void add_outlink(qcm_edge_addr_proxy_t *ap, const char *key, qdr_address_
         }
 
         qdr_link_t *link = qdr_create_link_CT(ap->core, ap->edge_conn, QD_LINK_ENDPOINT, QD_OUTGOING,
-                                              qdr_terminus_normal(0), term, QD_SSN_ENDPOINT);
+                                              qdr_terminus_normal(0), term, QD_SSN_ENDPOINT,
+                                              QDR_DEFAULT_PRIORITY);
         addr->edge_outlink = link;
     }
 }
@@ -281,7 +283,8 @@ static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *c
         qdr_link_t *out_link = qdr_create_link_CT(ap->core, conn,
                                                   QD_LINK_ENDPOINT, QD_OUTGOING,
                                                   qdr_terminus(0), qdr_terminus(0),
-                                                  QD_SSN_ENDPOINT);
+                                                  QD_SSN_ENDPOINT,
+                                                  QDR_DEFAULT_PRIORITY);
 
         //
         // Associate the anonymous sender with the edge connection address.  This will cause
@@ -297,7 +300,7 @@ static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *c
                                   QD_LINK_ENDPOINT, QD_INCOMING,
                                   qdr_terminus_edge_downlink(ap->core->router_id),
                                   qdr_terminus_edge_downlink(0),
-                                  QD_SSN_ENDPOINT);
+                                  QD_SSN_ENDPOINT, QDR_DEFAULT_PRIORITY);
 
         //
         // Attach a receiving link for edge address tracking updates.
diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c
index 7adfdc3..3f898b7 100644
--- a/src/router_core/route_control.c
+++ b/src/router_core/route_control.c
@@ -257,7 +257,7 @@ static void qdr_auto_link_activate_CT(qdr_core_t *core, qdr_auto_link_t *al, qdr
             } else
                 qdr_terminus_set_address(term, &key[2]); // truncate the "Mp" annotation (where p = phase)
             al->link = qdr_create_link_CT(core, conn, QD_LINK_ENDPOINT, al->dir, source, target,
-                                          QD_SSN_ENDPOINT);
+                                          QD_SSN_ENDPOINT, QDR_DEFAULT_PRIORITY);
             al->link->auto_link = al;
             al->link->phase     = al->phase;
             al->link->fallback  = al->fallback;
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 9754465..219290c 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -673,6 +673,8 @@ struct qdr_connection_t {
     qdr_core_t                 *core;
     bool                        incoming;
     bool                        in_activate_list;
+    bool                        closed; // This bit is used in the case where a client is trying to force close this connection.
+    uint8_t                     next_pri;  // for incoming inter-router data links
     qdr_connection_role_t       role;
     int                         inter_router_cost;
     qdr_conn_identifier_t      *conn_id;
@@ -693,7 +695,6 @@ struct qdr_connection_t {
     qdr_conn_oper_status_t      oper_status;
     qdr_conn_admin_status_t     admin_status;
     qdr_error_t                *error;
-    bool                        closed; // This bit is used in the case where a client is trying to force close this connection.
     uint32_t                    conn_uptime; // Timestamp which can be used to calculate the number of seconds this connection has been up and running.
     uint32_t                    last_delivery_time; // Timestamp which can be used to calculate the number of seconds since the last delivery arrived on this connection.
     bool                        enable_protocol_trace; // Has trace level logging been turned on for this connection.
@@ -1010,7 +1011,8 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t        *core,
                                qd_direction_t     dir,
                                qdr_terminus_t    *source,
                                qdr_terminus_t    *target,
-                               qd_session_class_t ssn_class);
+                               qd_session_class_t ssn_class,
+                               uint8_t priority);
 
 void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition, bool close);
 void qdr_link_outbound_second_attach_CT(qdr_core_t *core, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org