You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2019/08/27 17:07:48 UTC
[qpid-dispatch] branch master updated: DISPATCH-1266: Fix unsettled
multicast forwarding
This is an automated email from the ASF dual-hosted git repository.
kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push:
new 9d8d706 DISPATCH-1266: Fix unsettled multicast forwarding
9d8d706 is described below
commit 9d8d706c948ba10c4609009e7cb89b2a450190a9
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Wed Feb 27 11:39:58 2019 -0500
DISPATCH-1266: Fix unsettled multicast forwarding
This closes #554
---
src/router_core/connections.c | 47 +++-
src/router_core/core_link_endpoint.c | 2 +-
src/router_core/delivery.c | 413 +++++++++++++++++++++++++++----
src/router_core/delivery.h | 26 +-
src/router_core/forwarder.c | 33 ---
src/router_core/transfer.c | 49 ++--
src/router_node.c | 25 +-
tests/system_test.py | 13 +-
tests/system_tests_multicast.py | 461 ++++++++++++++++++++++++++++++-----
tests/system_tests_one_router.py | 70 ++++--
10 files changed, 931 insertions(+), 208 deletions(-)
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index e62e609..8c97ec9 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -730,22 +730,26 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c
qdr_delivery_t *peer;
while (dlv) {
DEQ_REMOVE_HEAD(undelivered);
+
+ // expect: an inbound undelivered multicast should
+ // have no peers (has not been forwarded yet)
+ assert(dlv->multicast
+ ? qdr_delivery_peer_count_CT(dlv) == 0
+ : true);
+
peer = qdr_delivery_first_peer_CT(dlv);
while (peer) {
if (peer->multicast) {
//
- // If the address of the delivery is a multicast address and if there are no receivers for this address, the peer delivery must be released.
- //
- // If the address of the delivery is a multicast address and there is at least one other receiver for the address, dont do anything
+ // dlv is outgoing mcast - tell its incoming peer that it has
+ // been released and settled. This will unlink these peers.
//
- if (DEQ_SIZE(peer->peers) == 1 || peer->peer) {
- qdr_delivery_release_CT(core, peer);
- }
+ qdr_delivery_mcast_outbound_update_CT(core, peer, dlv, PN_RELEASED, true);
}
else {
qdr_delivery_release_CT(core, peer);
+ qdr_delivery_unlink_peers_CT(core, dlv, peer);
}
- qdr_delivery_unlink_peers_CT(core, dlv, peer);
peer = qdr_delivery_next_peer_CT(dlv);
}
@@ -785,12 +789,29 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c
qdr_deliver_continue_peers_CT(core, dlv);
}
- peer = qdr_delivery_first_peer_CT(dlv);
- while (peer) {
- if (link->link_direction == QD_OUTGOING)
- qdr_delivery_failed_CT(core, peer);
- qdr_delivery_unlink_peers_CT(core, dlv, peer);
- peer = qdr_delivery_next_peer_CT(dlv);
+ if (dlv->multicast) {
+ //
+ // forward settlement
+ //
+ qdr_delivery_mcast_inbound_update_CT(core, dlv,
+ PN_MODIFIED,
+ true); // true == settled
+ } else {
+ peer = qdr_delivery_first_peer_CT(dlv);
+ while (peer) {
+ if (peer->multicast) {
+ //
+ // peer is incoming multicast and dlv is one of its corresponding
+ // outgoing deliveries. This will unlink these peers.
+ //
+ qdr_delivery_mcast_outbound_update_CT(core, peer, dlv, PN_MODIFIED, true);
+ } else {
+ if (link->link_direction == QD_OUTGOING)
+ qdr_delivery_failed_CT(core, peer);
+ qdr_delivery_unlink_peers_CT(core, dlv, peer);
+ }
+ peer = qdr_delivery_next_peer_CT(dlv);
+ }
}
//
diff --git a/src/router_core/core_link_endpoint.c b/src/router_core/core_link_endpoint.c
index 351ee28..268887d 100644
--- a/src/router_core/core_link_endpoint.c
+++ b/src/router_core/core_link_endpoint.c
@@ -194,7 +194,7 @@ void qdrc_endpoint_do_deliver_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_deli
void qdrc_endpoint_do_update_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_delivery_t *dlv, bool settled)
{
if (!!ep->desc->on_update)
- ep->desc->on_update(ep->link_context, dlv, settled, dlv->disposition);
+ ep->desc->on_update(ep->link_context, dlv, settled, dlv->remote_disposition);
}
diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c
index 049dcbb..3992585 100644
--- a/src/router_core/delivery.c
+++ b/src/router_core/delivery.c
@@ -26,10 +26,12 @@ ALLOC_DEFINE(qdr_delivery_t);
static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
static void qdr_delete_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
-static bool qdr_delivery_has_peer_CT(qdr_delivery_t *dlv);
static pn_data_t *qdr_delivery_extension_state(qdr_delivery_t *delivery);
static void qdr_delivery_free_extension_state(qdr_delivery_t *delivery);
static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *delivery);
+static bool qdr_delivery_anycast_update_CT(qdr_core_t *core, qdr_delivery_t *dlv,
+ qdr_delivery_t *peer, uint64_t new_disp, bool settled,
+ qdr_error_t *error);
void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context)
@@ -82,6 +84,7 @@ bool qdr_delivery_receive_complete(const qdr_delivery_t *delivery)
}
+// set the local disposition (to be send to remote endpoint)
void qdr_delivery_set_disposition(qdr_delivery_t *delivery, uint64_t disposition)
{
if (delivery)
@@ -89,6 +92,7 @@ void qdr_delivery_set_disposition(qdr_delivery_t *delivery, uint64_t disposition
}
+// get the current local disposition
uint64_t qdr_delivery_disposition(const qdr_delivery_t *delivery)
{
if (!delivery)
@@ -169,8 +173,9 @@ bool qdr_delivery_presettled(const qdr_delivery_t *delivery)
}
-void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disposition,
- bool settled, qdr_error_t *error, pn_data_t *ext_state, bool ref_given)
+// remote endpoint modified its disposition and/or settlement
+void qdr_delivery_remote_state_updated(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disposition,
+ bool settled, qdr_error_t *error, pn_data_t *ext_state, bool ref_given)
{
qdr_action_t *action = qdr_action(qdr_update_delivery_CT, "update_delivery");
action->args.delivery.delivery = delivery;
@@ -420,18 +425,21 @@ static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *de
}
-static bool qdr_delivery_has_peer_CT(qdr_delivery_t *dlv)
+
+// Returns the number of peers for dlv
+int qdr_delivery_peer_count_CT(const qdr_delivery_t *dlv)
{
- return dlv->peer || DEQ_SIZE(dlv->peers) > 0;
+ return (dlv->peer) ? 1 : DEQ_SIZE(dlv->peers);
}
+
void qdr_delivery_link_peers_CT(qdr_delivery_t *in_dlv, qdr_delivery_t *out_dlv)
{
// If there is no delivery or a peer, we cannot link each other.
if (!in_dlv || !out_dlv)
return;
- if (!qdr_delivery_has_peer_CT(in_dlv)) {
+ if (qdr_delivery_peer_count_CT(in_dlv) == 0) {
// This is the very first peer. Link them up.
assert(!out_dlv->peer);
in_dlv->peer = out_dlv;
@@ -484,6 +492,9 @@ void qdr_delivery_unlink_peers_CT(qdr_core_t *core, qdr_delivery_t *dlv, qdr_del
peer_ref = DEQ_NEXT(peer_ref);
}
assert(peer_ref != 0);
+ if (peer_ref == dlv->next_peer_ref)
+ // ok to unlink peers while iterating over them
+ dlv->next_peer_ref = DEQ_NEXT(peer_ref);
qdr_del_delivery_ref(&dlv->peers, peer_ref);
}
@@ -498,6 +509,9 @@ void qdr_delivery_unlink_peers_CT(qdr_core_t *core, qdr_delivery_t *dlv, qdr_del
peer_ref = DEQ_NEXT(peer_ref);
}
assert(peer_ref != 0);
+ if (peer_ref == peer->next_peer_ref)
+ // ok to unlink peers while iterating over them
+ peer->next_peer_ref = DEQ_NEXT(peer_ref);
qdr_del_delivery_ref(&peer->peers, peer_ref);
}
@@ -509,7 +523,7 @@ void qdr_delivery_unlink_peers_CT(qdr_core_t *core, qdr_delivery_t *dlv, qdr_del
qdr_delivery_t *qdr_delivery_first_peer_CT(qdr_delivery_t *dlv)
{
// What if there are no peers for this delivery?
- if (!qdr_delivery_has_peer_CT(dlv))
+ if (qdr_delivery_peer_count_CT(dlv) == 0)
return 0;
if (dlv->peer) {
@@ -558,39 +572,95 @@ void qdr_delivery_decref_CT(qdr_core_t *core, qdr_delivery_t *dlv, const char *l
}
+// the remote endpoint has change the state (disposition) or settlement for the
+// delivery. Update the local state/settlement accordingly.
+//
static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
- qdr_delivery_t *dlv = action->args.delivery.delivery;
- qdr_delivery_t *peer = qdr_delivery_first_peer_CT(dlv);
- bool push = false;
- bool peer_moved = false;
- bool dlv_moved = false;
- uint64_t disp = action->args.delivery.disposition;
- bool settled = action->args.delivery.settled;
- qdr_error_t *error = action->args.delivery.error;
- bool error_unassigned = true;
+ if (discard)
+ return;
+
+ qdr_delivery_t *dlv = action->args.delivery.delivery;
+ qdr_delivery_t *peer = qdr_delivery_first_peer_CT(dlv);
+ uint64_t new_disp = action->args.delivery.disposition;
+ bool settled = action->args.delivery.settled;
+ qdr_error_t *error = action->args.delivery.error;
+ bool free_error = true;
+
+ if (dlv->multicast) {
+ //
+ // remote state change for *inbound* multicast delivery,
+ // update downstream *outbound* peers
+ //
+ qdr_delivery_mcast_inbound_update_CT(core, dlv, new_disp, settled);
+
+ } else if (peer && peer->multicast) {
+ //
+ // remote state change for an *outbound* delivery to a multicast address,
+ // propagate upstream to *inbound* delivery (peer)
+ //
+ // coverity[swapped_arguments]
+ qdr_delivery_mcast_outbound_update_CT(core, peer, dlv, new_disp, settled);
+
+ } else {
+ //
+ // Anycast forwarding
+ //
+ free_error = !qdr_delivery_anycast_update_CT(core, dlv, peer, new_disp, settled, error);
+ }
+
+ //
+ // Release the action reference, possibly freeing the delivery
+ //
+ qdr_delivery_decref_CT(core, dlv, "qdr_update_delivery_CT - remove from action");
+
+ if (free_error)
+ qdr_error_free(error);
+}
+
+
+// The remote delivery state (disposition) and/or remote settlement for an
+// anycast delivery has changed. Propagate the changes to its peer delivery.
+//
+// returns true if ownership of error parameter is taken (do not free it)
+//
+static bool qdr_delivery_anycast_update_CT(qdr_core_t *core, qdr_delivery_t *dlv,
+ qdr_delivery_t *peer, uint64_t new_disp, bool settled,
+ qdr_error_t *error)
+{
+ bool push = false;
+ bool peer_moved = false;
+ bool dlv_moved = false;
+ bool error_assigned = false;
+ qdr_link_t *dlink = qdr_delivery_link(dlv);
+
+ assert(!dlv->multicast);
+ assert(!peer || !peer->multicast);
- qdr_link_t *dlv_link = qdr_delivery_link(dlv);
- qdr_link_t *peer_link = qdr_delivery_link(peer);
+ if (peer)
+ qdr_delivery_incref(peer, "qdr_delivery_anycast_update_CT - prevent peer from being freed");
//
- // Logic:
+ // Non-multicast Logic:
//
- // If disposition has changed and there is a peer link, set the disposition of the peer
- // If settled, the delivery must be unlinked and freed.
- // If settled and there is a peer, the peer shall be settled and unlinked. It shall not
- // be freed until the connection-side thread settles the PN delivery.
+ // If disposition has changed and there is a peer link, set the disposition
+ // of the peer
+ // If remote settled, the delivery must be unlinked and freed.
+ // If remote settled and there is a peer, the peer shall be settled and
+ // unlinked. It shall not be freed until the connection-side thread
+ // settles the PN delivery.
//
- if (disp != dlv->disposition) {
+ if (new_disp != dlv->remote_disposition) {
//
- // Disposition has changed, propagate the change to the peer delivery.
+ // Remote disposition has changed, propagate the change to the peer
+ // delivery local disposition.
//
- dlv->disposition = disp;
+ dlv->remote_disposition = new_disp;
if (peer) {
- peer->disposition = disp;
+ peer->disposition = new_disp;
peer->error = error;
push = true;
- error_unassigned = false;
+ error_assigned = true;
qdr_delivery_copy_extension_state(dlv, peer, false);
}
}
@@ -598,41 +668,290 @@ static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool
if (settled) {
if (peer) {
peer->settled = true;
- if (peer_link) {
+ if (qdr_delivery_link(peer)) {
peer_moved = qdr_delivery_settled_CT(core, peer);
- if (peer_moved)
- push = true;
}
qdr_delivery_unlink_peers_CT(core, dlv, peer);
}
- if (dlv_link)
+ if (dlink)
dlv_moved = qdr_delivery_settled_CT(core, dlv);
}
//
// If the delivery's link has a core endpoint, notify the endpoint of the update
//
- if (dlv_link && dlv_link->core_endpoint)
- qdrc_endpoint_do_update_CT(core, dlv_link->core_endpoint, dlv, settled);
+ if (dlink && dlink->core_endpoint)
+ qdrc_endpoint_do_update_CT(core, dlink->core_endpoint, dlv, settled);
- if (push)
+ if (push || peer_moved)
qdr_delivery_push_CT(core, peer);
//
- // Release the action reference, possibly freeing the delivery
- //
- qdr_delivery_decref_CT(core, dlv, "qdr_update_delivery_CT - remove from action");
-
- //
- // Release the unsettled references if the deliveries were moved
+ // Release the unsettled references if the deliveries were moved/settled
//
if (dlv_moved)
- qdr_delivery_decref_CT(core, dlv, "qdr_update_delivery_CT - removed from unsettled (1)");
+ qdr_delivery_decref_CT(core, dlv, "qdr_delivery_anycast_update CT - dlv removed from unsettled");
if (peer_moved)
- qdr_delivery_decref_CT(core, peer, "qdr_update_delivery_CT - removed from unsettled (2)");
- if (error_unassigned)
- qdr_error_free(error);
+ qdr_delivery_decref_CT(core, peer, "qdr_delivery_anycast_update_CT - peer removed from unsettled");
+ if (peer)
+ qdr_delivery_decref_CT(core, peer, "qdr_delivery_anycast_update_CT - allow free of peer");
+
+ return error_assigned;
+}
+
+
+// The remote delivery state (disposition) and/or remote settlement for an
+// incoming multicast delivery has changed. Propagate the changes "downstream"
+// to the outbound peers. Once all peers have settled then settle the in_dlv
+//
+void qdr_delivery_mcast_inbound_update_CT(qdr_core_t *core, qdr_delivery_t *in_dlv,
+ uint64_t new_disp, bool settled)
+{
+ if (!in_dlv)
+ return;
+
+ bool update_disp = new_disp && in_dlv->remote_disposition != new_disp;
+
+ assert(in_dlv->multicast); // expect in_dlv to be the inbound delivery
+
+ qd_log(core->log, QD_LOG_TRACE,
+ "Remote updated mcast delivery (%p) disp=0x%"PRIx64" settled=%s",
+ in_dlv, new_disp, (settled) ? "True" : "False");
+
+ if (update_disp)
+ in_dlv->remote_disposition = new_disp;
+
+ qdr_delivery_t *out_peer = qdr_delivery_first_peer_CT(in_dlv);
+ while (out_peer) {
+ bool push = false;
+ bool moved = false;
+ bool unlink = false;
+
+ //
+ // AMQP 1.0 allows the sender to specify a disposition
+ // so forward it along
+ //
+ if (update_disp && out_peer->disposition != new_disp) {
+ out_peer->disposition = new_disp;
+ push = true;
+ // extension state/error ignored, not sure how
+ // that can be supported for mcast...
+ }
+
+ //
+ // the sender has settled
+ //
+ if (settled) {
+ out_peer->settled = true;
+ if (qdr_delivery_link(out_peer)) {
+ moved = qdr_delivery_settled_CT(core, out_peer);
+ }
+ unlink = true;
+ }
+
+ if (push || moved) {
+ qdr_delivery_push_CT(core, out_peer);
+ }
+
+ if (moved) {
+ qdr_delivery_decref_CT(core, out_peer,
+ "qdr_delivery_mcast_inbound_update_CT - removed out_peer from unsettled");
+ }
+
+ qd_log(core->log, QD_LOG_TRACE,
+ "Updating mcast delivery (%p) out peer (%p) updated disp=%s settled=%s",
+ in_dlv, out_peer, (push) ? "True" : "False",
+ (unlink) ? "True" : "False");
+
+ if (unlink) {
+ qdr_delivery_unlink_peers_CT(core, in_dlv, out_peer); // may free out_peer!
+ }
+
+ out_peer = qdr_delivery_next_peer_CT(in_dlv);
+ }
+
+ if (settled) {
+ assert(qdr_delivery_peer_count_CT(in_dlv) == 0);
+ in_dlv->settled = true;
+ if (qdr_delivery_settled_CT(core, in_dlv)) {
+ qdr_delivery_decref_CT(core, in_dlv,
+ "qdr_delivery_mcast_inbound_update CT - in_dlv removed from unsettled");
+ }
+ }
+}
+
+
+// An outgoing peer delivery of an incoming multicast delivery has settled.
+// Settle the inbound delivery after all of its outbound deliveries
+// have been settled.
+//
+// Note: this call may free either in_dlv or out_dlv by unlinking them. The
+// caller must increment the reference count for these deliveries if they are
+// to be referenced after this call.
+//
+// moved: set to true if in_dlv has been removed from the unsettled list
+// return: true if in_dlv has been settled
+//
+static bool qdr_delivery_mcast_outbound_settled_CT(qdr_core_t *core, qdr_delivery_t *in_dlv,
+ qdr_delivery_t *out_dlv, bool *moved)
+{
+ bool push = false;
+ *moved = false;
+
+ assert(in_dlv->multicast && out_dlv->peer == in_dlv);
+ assert(qdr_delivery_peer_count_CT(out_dlv) == 1);
+
+ int peer_count = qdr_delivery_peer_count_CT(in_dlv);
+
+ if (peer_count == 1) {
+ //
+ // This out_dlv is the last outgoing peer so
+ // we can now settle in_dlv
+ //
+ in_dlv->settled = true;
+ push = true;
+ if (qdr_delivery_link(in_dlv)) {
+ *moved = qdr_delivery_settled_CT(core, in_dlv);
+ }
+
+ qd_log(core->log, QD_LOG_TRACE,
+ "mcast delivery (%p) has settled, disp=0x%"PRIx64,
+ in_dlv, in_dlv->disposition);
+ } else {
+
+ qd_log(core->log, QD_LOG_TRACE,
+ "mcast delivery (%p) out peer (%p) has settled, remaining peers=%d",
+ in_dlv, out_dlv, peer_count - 1);
+ }
+
+ // now settle the peer itself and remove it from link unsettled list
+
+ out_dlv->settled = true;
+ if (qdr_delivery_settled_CT(core, out_dlv)) {
+ qdr_delivery_decref_CT(core, out_dlv, "qdr_delivery_mcast_outbound_settled_CT - out_dlv removed from unsettled");
+ }
+
+ // do this last since it may free either dlv or out_dlv:
+ qdr_delivery_unlink_peers_CT(core, in_dlv, out_dlv);
+
+ return push;
+}
+
+
+// true if delivery state d is a terminal state as defined by AMQP 1.0
+//
+#define IS_TERMINAL(d) (PN_ACCEPTED <= (d) && (d) <= PN_MODIFIED)
+
+
+// an outbound mcast delivery has changed its remote state (disposition)
+// propagate the change back "upstream" to the inbound delivery
+//
+// returns true if dlv disposition has been updated
+//
+static bool qdr_delivery_mcast_outbound_disposition_CT(qdr_core_t *core, qdr_delivery_t *in_dlv,
+ qdr_delivery_t *out_dlv, uint64_t new_disp)
+{
+ // The AMQP 1.0 spec does not define a way to propagate disposition
+ // back to the sender in the case of unsettled multicast. In the
+ // case of multiple different terminal states we have to reconcile
+ // them. We assign an ad hoc priority to each terminal value and
+ // set the final disposition to the highest priority returned
+ // across all receivers.
+ static const int priority[] = {
+ 2, // Accepted
+ 3, // Rejected - highest because reject is a hard error
+ 0, // Released
+ 1, // Modified
+ };
+ bool push = false;
+
+ if (!in_dlv || !out_dlv)
+ return push;
+
+ assert(in_dlv->multicast && out_dlv->peer == in_dlv);
+ assert(qdr_delivery_peer_count_CT(out_dlv) == 1);
+
+ if (new_disp == 0x33) { // 0x33 == Declared
+ // hack alert - the transaction section of the AMQP 1.0 spec
+ // defines the Declared outcome (0x33) terminal state.
+ qd_log(core->log, QD_LOG_WARNING,
+ "Transactions are not supported for multicast messages");
+ new_disp = PN_REJECTED;
+ }
+
+ out_dlv->remote_disposition = new_disp;
+
+ if (IS_TERMINAL(new_disp)) {
+ // our mcast impl ignores non-terminal outcomes
+
+ qd_log(core->log, QD_LOG_TRACE,
+ "mcast delivery (%p) out peer (%p) disp updated: 0x%"PRIx64,
+ in_dlv, out_dlv, new_disp);
+
+ if (in_dlv->mcast_disposition == 0) {
+ in_dlv->mcast_disposition = new_disp;
+ } else {
+ int old_priority = priority[in_dlv->mcast_disposition - PN_ACCEPTED];
+ int new_priority = priority[new_disp - PN_ACCEPTED];
+ if (new_priority > old_priority)
+ in_dlv->mcast_disposition = new_disp;
+ }
+
+ // wait until all peers have set terminal state before setting it on
+ // the in_dlv
+ //
+ qdr_delivery_t *peer = qdr_delivery_first_peer_CT(in_dlv);
+ while (peer) {
+ if (!IS_TERMINAL(peer->remote_disposition)) {
+ break;
+ }
+ peer = qdr_delivery_next_peer_CT(in_dlv);
+ }
+
+ if (!peer) { // all peers have set a terminal state
+ // TODO(kgiusti) what about error parameter?
+ in_dlv->disposition = in_dlv->mcast_disposition;
+ push = true;
+ qd_log(core->log, QD_LOG_TRACE,
+ "mcast delivery (%p) terminal state set: 0x%"PRIx64,
+ in_dlv, in_dlv->disposition);
+ }
+ }
+
+ return push;
+}
+
+
+//
+// The delivery state (disposition) and/or settlement state for a downstream
+// peer delivery of a multicast inbound delivery has changed. Update the
+// inbound delivery to reflect these changes.
+//
+// Note: if settled is true then in_dlv or out_dlv *may* be released. Callers
+// should increment their reference counts if either of these deliveries are
+// referenced after this call.
+//
+void qdr_delivery_mcast_outbound_update_CT(qdr_core_t *core,
+ qdr_delivery_t *in_dlv, qdr_delivery_t *out_dlv,
+ uint64_t new_disp, bool settled)
+{
+ bool dlv_moved = false;
+ bool push_dlv = qdr_delivery_mcast_outbound_disposition_CT(core, in_dlv, out_dlv, new_disp);
+
+ qdr_delivery_incref(in_dlv, "qdr_delivery_mcast_outbound_update_CT - prevent mcast free");
+
+ if (settled && qdr_delivery_mcast_outbound_settled_CT(core, in_dlv, out_dlv, &dlv_moved)) {
+ push_dlv = true;
+ }
+
+ if (push_dlv || dlv_moved) {
+ qdr_delivery_push_CT(core, in_dlv);
+ }
+ if (dlv_moved) {
+ qdr_delivery_decref_CT(core, in_dlv, "qdr_delivery_mcast_outbound_update_CT - removed mcast peer from unsettled");
+ }
+ qdr_delivery_decref_CT(core, in_dlv, "qdr_delivery_mcast_outbound_update_CT - allow mcast peer free");
}
@@ -706,8 +1025,8 @@ static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool
sub = DEQ_HEAD(in_dlv->subscriptions);
}
- // This is a multicast delivery or if this is a presettled multi-frame unicast delivery.
- if (in_dlv->multicast || in_dlv->settled) {
+ // This is a presettled multi-frame unicast delivery.
+ if (in_dlv->settled) {
//
// If a delivery is settled but did not go into one of the lists, that means that it is going nowhere.
diff --git a/src/router_core/delivery.h b/src/router_core/delivery.h
index e3fd88b..9619bfb 100644
--- a/src/router_core/delivery.h
+++ b/src/router_core/delivery.h
@@ -43,7 +43,9 @@ struct qdr_delivery_t {
qd_message_t *msg;
qd_iterator_t *to_addr;
qd_iterator_t *origin;
- uint64_t disposition;
+ uint64_t disposition; ///< local disposition, will be pushed to remote endpoint
+ uint64_t remote_disposition; ///< disposition as set by remote endpoint
+ uint64_t mcast_disposition; ///< temporary terminal disposition while multicast fwding
uint32_t ingress_time;
pn_data_t *extension_state;
qdr_error_t *error;
@@ -81,6 +83,7 @@ void qdr_delivery_tag(const qdr_delivery_t *delivery, const char **tag, int *len
bool qdr_delivery_tag_sent(const qdr_delivery_t *delivery);
void qdr_delivery_set_tag_sent(const qdr_delivery_t *delivery, bool tag_sent);
+// note: access to _local_ endpoint disposition (not remote endpoint)
uint64_t qdr_delivery_disposition(const qdr_delivery_t *delivery);
void qdr_delivery_set_disposition(qdr_delivery_t *delivery, uint64_t disposition);
@@ -106,9 +109,10 @@ void qdr_delivery_copy_extension_state(qdr_delivery_t *src, qdr_delivery_t *dest
/* release dlv and possibly schedule its deletion on the core thread */
void qdr_delivery_decref(qdr_core_t *core, qdr_delivery_t *delivery, const char *label);
-/* handles disposition/settlement changes from remote delivery and schedules Core thread */
-void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disp,
- bool settled, qdr_error_t *error, pn_data_t *ext_state, bool ref_given);
+/* handles delivery disposition and settlement changes from the remote end of
+ * the link, and schedules Core thread */
+void qdr_delivery_remote_state_updated(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disp,
+ bool settled, qdr_error_t *error, pn_data_t *ext_state, bool ref_given);
/* invoked when incoming message data arrives - schedule core thread */
qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core, qdr_delivery_t *delivery);
@@ -144,5 +148,19 @@ void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv);
/* update the links counters with respect to its delivery */
void qdr_delivery_increment_counters_CT(qdr_core_t *core, qdr_delivery_t *delivery);
+/**
+ * multicast delivery state and settlement management
+ */
+
+// remote updated disposition/settlement for incoming delivery
+void qdr_delivery_mcast_inbound_update_CT(qdr_core_t *core, qdr_delivery_t *in_dlv,
+ uint64_t new_disp, bool settled);
+// remote update disposition/settlement for outgoing delivery
+void qdr_delivery_mcast_outbound_update_CT(qdr_core_t *core, qdr_delivery_t *in_dlv,
+ qdr_delivery_t *out_peer,
+ uint64_t new_disp, bool settled);
+// number of unsettled peer (outbound) deliveries for in_dlv
+int qdr_delivery_peer_count_CT(const qdr_delivery_t *in_dlv);
+
#endif // __delivery_h__
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index cfb9fe4..0a679aa 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -341,7 +341,6 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
bool bypass_valid_origins = addr->forwarder->bypass_valid_origins;
int fanout = 0;
qd_bitmask_t *link_exclusion = !!in_delivery ? in_delivery->link_exclusion : 0;
- bool presettled = !!in_delivery ? in_delivery->settled : true;
bool receive_complete = qd_message_receive_complete(qdr_delivery_message(in_delivery));
uint8_t priority = qdr_forward_effective_priority(msg, addr);
@@ -349,16 +348,6 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
DEQ_INIT(deliver_info_list);
//
- // If the delivery is not presettled, set the settled flag for forwarding so all
- // outgoing deliveries will be presettled.
- //
- // NOTE: This is the only multicast mode currently supported. Others will likely be
- // implemented in the future.
- //
- if (!presettled)
- in_delivery->settled = true;
-
- //
// Forward to local subscribers
//
if (!addr->local || exclude_inprocess) {
@@ -494,26 +483,6 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
deliver_info = DEQ_HEAD(deliver_info_list);
}
- if (in_delivery && !presettled) {
- if (fanout == 0)
- //
- // The delivery was not presettled and it was not forwarded to any
- // destinations, return it to its original unsettled state.
- //
- in_delivery->settled = false;
- else {
- //
- // The delivery was not presettled and it was forwarded to at least
- // one destination. Accept and settle the delivery only if the entire delivery
- // has been received.
- //
- if (receive_complete) {
- in_delivery->disposition = PN_ACCEPTED;
- qdr_delivery_push_CT(core, in_delivery);
- }
- }
- }
-
return fanout;
}
@@ -1000,8 +969,6 @@ int qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *
int fanout = 0;
if (addr->forwarder)
fanout = addr->forwarder->forward_message(core, addr, msg, in_delivery, exclude_inprocess, control);
-
- // TODO - Deal with this delivery's disposition
return fanout;
}
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 0b642d3..4223c8e 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -124,6 +124,7 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *
}
+// send up to credit pending outgoing deliveries
int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
{
qdr_connection_t *conn = link->conn;
@@ -216,9 +217,13 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
}
sys_mutex_unlock(conn->work_lock);
- // the core will need to update the delivery's disposition
- if (new_disp)
- qdr_delivery_update_disposition(core, dlv, new_disp, true, 0, 0, false);
+ if (new_disp) {
+ // the remote sender-settle-mode forced us to pre-settle the
+ // message. The core needs to know this, so we "fake" receiving a
+ // settle+disposition update from the remote end of the link:
+ qdr_delivery_remote_state_updated(core, dlv, new_disp, true, 0, 0, false);
+ }
+
qdr_delivery_decref(core, dlv, "qdr_link_process_deliveries - release local reference - done processing");
} else {
sys_mutex_unlock(conn->work_lock);
@@ -424,38 +429,32 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery
// We are trying to forward a delivery on an address that has no outbound paths
// AND the incoming link is targeted (not anonymous).
//
- // We shall release the delivery (it is currently undeliverable).
+ // We shall release the delivery (it is currently undeliverable). Since
+ // there are no receivers we will try to drain credit to prevent the
+ // sender from attempting to send more to this address.
//
if (dlv->settled) {
// Increment the presettled_dropped_deliveries on the in_link
link->dropped_presettled_deliveries++;
if (dlv_link->link_type == QD_LINK_ENDPOINT)
core->dropped_presettled_deliveries++;
-
- //
- // The delivery is pre-settled. Call the qdr_delivery_release_CT so if this delivery is multi-frame
- // we can restart receiving the delivery in case it is stalled. Note that messages will not
- // *actually* be released because these are presettled messages.
- //
- qdr_delivery_release_CT(core, dlv);
- } else {
- qdr_delivery_release_CT(core, dlv);
-
- //
- // Drain credit on the link if it is not in an edge connection
- //
- if (!link->edge)
- qdr_link_issue_credit_CT(core, link, 0, true);
}
//
- // If the distribution is multicast or it's on an edge connection, we will replenish the credit.
- // Otherwise, we will allow the credit to drain.
+ // Note if the message was pre-settled we still call the
+ // qdr_delivery_release_CT so if this delivery is multi-frame we can
+ // restart receiving the delivery in case it is stalled. Note that
+ // messages will not *actually* be released in this case because these
+ // are presettled messages.
//
- if (link->edge || qdr_is_addr_treatment_multicast(link->owning_addr))
- qdr_link_issue_credit_CT(core, link, 1, false);
- else
+ qdr_delivery_release_CT(core, dlv);
+
+ if (!link->edge) {
+ qdr_link_issue_credit_CT(core, link, 0, true); // drain
link->credit_pending++;
+ } else {
+ qdr_link_issue_credit_CT(core, link, 1, false);
+ }
qdr_delivery_decref_CT(core, dlv, "qdr_link_forward_CT - removed from action (no path)");
return;
@@ -543,7 +542,7 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery
qdr_delivery_decref_CT(core, dlv, "qdr_link_forward_CT - removed from action (1)");
qdr_link_issue_credit_CT(core, link, 1, false);
} else if (fanout > 0) {
- if (dlv->settled || dlv->multicast) {
+ if (dlv->settled) {
//
// The delivery is settled. Keep it off the unsettled list and issue
// replacement credit for it now.
diff --git a/src/router_node.c b/src/router_node.c
index b556c86..1e5ee6a 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -344,8 +344,10 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
pn_link_advance(pn_link);
next_delivery = pn_link_current(pn_link) != 0;
- if (qdr_delivery_disposition(delivery) != 0)
- pn_delivery_update(pnd, qdr_delivery_disposition(delivery));
+ uint64_t local_disp = qdr_delivery_disposition(delivery);
+ if (local_disp != 0) {
+ pn_delivery_update(pnd, local_disp);
+ }
}
if (qd_message_is_discard(msg)) {
@@ -689,12 +691,12 @@ static void AMQP_disposition_handler(void* context, qd_link_t *link, pn_delivery
//
// Update the disposition of the delivery
//
- qdr_delivery_update_disposition(router->router_core, delivery,
- pn_delivery_remote_state(pnd),
- pn_delivery_settled(pnd),
- error,
- pn_disposition_data(disp),
- false);
+ qdr_delivery_remote_state_updated(router->router_core, delivery,
+ pn_delivery_remote_state(pnd),
+ pn_delivery_settled(pnd),
+ error,
+ pn_disposition_data(disp),
+ false);
//
// If settled, close out the delivery
@@ -1707,10 +1709,13 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di
//
// If the delivery is still arriving, don't push out the disposition change yet.
//
- if (qd_message_receive_complete(msg))
+ if (qd_message_receive_complete(msg)) {
pn_delivery_update(pnd, disp);
- else
+ } else {
+ // just update the local disposition for now - AMQP_rx_handler will
+ // write this to proton once the message is fully received.
qdr_delivery_set_disposition(dlv, disp);
+ }
}
if (settled) {
diff --git a/tests/system_test.py b/tests/system_test.py
index ca8e1f0..f099560 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -46,7 +46,7 @@ import json
import uuid
import proton
-from proton import Message, Timeout
+from proton import Message
from proton.handlers import MessagingHandler
from proton.utils import BlockingConnection
from proton.reactor import AtLeastOnce, Container
@@ -995,3 +995,14 @@ class MgmtMsgProxy(object):
'name': name}
return Message(properties=ap, reply_to=self.reply_addr)
+
+class TestTimeout(object):
+ """
+ A callback object for MessagingHandler class
+ parent: A MessagingHandler with a timeout() method
+ """
+ def __init__(self, parent):
+ self.parent = parent
+
+ def on_timer_task(self, event):
+ self.parent.timeout()
diff --git a/tests/system_tests_multicast.py b/tests/system_tests_multicast.py
index 06a15de..2bab340 100644
--- a/tests/system_tests_multicast.py
+++ b/tests/system_tests_multicast.py
@@ -26,6 +26,7 @@ from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
+import sys
from time import sleep
import unittest2 as unittest
@@ -41,30 +42,24 @@ from system_test import TestCase
from system_test import Qdrouterd
from system_test import main_module
from system_test import TIMEOUT
+from system_test import TestTimeout
-class TestTimeout(object):
- def __init__(self, parent):
- self.parent = parent
-
- def on_timer_task(self, event):
- self.parent.timeout()
-
-
-MAX_FRAME=1025
+MAX_FRAME=1023
W_THREADS=2
+LARGE_PAYLOAD = ("X" * MAX_FRAME) * 19
# check for leaks of the following entities
ALLOC_STATS=["qd_message_t",
"qd_buffer_t",
"qdr_delivery_t"]
+
class MulticastLinearTest(TestCase):
"""
Verify the multicast forwarding logic across a multihop linear router
configuration
"""
-
@classmethod
def setUpClass(cls):
"""Start a router"""
@@ -158,35 +153,33 @@ class MulticastLinearTest(TestCase):
# edge router EA1:
{'router': cls.EA1,
'senders': ['S-EA1-1'],
- 'receivers': [],
- 'subscribers': 1,
+ 'receivers': ['R-EA1-1', 'R-EA1-2'],
+ 'subscribers': 2,
'remotes': 0
},
# Interior router INT_A:
{'router': cls.INT_A,
- 'senders': [],
- # 'receivers': ['R-INT_A-1'],
- 'receivers': [],
- 'subscribers': 0,
+ 'senders': ['S-INT_A-1'],
+ 'receivers': ['R-INT_A-1', 'R-INT_A-2'],
+ 'subscribers': 3,
'remotes': 1,
},
# Interior router INT_B:
{'router': cls.INT_B,
'senders': [],
- 'receivers': [],
- 'subscribers': 1,
- 'remotes': 0,
+ 'receivers': ['R-INT_B-1', 'R-INT_B-2'],
+ 'subscribers': 3,
+ 'remotes': 1,
},
# edge router EB1
{'router': cls.EB1,
'senders': [],
- 'receivers': ['R-EB1-1'],
- 'subscribers': 1,
+ 'receivers': ['R-EB1-1', 'R-EB1-2'],
+ 'subscribers': 2,
'remotes': 0,
}
]
-
def _get_alloc_stats(self, router, stats):
# return a map of the current allocator counters for each entity type
# name in stats
@@ -209,22 +202,6 @@ class MulticastLinearTest(TestCase):
d[name] = list(filter(lambda a: a['typeName'] == name, q))[0]
return d
- def test_51_maybe_presettled_large_msg(self):
- body = " MCAST MAYBE PRESETTLED LARGE "
- body += "X" * (MAX_FRAME * 19)
- for repeat in range(5):
- test = MulticastPresettled(self.config, 100, body, SendMaybePresettled())
- test.run()
- self.assertEqual(None, test.error)
-
- def test_51_presettled_large_msg(self):
- body = " MCAST PRESETTLED LARGE "
- body += "X" * (MAX_FRAME * 23)
- for repeat in range(5):
- test = MulticastPresettled(self.config, 100, body, SendMustBePresettled())
- test.run()
- self.assertEqual(None, test.error)
-
def _check_for_leaks(self):
for r in self.routers:
stats = self._get_alloc_stats(r, ALLOC_STATS)
@@ -233,42 +210,236 @@ class MulticastLinearTest(TestCase):
max_allowed = ((W_THREADS + 1)
* stats[name]['localFreeListMax'])
held = stats[name]['heldByThreads']
- import sys; sys.stdout.flush()
if held >= (2 * max_allowed):
print("OOPS!!! %s: (%s) - held=%d max=%d\n %s\n"
% (r.config.router_id,
name, held, max_allowed, stats))
- import sys; sys.stdout.flush()
+ sys.stdout.flush()
self.assertFalse(held >= (2 * max_allowed))
+ #
+ # run all the negative tests first so that if we screw up the internal
+ # state of the brokers the positive tests will likely fail
+ #
+
+ def _presettled_large_msg_rx_detach(self, config, count, drop_clients):
+ # detach receivers during receive
+ body = " MCAST PRESETTLED LARGE RX DETACH " + LARGE_PAYLOAD
+ test = MulticastPresettledRxFail(config, count,
+ drop_clients,
+ detach=True,
+ body=body)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_01_presettled_large_msg_rx_detach(self):
+ self._presettled_large_msg_rx_detach(self.config, 10, ['R-EA1-1', 'R-EB1-2'])
+ self._presettled_large_msg_rx_detach(self.config, 10, ['R-INT_A-2', 'R-INT_B-1'])
+
+ def _presettled_large_msg_rx_close(self, config, count, drop_clients):
+ # close receiver connections during receive
+ body = " MCAST PRESETTLED LARGE RX CLOSE " + LARGE_PAYLOAD
+ test = MulticastPresettledRxFail(config, count,
+ drop_clients,
+ detach=False,
+ body=body)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_02_presettled_large_msg_rx_close(self):
+ self._presettled_large_msg_rx_close(self.config, 10, ['R-EA1-2', 'R-EB1-1'])
+ self._presettled_large_msg_rx_close(self.config, 10, ['R-INT_A-1', 'R-INT_B-2'])
+
+ def _unsettled_large_msg_rx_detach(self, config, count, drop_clients):
+ # detach receivers during the test
+ body = " MCAST UNSETTLED LARGE RX DETACH " + LARGE_PAYLOAD
+ test = MulticastUnsettledRxFail(self.config, count, drop_clients, detach=True, body=body)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_10_unsettled_large_msg_rx_detach(self):
+ self._unsettled_large_msg_rx_detach(self.config, 10, ['R-EA1-1', 'R-EB1-2'])
+ self._unsettled_large_msg_rx_detach(self.config, 10, ['R-INT_A-2', 'R-INT_B-1'])
+
+ def _unsettled_large_msg_rx_close(self, config, count, drop_clients):
+ # close receiver connections during test
+ body = " MCAST UNSETTLED LARGE RX CLOSE " + LARGE_PAYLOAD
+ test = MulticastUnsettledRxFail(self.config, count, drop_clients, detach=False, body=body)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_11_unsettled_large_msg_rx_close(self):
+ self._unsettled_large_msg_rx_close(self.config, 10, ['R-EA1-2', 'R-EB1-1', ])
+ self._unsettled_large_msg_rx_close(self.config, 10, ['R-INT_A-1', 'R-INT_B-2'])
+
+ #
+ # now the positive tests
+ #
+
+ def test_50_presettled(self):
+ # Simply send a bunch of pre-settled multicast messages
+ body = " MCAST PRESETTLED "
+ test = MulticastPresettled(self.config, 10, body, SendPresettled())
+ test.run()
+
+ def test_51_presettled_mixed_large_msg(self):
+ # Same as above, but large message bodies (mixed sender settle mode)
+ body = " MCAST MAYBE PRESETTLED LARGE " + LARGE_PAYLOAD
+ test = MulticastPresettled(self.config, 11, body, SendMixed())
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_52_presettled_large_msg(self):
+ # Same as above, (pre-settled sender settle mode)
+ body = " MCAST PRESETTLED LARGE " + LARGE_PAYLOAD
+ test = MulticastPresettled(self.config, 13, body, SendPresettled())
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_60_unsettled_3ack(self):
+ # Sender sends unsettled, waits for Outcome from Receiver then settles
+ # Expect all messages to be accepted
+ body = " MCAST UNSETTLED "
+ test = MulticastUnsettled3Ack(self.config, 10, body)
+ test.run()
+ self.assertEqual(None, test.error)
+ self.assertEqual(test.n_outcomes[Delivery.ACCEPTED], test.n_sent)
+
+ def test_61_unsettled_3ack_large_msg(self):
+ # Same as above but with multiframe streaming
+ body = " MCAST UNSETTLED LARGE " + LARGE_PAYLOAD
+ test = MulticastUnsettled3Ack(self.config, 11, body=body)
+ test.run()
+ self.assertEqual(None, test.error)
+ self.assertEqual(test.n_outcomes[Delivery.ACCEPTED], test.n_sent)
+
+ def _unsettled_3ack_outcomes(self,
+ config,
+ count,
+ outcomes,
+ expected):
+ body = " MCAST UNSETTLED 3ACK OUTCOMES " + LARGE_PAYLOAD
+ test = MulticastUnsettled3Ack(self.config,
+ count,
+ body,
+ outcomes=outcomes)
+ test.run()
+ self.assertEqual(None, test.error)
+ self.assertEqual(test.n_outcomes[expected], test.n_sent)
+
+ def test_63_unsettled_3ack_outcomes(self):
+ # Verify the expected outcome is returned to the sender when the
+ # receivers return different outcome values. If no outcome is
+ # specified for a receiver it will default to ACCEPTED
+
+ # expect REJECTED if any reject:
+ self._unsettled_3ack_outcomes(self.config, 3,
+ {'R-EB1-1': Delivery.REJECTED,
+ 'R-EB1-2': Delivery.MODIFIED,
+ 'R-INT_B-2': Delivery.RELEASED},
+ Delivery.REJECTED)
+ self._unsettled_3ack_outcomes(self.config, 3,
+ {'R-EB1-1': Delivery.REJECTED,
+ 'R-INT_B-2': Delivery.RELEASED},
+ Delivery.REJECTED)
+ # expect ACCEPT if no rejects
+ self._unsettled_3ack_outcomes(self.config, 3,
+ {'R-EB1-2': Delivery.MODIFIED,
+ 'R-INT_B-2': Delivery.RELEASED},
+ Delivery.ACCEPTED)
+ # expect MODIFIED over RELEASED
+ self._unsettled_3ack_outcomes(self.config, 3,
+ {'R-EA1-1': Delivery.RELEASED,
+ 'R-EA1-2': Delivery.RELEASED,
+ 'R-INT_A-1': Delivery.RELEASED,
+ 'R-INT_A-2': Delivery.RELEASED,
+ 'R-INT_B-1': Delivery.RELEASED,
+ 'R-INT_B-2': Delivery.RELEASED,
+ 'R-EB1-1': Delivery.RELEASED,
+ 'R-EB1-2': Delivery.MODIFIED},
+ Delivery.MODIFIED)
+
+ # and released only if all released
+ self._unsettled_3ack_outcomes(self.config, 3,
+ {'R-EA1-1': Delivery.RELEASED,
+ 'R-EA1-2': Delivery.RELEASED,
+ 'R-INT_A-1': Delivery.RELEASED,
+ 'R-INT_A-2': Delivery.RELEASED,
+ 'R-INT_B-1': Delivery.RELEASED,
+ 'R-INT_B-2': Delivery.RELEASED,
+ 'R-EB1-1': Delivery.RELEASED,
+ 'R-EB1-2': Delivery.RELEASED},
+ Delivery.RELEASED)
+
+ def test_70_unsettled_1ack(self):
+ # Sender sends unsettled, expects both outcome and settlement from
+ # receiver before sender settles locally
+ body = " MCAST UNSETTLED 1ACK "
+ test = MulticastUnsettled1Ack(self.config, 10, body)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_71_unsettled_1ack_large_msg(self):
+ # Same as above but with multiframe streaming
+ body = " MCAST UNSETTLED 1ACK LARGE " + LARGE_PAYLOAD
+ test = MulticastUnsettled1Ack(self.config, 10, body)
+ test.run()
+ self.assertEqual(None, test.error)
+
def test_999_check_for_leaks(self):
self._check_for_leaks()
-class SendMaybePresettled(LinkOption):
+#
+# Settlement options for Link attach
+#
+
+class SendPresettled(LinkOption):
"""
- Set the default send settlement modes on link negotiation to mixed
+ All messages are sent presettled
+ """
+ def apply(self, link):
+ link.snd_settle_mode = Link.SND_SETTLED
+ link.rcv_settle_mode = Link.RCV_FIRST
+
+
+class SendMixed(LinkOption):
+ """
+ Messages may be sent unsettled or settled
"""
def apply(self, link):
link.snd_settle_mode = Link.SND_MIXED
link.rcv_settle_mode = Link.RCV_FIRST
-class SendMustBePresettled(LinkOption):
+class Link1Ack(LinkOption):
"""
- Set the default send settlement modes on a link to presettled
+ Messages will be sent unsettled
"""
def apply(self, link):
- link.snd_settle_mode = Link.SND_SETTLED
+ link.snd_settle_mode = Link.SND_UNSETTLED
link.rcv_settle_mode = Link.RCV_FIRST
+class Link3Ack(LinkOption):
+ """
+ Messages will be sent unsettled and the receiver will wait for sender to
+ settle first.
+ """
+ def apply(self, link):
+ link.snd_settle_mode = Link.SND_UNSETTLED
+ link.rcv_settle_mode = Link.RCV_SECOND
+
+
class MulticastBase(MessagingHandler):
+ """
+ Common multicast boilerplate code
+ """
def __init__(self, config, count, body, topic=None, **handler_kwargs):
super(MulticastBase, self).__init__(**handler_kwargs)
self.msg_count = count
self.config = config
- self.topic = topic or "whatevahcast/test"
+ self.topic = topic or "multicast/test"
self.body = body
# totals
@@ -295,12 +466,6 @@ class MulticastBase(MessagingHandler):
# count per outcome
self.n_outcomes = {}
- # self.c_accepted = {}
- # self.c_released = {}
- # self.c_rejected = {}
- # self.c_modified = {}
- # self.c_settled = {}
-
self.error = None
self.timers = []
self.reactor = None
@@ -402,7 +567,6 @@ class MulticastBase(MessagingHandler):
def on_settled(self, event):
self.n_settled += 1
- name = event.link.name
def run(self):
Container(self).run()
@@ -460,9 +624,9 @@ class MulticastPresettled(MulticastBase):
def check_if_done(self):
# wait for all present receivers to receive all messages
- # and for all received messagest to be settled by the
+ # and for all received messages to be settled by the
# sender
- to_rcv = self.n_senders * self.msg_count
+ to_rcv = self.n_senders * self.msg_count * self.n_receivers
if to_rcv == self.n_received and not self.unsettled_deliveries:
self.done()
@@ -470,6 +634,8 @@ class MulticastPresettled(MulticastBase):
super(MulticastPresettled, self).on_message(event)
if event.receiver:
if not event.delivery.settled:
+ # it may be that settle will come after on_message
+ # so track that here
event.delivery.update(Delivery.ACCEPTED)
self.unexpected_unsettled += 1
tag = str(event.delivery.tag)
@@ -488,6 +654,7 @@ class MulticastPresettled(MulticastBase):
self.sender_settled += 1
tag = str(event.delivery.tag)
try:
+ # got a delayed settle
self.unsettled_deliveries[tag] -= 1
if self.unsettled_deliveries[tag] == 0:
del self.unsettled_deliveries[tag]
@@ -496,5 +663,189 @@ class MulticastPresettled(MulticastBase):
self.check_if_done()
+class MulticastPresettledRxFail(MulticastPresettled):
+ """
+ Spontaineously close a receiver or connection on message received
+ """
+ def __init__(self, config, count, drop_clients, detach, body):
+ super(MulticastPresettledRxFail, self).__init__(config, count, body, SendPresettled())
+ self.drop_clients = drop_clients
+ self.detach = detach
+
+ def check_if_done(self):
+ # Verify each receiver got the expected number of messages.
+ # Avoid waiting for dropped receivers.
+ done = True
+ to_rcv = self.n_senders * self.msg_count
+ for name, count in self.c_received.items():
+ if name not in self.drop_clients:
+ if count != to_rcv:
+ done = False
+ if done:
+ self.done()
+
+ def on_message(self, event):
+ # close the receiver on arrival of the first message
+ r_name = event.receiver.name
+ if r_name in self.drop_clients:
+ if self.detach:
+ if event.receiver.state & Link.LOCAL_ACTIVE:
+ event.receiver.close()
+ elif event.connection.state & Connection.LOCAL_ACTIVE:
+ event.connection.close()
+ super(MulticastPresettledRxFail, self).on_message(event)
+
+
+class MulticastUnsettled3Ack(MulticastBase):
+ """
+ Send count messages per sender, senders wait for terminal outcome from
+ receivers before settling
+ """
+ def __init__(self, config, count, body, outcomes=None):
+ pfetch = int((count + 1)/2)
+ super(MulticastUnsettled3Ack, self).__init__(config,
+ count,
+ body,
+ prefetch=pfetch,
+ auto_accept=False,
+ auto_settle=False)
+ self.outcomes = outcomes or {}
+
+ def create_receiver(self, container, conn, source, name):
+ return container.create_receiver(conn, source=source, name=name,
+ options=Link3Ack())
+
+ def create_sender(self, container, conn, target, name):
+ return container.create_sender(conn, target=target, name=name,
+ options=Link3Ack())
+
+ def do_send(self, sender):
+ for i in range(self.msg_count):
+ msg = Message(body=" %s -> %s:%s" % (sender.name, i, self.body))
+ dlv = sender.send(msg)
+ self.n_sent += 1
+
+ def on_message(self, event):
+ # receiver: send outcome do not settle
+ super(MulticastUnsettled3Ack, self).on_message(event)
+ if event.delivery.settled:
+ self.error = "Unexpected pre-settled message received!"
+ self.done()
+ return
+ r_name = event.receiver.name
+ outcome = self.outcomes.get(r_name, Delivery.ACCEPTED)
+ event.delivery.update(outcome)
+ if event.receiver.credit == 0:
+ event.receiver.flow(1)
+
+ def on_settled(self, event):
+ super(MulticastUnsettled3Ack, self).on_settled(event)
+ event.delivery.settle()
+ self.check_if_done()
+
+ def on_accepted(self, event):
+ super(MulticastUnsettled3Ack, self).on_accepted(event)
+ event.delivery.settle()
+ self.check_if_done()
+
+ def on_released(self, event):
+ super(MulticastUnsettled3Ack, self).on_released(event)
+ event.delivery.settle()
+ self.check_if_done()
+
+ def on_modified(self, event):
+ super(MulticastUnsettled3Ack, self).on_modified(event)
+ event.delivery.settle()
+ self.check_if_done()
+
+ def on_rejected(self, event):
+ super(MulticastUnsettled3Ack, self).on_rejected(event)
+ event.delivery.settle()
+ self.check_if_done()
+
+ def check_if_done(self):
+ to_send = self.msg_count * self.n_senders
+ to_rcv = to_send * self.n_receivers
+
+ n_outcomes = (self.n_accepted + self.n_rejected
+ + self.n_modified + self.n_released)
+
+ # expect senders to see settlement
+ if (self.n_sent == to_send
+ and self.n_received == to_rcv
+ and n_outcomes == to_send
+ and self.n_settled == to_rcv):
+ self.done()
+
+
+class MulticastUnsettled1Ack(MulticastUnsettled3Ack):
+ """
+ Sender sends unsettled, the receiver sets outcome and immediately settles
+ """
+ def __init__(self, config, count, body, outcomes=None):
+ super(MulticastUnsettled1Ack, self).__init__(config,
+ count,
+ outcomes)
+
+ def create_receiver(self, container, conn, source, name):
+ return container.create_receiver(conn, source=source, name=name,
+ options=Link1Ack())
+
+ def create_sender(self, container, conn, target, name):
+ return container.create_sender(conn, target=target, name=name,
+ options=Link1Ack())
+
+ def on_message(self, event):
+ # receiver: send outcome and settle
+ super(MulticastUnsettled1Ack, self).on_message(event)
+ event.delivery.settle()
+
+ def check_if_done(self):
+ to_send = self.msg_count * self.n_senders
+ to_rcv = to_send * self.n_receivers
+
+ n_outcomes = (self.n_accepted + self.n_rejected
+ + self.n_modified + self.n_released)
+
+ # expect sender to see settlement
+ if (self.n_received == to_rcv
+ and n_outcomes == to_send
+ and self.n_settled == to_send):
+ self.done()
+
+
+class MulticastUnsettledRxFail(MulticastUnsettled3Ack):
+ """
+ Spontaineously close a receiver or connection on message received
+ """
+ def __init__(self, config, count, drop_clients, detach, body):
+ super(MulticastUnsettledRxFail, self).__init__(config, count, body)
+ self.drop_clients = drop_clients
+ self.detach = detach
+
+ def check_if_done(self):
+ # Verify each receiver got the expected number of messages.
+ # Avoid waiting for dropped receivers.
+ done = True
+ to_rcv = self.n_senders * self.msg_count
+ for name, count in self.c_received.items():
+ if name not in self.drop_clients:
+ if count != to_rcv:
+ done = False
+ if done:
+ self.done()
+
+ def on_message(self, event):
+ # close the receiver on arrival of the first message
+ r_name = event.receiver.name
+ if r_name in self.drop_clients:
+ if self.detach:
+ if event.receiver.state & Link.LOCAL_ACTIVE:
+ event.receiver.close()
+ elif event.connection.state & Connection.LOCAL_ACTIVE:
+ event.connection.close()
+ super(MulticastUnsettledRxFail, self).on_message(event)
+
+
if __name__ == '__main__':
unittest.main(main_module())
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index 79f6a15..a189edb 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -670,17 +670,25 @@ class MessageAnnotaionsPreExistingOverride(MessagingHandler):
class SemanticsMulticast(MessagingHandler):
def __init__(self, address):
- super(SemanticsMulticast, self).__init__()
+ """
+ Verify that for every 1 unsettled mcast message received, N messages are sent
+ out (where N == number of receivers). Assert that multiple received
+ dispositions are summarized to send out one disposition.
+ """
+ super(SemanticsMulticast, self).__init__(auto_accept=False)
self.address = address
self.dest = "multicast.2"
self.error = None
self.n_sent = 0
+ self.n_settled = 0
self.count = 3
self.n_received_a = 0
self.n_received_b = 0
self.n_received_c = 0
+ self.n_accepts = 0
self.timer = None
- self.conn = None
+ self.conn_1 = None
+ self.conn_2 = None
self.sender = None
self.receiver_a = None
self.receiver_b = None
@@ -688,22 +696,29 @@ class SemanticsMulticast(MessagingHandler):
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
- self.conn = event.container.connect(self.address)
- self.sender = event.container.create_sender(self.conn, self.dest)
- self.receiver_a = event.container.create_receiver(self.conn, self.dest, name="A")
- self.receiver_b = event.container.create_receiver(self.conn, self.dest, name="B")
- self.receiver_c = event.container.create_receiver(self.conn, self.dest, name="C")
+ self.conn_1 = event.container.connect(self.address)
+ self.conn_2 = event.container.connect(self.address)
+ self.sender = event.container.create_sender(self.conn_1, self.dest)
+ self.receiver_a = event.container.create_receiver(self.conn_2, self.dest, name="A")
+ self.receiver_b = event.container.create_receiver(self.conn_1, self.dest, name="B")
+ self.receiver_c = event.container.create_receiver(self.conn_2, self.dest, name="C")
def timeout(self):
self.error = "Timeout Expired: sent=%d rcvd=%d/%d/%d" % \
(self.n_sent, self.n_received_a, self.n_received_b, self.n_received_c)
- self.conn.close()
+ self.conn_1.close()
+ self.conn_2.close()
def check_if_done(self):
- if self.n_received_a + self.n_received_b + self.n_received_c == self.count and \
- self.n_received_a == self.n_received_b and self.n_received_c == self.n_received_b:
+ c = self.n_received_a + self.n_received_b + self.n_received_c
+ if (c == self.count
+ and self.n_received_a == self.n_received_b
+ and self.n_received_c == self.n_received_b
+ and self.n_accepts == self.n_sent
+ and self.n_settled == self.count):
self.timer.cancel()
- self.conn.close()
+ self.conn_1.close()
+ self.conn_2.close()
def on_sendable(self, event):
if self.n_sent == 0:
@@ -718,8 +733,14 @@ class SemanticsMulticast(MessagingHandler):
self.n_received_b += 1
if event.receiver == self.receiver_c:
self.n_received_c += 1
+ event.delivery.update(Delivery.ACCEPTED)
def on_accepted(self, event):
+ self.n_accepts += 1
+ event.delivery.settle()
+
+ def on_settled(self, event):
+ self.n_settled += 1
self.check_if_done()
def run(self):
@@ -1177,7 +1198,7 @@ class MulticastUnsettled ( MessagingHandler ) :
n_messages,
n_receivers
) :
- super ( MulticastUnsettled, self ) . __init__ ( prefetch = n_messages )
+ super ( MulticastUnsettled, self ) . __init__ (auto_accept=False, prefetch=n_messages)
self.addr = addr
self.n_messages = n_messages
self.n_receivers = n_receivers
@@ -1213,7 +1234,7 @@ class MulticastUnsettled ( MessagingHandler ) :
self.sender = event.container.create_sender ( self.send_conn, self.addr )
for i in range ( self.n_receivers ) :
- rcvr = event.container.create_receiver ( self.send_conn, self.addr, name = "receiver_" + str(i) )
+ rcvr = event.container.create_receiver ( self.recv_conn, self.addr, name = "receiver_" + str(i) )
self.receivers.append ( rcvr )
rcvr.flow ( self.n_messages )
self.n_received.append ( 0 )
@@ -2358,8 +2379,12 @@ class UnavailableReceiver(UnavailableBase):
self.receiver = event.container.create_receiver(self.conn, self.dest, name=self.link_name)
class MulticastUnsettledTest(MessagingHandler):
+ """
+ Send N unsettled multicast messages to 2 receivers. Ensure sender is
+ notified of settlement and disposition changes from the receivers.
+ """
def __init__(self, address):
- super(MulticastUnsettledTest, self).__init__(prefetch=0)
+ super(MulticastUnsettledTest, self).__init__(auto_accept=False, prefetch=0)
self.address = address
self.dest = "multicast.MUtest"
self.error = None
@@ -2380,9 +2405,14 @@ class MulticastUnsettledTest(MessagingHandler):
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
self.conn = event.container.connect(self.address)
- self.sender = event.container.create_sender(self.conn, self.dest)
- self.receiver1 = event.container.create_receiver(self.conn, self.dest, name="A")
- self.receiver2 = event.container.create_receiver(self.conn, self.dest, name="B")
+ self.sender = event.container.create_sender(self.conn, self.dest,
+ options=AtLeastOnce())
+ self.receiver1 = event.container.create_receiver(self.conn, self.dest,
+ name="A",
+ options=AtLeastOnce())
+ self.receiver2 = event.container.create_receiver(self.conn, self.dest,
+ name="B",
+ options=AtLeastOnce());
self.receiver1.flow(self.count)
self.receiver2.flow(self.count)
@@ -2397,8 +2427,10 @@ class MulticastUnsettledTest(MessagingHandler):
self.check_if_done()
def on_message(self, event):
- if not event.delivery.settled:
- self.error = "Received unsettled delivery"
+ if event.delivery.settled:
+ self.error = "Received settled delivery"
+ event.delivery.update(Delivery.ACCEPTED)
+ event.delivery.settle()
self.n_received += 1
self.check_if_done()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org