You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2020/03/10 17:17:18 UTC
[qpid-dispatch] 01/02: DISPATCH-1579: open dedicated inter-router
sessions for data and control links
This is an automated email from the ASF dual-hosted git repository.
kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 0264301cf9a1fb6e767f75d4e1aeabecf7355e2a
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Wed Feb 19 13:45:58 2020 -0500
DISPATCH-1579: open dedicated inter-router sessions for data and control links
---
include/qpid/dispatch/container.h | 28 +++++++++++++++++++++-
include/qpid/dispatch/router_core.h | 5 ++--
src/container.c | 30 ++++++++++++++++--------
src/router_core/connections.c | 28 +++++++++++++---------
src/router_core/core_link_endpoint.c | 3 ++-
src/router_core/forwarder.c | 1 +
src/router_core/modules/edge_router/addr_proxy.c | 10 ++++----
src/router_core/route_control.c | 3 ++-
src/router_core/router_core_private.h | 14 ++++++-----
src/router_node.c | 5 ++--
src/server_private.h | 2 +-
tests/system_tests_router_mesh.py | 29 +++++++++++++++++++++++
tests/test-sender.c | 4 ++--
13 files changed, 121 insertions(+), 41 deletions(-)
diff --git a/include/qpid/dispatch/container.h b/include/qpid/dispatch/container.h
index cf1b0d4..905c385 100644
--- a/include/qpid/dispatch/container.h
+++ b/include/qpid/dispatch/container.h
@@ -68,6 +68,32 @@ typedef enum {
} qd_detach_type_t;
+/**
+ * Session Class
+ *
+ * Used when creating new links from the router. A connection maintains a set
+ * of sessions over which links can be created. The session class indicates
+ * which session to use when creating a link.
+ */
+typedef enum {
+ QD_SSN_ENDPOINT, ///< client data links
+ QD_SSN_ROUTER_CONTROL, ///< router protocol
+ QD_SSN_ROUTER_DATA_PRI_0, ///< inter-router data links (by priority)
+ QD_SSN_ROUTER_DATA_PRI_1,
+ QD_SSN_ROUTER_DATA_PRI_2,
+ QD_SSN_ROUTER_DATA_PRI_3,
+ QD_SSN_ROUTER_DATA_PRI_4,
+ QD_SSN_ROUTER_DATA_PRI_5,
+ QD_SSN_ROUTER_DATA_PRI_6,
+ QD_SSN_ROUTER_DATA_PRI_7,
+ QD_SSN_ROUTER_DATA_PRI_8,
+ QD_SSN_ROUTER_DATA_PRI_9,
+ QD_SSN_CORE_ENDPOINT, ///< core subscriptions
+ QD_SSN_LINK_ROUTE, ///< link routes
+ QD_SSN_CLASS_COUNT
+} qd_session_class_t;
+
+
typedef struct qd_node_t qd_node_t;
typedef struct qd_link_t qd_link_t;
@@ -160,7 +186,7 @@ void qd_container_node_set_context(qd_node_t *node, void *node_context);
qd_dist_mode_t qd_container_node_get_dist_modes(const qd_node_t *node);
qd_lifetime_policy_t qd_container_node_get_life_policy(const qd_node_t *node);
-qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn, qd_direction_t dir, const char *name);
+qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn, qd_direction_t dir, const char *name, qd_session_class_t);
void qd_link_free(qd_link_t *link);
/**
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 18bef79..7339714 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -700,8 +700,9 @@ void qdr_link_flow(qdr_core_t *core, qdr_link_t *link, int credit, bool drain_mo
*/
void qdr_link_set_drained(qdr_core_t *core, qdr_link_t *link);
-typedef void (*qdr_link_first_attach_t) (void *context, qdr_connection_t *conn, qdr_link_t *link,
- qdr_terminus_t *source, qdr_terminus_t *target);
+typedef void (*qdr_link_first_attach_t) (void *context, qdr_connection_t *conn, qdr_link_t *link,
+ qdr_terminus_t *source, qdr_terminus_t *target,
+ qd_session_class_t);
typedef void (*qdr_link_second_attach_t) (void *context, qdr_link_t *link,
qdr_terminus_t *source, qdr_terminus_t *target);
typedef void (*qdr_link_detach_t) (void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close);
diff --git a/src/container.c b/src/container.c
index 8b19074..986ee9c 100644
--- a/src/container.c
+++ b/src/container.c
@@ -484,8 +484,12 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
case PN_SESSION_LOCAL_CLOSE :
ssn = pn_event_session(event);
- if (ssn == qd_conn->pn_sess)
- qd_conn->pn_sess = 0;
+ for (int i = 0; i < QD_SSN_CLASS_COUNT; ++i) {
+ if (ssn == qd_conn->pn_sessions[i]) {
+ qd_conn->pn_sessions[i] = 0;
+ break;
+ }
+ }
pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
while (pn_link) {
if (pn_link_session(pn_link) == ssn) {
@@ -503,8 +507,12 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
case PN_SESSION_REMOTE_CLOSE :
ssn = pn_event_session(event);
- if (ssn == qd_conn->pn_sess)
- qd_conn->pn_sess = 0;
+ for (int i = 0; i < QD_SSN_CLASS_COUNT; ++i) {
+ if (ssn == qd_conn->pn_sessions[i]) {
+ qd_conn->pn_sessions[i] = 0;
+ break;
+ }
+ }
if (!(pn_connection_state(conn) & PN_LOCAL_CLOSED)) {
if (pn_session_state(ssn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
@@ -846,7 +854,7 @@ qd_lifetime_policy_t qd_container_node_get_life_policy(const qd_node_t *node)
}
-qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn, qd_direction_t dir, const char* name)
+qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn, qd_direction_t dir, const char* name, qd_session_class_t ssn_class)
{
qd_link_t *link = new_qd_link_t();
if (!link) {
@@ -859,13 +867,15 @@ qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn, qd_direction_t dir, c
DEQ_INSERT_TAIL(node->container->links, link);
sys_mutex_unlock(node->container->lock);
- if (!conn->pn_sess) {
- conn->pn_sess = pn_session(qd_connection_pn(conn));
- pn_session_set_incoming_capacity(conn->pn_sess, cf->incoming_capacity);
- pn_session_open(conn->pn_sess);
+ pn_session_t *ssn = conn->pn_sessions[ssn_class];
+ if (!ssn) {
+ ssn = pn_session(qd_connection_pn(conn));
+ conn->pn_sessions[ssn_class] = ssn;
+ pn_session_set_incoming_capacity(ssn, cf->incoming_capacity);
+ pn_session_open(ssn);
}
- link->pn_sess = conn->pn_sess;
+ link->pn_sess = ssn;
if (dir == QD_OUTGOING)
link->pn_link = pn_sender(link->pn_sess, name);
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 31e523d..a6672c9 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -19,6 +19,7 @@
#include <qpid/dispatch/router_core.h>
#include <qpid/dispatch/discriminator.h>
+#include <qpid/dispatch/static_assert.h>
#include "route_control.h"
#include <qpid/dispatch/amqp.h>
#include <stdio.h>
@@ -295,7 +296,7 @@ int qdr_connection_process(qdr_connection_t *conn)
switch (work->work_type) {
case QDR_CONNECTION_WORK_FIRST_ATTACH :
- core->first_attach_handler(core->user_context, conn, work->link, work->source, work->target);
+ core->first_attach_handler(core->user_context, conn, work->link, work->source, work->target, work->ssn_class);
break;
case QDR_CONNECTION_WORK_SECOND_ATTACH :
@@ -1046,12 +1047,13 @@ static void qdr_link_cleanup_protected_CT(qdr_core_t *core, qdr_connection_t *co
}
-qdr_link_t *qdr_create_link_CT(qdr_core_t *core,
- qdr_connection_t *conn,
- qd_link_type_t link_type,
- qd_direction_t dir,
- qdr_terminus_t *source,
- qdr_terminus_t *target)
+qdr_link_t *qdr_create_link_CT(qdr_core_t *core,
+ qdr_connection_t *conn,
+ qd_link_type_t link_type,
+ qd_direction_t dir,
+ qdr_terminus_t *source,
+ qdr_terminus_t *target,
+ qd_session_class_t ssn_class)
{
//
// Create a new link, initiated by the router core. This will involve issuing a first-attach outbound.
@@ -1093,6 +1095,7 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core,
work->link = link;
work->source = source;
work->target = target;
+ work->ssn_class = ssn_class;
char source_str[1000];
char target_str[1000];
@@ -1314,12 +1317,15 @@ static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, boo
// inter-router links: Two (in and out) for control, 2 * QDR_N_PRIORITIES for
// routed-message transfer.
//
- (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_INCOMING, qdr_terminus_router_control(), qdr_terminus_router_control());
- (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_OUTGOING, qdr_terminus_router_control(), qdr_terminus_router_control());
+ (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_INCOMING, qdr_terminus_router_control(), qdr_terminus_router_control(), QD_SSN_ROUTER_CONTROL);
+ (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_OUTGOING, qdr_terminus_router_control(), qdr_terminus_router_control(), QD_SSN_ROUTER_CONTROL);
+ STATIC_ASSERT((QD_SSN_ROUTER_DATA_PRI_9 - QD_SSN_ROUTER_DATA_PRI_0 + 1) == QDR_N_PRIORITIES, PRIORITY_SESSION_NOT_SAME);
for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
- (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_INCOMING, qdr_terminus_router_data(), qdr_terminus_router_data());
- (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_OUTGOING, qdr_terminus_router_data(), qdr_terminus_router_data());
+ // a session is reserved for each priority link
+ qd_session_class_t sc = (qd_session_class_t)(QD_SSN_ROUTER_DATA_PRI_0 + priority);
+ (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_INCOMING, qdr_terminus_router_data(), qdr_terminus_router_data(), sc);
+ (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_OUTGOING, qdr_terminus_router_data(), qdr_terminus_router_data(), sc);
}
}
}
diff --git a/src/router_core/core_link_endpoint.c b/src/router_core/core_link_endpoint.c
index e48b80e..bbfc84f 100644
--- a/src/router_core/core_link_endpoint.c
+++ b/src/router_core/core_link_endpoint.c
@@ -72,7 +72,8 @@ qdrc_endpoint_t *qdrc_endpoint_create_link_CT(qdr_core_t *core,
ep->desc = desc;
ep->link_context = link_context;
- ep->link = qdr_create_link_CT(core, conn, QD_LINK_ENDPOINT, dir, source, target);
+ ep->link = qdr_create_link_CT(core, conn, QD_LINK_ENDPOINT, dir, source, target,
+ QD_SSN_CORE_ENDPOINT);
ep->link->core_endpoint = ep;
return ep;
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 52e479a..ad3fb02 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -930,6 +930,7 @@ void qdr_forward_link_direct_CT(qdr_core_t *core,
work->link = out_link;
work->source = source;
work->target = target;
+ work->ssn_class = QD_SSN_LINK_ROUTE;
qdr_connection_enqueue_work_CT(core, conn, work);
diff --git a/src/router_core/modules/edge_router/addr_proxy.c b/src/router_core/modules/edge_router/addr_proxy.c
index af6e5c2..7ed6d08 100644
--- a/src/router_core/modules/edge_router/addr_proxy.c
+++ b/src/router_core/modules/edge_router/addr_proxy.c
@@ -156,7 +156,7 @@ static void add_inlink(qcm_edge_addr_proxy_t *ap, const char *key, qdr_address_t
}
qdr_link_t *link = qdr_create_link_CT(ap->core, ap->edge_conn, QD_LINK_ENDPOINT, QD_INCOMING,
- term, qdr_terminus_normal(0));
+ term, qdr_terminus_normal(0), QD_SSN_ENDPOINT);
qdr_core_bind_address_link_CT(ap->core, addr, link);
addr->edge_inlink = link;
}
@@ -199,7 +199,7 @@ static void add_outlink(qcm_edge_addr_proxy_t *ap, const char *key, qdr_address_
}
qdr_link_t *link = qdr_create_link_CT(ap->core, ap->edge_conn, QD_LINK_ENDPOINT, QD_OUTGOING,
- qdr_terminus_normal(0), term);
+ qdr_terminus_normal(0), term, QD_SSN_ENDPOINT);
addr->edge_outlink = link;
}
}
@@ -233,7 +233,8 @@ static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *c
//
qdr_link_t *out_link = qdr_create_link_CT(ap->core, conn,
QD_LINK_ENDPOINT, QD_OUTGOING,
- qdr_terminus(0), qdr_terminus(0));
+ qdr_terminus(0), qdr_terminus(0),
+ QD_SSN_ENDPOINT);
//
// Associate the anonymous sender with the edge connection address. This will cause
@@ -248,7 +249,8 @@ static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *c
(void) qdr_create_link_CT(ap->core, conn,
QD_LINK_ENDPOINT, QD_INCOMING,
qdr_terminus_edge_downlink(ap->core->router_id),
- qdr_terminus_edge_downlink(0));
+ qdr_terminus_edge_downlink(0),
+ QD_SSN_ENDPOINT);
//
// Attach a receiving link for edge address tracking updates.
diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c
index 9ec0023..68de4a0 100644
--- a/src/router_core/route_control.c
+++ b/src/router_core/route_control.c
@@ -253,7 +253,8 @@ static void qdr_auto_link_activate_CT(qdr_core_t *core, qdr_auto_link_t *al, qdr
al->internal_addr = &key[2];
} else
qdr_terminus_set_address(term, &key[2]); // truncate the "Mp" annotation (where p = phase)
- al->link = qdr_create_link_CT(core, conn, QD_LINK_ENDPOINT, al->dir, source, target);
+ al->link = qdr_create_link_CT(core, conn, QD_LINK_ENDPOINT, al->dir, source, target,
+ QD_SSN_ENDPOINT);
al->link->auto_link = al;
al->link->phase = al->phase;
al->link->fallback = al->fallback;
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 611ecd8..7939763 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -271,6 +271,7 @@ typedef struct qdr_connection_work_t {
qdr_link_t *link;
qdr_terminus_t *source;
qdr_terminus_t *target;
+ qd_session_class_t ssn_class;
} qdr_connection_work_t;
ALLOC_DECLARE(qdr_connection_work_t);
@@ -964,12 +965,13 @@ void qdr_link_enqueue_work_CT(qdr_core_t *core,
qdr_link_t *conn,
qdr_link_work_t *work);
-qdr_link_t *qdr_create_link_CT(qdr_core_t *core,
- qdr_connection_t *conn,
- qd_link_type_t link_type,
- qd_direction_t dir,
- qdr_terminus_t *source,
- qdr_terminus_t *target);
+qdr_link_t *qdr_create_link_CT(qdr_core_t *core,
+ qdr_connection_t *conn,
+ qd_link_type_t link_type,
+ qd_direction_t dir,
+ qdr_terminus_t *source,
+ qdr_terminus_t *target,
+ qd_session_class_t ssn_class);
void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition, bool close);
void qdr_link_outbound_second_attach_CT(qdr_core_t *core, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target);
diff --git a/src/router_node.c b/src/router_node.c
index 365a88f..ea35944 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -1353,7 +1353,8 @@ static void CORE_link_first_attach(void *context,
qdr_connection_t *conn,
qdr_link_t *link,
qdr_terminus_t *source,
- qdr_terminus_t *target)
+ qdr_terminus_t *target,
+ qd_session_class_t ssn_class)
{
qd_router_t *router = (qd_router_t*) context;
qd_connection_t *qconn = (qd_connection_t*) qdr_connection_get_context(conn);
@@ -1362,7 +1363,7 @@ static void CORE_link_first_attach(void *context,
//
// Create a new link to be attached
//
- qd_link_t *qlink = qd_link(router->node, qconn, qdr_link_direction(link), qdr_link_name(link));
+ qd_link_t *qlink = qd_link(router->node, qconn, qdr_link_direction(link), qdr_link_name(link), ssn_class);
//
// Copy the source and target termini to the link
diff --git a/src/server_private.h b/src/server_private.h
index 167a5ff..4176429 100644
--- a/src/server_private.h
+++ b/src/server_private.h
@@ -147,7 +147,7 @@ struct qd_connection_t {
int enqueued;
qd_timer_t *timer; // Timer for initial-setup
pn_connection_t *pn_conn;
- pn_session_t *pn_sess;
+ pn_session_t *pn_sessions[QD_SSN_CLASS_COUNT];
pn_ssl_t *ssl;
qd_listener_t *listener;
qd_connector_t *connector;
diff --git a/tests/system_tests_router_mesh.py b/tests/system_tests_router_mesh.py
index 99b9829..e8fab01 100644
--- a/tests/system_tests_router_mesh.py
+++ b/tests/system_tests_router_mesh.py
@@ -209,6 +209,35 @@ class ThreeRouterTest(TestCase):
self.assertEqual(1, ats.released)
self.assertEqual(0, ats.rejected)
+ def test_06_parallel_priority(self):
+ """
+ Create 10 senders each with a different priority. Send large messages
+ - large enough to trigger Qx flow control. Ensure all messages arrive
+ as expected
+ """
+
+ priorities = 10
+ send_batch = 150
+
+ total = priorities * send_batch
+ rx = self.spawn_receiver(self.RouterC,
+ count=total,
+ address="closest/test_06_address")
+ self.RouterA.wait_address("closest/test_06_address")
+
+ senders = [self.spawn_sender(self.RouterA,
+ send_batch,
+ "closest/test_06_address",
+ "-sx", "-p%s" % p)
+ for p in range(priorities)]
+
+ if rx.wait(timeout=TIMEOUT):
+ raise Exception("Receiver failed to consume all messages")
+ for tx in senders:
+ out_text, out_err = tx.communicate(timeout=TIMEOUT)
+ if tx.returncode:
+ raise Exception("Sender failed: %s %s" % (out_text, out_err))
+
if __name__ == '__main__':
unittest.main(main_module())
diff --git a/tests/test-sender.c b/tests/test-sender.c
index 0fecea0..1386619 100644
--- a/tests/test-sender.c
+++ b/tests/test-sender.c
@@ -44,8 +44,8 @@
#define BODY_SIZE_SMALL 100
#define BODY_SIZE_MEDIUM 2000
-#define BODY_SIZE_LARGE 60000
-#define BODY_SIZE_HUGE 257 * 1024 // will trigger Q2/Q3
+#define BODY_SIZE_LARGE 65 * 1024
+#define BODY_SIZE_HUGE 1024 * 1024 // will trigger Q2/Q3
#define DEFAULT_PRIORITY 4
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org