Posted to commits@qpid.apache.org by kg...@apache.org on 2019/08/27 17:07:48 UTC

[qpid-dispatch] branch master updated: DISPATCH-1266: Fix unsettled multicast forwarding

This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d8d706  DISPATCH-1266: Fix unsettled multicast forwarding
9d8d706 is described below

commit 9d8d706c948ba10c4609009e7cb89b2a450190a9
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Wed Feb 27 11:39:58 2019 -0500

    DISPATCH-1266: Fix unsettled multicast forwarding
    
    This closes #554
---
 src/router_core/connections.c        |  47 +++-
 src/router_core/core_link_endpoint.c |   2 +-
 src/router_core/delivery.c           | 413 +++++++++++++++++++++++++++----
 src/router_core/delivery.h           |  26 +-
 src/router_core/forwarder.c          |  33 ---
 src/router_core/transfer.c           |  49 ++--
 src/router_node.c                    |  25 +-
 tests/system_test.py                 |  13 +-
 tests/system_tests_multicast.py      | 461 ++++++++++++++++++++++++++++++-----
 tests/system_tests_one_router.py     |  70 ++++--
 10 files changed, 931 insertions(+), 208 deletions(-)

diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index e62e609..8c97ec9 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -730,22 +730,26 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c
     qdr_delivery_t *peer;
     while (dlv) {
         DEQ_REMOVE_HEAD(undelivered);
+
+        // expect: an inbound undelivered multicast should
+        // have no peers (has not been forwarded yet)
+        assert(dlv->multicast
+               ? qdr_delivery_peer_count_CT(dlv) == 0
+               : true);
+
         peer = qdr_delivery_first_peer_CT(dlv);
         while (peer) {
             if (peer->multicast) {
                 //
-                // If the address of the delivery is a multicast address and if there are no receivers for this address, the peer delivery must be released.
-                //
-                // If the address of the delivery is a multicast address and there is at least one other receiver for the address, dont do anything
+                // dlv is outgoing mcast - tell its incoming peer that it has
+                // been released and settled.  This will unlink these peers.
                 //
-                if (DEQ_SIZE(peer->peers) == 1 || peer->peer)  {
-                    qdr_delivery_release_CT(core, peer);
-                }
+                qdr_delivery_mcast_outbound_update_CT(core, peer, dlv, PN_RELEASED, true);
             }
             else {
                 qdr_delivery_release_CT(core, peer);
+                qdr_delivery_unlink_peers_CT(core, dlv, peer);
             }
-            qdr_delivery_unlink_peers_CT(core, dlv, peer);
             peer = qdr_delivery_next_peer_CT(dlv);
         }
 
@@ -785,12 +789,29 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c
             qdr_deliver_continue_peers_CT(core, dlv);
         }
 
-        peer = qdr_delivery_first_peer_CT(dlv);
-        while (peer) {
-            if (link->link_direction == QD_OUTGOING)
-                qdr_delivery_failed_CT(core, peer);
-            qdr_delivery_unlink_peers_CT(core, dlv, peer);
-            peer = qdr_delivery_next_peer_CT(dlv);
+        if (dlv->multicast) {
+            //
+            // forward settlement
+            //
+            qdr_delivery_mcast_inbound_update_CT(core, dlv,
+                                                 PN_MODIFIED,
+                                                 true);  // true == settled
+        } else {
+            peer = qdr_delivery_first_peer_CT(dlv);
+            while (peer) {
+                if (peer->multicast) {
+                    //
+                    // peer is incoming multicast and dlv is one of its corresponding
+                    // outgoing deliveries.  This will unlink these peers.
+                    //
+                    qdr_delivery_mcast_outbound_update_CT(core, peer, dlv, PN_MODIFIED, true);
+                } else {
+                    if (link->link_direction == QD_OUTGOING)
+                        qdr_delivery_failed_CT(core, peer);
+                    qdr_delivery_unlink_peers_CT(core, dlv, peer);
+                }
+                peer = qdr_delivery_next_peer_CT(dlv);
+            }
         }
 
         //
diff --git a/src/router_core/core_link_endpoint.c b/src/router_core/core_link_endpoint.c
index 351ee28..268887d 100644
--- a/src/router_core/core_link_endpoint.c
+++ b/src/router_core/core_link_endpoint.c
@@ -194,7 +194,7 @@ void qdrc_endpoint_do_deliver_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_deli
 void qdrc_endpoint_do_update_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_delivery_t *dlv, bool settled)
 {
     if (!!ep->desc->on_update)
-        ep->desc->on_update(ep->link_context, dlv, settled, dlv->disposition);
+        ep->desc->on_update(ep->link_context, dlv, settled, dlv->remote_disposition);
 }
 
 
diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c
index 049dcbb..3992585 100644
--- a/src/router_core/delivery.c
+++ b/src/router_core/delivery.c
@@ -26,10 +26,12 @@ ALLOC_DEFINE(qdr_delivery_t);
 static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
 static void qdr_delete_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
 static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
-static bool qdr_delivery_has_peer_CT(qdr_delivery_t *dlv);
 static pn_data_t *qdr_delivery_extension_state(qdr_delivery_t *delivery);
 static void qdr_delivery_free_extension_state(qdr_delivery_t *delivery);
 static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *delivery);
+static bool qdr_delivery_anycast_update_CT(qdr_core_t *core, qdr_delivery_t *dlv,
+                                         qdr_delivery_t *peer, uint64_t new_disp, bool settled,
+                                         qdr_error_t *error);
 
 
 void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context)
@@ -82,6 +84,7 @@ bool qdr_delivery_receive_complete(const qdr_delivery_t *delivery)
 }
 
 
+// set the local disposition (to be sent to the remote endpoint)
 void qdr_delivery_set_disposition(qdr_delivery_t *delivery, uint64_t disposition)
 {
     if (delivery)
@@ -89,6 +92,7 @@ void qdr_delivery_set_disposition(qdr_delivery_t *delivery, uint64_t disposition
 }
 
 
+// get the current local disposition
 uint64_t qdr_delivery_disposition(const qdr_delivery_t *delivery)
 {
     if (!delivery)
@@ -169,8 +173,9 @@ bool qdr_delivery_presettled(const qdr_delivery_t *delivery)
 }
 
 
-void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disposition,
-                                     bool settled, qdr_error_t *error, pn_data_t *ext_state, bool ref_given)
+// remote endpoint modified its disposition and/or settlement
+void qdr_delivery_remote_state_updated(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disposition,
+                                       bool settled, qdr_error_t *error, pn_data_t *ext_state, bool ref_given)
 {
     qdr_action_t *action = qdr_action(qdr_update_delivery_CT, "update_delivery");
     action->args.delivery.delivery    = delivery;
@@ -420,18 +425,21 @@ static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *de
 
 }
 
-static bool qdr_delivery_has_peer_CT(qdr_delivery_t *dlv)
+
+// Returns the number of peers for dlv
+int qdr_delivery_peer_count_CT(const qdr_delivery_t *dlv)
 {
-    return dlv->peer || DEQ_SIZE(dlv->peers) > 0;
+    return (dlv->peer) ? 1 : DEQ_SIZE(dlv->peers);
 }
 
+
 void qdr_delivery_link_peers_CT(qdr_delivery_t *in_dlv, qdr_delivery_t *out_dlv)
 {
     // If there is no delivery or a peer, we cannot link each other.
     if (!in_dlv || !out_dlv)
         return;
 
-    if (!qdr_delivery_has_peer_CT(in_dlv)) {
+    if (qdr_delivery_peer_count_CT(in_dlv) == 0) {
         // This is the very first peer. Link them up.
         assert(!out_dlv->peer);
         in_dlv->peer = out_dlv;
@@ -484,6 +492,9 @@ void qdr_delivery_unlink_peers_CT(qdr_core_t *core, qdr_delivery_t *dlv, qdr_del
             peer_ref = DEQ_NEXT(peer_ref);
         }
         assert(peer_ref != 0);
+        if (peer_ref == dlv->next_peer_ref)
+            // ok to unlink peers while iterating over them
+            dlv->next_peer_ref = DEQ_NEXT(peer_ref);
         qdr_del_delivery_ref(&dlv->peers, peer_ref);
     }
 
@@ -498,6 +509,9 @@ void qdr_delivery_unlink_peers_CT(qdr_core_t *core, qdr_delivery_t *dlv, qdr_del
             peer_ref = DEQ_NEXT(peer_ref);
         }
         assert(peer_ref != 0);
+        if (peer_ref == peer->next_peer_ref)
+            // ok to unlink peers while iterating over them
+            peer->next_peer_ref = DEQ_NEXT(peer_ref);
         qdr_del_delivery_ref(&peer->peers, peer_ref);
     }
 
@@ -509,7 +523,7 @@ void qdr_delivery_unlink_peers_CT(qdr_core_t *core, qdr_delivery_t *dlv, qdr_del
 qdr_delivery_t *qdr_delivery_first_peer_CT(qdr_delivery_t *dlv)
 {
     // What if there are no peers for this delivery?
-    if (!qdr_delivery_has_peer_CT(dlv))
+    if (qdr_delivery_peer_count_CT(dlv) == 0)
         return 0;
 
     if (dlv->peer) {
@@ -558,39 +572,95 @@ void qdr_delivery_decref_CT(qdr_core_t *core, qdr_delivery_t *dlv, const char *l
 }
 
 
+// the remote endpoint has changed the state (disposition) or settlement for the
+// delivery.  Update the local state/settlement accordingly.
+//
 static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
-    qdr_delivery_t *dlv        = action->args.delivery.delivery;
-    qdr_delivery_t *peer       = qdr_delivery_first_peer_CT(dlv);
-    bool            push       = false;
-    bool            peer_moved = false;
-    bool            dlv_moved  = false;
-    uint64_t        disp       = action->args.delivery.disposition;
-    bool            settled    = action->args.delivery.settled;
-    qdr_error_t    *error      = action->args.delivery.error;
-    bool error_unassigned      = true;
+    if (discard)
+        return;
+
+    qdr_delivery_t *dlv      = action->args.delivery.delivery;
+    qdr_delivery_t *peer     = qdr_delivery_first_peer_CT(dlv);
+    uint64_t        new_disp = action->args.delivery.disposition;
+    bool            settled  = action->args.delivery.settled;
+    qdr_error_t    *error    = action->args.delivery.error;
+    bool free_error          = true;
+
+    if (dlv->multicast) {
+        //
+        // remote state change for *inbound* multicast delivery,
+        // update downstream *outbound* peers
+        //
+        qdr_delivery_mcast_inbound_update_CT(core, dlv, new_disp, settled);
+
+    } else if (peer && peer->multicast) {
+        //
+        // remote state change for an *outbound* delivery to a multicast address,
+        // propagate upstream to *inbound* delivery (peer)
+        //
+        // coverity[swapped_arguments]
+        qdr_delivery_mcast_outbound_update_CT(core, peer, dlv, new_disp, settled);
+
+    } else {
+        //
+        // Anycast forwarding
+        //
+        free_error = !qdr_delivery_anycast_update_CT(core, dlv, peer, new_disp, settled, error);
+    }
+
+    //
+    // Release the action reference, possibly freeing the delivery
+    //
+    qdr_delivery_decref_CT(core, dlv, "qdr_update_delivery_CT - remove from action");
+
+    if (free_error)
+        qdr_error_free(error);
+}
+
+
+// The remote delivery state (disposition) and/or remote settlement for an
+// anycast delivery has changed.  Propagate the changes to its peer delivery.
+//
+// returns true if ownership of error parameter is taken (do not free it)
+//
+static bool qdr_delivery_anycast_update_CT(qdr_core_t *core, qdr_delivery_t *dlv,
+                                           qdr_delivery_t *peer, uint64_t new_disp, bool settled,
+                                           qdr_error_t *error)
+{
+    bool push           = false;
+    bool peer_moved     = false;
+    bool dlv_moved      = false;
+    bool error_assigned = false;
+    qdr_link_t *dlink   = qdr_delivery_link(dlv);
+
+    assert(!dlv->multicast);
+    assert(!peer || !peer->multicast);
 
-    qdr_link_t *dlv_link  = qdr_delivery_link(dlv);
-    qdr_link_t *peer_link = qdr_delivery_link(peer);
+    if (peer)
+        qdr_delivery_incref(peer, "qdr_delivery_anycast_update_CT - prevent peer from being freed");
 
     //
-    // Logic:
+    // Non-multicast Logic:
     //
-    // If disposition has changed and there is a peer link, set the disposition of the peer
-    // If settled, the delivery must be unlinked and freed.
-    // If settled and there is a peer, the peer shall be settled and unlinked.  It shall not
-    //   be freed until the connection-side thread settles the PN delivery.
+    // If disposition has changed and there is a peer link, set the disposition
+    // of the peer
+    // If remote settled, the delivery must be unlinked and freed.
+    // If remote settled and there is a peer, the peer shall be settled and
+    // unlinked.  It shall not be freed until the connection-side thread
+    // settles the PN delivery.
     //
-    if (disp != dlv->disposition) {
+    if (new_disp != dlv->remote_disposition) {
         //
-        // Disposition has changed, propagate the change to the peer delivery.
+        // Remote disposition has changed, propagate the change to the peer
+        // delivery local disposition.
         //
-        dlv->disposition = disp;
+        dlv->remote_disposition = new_disp;
         if (peer) {
-            peer->disposition = disp;
+            peer->disposition = new_disp;
             peer->error       = error;
             push = true;
-            error_unassigned = false;
+            error_assigned = true;
             qdr_delivery_copy_extension_state(dlv, peer, false);
         }
     }
@@ -598,41 +668,290 @@ static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool
     if (settled) {
         if (peer) {
             peer->settled = true;
-            if (peer_link) {
+            if (qdr_delivery_link(peer)) {
                 peer_moved = qdr_delivery_settled_CT(core, peer);
-                if (peer_moved)
-                    push = true;
             }
             qdr_delivery_unlink_peers_CT(core, dlv, peer);
         }
 
-        if (dlv_link)
+        if (dlink)
             dlv_moved = qdr_delivery_settled_CT(core, dlv);
     }
 
     //
     // If the delivery's link has a core endpoint, notify the endpoint of the update
     //
-    if (dlv_link && dlv_link->core_endpoint)
-        qdrc_endpoint_do_update_CT(core, dlv_link->core_endpoint, dlv, settled);
+    if (dlink && dlink->core_endpoint)
+        qdrc_endpoint_do_update_CT(core, dlink->core_endpoint, dlv, settled);
 
-    if (push)
+    if (push || peer_moved)
         qdr_delivery_push_CT(core, peer);
 
     //
-    // Release the action reference, possibly freeing the delivery
-    //
-    qdr_delivery_decref_CT(core, dlv, "qdr_update_delivery_CT - remove from action");
-
-    //
-    // Release the unsettled references if the deliveries were moved
+    // Release the unsettled references if the deliveries were moved/settled
     //
     if (dlv_moved)
-        qdr_delivery_decref_CT(core, dlv, "qdr_update_delivery_CT - removed from unsettled (1)");
+        qdr_delivery_decref_CT(core, dlv, "qdr_delivery_anycast_update CT - dlv removed from unsettled");
     if (peer_moved)
-        qdr_delivery_decref_CT(core, peer, "qdr_update_delivery_CT - removed from unsettled (2)");
-    if (error_unassigned)
-        qdr_error_free(error);
+        qdr_delivery_decref_CT(core, peer, "qdr_delivery_anycast_update_CT - peer removed from unsettled");
+    if (peer)
+        qdr_delivery_decref_CT(core, peer, "qdr_delivery_anycast_update_CT - allow free of peer");
+
+    return error_assigned;
+}
+
+
+// The remote delivery state (disposition) and/or remote settlement for an
+// incoming multicast delivery has changed.  Propagate the changes "downstream"
+// to the outbound peers.  Once all peers have settled then settle the in_dlv
+//
+void qdr_delivery_mcast_inbound_update_CT(qdr_core_t *core, qdr_delivery_t *in_dlv,
+                                          uint64_t new_disp, bool settled)
+{
+    if (!in_dlv)
+        return;
+
+    bool update_disp = new_disp && in_dlv->remote_disposition != new_disp;
+
+    assert(in_dlv->multicast);  // expect in_dlv to be the inbound delivery
+
+    qd_log(core->log, QD_LOG_TRACE,
+           "Remote updated mcast delivery (%p) disp=0x%"PRIx64" settled=%s",
+           in_dlv, new_disp, (settled) ? "True" : "False");
+
+    if (update_disp)
+        in_dlv->remote_disposition = new_disp;
+
+    qdr_delivery_t *out_peer = qdr_delivery_first_peer_CT(in_dlv);
+    while (out_peer) {
+        bool push  = false;
+        bool moved  = false;
+        bool unlink = false;
+
+        //
+        // AMQP 1.0 allows the sender to specify a disposition
+        // so forward it along
+        //
+        if (update_disp && out_peer->disposition != new_disp) {
+            out_peer->disposition = new_disp;
+            push = true;
+            // extension state/error ignored, not sure how
+            // that can be supported for mcast...
+        }
+
+        //
+        // the sender has settled
+        //
+        if (settled) {
+            out_peer->settled = true;
+            if (qdr_delivery_link(out_peer)) {
+                moved = qdr_delivery_settled_CT(core, out_peer);
+            }
+            unlink = true;
+        }
+
+        if (push || moved) {
+            qdr_delivery_push_CT(core, out_peer);
+        }
+
+        if (moved) {
+            qdr_delivery_decref_CT(core, out_peer,
+                                   "qdr_delivery_mcast_inbound_update_CT - removed out_peer from unsettled");
+        }
+
+        qd_log(core->log, QD_LOG_TRACE,
+               "Updating mcast delivery (%p) out peer (%p) updated disp=%s settled=%s",
+               in_dlv, out_peer, (push) ? "True" : "False",
+               (unlink) ? "True" : "False");
+
+        if (unlink) {
+            qdr_delivery_unlink_peers_CT(core, in_dlv, out_peer);  // may free out_peer!
+        }
+
+        out_peer = qdr_delivery_next_peer_CT(in_dlv);
+    }
+
+    if (settled) {
+        assert(qdr_delivery_peer_count_CT(in_dlv) == 0);
+        in_dlv->settled = true;
+        if (qdr_delivery_settled_CT(core, in_dlv)) {
+            qdr_delivery_decref_CT(core, in_dlv,
+                                   "qdr_delivery_mcast_inbound_update CT - in_dlv removed from unsettled");
+        }
+    }
+}
+
+
+// An outgoing peer delivery of an incoming multicast delivery has settled.
+// Settle the inbound delivery after all of its outbound deliveries
+// have been settled.
+//
+// Note: this call may free either in_dlv or out_dlv by unlinking them. The
+// caller must increment the reference count for these deliveries if they are
+// to be referenced after this call.
+//
+// moved: set to true if in_dlv has been removed from the unsettled list
+// return: true if in_dlv has been settled
+//
+static bool qdr_delivery_mcast_outbound_settled_CT(qdr_core_t *core, qdr_delivery_t *in_dlv,
+                                                   qdr_delivery_t *out_dlv, bool *moved)
+{
+    bool push = false;
+    *moved = false;
+
+    assert(in_dlv->multicast && out_dlv->peer == in_dlv);
+    assert(qdr_delivery_peer_count_CT(out_dlv) == 1);
+
+    int peer_count = qdr_delivery_peer_count_CT(in_dlv);
+
+    if (peer_count == 1) {
+        //
+        // This out_dlv is the last outgoing peer so
+        // we can now settle in_dlv
+        //
+        in_dlv->settled = true;
+        push = true;
+        if (qdr_delivery_link(in_dlv)) {
+            *moved = qdr_delivery_settled_CT(core, in_dlv);
+        }
+
+        qd_log(core->log, QD_LOG_TRACE,
+               "mcast delivery (%p) has settled, disp=0x%"PRIx64,
+               in_dlv, in_dlv->disposition);
+    } else {
+
+        qd_log(core->log, QD_LOG_TRACE,
+               "mcast delivery (%p) out peer (%p) has settled, remaining peers=%d",
+               in_dlv, out_dlv, peer_count - 1);
+    }
+
+    // now settle the peer itself and remove it from link unsettled list
+
+    out_dlv->settled = true;
+    if (qdr_delivery_settled_CT(core, out_dlv)) {
+        qdr_delivery_decref_CT(core, out_dlv, "qdr_delivery_mcast_outbound_settled_CT - out_dlv removed from unsettled");
+    }
+
+    // do this last since it may free either dlv or out_dlv:
+    qdr_delivery_unlink_peers_CT(core, in_dlv, out_dlv);
+
+    return push;
+}
+
+
+// true if delivery state d is a terminal state as defined by AMQP 1.0
+//
+#define IS_TERMINAL(d) (PN_ACCEPTED <= (d) && (d) <= PN_MODIFIED)
+
+
+// an outbound mcast delivery has changed its remote state (disposition)
+// propagate the change back "upstream" to the inbound delivery
+//
+// returns true if dlv disposition has been updated
+//
+static bool qdr_delivery_mcast_outbound_disposition_CT(qdr_core_t *core, qdr_delivery_t *in_dlv,
+                                                       qdr_delivery_t *out_dlv, uint64_t new_disp)
+{
+    // The AMQP 1.0 spec does not define a way to propagate disposition
+    // back to the sender in the case of unsettled multicast.  In the
+    // case of multiple different terminal states we have to reconcile
+    // them. We assign an ad hoc priority to each terminal value and
+    // set the final disposition to the highest priority returned
+    // across all receivers.
+    static const int priority[] = {
+        2, // Accepted
+        3, // Rejected - highest because reject is a hard error
+        0, // Released
+        1, // Modified
+    };
+    bool push = false;
+
+    if (!in_dlv || !out_dlv)
+        return push;
+
+    assert(in_dlv->multicast && out_dlv->peer == in_dlv);
+    assert(qdr_delivery_peer_count_CT(out_dlv) == 1);
+
+    if (new_disp == 0x33) {  // 0x33 == Declared
+        // hack alert - the transaction section of the AMQP 1.0 spec
+        // defines the Declared outcome (0x33) as a terminal state.
+        qd_log(core->log, QD_LOG_WARNING,
+               "Transactions are not supported for multicast messages");
+        new_disp = PN_REJECTED;
+    }
+
+    out_dlv->remote_disposition = new_disp;
+
+    if (IS_TERMINAL(new_disp)) {
+        // our mcast impl ignores non-terminal outcomes
+
+        qd_log(core->log, QD_LOG_TRACE,
+               "mcast delivery (%p) out peer (%p) disp updated: 0x%"PRIx64,
+               in_dlv, out_dlv, new_disp);
+
+        if (in_dlv->mcast_disposition == 0) {
+            in_dlv->mcast_disposition = new_disp;
+        } else {
+            int old_priority = priority[in_dlv->mcast_disposition - PN_ACCEPTED];
+            int new_priority = priority[new_disp - PN_ACCEPTED];
+            if (new_priority > old_priority)
+                in_dlv->mcast_disposition = new_disp;
+        }
+
+        // wait until all peers have set terminal state before setting it on
+        // the in_dlv
+        //
+        qdr_delivery_t *peer = qdr_delivery_first_peer_CT(in_dlv);
+        while (peer) {
+            if (!IS_TERMINAL(peer->remote_disposition)) {
+                break;
+            }
+            peer = qdr_delivery_next_peer_CT(in_dlv);
+        }
+
+        if (!peer) {  // all peers have set a terminal state
+            // TODO(kgiusti) what about error parameter?
+            in_dlv->disposition = in_dlv->mcast_disposition;
+            push = true;
+            qd_log(core->log, QD_LOG_TRACE,
+                   "mcast delivery (%p) terminal state set: 0x%"PRIx64,
+                   in_dlv, in_dlv->disposition);
+        }
+    }
+
+    return push;
+}
+
+
+//
+// The delivery state (disposition) and/or settlement state for a downstream
+// peer delivery of a multicast inbound delivery has changed.  Update the
+// inbound delivery to reflect these changes.
+//
+// Note: if settled is true then in_dlv or out_dlv *may* be released.  Callers
+// should increment their reference counts if either of these deliveries are
+// referenced after this call.
+//
+void qdr_delivery_mcast_outbound_update_CT(qdr_core_t *core,
+                                           qdr_delivery_t *in_dlv, qdr_delivery_t *out_dlv,
+                                           uint64_t new_disp, bool settled)
+{
+    bool dlv_moved = false;
+    bool push_dlv = qdr_delivery_mcast_outbound_disposition_CT(core, in_dlv, out_dlv, new_disp);
+
+    qdr_delivery_incref(in_dlv, "qdr_delivery_mcast_outbound_update_CT - prevent mcast free");
+
+    if (settled && qdr_delivery_mcast_outbound_settled_CT(core, in_dlv, out_dlv, &dlv_moved)) {
+        push_dlv = true;
+    }
+
+    if (push_dlv || dlv_moved) {
+        qdr_delivery_push_CT(core, in_dlv);
+    }
+    if (dlv_moved) {
+        qdr_delivery_decref_CT(core, in_dlv, "qdr_delivery_mcast_outbound_update_CT - removed mcast peer from unsettled");
+    }
+    qdr_delivery_decref_CT(core, in_dlv, "qdr_delivery_mcast_outbound_update_CT - allow mcast peer free");
 }
 
 
@@ -706,8 +1025,8 @@ static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool
                 sub = DEQ_HEAD(in_dlv->subscriptions);
             }
 
-            // This is a multicast delivery or if this is a presettled multi-frame unicast delivery.
-            if (in_dlv->multicast || in_dlv->settled) {
+            // This is a presettled multi-frame unicast delivery.
+            if (in_dlv->settled) {
 
                 //
                 // If a delivery is settled but did not go into one of the lists, that means that it is going nowhere.
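
The outcome reconciliation added above in qdr_delivery_mcast_outbound_disposition_CT boils down to: wait until every outbound peer has reported a terminal outcome, then send the highest-priority outcome (Rejected > Accepted > Modified > Released) back to the original sender. A minimal Python sketch of that rule, for reference only; the names and sample values are illustrative and not part of the commit:

    PN_ACCEPTED, PN_REJECTED, PN_RELEASED, PN_MODIFIED = 0x24, 0x25, 0x26, 0x27

    # same ad hoc priorities as the C table above (Rejected wins, Released loses)
    PRIORITY = {PN_REJECTED: 3, PN_ACCEPTED: 2, PN_MODIFIED: 1, PN_RELEASED: 0}

    def reconcile(outcomes):
        # non-terminal states (and e.g. Declared, which the router maps to
        # Rejected before it gets here) are not in PRIORITY
        if any(o not in PRIORITY for o in outcomes):
            return None   # some receiver has not answered yet - keep waiting
        return max(outcomes, key=lambda o: PRIORITY[o])

    assert reconcile([PN_RELEASED, PN_MODIFIED, PN_REJECTED]) == PN_REJECTED
    assert reconcile([PN_RELEASED, PN_MODIFIED, PN_ACCEPTED]) == PN_ACCEPTED
    assert reconcile([PN_RELEASED, PN_MODIFIED]) == PN_MODIFIED
    assert reconcile([PN_RELEASED, PN_RELEASED]) == PN_RELEASED
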
diff --git a/src/router_core/delivery.h b/src/router_core/delivery.h
index e3fd88b..9619bfb 100644
--- a/src/router_core/delivery.h
+++ b/src/router_core/delivery.h
@@ -43,7 +43,9 @@ struct qdr_delivery_t {
     qd_message_t           *msg;
     qd_iterator_t          *to_addr;
     qd_iterator_t          *origin;
-    uint64_t                disposition;
+    uint64_t                disposition;         ///< local disposition, will be pushed to remote endpoint
+    uint64_t                remote_disposition;  ///< disposition as set by remote endpoint
+    uint64_t                mcast_disposition;   ///< temporary terminal disposition while multicast fwding
     uint32_t                ingress_time;
     pn_data_t              *extension_state;
     qdr_error_t            *error;
@@ -81,6 +83,7 @@ void qdr_delivery_tag(const qdr_delivery_t *delivery, const char **tag, int *len
 bool qdr_delivery_tag_sent(const qdr_delivery_t *delivery);
 void qdr_delivery_set_tag_sent(const qdr_delivery_t *delivery, bool tag_sent);
 
+// note: access to _local_ endpoint disposition (not remote endpoint)
 uint64_t qdr_delivery_disposition(const qdr_delivery_t *delivery);
 void qdr_delivery_set_disposition(qdr_delivery_t *delivery, uint64_t disposition);
 
@@ -106,9 +109,10 @@ void qdr_delivery_copy_extension_state(qdr_delivery_t *src, qdr_delivery_t *dest
 /* release dlv and possibly schedule its deletion on the core thread */
 void qdr_delivery_decref(qdr_core_t *core, qdr_delivery_t *delivery, const char *label);
 
-/* handles disposition/settlement changes from remote delivery and schedules Core thread */
-void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disp,
-                                     bool settled, qdr_error_t *error, pn_data_t *ext_state, bool ref_given);
+/* handles delivery disposition and settlement changes from the remote end of
+ * the link, and schedules Core thread */
+void qdr_delivery_remote_state_updated(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disp,
+                                       bool settled, qdr_error_t *error, pn_data_t *ext_state, bool ref_given);
 
 /* invoked when incoming message data arrives - schedule core thread */
 qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core, qdr_delivery_t *delivery);
@@ -144,5 +148,19 @@ void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv);
 /* update the links counters with respect to its delivery */
 void qdr_delivery_increment_counters_CT(qdr_core_t *core, qdr_delivery_t *delivery);
 
+/**
+ * multicast delivery state and settlement management
+ */
+
+// remote updated disposition/settlement for incoming delivery
+void qdr_delivery_mcast_inbound_update_CT(qdr_core_t *core, qdr_delivery_t *in_dlv,
+                                          uint64_t new_disp, bool settled);
+// remote updated disposition/settlement for outgoing delivery
+void qdr_delivery_mcast_outbound_update_CT(qdr_core_t *core, qdr_delivery_t *in_dlv,
+                                           qdr_delivery_t *out_peer,
+                                           uint64_t new_disp, bool settled);
+// number of unsettled peer (outbound) deliveries for in_dlv
+int qdr_delivery_peer_count_CT(const qdr_delivery_t *in_dlv);
+
 
 #endif // __delivery_h__
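
The new entry points declared above also encode the settlement rule for unsettled multicast: settlement from the original sender is forwarded to every outbound peer (qdr_delivery_mcast_inbound_update_CT), and settlement from the receivers is propagated back to the inbound delivery only after the last outbound peer has settled (qdr_delivery_mcast_outbound_update_CT). A simplified Python model of that last-peer-settles rule; the Dlv class and helper below are illustrative only, not the router's data structures:

    class Dlv(object):
        def __init__(self, peers=()):
            self.settled = False
            self.peers = list(peers)     # inbound mcast -> its outbound deliveries

    def mcast_outbound_settled(in_dlv, out_dlv):
        # one outbound peer settled: settle it, unlink it, and settle the
        # inbound delivery once no unsettled peers remain
        out_dlv.settled = True
        in_dlv.peers.remove(out_dlv)
        if not in_dlv.peers:
            in_dlv.settled = True

    outs = [Dlv(), Dlv(), Dlv()]
    inbound = Dlv(peers=outs)
    for o in outs[:-1]:
        mcast_outbound_settled(inbound, o)
        assert not inbound.settled       # still waiting on the last receiver
    mcast_outbound_settled(inbound, outs[-1])
    assert inbound.settled               # last peer settled -> settle upstream
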
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index cfb9fe4..0a679aa 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -341,7 +341,6 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
     bool          bypass_valid_origins = addr->forwarder->bypass_valid_origins;
     int           fanout               = 0;
     qd_bitmask_t *link_exclusion       = !!in_delivery ? in_delivery->link_exclusion : 0;
-    bool          presettled           = !!in_delivery ? in_delivery->settled : true;
     bool          receive_complete     = qd_message_receive_complete(qdr_delivery_message(in_delivery));
     uint8_t       priority             = qdr_forward_effective_priority(msg, addr);
 
@@ -349,16 +348,6 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
     DEQ_INIT(deliver_info_list);
 
     //
-    // If the delivery is not presettled, set the settled flag for forwarding so all
-    // outgoing deliveries will be presettled.
-    //
-    // NOTE:  This is the only multicast mode currently supported.  Others will likely be
-    //        implemented in the future.
-    //
-    if (!presettled)
-        in_delivery->settled = true;
-
-    //
     // Forward to local subscribers
     //
     if (!addr->local || exclude_inprocess) {
@@ -494,26 +483,6 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
         deliver_info = DEQ_HEAD(deliver_info_list);
     }
 
-    if (in_delivery && !presettled) {
-        if (fanout == 0)
-            //
-            // The delivery was not presettled and it was not forwarded to any
-            // destinations, return it to its original unsettled state.
-            //
-            in_delivery->settled = false;
-        else {
-            //
-            // The delivery was not presettled and it was forwarded to at least
-            // one destination.  Accept and settle the delivery only if the entire delivery
-            // has been received.
-            //
-            if (receive_complete) {
-                in_delivery->disposition = PN_ACCEPTED;
-                qdr_delivery_push_CT(core, in_delivery);
-            }
-        }
-    }
-
     return fanout;
 }
 
@@ -1000,8 +969,6 @@ int qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *
     int fanout = 0;
     if (addr->forwarder)
         fanout = addr->forwarder->forward_message(core, addr, msg, in_delivery, exclude_inprocess, control);
-
-    // TODO - Deal with this delivery's disposition
     return fanout;
 }
 
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 0b642d3..4223c8e 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -124,6 +124,7 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *
 }
 
 
+// send up to 'credit' pending outgoing deliveries
 int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
 {
     qdr_connection_t *conn = link->conn;
@@ -216,9 +217,13 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
                 }
                 sys_mutex_unlock(conn->work_lock);
 
-                // the core will need to update the delivery's disposition
-                if (new_disp)
-                    qdr_delivery_update_disposition(core, dlv, new_disp, true, 0, 0, false);
+                if (new_disp) {
+                    // the remote sender-settle-mode forced us to pre-settle the
+                    // message.  The core needs to know this, so we "fake" receiving a
+                    // settle+disposition update from the remote end of the link:
+                    qdr_delivery_remote_state_updated(core, dlv, new_disp, true, 0, 0, false);
+                }
+
                 qdr_delivery_decref(core, dlv, "qdr_link_process_deliveries - release local reference - done processing");
             } else {
                 sys_mutex_unlock(conn->work_lock);
@@ -424,38 +429,32 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery
         // We are trying to forward a delivery on an address that has no outbound paths
         // AND the incoming link is targeted (not anonymous).
         //
-        // We shall release the delivery (it is currently undeliverable).
+        // We shall release the delivery (it is currently undeliverable). Since
+        // there are no receivers we will try to drain credit to prevent the
+        // sender from attempting to send more to this address.
         //
         if (dlv->settled) {
             // Increment the presettled_dropped_deliveries on the in_link
             link->dropped_presettled_deliveries++;
             if (dlv_link->link_type == QD_LINK_ENDPOINT)
                 core->dropped_presettled_deliveries++;
-
-            //
-            // The delivery is pre-settled. Call the qdr_delivery_release_CT so if this delivery is multi-frame
-            // we can restart receiving the delivery in case it is stalled. Note that messages will not
-            // *actually* be released because these are presettled messages.
-            //
-            qdr_delivery_release_CT(core, dlv);
-        } else {
-            qdr_delivery_release_CT(core, dlv);
-
-            //
-            // Drain credit on the link if it is not in an edge connection
-            //
-            if (!link->edge)
-                qdr_link_issue_credit_CT(core, link, 0, true);
         }
 
         //
-        // If the distribution is multicast or it's on an edge connection, we will replenish the credit.
-        // Otherwise, we will allow the credit to drain.
+        // Note if the message was pre-settled we still call the
+        // qdr_delivery_release_CT so if this delivery is multi-frame we can
+        // restart receiving the delivery in case it is stalled. Note that
+        // messages will not *actually* be released in this case because these
+        // are presettled messages.
         //
-        if (link->edge || qdr_is_addr_treatment_multicast(link->owning_addr))
-            qdr_link_issue_credit_CT(core, link, 1, false);
-        else
+        qdr_delivery_release_CT(core, dlv);
+
+        if (!link->edge) {
+            qdr_link_issue_credit_CT(core, link, 0, true);  // drain
             link->credit_pending++;
+        } else {
+            qdr_link_issue_credit_CT(core, link, 1, false);
+        }
 
         qdr_delivery_decref_CT(core, dlv, "qdr_link_forward_CT - removed from action (no path)");
         return;
@@ -543,7 +542,7 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery
         qdr_delivery_decref_CT(core, dlv, "qdr_link_forward_CT - removed from action (1)");
         qdr_link_issue_credit_CT(core, link, 1, false);
     } else if (fanout > 0) {
-        if (dlv->settled || dlv->multicast) {
+        if (dlv->settled) {
             //
             // The delivery is settled.  Keep it off the unsettled list and issue
             // replacement credit for it now.
diff --git a/src/router_node.c b/src/router_node.c
index b556c86..1e5ee6a 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -344,8 +344,10 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
         pn_link_advance(pn_link);
         next_delivery = pn_link_current(pn_link) != 0;
 
-        if (qdr_delivery_disposition(delivery) != 0)
-            pn_delivery_update(pnd, qdr_delivery_disposition(delivery));
+        uint64_t local_disp = qdr_delivery_disposition(delivery);
+        if (local_disp != 0) {
+            pn_delivery_update(pnd, local_disp);
+        }
     }
 
     if (qd_message_is_discard(msg)) {
@@ -689,12 +691,12 @@ static void AMQP_disposition_handler(void* context, qd_link_t *link, pn_delivery
     //
     // Update the disposition of the delivery
     //
-    qdr_delivery_update_disposition(router->router_core, delivery,
-                                    pn_delivery_remote_state(pnd),
-                                    pn_delivery_settled(pnd),
-                                    error,
-                                    pn_disposition_data(disp),
-                                    false);
+    qdr_delivery_remote_state_updated(router->router_core, delivery,
+                                      pn_delivery_remote_state(pnd),
+                                      pn_delivery_settled(pnd),
+                                      error,
+                                      pn_disposition_data(disp),
+                                      false);
 
     //
     // If settled, close out the delivery
@@ -1707,10 +1709,13 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di
         //
         // If the delivery is still arriving, don't push out the disposition change yet.
         //
-        if (qd_message_receive_complete(msg))
+        if (qd_message_receive_complete(msg)) {
             pn_delivery_update(pnd, disp);
-        else
+        } else {
+            // just update the local disposition for now - AMQP_rx_handler will
+            // write this to proton once the message is fully received.
             qdr_delivery_set_disposition(dlv, disp);
+        }
     }
 
     if (settled) {
diff --git a/tests/system_test.py b/tests/system_test.py
index ca8e1f0..f099560 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -46,7 +46,7 @@ import json
 import uuid
 
 import proton
-from proton import Message, Timeout
+from proton import Message
 from proton.handlers import MessagingHandler
 from proton.utils import BlockingConnection
 from proton.reactor import AtLeastOnce, Container
@@ -995,3 +995,14 @@ class MgmtMsgProxy(object):
               'name': name}
         return Message(properties=ap, reply_to=self.reply_addr)
 
+
+class TestTimeout(object):
+    """
+    A timer callback object for use with a MessagingHandler
+    parent: A MessagingHandler with a timeout() method
+    """
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.timeout()
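
TestTimeout is the shared timer callback the system tests schedule on the proton reactor so that a stuck test fails with an error instead of hanging. A typical usage inside a MessagingHandler-based test, mirroring the pattern the multicast tests below follow; ExampleTest and its details are illustrative only:

    from proton.handlers import MessagingHandler
    from proton.reactor import Container
    from system_test import TIMEOUT, TestTimeout

    class ExampleTest(MessagingHandler):
        def __init__(self):
            super(ExampleTest, self).__init__()
            self.timer = None
            self.error = None

        def on_start(self, event):
            # call self.timeout() if the test does not finish within TIMEOUT seconds
            self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
            # ... set up connections, senders and receivers here ...

        def timeout(self):
            self.error = "Timeout expired before the test completed"
            # ... close connections so Container(self).run() returns ...

        def done(self):
            if self.timer:
                self.timer.cancel()
            # ... normal shutdown ...

        def run(self):
            Container(self).run()
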
diff --git a/tests/system_tests_multicast.py b/tests/system_tests_multicast.py
index 06a15de..2bab340 100644
--- a/tests/system_tests_multicast.py
+++ b/tests/system_tests_multicast.py
@@ -26,6 +26,7 @@ from __future__ import division
 from __future__ import absolute_import
 from __future__ import print_function
 
+import sys
 from time import sleep
 import unittest2 as unittest
 
@@ -41,30 +42,24 @@ from system_test import TestCase
 from system_test import Qdrouterd
 from system_test import main_module
 from system_test import TIMEOUT
+from system_test import TestTimeout
 
 
-class TestTimeout(object):
-    def __init__(self, parent):
-        self.parent = parent
-
-    def on_timer_task(self, event):
-        self.parent.timeout()
-
-
-MAX_FRAME=1025
+MAX_FRAME=1023
 W_THREADS=2
+LARGE_PAYLOAD = ("X" * MAX_FRAME) * 19
 
 # check for leaks of the following entities
 ALLOC_STATS=["qd_message_t",
              "qd_buffer_t",
              "qdr_delivery_t"]
 
+
 class MulticastLinearTest(TestCase):
     """
     Verify the multicast forwarding logic across a multihop linear router
     configuration
     """
-
     @classmethod
     def setUpClass(cls):
         """Start a router"""
@@ -158,35 +153,33 @@ class MulticastLinearTest(TestCase):
             # edge router EA1:
             {'router':      cls.EA1,
              'senders':     ['S-EA1-1'],
-             'receivers':   [],
-             'subscribers': 1,
+             'receivers':   ['R-EA1-1', 'R-EA1-2'],
+             'subscribers': 2,
              'remotes':     0
             },
             # Interior router INT_A:
             {'router':      cls.INT_A,
-             'senders':     [],
-             # 'receivers':   ['R-INT_A-1'],
-             'receivers':   [],
-             'subscribers': 0,
+             'senders':     ['S-INT_A-1'],
+             'receivers':   ['R-INT_A-1', 'R-INT_A-2'],
+             'subscribers': 3,
              'remotes':     1,
             },
             # Interior router INT_B:
             {'router':      cls.INT_B,
              'senders':     [],
-             'receivers':   [],
-             'subscribers': 1,
-             'remotes':     0,
+             'receivers':   ['R-INT_B-1', 'R-INT_B-2'],
+             'subscribers': 3,
+             'remotes':     1,
             },
             # edge router EB1
             {'router':      cls.EB1,
              'senders':     [],
-             'receivers':   ['R-EB1-1'],
-             'subscribers': 1,
+             'receivers':   ['R-EB1-1', 'R-EB1-2'],
+             'subscribers': 2,
              'remotes':     0,
             }
         ]
 
-
     def _get_alloc_stats(self, router, stats):
         # return a map of the current allocator counters for each entity type
         # name in stats
@@ -209,22 +202,6 @@ class MulticastLinearTest(TestCase):
             d[name] = list(filter(lambda a: a['typeName'] == name, q))[0]
         return d
 
-    def test_51_maybe_presettled_large_msg(self):
-        body = " MCAST MAYBE PRESETTLED LARGE "
-        body += "X" * (MAX_FRAME * 19)
-        for repeat in range(5):
-            test = MulticastPresettled(self.config, 100, body, SendMaybePresettled())
-            test.run()
-            self.assertEqual(None, test.error)
-
-    def test_51_presettled_large_msg(self):
-        body = " MCAST PRESETTLED LARGE "
-        body += "X" * (MAX_FRAME * 23)
-        for repeat in range(5):
-            test = MulticastPresettled(self.config, 100, body, SendMustBePresettled())
-            test.run()
-            self.assertEqual(None, test.error)
-
     def _check_for_leaks(self):
         for r in self.routers:
             stats = self._get_alloc_stats(r, ALLOC_STATS)
@@ -233,42 +210,236 @@ class MulticastLinearTest(TestCase):
                 max_allowed  = ((W_THREADS + 1)
                                 * stats[name]['localFreeListMax'])
                 held = stats[name]['heldByThreads']
-                import sys; sys.stdout.flush()
                 if held >= (2 * max_allowed):
                     print("OOPS!!! %s: (%s) - held=%d max=%d\n   %s\n"
                           % (r.config.router_id,
                              name, held, max_allowed, stats))
-                    import sys; sys.stdout.flush()
+                    sys.stdout.flush()
                     self.assertFalse(held >= (2 * max_allowed))
 
+    #
+    # run all the negative tests first so that if we screw up the internal
+    # state of the routers, the positive tests will likely fail
+    #
+
+    def _presettled_large_msg_rx_detach(self, config, count, drop_clients):
+        # detach receivers during receive
+        body = " MCAST PRESETTLED LARGE RX DETACH " + LARGE_PAYLOAD
+        test = MulticastPresettledRxFail(config, count,
+                                         drop_clients,
+                                         detach=True,
+                                         body=body)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_01_presettled_large_msg_rx_detach(self):
+        self._presettled_large_msg_rx_detach(self.config, 10, ['R-EA1-1', 'R-EB1-2'])
+        self._presettled_large_msg_rx_detach(self.config, 10, ['R-INT_A-2', 'R-INT_B-1'])
+
+    def _presettled_large_msg_rx_close(self, config, count, drop_clients):
+        # close receiver connections during receive
+        body = " MCAST PRESETTLED LARGE RX CLOSE " + LARGE_PAYLOAD
+        test = MulticastPresettledRxFail(config, count,
+                                         drop_clients,
+                                         detach=False,
+                                         body=body)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_02_presettled_large_msg_rx_close(self):
+        self._presettled_large_msg_rx_close(self.config, 10, ['R-EA1-2', 'R-EB1-1'])
+        self._presettled_large_msg_rx_close(self.config, 10, ['R-INT_A-1', 'R-INT_B-2'])
+
+    def _unsettled_large_msg_rx_detach(self, config, count, drop_clients):
+        # detach receivers during the test
+        body = " MCAST UNSETTLED LARGE RX DETACH " + LARGE_PAYLOAD
+        test = MulticastUnsettledRxFail(self.config, count, drop_clients, detach=True, body=body)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_10_unsettled_large_msg_rx_detach(self):
+        self._unsettled_large_msg_rx_detach(self.config, 10, ['R-EA1-1', 'R-EB1-2'])
+        self._unsettled_large_msg_rx_detach(self.config, 10, ['R-INT_A-2', 'R-INT_B-1'])
+
+    def _unsettled_large_msg_rx_close(self, config, count, drop_clients):
+        # close receiver connections during test
+        body = " MCAST UNSETTLED LARGE RX CLOSE " + LARGE_PAYLOAD
+        test = MulticastUnsettledRxFail(self.config, count, drop_clients, detach=False, body=body)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_11_unsettled_large_msg_rx_close(self):
+        self._unsettled_large_msg_rx_close(self.config, 10, ['R-EA1-2', 'R-EB1-1', ])
+        self._unsettled_large_msg_rx_close(self.config, 10, ['R-INT_A-1', 'R-INT_B-2'])
+
+    #
+    # now the positive tests
+    #
+
+    def test_50_presettled(self):
+        # Simply send a bunch of pre-settled multicast messages
+        body = " MCAST PRESETTLED "
+        test = MulticastPresettled(self.config, 10, body, SendPresettled())
+        test.run()
+
+    def test_51_presettled_mixed_large_msg(self):
+        # Same as above, but large message bodies (mixed sender settle mode)
+        body = " MCAST MAYBE PRESETTLED LARGE " + LARGE_PAYLOAD
+        test = MulticastPresettled(self.config, 11, body, SendMixed())
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_52_presettled_large_msg(self):
+        # Same as above, (pre-settled sender settle mode)
+        body = " MCAST PRESETTLED LARGE " + LARGE_PAYLOAD
+        test = MulticastPresettled(self.config, 13, body, SendPresettled())
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_60_unsettled_3ack(self):
+        # Sender sends unsettled, waits for Outcome from Receiver then settles
+        # Expect all messages to be accepted
+        body = " MCAST UNSETTLED "
+        test = MulticastUnsettled3Ack(self.config, 10, body)
+        test.run()
+        self.assertEqual(None, test.error)
+        self.assertEqual(test.n_outcomes[Delivery.ACCEPTED], test.n_sent)
+
+    def test_61_unsettled_3ack_large_msg(self):
+        # Same as above but with multiframe streaming
+        body = " MCAST UNSETTLED LARGE " + LARGE_PAYLOAD
+        test = MulticastUnsettled3Ack(self.config, 11, body=body)
+        test.run()
+        self.assertEqual(None, test.error)
+        self.assertEqual(test.n_outcomes[Delivery.ACCEPTED], test.n_sent)
+
+    def _unsettled_3ack_outcomes(self,
+                                 config,
+                                 count,
+                                 outcomes,
+                                 expected):
+        body = " MCAST UNSETTLED 3ACK OUTCOMES " + LARGE_PAYLOAD
+        test = MulticastUnsettled3Ack(self.config,
+                                      count,
+                                      body,
+                                      outcomes=outcomes)
+        test.run()
+        self.assertEqual(None, test.error)
+        self.assertEqual(test.n_outcomes[expected], test.n_sent)
+
+    def test_63_unsettled_3ack_outcomes(self):
+        # Verify the expected outcome is returned to the sender when the
+        # receivers return different outcome values.  If no outcome is
+        # specified for a receiver it will default to ACCEPTED
+
+        # expect REJECTED if any reject:
+        self._unsettled_3ack_outcomes(self.config, 3,
+                                      {'R-EB1-1': Delivery.REJECTED,
+                                       'R-EB1-2': Delivery.MODIFIED,
+                                       'R-INT_B-2': Delivery.RELEASED},
+                                      Delivery.REJECTED)
+        self._unsettled_3ack_outcomes(self.config, 3,
+                                      {'R-EB1-1': Delivery.REJECTED,
+                                       'R-INT_B-2': Delivery.RELEASED},
+                                      Delivery.REJECTED)
+        # expect ACCEPT if no rejects
+        self._unsettled_3ack_outcomes(self.config, 3,
+                                      {'R-EB1-2': Delivery.MODIFIED,
+                                       'R-INT_B-2': Delivery.RELEASED},
+                                      Delivery.ACCEPTED)
+        # expect MODIFIED over RELEASED
+        self._unsettled_3ack_outcomes(self.config, 3,
+                                      {'R-EA1-1': Delivery.RELEASED,
+                                       'R-EA1-2': Delivery.RELEASED,
+                                       'R-INT_A-1': Delivery.RELEASED,
+                                       'R-INT_A-2': Delivery.RELEASED,
+                                       'R-INT_B-1': Delivery.RELEASED,
+                                       'R-INT_B-2': Delivery.RELEASED,
+                                       'R-EB1-1': Delivery.RELEASED,
+                                       'R-EB1-2': Delivery.MODIFIED},
+                                      Delivery.MODIFIED)
+
+        # and released only if all released
+        self._unsettled_3ack_outcomes(self.config, 3,
+                                      {'R-EA1-1': Delivery.RELEASED,
+                                       'R-EA1-2': Delivery.RELEASED,
+                                       'R-INT_A-1': Delivery.RELEASED,
+                                       'R-INT_A-2': Delivery.RELEASED,
+                                       'R-INT_B-1': Delivery.RELEASED,
+                                       'R-INT_B-2': Delivery.RELEASED,
+                                       'R-EB1-1': Delivery.RELEASED,
+                                       'R-EB1-2': Delivery.RELEASED},
+                                      Delivery.RELEASED)
+
+    def test_70_unsettled_1ack(self):
+        # Sender sends unsettled, expects both outcome and settlement from
+        # receiver before sender settles locally
+        body = " MCAST UNSETTLED 1ACK "
+        test = MulticastUnsettled1Ack(self.config, 10, body)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_71_unsettled_1ack_large_msg(self):
+        # Same as above but with multiframe streaming
+        body = " MCAST UNSETTLED 1ACK LARGE " + LARGE_PAYLOAD
+        test = MulticastUnsettled1Ack(self.config, 10, body)
+        test.run()
+        self.assertEqual(None, test.error)
+
     def test_999_check_for_leaks(self):
         self._check_for_leaks()
 
 
-class SendMaybePresettled(LinkOption):
+#
+# Settlement options for Link attach
+#
+
+class SendPresettled(LinkOption):
     """
-    Set the default send settlement modes on link negotiation to mixed
+    All messages are sent presettled
+    """
+    def apply(self, link):
+        link.snd_settle_mode = Link.SND_SETTLED
+        link.rcv_settle_mode = Link.RCV_FIRST
+
+
+class SendMixed(LinkOption):
+    """
+    Messages may be sent unsettled or settled
     """
     def apply(self, link):
         link.snd_settle_mode = Link.SND_MIXED
         link.rcv_settle_mode = Link.RCV_FIRST
 
 
-class SendMustBePresettled(LinkOption):
+class Link1Ack(LinkOption):
     """
-    Set the default send settlement modes on a link to presettled
+    Messages will be sent unsettled
     """
     def apply(self, link):
-        link.snd_settle_mode = Link.SND_SETTLED
+        link.snd_settle_mode = Link.SND_UNSETTLED
         link.rcv_settle_mode = Link.RCV_FIRST
 
 
+class Link3Ack(LinkOption):
+    """
+    Messages will be sent unsettled and the receiver will wait for sender to
+    settle first.
+    """
+    def apply(self, link):
+        link.snd_settle_mode = Link.SND_UNSETTLED
+        link.rcv_settle_mode = Link.RCV_SECOND
+
+
 class MulticastBase(MessagingHandler):
+    """
+    Common multicast boilerplate code
+    """
     def __init__(self, config, count, body, topic=None, **handler_kwargs):
         super(MulticastBase, self).__init__(**handler_kwargs)
         self.msg_count = count
         self.config = config
-        self.topic = topic or "whatevahcast/test"
+        self.topic = topic or "multicast/test"
         self.body = body
 
         # totals
@@ -295,12 +466,6 @@ class MulticastBase(MessagingHandler):
         # count per outcome
         self.n_outcomes = {}
 
-        # self.c_accepted = {}
-        # self.c_released = {}
-        # self.c_rejected = {}
-        # self.c_modified = {}
-        # self.c_settled  = {}
-
         self.error = None
         self.timers = []
         self.reactor = None
@@ -402,7 +567,6 @@ class MulticastBase(MessagingHandler):
 
     def on_settled(self, event):
         self.n_settled += 1
-        name = event.link.name
 
     def run(self):
         Container(self).run()
@@ -460,9 +624,9 @@ class MulticastPresettled(MulticastBase):
 
     def check_if_done(self):
         # wait for all present receivers to receive all messages
-        # and for all received messagest to be settled by the
+        # and for all received messages to be settled by the
         # sender
-        to_rcv = self.n_senders * self.msg_count
+        to_rcv = self.n_senders * self.msg_count * self.n_receivers
         if to_rcv == self.n_received and not self.unsettled_deliveries:
             self.done()
 
@@ -470,6 +634,8 @@ class MulticastPresettled(MulticastBase):
         super(MulticastPresettled, self).on_message(event)
         if event.receiver:
             if not event.delivery.settled:
+                # it may be that settle will come after on_message
+                # so track that here
                 event.delivery.update(Delivery.ACCEPTED)
                 self.unexpected_unsettled += 1
                 tag = str(event.delivery.tag)
@@ -488,6 +654,7 @@ class MulticastPresettled(MulticastBase):
             self.sender_settled += 1
             tag = str(event.delivery.tag)
             try:
+                # got a delayed settle
                 self.unsettled_deliveries[tag] -= 1
                 if self.unsettled_deliveries[tag] == 0:
                     del self.unsettled_deliveries[tag]
@@ -496,5 +663,189 @@ class MulticastPresettled(MulticastBase):
             self.check_if_done()
 
 
+class MulticastPresettledRxFail(MulticastPresettled):
+    """
+    Spontaneously close a receiver or connection when a message is received
+    """
+    def __init__(self, config, count, drop_clients, detach, body):
+        super(MulticastPresettledRxFail, self).__init__(config, count, body, SendPresettled())
+        self.drop_clients = drop_clients
+        self.detach = detach
+
+    def check_if_done(self):
+        # Verify each receiver got the expected number of messages.
+        # Avoid waiting for dropped receivers.
+        done = True
+        to_rcv = self.n_senders * self.msg_count
+        for name, count in self.c_received.items():
+            if name not in self.drop_clients:
+                if count != to_rcv:
+                    done = False
+        if done:
+            self.done()
+
+    def on_message(self, event):
+        # close the receiver on arrival of the first message
+        r_name = event.receiver.name
+        if r_name in self.drop_clients:
+            if self.detach:
+                if event.receiver.state & Link.LOCAL_ACTIVE:
+                    event.receiver.close()
+            elif event.connection.state & Connection.LOCAL_ACTIVE:
+                event.connection.close()
+        super(MulticastPresettledRxFail, self).on_message(event)
+
+
+class MulticastUnsettled3Ack(MulticastBase):
+    """
+    Send count messages per sender, senders wait for terminal outcome from
+    receivers before settling
+    """
+    def __init__(self, config, count, body, outcomes=None):
+        pfetch = int((count + 1)/2)
+        super(MulticastUnsettled3Ack, self).__init__(config,
+                                                     count,
+                                                     body,
+                                                     prefetch=pfetch,
+                                                     auto_accept=False,
+                                                     auto_settle=False)
+        self.outcomes = outcomes or {}
+
+    def create_receiver(self, container, conn, source, name):
+        return container.create_receiver(conn, source=source, name=name,
+                                         options=Link3Ack())
+
+    def create_sender(self, container, conn, target, name):
+        return container.create_sender(conn, target=target, name=name,
+                                       options=Link3Ack())
+
+    def do_send(self, sender):
+        for i in range(self.msg_count):
+            msg = Message(body=" %s -> %s:%s" % (sender.name, i, self.body))
+            sender.send(msg)
+            self.n_sent += 1
+
+    def on_message(self, event):
+        # receiver: send the outcome but do not settle
+        super(MulticastUnsettled3Ack, self).on_message(event)
+        if event.delivery.settled:
+            self.error = "Unexpected pre-settled message received!"
+            self.done()
+            return
+        r_name = event.receiver.name
+        outcome = self.outcomes.get(r_name, Delivery.ACCEPTED)
+        event.delivery.update(outcome)
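+        # grant one more credit whenever the receiver runs out so the
+        # remaining messages keep flowing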
+        if event.receiver.credit == 0:
+            event.receiver.flow(1)
+
+    def on_settled(self, event):
+        super(MulticastUnsettled3Ack, self).on_settled(event)
+        event.delivery.settle()
+        self.check_if_done()
+
+    def on_accepted(self, event):
+        super(MulticastUnsettled3Ack, self).on_accepted(event)
+        event.delivery.settle()
+        self.check_if_done()
+
+    def on_released(self, event):
+        super(MulticastUnsettled3Ack, self).on_released(event)
+        event.delivery.settle()
+        self.check_if_done()
+
+    def on_modified(self, event):
+        super(MulticastUnsettled3Ack, self).on_modified(event)
+        event.delivery.settle()
+        self.check_if_done()
+
+    def on_rejected(self, event):
+        super(MulticastUnsettled3Ack, self).on_rejected(event)
+        event.delivery.settle()
+        self.check_if_done()
+
+    def check_if_done(self):
+        to_send = self.msg_count * self.n_senders
+        to_rcv = to_send * self.n_receivers
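+        # e.g. 2 senders, 3 receivers, count == 10: to_send == 20,
+        # to_rcv == 60; the router consolidates the receivers' outcomes so
+        # each sender sees a single outcome per sent message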
+
+        n_outcomes = (self.n_accepted + self.n_rejected
+                      + self.n_modified + self.n_released)
+
+        # expect senders to see settlement
+        if (self.n_sent == to_send
+                and self.n_received == to_rcv
+                and n_outcomes == to_send
+                and self.n_settled == to_rcv):
+            self.done()
+
+
+class MulticastUnsettled1Ack(MulticastUnsettled3Ack):
+    """
+    Sender sends unsettled, the receiver sets outcome and immediately settles
+    """
+    def __init__(self, config, count, body, outcomes=None):
+        super(MulticastUnsettled1Ack, self).__init__(config,
+                                                     count,
+                                                     body,
+                                                     outcomes)
+
+    def create_receiver(self, container, conn, source, name):
+        return container.create_receiver(conn, source=source, name=name,
+                                         options=Link1Ack())
+
+    def create_sender(self, container, conn, target, name):
+        return container.create_sender(conn, target=target, name=name,
+                                       options=Link1Ack())
+
+    def on_message(self, event):
+        # receiver: send outcome and settle
+        super(MulticastUnsettled1Ack, self).on_message(event)
+        event.delivery.settle()
+
+    def check_if_done(self):
+        to_send = self.msg_count * self.n_senders
+        to_rcv = to_send * self.n_receivers
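+        # 1-ack: the receivers settle immediately, so settlement events are
+        # expected only at the senders (one consolidated settlement per sent
+        # message, not one per receiver)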
+
+        n_outcomes = (self.n_accepted + self.n_rejected
+                      + self.n_modified + self.n_released)
+
+        # expect sender to see settlement
+        if (self.n_received == to_rcv
+                and n_outcomes == to_send
+                and self.n_settled == to_send):
+            self.done()
+
+
+class MulticastUnsettledRxFail(MulticastUnsettled3Ack):
+    """
+    Spontaneously close a receiver or connection when a message is received
+    """
+    def __init__(self, config, count, drop_clients, detach, body):
+        super(MulticastUnsettledRxFail, self).__init__(config, count, body)
+        self.drop_clients = drop_clients
+        self.detach = detach
+
+    def check_if_done(self):
+        # Verify each receiver got the expected number of messages.
+        # Avoid waiting for dropped receivers.
+        done = True
+        to_rcv = self.n_senders * self.msg_count
+        for name, count in self.c_received.items():
+            if name not in self.drop_clients:
+                if count != to_rcv:
+                    done = False
+        if done:
+            self.done()
+
+    def on_message(self, event):
+        # close the receiver on arrival of the first message
+        r_name = event.receiver.name
+        if r_name in self.drop_clients:
+            if self.detach:
+                if event.receiver.state & Link.LOCAL_ACTIVE:
+                    event.receiver.close()
+            elif event.connection.state & Connection.LOCAL_ACTIVE:
+                event.connection.close()
+        super(MulticastUnsettledRxFail, self).on_message(event)
+
+
 if __name__ == '__main__':
     unittest.main(main_module())
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index 79f6a15..a189edb 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -670,17 +670,25 @@ class MessageAnnotaionsPreExistingOverride(MessagingHandler):
 
 class SemanticsMulticast(MessagingHandler):
     def __init__(self, address):
-        super(SemanticsMulticast, self).__init__()
+        """
+        Verify that for every 1 unsettled mcast message received, N messages are sent
+        out (where N == number of receivers).  Assert that multiple received
+        dispositions are summarized to send out one disposition.
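+
+        Example: one message fanned out to three receivers should yield one
+        receive per receiver, a single summarized ACCEPTED at the sender
+        (n_accepts == n_sent == 1), and one settlement event per receiver
+        (n_settled == 3).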
+        """
+        super(SemanticsMulticast, self).__init__(auto_accept=False)
         self.address = address
         self.dest = "multicast.2"
         self.error = None
         self.n_sent = 0
+        self.n_settled = 0
         self.count = 3
         self.n_received_a = 0
         self.n_received_b = 0
         self.n_received_c = 0
+        self.n_accepts = 0
         self.timer = None
-        self.conn = None
+        self.conn_1 = None
+        self.conn_2 = None
         self.sender = None
         self.receiver_a = None
         self.receiver_b = None
@@ -688,22 +696,29 @@ class SemanticsMulticast(MessagingHandler):
 
     def on_start(self, event):
         self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
-        self.conn = event.container.connect(self.address)
-        self.sender = event.container.create_sender(self.conn, self.dest)
-        self.receiver_a = event.container.create_receiver(self.conn, self.dest, name="A")
-        self.receiver_b = event.container.create_receiver(self.conn, self.dest, name="B")
-        self.receiver_c = event.container.create_receiver(self.conn, self.dest, name="C")
+        self.conn_1 = event.container.connect(self.address)
+        self.conn_2 = event.container.connect(self.address)
+        self.sender = event.container.create_sender(self.conn_1, self.dest)
+        self.receiver_a = event.container.create_receiver(self.conn_2, self.dest, name="A")
+        self.receiver_b = event.container.create_receiver(self.conn_1, self.dest, name="B")
+        self.receiver_c = event.container.create_receiver(self.conn_2, self.dest, name="C")
 
     def timeout(self):
         self.error = "Timeout Expired: sent=%d rcvd=%d/%d/%d" % \
                      (self.n_sent, self.n_received_a, self.n_received_b, self.n_received_c)
-        self.conn.close()
+        self.conn_1.close()
+        self.conn_2.close()
 
     def check_if_done(self):
-        if self.n_received_a + self.n_received_b + self.n_received_c == self.count and \
-                self.n_received_a == self.n_received_b and self.n_received_c == self.n_received_b:
+        c = self.n_received_a + self.n_received_b + self.n_received_c
+        if (c == self.count
+                and self.n_received_a == self.n_received_b
+                and self.n_received_c == self.n_received_b
+                and self.n_accepts == self.n_sent
+                and self.n_settled == self.count):
             self.timer.cancel()
-            self.conn.close()
+            self.conn_1.close()
+            self.conn_2.close()
 
     def on_sendable(self, event):
         if self.n_sent == 0:
@@ -718,8 +733,14 @@ class SemanticsMulticast(MessagingHandler):
             self.n_received_b += 1
         if event.receiver == self.receiver_c:
             self.n_received_c += 1
+        event.delivery.update(Delivery.ACCEPTED)
 
     def on_accepted(self, event):
+        self.n_accepts += 1
+        event.delivery.settle()
+
+    def on_settled(self, event):
+        self.n_settled += 1
         self.check_if_done()
 
     def run(self):
@@ -1177,7 +1198,7 @@ class MulticastUnsettled ( MessagingHandler ) :
                    n_messages,
                    n_receivers
                  ) :
-        super ( MulticastUnsettled, self ) . __init__ ( prefetch = n_messages )
+        super ( MulticastUnsettled, self ) . __init__ (auto_accept=False, prefetch=n_messages)
         self.addr        = addr
         self.n_messages  = n_messages
         self.n_receivers = n_receivers
@@ -1213,7 +1234,7 @@ class MulticastUnsettled ( MessagingHandler ) :
 
         self.sender = event.container.create_sender   ( self.send_conn, self.addr )
         for i in range ( self.n_receivers ) :
-            rcvr = event.container.create_receiver ( self.send_conn, self.addr, name = "receiver_" + str(i) )
+            rcvr = event.container.create_receiver ( self.recv_conn, self.addr, name = "receiver_" + str(i) )
             self.receivers.append ( rcvr )
             rcvr.flow ( self.n_messages )
             self.n_received.append ( 0 )
@@ -2358,8 +2379,12 @@ class UnavailableReceiver(UnavailableBase):
         self.receiver = event.container.create_receiver(self.conn, self.dest, name=self.link_name)
 
 class MulticastUnsettledTest(MessagingHandler):
+    """
+    Send N unsettled multicast messages to 2 receivers.  Ensure sender is
+    notified of settlement and disposition changes from the receivers.
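+    Each receiver explicitly accepts and settles its delivery (see
+    on_message below).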
+    """
     def __init__(self, address):
-        super(MulticastUnsettledTest, self).__init__(prefetch=0)
+        super(MulticastUnsettledTest, self).__init__(auto_accept=False, prefetch=0)
         self.address = address
         self.dest = "multicast.MUtest"
         self.error = None
@@ -2380,9 +2405,14 @@ class MulticastUnsettledTest(MessagingHandler):
     def on_start(self, event):
         self.timer     = event.reactor.schedule(TIMEOUT, Timeout(self))
         self.conn      = event.container.connect(self.address)
-        self.sender    = event.container.create_sender(self.conn, self.dest)
-        self.receiver1 = event.container.create_receiver(self.conn, self.dest, name="A")
-        self.receiver2 = event.container.create_receiver(self.conn, self.dest, name="B")
+        self.sender    = event.container.create_sender(self.conn, self.dest,
+                                                       options=AtLeastOnce())
+        self.receiver1 = event.container.create_receiver(self.conn, self.dest,
+                                                         name="A",
+                                                         options=AtLeastOnce())
+        self.receiver2 = event.container.create_receiver(self.conn, self.dest,
+                                                         name="B",
+                                                         options=AtLeastOnce())
         self.receiver1.flow(self.count)
         self.receiver2.flow(self.count)
 
@@ -2397,8 +2427,10 @@ class MulticastUnsettledTest(MessagingHandler):
         self.check_if_done()
 
     def on_message(self, event):
-        if not event.delivery.settled:
-            self.error = "Received unsettled delivery"
+        if event.delivery.settled:
+            self.error = "Received settled delivery"
+        event.delivery.update(Delivery.ACCEPTED)
+        event.delivery.settle()
         self.n_received += 1
         self.check_if_done()
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org