You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2020/03/10 17:17:18 UTC

[qpid-dispatch] 01/02: DISPATCH-1579: open dedicated inter-router sessions for data and control links

This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 0264301cf9a1fb6e767f75d4e1aeabecf7355e2a
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Wed Feb 19 13:45:58 2020 -0500

    DISPATCH-1579: open dedicated inter-router sessions for data and control links
---
 include/qpid/dispatch/container.h                | 28 +++++++++++++++++++++-
 include/qpid/dispatch/router_core.h              |  5 ++--
 src/container.c                                  | 30 ++++++++++++++++--------
 src/router_core/connections.c                    | 28 +++++++++++++---------
 src/router_core/core_link_endpoint.c             |  3 ++-
 src/router_core/forwarder.c                      |  1 +
 src/router_core/modules/edge_router/addr_proxy.c | 10 ++++----
 src/router_core/route_control.c                  |  3 ++-
 src/router_core/router_core_private.h            | 14 ++++++-----
 src/router_node.c                                |  5 ++--
 src/server_private.h                             |  2 +-
 tests/system_tests_router_mesh.py                | 29 +++++++++++++++++++++++
 tests/test-sender.c                              |  4 ++--
 13 files changed, 121 insertions(+), 41 deletions(-)

diff --git a/include/qpid/dispatch/container.h b/include/qpid/dispatch/container.h
index cf1b0d4..905c385 100644
--- a/include/qpid/dispatch/container.h
+++ b/include/qpid/dispatch/container.h
@@ -68,6 +68,32 @@ typedef enum {
 } qd_detach_type_t;
 
 
+/**
+ * Session Class
+ *
+ * Used when creating new links from the router.  A connection maintains a set
+ * of sessions over which links can be created.  The session class indicates
+ * which session to use when creating a link.
+ */
+typedef enum {
+    QD_SSN_ENDPOINT,          ///< client data links
+    QD_SSN_ROUTER_CONTROL,    ///< router protocol
+    QD_SSN_ROUTER_DATA_PRI_0, ///< inter-router data links (by priority)
+    QD_SSN_ROUTER_DATA_PRI_1,
+    QD_SSN_ROUTER_DATA_PRI_2,
+    QD_SSN_ROUTER_DATA_PRI_3,
+    QD_SSN_ROUTER_DATA_PRI_4,
+    QD_SSN_ROUTER_DATA_PRI_5,
+    QD_SSN_ROUTER_DATA_PRI_6,
+    QD_SSN_ROUTER_DATA_PRI_7,
+    QD_SSN_ROUTER_DATA_PRI_8,
+    QD_SSN_ROUTER_DATA_PRI_9,
+    QD_SSN_CORE_ENDPOINT,     ///< core subscriptions
+    QD_SSN_LINK_ROUTE,        ///< link routes
+    QD_SSN_CLASS_COUNT
+} qd_session_class_t;
+
+
 typedef struct qd_node_t     qd_node_t;
 typedef struct qd_link_t     qd_link_t;
 
@@ -160,7 +186,7 @@ void qd_container_node_set_context(qd_node_t *node, void *node_context);
 qd_dist_mode_t qd_container_node_get_dist_modes(const qd_node_t *node);
 qd_lifetime_policy_t qd_container_node_get_life_policy(const qd_node_t *node);
 
-qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn, qd_direction_t dir, const char *name);
+qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn, qd_direction_t dir, const char *name, qd_session_class_t);
 void qd_link_free(qd_link_t *link);
 
 /**
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 18bef79..7339714 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -700,8 +700,9 @@ void qdr_link_flow(qdr_core_t *core, qdr_link_t *link, int credit, bool drain_mo
  */
 void qdr_link_set_drained(qdr_core_t *core, qdr_link_t *link);
 
-typedef void (*qdr_link_first_attach_t)  (void *context, qdr_connection_t *conn, qdr_link_t *link, 
-                                          qdr_terminus_t *source, qdr_terminus_t *target);
+typedef void (*qdr_link_first_attach_t)  (void *context, qdr_connection_t *conn, qdr_link_t *link,
+                                          qdr_terminus_t *source, qdr_terminus_t *target,
+                                          qd_session_class_t);
 typedef void (*qdr_link_second_attach_t) (void *context, qdr_link_t *link,
                                           qdr_terminus_t *source, qdr_terminus_t *target);
 typedef void (*qdr_link_detach_t)        (void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close);
diff --git a/src/container.c b/src/container.c
index 8b19074..986ee9c 100644
--- a/src/container.c
+++ b/src/container.c
@@ -484,8 +484,12 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
 
     case PN_SESSION_LOCAL_CLOSE :
         ssn = pn_event_session(event);
-        if (ssn == qd_conn->pn_sess)
-            qd_conn->pn_sess = 0;
+        for (int i = 0; i < QD_SSN_CLASS_COUNT; ++i) {
+            if (ssn == qd_conn->pn_sessions[i]) {
+                qd_conn->pn_sessions[i] = 0;
+                break;
+            }
+        }
         pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
         while (pn_link) {
             if (pn_link_session(pn_link) == ssn) {
@@ -503,8 +507,12 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
 
     case PN_SESSION_REMOTE_CLOSE :
         ssn = pn_event_session(event);
-        if (ssn == qd_conn->pn_sess)
-            qd_conn->pn_sess = 0;
+        for (int i = 0; i < QD_SSN_CLASS_COUNT; ++i) {
+            if (ssn == qd_conn->pn_sessions[i]) {
+                qd_conn->pn_sessions[i] = 0;
+                break;
+            }
+        }
         if (!(pn_connection_state(conn) & PN_LOCAL_CLOSED)) {
             if (pn_session_state(ssn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
 
@@ -846,7 +854,7 @@ qd_lifetime_policy_t qd_container_node_get_life_policy(const qd_node_t *node)
 }
 
 
-qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn, qd_direction_t dir, const char* name)
+qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn, qd_direction_t dir, const char* name, qd_session_class_t ssn_class)
 {
     qd_link_t *link = new_qd_link_t();
     if (!link) {
@@ -859,13 +867,15 @@ qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn, qd_direction_t dir, c
     DEQ_INSERT_TAIL(node->container->links, link);
     sys_mutex_unlock(node->container->lock);
 
-    if (!conn->pn_sess) {
-        conn->pn_sess = pn_session(qd_connection_pn(conn));
-        pn_session_set_incoming_capacity(conn->pn_sess, cf->incoming_capacity);
-        pn_session_open(conn->pn_sess);
+    pn_session_t *ssn = conn->pn_sessions[ssn_class];
+    if (!ssn) {
+        ssn = pn_session(qd_connection_pn(conn));
+        conn->pn_sessions[ssn_class] = ssn;
+        pn_session_set_incoming_capacity(ssn, cf->incoming_capacity);
+        pn_session_open(ssn);
     }
 
-    link->pn_sess = conn->pn_sess;
+    link->pn_sess = ssn;
 
     if (dir == QD_OUTGOING)
         link->pn_link = pn_sender(link->pn_sess, name);
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 31e523d..a6672c9 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -19,6 +19,7 @@
 
 #include <qpid/dispatch/router_core.h>
 #include <qpid/dispatch/discriminator.h>
+#include <qpid/dispatch/static_assert.h>
 #include "route_control.h"
 #include <qpid/dispatch/amqp.h>
 #include <stdio.h>
@@ -295,7 +296,7 @@ int qdr_connection_process(qdr_connection_t *conn)
 
         switch (work->work_type) {
         case QDR_CONNECTION_WORK_FIRST_ATTACH :
-            core->first_attach_handler(core->user_context, conn, work->link, work->source, work->target);
+            core->first_attach_handler(core->user_context, conn, work->link, work->source, work->target, work->ssn_class);
             break;
 
         case QDR_CONNECTION_WORK_SECOND_ATTACH :
@@ -1046,12 +1047,13 @@ static void qdr_link_cleanup_protected_CT(qdr_core_t *core, qdr_connection_t *co
 }
 
 
-qdr_link_t *qdr_create_link_CT(qdr_core_t       *core,
-                               qdr_connection_t *conn,
-                               qd_link_type_t    link_type,
-                               qd_direction_t    dir,
-                               qdr_terminus_t   *source,
-                               qdr_terminus_t   *target)
+qdr_link_t *qdr_create_link_CT(qdr_core_t        *core,
+                               qdr_connection_t  *conn,
+                               qd_link_type_t     link_type,
+                               qd_direction_t     dir,
+                               qdr_terminus_t    *source,
+                               qdr_terminus_t    *target,
+                               qd_session_class_t ssn_class)
 {
     //
     // Create a new link, initiated by the router core.  This will involve issuing a first-attach outbound.
@@ -1093,6 +1095,7 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t       *core,
     work->link      = link;
     work->source    = source;
     work->target    = target;
+    work->ssn_class = ssn_class;
 
     char   source_str[1000];
     char   target_str[1000];
@@ -1314,12 +1317,15 @@ static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, boo
                     // inter-router links:  Two (in and out) for control, 2 * QDR_N_PRIORITIES for
                     // routed-message transfer.
                     //
-                    (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_INCOMING, qdr_terminus_router_control(), qdr_terminus_router_control());
-                    (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_OUTGOING, qdr_terminus_router_control(), qdr_terminus_router_control());
+                    (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_INCOMING, qdr_terminus_router_control(), qdr_terminus_router_control(), QD_SSN_ROUTER_CONTROL);
+                    (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_OUTGOING, qdr_terminus_router_control(), qdr_terminus_router_control(), QD_SSN_ROUTER_CONTROL);
+                    STATIC_ASSERT((QD_SSN_ROUTER_DATA_PRI_9 - QD_SSN_ROUTER_DATA_PRI_0 + 1) == QDR_N_PRIORITIES, PRIORITY_SESSION_NOT_SAME);
 
                     for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
-                        (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER,  QD_INCOMING, qdr_terminus_router_data(), qdr_terminus_router_data());
-                        (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER,  QD_OUTGOING, qdr_terminus_router_data(), qdr_terminus_router_data());
+                        // one session is reserved per priority level; the incoming and outgoing data links for that priority share it
+                        qd_session_class_t sc = (qd_session_class_t)(QD_SSN_ROUTER_DATA_PRI_0 + priority);
+                        (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER,  QD_INCOMING, qdr_terminus_router_data(), qdr_terminus_router_data(), sc);
+                        (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER,  QD_OUTGOING, qdr_terminus_router_data(), qdr_terminus_router_data(), sc);
                     }
                 }
             }
diff --git a/src/router_core/core_link_endpoint.c b/src/router_core/core_link_endpoint.c
index e48b80e..bbfc84f 100644
--- a/src/router_core/core_link_endpoint.c
+++ b/src/router_core/core_link_endpoint.c
@@ -72,7 +72,8 @@ qdrc_endpoint_t *qdrc_endpoint_create_link_CT(qdr_core_t           *core,
 
     ep->desc         = desc;
     ep->link_context = link_context;
-    ep->link         = qdr_create_link_CT(core, conn, QD_LINK_ENDPOINT, dir, source, target);
+    ep->link         = qdr_create_link_CT(core, conn, QD_LINK_ENDPOINT, dir, source, target,
+                                          QD_SSN_CORE_ENDPOINT);
 
     ep->link->core_endpoint = ep;
     return ep;
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 52e479a..ad3fb02 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -930,6 +930,7 @@ void qdr_forward_link_direct_CT(qdr_core_t       *core,
     work->link      = out_link;
     work->source    = source;
     work->target    = target;
+    work->ssn_class = QD_SSN_LINK_ROUTE;
 
     qdr_connection_enqueue_work_CT(core, conn, work);
 
diff --git a/src/router_core/modules/edge_router/addr_proxy.c b/src/router_core/modules/edge_router/addr_proxy.c
index af6e5c2..7ed6d08 100644
--- a/src/router_core/modules/edge_router/addr_proxy.c
+++ b/src/router_core/modules/edge_router/addr_proxy.c
@@ -156,7 +156,7 @@ static void add_inlink(qcm_edge_addr_proxy_t *ap, const char *key, qdr_address_t
         }
 
         qdr_link_t *link = qdr_create_link_CT(ap->core, ap->edge_conn, QD_LINK_ENDPOINT, QD_INCOMING,
-                                              term, qdr_terminus_normal(0));
+                                              term, qdr_terminus_normal(0), QD_SSN_ENDPOINT);
         qdr_core_bind_address_link_CT(ap->core, addr, link);
         addr->edge_inlink = link;
     }
@@ -199,7 +199,7 @@ static void add_outlink(qcm_edge_addr_proxy_t *ap, const char *key, qdr_address_
         }
 
         qdr_link_t *link = qdr_create_link_CT(ap->core, ap->edge_conn, QD_LINK_ENDPOINT, QD_OUTGOING,
-                                              qdr_terminus_normal(0), term);
+                                              qdr_terminus_normal(0), term, QD_SSN_ENDPOINT);
         addr->edge_outlink = link;
     }
 }
@@ -233,7 +233,8 @@ static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *c
         //
         qdr_link_t *out_link = qdr_create_link_CT(ap->core, conn,
                                                   QD_LINK_ENDPOINT, QD_OUTGOING,
-                                                  qdr_terminus(0), qdr_terminus(0));
+                                                  qdr_terminus(0), qdr_terminus(0),
+                                                  QD_SSN_ENDPOINT);
 
         //
         // Associate the anonymous sender with the edge connection address.  This will cause
@@ -248,7 +249,8 @@ static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *c
         (void) qdr_create_link_CT(ap->core, conn,
                                   QD_LINK_ENDPOINT, QD_INCOMING,
                                   qdr_terminus_edge_downlink(ap->core->router_id),
-                                  qdr_terminus_edge_downlink(0));
+                                  qdr_terminus_edge_downlink(0),
+                                  QD_SSN_ENDPOINT);
 
         //
         // Attach a receiving link for edge address tracking updates.
diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c
index 9ec0023..68de4a0 100644
--- a/src/router_core/route_control.c
+++ b/src/router_core/route_control.c
@@ -253,7 +253,8 @@ static void qdr_auto_link_activate_CT(qdr_core_t *core, qdr_auto_link_t *al, qdr
                     al->internal_addr = &key[2];
             } else
                 qdr_terminus_set_address(term, &key[2]); // truncate the "Mp" annotation (where p = phase)
-            al->link = qdr_create_link_CT(core, conn, QD_LINK_ENDPOINT, al->dir, source, target);
+            al->link = qdr_create_link_CT(core, conn, QD_LINK_ENDPOINT, al->dir, source, target,
+                                          QD_SSN_ENDPOINT);
             al->link->auto_link = al;
             al->link->phase     = al->phase;
             al->link->fallback  = al->fallback;
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 611ecd8..7939763 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -271,6 +271,7 @@ typedef struct qdr_connection_work_t {
     qdr_link_t                 *link;
     qdr_terminus_t             *source;
     qdr_terminus_t             *target;
+    qd_session_class_t          ssn_class;
 } qdr_connection_work_t;
 
 ALLOC_DECLARE(qdr_connection_work_t);
@@ -964,12 +965,13 @@ void qdr_link_enqueue_work_CT(qdr_core_t      *core,
                               qdr_link_t      *conn,
                               qdr_link_work_t *work);
 
-qdr_link_t *qdr_create_link_CT(qdr_core_t       *core,
-                               qdr_connection_t *conn,
-                               qd_link_type_t    link_type,
-                               qd_direction_t    dir,
-                               qdr_terminus_t   *source,
-                               qdr_terminus_t   *target);
+qdr_link_t *qdr_create_link_CT(qdr_core_t        *core,
+                               qdr_connection_t  *conn,
+                               qd_link_type_t     link_type,
+                               qd_direction_t     dir,
+                               qdr_terminus_t    *source,
+                               qdr_terminus_t    *target,
+                               qd_session_class_t ssn_class);
 
 void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition, bool close);
 void qdr_link_outbound_second_attach_CT(qdr_core_t *core, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target);
diff --git a/src/router_node.c b/src/router_node.c
index 365a88f..ea35944 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -1353,7 +1353,8 @@ static void CORE_link_first_attach(void             *context,
                                    qdr_connection_t *conn,
                                    qdr_link_t       *link, 
                                    qdr_terminus_t   *source,
-                                   qdr_terminus_t   *target)
+                                   qdr_terminus_t   *target,
+                                   qd_session_class_t ssn_class)
 {
     qd_router_t     *router = (qd_router_t*) context;
     qd_connection_t *qconn  = (qd_connection_t*) qdr_connection_get_context(conn);
@@ -1362,7 +1363,7 @@ static void CORE_link_first_attach(void             *context,
     //
     // Create a new link to be attached
     //
-    qd_link_t *qlink = qd_link(router->node, qconn, qdr_link_direction(link), qdr_link_name(link));
+    qd_link_t *qlink = qd_link(router->node, qconn, qdr_link_direction(link), qdr_link_name(link), ssn_class);
 
     //
     // Copy the source and target termini to the link
diff --git a/src/server_private.h b/src/server_private.h
index 167a5ff..4176429 100644
--- a/src/server_private.h
+++ b/src/server_private.h
@@ -147,7 +147,7 @@ struct qd_connection_t {
     int                             enqueued;
     qd_timer_t                      *timer;   // Timer for initial-setup
     pn_connection_t                 *pn_conn;
-    pn_session_t                    *pn_sess;
+    pn_session_t                    *pn_sessions[QD_SSN_CLASS_COUNT];
     pn_ssl_t                        *ssl;
     qd_listener_t                   *listener;
     qd_connector_t                  *connector;
diff --git a/tests/system_tests_router_mesh.py b/tests/system_tests_router_mesh.py
index 99b9829..e8fab01 100644
--- a/tests/system_tests_router_mesh.py
+++ b/tests/system_tests_router_mesh.py
@@ -209,6 +209,35 @@ class ThreeRouterTest(TestCase):
         self.assertEqual(1, ats.released)
         self.assertEqual(0, ats.rejected)
 
+    def test_06_parallel_priority(self):
+        """
+        Create 10 senders each with a different priority.  Send large messages
+        - large enough to trigger Qx flow control.  Ensure all messages arrive
+        as expected.
+        """
+
+        priorities = 10
+        send_batch = 150
+
+        total = priorities * send_batch
+        rx = self.spawn_receiver(self.RouterC,
+                                 count=total,
+                                 address="closest/test_06_address")
+        self.RouterA.wait_address("closest/test_06_address")
+
+        senders = [self.spawn_sender(self.RouterA,
+                                     send_batch,
+                                     "closest/test_06_address",
+                                     "-sx", "-p%s" % p)
+                   for p in range(priorities)]
+
+        if rx.wait(timeout=TIMEOUT):
+            raise Exception("Receiver failed to consume all messages")
+        for tx in senders:
+            out_text, out_err = tx.communicate(timeout=TIMEOUT)
+            if tx.returncode:
+                raise Exception("Sender failed: %s %s" % (out_text, out_err))
+
 
 if __name__ == '__main__':
     unittest.main(main_module())
diff --git a/tests/test-sender.c b/tests/test-sender.c
index 0fecea0..1386619 100644
--- a/tests/test-sender.c
+++ b/tests/test-sender.c
@@ -44,8 +44,8 @@
 
 #define BODY_SIZE_SMALL  100
 #define BODY_SIZE_MEDIUM 2000
-#define BODY_SIZE_LARGE  60000
-#define BODY_SIZE_HUGE   257 * 1024  // will trigger Q2/Q3
+#define BODY_SIZE_LARGE  65 * 1024
+#define BODY_SIZE_HUGE   1024 * 1024  // will trigger Q2/Q3
 
 #define DEFAULT_PRIORITY 4
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org