You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2020/07/06 19:01:45 UTC

[qpid-dispatch] branch master updated: DISPATCH-1703: correctly propagate disposition data

This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a3aa93  DISPATCH-1703: correctly propagate disposition data
5a3aa93 is described below

commit 5a3aa934c7c0d26dc5136bd931e1c62b6be66c26
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Tue Jun 30 18:04:00 2020 -0400

    DISPATCH-1703: correctly propagate disposition data
---
 include/qpid/dispatch/router_core.h |  23 ++-
 src/router_core/delivery.c          |  88 ++++++------
 src/router_core/delivery.h          |   7 +-
 src/router_core/forwarder.c         |  21 ++-
 src/router_core/transfer.c          |  68 ++++-----
 src/router_node.c                   |  27 +++-
 tests/system_test.py                |   4 +-
 tests/system_tests_two_routers.py   | 277 ++++++++++++++++++++++++++++++++++++
 8 files changed, 415 insertions(+), 100 deletions(-)

diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 51806f7..14336d6 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -678,16 +678,23 @@ void qdr_link_delete(qdr_link_t *link);
  *                       to send this message.  This bitmask is created by the trace_mask module and
  *                       it built on the trace header from a received message.
  * @param ingress_index The bitmask index of the router that this delivery entered the network through.
+ * @param remote_disposition as set by sender on the transfer
+ * @param remote_extension_state as set by sender on the transfer
  * @return Pointer to the qdr_delivery that will track the lifecycle of this delivery on this link.
  */
 qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_iterator_t *ingress,
-                                 bool settled, qd_bitmask_t *link_exclusion, int ingress_index);
+                                 bool settled, qd_bitmask_t *link_exclusion, int ingress_index,
+                                 uint64_t remote_disposition,
+                                 pn_data_t *remote_extension_state);
 qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
                                     qd_iterator_t *ingress, qd_iterator_t *addr,
-                                    bool settled, qd_bitmask_t *link_exclusion, int ingress_index);
+                                    bool settled, qd_bitmask_t *link_exclusion, int ingress_index,
+                                    uint64_t remote_disposition,
+                                    pn_data_t *remote_extension_state);
 qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg, bool settled,
                                                 const uint8_t *tag, int tag_length,
-                                                uint64_t disposition, pn_data_t* disposition_state);
+                                                uint64_t remote_disposition,
+                                                pn_data_t *remote_extension_state);
 int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit);
 
 void qdr_link_flow(qdr_core_t *core, qdr_link_t *link, int credit, bool drain_mode);
@@ -702,6 +709,16 @@ void qdr_link_flow(qdr_core_t *core, qdr_link_t *link, int credit, bool drain_mo
  */
 void qdr_link_set_drained(qdr_core_t *core, qdr_link_t *link);
 
+/**
+ * Write the disposition and state data that has arrived from the remote endpoint to the delivery
+ */
+void qdr_delivery_set_remote_extension_state(qdr_delivery_t *dlv, uint64_t remote_dispo, pn_data_t *remote_ext_state);
+
+/**
+ * Extract the disposition and state data that is to be sent to the remote endpoint via the delivery
+ */
+pn_data_t *qdr_delivery_take_local_extension_state(qdr_delivery_t *dlv, uint64_t *dispo);
+
 typedef void (*qdr_link_first_attach_t)  (void *context, qdr_connection_t *conn, qdr_link_t *link,
                                           qdr_terminus_t *source, qdr_terminus_t *target,
                                           qd_session_class_t);
diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c
index e3bc5f4..662ac2d 100644
--- a/src/router_core/delivery.c
+++ b/src/router_core/delivery.c
@@ -26,7 +26,6 @@ ALLOC_DEFINE(qdr_delivery_t);
 static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
 static void qdr_delete_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
 static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdr_delivery_free_extension_state(qdr_delivery_t *delivery);
 static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *delivery);
 static bool qdr_delivery_anycast_update_CT(qdr_core_t *core, qdr_delivery_t *dlv,
                                          qdr_delivery_t *peer, uint64_t new_disp, bool settled,
@@ -198,7 +197,7 @@ void qdr_delivery_remote_state_updated(qdr_core_t *core, qdr_delivery_t *deliver
     action->args.delivery.error       = error;
 
     // handle delivery-state extensions e.g. declared, transactional-state
-    qdr_delivery_set_extension_state(delivery, disposition, ext_state, false);
+    qdr_delivery_set_remote_extension_state(delivery, disposition, ext_state);
 
     //
     // The delivery's ref_count must be incremented to protect its travels into the
@@ -390,7 +389,9 @@ void qdr_delivery_increment_counters_CT(qdr_core_t *core, qdr_delivery_t *delive
                 core->modified_deliveries++;
         }
 
-        qd_log(core->log, QD_LOG_DEBUG, "Delivery outcome for%s: dlv:%lx link:%"PRIu64" is %s", delivery->presettled?" pre-settled":"", (long) delivery,  link->identity, pn_disposition_type_name(outcome));
+        qd_log(core->log, QD_LOG_DEBUG, "Delivery outcome for%s: dlv:%lx link:%"PRIu64" is %s (0x%"PRIX64")",
+               delivery->presettled?" pre-settled":"", (long) delivery,  link->identity,
+               pn_disposition_type_name(outcome), outcome);
 
         uint32_t delay = core->uptime_ticks - delivery->ingress_time;
         if (delay > 10) {
@@ -480,7 +481,10 @@ static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *de
 
     qd_bitmask_free(delivery->link_exclusion);
     qdr_error_free(delivery->error);
-    qdr_delivery_free_extension_state(delivery);
+    if (delivery->remote_extension_state)
+        pn_data_free(delivery->remote_extension_state);
+    if (delivery->local_extension_state)
+        pn_data_free(delivery->local_extension_state);
 
     free_qdr_delivery_t(delivery);
 }
@@ -726,10 +730,7 @@ static bool qdr_delivery_anycast_update_CT(qdr_core_t *core, qdr_delivery_t *dlv
             peer->error       = error;
             push = true;
             error_assigned = true;
-            qdr_delivery_set_extension_state(peer,
-                                             dlv->remote_disposition,
-                                             dlv->extension_state,
-                                             false);
+            qdr_delivery_move_extension_state_CT(dlv, peer);
         }
     }
 
@@ -1213,57 +1214,48 @@ void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv)
 }
 
 
-pn_data_t *qdr_delivery_extension_state(qdr_delivery_t *delivery)
-{
-    if (delivery->extension_state)
-        pn_data_rewind(delivery->extension_state);
-    return delivery->extension_state;
-}
-
-
-void qdr_delivery_free_extension_state(qdr_delivery_t *delivery)
+// copy remote disposition and extension state into a delivery
+//
+void qdr_delivery_set_remote_extension_state(qdr_delivery_t *dlv, uint64_t remote_dispo, pn_data_t *remote_ext_state)
 {
-    if (delivery->extension_state) {
-        pn_data_free(delivery->extension_state);
-        delivery->extension_state = 0;
-    }
-}
-
+    if (dlv->remote_extension_state)
+        // once set the I/O thread cannot overwrite this until the core has forwarded it
+        return;
 
-// copy local disposition data into proton delivery
-void qdr_delivery_write_extension_state(qdr_delivery_t *dlv, pn_delivery_t* pdlv, bool update_disposition)
-{
-    if (dlv->disposition > PN_MODIFIED) {
-        pn_data_t *src = dlv->extension_state;
-        if (src) {
-            pn_data_copy(pn_disposition_data(pn_delivery_local(pdlv)), src);
-            qdr_delivery_free_extension_state(dlv);
+    if (remote_dispo > PN_MODIFIED) {  // set only if non-terminal outcome
+        const size_t esize = pn_data_size(remote_ext_state);
+        if (esize) {
+            // note: performance tests show that creating a new pn_data instance
+            // can be expensive, so only do so if there is actually extension state
+            // data to copy
+            dlv->remote_extension_state = pn_data(esize);
+            pn_data_copy(dlv->remote_extension_state, remote_ext_state);
         }
-        if (update_disposition) pn_delivery_update(pdlv, dlv->disposition);
     }
 }
 
-void qdr_delivery_export_transfer_state(qdr_delivery_t *dlv, pn_delivery_t* pdlv)
-{
-    qdr_delivery_write_extension_state(dlv, pdlv, true);
-}
-
 
-void qdr_delivery_export_disposition_state(qdr_delivery_t *dlv, pn_delivery_t* pdlv)
+// take local disposition and extension state from the delivery
+//
+pn_data_t *qdr_delivery_take_local_extension_state(qdr_delivery_t *dlv, uint64_t *dispo)
 {
-    qdr_delivery_write_extension_state(dlv, pdlv, false);
+    pn_data_t *ext_state = dlv->local_extension_state;
+    dlv->local_extension_state = 0;
+    if (dispo) *dispo = dlv->disposition;
+    return ext_state;
 }
 
 
-void qdr_delivery_set_extension_state(qdr_delivery_t *dlv, uint64_t disposition, pn_data_t* disposition_data, bool update_disposition)
+// move the REMOTE extension state from a delivery to the LOCAL extension state
+// of its peer delivery.  This causes the extension state data to propagate
+// from one delivery to another.
+//
+void qdr_delivery_move_extension_state_CT(qdr_delivery_t *dlv, qdr_delivery_t *peer)
 {
-    if (disposition > PN_MODIFIED) {
-        if (disposition_data) {
-            pn_data_rewind(disposition_data);
-            if (!dlv->extension_state)
-                dlv->extension_state = pn_data(0);
-            pn_data_copy(dlv->extension_state, disposition_data);
-        }
-        if (update_disposition) dlv->disposition = disposition;
+    // if extension_state is already present do not overwrite it as the outgoing
+    // I/O thread may be in the process of writing it to proton
+    if (!peer->local_extension_state) {
+        peer->local_extension_state = dlv->remote_extension_state;
+        dlv->remote_extension_state = 0;
     }
 }
diff --git a/src/router_core/delivery.h b/src/router_core/delivery.h
index 4c6e646..ce8fd60 100644
--- a/src/router_core/delivery.h
+++ b/src/router_core/delivery.h
@@ -47,7 +47,8 @@ struct qdr_delivery_t {
     uint64_t                remote_disposition;  ///< disposition as set by remote endpoint
     uint64_t                mcast_disposition;   ///< temporary terminal disposition while multicast fwding
     uint32_t                ingress_time;
-    pn_data_t              *extension_state;     ///< delivery-state in disposition performative
+    pn_data_t              *remote_extension_state;  ///< extension state from peer endpoint
+    pn_data_t              *local_extension_state;   ///< extension state to send to peer endpoint
     qdr_error_t            *error;
     bool                    settled;
     bool                    presettled; /// Proton does not have a notion of pre-settled. This flag is introduced in Dispatch and should exclusively be used only to update management counters like presettled delivery counts on links etc. This flag DOES NOT represent the remote settlement state of the delivery.
@@ -100,9 +101,7 @@ bool qdr_delivery_presettled(const qdr_delivery_t *delivery);
 void qdr_delivery_incref(qdr_delivery_t *delivery, const char *label);
 
 pn_data_t *qdr_delivery_extension_state(qdr_delivery_t *dlv);
-void qdr_delivery_set_extension_state(qdr_delivery_t *dlv, uint64_t disposition, pn_data_t* disposition_data, bool update_disposition);
-void qdr_delivery_write_extension_state(qdr_delivery_t *dlv, pn_delivery_t* pdlv, bool update_disposition);
-
+void qdr_delivery_move_extension_state_CT(qdr_delivery_t *dlv, qdr_delivery_t *peer);
 
 //
 // I/O thread only functions
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 5f7a517..0dcc086 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -167,14 +167,25 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in
     ZERO(out_dlv);
     set_safe_ptr_qdr_link_t(out_link, &out_dlv->link_sp);
     out_dlv->msg        = qd_message_copy(msg);
-    out_dlv->settled    = !in_dlv || in_dlv->settled;
+
+    if (in_dlv) {
+        out_dlv->settled       = in_dlv->settled;
+        out_dlv->ingress_time  = in_dlv->ingress_time;
+        out_dlv->ingress_index = in_dlv->ingress_index;
+        if (in_dlv->remote_disposition) {
+            // propagate disposition state from remote to peer
+            out_dlv->disposition = in_dlv->remote_disposition;
+            qdr_delivery_move_extension_state_CT(in_dlv, out_dlv);
+        }
+    } else {
+        out_dlv->settled       = true;
+        out_dlv->ingress_time  = core->uptime_ticks;
+        out_dlv->ingress_index = -1;
+    }
+
     out_dlv->presettled = out_dlv->settled;
     *tag                = core->next_tag++;
     out_dlv->tag_length = 8;
-    out_dlv->error      = 0;
-
-    out_dlv->ingress_time  = in_dlv ? in_dlv->ingress_time  : core->uptime_ticks;
-    out_dlv->ingress_index = in_dlv ? in_dlv->ingress_index : -1;
 
     //
     // Add one to the message fanout. This will later be used in the qd_message_send function that sends out messages.
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 456bd25..7d7aa9c 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -39,23 +39,25 @@ static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 //==================================================================================
 
 qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_iterator_t *ingress,
-                                 bool settled, qd_bitmask_t *link_exclusion, int ingress_index)
+                                 bool settled, qd_bitmask_t *link_exclusion, int ingress_index,
+                                 uint64_t remote_disposition,
+                                 pn_data_t *remote_extension_state)
 {
     qdr_action_t   *action = qdr_action(qdr_link_deliver_CT, "link_deliver");
     qdr_delivery_t *dlv    = new_qdr_delivery_t();
 
     ZERO(dlv);
     set_safe_ptr_qdr_link_t(link, &dlv->link_sp);
-    dlv->msg            = msg;
-    dlv->to_addr        = 0;
-    dlv->origin         = ingress;
-    dlv->settled        = settled;
-    dlv->presettled     = settled;
-    dlv->link_exclusion = link_exclusion;
-    dlv->ingress_index  = ingress_index;
-    dlv->error          = 0;
-    dlv->disposition    = 0;
-
+    dlv->msg                = msg;
+    dlv->origin             = ingress;
+    dlv->settled            = settled;
+    dlv->presettled         = settled;
+    dlv->link_exclusion     = link_exclusion;
+    dlv->ingress_index      = ingress_index;
+    dlv->remote_disposition = remote_disposition;
+
+    if (remote_disposition)
+        qdr_delivery_set_remote_extension_state(dlv, remote_disposition, remote_extension_state);
     qdr_delivery_incref(dlv, "qdr_link_deliver - newly created delivery, add to action list");
     qdr_delivery_incref(dlv, "qdr_link_deliver - protect returned value");
 
@@ -68,23 +70,26 @@ qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_iterato
 
 qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
                                     qd_iterator_t *ingress, qd_iterator_t *addr,
-                                    bool settled, qd_bitmask_t *link_exclusion, int ingress_index)
+                                    bool settled, qd_bitmask_t *link_exclusion, int ingress_index,
+                                    uint64_t remote_disposition,
+                                    pn_data_t *remote_extension_state)
 {
     qdr_action_t   *action = qdr_action(qdr_link_deliver_CT, "link_deliver");
     qdr_delivery_t *dlv    = new_qdr_delivery_t();
 
     ZERO(dlv);
     set_safe_ptr_qdr_link_t(link, &dlv->link_sp);
-    dlv->msg            = msg;
-    dlv->to_addr        = addr;
-    dlv->origin         = ingress;
-    dlv->settled        = settled;
-    dlv->presettled     = settled;
-    dlv->link_exclusion = link_exclusion;
-    dlv->ingress_index  = ingress_index;
-    dlv->error          = 0;
-    dlv->disposition    = 0;
-
+    dlv->msg                = msg;
+    dlv->to_addr            = addr;
+    dlv->origin             = ingress;
+    dlv->settled            = settled;
+    dlv->presettled         = settled;
+    dlv->link_exclusion     = link_exclusion;
+    dlv->ingress_index      = ingress_index;
+    dlv->remote_disposition = remote_disposition;
+
+    if (remote_disposition)
+        qdr_delivery_set_remote_extension_state(dlv, remote_disposition, remote_extension_state);
     qdr_delivery_incref(dlv, "qdr_link_deliver_to - newly created delivery, add to action list");
     qdr_delivery_incref(dlv, "qdr_link_deliver_to - protect returned value");
 
@@ -97,20 +102,21 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
 
 qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg, bool settled,
                                                 const uint8_t *tag, int tag_length,
-                                                uint64_t disposition, pn_data_t* disposition_data)
+                                                uint64_t remote_disposition,
+                                                pn_data_t* remote_extension_state)
 {
     qdr_action_t   *action = qdr_action(qdr_link_deliver_CT, "link_deliver");
     qdr_delivery_t *dlv    = new_qdr_delivery_t();
 
     ZERO(dlv);
     set_safe_ptr_qdr_link_t(link, &dlv->link_sp);
-    dlv->msg          = msg;
-    dlv->settled      = settled;
-    dlv->presettled   = settled;
-    dlv->error        = 0;
-    dlv->disposition  = 0;
+    dlv->msg                = msg;
+    dlv->settled            = settled;
+    dlv->presettled         = settled;
+    dlv->remote_disposition = remote_disposition;
 
-    qdr_delivery_set_extension_state(dlv, disposition, disposition_data, true);
+    if (remote_disposition)
+        qdr_delivery_set_remote_extension_state(dlv, remote_disposition, remote_extension_state);
     qdr_delivery_incref(dlv, "qdr_link_deliver_to_routed_link - newly created delivery, add to action list");
     qdr_delivery_incref(dlv, "qdr_link_deliver_to_routed_link - protect returned value");
 
@@ -679,10 +685,6 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
         // If this is an attach-routed link, put the delivery directly onto the peer link
         //
         qdr_delivery_t *peer = qdr_forward_new_delivery_CT(core, dlv, link->connected_link, dlv->msg);
-        qdr_delivery_set_extension_state(peer,
-                                         dlv->remote_disposition,
-                                         qdr_delivery_extension_state(dlv),
-                                         true);
 
         //
         // Copy the delivery tag.  For link-routing, the delivery tag must be preserved.
diff --git a/src/router_node.c b/src/router_node.c
index cc04ea3..9579241 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -480,7 +480,7 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
                                                    pn_delivery_settled(pnd),
                                                    (uint8_t*) dtag.start,
                                                    dtag.size,
-                                                   pn_disposition_type(pn_delivery_remote(pnd)),
+                                                   pn_delivery_remote_state(pnd),
                                                    pn_disposition_data(pn_delivery_remote(pnd)));
         qd_link_set_incoming_msg(link, (qd_message_t*) 0);  // msg no longer exclusive to qd_link
         qdr_node_connect_deliveries(link, delivery, pnd);
@@ -660,7 +660,9 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
                 if (phase > 0)
                     qd_iterator_annotate_phase(addr_iter, '0' + (char) phase);
                 delivery = qdr_link_deliver_to(rlink, msg, ingress_iter, addr_iter, pn_delivery_settled(pnd),
-                                               link_exclusions, ingress_index);
+                                               link_exclusions, ingress_index,
+                                               pn_delivery_remote_state(pnd),
+                                               pn_disposition_data(pn_delivery_remote(pnd)));
             } else {
                 //reject
                 qd_log(router->log_source, QD_LOG_DEBUG, "Message rejected due to policy violation on target. User:%s", conn->user_id);
@@ -707,7 +709,9 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
             if (phase != 0)
                 qd_message_set_phase_annotation(msg, phase);
         }
-        delivery = qdr_link_deliver(rlink, msg, ingress_iter, pn_delivery_settled(pnd), link_exclusions, ingress_index);
+        delivery = qdr_link_deliver(rlink, msg, ingress_iter, pn_delivery_settled(pnd), link_exclusions, ingress_index,
+                                    pn_delivery_remote_state(pnd),
+                                    pn_disposition_data(pn_delivery_remote(pnd)));
     }
 
     //
@@ -1758,6 +1762,8 @@ static uint64_t CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_
     if (!qdr_delivery_tag_sent(dlv)) {
         const char *tag;
         int         tag_length;
+        uint64_t    disposition = 0;
+        pn_data_t   *extension_state = 0;
 
         qdr_delivery_tag(dlv, &tag, &tag_length);
 
@@ -1767,7 +1773,13 @@ static uint64_t CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_
         pdlv = pn_link_current(plink);
 
         // handle any delivery-state on the transfer e.g. transactional-state
-        qdr_delivery_write_extension_state(dlv, pdlv, true);
+        extension_state = qdr_delivery_take_local_extension_state(dlv, &disposition);
+        if (extension_state) {
+            pn_data_copy(pn_disposition_data(pn_delivery_local(pdlv)), extension_state);
+            pn_data_free(extension_state);
+        }
+        if (disposition)
+            pn_delivery_update(pdlv, disposition);
 
         //
         // If the remote send settle mode is set to 'settled', we should settle the delivery on behalf of the receiver.
@@ -1897,7 +1909,12 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di
 
         if (disp == PN_MODIFIED)
             pn_disposition_set_failed(pn_delivery_local(pnd), true);
-        qdr_delivery_write_extension_state(dlv, pnd, false);
+
+        pn_data_t *extension_state = qdr_delivery_take_local_extension_state(dlv, 0);
+        if (extension_state) {
+            pn_data_copy(pn_disposition_data(pn_delivery_local(pnd)), extension_state);
+            pn_data_free(extension_state);
+        }
 
         //
         // If the delivery is still arriving, don't push out the disposition change yet.
diff --git a/tests/system_test.py b/tests/system_test.py
index 216f7f6..7cac6d3 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -915,7 +915,7 @@ class AsyncTestReceiver(MessagingHandler):
             raise Exception("Timed out waiting for receiver start")
 
     def _main(self):
-        self._container.timeout = 5.0
+        self._container.timeout = 0.5
         self._container.start()
         while self._container.process():
             if self._stop_thread:
@@ -1000,7 +1000,7 @@ class AsyncTestSender(MessagingHandler):
         self._thread.start()
 
     def _main(self):
-        self._container.timeout = 5.0
+        self._container.timeout = 0.5
         self._container.start()
         while self._container.process():
             self._check_if_done()
diff --git a/tests/system_tests_two_routers.py b/tests/system_tests_two_routers.py
index 4d85ea3..02fae99 100644
--- a/tests/system_tests_two_routers.py
+++ b/tests/system_tests_two_routers.py
@@ -33,6 +33,7 @@ from system_test import AsyncTestReceiver
 from system_test import AsyncTestSender
 from system_test import get_inter_router_links
 from system_test import unittest
+from test_broker import FakeService
 
 from proton.handlers import MessagingHandler
 from proton.reactor import Container, AtLeastOnce
@@ -1895,5 +1896,281 @@ class StreamingLinkScrubberTest(TestCase):
         if rx.returncode:
             raise Exception("Receiver failed: %s %s" % (out_text, out_error))
 
+
+class TwoRouterExtensionStateTest(TestCase):
+    """
+    Verify that routers propagate extended Disposition state correctly.
+    See DISPATCH-1703
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        super(TwoRouterExtensionStateTest, cls).setUpClass()
+
+        def router(name, extra_config):
+
+            config = [
+                ('router', {'mode': 'interior',
+                            'id': name}),
+
+                ('listener', {'port': cls.tester.get_port() }),
+
+                ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+                ('address', {'prefix': 'balanced', 'distribution': 'balanced'}),
+                ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+            ] + extra_config
+
+            config = Qdrouterd.Config(config)
+            return cls.tester.qdrouterd(name, config, wait=False)
+
+        inter_router_port = cls.tester.get_port()
+        service_port = cls.tester.get_port()
+
+        cls.RouterA = router('RouterA',
+                             [
+                                 ('listener', {'role': 'inter-router',
+                                               'host': '0.0.0.0',
+                                               'port': inter_router_port,
+                                               'saslMechanisms': 'ANONYMOUS'}),
+                             ])
+
+        cls.RouterB = router('RouterB',
+                             [
+                                 ('connector', {'name': 'toRouterA',
+                                                'role': 'inter-router',
+                                                'port': inter_router_port}),
+
+
+
+                                 ('listener', {'role': 'route-container',
+                                               'host': '0.0.0.0',
+                                               'port': service_port,
+                                               'saslMechanisms': 'ANONYMOUS'}),
+
+                                 ('linkRoute', {'prefix': 'RoutieMcRouteFace',
+                                                'containerId': 'FakeService',
+                                                'direction': 'in'}),
+                                 ('linkRoute', {'prefix': 'RoutieMcRouteFace',
+                                                'containerId': 'FakeService',
+                                                'direction': 'out'}),
+                             ])
+
+
+        cls.RouterA.wait_router_connected('RouterB')
+        cls.RouterB.wait_router_connected('RouterA')
+
+    def test_01_link_route(self):
+        """
+        Verify non-terminal state and data propagates over a link route
+        """
+        class MyExtendedService(FakeService):
+            """
+            This service saves any outcome and extension data that arrives in a
+            transfer
+            """
+            def __init__(self, url, container_id=None):
+                self.remote_state = None
+                self.remote_data = None
+                super(MyExtendedService, self).__init__(url, container_id)
+
+            def on_message(self, event):
+                self.remote_state = event.delivery.remote_state;
+                self.remote_data = event.delivery.remote.data;
+                super(MyExtendedService, self).on_message(event)
+
+
+        fs = MyExtendedService(self.RouterB.addresses[1],
+                               container_id="FakeService")
+        self.RouterA.wait_address("RoutieMcRouteFace", remotes=1)
+
+        tx = MyExtendedSender(self.RouterA.addresses[0],
+                              "RoutieMcRouteFace")
+        tx.wait()
+        fs.join()
+        self.assertEqual(999, fs.remote_state)
+        self.assertEqual([1, 2, 3], fs.remote_data)
+
+    def test_02_closest(self):
+        """
+        Verify non-terminal state and data propagates over anycast
+        """
+        test = ExtensionStateTester(self.RouterA.addresses[0],
+                                    self.RouterB.addresses[0],
+                                    "closest/fleabag")
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_03_multicast(self):
+        """
+        Verify that disposition state set by the publisher is available to all
+        consumers
+        """
+        rxs = [MyExtendedReceiver(self.RouterA.addresses[0],
+                                  "multicast/thingy")
+               for x in range(3)]
+        self.RouterA.wait_address("multicast/thingy", subscribers=3)
+        sleep(0.5)  # let subscribers grant credit
+        tx = MyExtendedSender(self.RouterB.addresses[0],
+                              "multicast/thingy")
+        tx.wait()
+
+        # DISPATCH-1705: only one of the receivers gets the data, but all
+        # should get the state
+
+        ext_data = None
+        for rx in rxs:
+            rx.stop()
+            try:
+                while True:
+                    dispo = rx.remote_states.pop()
+                    self.assertEqual(999, dispo[0])
+                    ext_data = dispo[1] or ext_data
+            except IndexError:
+                pass
+        self.assertEqual([1, 2, 3], ext_data)
+
+
+class MyExtendedSender(AsyncTestSender):
+    """
+    This sender sets a non-terminal outcome and data on the outgoing
+    transfer
+    """
+    def on_sendable(self, event):
+        if self.sent < self.total:
+            dlv = event.sender.delivery(str(self.sent))
+            dlv.local.data = [1, 2, 3]
+            dlv.update(999)
+            event.sender.stream(self._message.encode())
+            event.sender.advance()
+            self.sent += 1
+
+
+class MyExtendedReceiver(AsyncTestReceiver):
+    """
+    This receiver stores any remote delivery state that arrives with a message
+    transfer
+    """
+    def __init__(self, *args, **kwargs):
+        self.remote_states = []
+        super(MyExtendedReceiver, self).__init__(*args, **kwargs)
+
+    def on_message(self, event):
+        self.remote_states.append((event.delivery.remote_state,
+                                   event.delivery.remote.data))
+        super(MyExtendedReceiver, self).on_message(event)
+
+
+class ExtensionStateTester(MessagingHandler):
+    """
+    Verify the routers propagate non-terminal outcome and extended state
+    disposition information in both message transfer and disposition frames.
+
+    This tester creates a receiver and a sender link to a given address.
+
+    The sender transfers a message with a non-terminal delivery state and
+    associated extension data.  The receiver expects to find this state in the
+    incoming delivery.
+
+    The receiver then responds with a non-terminal disposition that also has
+    extension state data.  The sender expects to find this new state associated
+    with its delivery.
+
+    After run() returns, self.error is None on success or holds a string
+    describing the first recorded mismatch.
+    """
+    def __init__(self, ingress_router, egress_router, address):
+        """
+        ingress_router: value passed to connect() for the sending connection
+        egress_router: value passed to connect() for the receiving connection
+        address: used as the sender's target and the receiver's source
+        """
+        super(ExtensionStateTester, self).__init__(auto_settle=False,
+                                                   auto_accept=False)
+        self._in_router = ingress_router
+        self._out_router = egress_router
+        self._address = address
+        self._sender_conn = None
+        self._recvr_conn = None
+        self._sender = None
+        self._receiver = None
+        self._sent = 0
+        self._received = 0
+        self._settled = 0
+        self._total = 10
+        # large body (3 chars * 2Mi)
+        # NOTE(review): presumably sized to force a multi-frame transfer -
+        # confirm
+        self._message = Message(body="XYZ" * (1024 * 1024 * 2))
+        self.error = None
+
+    @staticmethod
+    def _data_len(data):
+        """
+        Length of disposition data, treating missing (None) data as empty so
+        that a lost extension state is reported through self.error rather
+        than raising TypeError inside an event callback.
+        """
+        return 0 if data is None else len(data)
+
+    def on_start(self, event):
+        """Open both connections and attach the sender and receiver links."""
+        self._reactor = event.reactor
+        self._sender_conn = event.container.connect(self._in_router)
+        self._sender = event.container.create_sender(self._sender_conn,
+                                                     target=self._address,
+                                                     name="ExtensionSender")
+        self._recvr_conn = event.container.connect(self._out_router)
+        self._receiver = event.container.create_receiver(self._recvr_conn,
+                                                         source=self._address,
+                                                         name="ExtensionReceiver")
+
+    def _done(self, error=None):
+        """
+        Record error (an already recorded error is kept when error is None)
+        and close both links and both connections.
+        """
+        self.error = error or self.error
+        self._sender.close()
+        self._sender_conn.close()
+        self._receiver.close()
+        self._recvr_conn.close()
+
+    def on_sendable(self, event):
+        """
+        Send the next message, tagging the delivery with non-terminal state
+        666 and extension data [1, 2, 3, <message id>].
+        """
+        if self._sent < self._total:
+            self._sent += 1
+            dlv = event.sender.delivery(str(self._sent))
+            dlv.local.data = [1, 2, 3, self._sent]
+            dlv.update(666)  # non-terminal state
+            # the id lets the receiver work out which data to expect
+            self._message.id = self._sent
+            event.sender.stream(self._message.encode())
+            event.sender.advance()
+
+    def on_message(self, event):
+        """
+        Check the state and data that arrived with the transfer, then reply
+        with non-terminal state 777 and extension data [10, 9, 8, <id>].
+        """
+        dlv = event.delivery
+        msg_id = event.message.id
+        if dlv.remote_state != 666:
+            return self._done(error="Unexpected outcome '%s', expected '666'"
+                              % dlv.remote_state)
+        remote_data = dlv.remote.data
+        expected_data = [1, 2, 3, msg_id]
+        if remote_data != expected_data:
+            return self._done(error="Unexpected dispo data '%s', expected '%s'"
+                              % (remote_data, expected_data))
+
+        # send back a non-terminal outcome and more data
+        dlv.local.data = [10, 9, 8, msg_id]
+        dlv.update(777)
+        self._received += 1
+
+    def _handle_sender_update(self, event):
+        """
+        Sender side of on_delivery: the local state must still be 666 and
+        the remote state must be the receiver's 777, each with 4 data
+        items.  Settles the delivery when both checks pass.
+        """
+        dlv = event.delivery
+        if dlv.local_state != 666 or self._data_len(dlv.local.data) != 4:
+            return self._done(error="Unexpected local state at sender: %s %s" %
+                              (dlv.local_state, dlv.local.data))
+
+        if dlv.remote_state != 777 or self._data_len(dlv.remote.data) != 4:
+            return self._done(error="Unexpected remote state at sender: %s %s" %
+                              (dlv.remote_state, dlv.remote.data))
+        dlv.settle()
+
+    def _handle_receiver_update(self, event):
+        """
+        Receiver side of on_delivery: acts only on settled deliveries.
+        Checks that the local state is 777 and the remote state is 666,
+        each with 4 data items, then settles locally and finishes once all
+        deliveries have been settled.
+        """
+        dlv = event.delivery
+        if dlv.settled:
+            if dlv.local_state != 777 or self._data_len(dlv.local.data) != 4:
+                return self._done(error="Unexpected local state at receiver: %s %s" %
+                                  (dlv.local_state, dlv.local.data))
+
+            if dlv.remote_state != 666 or self._data_len(dlv.remote.data) != 4:
+                return self._done(error="Unexpected remote state at receiver: %s %s" %
+                                  (dlv.remote_state, dlv.remote.data))
+            dlv.settle()
+            self._settled += 1
+            if self._settled == self._total:
+                self._done()
+
+    def on_delivery(self, event):
+        """Route the delivery event by the direction of its link."""
+        if event.delivery.link.is_sender:
+            self._handle_sender_update(event)
+        else:
+            self._handle_receiver_update(event)
+
+    def run(self):
+        """Run this handler in a new Container; returns when run() does."""
+        Container(self).run()
+
+
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org