Posted to commits@qpid.apache.org by kg...@apache.org on 2020/05/11 18:26:19 UTC

[qpid-dispatch] branch master updated: DISPATCH-1545: Prevent Head-of-line blocking on shared inter-router links

This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d4c52c  DISPATCH-1545: Prevent Head-of-line blocking on shared inter-router links
9d4c52c is described below

commit 9d4c52cf8c54d8a3e268c8e900e9baf632b063a4
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Thu Jan 23 15:36:52 2020 -0500

    DISPATCH-1545: Prevent Head-of-line blocking on shared inter-router links
    
    This patch prevents head-of-line blocking on shared links by
    dynamically assigning a dedicated inter-router link for the transfer
    of a streaming message.
    
    This closes #716
---
 include/qpid/dispatch/container.h                  |   3 +
 include/qpid/dispatch/message.h                    |   9 +
 include/qpid/dispatch/router_core.h                |   4 +-
 python/qpid_dispatch/management/qdrouter.json      |   2 +-
 src/CMakeLists.txt                                 |   1 +
 src/container.c                                    |  14 +
 src/message.c                                      |   7 +
 src/router_core/connections.c                      |  96 ++++-
 src/router_core/delivery.c                         |  50 ++-
 src/router_core/delivery.h                         |   2 +-
 src/router_core/forwarder.c                        | 300 +++++++------
 .../streaming_link_scrubber.c                      | 187 ++++++++
 src/router_core/route_tables.c                     |  18 +-
 src/router_core/router_core.c                      |  11 +
 src/router_core/router_core_private.h              |  42 +-
 src/router_core/transfer.c                         |  13 +
 src/router_node.c                                  |  60 ++-
 tests/CMakeLists.txt                               |   3 +
 tests/clogger.c                                    | 437 +++++++++++++++++++
 tests/system_tests_edge_router.py                  | 471 ++++++++++++++++++++-
 tests/system_tests_two_routers.py                  | 104 +++++
 21 files changed, 1638 insertions(+), 196 deletions(-)
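
For context: a "streaming" message is one that is forwarded before it has
been completely received.  Before this patch such a message occupied the
shared priority-sheaf data link to the next hop, so a slow multi-frame
transfer could stall every delivery queued behind it on that link.  The
change below instead checks out a dedicated outgoing link for each in-flight
streaming message and returns it to a per-connection pool once the final
segment arrives.  A minimal sketch of the check-out decision, using names
introduced in the diff (not verbatim patch code):

    /* sketch only -- the real logic is in src/router_core/forwarder.c */
    if (!qd_message_receive_complete(msg)               /* still streaming */
        && next_hop_supports_streaming_links(out_link->conn)) {
        /* use a pooled link, or attach a new one, instead of the shared
         * priority-sheaf link */
        out_link = get_outgoing_streaming_link(core, out_link->conn);
    }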

diff --git a/include/qpid/dispatch/container.h b/include/qpid/dispatch/container.h
index 054c6d5..54f3d7c 100644
--- a/include/qpid/dispatch/container.h
+++ b/include/qpid/dispatch/container.h
@@ -91,6 +91,7 @@ typedef enum {
     QD_SSN_ROUTER_DATA_PRI_9,
     QD_SSN_CORE_ENDPOINT,     ///< core subscriptions
     QD_SSN_LINK_ROUTE,        ///< link routes
+    QD_SSN_LINK_STREAMING,    ///< link dedicated to streaming messages
     QD_SSN_CLASS_COUNT
 } qd_session_class_t;
 
@@ -231,6 +232,8 @@ void *qd_link_get_node_context(const qd_link_t *link);
 void qd_link_restart_rx(qd_link_t *link);
 void qd_link_q3_block(qd_link_t *link);
 void qd_link_q3_unblock(qd_link_t *link);
+uint64_t qd_link_link_id(const qd_link_t *link);
+void qd_link_set_link_id(qd_link_t *link, uint64_t link_id);
 
 qd_session_t *qd_session(pn_session_t *pn_ssn);
 void qd_session_cleanup(qd_connection_t *qd_conn);
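
Note: QD_SSN_LINK_STREAMING gives each dedicated streaming link its own
session class, presumably so that session-level flow control on a stalled
stream cannot back-pressure the shared per-priority data sessions.  The
class is supplied when the core creates the link (see
qdr_connection_new_streaming_link_CT() in the connections.c hunk below):

    /* from the connections.c hunk below */
    out_link = qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_OUTGOING,
                                  qdr_terminus_router_data(),
                                  qdr_terminus_router_data(),
                                  QD_SSN_LINK_STREAMING);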
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index afc39c1..63f0b47 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -404,6 +404,15 @@ bool qd_message_Q2_holdoff_should_block(qd_message_t *msg);
  */
 bool qd_message_Q2_holdoff_should_unblock(qd_message_t *msg);
 
+
+/**
+ * Check if a message has hit its Q2 limit and is currently blocked.
+ * When blocked, no further message data will be read from the link.
+ *
+ * @param msg A pointer to the message
+ */
+bool qd_message_is_Q2_blocked(const qd_message_t *msg);
+
 /**
  * Return qd_link through which the message is being received.
  * @param msg A pointer to the message
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index ce45950..51806f7 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -617,6 +617,7 @@ const char *qdr_link_name(const qdr_link_t *link);
  * @param target Target terminus of the attach
  * @param name - name of the link
  * @param terminus_addr - terminus address if any
+ * @param link_id - set to the management id of the new link
  * @return A pointer to a new qdr_link_t object to track the link
  */
 qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
@@ -624,7 +625,8 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
                                   qdr_terminus_t   *source,
                                   qdr_terminus_t   *target,
                                   const char       *name,
-                                  const char       *terminus_addr);
+                                  const char       *terminus_addr,
+                                  uint64_t         *link_id);
 
 /**
  * qdr_link_second_attach
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index ec678d0..688bcc4 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1652,7 +1652,7 @@
                 },
                 "transitOutstanding": {
                     "type": "list",
-                    "description": "List of numbers of outstanding deliveries across a transit (inter-router) link for this address.  This is for balanced distribution only."
+                    "description": "List of numbers of outstanding deliveries across a transit (inter-router) connection for this address.  This is for balanced distribution only."
                 },
                 "trackedDeliveries": {
                     "type": "integer",
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 4257d95..9ca178b 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -102,6 +102,7 @@ set(qpid_dispatch_SOURCES
   router_core/modules/address_lookup_client/lookup_client.c
   router_core/modules/stuck_delivery_detection/delivery_tracker.c
   router_core/modules/mobile_sync/mobile.c
+  router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c
   router_node.c
   router_pynode.c
   schema_enum.c
diff --git a/src/container.c b/src/container.c
index a728de7..651db95 100644
--- a/src/container.c
+++ b/src/container.c
@@ -19,6 +19,7 @@
 
 #include <stdio.h>
 #include <string.h>
+#include <inttypes.h>
 #include "dispatch_private.h"
 #include "policy.h"
 #include <qpid/dispatch/container.h>
@@ -63,6 +64,7 @@ struct qd_link_t {
     bool                        q2_limit_unbounded;
     bool                        q3_blocked;
     DEQ_LINKS_N(Q3, qd_link_t); ///< Q3 blocked links
+    uint64_t                    link_id;
 };
 
 ALLOC_DEFINE(qd_link_t);
@@ -1154,6 +1156,18 @@ void qd_link_q3_unblock(qd_link_t *link)
 }
 
 
+uint64_t qd_link_link_id(const qd_link_t *link)
+{
+    return link->link_id;
+}
+
+
+void qd_link_set_link_id(qd_link_t *link, uint64_t link_id)
+{
+    link->link_id = link_id;
+}
+
+
 qd_session_t *qd_session(pn_session_t *pn_ssn)
 {
     assert(pn_ssn);
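
The qd_link_link_id()/qd_link_set_link_id() accessors let the container-level
qd_link_t carry the management identity that the core assigns in
qdr_link_first_attach() (whose new link_id out-parameter appears in the
router_core.h hunk above).  A plausible call sequence, sketched from the
signatures in this patch (the actual wiring is in src/router_node.c, not
shown in this excerpt):

    uint64_t link_id = 0;
    qdr_link_t *qlink = qdr_link_first_attach(qdr_conn, dir, source, target,
                                              name, terminus_addr, &link_id);
    qd_link_set_link_id(qd_link, link_id);  /* stash the management id */
    /* ...later, e.g. for [C<conn>][L<link>] log prefixes: */
    uint64_t id = qd_link_link_id(qd_link);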
diff --git a/src/message.c b/src/message.c
index 0d7b08b..447691b 100644
--- a/src/message.c
+++ b/src/message.c
@@ -2232,6 +2232,12 @@ bool qd_message_Q2_holdoff_should_unblock(qd_message_t *msg)
 }
 
 
+bool qd_message_is_Q2_blocked(const qd_message_t *msg)
+{
+    return ((const qd_message_pvt_t*)msg)->content->q2_input_holdoff;
+}
+
+
 qd_link_t * qd_message_get_receiving_link(const qd_message_t *msg)
 {
     return safe_deref_qd_link_t(((qd_message_pvt_t *)msg)->content->input_link_sp);
@@ -2251,6 +2257,7 @@ void qd_message_set_aborted(const qd_message_t *msg, bool aborted)
     msg_pvt->content->aborted = aborted;
 }
 
+
 bool qd_message_oversize(const qd_message_t *msg)
 {
     qd_message_content_t * mc = MSG_CONTENT(msg);
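
qd_message_is_Q2_blocked() simply exposes the content's q2_input_holdoff
flag.  An illustrative use (hedged -- the real consumer is
qdr_link_is_idle_CT(), referenced by the scrubber module below): a link
whose current inbound message is parked on Q2 still holds buffered data, so
it should not be treated as idle:

    /* illustrative only, not from the patch */
    static bool msg_quiescent(qd_message_t *msg)
    {
        return qd_message_receive_complete(msg)
            || !qd_message_is_Q2_blocked(msg);
    }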
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 4c0e73b..20fc2f9 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -105,6 +105,7 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t            *core,
     conn->oper_status           = QDR_CONN_OPER_UP;
     DEQ_INIT(conn->links);
     DEQ_INIT(conn->work_list);
+    DEQ_INIT(conn->streaming_link_pool);
     conn->connection_info->role = conn->role;
     conn->work_lock = sys_mutex();
     conn->conn_uptime = core->uptime_ticks;
@@ -525,7 +526,8 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
                                   qdr_terminus_t   *source,
                                   qdr_terminus_t   *target,
                                   const char       *name,
-                                  const char       *terminus_addr)
+                                  const char       *terminus_addr,
+                                  uint64_t         *link_id)
 {
     qdr_action_t   *action         = qdr_action(qdr_link_inbound_first_attach_CT, "link_first_attach");
     qdr_link_t     *link           = new_qdr_link_t();
@@ -534,6 +536,7 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
     ZERO(link);
     link->core = conn->core;
     link->identity = qdr_identifier(conn->core);
+    *link_id = link->identity;
     link->conn = conn;
     link->name = (char*) malloc(strlen(name) + 1);
 
@@ -839,7 +842,7 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c
 
         if (!qdr_delivery_receive_complete(dlv)) {
             qdr_delivery_set_aborted(dlv, true);
-            qdr_deliver_continue_peers_CT(core, dlv);
+            qdr_deliver_continue_peers_CT(core, dlv, false);
         }
 
         if (dlv->multicast) {
@@ -888,7 +891,7 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c
 
         if (!qdr_delivery_receive_complete(dlv)) {
             qdr_delivery_set_aborted(dlv, true);
-            qdr_deliver_continue_peers_CT(core, dlv);
+            qdr_deliver_continue_peers_CT(core, dlv, false);
         }
 
         peer = qdr_delivery_first_peer_CT(dlv);
@@ -932,7 +935,8 @@ static void qdr_link_abort_undelivered_CT(qdr_core_t *core, qdr_link_t *link)
 static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link, const char *log_text)
 {
     //
-    // Remove the link from the master list of links
+    // Remove the link from the master list of links and possibly the streaming
+    // link pool
     //
     DEQ_REMOVE(core->open_links, link);
 
@@ -958,8 +962,10 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
     if (qd_bitmask_valid_bit_value(conn->mask_bit)) {
         if (link->link_type == QD_LINK_CONTROL)
             core->control_links_by_mask_bit[conn->mask_bit] = 0;
-        if (link->link_type == QD_LINK_ROUTER)
-            core->data_links_by_mask_bit[conn->mask_bit].links[link->priority] = 0;
+        if (link->link_type == QD_LINK_ROUTER) {
+            if (link == core->data_links_by_mask_bit[conn->mask_bit].links[link->priority])
+                core->data_links_by_mask_bit[conn->mask_bit].links[link->priority] = 0;
+        }
     }
 
     //
@@ -1004,6 +1010,11 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
                          link,  QDR_LINK_LIST_CLASS_ADDRESS);
     }
 
+    if (link->in_streaming_pool) {
+        DEQ_REMOVE_N(STREAMING_POOL, conn->streaming_link_pool, link);
+        link->in_streaming_pool = false;
+    }
+
     //
     // Free the link's name and terminus_addr
     //
@@ -1125,6 +1136,19 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t        *core,
 
 void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition, bool close)
 {
+    //
+    // Ensure a pooled link is no longer available for streaming messages
+    //
+    if (link->streaming) {
+        if (link->in_streaming_pool) {
+            DEQ_REMOVE_N(STREAMING_POOL, link->conn->streaming_link_pool, link);
+            link->in_streaming_pool = false;
+        }
+    }
+
+    //
+    // tell the I/O thread to do the detach
+    //
     qdr_link_work_t *work = new_qdr_link_work_t();
     ZERO(work);
     work->work_type  = ++link->detach_count == 1 ? QDR_LINK_WORK_FIRST_DETACH : QDR_LINK_WORK_SECOND_DETACH;
@@ -1308,9 +1332,11 @@ static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, boo
                 // Assign a unique mask-bit to this connection as a reference to be used by
                 // the router module
                 //
-                if (qd_bitmask_first_set(core->neighbor_free_mask, &conn->mask_bit))
+                if (qd_bitmask_first_set(core->neighbor_free_mask, &conn->mask_bit)) {
                     qd_bitmask_clear_bit(core->neighbor_free_mask, conn->mask_bit);
-                else {
+                    assert(core->rnode_conns_by_mask_bit[conn->mask_bit] == 0);
+                    core->rnode_conns_by_mask_bit[conn->mask_bit] = conn;
+                } else {
                     qd_log(core->log, QD_LOG_CRITICAL, "Exceeded maximum inter-router connection count");
                     conn->role = QDR_ROLE_NORMAL;
                     break;
@@ -1369,6 +1395,38 @@ void qdr_connection_free(qdr_connection_t *conn)
 }
 
 
+// create a new outgoing link for streaming messages
+qdr_link_t *qdr_connection_new_streaming_link_CT(qdr_core_t *core, qdr_connection_t *conn)
+{
+    qdr_link_t *out_link = 0;
+
+    switch (conn->role) {
+    case QDR_ROLE_INTER_ROUTER:
+        out_link = qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_OUTGOING,
+                                      qdr_terminus_router_data(), qdr_terminus_router_data(),
+                                      QD_SSN_LINK_STREAMING);
+        break;
+    case QDR_ROLE_EDGE_CONNECTION:
+        out_link = qdr_create_link_CT(core, conn, QD_LINK_ENDPOINT, QD_OUTGOING,
+                                      qdr_terminus(0), qdr_terminus(0),
+                                      QD_SSN_LINK_STREAMING);
+        break;
+    default:
+        assert(false);
+        break;
+    }
+
+    if (out_link) {
+        out_link->streaming = true;
+        if (!conn->has_streaming_links) {
+            qdr_add_connection_ref(&core->streaming_connections, conn);
+            conn->has_streaming_links = true;
+        }
+    }
+    return out_link;
+}
+
+
 static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
     qdr_connection_t *conn = safe_deref_qdr_connection_t(action->args.connection.conn);
@@ -1384,8 +1442,10 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo
     // Give back the router mask-bit.
     //
     if (conn->role == QDR_ROLE_INTER_ROUTER) {
+        assert(qd_bitmask_valid_bit_value(conn->mask_bit));
         qdr_reset_sheaf(core, conn->mask_bit);
         qd_bitmask_set_bit(core->neighbor_free_mask, conn->mask_bit);
+        core->rnode_conns_by_mask_bit[conn->mask_bit] = 0;
     }
 
     //
@@ -1418,6 +1478,11 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo
         link_ref = DEQ_HEAD(conn->links);
     }
 
+    if (conn->has_streaming_links) {
+        assert(DEQ_IS_EMPTY(conn->streaming_link_pool));  // all links have been released
+        qdr_del_connection_ref(&core->streaming_connections, conn);
+    }
+
     //
     // Discard items on the work list
     //
@@ -1489,8 +1554,10 @@ static void qdr_attach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qd
 
 static void qdr_detach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link)
 {
+    // if this link is in the priority sheaf it needs to be removed
     if (conn->role == QDR_ROLE_INTER_ROUTER)
-        core->data_links_by_mask_bit[conn->mask_bit].links[link->priority] = 0;
+        if (link == core->data_links_by_mask_bit[conn->mask_bit].links[link->priority])
+            core->data_links_by_mask_bit[conn->mask_bit].links[link->priority] = 0;
 }
 
 
@@ -1801,6 +1868,17 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b
         return;
 
     } else {
+
+        //
+        // ensure a pooled link is no longer available for use
+        //
+        if (link->streaming) {
+            if (link->in_streaming_pool) {
+                DEQ_REMOVE_N(STREAMING_POOL, conn->streaming_link_pool, link);
+                link->in_streaming_pool = false;
+            }
+        }
+
         //
         // For routed links, propagate the detach
         //
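
The remove-from-pool step now appears at three sites in this file (link
cleanup, outbound detach, inbound detach).  A hypothetical helper, not part
of this patch, that would centralize the invariant (in_streaming_pool is
true exactly when the link sits on conn->streaming_link_pool):

    /* hypothetical consolidation, not in the patch */
    static inline void remove_from_streaming_pool_CT(qdr_link_t *link)
    {
        if (link->in_streaming_pool) {
            DEQ_REMOVE_N(STREAMING_POOL, link->conn->streaming_link_pool, link);
            link->in_streaming_pool = false;
        }
    }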
diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c
index 9e0400e..e3bc5f4 100644
--- a/src/router_core/delivery.c
+++ b/src/router_core/delivery.c
@@ -1049,7 +1049,7 @@ static void qdr_delete_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool
 }
 
 
-void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv)
+void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, bool more)
 {
     qdr_delivery_t *peer = qdr_delivery_first_peer_CT(in_dlv);
 
@@ -1060,24 +1060,38 @@ void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv)
         qdr_link_work_t *work      = peer->link_work;
         qdr_link_t      *peer_link = qdr_delivery_link(peer);
 
-        //
-        // Determines if the peer connection can be activated.
-        // For a large message, the peer delivery's link_work MUST be at the head of the peer link's work list. This link work is only removed
-        // after the streaming message has been sent.
-        //
-        if (!!work && !!peer_link) {
-            sys_mutex_lock(peer_link->conn->work_lock);
-            if (work->processing || work == DEQ_HEAD(peer_link->work_list)) {
-                qdr_add_link_ref(&peer_link->conn->links_with_work[peer_link->priority], peer_link, QDR_LINK_LIST_CLASS_WORK);
-                sys_mutex_unlock(peer_link->conn->work_lock);
+        if (!!peer_link) {
 
-                //
-                // Activate the outgoing connection for later processing.
-                //
-                qdr_connection_activate_CT(core, peer_link->conn);
+            if (peer_link->streaming && !more) {
+                if (!peer_link->in_streaming_pool) {
+                    // A streaming message has completed.  It is now safe to
+                    // re-use this streaming link for the next streaming
+                    // message since that new message will not be blocked
+                    // indefinitely by the current message.
+                    DEQ_INSERT_TAIL_N(STREAMING_POOL, peer_link->conn->streaming_link_pool, peer_link);
+                    peer_link->in_streaming_pool = true;
+                }
+            }
+
+            //
+            // Determines if the peer connection can be activated.
+            // For a large message, the peer delivery's link_work MUST be at the head of the peer link's work list. This link work is only removed
+            // after the streaming message has been sent.
+            //
+            if (!!work) {
+                sys_mutex_lock(peer_link->conn->work_lock);
+                if (work->processing || work == DEQ_HEAD(peer_link->work_list)) {
+                    qdr_add_link_ref(&peer_link->conn->links_with_work[peer_link->priority], peer_link, QDR_LINK_LIST_CLASS_WORK);
+                    sys_mutex_unlock(peer_link->conn->work_lock);
+
+                    //
+                    // Activate the outgoing connection for later processing.
+                    //
+                    qdr_connection_activate_CT(core, peer_link->conn);
+                }
+                else
+                    sys_mutex_unlock(peer_link->conn->work_lock);
             }
-            else
-                sys_mutex_unlock(peer_link->conn->work_lock);
         }
 
         peer = qdr_delivery_next_peer_CT(in_dlv);
@@ -1108,7 +1122,7 @@ static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool
     // If it is already in the undelivered list, don't try to deliver this again.
     //
     if (!!link && in_dlv->where != QDR_DELIVERY_IN_UNDELIVERED) {
-        qdr_deliver_continue_peers_CT(core, in_dlv);
+        qdr_deliver_continue_peers_CT(core, in_dlv, more);
 
         qd_message_t *msg = qdr_delivery_message(in_dlv);
 
diff --git a/src/router_core/delivery.h b/src/router_core/delivery.h
index 8807996..4c6e646 100644
--- a/src/router_core/delivery.h
+++ b/src/router_core/delivery.h
@@ -154,7 +154,7 @@ qdr_delivery_t *qdr_delivery_first_peer_CT(qdr_delivery_t *dlv);
 qdr_delivery_t *qdr_delivery_next_peer_CT(qdr_delivery_t *dlv);
 
 /* schedules all peer deliveries with work for I/O processing */
-void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv);
+void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, bool more);
 
 /* update the links counters with respect to its delivery */
 void qdr_delivery_increment_counters_CT(qdr_core_t *core, qdr_delivery_t *delivery);
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index ad3fb02..f6ee7bc 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -23,6 +23,8 @@
 #include <strings.h>
 #include "forwarder.h"
 #include "delivery.h"
+#include <inttypes.h>
+
 
 typedef struct qdr_forward_deliver_info_t {
     DEQ_LINKS(struct qdr_forward_deliver_info_t);
@@ -36,20 +38,26 @@ DEQ_DECLARE(qdr_forward_deliver_info_t, qdr_forward_deliver_info_list_t);
 ALLOC_DEFINE(qdr_forward_deliver_info_t);
 
 
-static qdr_link_t * peer_data_link(qdr_core_t *core,
-                                   qdr_node_t *node,
-                                   int         priority)
+// get the control link for a given inter-router connection
+static inline qdr_link_t *peer_router_control_link(qdr_core_t *core, int conn_mask)
 {
-    int nlmb = node->link_mask_bit;
+    return (conn_mask >= 0) ? core->control_links_by_mask_bit[conn_mask] : 0;
+}
+
 
-    if (nlmb < 0 || priority < 0)
+// find the proper outgoing data link on a connection using the given priority
+static inline qdr_link_t *peer_router_data_link(qdr_core_t *core,
+                                                int         conn_mask,
+                                                int         priority)
+{
+    if (conn_mask < 0 || priority < 0)
         return 0;
 
     // Try to return the requested priority link, but if it does
     // not exist, return the closest one that is lower.
     qdr_link_t * link = 0;
     while (1) {
-        if ((link = core->data_links_by_mask_bit[nlmb].links[priority]))
+        if ((link = core->data_links_by_mask_bit[conn_mask].links[priority]))
             return link;
         if (-- priority < 0)
             return 0;
@@ -58,6 +66,43 @@ static qdr_link_t * peer_data_link(qdr_core_t *core,
 }
 
 
+// Returns true if the peer router supports this router opening additional incoming links dedicated to streaming messages
+//
+static inline bool next_hop_supports_streaming_links(const qdr_connection_t *conn)
+{
+    if (conn->role == QDR_ROLE_EDGE_CONNECTION)
+        return true;
+    if (conn->role == QDR_ROLE_INTER_ROUTER) {
+        return QDR_ROUTER_VERSION_AT_LEAST(conn->connection_info->version,
+                                           1, 13, 0);
+    }
+    return false;
+}
+
+
+// Get an idle anonymous link for a streaming message. This link will come from
+// either the connection's free link pool or it will be dynamically created on
+// the given connection.
+static inline qdr_link_t *get_outgoing_streaming_link(qdr_core_t *core, qdr_connection_t *conn)
+{
+    if (!conn) return 0;
+
+    qdr_link_t *out_link = DEQ_HEAD(conn->streaming_link_pool);
+    if (out_link) {
+        DEQ_REMOVE_HEAD_N(STREAMING_POOL, conn->streaming_link_pool);
+        out_link->in_streaming_pool = false;
+    } else {
+        // no free links - create a new one
+        out_link = qdr_connection_new_streaming_link_CT(core, conn);
+        if (!out_link) {
+            qd_log(core->log, QD_LOG_WARNING, "[C%"PRIu64"] Unable to setup new outgoing streaming message link", conn->identity);
+            return 0;
+        }
+    }
+    return out_link;
+}
+
+
 //==================================================================================
 // Built-in Forwarders
 //==================================================================================
@@ -112,15 +157,15 @@ static void qdr_forward_find_closest_remotes_CT(qdr_core_t *core, qdr_address_t
 }
 
 
-qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, qdr_link_t *link, qd_message_t *msg)
+qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, qdr_link_t *out_link, qd_message_t *msg)
 {
     qdr_delivery_t *out_dlv = new_qdr_delivery_t();
     uint64_t       *tag = (uint64_t*) out_dlv->tag;
-    if (link->conn)
-        link->conn->last_delivery_time = core->uptime_ticks;
+    if (out_link->conn)
+        out_link->conn->last_delivery_time = core->uptime_ticks;
 
     ZERO(out_dlv);
-    set_safe_ptr_qdr_link_t(link, &out_dlv->link_sp);
+    set_safe_ptr_qdr_link_t(out_link, &out_dlv->link_sp);
     out_dlv->msg        = qd_message_copy(msg);
     out_dlv->settled    = !in_dlv || in_dlv->settled;
     out_dlv->presettled = out_dlv->settled;
@@ -357,8 +402,7 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
     bool          bypass_valid_origins = addr->forwarder->bypass_valid_origins;
     int           fanout               = 0;
     qd_bitmask_t *link_exclusion       = !!in_delivery ? in_delivery->link_exclusion : 0;
-    bool          receive_complete     = qd_message_receive_complete(qdr_delivery_message(in_delivery));
-    uint8_t       priority             = qdr_forward_effective_priority(msg, addr);
+    bool          receive_complete     = qd_message_receive_complete(msg);
 
     qdr_forward_deliver_info_list_t deliver_info_list;
     DEQ_INIT(deliver_info_list);
@@ -375,19 +419,26 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
             // Only forward via links that don't result in edge-echo.
             //
             if (!qdr_forward_edge_echo_CT(in_delivery, out_link)) {
-                qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
 
-                // Store the out_link and out_delivery so we can forward the delivery later on
-                qdr_forward_deliver_info_t *deliver_info = new_qdr_forward_deliver_info_t();
-                ZERO(deliver_info);
-                deliver_info->out_dlv = out_delivery;
-                deliver_info->out_link = out_link;
-                DEQ_INSERT_TAIL(deliver_info_list, deliver_info);
+                if (!receive_complete && next_hop_supports_streaming_links(out_link->conn)) {
+                    out_link = get_outgoing_streaming_link(core, out_link->conn);
+                }
 
-                fanout++;
-                if (out_link->link_type != QD_LINK_CONTROL && out_link->link_type != QD_LINK_ROUTER && !out_link->fallback) {
-                    addr->deliveries_egress++;
-                    core->deliveries_egress++;
+                if (out_link) {
+                    qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
+
+                    // Store the out_link and out_delivery so we can forward the delivery later on
+                    qdr_forward_deliver_info_t *deliver_info = new_qdr_forward_deliver_info_t();
+                    ZERO(deliver_info);
+                    deliver_info->out_dlv = out_delivery;
+                    deliver_info->out_link = out_link;
+                    DEQ_INSERT_TAIL(deliver_info_list, deliver_info);
+
+                    fanout++;
+                    if (out_link->link_type != QD_LINK_CONTROL && out_link->link_type != QD_LINK_ROUTER && !out_link->fallback) {
+                        addr->deliveries_egress++;
+                        core->deliveries_egress++;
+                    }
                 }
             }
 
@@ -399,6 +450,7 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
     // Forward to remote routers with subscribers using the appropriate
     // link for the traffic class: control or data
     //
+
     //
     // Get the mask bit associated with the ingress router for the message.
     // This will be compared against the "valid_origin" masks for each
@@ -421,17 +473,15 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
     //
     if (origin >= 0) {
         int           dest_bit;
-        qdr_link_t   *dest_link;
-        qdr_node_t   *next_node;
-        qd_bitmask_t *link_set = qd_bitmask(0);
+        qd_bitmask_t *conn_set = qd_bitmask(0);  // connections to the next-hops
 
         //
-        // Loop over the target nodes for this address.  Build a set of outgoing links
+        // Loop over the target nodes for this address.  Build a set of outgoing connections
         // for which there are valid targets.  We do this to avoid sending more than one
-        // message down a given link.  It's possible that there are multiple destinations
-        // for this address that are all reachable over the same link.  In this case, we
-        // will send only one copy of the message over the link and allow a downstream
-        // router to fan the message out.
+        // message to a given next-hop.  It's possible that there are multiple destinations
+        // for this address that are all reachable via the same next-hop.  In this case, we
+        // will send only one copy of the message to the next-hop and allow the downstream
+        // routers to fan the message out.
         //
         int c;
         for (QD_BITMASK_EACH(addr->rnodes, dest_bit, c)) {
@@ -439,26 +489,30 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
             if (!rnode)
                 continue;
 
-            if (rnode->next_hop)
-                next_node = rnode->next_hop;
-            else
-                next_node = rnode;
-
-            dest_link = control ? PEER_CONTROL_LINK(core, next_node) : peer_data_link(core, next_node, priority);
-            if (dest_link && qd_bitmask_value(rnode->valid_origins, origin))
-                qd_bitmask_set_bit(link_set, dest_link->conn->mask_bit);
+            // get the inter-router connection associated with path to rnode:
+            int conn_bit = (rnode->next_hop) ? rnode->next_hop->conn_mask_bit : rnode->conn_mask_bit;
+            if (conn_bit >= 0 && (!link_exclusion || qd_bitmask_value(link_exclusion, conn_bit) == 0)) {
+                qd_bitmask_set_bit(conn_set, conn_bit);
+            }
         }
 
         //
-        // Send a copy of the message outbound on each identified link.
+        // Send a copy of the message over the inter-router connection to the next hop
         //
-        int link_bit;
-        while (qd_bitmask_first_set(link_set, &link_bit)) {
-            qd_bitmask_clear_bit(link_set, link_bit);
-            dest_link = control ?
-                core->control_links_by_mask_bit[link_bit] :
-                core->data_links_by_mask_bit[link_bit].links[priority];
-            if (dest_link && (!link_exclusion || qd_bitmask_value(link_exclusion, link_bit) == 0)) {
+        int conn_bit;
+        while (qd_bitmask_first_set(conn_set, &conn_bit)) {
+            qd_bitmask_clear_bit(conn_set, conn_bit);
+
+            qdr_link_t  *dest_link;
+            if (control) {
+                dest_link = peer_router_control_link(core, conn_bit);
+            } else if (!receive_complete) {  // inter-router conns support dynamic streaming links
+                dest_link = get_outgoing_streaming_link(core, core->rnode_conns_by_mask_bit[conn_bit]);
+            } else {
+                dest_link = peer_router_data_link(core, conn_bit, qdr_forward_effective_priority(msg, addr));
+            }
+
+            if (dest_link) {
                 qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, dest_link, msg);
 
                 // Store the out_link and out_delivery so we can forward the delivery later on
@@ -475,7 +529,7 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
             }
         }
 
-        qd_bitmask_free(link_set);
+        qd_bitmask_free(conn_set);
     }
 
     if (!exclude_inprocess) {
@@ -510,14 +564,11 @@ int qdr_forward_closest_CT(qdr_core_t      *core,
                            bool             exclude_inprocess,
                            bool             control)
 {
-    qdr_link_t     *out_link;
-    qdr_delivery_t *out_delivery;
-
+    const bool receive_complete = qd_message_receive_complete(msg);
     //
     // Forward to an in-process subscriber if there is one.
     //
     if (!exclude_inprocess) {
-        bool receive_complete = qd_message_receive_complete(msg);
         qdr_subscription_t *sub = DEQ_HEAD(addr->subscriptions);
         if (sub) {
             qdr_forward_to_subscriber(core, sub, in_delivery, msg, receive_complete);
@@ -545,7 +596,7 @@ int qdr_forward_closest_CT(qdr_core_t      *core,
     }
 
     //
-    // Forward to a local subscriber.
+    // Forward to a locally attached subscriber.
     //
     qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks);
 
@@ -556,31 +607,38 @@ int qdr_forward_closest_CT(qdr_core_t      *core,
         link_ref = DEQ_NEXT(link_ref);
 
     if (link_ref) {
-        out_link     = link_ref->link;
-        out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
-        qdr_forward_deliver_CT(core, out_link, out_delivery);
+        qdr_link_t *out_link = link_ref->link;
 
-        //
-        // If there are multiple local subscribers, rotate the list of link references
-        // so deliveries will be distributed among the subscribers in a round-robin pattern.
-        //
-        if (DEQ_SIZE(addr->rlinks) > 1) {
-            link_ref = DEQ_HEAD(addr->rlinks);
-            DEQ_REMOVE_HEAD(addr->rlinks);
-            DEQ_INSERT_TAIL(addr->rlinks, link_ref);
+        if (!receive_complete && next_hop_supports_streaming_links(out_link->conn)) {
+            out_link = get_outgoing_streaming_link(core, out_link->conn);
         }
 
-        if (!out_link->fallback) {
-            addr->deliveries_egress++;
-            core->deliveries_egress++;
-        }
+        if (out_link) {
+            qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
+            qdr_forward_deliver_CT(core, out_link, out_delivery);
 
-        if (qdr_connection_route_container(out_link->conn)) {
-            core->deliveries_egress_route_container++;
-            addr->deliveries_egress_route_container++;
-        }
+            //
+            // If there are multiple local subscribers, rotate the list of link references
+            // so deliveries will be distributed among the subscribers in a round-robin pattern.
+            //
+            if (DEQ_SIZE(addr->rlinks) > 1) {
+                link_ref = DEQ_HEAD(addr->rlinks);
+                DEQ_REMOVE_HEAD(addr->rlinks);
+                DEQ_INSERT_TAIL(addr->rlinks, link_ref);
+            }
 
-        return 1;
+            if (!out_link->fallback) {
+                addr->deliveries_egress++;
+                core->deliveries_egress++;
+            }
+
+            if (qdr_connection_route_container(out_link->conn)) {
+                core->deliveries_egress_route_container++;
+                addr->deliveries_egress_route_container++;
+            }
+
+            return 1;
+        }
     }
 
     //
@@ -594,8 +652,6 @@ int qdr_forward_closest_CT(qdr_core_t      *core,
     // Forward to remote routers with subscribers using the appropriate
     // link for the traffic class: control or data
     //
-    qdr_node_t *next_node;
-
     if (addr->next_remote >= 0) {
         qdr_node_t *rnode = core->routers_by_mask_bit[addr->next_remote];
         if (rnode) {
@@ -603,15 +659,19 @@ int qdr_forward_closest_CT(qdr_core_t      *core,
             if (addr->next_remote == -1)
                 qd_bitmask_first_set(addr->closest_remotes, &addr->next_remote);
 
-            if (rnode->next_hop)
-                next_node = rnode->next_hop;
-            else
-                next_node = rnode;
+            // get the inter-router connection associated with path to rnode:
+            int conn_bit = (rnode->next_hop) ? rnode->next_hop->conn_mask_bit : rnode->conn_mask_bit;
+            qdr_link_t *out_link;
+            if (control) {
+                out_link = peer_router_control_link(core, conn_bit);
+            } else if (!receive_complete) {
+                out_link = get_outgoing_streaming_link(core, core->rnode_conns_by_mask_bit[conn_bit]);
+            } else {
+                out_link = peer_router_data_link(core, conn_bit, qdr_forward_effective_priority(msg, addr));
+            }
 
-            uint8_t priority = qdr_forward_effective_priority(msg, addr);
-            out_link = control ? PEER_CONTROL_LINK(core, next_node) : peer_data_link(core, next_node, priority);
             if (out_link) {
-                out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
+                qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
                 qdr_forward_deliver_CT(core, out_link, out_delivery);
                 addr->deliveries_transit++;
                 if (out_link->link_type == QD_LINK_ROUTER)
@@ -647,10 +707,10 @@ int qdr_forward_balanced_CT(qdr_core_t      *core,
     }
 
     qdr_link_t *best_eligible_link       = 0;
-    int         best_eligible_link_bit   = -1;
+    int         best_eligible_conn_bit   = -1;
     uint32_t    eligible_link_value      = UINT32_MAX;
     qdr_link_t *best_ineligible_link     = 0;
-    int         best_ineligible_link_bit = -1;
+    int         best_ineligible_conn_bit = -1;
     uint32_t    ineligible_link_value    = UINT32_MAX;
 
     //
@@ -702,7 +762,7 @@ int qdr_forward_balanced_CT(qdr_core_t      *core,
         // This will be compared against the "valid_origin" masks for each
         // candidate destination router.
         //
-        int origin = 0;
+        int origin = 0;  // default to this router
         qd_iterator_t *ingress_iter = in_delivery ? in_delivery->origin : 0;
 
         if (ingress_iter) {
@@ -717,26 +777,29 @@ int qdr_forward_balanced_CT(qdr_core_t      *core,
         int node_bit;
         for (QD_BITMASK_EACH(addr->rnodes, node_bit, c)) {
             qdr_node_t *rnode     = core->routers_by_mask_bit[node_bit];
-            qdr_node_t *next_node = rnode->next_hop ? rnode->next_hop : rnode;
-            uint8_t     priority  = qdr_forward_effective_priority(msg, addr);
-            qdr_link_t *link      = peer_data_link(core, next_node, priority);
-            if (!link) continue;
-            int         link_bit  = link->conn->mask_bit;
-            int         value     = addr->outstanding_deliveries[link_bit];
-            bool        eligible  = link->capacity > value;
 
             if (qd_bitmask_value(rnode->valid_origins, origin)) {
+
+                qdr_node_t *next_node = rnode->next_hop ? rnode->next_hop : rnode;
+                int         conn_bit  = next_node->conn_mask_bit;
+                uint8_t     priority  = qdr_forward_effective_priority(msg, addr);
+                qdr_link_t *link      = peer_router_data_link(core, conn_bit, priority);
+                if (!link) continue;
+
+                int         value     = addr->outstanding_deliveries[conn_bit];
+                bool        eligible  = link->capacity > value;
+
                 //
                 // Link is a candidate, adjust the value by the bias (node cost).
                 //
                 value += rnode->cost;
                 if (eligible && eligible_link_value > value) {
                     best_eligible_link     = link;
-                    best_eligible_link_bit = link_bit;
+                    best_eligible_conn_bit = conn_bit;
                     eligible_link_value    = value;
                 } else if (!eligible && ineligible_link_value > value) {
                     best_ineligible_link     = link;
-                    best_ineligible_link_bit = link_bit;
+                    best_ineligible_conn_bit = conn_bit;
                     ineligible_link_value    = value;
                 }
             }
@@ -754,34 +817,43 @@ int qdr_forward_balanced_CT(qdr_core_t      *core,
     }
 
     qdr_link_t *chosen_link     = 0;
-    int         chosen_link_bit = -1;
+    int         chosen_conn_bit = -1;
 
     if (best_eligible_link) {
         chosen_link     = best_eligible_link;
-        chosen_link_bit = best_eligible_link_bit;
+        chosen_conn_bit = best_eligible_conn_bit;
     } else if (best_ineligible_link) {
         chosen_link     = best_ineligible_link;
-        chosen_link_bit = best_ineligible_link_bit;
+        chosen_conn_bit = best_ineligible_conn_bit;
     }
 
     if (chosen_link) {
-        qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, chosen_link, msg);
-        qdr_forward_deliver_CT(core, chosen_link, out_delivery);
 
-        //
-        // If the delivery is unsettled and the link is inter-router, account for the outstanding delivery.
-        //
-        if (in_delivery && !in_delivery->settled && chosen_link_bit >= 0) {
-            addr->outstanding_deliveries[chosen_link_bit]++;
-            out_delivery->tracking_addr     = addr;
-            out_delivery->tracking_addr_bit = chosen_link_bit;
-            addr->tracked_deliveries++;
+        // DISPATCH-1545 (head-of-line blocking): if the message is streaming,
+        // see if the next hop allows us to open a dedicated link for streaming
+        if (!qd_message_receive_complete(msg) && next_hop_supports_streaming_links(chosen_link->conn)) {
+            chosen_link = get_outgoing_streaming_link(core, chosen_link->conn);
+            if (!chosen_link)
+                return 0;
         }
 
+        qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, chosen_link, msg);
+        qdr_forward_deliver_CT(core, chosen_link, out_delivery);
+
         //
         // Bump the appropriate counter based on where we sent the delivery.
         //
-        if (chosen_link_bit >= 0) {
+        if (chosen_conn_bit >= 0) {  // sent to peer router
+            //
+            // If the delivery is unsettled account for the outstanding delivery sent inter-router.
+            //
+            if (in_delivery && !in_delivery->settled) {
+                addr->outstanding_deliveries[chosen_conn_bit]++;
+                out_delivery->tracking_addr     = addr;
+                out_delivery->tracking_addr_bit = chosen_conn_bit;
+                addr->tracked_deliveries++;
+            }
+
             addr->deliveries_transit++;
             if (chosen_link->link_type == QD_LINK_ROUTER)
                 core->deliveries_transit++;
@@ -841,8 +913,6 @@ bool qdr_forward_link_balanced_CT(qdr_core_t     *core,
         //
         // Look for a next-hop we can use to forward the link-attach.
         //
-        qdr_node_t *next_node;
-
         if (addr->cost_epoch != core->cost_epoch) {
             addr->next_remote = -1;
             addr->cost_epoch  = core->cost_epoch;
@@ -864,14 +934,8 @@ bool qdr_forward_link_balanced_CT(qdr_core_t     *core,
                 if (addr->next_remote == -1)
                     qd_bitmask_first_set(addr->rnodes, &addr->next_remote);
 
-                if (rnode->next_hop)
-                    next_node = rnode->next_hop;
-                else
-                    next_node = rnode;
-
-                qdr_link_t * pdl = peer_data_link(core, next_node, 0);
-                if (next_node && pdl)
-                    conn = pdl->conn;
+                int conn_bit = (rnode->next_hop) ? rnode->next_hop->conn_mask_bit : rnode->conn_mask_bit;
+                conn = (conn_bit >= 0) ? core->rnode_conns_by_mask_bit[conn_bit] : 0;
             }
         }
     }
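
A recurring pattern in the hunks above replaces the old per-link bitmask
with a per-connection one: resolve the route node to its next hop, then map
the connection mask bit through the new core->rnode_conns_by_mask_bit table.
Condensed from the diff:

    int conn_bit = rnode->next_hop ? rnode->next_hop->conn_mask_bit
                                   : rnode->conn_mask_bit;
    qdr_connection_t *conn = (conn_bit >= 0)
                             ? core->rnode_conns_by_mask_bit[conn_bit] : 0;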
diff --git a/src/router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c b/src/router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c
new file mode 100644
index 0000000..1c578ca
--- /dev/null
+++ b/src/router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qpid/dispatch/ctools.h"
+#include "module.h"
+#include <inttypes.h>
+
+
+/*
+ * Release unused streaming links
+ *
+ * Periodically scan through the list of open connections checking for idle
+ * streaming links.  If the connection's idle streaming link pool is oversized
+ * then release some of the unused links in the background.
+ */
+
+#define PROD_TIMER_INTERVAL    30
+#define TEST_TIMER_INTERVAL    5
+#define TEST_MAX_FREE_POOL     2
+#define MAX_FREE_BATCH         10  // rate limit the link detach
+
+static int timer_interval     = PROD_TIMER_INTERVAL;
+static int max_free_pool_size = 128;
+
+static void qdr_streaming_link_scrubber_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+
+typedef struct tracker_t tracker_t;
+struct tracker_t {
+    qdr_core_t              *core;
+    qdr_core_timer_t        *timer;
+    qdr_connection_ref_t_sp  next_conn_ref;
+};
+
+
+/* Idle streaming link cleanup
+ *
+ * Check the size of the connection's idle link free pool.  If the connection
+ * has accumulated too many unused links, start closing them.
+ */
+static void idle_link_cleanup(qdr_core_t *core, qdr_connection_t *conn)
+{
+    qdr_link_list_t to_free = DEQ_EMPTY;
+
+    qd_log(core->log, QD_LOG_DEBUG,
+           "[C%"PRIu64"] Streaming link scrubber: scanning connection", conn->identity);
+
+    const size_t pool_size = DEQ_SIZE(conn->streaming_link_pool);
+    if (pool_size > max_free_pool_size) {
+        size_t count = MIN(MAX_FREE_BATCH, pool_size - max_free_pool_size);
+
+        // links are returned to the pool by inserting at the tail.  Thus the
+        // links at head have been on the list the longest and are more likely
+        // to be candidates for cleanup (e.g. idle)
+        while (count) {
+            qdr_link_t *link = DEQ_HEAD(conn->streaming_link_pool);
+            if (!qdr_link_is_idle_CT(link))
+                break;
+            DEQ_REMOVE_HEAD_N(STREAMING_POOL, conn->streaming_link_pool);
+            DEQ_INSERT_TAIL_N(STREAMING_POOL, to_free, link);
+            link->in_streaming_pool = false;
+            count -= 1;
+        }
+
+    }
+
+    if (DEQ_HEAD(to_free)) {
+        qd_log(core->log, QD_LOG_DEBUG,
+               "[C%"PRIu64"] Streaming link scrubber: found %d idle links", conn->identity, (int)DEQ_SIZE(to_free));
+
+        while (DEQ_HEAD(to_free)) {
+            qdr_link_t *link = DEQ_HEAD(to_free);
+            DEQ_REMOVE_HEAD_N(STREAMING_POOL, to_free);
+            qd_log(core->log, QD_LOG_DEBUG,
+                   "[C%"PRIu64"][L%"PRIu64"] Streaming link scrubber: closing idle link %s",
+                   link->conn->identity, link->identity, (link->name) ? link->name : "");
+            qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NONE, true);
+        }
+    }
+}
+
+
+static void timer_handler_CT(qdr_core_t *core, void *context)
+{
+    tracker_t            *tracker   = (tracker_t*) context;
+    qdr_connection_ref_t *first_ref = DEQ_HEAD(core->streaming_connections);
+
+    if (!!first_ref) {
+        qd_log(core->log, QD_LOG_DEBUG, "Starting streaming link scrubber scan");
+        set_safe_ptr_qdr_connection_ref_t(first_ref, &tracker->next_conn_ref);
+        qdr_action_t *action = qdr_action(qdr_streaming_link_scrubber_CT, "streaming_link_scrubber");
+        action->args.general.context_1 = tracker;
+        qdr_action_background_enqueue(core, action);
+    } else
+        qdr_core_timer_schedule_CT(core, tracker->timer, timer_interval);
+}
+
+
+static void qdr_streaming_link_scrubber_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+    if (discard)
+        return;
+
+    tracker_t            *tracker  = (tracker_t*) action->args.general.context_1;
+    qdr_connection_ref_t *conn_ref = safe_deref_qdr_connection_ref_t(tracker->next_conn_ref);
+
+    if (!!conn_ref) {
+        idle_link_cleanup(core, conn_ref->conn);
+
+        conn_ref = DEQ_NEXT(conn_ref);
+        if (!!conn_ref) {
+            //
+            // There is another connection on the list.  Schedule another
+            // background action to process the next connection.
+            //
+            set_safe_ptr_qdr_connection_ref_t(conn_ref, &tracker->next_conn_ref);
+            action = qdr_action(qdr_streaming_link_scrubber_CT, "streaming_link_scrubber");
+            action->args.general.context_1 = tracker;
+            qdr_action_background_enqueue(core, action);
+        } else
+            //
+            // We've come to the end of the list of open connections.  Set the
+            // timer to start a new sweep after the interval.
+            //
+            qdr_core_timer_schedule_CT(core, tracker->timer, timer_interval);
+    } else
+        //
+        // The connection we were provided is no longer valid.  It was probably
+        // closed since the last time we came through this path.  Abort the
+        // sweep and set the timer for a new one after the interval.
+        //
+        qdr_core_timer_schedule_CT(core, tracker->timer, timer_interval);
+}
+
+
+static bool qcm_streaming_link_scrubber_enable_CT(qdr_core_t *core)
+{
+    if (core->qd->test_hooks) {
+        //
+        // Test hooks are enabled, override the timing constants with the test values
+        //
+        timer_interval = TEST_TIMER_INTERVAL;
+        max_free_pool_size = TEST_MAX_FREE_POOL;
+    }
+
+    return true;
+}
+
+
+static void qcm_streaming_link_scrubber_init_CT(qdr_core_t *core, void **module_context)
+{
+    tracker_t *tracker = NEW(tracker_t);
+    ZERO(tracker);
+    tracker->core  = core;
+    tracker->timer = qdr_core_timer_CT(core, timer_handler_CT, tracker);
+    qdr_core_timer_schedule_CT(core, tracker->timer, timer_interval);
+    *module_context = tracker;
+
+    qd_log(core->log, QD_LOG_INFO,
+           "Streaming link scrubber: Scan interval: %d seconds, max free pool: %d links", timer_interval, max_free_pool_size);
+}
+
+
+static void qcm_streaming_link_scrubber_final_CT(void *module_context)
+{
+    tracker_t *tracker = (tracker_t*) module_context;
+    qdr_core_timer_free_CT(tracker->core, tracker->timer);
+    free(tracker);
+}
+
+
+QDR_CORE_MODULE_DECLARE("streaming_link_scrubber", qcm_streaming_link_scrubber_enable_CT, qcm_streaming_link_scrubber_init_CT, qcm_streaming_link_scrubber_final_CT)
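
A worked example of the batching arithmetic in idle_link_cleanup(), using
the test-hook values installed by the enable hook above:

    /* test hooks: timer_interval = 5, max_free_pool_size = 2
     * suppose DEQ_SIZE(conn->streaming_link_pool) == 15:
     *     count = MIN(MAX_FREE_BATCH, 15 - 2) = MIN(10, 13) = 10
     * => at most 10 idle links are detached this sweep; the 3 remaining
     *    excess links wait for the next 5-second scan (the loop also stops
     *    early at the first non-idle link, since the pool is scanned
     *    oldest-first from the head) */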
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index 7c55964..3388507 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -246,11 +246,13 @@ void qdr_route_table_setup_CT(qdr_core_t *core)
 
         core->routers_by_mask_bit       = NEW_PTR_ARRAY(qdr_node_t, qd_bitmask_width());
         core->control_links_by_mask_bit = NEW_PTR_ARRAY(qdr_link_t, qd_bitmask_width());
+        core->rnode_conns_by_mask_bit   = NEW_PTR_ARRAY(qdr_connection_t, qd_bitmask_width());
         core->data_links_by_mask_bit    = NEW_ARRAY(qdr_priority_sheaf_t, qd_bitmask_width());
         for (int idx = 0; idx < qd_bitmask_width(); idx++) {
             core->routers_by_mask_bit[idx]   = 0;
             core->control_links_by_mask_bit[idx] = 0;
             core->data_links_by_mask_bit[idx].count = 0;
+            core->rnode_conns_by_mask_bit[idx] = 0;
             for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority)
               core->data_links_by_mask_bit[idx].links[priority] = 0;
 
@@ -312,7 +314,7 @@ static void qdr_add_router_CT(qdr_core_t *core, qdr_action_t *action, bool disca
         ZERO(rnode);
         rnode->owning_addr       = addr;
         rnode->mask_bit          = router_maskbit;
-        rnode->link_mask_bit     = -1;
+        rnode->conn_mask_bit     = -1;
         rnode->valid_origins     = qd_bitmask(0);
 
         qd_iterator_reset_view(iter, ITER_VIEW_ALL);
@@ -414,20 +416,20 @@ static void qdr_del_router_CT(qdr_core_t *core, qdr_action_t *action, bool disca
 static void qdr_set_link_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
     int router_maskbit = action->args.route_table.router_maskbit;
-    int link_maskbit   = action->args.route_table.link_maskbit;
+    int conn_maskbit   = action->args.route_table.link_maskbit;  // "link" identifies a connection, not an AMQP link
 
     if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
         qd_log(core->log, QD_LOG_CRITICAL, "set_link: Router maskbit out of range: %d", router_maskbit);
         return;
     }
 
-    if (link_maskbit >= qd_bitmask_width() || link_maskbit < 0) {
-        qd_log(core->log, QD_LOG_CRITICAL, "set_link: Link maskbit out of range: %d", link_maskbit);
+    if (conn_maskbit >= qd_bitmask_width() || conn_maskbit < 0) {
+        qd_log(core->log, QD_LOG_CRITICAL, "set_link: Link maskbit out of range: %d", conn_maskbit);
         return;
     }
 
-    if (core->control_links_by_mask_bit[link_maskbit] == 0) {
-        qd_log(core->log, QD_LOG_CRITICAL, "set_link: Invalid link reference: %d", link_maskbit);
+    if (core->control_links_by_mask_bit[conn_maskbit] == 0) {
+        qd_log(core->log, QD_LOG_CRITICAL, "set_link: Invalid link reference: %d", conn_maskbit);
         return;
     }
 
@@ -440,7 +442,7 @@ static void qdr_set_link_CT(qdr_core_t *core, qdr_action_t *action, bool discard
     // Add the peer_link reference to the router record.
     //
     qdr_node_t *rnode = core->routers_by_mask_bit[router_maskbit];
-    rnode->link_mask_bit = link_maskbit;
+    rnode->conn_mask_bit = conn_maskbit;
     qdr_addr_start_inlinks_CT(core, rnode->owning_addr);
 }
 
@@ -460,7 +462,7 @@ static void qdr_remove_link_CT(qdr_core_t *core, qdr_action_t *action, bool disc
     }
 
     qdr_node_t *rnode = core->routers_by_mask_bit[router_maskbit];
-    rnode->link_mask_bit = -1;
+    rnode->conn_mask_bit = -1;
 }
 
 
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index ec8e0ad..91169ec 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -165,6 +165,10 @@ void qdr_core_free(qdr_core_t *core)
     qdr_link_t *link = DEQ_HEAD(core->open_links);
     while (link) {
         DEQ_REMOVE_HEAD(core->open_links);
+        if (link->in_streaming_pool) {
+            DEQ_REMOVE_N(STREAMING_POOL, link->conn->streaming_link_pool, link);
+            link->in_streaming_pool = false;
+        }
         if (link->core_endpoint)
             qdrc_endpoint_do_cleanup_CT(core, link->core_endpoint);
         qdr_del_link_ref(&link->conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION);
@@ -196,9 +200,15 @@ void qdr_core_free(qdr_core_t *core)
             work = DEQ_HEAD(conn->work_list);
         }
 
+        if (conn->has_streaming_links) {
+            assert(DEQ_IS_EMPTY(conn->streaming_link_pool));  // all links have been released
+            qdr_del_connection_ref(&core->streaming_connections, conn);
+        }
+
         qdr_connection_free(conn);
         conn = DEQ_HEAD(core->open_connections);
     }
+    assert(DEQ_SIZE(core->streaming_connections) == 0);
 
     // at this point all the conn identifiers have been freed
     qd_hash_free(core->conn_id_hash);
@@ -211,6 +221,7 @@ void qdr_core_free(qdr_core_t *core)
     if (core->control_links_by_mask_bit) free(core->control_links_by_mask_bit);
     if (core->data_links_by_mask_bit)    free(core->data_links_by_mask_bit);
     if (core->neighbor_free_mask)        qd_bitmask_free(core->neighbor_free_mask);
+    if (core->rnode_conns_by_mask_bit)   free(core->rnode_conns_by_mask_bit);
 
     free(core);
 }
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index e3757c4..707d5f2 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -348,7 +348,7 @@ struct qdr_node_t {
     qdr_address_t    *owning_addr;
     int               mask_bit;
     qdr_node_t       *next_hop;           ///< Next hop node _if_ this is not a neighbor node
-    int               link_mask_bit;      ///< Mask bit of inter-router connection if this is a neighbor node
+    int               conn_mask_bit;      ///< qdr_connection_t->mask_bit inter-router conn if this is a neighbor node
     uint32_t          ref_count;
     qd_bitmask_t     *valid_origins;
     int               cost;
@@ -360,11 +360,6 @@ struct qdr_node_t {
 DEQ_DECLARE(qdr_node_t, qdr_node_list_t);
 void qdr_router_node_free(qdr_core_t *core, qdr_node_t *rnode);
 
-#define PEER_CONTROL_LINK(c,n) ((n->link_mask_bit >= 0) ? (c)->control_links_by_mask_bit[n->link_mask_bit] : 0)
-// PEER_DATA_LINK has gotten more complex with prioritized links, and is now a function, peer_data_link().
-
-
-
 struct qdr_router_ref_t {
     DEQ_LINKS(qdr_router_ref_t);
     qdr_node_t *router;
@@ -466,9 +461,11 @@ struct qdr_link_t {
     bool                     processing;        ///< True if an IO thread is currently handling this link
     bool                     ready_to_free;     ///< True if the core thread wanted to clean up the link but it was processing
     bool                     fallback;          ///< True if this link is attached to a fallback destination for an address
+    bool                     streaming;         ///< True if this link can be reused for streaming msgs
+    bool                     in_streaming_pool; ///< True if this link is in the connections standby pool STREAMING_POOL
+    bool                     terminus_survives_disconnect;
     char                    *strip_prefix;
     char                    *insert_prefix;
-    bool                     terminus_survives_disconnect;
 
     uint64_t  total_deliveries;
     uint64_t  presettled_deliveries;
@@ -485,8 +482,9 @@ struct qdr_link_t {
     uint8_t   priority;
     uint8_t   rate_cursor;
     uint32_t  core_ticks;
-};
 
+    DEQ_LINKS_N(STREAMING_POOL, qdr_link_t);
+};
 DEQ_DECLARE(qdr_link_t, qdr_link_list_t);
 
 struct qdr_link_ref_t {
@@ -552,7 +550,7 @@ struct qdr_address_t {
     int           next_remote;
 
     //
-    // State for "balanced" treatment
+    // State for "balanced" treatment, indexed by inter-router connection mask bit
     //
     int *outstanding_deliveries;
 
@@ -671,7 +669,7 @@ struct qdr_connection_t {
     bool                        policy_allow_dynamic_link_routes;
     bool                        policy_allow_admin_status_update;
     int                         link_capacity;
-    int                         mask_bit;
+    int                         mask_bit;  ///< set only if inter-router connection
     qdr_connection_work_list_t  work_list;
     sys_mutex_t                *work_lock;
     qdr_link_ref_list_t         links;
@@ -688,6 +686,8 @@ struct qdr_connection_t {
     uint32_t                    conn_uptime; // Timestamp which can be used to calculate the number of seconds this connection has been up and running.
     uint32_t                    last_delivery_time; // Timestamp which can be used to calculate the number of seconds since the last delivery arrived on this connection.
     bool                        enable_protocol_trace; // Has trace level logging been turned on for this connection.
+    bool                        has_streaming_links;   ///< one or more of this connection's links are for streaming messages
+    qdr_link_list_t             streaming_link_pool;   ///< pool of links available for streaming messages
 };
 
 DEQ_DECLARE(qdr_connection_t, qdr_connection_list_t);
@@ -801,10 +801,11 @@ struct qdr_core_t {
     qd_timer_t              *work_timer;
     uint32_t                 uptime_ticks;
 
-    qdr_connection_list_t open_connections;
-    qdr_connection_t     *active_edge_connection;
-    qdr_connection_list_t connections_to_activate;
-    qdr_link_list_t       open_links;
+    qdr_connection_list_t      open_connections;
+    qdr_connection_t          *active_edge_connection;
+    qdr_connection_list_t      connections_to_activate;
+    qdr_link_list_t            open_links;
+    qdr_connection_ref_list_t  streaming_connections;
 
     qdrc_attach_addr_lookup_t  addr_lookup_handler;
     void                      *addr_lookup_context;
@@ -868,10 +869,11 @@ struct qdr_core_t {
     qdr_address_t             *routerma_addr_T;
 
     qdr_node_list_t       routers;            ///< List of routers, in order of cost, from lowest to highest
-    qd_bitmask_t         *neighbor_free_mask;
-    qdr_node_t          **routers_by_mask_bit;
-    qdr_link_t          **control_links_by_mask_bit;
-    qdr_priority_sheaf_t *data_links_by_mask_bit;
+    qd_bitmask_t         *neighbor_free_mask;        ///< bits available for new conns (qd_connection_t->mask_bit values)
+    qdr_node_t          **routers_by_mask_bit;       ///< indexed by qdr_node_t->mask_bit
+    qdr_connection_t    **rnode_conns_by_mask_bit;   ///< inter-router conns indexed by conn->mask_bit
+    qdr_link_t          **control_links_by_mask_bit; ///< indexed by qdr_node_t->link_mask_bit, qdr_connection_t->mask_bit
+    qdr_priority_sheaf_t *data_links_by_mask_bit;    ///< indexed by qdr_node_t->link_mask_bit, qdr_connection_t->mask_bit
     uint64_t              cost_epoch;
 
     uint64_t              next_tag;
@@ -958,6 +960,7 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *pe
 void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv);
 void qdr_connection_free(qdr_connection_t *conn);
 void qdr_connection_activate_CT(qdr_core_t *core, qdr_connection_t *conn);
+qdr_link_t *qdr_connection_new_streaming_link_CT(qdr_core_t *core, qdr_connection_t *conn);
 qdr_address_config_t *qdr_config_for_address_CT(qdr_core_t *core, qdr_connection_t *conn, qd_iterator_t *iter);
 qd_address_treatment_t qdr_treatment_for_address_hash_CT(qdr_core_t *core, qd_iterator_t *iter, qdr_address_config_t **addr_config);
 qd_address_treatment_t qdr_treatment_for_address_hash_with_default_CT(qdr_core_t *core, qd_iterator_t *iter, qd_address_treatment_t default_treatment, qdr_address_config_t **addr_config);
@@ -983,6 +986,9 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t        *core,
 
 void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition, bool close);
 void qdr_link_outbound_second_attach_CT(qdr_core_t *core, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target);
+bool qdr_link_is_idle_CT(const qdr_link_t *link);
+qdr_terminus_t *qdr_terminus_router_control(void);  ///< new terminus for router control links
+qdr_terminus_t *qdr_terminus_router_data(void);  ///< new terminus for router links
 
 qdr_query_t *qdr_query(qdr_core_t              *core,
                        void                    *context,
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 3dab635..456bd25 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -925,3 +925,16 @@ void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr)
             qdr_addr_start_inlinks_CT(core, addr->fallback_for);
     }
 }
+
+
+// True if link currently has no outstanding deliveries or work.
+// Used to determine if it is safe for the core to close a link.
+//
+bool qdr_link_is_idle_CT(const qdr_link_t *link)
+{
+    return (DEQ_SIZE(link->undelivered) == 0 &&
+            DEQ_SIZE(link->unsettled) == 0 &&
+            DEQ_SIZE(link->settled) == 0 &&
+            DEQ_SIZE(link->updated_deliveries) == 0 &&
+            !link->ref[QDR_LINK_LIST_CLASS_WORK]);
+}
diff --git a/src/router_node.c b/src/router_node.c
index b2dd1d5..c711b2d 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -483,6 +483,38 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
     }
 
     //
+    // Head of line blocking avoidance (DISPATCH-1545)
+    //
+    // Before we can forward a message we need to determine whether or not this
+    // message is "streaming" - a large message that has the potential to block
+    // other messages sharing the trunk link.  At this point we cannot for sure
+    // know the actual length of the incoming message, so we employ the
+    // following heuristic to determine if the message is "streaming":
+    //
+    // - If the message is receive-complete it is NOT a streaming message.
+    // - If it is NOT receive-complete:
+    //   Continue buffering incoming data until:
+    //   - receive has completed => NOT a streaming message
+    //   - not rx-complete AND Q2 threshold hit => a streaming message
+    //
+    // Once Q2 is hit we MUST forward the message regardless of rx-complete
+    // since Q2 will block forever unless the incoming data is drained via
+    // forwarding.
+    //
+    if (!receive_complete) {
+        if (qd_message_is_Q2_blocked(msg)) {
+            qd_log(router->log_source, QD_LOG_DEBUG,
+                   "[C%"PRIu64" L%"PRIu64"] Incoming message classified as streaming. User:%s",
+                   conn->connection_id,
+                   qd_link_link_id(link),
+                   conn->user_id);
+        } else {
+            // Continue buffering this message
+            return false;
+        }
+    }
+
+    //
     // Determine if the incoming link is anonymous.  If the link is addressed,
     // there are some optimizations we can take advantage of.
     //
@@ -766,6 +798,7 @@ static void AMQP_disposition_handler(void* context, qd_link_t *link, pn_delivery
 static int AMQP_incoming_link_handler(void* context, qd_link_t *link)
 {
     qd_connection_t  *conn     = qd_link_connection(link);
+    uint64_t link_id;
 
     // The connection that this link belongs to is gone. Perhaps an AMQP close came in.
     // This link handler should not continue since there is no connection.
@@ -780,7 +813,9 @@ static int AMQP_incoming_link_handler(void* context, qd_link_t *link)
                                                        qdr_terminus(qd_link_remote_source(link)),
                                                        qdr_terminus(qd_link_remote_target(link)),
                                                        pn_link_name(qd_link_pn(link)),
-                                                       terminus_addr);
+                                                       terminus_addr,
+                                                       &link_id);
+    qd_link_set_link_id(link, link_id);
     qdr_link_set_context(qdr_link, link);
     qd_link_set_context(link, qdr_link);
 
@@ -794,6 +829,7 @@ static int AMQP_incoming_link_handler(void* context, qd_link_t *link)
 static int AMQP_outgoing_link_handler(void* context, qd_link_t *link)
 {
     qd_connection_t  *conn     = qd_link_connection(link);
+    uint64_t link_id;
 
     // The connection that this link belongs to is gone. Perhaps an AMQP close came in.
     // This link handler should not continue since there is no connection.
@@ -806,7 +842,9 @@ static int AMQP_outgoing_link_handler(void* context, qd_link_t *link)
                                                  qdr_terminus(qd_link_remote_source(link)),
                                                  qdr_terminus(qd_link_remote_target(link)),
                                                  pn_link_name(qd_link_pn(link)),
-                                                 terminus_addr);
+                                                 terminus_addr,
+                                                 &link_id);
+    qd_link_set_link_id(link, link_id);
     qdr_link_set_context(qdr_link, link);
     qd_link_set_context(link, qdr_link);
 
@@ -1481,14 +1519,6 @@ static void CORE_link_first_attach(void             *context,
     pn_link_open(qd_link_pn(qlink));
 
     //
-    // All links on the inter router or edge connection have unbounded q2 limit.
-    // Blocking control messages can lead to various failures
-    //
-    if (qdr_connection_role(conn) == QDR_ROLE_EDGE_CONNECTION || qdr_connection_role(conn) == QDR_ROLE_INTER_ROUTER) {
-        qd_link_set_q2_limit_unbounded(qlink, true);
-    }
-
-    //
     // Mark the link as stalled and waiting for initial credit.
     //
     if (qdr_link_direction(link) == QD_OUTGOING)
@@ -1514,16 +1544,6 @@ static void CORE_link_second_attach(void *context, qdr_link_t *link, qdr_terminu
     //
     pn_link_open(pn_link);
 
-    qd_connection_t  *conn     = qd_link_connection(qlink);
-    qdr_connection_t *qdr_conn = (qdr_connection_t*) qd_connection_get_context(conn);
-    //
-    // All links on the inter router or edge connection have unbounded q2 limit
-    //
-    if (qdr_connection_role(qdr_conn) == QDR_ROLE_EDGE_CONNECTION || qdr_connection_role(qdr_conn) == QDR_ROLE_INTER_ROUTER) {
-        qd_link_set_q2_limit_unbounded(qlink, true);
-    }
-
-
     //
     // Mark the link as stalled and waiting for initial credit.
     //
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 9fdd1aa..47d6481 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -59,6 +59,9 @@ target_link_libraries(test-sender ${Proton_LIBRARIES} qpid-dispatch)
 add_executable(test-receiver test-receiver.c)
 target_link_libraries(test-receiver ${Proton_LIBRARIES})
 
+add_executable(clogger clogger.c)
+target_link_libraries(clogger ${Proton_LIBRARIES})
+
 
 set(TEST_WRAP ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_BINARY_DIR}/run.py)
 
diff --git a/tests/clogger.c b/tests/clogger.c
new file mode 100644
index 0000000..489c709
--- /dev/null
+++ b/tests/clogger.c
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * A test traffic generator that produces very long messages that are sent in
+ * chunks with a delay between each chunk.  This client can be used to simulate
+ * very large streaming messages and/or slow producers.
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <signal.h>
+#include <time.h>
+#include <errno.h>
+#include <inttypes.h>
+#include <math.h>
+#include <assert.h>
+#include <arpa/inet.h>
+#include <stdarg.h>
+
+
+#include "proton/reactor.h"
+#include "proton/message.h"
+#include "proton/connection.h"
+#include "proton/session.h"
+#include "proton/link.h"
+#include "proton/delivery.h"
+#include "proton/transport.h"
+#include "proton/event.h"
+#include "proton/handlers.h"
+
+#define DEFAULT_MAX_FRAME  65535
+#define BOOL2STR(b) ((b)?"true":"false")
+#define MIN(X, Y) (((X) < (Y)) ? (X) : (Y))
+
+bool stop = false;
+bool verbose = false;
+
+uint64_t limit = 1;               // # messages to send
+uint64_t sent = 0;                // # sent
+uint64_t acked = 0;               // # of received acks
+uint64_t accepted = 0;
+uint64_t not_accepted = 0;
+
+bool use_anonymous = false;  // use anonymous link if true
+bool presettle = false;      // true = send presettled
+uint32_t body_length = 1024 * 1024; // # bytes in vbin32 payload
+uint32_t pause_msec = 100;   // pause between sending chunks (milliseconds)
+
+const char *target_address = "test-address";
+const char *host_address = "127.0.0.1:5672";
+const char *container_name = "Clogger";
+
+pn_reactor_t    *reactor;
+pn_connection_t *pn_conn;
+pn_session_t    *pn_ssn;
+pn_link_t       *pn_link;
+pn_delivery_t   *pn_dlv;       // current in-flight delivery
+pn_handler_t    *_handler;
+
+uint32_t         bytes_sent;   // number of body data bytes written out link
+uint32_t         remote_max_frame = DEFAULT_MAX_FRAME;  // used to limit amount written
+
+#define AMQP_MSG_HEADER     0x70
+#define AMQP_MSG_PROPERTIES 0x73
+#define AMQP_MSG_DATA       0x75
+
+// minimal AMQP header for a message that contains a single binary value
+//
+const uint8_t msg_header[] = {
+    0x00,  // begin described type
+    0x53,  // 1 byte ulong type
+    0x70,  // HEADER section
+    0x45,  // empty list
+    0x00,  // begin described type
+    0x53,  // 1 byte ulong type
+    0x73,  // PROPERTIES section
+    0x45,  // empty list
+    0x00,  // begin described type
+    0x53,  // 1 byte ulong type
+    0x77,  // AMQP Value BODY section
+    0xb0,  // Binary uint32 length
+    // 4 bytes for length here
+    // start of data...
+};
+
+
+void debug(const char *format, ...)
+{
+    va_list args;
+
+    if (!verbose) return;
+
+    va_start(args, format);
+    vprintf(format, args);
+    va_end(args);
+}
+
+
+static void signal_handler(int signum)
+{
+    signal(SIGINT,  SIG_IGN);
+    signal(SIGQUIT, SIG_IGN);
+
+    switch (signum) {
+    case SIGINT:
+    case SIGQUIT:
+        stop = true;
+        if (reactor) pn_reactor_wakeup(reactor);
+        break;
+    default:
+        break;
+    }
+}
+
+
+void start_message()
+{
+    static long tag = 0;  // a simple tag generator
+
+    if (!pn_link || !pn_conn) return;
+    if (pn_dlv) {
+        debug("Cannot create delivery - in process\n");
+        abort();
+    }
+
+    debug("start message #%"PRIu64"!\n", sent);
+
+    pn_dlv = pn_delivery(pn_link, pn_dtag((const char *)&tag, sizeof(tag)));
+    ++tag;
+
+    bytes_sent = 0;
+
+    // send the message header
+    ssize_t rc = pn_link_send(pn_link, (const char *)msg_header, sizeof(msg_header));
+    if (rc != sizeof(msg_header)) {
+        debug("Link send failed error=%ld\n", rc);
+        abort();
+    }
+
+    // add the vbin32 length (in network order!!!)
+    uint32_t len = htonl(body_length);
+    rc = pn_link_send(pn_link, (const char *)&len, sizeof(len));
+    if (rc != sizeof(len)) {
+        debug("Link send failed error=%ld\n", rc);
+        abort();
+    }
+}
+
+
+/* return true when message transmit is complete */
+bool send_message_data()
+{
+    static const char zero_block[DEFAULT_MAX_FRAME] = {0};
+
+    if (!pn_dlv) return true;  // not sending
+
+    if (bytes_sent < body_length) {
+
+        uint32_t amount = MIN(body_length - bytes_sent, remote_max_frame);
+        amount = MIN(amount, sizeof(zero_block));
+
+        ssize_t rc = pn_link_send(pn_link, zero_block, amount);
+        if (rc < 0) {
+            debug("Link send failed error=%ld\n", rc);
+            abort();
+        }
+        bytes_sent += rc;
+
+        debug("message body bytes written=%"PRIu32" total=%"PRIu32" body_length=%"PRIu32"\n",
+              rc, bytes_sent, body_length);
+    }
+
+    if (bytes_sent == body_length) {
+        debug("message #%"PRIu64" sent!\n", sent);
+        pn_link_advance(pn_link);
+        sent += 1;
+
+        if (presettle) {
+            pn_delivery_settle(pn_dlv);
+            if (limit && sent == limit) {
+                // no need to wait for acks
+                debug("stopping...\n");
+                stop = true;
+                pn_reactor_wakeup(reactor);
+            }
+        }
+        pn_dlv = 0;
+        return true;
+    }
+
+    return false;
+}
+
+/* Process each event posted by the proactor.
+   Return true if client has stopped.
+ */
+static void event_handler(pn_handler_t *handler,
+                          pn_event_t *event,
+                          pn_event_type_t etype)
+{
+    debug("new event=%s\n", pn_event_type_name(etype));
+
+    switch (etype) {
+
+    case PN_CONNECTION_INIT: {
+        // Create and open all the endpoints needed to send a message
+        //
+        pn_connection_open(pn_conn);
+        pn_ssn = pn_session(pn_conn);
+        pn_session_open(pn_ssn);
+        pn_link = pn_sender(pn_ssn, "MyClogger");
+        if (!use_anonymous) {
+            pn_terminus_set_address(pn_link_target(pn_link), target_address);
+        }
+        pn_link_open(pn_link);
+    } break;
+
+
+    case PN_CONNECTION_REMOTE_OPEN: {
+        uint32_t rmf = pn_transport_get_remote_max_frame(pn_event_transport(event));
+        remote_max_frame = (rmf != 0) ? rmf : DEFAULT_MAX_FRAME;
+        debug("Remote MAX FRAME=%u\n", remote_max_frame);
+    } break;
+
+    case PN_LINK_FLOW: {
+        // the remote has given us some credit, now we can send messages
+        //
+        if (limit == 0 || sent < limit) {
+            if (pn_link_credit(pn_link) > 0) {
+                if (!pn_dlv) {
+                    start_message();
+                    pn_reactor_schedule(reactor, pause_msec, _handler);   // send body after pause
+                }
+            }
+        }
+    } break;
+
+
+    case PN_TRANSPORT: {
+        ssize_t pending = pn_transport_pending(pn_event_transport(event));
+        debug("PN_TRANSPORT pending=%ld\n", pending);
+    } break;
+
+    case PN_DELIVERY: {
+        pn_delivery_t *dlv = pn_event_delivery(event);
+        if (pn_delivery_updated(dlv)) {
+            uint64_t rs = pn_delivery_remote_state(dlv);
+            pn_delivery_clear(dlv);
+
+            switch (rs) {
+            case PN_RECEIVED:
+                debug("PN_DELIVERY: received\n");
+                // This is not a terminal state - it is informational, and the
+                // peer is still processing the message.
+                break;
+            case PN_ACCEPTED:
+                debug("PN_DELIVERY: accept\n");
+                ++acked;
+                ++accepted;
+                pn_delivery_settle(dlv);
+                break;
+            case PN_REJECTED:
+            case PN_RELEASED:
+            case PN_MODIFIED:
+            default:
+                ++acked;
+                ++not_accepted;
+                pn_delivery_settle(dlv);
+                debug("Message not accepted - code: 0x%lX\n", (unsigned long)rs);
+                break;
+            }
+
+            if (limit && acked == limit) {
+                // initiate clean shutdown of the endpoints
+                debug("stopping...\n");
+                stop = true;
+                pn_reactor_wakeup(reactor);
+            }
+        }
+    } break;
+
+    case PN_TIMER_TASK: {
+        if (!send_message_data()) {   // not done sending
+            pn_reactor_schedule(reactor, pause_msec, _handler);
+        } else if (limit == 0 || sent < limit) {
+            if (pn_link_credit(pn_link) > 0) {
+                // send next message
+                start_message();
+                pn_reactor_schedule(reactor, pause_msec, _handler);
+            }
+        }
+    } break;
+
+    default:
+        break;
+    }
+}
+
+
+static void delete_handler(pn_handler_t *handler)
+{
+}
+
+
+static void usage(const char *prog)
+{
+    printf("Usage: %s <options>\n", prog);
+    printf("-a      \tThe host address [%s]\n", host_address);
+    printf("-c      \t# of messages to send, 0 == nonstop [%"PRIu64"]\n", limit);
+    printf("-i      \tContainer name [%s]\n", container_name);
+    printf("-n      \tUse an anonymous link [%s]\n", BOOL2STR(use_anonymous));
+    printf("-s      \tBody size in bytes [%d]\n", body_length);
+    printf("-t      \tTarget address [%s]\n", target_address);
+    printf("-u      \tSend all messages presettled [%s]\n", BOOL2STR(presettle));
+    printf("-D      \tPrint debug info [off]\n");
+    printf("-P      \tPause between sending frames [%"PRIu32"]\n", pause_msec);
+    exit(1);
+}
+
+int main(int argc, char** argv)
+{
+    /* command line options */
+    opterr = 0;
+    int c;
+    while ((c = getopt(argc, argv, "ha:c:i:ns:t:uDP:")) != -1) {
+        switch(c) {
+        case 'h': usage(argv[0]); break;
+        case 'a': host_address = optarg; break;
+        case 'c':
+            if (sscanf(optarg, "%"SCNu64, &limit) != 1)
+                usage(argv[0]);
+            break;
+        case 'i': container_name = optarg; break;
+        case 'n': use_anonymous = true; break;
+        case 's':
+            if (sscanf(optarg, "%"SCNu32, &body_length) != 1)
+                usage(argv[0]);
+            break;
+        case 't': target_address = optarg; break;
+        case 'u': presettle = true; break;
+        case 'D': verbose = true; break;
+        case 'P':
+            if (sscanf(optarg, "%"SCNu32, &pause_msec) != 1)
+                usage(argv[0]);
+            break;
+        default:
+            usage(argv[0]);
+            break;
+        }
+    }
+
+    signal(SIGQUIT, signal_handler);
+    signal(SIGINT,  signal_handler);
+
+    // test infrastructure may add a "amqp[s]://" prefix to the address string.
+    // That causes proactor much grief, so strip it off
+    if (strncmp("amqps://", host_address, strlen("amqps://")) == 0) {
+        host_address += strlen("amqps://"); // no! no ssl for you!
+    } else if (strncmp("amqp://", host_address, strlen("amqp://")) == 0) {
+        host_address += strlen("amqp://");
+    }
+
+    // convert host_address to hostname and port
+    char *hostname = strdup(host_address);
+    char *port = strchr(hostname, ':');
+    if (!port) {
+        port = "5672";
+    } else {
+        *port++ = 0;
+    }
+
+    _handler = pn_handler_new(event_handler, 0, delete_handler);
+    pn_handler_add(_handler, pn_handshaker());
+
+    reactor = pn_reactor();
+    pn_conn = pn_reactor_connection_to_host(reactor,
+                                            hostname,
+                                            port,
+                                            _handler);
+
+    // the container name should be unique for each client
+    pn_connection_set_container(pn_conn, container_name);
+    pn_connection_set_hostname(pn_conn, hostname);
+
+    // break out of pn_reactor_process once a second to check if done
+    pn_reactor_set_timeout(reactor, 1000);
+
+    pn_reactor_start(reactor);
+
+    while (pn_reactor_process(reactor)) {
+        if (stop) {
+            // close the endpoints this will cause pn_reactor_process() to
+            // eventually break the loop
+            if (pn_link) pn_link_close(pn_link);
+            if (pn_ssn) pn_session_close(pn_ssn);
+            if (pn_conn) pn_connection_close(pn_conn);
+            pn_link = 0;
+            pn_ssn = 0;
+            pn_conn = 0;
+        }
+    }
+
+    if (pn_link) pn_link_free(pn_link);
+    if (pn_ssn) pn_session_free(pn_ssn);
+    if (pn_conn) pn_connection_close(pn_conn);
+
+    pn_reactor_free(reactor);
+
+    if (not_accepted) {
+        printf("Sent: %ld  Accepted: %ld Not Accepted: %ld\n", sent, accepted, not_accepted);
+        if (accepted + not_accepted != sent) {
+            printf("FAILURE! Sent: %ld  Acked: %ld\n", sent, accepted + not_accepted);
+            return 1;
+        }
+    }
+    return 0;
+}
diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py
index d546a2f..4ed48da 100644
--- a/tests/system_tests_edge_router.py
+++ b/tests/system_tests_edge_router.py
@@ -22,7 +22,7 @@ from __future__ import division
 from __future__ import absolute_import
 from __future__ import print_function
 
-from os import path
+import os
 from time import sleep
 from threading import Event
 from threading import Timer
@@ -34,7 +34,9 @@ from system_test import AsyncTestSender
 from system_test import Logger
 from system_test import QdManager
 from system_test import unittest
+from system_test import Process
 from system_tests_link_routes import ConnLinkRouteService
+from test_broker import FakeBroker
 from test_broker import FakeService
 from proton.handlers import MessagingHandler
 from proton.reactor import Container, DynamicNodeProperties
@@ -1647,7 +1649,7 @@ class LinkRouteProxyTest(TestCase):
         numbers advertised in the incoming @open frame
         """
         lines = None
-        with open(path.join(self.INT_A.outdir, "INT.A.log")) as inta_log:
+        with open(os.path.join(self.INT_A.outdir, "INT.A.log")) as inta_log:
             lines = [l for l in inta_log.read().split("\n")
                      if "] Connection Opened: " in l
                      or "] Peer router version: " in l]
@@ -2936,5 +2938,470 @@ class EdgeListenerSender(TestCase):
         blocking_sender = blocking_connection.create_sender(address="multicast")
         self.assertTrue(blocking_sender!=None)
 
+
+class StreamingMessageTest(TestCase):
+    """
+    Test streaming message flows across edge and interior routers
+    """
+
+    SIG_TERM = -15  # Process.terminate() sets this exit value
+    BODY_MAX = 4294967295  # AMQP 1.0 allows types of length 2^32-1
+
+    @classmethod
+    def setUpClass(cls):
+        """Start a router"""
+        super(StreamingMessageTest, cls).setUpClass()
+
+        def router(name, mode, extra):
+            config = [
+                ('router', {'mode': mode, 'id': name}),
+                ('listener', {'role': 'normal',
+                              'port': cls.tester.get_port(),
+                              'maxFrameSize': 65535}),
+
+                ('address', {'prefix': 'closest',   'distribution': 'closest'}),
+                ('address', {'prefix': 'balanced',  'distribution': 'balanced'}),
+                ('address', {'prefix': 'multicast', 'distribution': 'multicast'})
+            ]
+
+            if extra:
+                config.extend(extra)
+            config = Qdrouterd.Config(config)
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=False))
+            return cls.routers[-1]
+
+        # configuration:
+        # two edge routers connected via 2 interior routers.
+        # fake broker (route-container) on EB1
+        #
+        #  +-------+    +---------+    +---------+    +-------+
+        #  |  EA1  |<==>|  INT.A  |<==>|  INT.B  |<==>|  EB1  |<-- Fake Broker
+        #  +-------+    +---------+    +---------+    +-------+
+        #
+
+        cls.routers = []
+
+        interrouter_port = cls.tester.get_port()
+        cls.INTA_edge_port   = cls.tester.get_port()
+        cls.INTB_edge_port   = cls.tester.get_port()
+
+        router('INT.A', 'interior',
+               [('listener', {'role': 'inter-router', 'port': interrouter_port}),
+                ('listener', {'role': 'edge', 'port': cls.INTA_edge_port})])
+        cls.INT_A = cls.routers[0]
+        cls.INT_A.listener = cls.INT_A.addresses[0]
+
+        router('INT.B', 'interior',
+               [('connector', {'name': 'connectorToA', 'role': 'inter-router',
+                               'port': interrouter_port}),
+                ('listener', {'role': 'edge', 'port': cls.INTB_edge_port})])
+        cls.INT_B = cls.routers[1]
+        cls.INT_B.listener = cls.INT_B.addresses[0]
+
+        router('EA1', 'edge',
+               [('listener', {'name': 'rc', 'role': 'route-container',
+                              'port': cls.tester.get_port()}),
+                ('connector', {'name': 'uplink', 'role': 'edge',
+                               'port': cls.INTA_edge_port})
+               ])
+        cls.EA1 = cls.routers[2]
+        cls.EA1.listener = cls.EA1.addresses[0]
+
+        router('EB1', 'edge',
+               [('connector', {'name': 'uplink', 'role': 'edge',
+                               'port': cls.INTB_edge_port}),
+                # to connect to the fake broker
+                ('connector', {'name': 'broker',
+                               'role': 'route-container',
+                               'host': '127.0.0.1',
+                               'port': cls.tester.get_port(),
+                               'saslMechanisms': 'ANONYMOUS'}),
+                ('linkRoute', {'pattern': 'MyLinkRoute.#', 'containerId':
+                               'FakeBroker', 'direction': 'in'}),
+                ('linkRoute', {'pattern': 'MyLinkRoute.#', 'containerId':
+                               'FakeBroker', 'direction': 'out'})
+               ])
+        cls.EB1 = cls.routers[3]
+        cls.EB1.listener = cls.EB1.addresses[0]
+        cls.EB1.route_container = cls.EB1.connector_addresses[1];
+
+        cls.INT_A.wait_router_connected('INT.B')
+        cls.INT_B.wait_router_connected('INT.A')
+        cls.EA1.wait_connectors()
+
+        cls.skip = { 'test_01' : 0,
+                     'test_02' : 0,
+                     'test_03' : 0,
+                     'test_50' : 0,
+                     'test_51' : 0,
+                     'test_52' : 0
+                   }
+
+    def _get_address(self, router, address):
+        """Lookup address in route table"""
+        a_type = 'org.apache.qpid.dispatch.router.address'
+        addrs = router.management.query(a_type).get_dicts()
+        return list(filter(lambda a: a['name'].find(address) != -1,
+                           addrs))
+
+    def _wait_address_gone(self, router, address):
+        """Block until address is removed from the route table"""
+        while self._get_address(router, address):
+            sleep(0.1)
+
+    def _start_broker_EB1(self):
+        # start a new broker on EB1
+        fake_broker = FakeBroker(self.EB1.route_container)
+        # wait until the link route appears on the interior routers
+        self.INT_B.wait_address("MyLinkRoute")
+        self.INT_A.wait_address("MyLinkRoute")
+        return fake_broker
+
+    def spawn_receiver(self, router, count, address, expect=None):
+        if expect is None:
+            expect = Process.EXIT_OK
+        cmd = ["test-receiver",
+               "-a", router.listener,
+               "-c", str(count),
+               "-s", address]
+        env = dict(os.environ, PN_TRACE_FRM="1")
+        return self.popen(cmd, expect=expect, env=env)
+
+    def spawn_sender(self, router, count, address, expect=None, size=None):
+        if expect is None:
+            expect = Process.EXIT_OK
+        if size is None:
+            size = "-sm"
+        cmd = ["test-sender",
+               "-a", router.listener,
+               "-c", str(count),
+               "-t", address,
+               size]
+        env = dict(os.environ, PN_TRACE_FRM="1")
+        return self.popen(cmd, expect=expect, env=env)
+
+    def spawn_clogger(self, router, count, address,
+                      size, pause_ms, expect=None):
+        if expect is None:
+            expect = Process.EXIT_OK
+        cmd = ["clogger",
+               "-a", router.listener,
+               "-c", str(count),
+               "-t", address,
+               "-s", str(size),
+               "-D",
+               "-P", str(pause_ms)]
+        env = dict(os.environ, PN_TRACE_FRM="1")
+        return self.popen(cmd, expect=expect, env=env)
+
+    def test_01_streaming_link_route(self):
+        """
+        Verify that a streaming message can be delivered over a link route
+        """
+
+        fake_broker = self._start_broker_EB1()
+
+        rx = self.spawn_receiver(self.EB1, count=1,
+                                 address="MyLinkRoute/test-address")
+
+        # sender a streaming message, "-sx" causes the sender to generate a
+        # large streaming message
+        tx = self.spawn_sender(self.EA1, count=1,
+                               address="MyLinkRoute/test-address",
+                               expect=Process.EXIT_OK,
+                               size="-sx")
+
+        out_text, out_error = tx.communicate(timeout=TIMEOUT)
+        if tx.returncode:
+            raise Exception("Sender failed: %s %s" % (out_text, out_error))
+
+        out_text, out_error = rx.communicate(timeout=TIMEOUT)
+        if rx.returncode:
+            raise Exception("Receiver failed: %s %s" % (out_text, out_error))
+
+        fake_broker.join()
+        self.assertEqual(1, fake_broker.in_count)
+        self.assertEqual(1, fake_broker.out_count)
+
+        # cleanup - not EB1 since MyLinkRoute is configured
+        self._wait_address_gone(self.EA1, "MyLinkRoute")
+        self._wait_address_gone(self.INT_A, "MyLinkRoute")
+        self._wait_address_gone(self.INT_B, "MyLinkRoute")
+
+
+
+    def _streaming_test(self, address):
+
+        # send a streaming message to address across the routers
+        rx = self.spawn_receiver(self.EB1,
+                                 count=1,
+                                 address=address)
+        self.INT_A.wait_address(address)
+
+        tx = self.spawn_sender(self.EA1,
+                               count=1,
+                               address=address,
+                               expect=Process.EXIT_OK,
+                               size="-sx")
+        out_text, out_error = tx.communicate(timeout=TIMEOUT)
+        if tx.returncode:
+            raise Exception("Sender failed: %s %s" % (out_text, out_error))
+
+        out_text, out_error = rx.communicate(timeout=TIMEOUT)
+        if rx.returncode:
+            raise Exception("receiver failed: %s %s" % (out_text, out_error))
+
+    def test_02_streaming_closest(self):
+        """
+        Verify that a streaming message with closest treatment is forwarded
+        correctly.
+        """
+
+        self._streaming_test("closest/test-address")
+
+    def test_03_streaming_multicast(self):
+        """
+        Verify a streaming multicast message is forwarded correctly
+        """
+
+        routers = [self.EA1, self.EB1, self.INT_A, self.INT_B]
+        streaming_rx = [self.spawn_receiver(router,
+                                            count=1,
+                                            address="multicast/test-address")
+                        for router in routers]
+        self.INT_A.wait_address("multicast/test-address", subscribers=2, remotes=1)
+        self.INT_B.wait_address("multicast/test-address", subscribers=2, remotes=1)
+        self.EA1.wait_address("multicast/test-address", subscribers=1)
+        self.EB1.wait_address("multicast/test-address", subscribers=1)
+
+        # This sender will end up multicasting the message to ALL receivers.
+        tx = self.spawn_sender(self.EA1,
+                               count=1,
+                               address="multicast/test-address",
+                               expect=Process.EXIT_OK,
+                               size="-sx")
+
+        out_text, out_error = tx.communicate(timeout=TIMEOUT)
+        if tx.returncode:
+            raise Exception("sender failed: %s %s" % (out_text, out_error))
+
+        for rx in streaming_rx:
+            out_text, out_error = rx.communicate(timeout=TIMEOUT)
+            if rx.returncode:
+                raise Exception("receiver failed: %s %s" % (out_text, out_error))
+
+    def test_04_streaming_balanced(self):
+        """
+        Verify streaming balanced messages are forwarded correctly.
+        """
+        balanced_rx = [self.spawn_receiver(self.EB1,
+                                           count=1,
+                                           address="balanced/test-address")
+                       for _ in range(2)]
+        self.EB1.wait_address("balanced/test-address", subscribers=2)
+
+        tx = self.spawn_sender(self.EA1,
+                               count=2,
+                               address="balanced/test-address")
+        out_text, out_error = tx.communicate(timeout=TIMEOUT)
+        if tx.returncode:
+            raise Exception("sender failed: %s %s" % (out_text, out_error))
+
+        for rx in balanced_rx:
+            out_text, out_error = rx.communicate(timeout=TIMEOUT)
+            if rx.returncode:
+                raise Exception("receiver failed: %s %s" % (out_text, out_error))
+
+    def test_10_streaming_link_route_parallel(self):
+        """
+        Ensure that a streaming message sent across a link route does not block other
+        clients sending to the same container address.
+        """
+
+        fake_broker = self._start_broker_EB1()
+
+        clogger = self.spawn_clogger(self.EA1,
+                                     count=1,
+                                     address="MyLinkRoute/clogger",
+                                     size=self.BODY_MAX,
+                                     pause_ms=100,
+                                     expect=self.SIG_TERM)
+        sleep(0.5)  # allow clogger to set up streaming links
+
+        # start a sender in parallel
+        tx = self.spawn_sender(self.EA1, count=100, address="MyLinkRoute/clogger")
+        out_text, out_error = tx.communicate(timeout=TIMEOUT)
+        if tx.returncode:
+            raise Exception("Sender failed: %s %s" % (out_text, out_error))
+
+        clogger.terminate()
+        clogger.wait()
+
+        fake_broker.join()
+        self.assertEqual(100, fake_broker.in_count)
+
+        # cleanup - not EB1 since MyLinkRoute is configured
+        self._wait_address_gone(self.EA1, "MyLinkRoute")
+        self._wait_address_gone(self.INT_A, "MyLinkRoute")
+        self._wait_address_gone(self.INT_B, "MyLinkRoute")
+
+    def test_11_streaming_closest_parallel(self):
+        """
+        Ensure that a streaming message of closest treatment does not block
+        other non-streaming messages.
+        """
+
+        # this receiver should get the streaming message
+        rx1 = self.spawn_receiver(self.EB1,
+                                  count=1,
+                                  address="closest/test-address",
+                                  expect=self.SIG_TERM)
+
+        self.INT_A.wait_address("closest/test-address");
+
+        clogger = self.spawn_clogger(self.EA1,
+                                     count=1,
+                                     address="closest/test-address",
+                                     size=self.BODY_MAX,
+                                     pause_ms=100,
+                                     expect=self.SIG_TERM)
+        sleep(0.5)
+
+        # this receiver has less cost than rx1 since it is 1 less hop from the
+        # sender
+        rx2 = self.spawn_receiver(self.INT_A,
+                                  count=1,
+                                  address="closest/test-address")
+
+        # wait for rx2 to set up links to INT_A:
+        self.INT_A.wait_address("closest/test-address", subscribers=1, remotes=1)
+
+        # start a sender in parallel. Expect the message to arrive at rx1
+        tx = self.spawn_sender(self.EA1, count=1, address="closest/test-address")
+        out_text, out_error = tx.communicate(timeout=TIMEOUT)
+        if tx.returncode:
+            raise Exception("Sender failed: %s %s" % (out_text, out_error))
+
+        out_text, out_error = rx2.communicate(timeout=TIMEOUT)
+        if rx2.returncode:
+            raise Exception("receiver failed: %s %s" % (out_text, out_error))
+
+        rx1.terminate()
+        rx1.wait()
+
+        clogger.terminate()
+        clogger.wait()
+
+
+    def test_12_streaming_multicast_parallel(self):
+        """
+        Verify a streaming multicast message does not block other non-streaming
+        multicast messages
+
+        Start a group of receivers to consume the streaming message.  Then
+        start a separate group to consume the non-streaming message.  Ensure
+        that the second group properly receives the non-streaming message.
+        """
+
+        routers = [self.EA1, self.EB1, self.INT_A, self.INT_B]
+        streaming_rx = [self.spawn_receiver(router,
+                                            count=1,
+                                            address="multicast/test-address",
+                                            expect=self.SIG_TERM)
+                        for router in routers]
+        self.INT_A.wait_address("multicast/test-address", subscribers=2, remotes=1)
+        self.INT_B.wait_address("multicast/test-address", subscribers=2, remotes=1)
+        self.EA1.wait_address("multicast/test-address", subscribers=1)
+        self.EB1.wait_address("multicast/test-address", subscribers=1)
+
+        # this will block all of the above receivers with a streaming message
+
+        clogger = self.spawn_clogger(self.EA1,
+                                     count=1,
+                                     address="multicast/test-address",
+                                     size=self.BODY_MAX,
+                                     pause_ms=100,
+                                     expect=self.SIG_TERM)
+        sleep(0.5)
+
+        # this second set of receivers should be able to receive multicast
+        # messages sent _after_ the clogger's streaming message
+
+        blocking_rx = [self.spawn_receiver(router,
+                                           count=1,
+                                           address="multicast/test-address")
+                       for router in routers]
+        self.INT_A.wait_address("multicast/test-address", subscribers=3, remotes=1)
+        self.INT_B.wait_address("multicast/test-address", subscribers=3, remotes=1)
+        self.EA1.wait_address("multicast/test-address", subscribers=2)
+        self.EB1.wait_address("multicast/test-address", subscribers=2)
+
+        # This sender will end up multicasting the message to ALL receivers.
+        # Expect it to block since the first set of receivers will never get
+        # around to acking the message
+        tx = self.spawn_sender(self.EA1,
+                               count=1,
+                               address="multicast/test-address",
+                               expect=self.SIG_TERM)
+
+        # however the second set of receivers _should_ end up getting the
+        # message, acking it and exit (count=1)
+        for rx in blocking_rx:
+            out_text, out_error = rx.communicate(timeout=TIMEOUT)
+            if rx.returncode:
+                raise Exception("receiver failed: %s %s" % (out_text, out_error))
+
+        tx.terminate()
+        tx.wait()
+
+        for rx in streaming_rx:
+            rx.terminate()
+            rx.wait()
+
+        clogger.terminate()
+        clogger.wait()
+
+    def test_13_streaming_balanced_parallel(self):
+        """
+        Verify streaming does not block other balanced traffic.
+        """
+
+        # create 2 consumers on the balanced address. Since our Process class
+        # requires the exit code to be known when the process is spawned and we
+        # cannot predict which receiver will get the streaming message use
+        # count=2 to force the receivers to run until we force termination
+        balanced_rx = [self.spawn_receiver(self.EB1,
+                                           count=2,
+                                           address="balanced/test-address",
+                                           expect=self.SIG_TERM)
+                       for _ in range(2)]
+        self.EB1.wait_address("balanced/test-address", subscribers=2)
+
+        # this will block one of the above receivers with a streaming message
+
+        clogger = self.spawn_clogger(self.EA1,
+                                     count=1,
+                                     address="balanced/test-address",
+                                     size=self.BODY_MAX,
+                                     pause_ms=100,
+                                     expect=self.SIG_TERM)
+        sleep(0.5)
+
+        # This sender should get its message through to the other receiver
+        tx = self.spawn_sender(self.EA1,
+                               count=1,
+                               address="balanced/test-address")
+        out_text, out_error = tx.communicate(timeout=TIMEOUT)
+        if tx.returncode:
+            raise Exception("sender failed: %s %s" % (out_text, out_error))
+
+        for rx in balanced_rx:
+            rx.terminate()
+            rx.wait()
+
+        clogger.terminate()
+        clogger.wait()
+
+
 if __name__== '__main__':
     unittest.main(main_module())
diff --git a/tests/system_tests_two_routers.py b/tests/system_tests_two_routers.py
index f720423..3d28ce6 100644
--- a/tests/system_tests_two_routers.py
+++ b/tests/system_tests_two_routers.py
@@ -31,6 +31,7 @@ from proton import Message, Timeout, Delivery
 from system_test import TestCase, Process, Qdrouterd, main_module, TIMEOUT, DIR
 from system_test import AsyncTestReceiver
 from system_test import AsyncTestSender
+from system_test import get_inter_router_links
 from system_test import unittest
 
 from proton.handlers import MessagingHandler
@@ -1753,5 +1754,108 @@ class MulticastTestClient(MessagingHandler):
     def run(self):
         Container(self).run()
 
+
+class StreamingLinkScrubberTest(TestCase):
+    """
+    Verify that unused inter-router streaming links are eventually reclaimed
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        super(StreamingLinkScrubberTest, cls).setUpClass()
+
+        def router(name, extra):
+            config = [
+                ('router', {'id': 'Router%s' % name,
+                            'mode': 'interior'}),
+                ('listener', {'port': cls.tester.get_port(),
+                              'stripAnnotations': 'no'}),
+                ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+                ('address', {'prefix': 'balanced', 'distribution': 'balanced'}),
+                ('address', {'prefix': 'multicast', 'distribution': 'multicast'})
+
+            ]
+
+            if extra:
+                config.extend(extra)
+
+            config = Qdrouterd.Config(config)
+
+            # run routers in test mode to shorten the streaming link scrubber
+            # interval to 5 seconds an the maximum pool size to two links
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True, cl_args=["--test-hooks"]))
+
+        cls.routers = []
+
+        inter_router_port = cls.tester.get_port()
+
+        router('A',
+               [('listener', {'role': 'inter-router',
+                              'port': inter_router_port})])
+        cls.RouterA = cls.routers[-1]
+        cls.RouterA.listener = cls.RouterA.addresses[0]
+
+        router('B',
+               [('connector', {'name': 'connectorToA', 'role':
+                               'inter-router',
+                               'port': inter_router_port,
+                              'verifyHostname': 'no'})])
+        cls.RouterB = cls.routers[-1]
+        cls.RouterB.listener = cls.RouterB.addresses[0]
+
+        cls.RouterA.wait_router_connected('RouterB')
+        cls.RouterB.wait_router_connected('RouterA')
+
+    def test_01_streaming_link_scrubber(self):
+        """
+        Ensure extra streaming links are closed by the periodic scrubber
+        """
+        address = "closest/scrubber"
+
+        # scrubber removes at most 10 links per scan, the test pool size is 2
+        sender_count = 12
+
+        # fire up a receiver on RouterB to get 1 message from each sender:
+        env = dict(os.environ, PN_TRACE_FRM="1")
+        cmd = ["test-receiver",
+               "-a", self.RouterB.listener,
+               "-s", address,
+               "-c", str(sender_count)]
+        rx = self.popen(cmd, env=env)
+
+        self.RouterA.wait_address(address)
+
+        # remember the count of inter-router links on A before we start streaming
+        pre_count = len(get_inter_router_links(self.RouterA.listener))
+
+        # fire off the senders
+        cmd = ["test-sender",
+               "-a", self.RouterA.listener,
+               "-t", address,
+               "-c", "1",
+               "-sx"
+        ]
+        senders = [self.popen(cmd, env=env) for x in range(sender_count)]
+
+        for tx in senders:
+            out_text, out_error = tx.communicate(timeout=TIMEOUT)
+            if tx.returncode:
+                raise Exception("Sender failed: %s %s" % (out_text, out_error))
+
+        # expect: more inter-router links opened.  Should be 12 more, but
+        # depending on when the scrubber runs it may be as low as two
+        post_count = len(get_inter_router_links(self.RouterA.listener))
+        self.assertTrue(post_count > pre_count)
+
+        # expect: after 5 seconds 10 of the links should be closed and 2
+        # should remain (--test-hooks router option sets these parameters)
+        while (post_count - pre_count) > 2:
+            sleep(0.1)
+            post_count = len(get_inter_router_links(self.RouterA.listener))
+
+        out_text, out_error = rx.communicate(timeout=TIMEOUT)
+        if rx.returncode:
+            raise Exception("Receiver failed: %s %s" % (out_text, out_error))
+
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org