You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2020/05/11 18:26:19 UTC
[qpid-dispatch] branch master updated: DISPATCH-1545: Prevent
Head-of-line blocking on shared inter-router links
This is an automated email from the ASF dual-hosted git repository.
kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push:
new 9d4c52c DISPATCH-1545: Prevent Head-of-line blocking on shared inter-router links
9d4c52c is described below
commit 9d4c52cf8c54d8a3e268c8e900e9baf632b063a4
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Thu Jan 23 15:36:52 2020 -0500
DISPATCH-1545: Prevent Head-of-line blocking on shared inter-router links
This patch prevents head-of-line blocking on shared links by
dynamically assigning a dedicated inter-router link for the transfer
of a streaming message.
This closes #716
---
include/qpid/dispatch/container.h | 3 +
include/qpid/dispatch/message.h | 9 +
include/qpid/dispatch/router_core.h | 4 +-
python/qpid_dispatch/management/qdrouter.json | 2 +-
src/CMakeLists.txt | 1 +
src/container.c | 14 +
src/message.c | 7 +
src/router_core/connections.c | 96 ++++-
src/router_core/delivery.c | 50 ++-
src/router_core/delivery.h | 2 +-
src/router_core/forwarder.c | 300 +++++++------
.../streaming_link_scrubber.c | 187 ++++++++
src/router_core/route_tables.c | 18 +-
src/router_core/router_core.c | 11 +
src/router_core/router_core_private.h | 42 +-
src/router_core/transfer.c | 13 +
src/router_node.c | 60 ++-
tests/CMakeLists.txt | 3 +
tests/clogger.c | 437 +++++++++++++++++++
tests/system_tests_edge_router.py | 471 ++++++++++++++++++++-
tests/system_tests_two_routers.py | 104 +++++
21 files changed, 1638 insertions(+), 196 deletions(-)
diff --git a/include/qpid/dispatch/container.h b/include/qpid/dispatch/container.h
index 054c6d5..54f3d7c 100644
--- a/include/qpid/dispatch/container.h
+++ b/include/qpid/dispatch/container.h
@@ -91,6 +91,7 @@ typedef enum {
QD_SSN_ROUTER_DATA_PRI_9,
QD_SSN_CORE_ENDPOINT, ///< core subscriptions
QD_SSN_LINK_ROUTE, ///< link routes
+ QD_SSN_LINK_STREAMING, ///< link dedicated to streaming messages
QD_SSN_CLASS_COUNT
} qd_session_class_t;
@@ -231,6 +232,8 @@ void *qd_link_get_node_context(const qd_link_t *link);
void qd_link_restart_rx(qd_link_t *link);
void qd_link_q3_block(qd_link_t *link);
void qd_link_q3_unblock(qd_link_t *link);
+uint64_t qd_link_link_id(const qd_link_t *link);
+void qd_link_set_link_id(qd_link_t *link, uint64_t link_id);
qd_session_t *qd_session(pn_session_t *pn_ssn);
void qd_session_cleanup(qd_connection_t *qd_conn);
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index afc39c1..63f0b47 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -404,6 +404,15 @@ bool qd_message_Q2_holdoff_should_block(qd_message_t *msg);
*/
bool qd_message_Q2_holdoff_should_unblock(qd_message_t *msg);
+
+/**
+ * Check if a message has hit its Q2 limit and is currently blocked.
+ * When blocked no further message data will be read from the link.
+ *
+ * @param msg A pointer to the message
+ */
+bool qd_message_is_Q2_blocked(const qd_message_t *msg);
+
/**
* Return qd_link through which the message is being received.
* @param msg A pointer to the message
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index ce45950..51806f7 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -617,6 +617,7 @@ const char *qdr_link_name(const qdr_link_t *link);
* @param target Target terminus of the attach
* @param name - name of the link
* @param terminus_addr - terminus address if any
+ * @param link_id - set to the management id of the new link
* @return A pointer to a new qdr_link_t object to track the link
*/
qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
@@ -624,7 +625,8 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
qdr_terminus_t *source,
qdr_terminus_t *target,
const char *name,
- const char *terminus_addr);
+ const char *terminus_addr,
+ uint64_t *link_id);
/**
* qdr_link_second_attach
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index ec678d0..688bcc4 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1652,7 +1652,7 @@
},
"transitOutstanding": {
"type": "list",
- "description": "List of numbers of outstanding deliveries across a transit (inter-router) link for this address. This is for balanced distribution only."
+ "description": "List of numbers of outstanding deliveries across a transit (inter-router) connection for this address. This is for balanced distribution only."
},
"trackedDeliveries": {
"type": "integer",
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 4257d95..9ca178b 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -102,6 +102,7 @@ set(qpid_dispatch_SOURCES
router_core/modules/address_lookup_client/lookup_client.c
router_core/modules/stuck_delivery_detection/delivery_tracker.c
router_core/modules/mobile_sync/mobile.c
+ router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c
router_node.c
router_pynode.c
schema_enum.c
diff --git a/src/container.c b/src/container.c
index a728de7..651db95 100644
--- a/src/container.c
+++ b/src/container.c
@@ -19,6 +19,7 @@
#include <stdio.h>
#include <string.h>
+#include <inttypes.h>
#include "dispatch_private.h"
#include "policy.h"
#include <qpid/dispatch/container.h>
@@ -63,6 +64,7 @@ struct qd_link_t {
bool q2_limit_unbounded;
bool q3_blocked;
DEQ_LINKS_N(Q3, qd_link_t); ///< Q3 blocked links
+ uint64_t link_id;
};
ALLOC_DEFINE(qd_link_t);
@@ -1154,6 +1156,18 @@ void qd_link_q3_unblock(qd_link_t *link)
}
+uint64_t qd_link_link_id(const qd_link_t *link)
+{
+ return link->link_id;
+}
+
+
+void qd_link_set_link_id(qd_link_t *link, uint64_t link_id)
+{
+ link->link_id = link_id;
+}
+
+
qd_session_t *qd_session(pn_session_t *pn_ssn)
{
assert(pn_ssn);
diff --git a/src/message.c b/src/message.c
index 0d7b08b..447691b 100644
--- a/src/message.c
+++ b/src/message.c
@@ -2232,6 +2232,12 @@ bool qd_message_Q2_holdoff_should_unblock(qd_message_t *msg)
}
+bool qd_message_is_Q2_blocked(const qd_message_t *msg)
+{
+ return ((const qd_message_pvt_t*)msg)->content->q2_input_holdoff;
+}
+
+
qd_link_t * qd_message_get_receiving_link(const qd_message_t *msg)
{
return safe_deref_qd_link_t(((qd_message_pvt_t *)msg)->content->input_link_sp);
@@ -2251,6 +2257,7 @@ void qd_message_set_aborted(const qd_message_t *msg, bool aborted)
msg_pvt->content->aborted = aborted;
}
+
bool qd_message_oversize(const qd_message_t *msg)
{
qd_message_content_t * mc = MSG_CONTENT(msg);
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 4c0e73b..20fc2f9 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -105,6 +105,7 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core,
conn->oper_status = QDR_CONN_OPER_UP;
DEQ_INIT(conn->links);
DEQ_INIT(conn->work_list);
+ DEQ_INIT(conn->streaming_link_pool);
conn->connection_info->role = conn->role;
conn->work_lock = sys_mutex();
conn->conn_uptime = core->uptime_ticks;
@@ -525,7 +526,8 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
qdr_terminus_t *source,
qdr_terminus_t *target,
const char *name,
- const char *terminus_addr)
+ const char *terminus_addr,
+ uint64_t *link_id)
{
qdr_action_t *action = qdr_action(qdr_link_inbound_first_attach_CT, "link_first_attach");
qdr_link_t *link = new_qdr_link_t();
@@ -534,6 +536,7 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
ZERO(link);
link->core = conn->core;
link->identity = qdr_identifier(conn->core);
+ *link_id = link->identity;
link->conn = conn;
link->name = (char*) malloc(strlen(name) + 1);
@@ -839,7 +842,7 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c
if (!qdr_delivery_receive_complete(dlv)) {
qdr_delivery_set_aborted(dlv, true);
- qdr_deliver_continue_peers_CT(core, dlv);
+ qdr_deliver_continue_peers_CT(core, dlv, false);
}
if (dlv->multicast) {
@@ -888,7 +891,7 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c
if (!qdr_delivery_receive_complete(dlv)) {
qdr_delivery_set_aborted(dlv, true);
- qdr_deliver_continue_peers_CT(core, dlv);
+ qdr_deliver_continue_peers_CT(core, dlv, false);
}
peer = qdr_delivery_first_peer_CT(dlv);
@@ -932,7 +935,8 @@ static void qdr_link_abort_undelivered_CT(qdr_core_t *core, qdr_link_t *link)
static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link, const char *log_text)
{
//
- // Remove the link from the master list of links
+ // Remove the link from the master list of links and possibly the streaming
+ // link pool
//
DEQ_REMOVE(core->open_links, link);
@@ -958,8 +962,10 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
if (qd_bitmask_valid_bit_value(conn->mask_bit)) {
if (link->link_type == QD_LINK_CONTROL)
core->control_links_by_mask_bit[conn->mask_bit] = 0;
- if (link->link_type == QD_LINK_ROUTER)
- core->data_links_by_mask_bit[conn->mask_bit].links[link->priority] = 0;
+ if (link->link_type == QD_LINK_ROUTER) {
+ if (link == core->data_links_by_mask_bit[conn->mask_bit].links[link->priority])
+ core->data_links_by_mask_bit[conn->mask_bit].links[link->priority] = 0;
+ }
}
//
@@ -1004,6 +1010,11 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
link, QDR_LINK_LIST_CLASS_ADDRESS);
}
+ if (link->in_streaming_pool) {
+ DEQ_REMOVE_N(STREAMING_POOL, conn->streaming_link_pool, link);
+ link->in_streaming_pool = false;
+ }
+
//
// Free the link's name and terminus_addr
//
@@ -1125,6 +1136,19 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core,
void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition, bool close)
{
+ //
+ // Ensure a pooled link is no longer available for streaming messages
+ //
+ if (link->streaming) {
+ if (link->in_streaming_pool) {
+ DEQ_REMOVE_N(STREAMING_POOL, link->conn->streaming_link_pool, link);
+ link->in_streaming_pool = false;
+ }
+ }
+
+ //
+ // tell the I/O thread to do the detach
+ //
qdr_link_work_t *work = new_qdr_link_work_t();
ZERO(work);
work->work_type = ++link->detach_count == 1 ? QDR_LINK_WORK_FIRST_DETACH : QDR_LINK_WORK_SECOND_DETACH;
@@ -1308,9 +1332,11 @@ static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, boo
// Assign a unique mask-bit to this connection as a reference to be used by
// the router module
//
- if (qd_bitmask_first_set(core->neighbor_free_mask, &conn->mask_bit))
+ if (qd_bitmask_first_set(core->neighbor_free_mask, &conn->mask_bit)) {
qd_bitmask_clear_bit(core->neighbor_free_mask, conn->mask_bit);
- else {
+ assert(core->rnode_conns_by_mask_bit[conn->mask_bit] == 0);
+ core->rnode_conns_by_mask_bit[conn->mask_bit] = conn;
+ } else {
qd_log(core->log, QD_LOG_CRITICAL, "Exceeded maximum inter-router connection count");
conn->role = QDR_ROLE_NORMAL;
break;
@@ -1369,6 +1395,38 @@ void qdr_connection_free(qdr_connection_t *conn)
}
+// create a new outgoing link for streaming messages
+qdr_link_t *qdr_connection_new_streaming_link_CT(qdr_core_t *core, qdr_connection_t *conn)
+{
+ qdr_link_t *out_link = 0;
+
+ switch (conn->role) {
+ case QDR_ROLE_INTER_ROUTER:
+ out_link = qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_OUTGOING,
+ qdr_terminus_router_data(), qdr_terminus_router_data(),
+ QD_SSN_LINK_STREAMING);
+ break;
+ case QDR_ROLE_EDGE_CONNECTION:
+ out_link = qdr_create_link_CT(core, conn, QD_LINK_ENDPOINT, QD_OUTGOING,
+ qdr_terminus(0), qdr_terminus(0),
+ QD_SSN_LINK_STREAMING);
+ break;
+ default:
+ assert(false);
+ break;
+ }
+
+ if (out_link) {
+ out_link->streaming = true;
+ if (!conn->has_streaming_links) {
+ qdr_add_connection_ref(&core->streaming_connections, conn);
+ conn->has_streaming_links = true;
+ }
+ }
+ return out_link;
+}
+
+
static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
qdr_connection_t *conn = safe_deref_qdr_connection_t(action->args.connection.conn);
@@ -1384,8 +1442,10 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo
// Give back the router mask-bit.
//
if (conn->role == QDR_ROLE_INTER_ROUTER) {
+ assert(qd_bitmask_valid_bit_value(conn->mask_bit));
qdr_reset_sheaf(core, conn->mask_bit);
qd_bitmask_set_bit(core->neighbor_free_mask, conn->mask_bit);
+ core->rnode_conns_by_mask_bit[conn->mask_bit] = 0;
}
//
@@ -1418,6 +1478,11 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo
link_ref = DEQ_HEAD(conn->links);
}
+ if (conn->has_streaming_links) {
+ assert(DEQ_IS_EMPTY(conn->streaming_link_pool)); // all links have been released
+ qdr_del_connection_ref(&core->streaming_connections, conn);
+ }
+
//
// Discard items on the work list
//
@@ -1489,8 +1554,10 @@ static void qdr_attach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qd
static void qdr_detach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link)
{
+ // if this link is in the priority sheaf it needs to be removed
if (conn->role == QDR_ROLE_INTER_ROUTER)
- core->data_links_by_mask_bit[conn->mask_bit].links[link->priority] = 0;
+ if (link == core->data_links_by_mask_bit[conn->mask_bit].links[link->priority])
+ core->data_links_by_mask_bit[conn->mask_bit].links[link->priority] = 0;
}
@@ -1801,6 +1868,17 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b
return;
} else {
+
+ //
+ // ensure a pooled link is no longer available for use
+ //
+ if (link->streaming) {
+ if (link->in_streaming_pool) {
+ DEQ_REMOVE_N(STREAMING_POOL, conn->streaming_link_pool, link);
+ link->in_streaming_pool = false;
+ }
+ }
+
//
// For routed links, propagate the detach
//
diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c
index 9e0400e..e3bc5f4 100644
--- a/src/router_core/delivery.c
+++ b/src/router_core/delivery.c
@@ -1049,7 +1049,7 @@ static void qdr_delete_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool
}
-void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv)
+void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, bool more)
{
qdr_delivery_t *peer = qdr_delivery_first_peer_CT(in_dlv);
@@ -1060,24 +1060,38 @@ void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv)
qdr_link_work_t *work = peer->link_work;
qdr_link_t *peer_link = qdr_delivery_link(peer);
- //
- // Determines if the peer connection can be activated.
- // For a large message, the peer delivery's link_work MUST be at the head of the peer link's work list. This link work is only removed
- // after the streaming message has been sent.
- //
- if (!!work && !!peer_link) {
- sys_mutex_lock(peer_link->conn->work_lock);
- if (work->processing || work == DEQ_HEAD(peer_link->work_list)) {
- qdr_add_link_ref(&peer_link->conn->links_with_work[peer_link->priority], peer_link, QDR_LINK_LIST_CLASS_WORK);
- sys_mutex_unlock(peer_link->conn->work_lock);
+ if (!!peer_link) {
- //
- // Activate the outgoing connection for later processing.
- //
- qdr_connection_activate_CT(core, peer_link->conn);
+ if (peer_link->streaming && !more) {
+ if (!peer_link->in_streaming_pool) {
+ // A streaming message has completed. It is now safe to
+ // re-use this streaming link for the next streaming
+ // message since that new message will not be blocked
+ // indefinitely by the current message.
+ DEQ_INSERT_TAIL_N(STREAMING_POOL, peer_link->conn->streaming_link_pool, peer_link);
+ peer_link->in_streaming_pool = true;
+ }
+ }
+
+ //
+ // Determines if the peer connection can be activated.
+ // For a large message, the peer delivery's link_work MUST be at the head of the peer link's work list. This link work is only removed
+ // after the streaming message has been sent.
+ //
+ if (!!work) {
+ sys_mutex_lock(peer_link->conn->work_lock);
+ if (work->processing || work == DEQ_HEAD(peer_link->work_list)) {
+ qdr_add_link_ref(&peer_link->conn->links_with_work[peer_link->priority], peer_link, QDR_LINK_LIST_CLASS_WORK);
+ sys_mutex_unlock(peer_link->conn->work_lock);
+
+ //
+ // Activate the outgoing connection for later processing.
+ //
+ qdr_connection_activate_CT(core, peer_link->conn);
+ }
+ else
+ sys_mutex_unlock(peer_link->conn->work_lock);
}
- else
- sys_mutex_unlock(peer_link->conn->work_lock);
}
peer = qdr_delivery_next_peer_CT(in_dlv);
@@ -1108,7 +1122,7 @@ static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool
// If it is already in the undelivered list, don't try to deliver this again.
//
if (!!link && in_dlv->where != QDR_DELIVERY_IN_UNDELIVERED) {
- qdr_deliver_continue_peers_CT(core, in_dlv);
+ qdr_deliver_continue_peers_CT(core, in_dlv, more);
qd_message_t *msg = qdr_delivery_message(in_dlv);
diff --git a/src/router_core/delivery.h b/src/router_core/delivery.h
index 8807996..4c6e646 100644
--- a/src/router_core/delivery.h
+++ b/src/router_core/delivery.h
@@ -154,7 +154,7 @@ qdr_delivery_t *qdr_delivery_first_peer_CT(qdr_delivery_t *dlv);
qdr_delivery_t *qdr_delivery_next_peer_CT(qdr_delivery_t *dlv);
/* schedules all peer deliveries with work for I/O processing */
-void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv);
+void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, bool more);
/* update the links counters with respect to its delivery */
void qdr_delivery_increment_counters_CT(qdr_core_t *core, qdr_delivery_t *delivery);
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index ad3fb02..f6ee7bc 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -23,6 +23,8 @@
#include <strings.h>
#include "forwarder.h"
#include "delivery.h"
+#include <inttypes.h>
+
typedef struct qdr_forward_deliver_info_t {
DEQ_LINKS(struct qdr_forward_deliver_info_t);
@@ -36,20 +38,26 @@ DEQ_DECLARE(qdr_forward_deliver_info_t, qdr_forward_deliver_info_list_t);
ALLOC_DEFINE(qdr_forward_deliver_info_t);
-static qdr_link_t * peer_data_link(qdr_core_t *core,
- qdr_node_t *node,
- int priority)
+// get the control link for a given inter-router connection
+static inline qdr_link_t *peer_router_control_link(qdr_core_t *core, int conn_mask)
{
- int nlmb = node->link_mask_bit;
+ return (conn_mask >= 0) ? core->control_links_by_mask_bit[conn_mask] : 0;
+}
+
- if (nlmb < 0 || priority < 0)
+// find the proper outgoing data link on a connection using the given priority
+static inline qdr_link_t *peer_router_data_link(qdr_core_t *core,
+ int conn_mask,
+ int priority)
+{
+ if (conn_mask < 0 || priority < 0)
return 0;
// Try to return the requested priority link, but if it does
// not exist, return the closest one that is lower.
qdr_link_t * link = 0;
while (1) {
- if ((link = core->data_links_by_mask_bit[nlmb].links[priority]))
+ if ((link = core->data_links_by_mask_bit[conn_mask].links[priority]))
return link;
if (-- priority < 0)
return 0;
@@ -58,6 +66,43 @@ static qdr_link_t * peer_data_link(qdr_core_t *core,
}
+// Returns true if the peer router can support this router opening additional incoming links dedicated to streaming messages
+//
+static inline bool next_hop_supports_streaming_links(const qdr_connection_t *conn)
+{
+ if (conn->role == QDR_ROLE_EDGE_CONNECTION)
+ return true;
+ if (conn->role == QDR_ROLE_INTER_ROUTER) {
+ return QDR_ROUTER_VERSION_AT_LEAST(conn->connection_info->version,
+ 1, 13, 0);
+ }
+ return false;
+}
+
+
+// Get an idle anonymous link for a streaming message. This link will come from
+// either the connection's free link pool or it will be dynamically created on
+// the given connection.
+static inline qdr_link_t *get_outgoing_streaming_link(qdr_core_t *core, qdr_connection_t *conn)
+{
+ if (!conn) return 0;
+
+ qdr_link_t *out_link = DEQ_HEAD(conn->streaming_link_pool);
+ if (out_link) {
+ DEQ_REMOVE_HEAD_N(STREAMING_POOL, conn->streaming_link_pool);
+ out_link->in_streaming_pool = false;
+ } else {
+ // no free links - create a new one
+ out_link = qdr_connection_new_streaming_link_CT(core, conn);
+ if (!out_link) {
+ qd_log(core->log, QD_LOG_WARNING, "[C%"PRIu64"] Unable to setup new outgoing streaming message link", conn->identity);
+ return 0;
+ }
+ }
+ return out_link;
+}
+
+
//==================================================================================
// Built-in Forwarders
//==================================================================================
@@ -112,15 +157,15 @@ static void qdr_forward_find_closest_remotes_CT(qdr_core_t *core, qdr_address_t
}
-qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, qdr_link_t *link, qd_message_t *msg)
+qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, qdr_link_t *out_link, qd_message_t *msg)
{
qdr_delivery_t *out_dlv = new_qdr_delivery_t();
uint64_t *tag = (uint64_t*) out_dlv->tag;
- if (link->conn)
- link->conn->last_delivery_time = core->uptime_ticks;
+ if (out_link->conn)
+ out_link->conn->last_delivery_time = core->uptime_ticks;
ZERO(out_dlv);
- set_safe_ptr_qdr_link_t(link, &out_dlv->link_sp);
+ set_safe_ptr_qdr_link_t(out_link, &out_dlv->link_sp);
out_dlv->msg = qd_message_copy(msg);
out_dlv->settled = !in_dlv || in_dlv->settled;
out_dlv->presettled = out_dlv->settled;
@@ -357,8 +402,7 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
bool bypass_valid_origins = addr->forwarder->bypass_valid_origins;
int fanout = 0;
qd_bitmask_t *link_exclusion = !!in_delivery ? in_delivery->link_exclusion : 0;
- bool receive_complete = qd_message_receive_complete(qdr_delivery_message(in_delivery));
- uint8_t priority = qdr_forward_effective_priority(msg, addr);
+ bool receive_complete = qd_message_receive_complete(msg);
qdr_forward_deliver_info_list_t deliver_info_list;
DEQ_INIT(deliver_info_list);
@@ -375,19 +419,26 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
// Only forward via links that don't result in edge-echo.
//
if (!qdr_forward_edge_echo_CT(in_delivery, out_link)) {
- qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
- // Store the out_link and out_delivery so we can forward the delivery later on
- qdr_forward_deliver_info_t *deliver_info = new_qdr_forward_deliver_info_t();
- ZERO(deliver_info);
- deliver_info->out_dlv = out_delivery;
- deliver_info->out_link = out_link;
- DEQ_INSERT_TAIL(deliver_info_list, deliver_info);
+ if (!receive_complete && next_hop_supports_streaming_links(out_link->conn)) {
+ out_link = get_outgoing_streaming_link(core, out_link->conn);
+ }
- fanout++;
- if (out_link->link_type != QD_LINK_CONTROL && out_link->link_type != QD_LINK_ROUTER && !out_link->fallback) {
- addr->deliveries_egress++;
- core->deliveries_egress++;
+ if (out_link) {
+ qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
+
+ // Store the out_link and out_delivery so we can forward the delivery later on
+ qdr_forward_deliver_info_t *deliver_info = new_qdr_forward_deliver_info_t();
+ ZERO(deliver_info);
+ deliver_info->out_dlv = out_delivery;
+ deliver_info->out_link = out_link;
+ DEQ_INSERT_TAIL(deliver_info_list, deliver_info);
+
+ fanout++;
+ if (out_link->link_type != QD_LINK_CONTROL && out_link->link_type != QD_LINK_ROUTER && !out_link->fallback) {
+ addr->deliveries_egress++;
+ core->deliveries_egress++;
+ }
}
}
@@ -399,6 +450,7 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
// Forward to remote routers with subscribers using the appropriate
// link for the traffic class: control or data
//
+
//
// Get the mask bit associated with the ingress router for the message.
// This will be compared against the "valid_origin" masks for each
@@ -421,17 +473,15 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
//
if (origin >= 0) {
int dest_bit;
- qdr_link_t *dest_link;
- qdr_node_t *next_node;
- qd_bitmask_t *link_set = qd_bitmask(0);
+ qd_bitmask_t *conn_set = qd_bitmask(0); // connections to the next-hops
//
- // Loop over the target nodes for this address. Build a set of outgoing links
+ // Loop over the target nodes for this address. Build a set of outgoing connections
// for which there are valid targets. We do this to avoid sending more than one
- // message down a given link. It's possible that there are multiple destinations
- // for this address that are all reachable over the same link. In this case, we
- // will send only one copy of the message over the link and allow a downstream
- // router to fan the message out.
+ // message to a given next-hop. It's possible that there are multiple destinations
+ // for this address that are all reachable via the same next-hop. In this case, we
+ // will send only one copy of the message to the next-hop and allow the downstream
+ // routers to fan the message out.
//
int c;
for (QD_BITMASK_EACH(addr->rnodes, dest_bit, c)) {
@@ -439,26 +489,30 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
if (!rnode)
continue;
- if (rnode->next_hop)
- next_node = rnode->next_hop;
- else
- next_node = rnode;
-
- dest_link = control ? PEER_CONTROL_LINK(core, next_node) : peer_data_link(core, next_node, priority);
- if (dest_link && qd_bitmask_value(rnode->valid_origins, origin))
- qd_bitmask_set_bit(link_set, dest_link->conn->mask_bit);
+ // get the inter-router connection associated with the path to rnode:
+ int conn_bit = (rnode->next_hop) ? rnode->next_hop->conn_mask_bit : rnode->conn_mask_bit;
+ if (conn_bit >= 0 && (!link_exclusion || qd_bitmask_value(link_exclusion, conn_bit) == 0)) {
+ qd_bitmask_set_bit(conn_set, conn_bit);
+ }
}
//
- // Send a copy of the message outbound on each identified link.
+ // Send a copy of the message over the inter-router connection to the next hop
//
- int link_bit;
- while (qd_bitmask_first_set(link_set, &link_bit)) {
- qd_bitmask_clear_bit(link_set, link_bit);
- dest_link = control ?
- core->control_links_by_mask_bit[link_bit] :
- core->data_links_by_mask_bit[link_bit].links[priority];
- if (dest_link && (!link_exclusion || qd_bitmask_value(link_exclusion, link_bit) == 0)) {
+ int conn_bit;
+ while (qd_bitmask_first_set(conn_set, &conn_bit)) {
+ qd_bitmask_clear_bit(conn_set, conn_bit);
+
+ qdr_link_t *dest_link;
+ if (control) {
+ dest_link = peer_router_control_link(core, conn_bit);
+ } else if (!receive_complete) { // inter-router conns support dynamic streaming links
+ dest_link = get_outgoing_streaming_link(core, core->rnode_conns_by_mask_bit[conn_bit]);
+ } else {
+ dest_link = peer_router_data_link(core, conn_bit, qdr_forward_effective_priority(msg, addr));
+ }
+
+ if (dest_link) {
qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, dest_link, msg);
// Store the out_link and out_delivery so we can forward the delivery later on
@@ -475,7 +529,7 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
}
}
- qd_bitmask_free(link_set);
+ qd_bitmask_free(conn_set);
}
if (!exclude_inprocess) {
@@ -510,14 +564,11 @@ int qdr_forward_closest_CT(qdr_core_t *core,
bool exclude_inprocess,
bool control)
{
- qdr_link_t *out_link;
- qdr_delivery_t *out_delivery;
-
+ const bool receive_complete = qd_message_receive_complete(msg);
//
// Forward to an in-process subscriber if there is one.
//
if (!exclude_inprocess) {
- bool receive_complete = qd_message_receive_complete(msg);
qdr_subscription_t *sub = DEQ_HEAD(addr->subscriptions);
if (sub) {
qdr_forward_to_subscriber(core, sub, in_delivery, msg, receive_complete);
@@ -545,7 +596,7 @@ int qdr_forward_closest_CT(qdr_core_t *core,
}
//
- // Forward to a local subscriber.
+ // Forward to a locally attached subscriber.
//
qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks);
@@ -556,31 +607,38 @@ int qdr_forward_closest_CT(qdr_core_t *core,
link_ref = DEQ_NEXT(link_ref);
if (link_ref) {
- out_link = link_ref->link;
- out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
- qdr_forward_deliver_CT(core, out_link, out_delivery);
+ qdr_link_t *out_link = link_ref->link;
- //
- // If there are multiple local subscribers, rotate the list of link references
- // so deliveries will be distributed among the subscribers in a round-robin pattern.
- //
- if (DEQ_SIZE(addr->rlinks) > 1) {
- link_ref = DEQ_HEAD(addr->rlinks);
- DEQ_REMOVE_HEAD(addr->rlinks);
- DEQ_INSERT_TAIL(addr->rlinks, link_ref);
+ if (!receive_complete && next_hop_supports_streaming_links(out_link->conn)) {
+ out_link = get_outgoing_streaming_link(core, out_link->conn);
}
- if (!out_link->fallback) {
- addr->deliveries_egress++;
- core->deliveries_egress++;
- }
+ if (out_link) {
+ qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
+ qdr_forward_deliver_CT(core, out_link, out_delivery);
- if (qdr_connection_route_container(out_link->conn)) {
- core->deliveries_egress_route_container++;
- addr->deliveries_egress_route_container++;
- }
+ //
+ // If there are multiple local subscribers, rotate the list of link references
+ // so deliveries will be distributed among the subscribers in a round-robin pattern.
+ //
+ if (DEQ_SIZE(addr->rlinks) > 1) {
+ link_ref = DEQ_HEAD(addr->rlinks);
+ DEQ_REMOVE_HEAD(addr->rlinks);
+ DEQ_INSERT_TAIL(addr->rlinks, link_ref);
+ }
- return 1;
+ if (!out_link->fallback) {
+ addr->deliveries_egress++;
+ core->deliveries_egress++;
+ }
+
+ if (qdr_connection_route_container(out_link->conn)) {
+ core->deliveries_egress_route_container++;
+ addr->deliveries_egress_route_container++;
+ }
+
+ return 1;
+ }
}
//
@@ -594,8 +652,6 @@ int qdr_forward_closest_CT(qdr_core_t *core,
// Forward to remote routers with subscribers using the appropriate
// link for the traffic class: control or data
//
- qdr_node_t *next_node;
-
if (addr->next_remote >= 0) {
qdr_node_t *rnode = core->routers_by_mask_bit[addr->next_remote];
if (rnode) {
@@ -603,15 +659,19 @@ int qdr_forward_closest_CT(qdr_core_t *core,
if (addr->next_remote == -1)
qd_bitmask_first_set(addr->closest_remotes, &addr->next_remote);
- if (rnode->next_hop)
- next_node = rnode->next_hop;
- else
- next_node = rnode;
+ // get the inter-router connection associated with the path to rnode:
+ int conn_bit = (rnode->next_hop) ? rnode->next_hop->conn_mask_bit : rnode->conn_mask_bit;
+ qdr_link_t *out_link;
+ if (control) {
+ out_link = peer_router_control_link(core, conn_bit);
+ } else if (!receive_complete) {
+ out_link = get_outgoing_streaming_link(core, core->rnode_conns_by_mask_bit[conn_bit]);
+ } else {
+ out_link = peer_router_data_link(core, conn_bit, qdr_forward_effective_priority(msg, addr));
+ }
- uint8_t priority = qdr_forward_effective_priority(msg, addr);
- out_link = control ? PEER_CONTROL_LINK(core, next_node) : peer_data_link(core, next_node, priority);
if (out_link) {
- out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
+ qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
qdr_forward_deliver_CT(core, out_link, out_delivery);
addr->deliveries_transit++;
if (out_link->link_type == QD_LINK_ROUTER)
@@ -647,10 +707,10 @@ int qdr_forward_balanced_CT(qdr_core_t *core,
}
qdr_link_t *best_eligible_link = 0;
- int best_eligible_link_bit = -1;
+ int best_eligible_conn_bit = -1;
uint32_t eligible_link_value = UINT32_MAX;
qdr_link_t *best_ineligible_link = 0;
- int best_ineligible_link_bit = -1;
+ int best_ineligible_conn_bit = -1;
uint32_t ineligible_link_value = UINT32_MAX;
//
@@ -702,7 +762,7 @@ int qdr_forward_balanced_CT(qdr_core_t *core,
// This will be compared against the "valid_origin" masks for each
// candidate destination router.
//
- int origin = 0;
+ int origin = 0; // default to this router
qd_iterator_t *ingress_iter = in_delivery ? in_delivery->origin : 0;
if (ingress_iter) {
@@ -717,26 +777,29 @@ int qdr_forward_balanced_CT(qdr_core_t *core,
int node_bit;
for (QD_BITMASK_EACH(addr->rnodes, node_bit, c)) {
qdr_node_t *rnode = core->routers_by_mask_bit[node_bit];
- qdr_node_t *next_node = rnode->next_hop ? rnode->next_hop : rnode;
- uint8_t priority = qdr_forward_effective_priority(msg, addr);
- qdr_link_t *link = peer_data_link(core, next_node, priority);
- if (!link) continue;
- int link_bit = link->conn->mask_bit;
- int value = addr->outstanding_deliveries[link_bit];
- bool eligible = link->capacity > value;
if (qd_bitmask_value(rnode->valid_origins, origin)) {
+
+ qdr_node_t *next_node = rnode->next_hop ? rnode->next_hop : rnode;
+ int conn_bit = next_node->conn_mask_bit;
+ uint8_t priority = qdr_forward_effective_priority(msg, addr);
+ qdr_link_t *link = peer_router_data_link(core, conn_bit, priority);
+ if (!link) continue;
+
+ int value = addr->outstanding_deliveries[conn_bit];
+ bool eligible = link->capacity > value;
+
//
// Link is a candidate, adjust the value by the bias (node cost).
//
value += rnode->cost;
if (eligible && eligible_link_value > value) {
best_eligible_link = link;
- best_eligible_link_bit = link_bit;
+ best_eligible_conn_bit = conn_bit;
eligible_link_value = value;
} else if (!eligible && ineligible_link_value > value) {
best_ineligible_link = link;
- best_ineligible_link_bit = link_bit;
+ best_ineligible_conn_bit = conn_bit;
ineligible_link_value = value;
}
}
@@ -754,34 +817,43 @@ int qdr_forward_balanced_CT(qdr_core_t *core,
}
qdr_link_t *chosen_link = 0;
- int chosen_link_bit = -1;
+ int chosen_conn_bit = -1;
if (best_eligible_link) {
chosen_link = best_eligible_link;
- chosen_link_bit = best_eligible_link_bit;
+ chosen_conn_bit = best_eligible_conn_bit;
} else if (best_ineligible_link) {
chosen_link = best_ineligible_link;
- chosen_link_bit = best_ineligible_link_bit;
+ chosen_conn_bit = best_ineligible_conn_bit;
}
if (chosen_link) {
- qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, chosen_link, msg);
- qdr_forward_deliver_CT(core, chosen_link, out_delivery);
- //
- // If the delivery is unsettled and the link is inter-router, account for the outstanding delivery.
- //
- if (in_delivery && !in_delivery->settled && chosen_link_bit >= 0) {
- addr->outstanding_deliveries[chosen_link_bit]++;
- out_delivery->tracking_addr = addr;
- out_delivery->tracking_addr_bit = chosen_link_bit;
- addr->tracked_deliveries++;
+ // DISPATCH-1545 (head of line blocking): if the message is streaming,
+ // see if the next hop allows us to open a dedicated link for streaming
+ if (!qd_message_receive_complete(msg) && next_hop_supports_streaming_links(chosen_link->conn)) {
+ chosen_link = get_outgoing_streaming_link(core, chosen_link->conn);
+ if (!chosen_link)
+ return 0;
}
+ qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, chosen_link, msg);
+ qdr_forward_deliver_CT(core, chosen_link, out_delivery);
+
//
// Bump the appropriate counter based on where we sent the delivery.
//
- if (chosen_link_bit >= 0) {
+ if (chosen_conn_bit >= 0) { // sent to peer router
+ //
+ // If the delivery is unsettled account for the outstanding delivery sent inter-router.
+ //
+ if (in_delivery && !in_delivery->settled) {
+ addr->outstanding_deliveries[chosen_conn_bit]++;
+ out_delivery->tracking_addr = addr;
+ out_delivery->tracking_addr_bit = chosen_conn_bit;
+ addr->tracked_deliveries++;
+ }
+
addr->deliveries_transit++;
if (chosen_link->link_type == QD_LINK_ROUTER)
core->deliveries_transit++;
@@ -841,8 +913,6 @@ bool qdr_forward_link_balanced_CT(qdr_core_t *core,
//
// Look for a next-hop we can use to forward the link-attach.
//
- qdr_node_t *next_node;
-
if (addr->cost_epoch != core->cost_epoch) {
addr->next_remote = -1;
addr->cost_epoch = core->cost_epoch;
@@ -864,14 +934,8 @@ bool qdr_forward_link_balanced_CT(qdr_core_t *core,
if (addr->next_remote == -1)
qd_bitmask_first_set(addr->rnodes, &addr->next_remote);
- if (rnode->next_hop)
- next_node = rnode->next_hop;
- else
- next_node = rnode;
-
- qdr_link_t * pdl = peer_data_link(core, next_node, 0);
- if (next_node && pdl)
- conn = pdl->conn;
+ int conn_bit = (rnode->next_hop) ? rnode->next_hop->conn_mask_bit : rnode->conn_mask_bit;
+ conn = (conn_bit >= 0) ? core->rnode_conns_by_mask_bit[conn_bit] : 0;
}
}
}
diff --git a/src/router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c b/src/router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c
new file mode 100644
index 0000000..1c578ca
--- /dev/null
+++ b/src/router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qpid/dispatch/ctools.h"
+#include "module.h"
+#include <inttypes.h>
+
+
+/*
+ * Release unused streaming links
+ *
+ * Periodically scan through the list of open connections checking for idle
+ * streaming links. If the connection's idle streaming link pool is oversized
+ * then release some of the unused links in the background.
+ */
+
+#define PROD_TIMER_INTERVAL 30
+#define TEST_TIMER_INTERVAL 5
+#define TEST_MAX_FREE_POOL 2
+#define MAX_FREE_BATCH 10 // rate limit the link detach
+
+static int timer_interval = PROD_TIMER_INTERVAL;
+static int max_free_pool_size = 128;
+
+static void qdr_streaming_link_scrubber_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+
+typedef struct tracker_t tracker_t;
+struct tracker_t {
+ qdr_core_t *core;
+ qdr_core_timer_t *timer;
+ qdr_connection_ref_t_sp next_conn_ref;
+};
+
+
+/* Idle streaming link cleanup
+ *
+ * Check the size of the connection's idle link free pool. If the connection
+ * has accumulated too many unused links, start closing them.
+ */
+static void idle_link_cleanup(qdr_core_t *core, qdr_connection_t *conn)
+{
+ qdr_link_list_t to_free = DEQ_EMPTY;
+
+ qd_log(core->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"] Streaming link scrubber: scanning connection", conn->identity);
+
+ const size_t pool_size = DEQ_SIZE(conn->streaming_link_pool);
+ if (pool_size > max_free_pool_size) {
+ size_t count = MIN(MAX_FREE_BATCH, pool_size - max_free_pool_size);
+
+ // Links are returned to the pool by inserting at the tail. Thus the
+ // links at the head have been on the list the longest and are more likely
+ // to be candidates for cleanup (e.g. idle)
+ while (count) {
+ qdr_link_t *link = DEQ_HEAD(conn->streaming_link_pool);
+ if (!qdr_link_is_idle_CT(link))
+ break;
+ DEQ_REMOVE_HEAD_N(STREAMING_POOL, conn->streaming_link_pool);
+ DEQ_INSERT_TAIL_N(STREAMING_POOL, to_free, link);
+ link->in_streaming_pool = false;
+ count -= 1;
+ }
+
+ }
+
+ if (DEQ_HEAD(to_free)) {
+ qd_log(core->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"] Streaming link scrubber: found %d idle links", conn->identity, (int)DEQ_SIZE(to_free));
+
+ while (DEQ_HEAD(to_free)) {
+ qdr_link_t *link = DEQ_HEAD(to_free);
+ DEQ_REMOVE_HEAD_N(STREAMING_POOL, to_free);
+ qd_log(core->log, QD_LOG_DEBUG,
+ "[C%"PRIu64"][L%"PRIu64"] Streaming link scrubber: closing idle link %s",
+ link->conn->identity, link->identity, (link->name) ? link->name : "");
+ qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NONE, true);
+ }
+ }
+}
+
+
+static void timer_handler_CT(qdr_core_t *core, void *context)
+{
+ tracker_t *tracker = (tracker_t*) context;
+ qdr_connection_ref_t *first_ref = DEQ_HEAD(core->streaming_connections);
+
+ if (!!first_ref) {
+ qd_log(core->log, QD_LOG_DEBUG, "Starting streaming link scrubber scan");
+ set_safe_ptr_qdr_connection_ref_t(first_ref, &tracker->next_conn_ref);
+ qdr_action_t *action = qdr_action(qdr_streaming_link_scrubber_CT, "streaming_link_scrubber");
+ action->args.general.context_1 = tracker;
+ qdr_action_background_enqueue(core, action);
+ } else
+ qdr_core_timer_schedule_CT(core, tracker->timer, timer_interval);
+}
+
+
+static void qdr_streaming_link_scrubber_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+ if (discard)
+ return;
+
+ tracker_t *tracker = (tracker_t*) action->args.general.context_1;
+ qdr_connection_ref_t *conn_ref = safe_deref_qdr_connection_ref_t(tracker->next_conn_ref);
+
+ if (!!conn_ref) {
+ idle_link_cleanup(core, conn_ref->conn);
+
+ conn_ref = DEQ_NEXT(conn_ref);
+ if (!!conn_ref) {
+ //
+ // There is another connection on the list. Schedule another
+ // background action to process the next connection.
+ //
+ set_safe_ptr_qdr_connection_ref_t(conn_ref, &tracker->next_conn_ref);
+ action = qdr_action(qdr_streaming_link_scrubber_CT, "streaming_link_scrubber");
+ action->args.general.context_1 = tracker;
+ qdr_action_background_enqueue(core, action);
+ } else
+ //
+ // We've come to the end of the list of open connections. Set the
+ // timer to start a new sweep after the interval.
+ //
+ qdr_core_timer_schedule_CT(core, tracker->timer, timer_interval);
+ } else
+ //
+ // The connection we were provided is no longer valid. It was probably
+ // closed since the last time we came through this path. Abort the
+ // sweep and set the timer for a new one after the interval.
+ //
+ qdr_core_timer_schedule_CT(core, tracker->timer, timer_interval);
+}
+
+
+static bool qcm_streaming_link_scrubber_enable_CT(qdr_core_t *core)
+{
+ if (core->qd->test_hooks) {
+ //
+ // Test hooks are enabled, override the timing constants with the test values
+ //
+ timer_interval = TEST_TIMER_INTERVAL;
+ max_free_pool_size = TEST_MAX_FREE_POOL;
+ }
+
+ return true;
+}
+
+
+static void qcm_streaming_link_scrubber_init_CT(qdr_core_t *core, void **module_context)
+{
+ tracker_t *tracker = NEW(tracker_t);
+ ZERO(tracker);
+ tracker->core = core;
+ tracker->timer = qdr_core_timer_CT(core, timer_handler_CT, tracker);
+ qdr_core_timer_schedule_CT(core, tracker->timer, timer_interval);
+ *module_context = tracker;
+
+ qd_log(core->log, QD_LOG_INFO,
+ "Streaming link scrubber: Scan interval: %d seconds, max free pool: %d links", timer_interval, max_free_pool_size);
+}
+
+
+static void qcm_streaming_link_scrubber_final_CT(void *module_context)
+{
+ tracker_t *tracker = (tracker_t*) module_context;
+ qdr_core_timer_free_CT(tracker->core, tracker->timer);
+ free(tracker);
+}
+
+
+QDR_CORE_MODULE_DECLARE("streaming_link_scruber", qcm_streaming_link_scrubber_enable_CT, qcm_streaming_link_scrubber_init_CT, qcm_streaming_link_scrubber_final_CT)
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index 7c55964..3388507 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -246,11 +246,13 @@ void qdr_route_table_setup_CT(qdr_core_t *core)
core->routers_by_mask_bit = NEW_PTR_ARRAY(qdr_node_t, qd_bitmask_width());
core->control_links_by_mask_bit = NEW_PTR_ARRAY(qdr_link_t, qd_bitmask_width());
+ core->rnode_conns_by_mask_bit = NEW_PTR_ARRAY(qdr_connection_t, qd_bitmask_width());
core->data_links_by_mask_bit = NEW_ARRAY(qdr_priority_sheaf_t, qd_bitmask_width());
for (int idx = 0; idx < qd_bitmask_width(); idx++) {
core->routers_by_mask_bit[idx] = 0;
core->control_links_by_mask_bit[idx] = 0;
core->data_links_by_mask_bit[idx].count = 0;
+ core->rnode_conns_by_mask_bit[idx] = 0;
for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority)
core->data_links_by_mask_bit[idx].links[priority] = 0;
@@ -312,7 +314,7 @@ static void qdr_add_router_CT(qdr_core_t *core, qdr_action_t *action, bool disca
ZERO(rnode);
rnode->owning_addr = addr;
rnode->mask_bit = router_maskbit;
- rnode->link_mask_bit = -1;
+ rnode->conn_mask_bit = -1;
rnode->valid_origins = qd_bitmask(0);
qd_iterator_reset_view(iter, ITER_VIEW_ALL);
@@ -414,20 +416,20 @@ static void qdr_del_router_CT(qdr_core_t *core, qdr_action_t *action, bool disca
static void qdr_set_link_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
int router_maskbit = action->args.route_table.router_maskbit;
- int link_maskbit = action->args.route_table.link_maskbit;
+ int conn_maskbit = action->args.route_table.link_maskbit; // "link" identifies a connection, not an amqp link
if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
qd_log(core->log, QD_LOG_CRITICAL, "set_link: Router maskbit out of range: %d", router_maskbit);
return;
}
- if (link_maskbit >= qd_bitmask_width() || link_maskbit < 0) {
- qd_log(core->log, QD_LOG_CRITICAL, "set_link: Link maskbit out of range: %d", link_maskbit);
+ if (conn_maskbit >= qd_bitmask_width() || conn_maskbit < 0) {
+ qd_log(core->log, QD_LOG_CRITICAL, "set_link: Link maskbit out of range: %d", conn_maskbit);
return;
}
- if (core->control_links_by_mask_bit[link_maskbit] == 0) {
- qd_log(core->log, QD_LOG_CRITICAL, "set_link: Invalid link reference: %d", link_maskbit);
+ if (core->control_links_by_mask_bit[conn_maskbit] == 0) {
+ qd_log(core->log, QD_LOG_CRITICAL, "set_link: Invalid link reference: %d", conn_maskbit);
return;
}
@@ -440,7 +442,7 @@ static void qdr_set_link_CT(qdr_core_t *core, qdr_action_t *action, bool discard
// Add the peer_link reference to the router record.
//
qdr_node_t *rnode = core->routers_by_mask_bit[router_maskbit];
- rnode->link_mask_bit = link_maskbit;
+ rnode->conn_mask_bit = conn_maskbit;
qdr_addr_start_inlinks_CT(core, rnode->owning_addr);
}
@@ -460,7 +462,7 @@ static void qdr_remove_link_CT(qdr_core_t *core, qdr_action_t *action, bool disc
}
qdr_node_t *rnode = core->routers_by_mask_bit[router_maskbit];
- rnode->link_mask_bit = -1;
+ rnode->conn_mask_bit = -1;
}
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index ec8e0ad..91169ec 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -165,6 +165,10 @@ void qdr_core_free(qdr_core_t *core)
qdr_link_t *link = DEQ_HEAD(core->open_links);
while (link) {
DEQ_REMOVE_HEAD(core->open_links);
+ if (link->in_streaming_pool) {
+ DEQ_REMOVE_N(STREAMING_POOL, link->conn->streaming_link_pool, link);
+ link->in_streaming_pool = false;
+ }
if (link->core_endpoint)
qdrc_endpoint_do_cleanup_CT(core, link->core_endpoint);
qdr_del_link_ref(&link->conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION);
@@ -196,9 +200,15 @@ void qdr_core_free(qdr_core_t *core)
work = DEQ_HEAD(conn->work_list);
}
+ if (conn->has_streaming_links) {
+ assert(DEQ_IS_EMPTY(conn->streaming_link_pool)); // all links have been released
+ qdr_del_connection_ref(&core->streaming_connections, conn);
+ }
+
qdr_connection_free(conn);
conn = DEQ_HEAD(core->open_connections);
}
+ assert(DEQ_SIZE(core->streaming_connections) == 0);
// at this point all the conn identifiers have been freed
qd_hash_free(core->conn_id_hash);
@@ -211,6 +221,7 @@ void qdr_core_free(qdr_core_t *core)
if (core->control_links_by_mask_bit) free(core->control_links_by_mask_bit);
if (core->data_links_by_mask_bit) free(core->data_links_by_mask_bit);
if (core->neighbor_free_mask) qd_bitmask_free(core->neighbor_free_mask);
+ if (core->rnode_conns_by_mask_bit) free(core->rnode_conns_by_mask_bit);
free(core);
}
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index e3757c4..707d5f2 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -348,7 +348,7 @@ struct qdr_node_t {
qdr_address_t *owning_addr;
int mask_bit;
qdr_node_t *next_hop; ///< Next hop node _if_ this is not a neighbor node
- int link_mask_bit; ///< Mask bit of inter-router connection if this is a neighbor node
+ int conn_mask_bit; ///< qdr_connection_t->mask_bit inter-router conn if this is a neighbor node
uint32_t ref_count;
qd_bitmask_t *valid_origins;
int cost;
@@ -360,11 +360,6 @@ struct qdr_node_t {
DEQ_DECLARE(qdr_node_t, qdr_node_list_t);
void qdr_router_node_free(qdr_core_t *core, qdr_node_t *rnode);
-#define PEER_CONTROL_LINK(c,n) ((n->link_mask_bit >= 0) ? (c)->control_links_by_mask_bit[n->link_mask_bit] : 0)
-// PEER_DATA_LINK has gotten more complex with prioritized links, and is now a function, peer_data_link().
-
-
-
struct qdr_router_ref_t {
DEQ_LINKS(qdr_router_ref_t);
qdr_node_t *router;
@@ -466,9 +461,11 @@ struct qdr_link_t {
bool processing; ///< True if an IO thread is currently handling this link
bool ready_to_free; ///< True if the core thread wanted to clean up the link but it was processing
bool fallback; ///< True if this link is attached to a fallback destination for an address
+ bool streaming; ///< True if this link can be reused for streaming msgs
+ bool in_streaming_pool; ///< True if this link is in the connections standby pool STREAMING_POOL
+ bool terminus_survives_disconnect;
char *strip_prefix;
char *insert_prefix;
- bool terminus_survives_disconnect;
uint64_t total_deliveries;
uint64_t presettled_deliveries;
@@ -485,8 +482,9 @@ struct qdr_link_t {
uint8_t priority;
uint8_t rate_cursor;
uint32_t core_ticks;
-};
+ DEQ_LINKS_N(STREAMING_POOL, qdr_link_t);
+};
DEQ_DECLARE(qdr_link_t, qdr_link_list_t);
struct qdr_link_ref_t {
@@ -552,7 +550,7 @@ struct qdr_address_t {
int next_remote;
//
- // State for "balanced" treatment
+ // State for "balanced" treatment, indexed by inter-router connection mask bit
//
int *outstanding_deliveries;
@@ -671,7 +669,7 @@ struct qdr_connection_t {
bool policy_allow_dynamic_link_routes;
bool policy_allow_admin_status_update;
int link_capacity;
- int mask_bit;
+ int mask_bit; ///< set only if inter-router connection
qdr_connection_work_list_t work_list;
sys_mutex_t *work_lock;
qdr_link_ref_list_t links;
@@ -688,6 +686,8 @@ struct qdr_connection_t {
uint32_t conn_uptime; // Timestamp which can be used to calculate the number of seconds this connection has been up and running.
uint32_t last_delivery_time; // Timestamp which can be used to calculate the number of seconds since the last delivery arrived on this connection.
bool enable_protocol_trace; // Has trace level logging been turned on for this connection.
+ bool has_streaming_links; ///< one or more of this connection's links are for streaming messages
+ qdr_link_list_t streaming_link_pool; ///< pool of links available for streaming messages
};
DEQ_DECLARE(qdr_connection_t, qdr_connection_list_t);
@@ -801,10 +801,11 @@ struct qdr_core_t {
qd_timer_t *work_timer;
uint32_t uptime_ticks;
- qdr_connection_list_t open_connections;
- qdr_connection_t *active_edge_connection;
- qdr_connection_list_t connections_to_activate;
- qdr_link_list_t open_links;
+ qdr_connection_list_t open_connections;
+ qdr_connection_t *active_edge_connection;
+ qdr_connection_list_t connections_to_activate;
+ qdr_link_list_t open_links;
+ qdr_connection_ref_list_t streaming_connections;
qdrc_attach_addr_lookup_t addr_lookup_handler;
void *addr_lookup_context;
@@ -868,10 +869,11 @@ struct qdr_core_t {
qdr_address_t *routerma_addr_T;
qdr_node_list_t routers; ///< List of routers, in order of cost, from lowest to highest
- qd_bitmask_t *neighbor_free_mask;
- qdr_node_t **routers_by_mask_bit;
- qdr_link_t **control_links_by_mask_bit;
- qdr_priority_sheaf_t *data_links_by_mask_bit;
+ qd_bitmask_t *neighbor_free_mask; ///< bits available for new conns (qd_connection_t->mask_bit values)
+ qdr_node_t **routers_by_mask_bit; ///< indexed by qdr_node_t->mask_bit
+ qdr_connection_t **rnode_conns_by_mask_bit; ///< inter-router conns indexed by conn->mask_bit
+ qdr_link_t **control_links_by_mask_bit; ///< indexed by qdr_node_t->conn_mask_bit, qdr_connection_t->mask_bit
+ qdr_priority_sheaf_t *data_links_by_mask_bit; ///< indexed by qdr_node_t->conn_mask_bit, qdr_connection_t->mask_bit
uint64_t cost_epoch;
uint64_t next_tag;
@@ -958,6 +960,7 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *pe
void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv);
void qdr_connection_free(qdr_connection_t *conn);
void qdr_connection_activate_CT(qdr_core_t *core, qdr_connection_t *conn);
+qdr_link_t *qdr_connection_new_streaming_link_CT(qdr_core_t *core, qdr_connection_t *conn);
qdr_address_config_t *qdr_config_for_address_CT(qdr_core_t *core, qdr_connection_t *conn, qd_iterator_t *iter);
qd_address_treatment_t qdr_treatment_for_address_hash_CT(qdr_core_t *core, qd_iterator_t *iter, qdr_address_config_t **addr_config);
qd_address_treatment_t qdr_treatment_for_address_hash_with_default_CT(qdr_core_t *core, qd_iterator_t *iter, qd_address_treatment_t default_treatment, qdr_address_config_t **addr_config);
@@ -983,6 +986,9 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core,
void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition, bool close);
void qdr_link_outbound_second_attach_CT(qdr_core_t *core, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target);
+bool qdr_link_is_idle_CT(const qdr_link_t *link);
+qdr_terminus_t *qdr_terminus_router_control(void); ///< new terminus for router control links
+qdr_terminus_t *qdr_terminus_router_data(void); ///< new terminus for router links
qdr_query_t *qdr_query(qdr_core_t *core,
void *context,
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 3dab635..456bd25 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -925,3 +925,16 @@ void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr)
qdr_addr_start_inlinks_CT(core, addr->fallback_for);
}
}
+
+
+// True if link currently has no outstanding deliveries or work.
+// Used to determine if it is safe for the core to close a link.
+//
+bool qdr_link_is_idle_CT(const qdr_link_t *link)
+{
+ return (DEQ_SIZE(link->undelivered) == 0 &&
+ DEQ_SIZE(link->unsettled) == 0 &&
+ DEQ_SIZE(link->settled) == 0 &&
+ DEQ_SIZE(link->updated_deliveries) == 0 &&
+ !link->ref[QDR_LINK_LIST_CLASS_WORK]);
+}
diff --git a/src/router_node.c b/src/router_node.c
index b2dd1d5..c711b2d 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -483,6 +483,38 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
}
//
+ // Head of line blocking avoidance (DISPATCH-1545)
+ //
+ // Before we can forward a message we need to determine whether or not this
+ // message is "streaming" - a large message that has the potential to block
+ // other messages sharing the trunk link. At this point we cannot for sure
+ // know the actual length of the incoming message, so we employ the
+ // following heuristic to determine if the message is "streaming":
+ //
+ // - If the message is receive-complete it is NOT a streaming message.
+ // - If it is NOT receive-complete:
+ // Continue buffering incoming data until:
+ // - receive has completed => NOT a streaming message
+ // - not rx-complete AND Q2 threshold hit => a streaming message
+ //
+ // Once Q2 is hit we MUST forward the message regardless of rx-complete
+ // since Q2 will block forever unless the incoming data is drained via
+ // forwarding.
+ //
+ if (!receive_complete) {
+ if (qd_message_is_Q2_blocked(msg)) {
+ qd_log(router->log_source, QD_LOG_DEBUG,
+ "[C%"PRIu64" L%"PRIu64"] Incoming message classified as streaming. User:%s",
+ conn->connection_id,
+ qd_link_link_id(link),
+ conn->user_id);
+ } else {
+ // Continue buffering this message
+ return false;
+ }
+ }
+
+ //
// Determine if the incoming link is anonymous. If the link is addressed,
// there are some optimizations we can take advantage of.
//
@@ -766,6 +798,7 @@ static void AMQP_disposition_handler(void* context, qd_link_t *link, pn_delivery
static int AMQP_incoming_link_handler(void* context, qd_link_t *link)
{
qd_connection_t *conn = qd_link_connection(link);
+ uint64_t link_id;
// The connection that this link belongs to is gone. Perhaps an AMQP close came in.
// This link handler should not continue since there is no connection.
@@ -780,7 +813,9 @@ static int AMQP_incoming_link_handler(void* context, qd_link_t *link)
qdr_terminus(qd_link_remote_source(link)),
qdr_terminus(qd_link_remote_target(link)),
pn_link_name(qd_link_pn(link)),
- terminus_addr);
+ terminus_addr,
+ &link_id);
+ qd_link_set_link_id(link, link_id);
qdr_link_set_context(qdr_link, link);
qd_link_set_context(link, qdr_link);
@@ -794,6 +829,7 @@ static int AMQP_incoming_link_handler(void* context, qd_link_t *link)
static int AMQP_outgoing_link_handler(void* context, qd_link_t *link)
{
qd_connection_t *conn = qd_link_connection(link);
+ uint64_t link_id;
// The connection that this link belongs to is gone. Perhaps an AMQP close came in.
// This link handler should not continue since there is no connection.
@@ -806,7 +842,9 @@ static int AMQP_outgoing_link_handler(void* context, qd_link_t *link)
qdr_terminus(qd_link_remote_source(link)),
qdr_terminus(qd_link_remote_target(link)),
pn_link_name(qd_link_pn(link)),
- terminus_addr);
+ terminus_addr,
+ &link_id);
+ qd_link_set_link_id(link, link_id);
qdr_link_set_context(qdr_link, link);
qd_link_set_context(link, qdr_link);
@@ -1481,14 +1519,6 @@ static void CORE_link_first_attach(void *context,
pn_link_open(qd_link_pn(qlink));
//
- // All links on the inter router or edge connection have unbounded q2 limit.
- // Blocking control messages can lead to various failures
- //
- if (qdr_connection_role(conn) == QDR_ROLE_EDGE_CONNECTION || qdr_connection_role(conn) == QDR_ROLE_INTER_ROUTER) {
- qd_link_set_q2_limit_unbounded(qlink, true);
- }
-
- //
// Mark the link as stalled and waiting for initial credit.
//
if (qdr_link_direction(link) == QD_OUTGOING)
@@ -1514,16 +1544,6 @@ static void CORE_link_second_attach(void *context, qdr_link_t *link, qdr_terminu
//
pn_link_open(pn_link);
- qd_connection_t *conn = qd_link_connection(qlink);
- qdr_connection_t *qdr_conn = (qdr_connection_t*) qd_connection_get_context(conn);
- //
- // All links on the inter router or edge connection have unbounded q2 limit
- //
- if (qdr_connection_role(qdr_conn) == QDR_ROLE_EDGE_CONNECTION || qdr_connection_role(qdr_conn) == QDR_ROLE_INTER_ROUTER) {
- qd_link_set_q2_limit_unbounded(qlink, true);
- }
-
-
//
// Mark the link as stalled and waiting for initial credit.
//
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 9fdd1aa..47d6481 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -59,6 +59,9 @@ target_link_libraries(test-sender ${Proton_LIBRARIES} qpid-dispatch)
add_executable(test-receiver test-receiver.c)
target_link_libraries(test-receiver ${Proton_LIBRARIES})
+add_executable(clogger clogger.c)
+target_link_libraries(clogger ${Proton_LIBRARIES})
+
set(TEST_WRAP ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_BINARY_DIR}/run.py)
diff --git a/tests/clogger.c b/tests/clogger.c
new file mode 100644
index 0000000..489c709
--- /dev/null
+++ b/tests/clogger.c
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * A test traffic generator that produces very long messages that are sent in
+ * chunks with a delay between each chunk. This client can be used to simulate
+ * very large streaming messages and/or slow producers.
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <signal.h>
+#include <time.h>
+#include <errno.h>
+#include <inttypes.h>
+#include <math.h>
+#include <assert.h>
+#include <arpa/inet.h>
+#include <stdarg.h>
+
+
+#include "proton/reactor.h"
+#include "proton/message.h"
+#include "proton/connection.h"
+#include "proton/session.h"
+#include "proton/link.h"
+#include "proton/delivery.h"
+#include "proton/transport.h"
+#include "proton/event.h"
+#include "proton/handlers.h"
+
+#define DEFAULT_MAX_FRAME 65535
+#define BOOL2STR(b) ((b)?"true":"false")
+#define MIN(X, Y) (((X) < (Y)) ? (X) : (Y))
+
+bool stop = false;
+bool verbose = false;
+
+uint64_t limit = 1; // # messages to send
+uint64_t sent = 0; // # sent
+uint64_t acked = 0; // # of received acks
+uint64_t accepted = 0;
+uint64_t not_accepted = 0;
+
+bool use_anonymous = false; // use anonymous link if true
+bool presettle = false; // true = send presettled
+uint32_t body_length = 1024 * 1024; // # bytes in vbin32 payload
+uint32_t pause_msec = 100; // pause between sending chunks (milliseconds)
+
+const char *target_address = "test-address";
+const char *host_address = "127.0.0.1:5672";
+const char *container_name = "Clogger";
+
+pn_reactor_t *reactor;
+pn_connection_t *pn_conn;
+pn_session_t *pn_ssn;
+pn_link_t *pn_link;
+pn_delivery_t *pn_dlv; // current in-flight delivery
+pn_handler_t *_handler;
+
+uint32_t bytes_sent; // number of body data bytes written out link
+uint32_t remote_max_frame = DEFAULT_MAX_FRAME; // used to limit amount written
+
+#define AMQP_MSG_HEADER 0x70
+#define AMQP_MSG_PROPERTIES 0x73
+#define AMQP_MSG_DATA 0x75
+
+// minimal AMQP header for a message that contains a single binary value
+//
+const uint8_t msg_header[] = {
+ 0x00, // begin described type
+ 0x53, // 1 byte ulong type
+ 0x70, // HEADER section
+ 0x45, // empty list
+ 0x00, // begin described type
+ 0x53, // 1 byte ulong type
+ 0x73, // PROPERTIES section
+ 0x45, // empty list
+ 0x00, // begin described type
+ 0x53, // 1 byte ulong type
+ 0x77, // AMQP Value BODY section
+ 0xb0, // Binary uint32 length
+ // 4 bytes for length here
+ // start of data...
+};
+
+
+void debug(const char *format, ...)
+{
+ va_list args;
+
+ if (!verbose) return;
+
+ va_start(args, format);
+ vprintf(format, args);
+ va_end(args);
+}
+
+
+static void signal_handler(int signum)
+{
+ signal(SIGINT, SIG_IGN);
+ signal(SIGQUIT, SIG_IGN);
+
+ switch (signum) {
+ case SIGINT:
+ case SIGQUIT:
+ stop = true;
+ if (reactor) pn_reactor_wakeup(reactor);
+ break;
+ default:
+ break;
+ }
+}
+
+
+void start_message()
+{
+ static long tag = 0; // a simple tag generator
+
+ if (!pn_link || !pn_conn) return;
+ if (pn_dlv) {
+ debug("Cannot create delivery - in process\n");
+ abort();
+ }
+
+ debug("start message #%"PRIu64"!\n", sent);
+
+ pn_dlv = pn_delivery(pn_link, pn_dtag((const char *)&tag, sizeof(tag)));
+ ++tag;
+
+ bytes_sent = 0;
+
+ // send the message header
+ ssize_t rc = pn_link_send(pn_link, (const char *)msg_header, sizeof(msg_header));
+ if (rc != sizeof(msg_header)) {
+ debug("Link send failed error=%ld\n", rc);
+ abort();
+ }
+
+ // add the vbin32 length (in network order!!!)
+ uint32_t len = htonl(body_length);
+ rc = pn_link_send(pn_link, (const char *)&len, sizeof(len));
+ if (rc != sizeof(len)) {
+ debug("Link send failed error=%ld\n", rc);
+ abort();
+ }
+}
+
+
+/* Write the next chunk of zero-filled body data to pn_link; return true when message transmit is complete (or no delivery is in flight) */
+bool send_message_data()
+{
+ static const char zero_block[DEFAULT_MAX_FRAME] = {0};
+
+ if (!pn_dlv) return true; // not sending
+
+ if (bytes_sent < body_length) {
+
+ uint32_t amount = MIN(body_length - bytes_sent, remote_max_frame);
+ amount = MIN(amount, sizeof(zero_block)); // never read past the end of zero_block
+
+ ssize_t rc = pn_link_send(pn_link, zero_block, amount);
+ if (rc < 0) {
+ debug("Link send failed error=%zd\n", rc);
+ abort();
+ }
+ bytes_sent += rc;
+
+ debug("message body bytes written=%zd total=%"PRIu32" body_length=%"PRIu32"\n", // rc is ssize_t: use %zd, not PRIu32
+ rc, bytes_sent, body_length);
+ }
+
+ if (bytes_sent == body_length) {
+ debug("message #%"PRIu64" sent!\n", sent);
+ pn_link_advance(pn_link);
+ sent += 1;
+
+ if (presettle) {
+ pn_delivery_settle(pn_dlv);
+ if (limit && sent == limit) {
+ // no need to wait for acks
+ debug("stopping...\n");
+ stop = true;
+ pn_reactor_wakeup(reactor);
+ }
+ }
+ pn_dlv = 0;
+ return true;
+ }
+
+ return false;
+}
+
+/* Process each event posted by the reactor.
+ Nothing is returned; shutdown is requested by setting the 'stop' flag.
+ */
+static void event_handler(pn_handler_t *handler,
+ pn_event_t *event,
+ pn_event_type_t etype)
+{
+ debug("new event=%s\n", pn_event_type_name(etype));
+
+ switch (etype) {
+
+ case PN_CONNECTION_INIT: {
+ // Create and open all the endpoints needed to send a message
+ //
+ pn_connection_open(pn_conn);
+ pn_ssn = pn_session(pn_conn);
+ pn_session_open(pn_ssn);
+ pn_link = pn_sender(pn_ssn, "MyClogger");
+ if (!use_anonymous) {
+ pn_terminus_set_address(pn_link_target(pn_link), target_address);
+ }
+ pn_link_open(pn_link);
+ } break;
+
+
+ case PN_CONNECTION_REMOTE_OPEN: {
+ uint32_t rmf = pn_transport_get_remote_max_frame(pn_event_transport(event));
+ remote_max_frame = (rmf != 0) ? rmf : DEFAULT_MAX_FRAME;
+ debug("Remote MAX FRAME=%u\n", remote_max_frame);
+ } break;
+
+ case PN_LINK_FLOW: {
+ // the remote has given us some credit, now we can send messages
+ //
+ if (limit == 0 || sent < limit) {
+ if (pn_link_credit(pn_link) > 0) {
+ if (!pn_dlv) {
+ start_message();
+ pn_reactor_schedule(reactor, pause_msec, _handler); // send body after pause
+ }
+ }
+ }
+ } break;
+
+
+ case PN_TRANSPORT: {
+ ssize_t pending = pn_transport_pending(pn_event_transport(event));
+ debug("PN_TRANSPORT pending=%ld\n", pending);
+ } break;
+
+ case PN_DELIVERY: {
+ pn_delivery_t *dlv = pn_event_delivery(event);
+ if (pn_delivery_updated(dlv)) {
+ uint64_t rs = pn_delivery_remote_state(dlv);
+ pn_delivery_clear(dlv);
+
+ switch (rs) {
+ case PN_RECEIVED:
+ debug("PN_DELIVERY: received\n");
+ // This is not a terminal state - it is informational, and the
+ // peer is still processing the message.
+ break;
+ case PN_ACCEPTED:
+ debug("PN_DELIVERY: accept\n");
+ ++acked;
+ ++accepted;
+ pn_delivery_settle(dlv);
+ break;
+ case PN_REJECTED:
+ case PN_RELEASED:
+ case PN_MODIFIED:
+ default:
+ ++acked;
+ ++not_accepted;
+ pn_delivery_settle(dlv);
+ debug("Message not accepted - code: 0x%lX\n", (unsigned long)rs);
+ break;
+ }
+
+ if (limit && acked == limit) {
+ // initiate clean shutdown of the endpoints
+ debug("stopping...\n");
+ stop = true;
+ pn_reactor_wakeup(reactor);
+ }
+ }
+ } break;
+
+ case PN_TIMER_TASK: {
+ if (!send_message_data()) { // not done sending
+ pn_reactor_schedule(reactor, pause_msec, _handler);
+ } else if (limit == 0 || sent < limit) {
+ if (pn_link_credit(pn_link) > 0) {
+ // send next message
+ start_message();
+ pn_reactor_schedule(reactor, pause_msec, _handler);
+ }
+ }
+ } break;
+
+ default:
+ break;
+ }
+}
+
+
+static void delete_handler(pn_handler_t *handler)
+{
+}
+
+
+static void usage(const char *prog)
+{
+ printf("Usage: %s <options>\n", prog);
+ printf("-a \tThe host address [%s]\n", host_address);
+ printf("-c \t# of messages to send, 0 == nonstop [%"PRIu64"]\n", limit);
+ printf("-i \tContainer name [%s]\n", container_name);
+ printf("-n \tUse an anonymous link [%s]\n", BOOL2STR(use_anonymous));
+ printf("-s \tBody size in bytes [%d]\n", body_length);
+ printf("-t \tTarget address [%s]\n", target_address);
+ printf("-u \tSend all messages presettled [%s]\n", BOOL2STR(presettle));
+ printf("-D \tPrint debug info [off]\n");
+ printf("-P \tPause between sending frames [%"PRIu32"]\n", pause_msec);
+ exit(1);
+}
+
+int main(int argc, char** argv)
+{
+ /* command line options */
+ opterr = 0;
+ int c;
+ while ((c = getopt(argc, argv, "ha:c:i:ns:t:uDP:")) != -1) {
+ switch(c) {
+ case 'h': usage(argv[0]); break;
+ case 'a': host_address = optarg; break;
+ case 'c':
+ if (sscanf(optarg, "%"SCNu64, &limit) != 1)
+ usage(argv[0]);
+ break;
+ case 'i': container_name = optarg; break;
+ case 'n': use_anonymous = true; break;
+ case 's':
+ if (sscanf(optarg, "%"SCNu32, &body_length) != 1)
+ usage(argv[0]);
+ break;
+ case 't': target_address = optarg; break;
+ case 'u': presettle = true; break;
+ case 'D': verbose = true; break;
+ case 'P':
+ if (sscanf(optarg, "%"SCNu32, &pause_msec) != 1)
+ usage(argv[0]);
+ break;
+ default:
+ usage(argv[0]);
+ break;
+ }
+ }
+
+ signal(SIGQUIT, signal_handler);
+ signal(SIGINT, signal_handler);
+
+ // test infrastructure may add a "amqp[s]://" prefix to the address string.
+ // That causes proactor much grief, so strip it off
+ if (strncmp("amqps://", host_address, strlen("amqps://")) == 0) {
+ host_address += strlen("amqps://"); // no! no ssl for you!
+ } else if (strncmp("amqp://", host_address, strlen("amqp://")) == 0) {
+ host_address += strlen("amqp://");
+ }
+
+ // convert host_address to hostname and port
+ char *hostname = strdup(host_address);
+ char *port = strchr(hostname, ':');
+ if (!port) {
+ port = "5672";
+ } else {
+ *port++ = 0;
+ }
+
+ _handler = pn_handler_new(event_handler, 0, delete_handler);
+ pn_handler_add(_handler, pn_handshaker());
+
+ reactor = pn_reactor();
+ pn_conn = pn_reactor_connection_to_host(reactor,
+ hostname,
+ port,
+ _handler);
+
+ // the container name should be unique for each client
+ pn_connection_set_container(pn_conn, container_name);
+ pn_connection_set_hostname(pn_conn, hostname);
+
+ // break out of pn_reactor_process once a second to check if done
+ pn_reactor_set_timeout(reactor, 1000);
+
+ pn_reactor_start(reactor);
+
+ while (pn_reactor_process(reactor)) {
+ if (stop) {
+ // close the endpoints; this will cause pn_reactor_process() to
+ // eventually break the loop
+ if (pn_link) pn_link_close(pn_link);
+ if (pn_ssn) pn_session_close(pn_ssn);
+ if (pn_conn) pn_connection_close(pn_conn);
+ pn_link = 0;
+ pn_ssn = 0;
+ pn_conn = 0;
+ }
+ }
+
+ if (pn_link) pn_link_free(pn_link);
+ if (pn_ssn) pn_session_free(pn_ssn);
+ if (pn_conn) pn_connection_close(pn_conn);
+
+ pn_reactor_free(reactor);
+
+ if (not_accepted) {
+ printf("Sent: %ld Accepted: %ld Not Accepted: %ld\n", sent, accepted, not_accepted);
+ if (accepted + not_accepted != sent) {
+ printf("FAILURE! Sent: %ld Acked: %ld\n", sent, accepted + not_accepted);
+ return 1;
+ }
+ }
+ return 0;
+}
diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py
index d546a2f..4ed48da 100644
--- a/tests/system_tests_edge_router.py
+++ b/tests/system_tests_edge_router.py
@@ -22,7 +22,7 @@ from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
-from os import path
+import os
from time import sleep
from threading import Event
from threading import Timer
@@ -34,7 +34,9 @@ from system_test import AsyncTestSender
from system_test import Logger
from system_test import QdManager
from system_test import unittest
+from system_test import Process
from system_tests_link_routes import ConnLinkRouteService
+from test_broker import FakeBroker
from test_broker import FakeService
from proton.handlers import MessagingHandler
from proton.reactor import Container, DynamicNodeProperties
@@ -1647,7 +1649,7 @@ class LinkRouteProxyTest(TestCase):
numbers advertised in the incoming @open frame
"""
lines = None
- with open(path.join(self.INT_A.outdir, "INT.A.log")) as inta_log:
+ with open(os.path.join(self.INT_A.outdir, "INT.A.log")) as inta_log:
lines = [l for l in inta_log.read().split("\n")
if "] Connection Opened: " in l
or "] Peer router version: " in l]
@@ -2936,5 +2938,470 @@ class EdgeListenerSender(TestCase):
blocking_sender = blocking_connection.create_sender(address="multicast")
self.assertTrue(blocking_sender!=None)
+
+class StreamingMessageTest(TestCase):
+ """
+ Test streaming message flows across edge and interior routers
+ """
+
+ SIG_TERM = -15 # Process.terminate() sets this exit value
+ BODY_MAX = 4294967295 # AMQP 1.0 allows types of length 2^32-1
+
+ @classmethod
+ def setUpClass(cls):
+ """Start a router"""
+ super(StreamingMessageTest, cls).setUpClass()
+
+ def router(name, mode, extra):
+ config = [
+ ('router', {'mode': mode, 'id': name}),
+ ('listener', {'role': 'normal',
+ 'port': cls.tester.get_port(),
+ 'maxFrameSize': 65535}),
+
+ ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+ ('address', {'prefix': 'balanced', 'distribution': 'balanced'}),
+ ('address', {'prefix': 'multicast', 'distribution': 'multicast'})
+ ]
+
+ if extra:
+ config.extend(extra)
+ config = Qdrouterd.Config(config)
+ cls.routers.append(cls.tester.qdrouterd(name, config, wait=False))
+ return cls.routers[-1]
+
+ # configuration:
+ # two edge routers connected via 2 interior routers.
+ # fake broker (route-container) on EB1
+ #
+ # +-------+ +---------+ +---------+ +-------+
+ # | EA1 |<==>| INT.A |<==>| INT.B |<==>| EB1 |<-- Fake Broker
+ # +-------+ +---------+ +---------+ +-------+
+ #
+
+ cls.routers = []
+
+ interrouter_port = cls.tester.get_port()
+ cls.INTA_edge_port = cls.tester.get_port()
+ cls.INTB_edge_port = cls.tester.get_port()
+
+ router('INT.A', 'interior',
+ [('listener', {'role': 'inter-router', 'port': interrouter_port}),
+ ('listener', {'role': 'edge', 'port': cls.INTA_edge_port})])
+ cls.INT_A = cls.routers[0]
+ cls.INT_A.listener = cls.INT_A.addresses[0]
+
+ router('INT.B', 'interior',
+ [('connector', {'name': 'connectorToA', 'role': 'inter-router',
+ 'port': interrouter_port}),
+ ('listener', {'role': 'edge', 'port': cls.INTB_edge_port})])
+ cls.INT_B = cls.routers[1]
+ cls.INT_B.listener = cls.INT_B.addresses[0]
+
+ router('EA1', 'edge',
+ [('listener', {'name': 'rc', 'role': 'route-container',
+ 'port': cls.tester.get_port()}),
+ ('connector', {'name': 'uplink', 'role': 'edge',
+ 'port': cls.INTA_edge_port})
+ ])
+ cls.EA1 = cls.routers[2]
+ cls.EA1.listener = cls.EA1.addresses[0]
+
+ router('EB1', 'edge',
+ [('connector', {'name': 'uplink', 'role': 'edge',
+ 'port': cls.INTB_edge_port}),
+ # to connect to the fake broker
+ ('connector', {'name': 'broker',
+ 'role': 'route-container',
+ 'host': '127.0.0.1',
+ 'port': cls.tester.get_port(),
+ 'saslMechanisms': 'ANONYMOUS'}),
+ ('linkRoute', {'pattern': 'MyLinkRoute.#', 'containerId':
+ 'FakeBroker', 'direction': 'in'}),
+ ('linkRoute', {'pattern': 'MyLinkRoute.#', 'containerId':
+ 'FakeBroker', 'direction': 'out'})
+ ])
+ cls.EB1 = cls.routers[3]
+ cls.EB1.listener = cls.EB1.addresses[0]
+ cls.EB1.route_container = cls.EB1.connector_addresses[1];
+
+ cls.INT_A.wait_router_connected('INT.B')
+ cls.INT_B.wait_router_connected('INT.A')
+ cls.EA1.wait_connectors()
+
+ cls.skip = { 'test_01' : 0,
+ 'test_02' : 0,
+ 'test_03' : 0,
+ 'test_50' : 0,
+ 'test_51' : 0,
+ 'test_52' : 0
+ }
+
+ def _get_address(self, router, address):
+ """Lookup address in route table"""
+ a_type = 'org.apache.qpid.dispatch.router.address'
+ addrs = router.management.query(a_type).get_dicts()
+ return list(filter(lambda a: a['name'].find(address) != -1,
+ addrs))
+
+ def _wait_address_gone(self, router, address):
+ """Block until address is removed from the route table"""
+ while self._get_address(router, address):
+ sleep(0.1)
+
+ def _start_broker_EB1(self):
+ # start a new broker on EB1
+ fake_broker = FakeBroker(self.EB1.route_container)
+ # wait until the link route appears on the interior routers
+ self.INT_B.wait_address("MyLinkRoute")
+ self.INT_A.wait_address("MyLinkRoute")
+ return fake_broker
+
+ def spawn_receiver(self, router, count, address, expect=None):
+ if expect is None:
+ expect = Process.EXIT_OK
+ cmd = ["test-receiver",
+ "-a", router.listener,
+ "-c", str(count),
+ "-s", address]
+ env = dict(os.environ, PN_TRACE_FRM="1")
+ return self.popen(cmd, expect=expect, env=env)
+
+ def spawn_sender(self, router, count, address, expect=None, size=None):
+ if expect is None:
+ expect = Process.EXIT_OK
+ if size is None:
+ size = "-sm"
+ cmd = ["test-sender",
+ "-a", router.listener,
+ "-c", str(count),
+ "-t", address,
+ size]
+ env = dict(os.environ, PN_TRACE_FRM="1")
+ return self.popen(cmd, expect=expect, env=env)
+
+ def spawn_clogger(self, router, count, address,
+ size, pause_ms, expect=None):
+ if expect is None:
+ expect = Process.EXIT_OK
+ cmd = ["clogger",
+ "-a", router.listener,
+ "-c", str(count),
+ "-t", address,
+ "-s", str(size),
+ "-D",
+ "-P", str(pause_ms)]
+ env = dict(os.environ, PN_TRACE_FRM="1")
+ return self.popen(cmd, expect=expect, env=env)
+
+ def test_01_streaming_link_route(self):
+ """
+ Verify that a streaming message can be delivered over a link route
+ """
+
+ fake_broker = self._start_broker_EB1()
+
+ rx = self.spawn_receiver(self.EB1, count=1,
+ address="MyLinkRoute/test-address")
+
+ # send a streaming message; "-sx" causes the sender to generate a
+ # large streaming message
+ tx = self.spawn_sender(self.EA1, count=1,
+ address="MyLinkRoute/test-address",
+ expect=Process.EXIT_OK,
+ size="-sx")
+
+ out_text, out_error = tx.communicate(timeout=TIMEOUT)
+ if tx.returncode:
+ raise Exception("Sender failed: %s %s" % (out_text, out_error))
+
+ out_text, out_error = rx.communicate(timeout=TIMEOUT)
+ if rx.returncode:
+ raise Exception("Receiver failed: %s %s" % (out_text, out_error))
+
+ fake_broker.join()
+ self.assertEqual(1, fake_broker.in_count)
+ self.assertEqual(1, fake_broker.out_count)
+
+ # cleanup - not EB1 since MyLinkRoute is configured
+ self._wait_address_gone(self.EA1, "MyLinkRoute")
+ self._wait_address_gone(self.INT_A, "MyLinkRoute")
+ self._wait_address_gone(self.INT_B, "MyLinkRoute")
+
+
+
+ def _streaming_test(self, address):
+
+ # send a streaming message to address across the routers
+ rx = self.spawn_receiver(self.EB1,
+ count=1,
+ address=address)
+ self.INT_A.wait_address(address)
+
+ tx = self.spawn_sender(self.EA1,
+ count=1,
+ address=address,
+ expect=Process.EXIT_OK,
+ size="-sx")
+ out_text, out_error = tx.communicate(timeout=TIMEOUT)
+ if tx.returncode:
+ raise Exception("Sender failed: %s %s" % (out_text, out_error))
+
+ out_text, out_error = rx.communicate(timeout=TIMEOUT)
+ if rx.returncode:
+ raise Exception("receiver failed: %s %s" % (out_text, out_error))
+
+ def test_02_streaming_closest(self):
+ """
+ Verify that a streaming message with closest treatment is forwarded
+ correctly.
+ """
+
+ self._streaming_test("closest/test-address")
+
+ def test_03_streaming_multicast(self):
+ """
+ Verify a streaming multicast message is forwarded correctly
+ """
+
+ routers = [self.EA1, self.EB1, self.INT_A, self.INT_B]
+ streaming_rx = [self.spawn_receiver(router,
+ count=1,
+ address="multicast/test-address")
+ for router in routers]
+ self.INT_A.wait_address("multicast/test-address", subscribers=2, remotes=1)
+ self.INT_B.wait_address("multicast/test-address", subscribers=2, remotes=1)
+ self.EA1.wait_address("multicast/test-address", subscribers=1)
+ self.EB1.wait_address("multicast/test-address", subscribers=1)
+
+ # This sender will end up multicasting the message to ALL receivers.
+ tx = self.spawn_sender(self.EA1,
+ count=1,
+ address="multicast/test-address",
+ expect=Process.EXIT_OK,
+ size="-sx")
+
+ out_text, out_error = tx.communicate(timeout=TIMEOUT)
+ if tx.returncode:
+ raise Exception("sender failed: %s %s" % (out_text, out_error))
+
+ for rx in streaming_rx:
+ out_text, out_error = rx.communicate(timeout=TIMEOUT)
+ if rx.returncode:
+ raise Exception("receiver failed: %s %s" % (out_text, out_error))
+
+ def test_04_streaming_balanced(self):
+ """
+ Verify streaming balanced messages are forwarded correctly.
+ """
+ balanced_rx = [self.spawn_receiver(self.EB1,
+ count=1,
+ address="balanced/test-address")
+ for _ in range(2)]
+ self.EB1.wait_address("balanced/test-address", subscribers=2)
+
+ tx = self.spawn_sender(self.EA1,
+ count=2,
+ address="balanced/test-address")
+ out_text, out_error = tx.communicate(timeout=TIMEOUT)
+ if tx.returncode:
+ raise Exception("sender failed: %s %s" % (out_text, out_error))
+
+ for rx in balanced_rx:
+ out_text, out_error = rx.communicate(timeout=TIMEOUT)
+ if rx.returncode:
+ raise Exception("receiver failed: %s %s" % (out_text, out_error))
+
+ def test_10_streaming_link_route_parallel(self):
+ """
+ Ensure that a streaming message sent across a link route does not block other
+ clients sending to the same container address.
+ """
+
+ fake_broker = self._start_broker_EB1()
+
+ clogger = self.spawn_clogger(self.EA1,
+ count=1,
+ address="MyLinkRoute/clogger",
+ size=self.BODY_MAX,
+ pause_ms=100,
+ expect=self.SIG_TERM)
+ sleep(0.5) # allow clogger to set up streaming links
+
+ # start a sender in parallel
+ tx = self.spawn_sender(self.EA1, count=100, address="MyLinkRoute/clogger")
+ out_text, out_error = tx.communicate(timeout=TIMEOUT)
+ if tx.returncode:
+ raise Exception("Sender failed: %s %s" % (out_text, out_error))
+
+ clogger.terminate()
+ clogger.wait()
+
+ fake_broker.join()
+ self.assertEqual(100, fake_broker.in_count)
+
+ # cleanup - not EB1 since MyLinkRoute is configured
+ self._wait_address_gone(self.EA1, "MyLinkRoute")
+ self._wait_address_gone(self.INT_A, "MyLinkRoute")
+ self._wait_address_gone(self.INT_B, "MyLinkRoute")
+
+ def test_11_streaming_closest_parallel(self):
+ """
+ Ensure that a streaming message of closest treatment does not block
+ other non-streaming messages.
+ """
+
+ # this receiver should get the streaming message
+ rx1 = self.spawn_receiver(self.EB1,
+ count=1,
+ address="closest/test-address",
+ expect=self.SIG_TERM)
+
+ self.INT_A.wait_address("closest/test-address");
+
+ clogger = self.spawn_clogger(self.EA1,
+ count=1,
+ address="closest/test-address",
+ size=self.BODY_MAX,
+ pause_ms=100,
+ expect=self.SIG_TERM)
+ sleep(0.5)
+
+ # this receiver has less cost than rx1 since it is 1 less hop from the
+ # sender
+ rx2 = self.spawn_receiver(self.INT_A,
+ count=1,
+ address="closest/test-address")
+
+ # wait for rx2 to set up links to INT_A:
+ self.INT_A.wait_address("closest/test-address", subscribers=1, remotes=1)
+
+ # start a sender in parallel. Expect the message to arrive at rx2
+ tx = self.spawn_sender(self.EA1, count=1, address="closest/test-address")
+ out_text, out_error = tx.communicate(timeout=TIMEOUT)
+ if tx.returncode:
+ raise Exception("Sender failed: %s %s" % (out_text, out_error))
+
+ out_text, out_error = rx2.communicate(timeout=TIMEOUT)
+ if rx2.returncode:
+ raise Exception("receiver failed: %s %s" % (out_text, out_error))
+
+ rx1.terminate()
+ rx1.wait()
+
+ clogger.terminate()
+ clogger.wait()
+
+
+ def test_12_streaming_multicast_parallel(self):
+ """
+ Verify a streaming multicast message does not block other non-streaming
+ multicast messages
+
+ Start a group of receivers to consume the streaming message. Then
+ start a separate group to consume the non-streaming message. Ensure
+ that the second group properly receives the non-streaming message.
+ """
+
+ routers = [self.EA1, self.EB1, self.INT_A, self.INT_B]
+ streaming_rx = [self.spawn_receiver(router,
+ count=1,
+ address="multicast/test-address",
+ expect=self.SIG_TERM)
+ for router in routers]
+ self.INT_A.wait_address("multicast/test-address", subscribers=2, remotes=1)
+ self.INT_B.wait_address("multicast/test-address", subscribers=2, remotes=1)
+ self.EA1.wait_address("multicast/test-address", subscribers=1)
+ self.EB1.wait_address("multicast/test-address", subscribers=1)
+
+ # this will block all of the above receivers with a streaming message
+
+ clogger = self.spawn_clogger(self.EA1,
+ count=1,
+ address="multicast/test-address",
+ size=self.BODY_MAX,
+ pause_ms=100,
+ expect=self.SIG_TERM)
+ sleep(0.5)
+
+ # this second set of receivers should be able to receive multicast
+ # messages sent _after_ the clogger's streaming message
+
+ blocking_rx = [self.spawn_receiver(router,
+ count=1,
+ address="multicast/test-address")
+ for router in routers]
+ self.INT_A.wait_address("multicast/test-address", subscribers=3, remotes=1)
+ self.INT_B.wait_address("multicast/test-address", subscribers=3, remotes=1)
+ self.EA1.wait_address("multicast/test-address", subscribers=2)
+ self.EB1.wait_address("multicast/test-address", subscribers=2)
+
+ # This sender will end up multicasting the message to ALL receivers.
+ # Expect it to block since the first set of receivers will never get
+ # around to acking the message
+ tx = self.spawn_sender(self.EA1,
+ count=1,
+ address="multicast/test-address",
+ expect=self.SIG_TERM)
+
+ # however the second set of receivers _should_ end up getting the
+ # message, acking it and exit (count=1)
+ for rx in blocking_rx:
+ out_text, out_error = rx.communicate(timeout=TIMEOUT)
+ if rx.returncode:
+ raise Exception("receiver failed: %s %s" % (out_text, out_error))
+
+ tx.terminate()
+ tx.wait()
+
+ for rx in streaming_rx:
+ rx.terminate()
+ rx.wait()
+
+ clogger.terminate()
+ clogger.wait()
+
+ def test_13_streaming_balanced_parallel(self):
+ """
+ Verify streaming does not block other balanced traffic.
+ """
+
+ # create 2 consumers on the balanced address. Since our Process class
+ # requires the exit code to be known when the process is spawned and we
+ # cannot predict which receiver will get the streaming message use
+ # count=2 to force the receivers to run until we force termination
+ balanced_rx = [self.spawn_receiver(self.EB1,
+ count=2,
+ address="balanced/test-address",
+ expect=self.SIG_TERM)
+ for _ in range(2)]
+ self.EB1.wait_address("balanced/test-address", subscribers=2)
+
+ # this will block one of the above receivers with a streaming message
+
+ clogger = self.spawn_clogger(self.EA1,
+ count=1,
+ address="balanced/test-address",
+ size=self.BODY_MAX,
+ pause_ms=100,
+ expect=self.SIG_TERM)
+ sleep(0.5)
+
+ # This sender should get its message through to the other receiver
+ tx = self.spawn_sender(self.EA1,
+ count=1,
+ address="balanced/test-address")
+ out_text, out_error = tx.communicate(timeout=TIMEOUT)
+ if tx.returncode:
+ raise Exception("sender failed: %s %s" % (out_text, out_error))
+
+ for rx in balanced_rx:
+ rx.terminate()
+ rx.wait()
+
+ clogger.terminate()
+ clogger.wait()
+
+
if __name__== '__main__':
unittest.main(main_module())
diff --git a/tests/system_tests_two_routers.py b/tests/system_tests_two_routers.py
index f720423..3d28ce6 100644
--- a/tests/system_tests_two_routers.py
+++ b/tests/system_tests_two_routers.py
@@ -31,6 +31,7 @@ from proton import Message, Timeout, Delivery
from system_test import TestCase, Process, Qdrouterd, main_module, TIMEOUT, DIR
from system_test import AsyncTestReceiver
from system_test import AsyncTestSender
+from system_test import get_inter_router_links
from system_test import unittest
from proton.handlers import MessagingHandler
@@ -1753,5 +1754,108 @@ class MulticastTestClient(MessagingHandler):
def run(self):
Container(self).run()
+
+class StreamingLinkScrubberTest(TestCase):
+ """
+ Verify that unused inter-router streaming links are eventually reclaimed
+ """
+
+ @classmethod
+ def setUpClass(cls):
+ super(StreamingLinkScrubberTest, cls).setUpClass()
+
+ def router(name, extra):
+ config = [
+ ('router', {'id': 'Router%s' % name,
+ 'mode': 'interior'}),
+ ('listener', {'port': cls.tester.get_port(),
+ 'stripAnnotations': 'no'}),
+ ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+ ('address', {'prefix': 'balanced', 'distribution': 'balanced'}),
+ ('address', {'prefix': 'multicast', 'distribution': 'multicast'})
+
+ ]
+
+ if extra:
+ config.extend(extra)
+
+ config = Qdrouterd.Config(config)
+
+ # run routers in test mode to shorten the streaming link scrubber
+ # interval to 5 seconds and the maximum pool size to two links
+ cls.routers.append(cls.tester.qdrouterd(name, config, wait=True, cl_args=["--test-hooks"]))
+
+ cls.routers = []
+
+ inter_router_port = cls.tester.get_port()
+
+ router('A',
+ [('listener', {'role': 'inter-router',
+ 'port': inter_router_port})])
+ cls.RouterA = cls.routers[-1]
+ cls.RouterA.listener = cls.RouterA.addresses[0]
+
+ router('B',
+ [('connector', {'name': 'connectorToA', 'role':
+ 'inter-router',
+ 'port': inter_router_port,
+ 'verifyHostname': 'no'})])
+ cls.RouterB = cls.routers[-1]
+ cls.RouterB.listener = cls.RouterB.addresses[0]
+
+ cls.RouterA.wait_router_connected('RouterB')
+ cls.RouterB.wait_router_connected('RouterA')
+
+ def test_01_streaming_link_scrubber(self):
+ """
+ Ensure extra streaming links are closed by the periodic scrubber
+ """
+ address = "closest/scrubber"
+
+ # scrubber removes at most 10 links per scan, the test pool size is 2
+ sender_count = 12
+
+ # fire up a receiver on RouterB to get 1 message from each sender:
+ env = dict(os.environ, PN_TRACE_FRM="1")
+ cmd = ["test-receiver",
+ "-a", self.RouterB.listener,
+ "-s", address,
+ "-c", str(sender_count)]
+ rx = self.popen(cmd, env=env)
+
+ self.RouterA.wait_address(address)
+
+ # remember the count of inter-router links on A before we start streaming
+ pre_count = len(get_inter_router_links(self.RouterA.listener))
+
+ # fire off the senders
+ cmd = ["test-sender",
+ "-a", self.RouterA.listener,
+ "-t", address,
+ "-c", "1",
+ "-sx"
+ ]
+ senders = [self.popen(cmd, env=env) for x in range(sender_count)]
+
+ for tx in senders:
+ out_text, out_error = tx.communicate(timeout=TIMEOUT)
+ if tx.returncode:
+ raise Exception("Sender failed: %s %s" % (out_text, out_error))
+
+ # expect: more inter-router links opened. Should be 12 more, but
+ # depending on when the scrubber runs it may be as low as two
+ post_count = len(get_inter_router_links(self.RouterA.listener))
+ self.assertTrue(post_count > pre_count)
+
+ # expect: after 5 seconds 10 of the links should be closed and 2
+ # should remain (--test-hooks router option sets these parameters)
+ while (post_count - pre_count) > 2:
+ sleep(0.1)
+ post_count = len(get_inter_router_links(self.RouterA.listener))
+
+ out_text, out_error = rx.communicate(timeout=TIMEOUT)
+ if rx.returncode:
+ raise Exception("Receiver failed: %s %s" % (out_text, out_error))
+
if __name__ == '__main__':
unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org