You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2015/03/10 13:52:46 UTC
svn commit: r1665514 - in /qpid/dispatch/trunk: include/qpid/dispatch/ src/
tests/
Author: kgiusti
Date: Tue Mar 10 12:52:46 2015
New Revision: 1665514
URL: http://svn.apache.org/r1665514
Log:
DISPATCH-99: pluggable forwarding logic
Added:
qpid/dispatch/trunk/src/router_forwarders.c (with props)
Modified:
qpid/dispatch/trunk/include/qpid/dispatch/container.h
qpid/dispatch/trunk/include/qpid/dispatch/iterator.h
qpid/dispatch/trunk/include/qpid/dispatch/router.h
qpid/dispatch/trunk/src/CMakeLists.txt
qpid/dispatch/trunk/src/container.c
qpid/dispatch/trunk/src/iterator.c
qpid/dispatch/trunk/src/lrp.c
qpid/dispatch/trunk/src/python_embedded.c
qpid/dispatch/trunk/src/router_agent.c
qpid/dispatch/trunk/src/router_config.c
qpid/dispatch/trunk/src/router_node.c
qpid/dispatch/trunk/src/router_private.h
qpid/dispatch/trunk/src/router_pynode.c
qpid/dispatch/trunk/src/waypoint.c
qpid/dispatch/trunk/tests/system_tests_one_router.py
qpid/dispatch/trunk/tests/system_tests_two_routers.py
Modified: qpid/dispatch/trunk/include/qpid/dispatch/container.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/container.h?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/container.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/container.h Tue Mar 10 12:52:46 2015
@@ -183,6 +183,7 @@ bool qd_link_drain_changed(qd_link_t *li
* thrown.
*/
qd_delivery_t *qd_delivery(qd_link_t *link, pn_delivery_tag_t tag);
+void qd_delivery_set_undeliverable_LH(qd_delivery_t *delivery);
void qd_delivery_free_LH(qd_delivery_t *delivery, uint64_t final_disposition);
void qd_delivery_link_peers_LH(qd_delivery_t *left, qd_delivery_t *right);
void qd_delivery_unlink_LH(qd_delivery_t *delivery);
Modified: qpid/dispatch/trunk/include/qpid/dispatch/iterator.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/iterator.h?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/iterator.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/iterator.h Tue Mar 10 12:52:46 2015
@@ -169,6 +169,7 @@ void qd_field_iterator_reset(qd_field_it
void qd_address_iterator_reset_view(qd_field_iterator_t *iter,
qd_iterator_view_t view);
+qd_iterator_view_t qd_address_iterator_get_view(const qd_field_iterator_t *iter);
void qd_address_iterator_set_phase(qd_field_iterator_t *iter, char phase);
Modified: qpid/dispatch/trunk/include/qpid/dispatch/router.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/router.h?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/router.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/router.h Tue Mar 10 12:52:46 2015
@@ -32,6 +32,7 @@
#include <qpid/dispatch/iterator.h>
#include <stdbool.h>
+typedef struct qd_router_t qd_router_t;
typedef struct qd_address_t qd_address_t;
typedef uint8_t qd_address_semantics_t;
@@ -96,6 +97,44 @@ typedef uint8_t qd_address_s
#define QD_SEMANTICS_DEFAULT (QD_FANOUT_MULTIPLE | QD_BIAS_NONE | QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS)
///@}
+/** Message forwarding descriptor
+ *
+ * Defines a forwarding method that can be associated with an address
+ * (qd_address_t). This method is called for each message that matches the
+ * associated address. The qd_router_forwarder_t is a 'base class' that can be
+ * subclassed to provide a per-forwarder context for custom forwarding
+ * algorithms.
+ */
+typedef struct qd_router_forwarder_t qd_router_forwarder_t;
+struct qd_router_forwarder_t {
+
+ /** forwarding method
+ *
+ * Returns true if the message was successfully forwarded or has been
+ * scheduled to be forwarded at a later time. Returns false if the handler
+ * is unable to forward the message.
+ *
+ * If the message is going to be forwarded at a later time (asynchronous
+ * forwarding), then this method must make a copy of the message.
+ * When is_direct is true the default forwarders do not relay to remote routers.
+ * NOTE: ** Called with router lock held! **
+ */
+ bool (*forward)(qd_router_forwarder_t *forwarder,
+ qd_router_t *router,
+ qd_message_t *msg,
+ qd_delivery_t *delivery,
+ qd_address_t *addr,
+ qd_field_iterator_t *ingress_iterator,
+ bool is_direct);
+
+ /** release the descriptor
+ * Called when the associated qd_address_t is freed, and by
+ * qd_router_register_address() on a forwarder that a custom one replaces.
+ * NOTE: ** Called with router lock held! **
+ */
+ void (*release)(qd_router_forwarder_t *forwarder);
+};
+
typedef void (*qd_router_message_cb_t)(void *context, qd_message_t *msg, int link_id);
const char *qd_router_id(const qd_dispatch_t *qd);
@@ -103,17 +142,22 @@ const char *qd_router_id(const qd_dispat
/** Register an address in the router's hash table.
* @param qd Pointer to the dispatch instance.
* @param address String form of address
- * @param handler Callback to be called when a message is received for the address.
+ * @param on_message Optional callback to be called when a message is received
+ * for the address.
+ * @param context Context to be passed to the on_message handler.
* @param semantics Semantics for the address.
* @param global True if the address is global.
- * @param context Context to be passed to the handler.
+ * @param forwarder Optional custom forwarder to use when a message is received
+ * for the address. If null, a default forwarder based on the semantics will
+ * be used.
*/
qd_address_t *qd_router_register_address(qd_dispatch_t *qd,
const char *address,
- qd_router_message_cb_t handler,
+ qd_router_message_cb_t on_message,
+ void *context,
qd_address_semantics_t semantics,
bool global,
- void *context);
+ qd_router_forwarder_t *forwarder);
void qd_router_unregister_address(qd_address_t *address);
@@ -138,6 +182,9 @@ void qd_router_build_node_list(qd_dispat
/** String form of address for logging */
const char* qd_address_logstr(qd_address_t* address);
+/** Retrieve the default forwarder for the given address semantics (fanout and bias) */
+qd_router_forwarder_t *qd_router_get_forwarder(qd_address_semantics_t s);
+
///@}
#endif
Modified: qpid/dispatch/trunk/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/CMakeLists.txt?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/CMakeLists.txt (original)
+++ qpid/dispatch/trunk/src/CMakeLists.txt Tue Mar 10 12:52:46 2015
@@ -66,6 +66,7 @@ set(qpid_dispatch_SOURCES
router_agent.c
router_config.c
router_node.c
+ router_forwarders.c
router_pynode.c
schema_enum.c
server.c
Modified: qpid/dispatch/trunk/src/container.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/container.c?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/container.c (original)
+++ qpid/dispatch/trunk/src/container.c Tue Mar 10 12:52:46 2015
@@ -860,6 +860,17 @@ qd_delivery_t *qd_delivery(qd_link_t *li
return delivery;
}
+// Mark the delivery's local disposition 'undeliverable-here' so peers won't
+// re-forward it to us; does nothing without a pn_delivery or local disposition.
+void qd_delivery_set_undeliverable_LH(qd_delivery_t *delivery)
+{
+ if (delivery->pn_delivery) {
+ pn_disposition_t *dp = pn_delivery_local(delivery->pn_delivery);
+ if (dp) {
+ pn_disposition_set_undeliverable(dp, true);
+ }
+ }
+}
void qd_delivery_free_LH(qd_delivery_t *delivery, uint64_t final_disposition)
{
Modified: qpid/dispatch/trunk/src/iterator.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/iterator.c?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/iterator.c (original)
+++ qpid/dispatch/trunk/src/iterator.c Tue Mar 10 12:52:46 2015
@@ -385,6 +385,12 @@ void qd_address_iterator_reset_view(qd_f
}
+qd_iterator_view_t qd_address_iterator_get_view(const qd_field_iterator_t *iter)
+{
+ return iter->view; /* current view, e.g. so a caller can restore it after a temporary reset */
+}
+
+
void qd_address_iterator_set_phase(qd_field_iterator_t *iter, char phase)
{
iter->phase = phase;
Modified: qpid/dispatch/trunk/src/lrp.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/lrp.c?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/lrp.c (original)
+++ qpid/dispatch/trunk/src/lrp.c Tue Mar 10 12:52:46 2015
@@ -58,10 +58,9 @@ static void qd_lrpc_open_handler(void *c
sys_mutex_lock(router->lock);
qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
if (!addr) {
- addr = qd_address();
+ addr = qd_address(router_semantics_for_addr(router, iter, '\0', &unused));
qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
DEQ_INSERT_TAIL(router->addrs, addr);
- addr->semantics = router_semantics_for_addr(router, iter, '\0', &unused);
qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr);
}
Modified: qpid/dispatch/trunk/src/python_embedded.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/python_embedded.c?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/python_embedded.c (original)
+++ qpid/dispatch/trunk/src/python_embedded.c Tue Mar 10 12:52:46 2015
@@ -539,7 +539,8 @@ static int IoAdapter_init(IoAdapter *sel
if (!address) return -1;
qd_error_clear();
self->addr =
- qd_router_register_address(self->qd, address, qd_io_rx_handler, py_semantics, global, self);
+ qd_router_register_address(self->qd, address, qd_io_rx_handler, self,
+ py_semantics, global, 0);
if (qd_error_code()) {
PyErr_SetString(PyExc_RuntimeError, qd_error_message());
return -1;
Modified: qpid/dispatch/trunk/src/router_agent.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_agent.c?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_agent.c (original)
+++ qpid/dispatch/trunk/src/router_agent.c Tue Mar 10 12:52:46 2015
@@ -60,7 +60,7 @@ qd_error_t qd_entity_refresh_router_addr
uint32_t subCount = DEQ_SIZE(addr->rlinks);
if (DEQ_SIZE(addr->lrps) > 0)
subCount = DEQ_SIZE(addr->lrps);
- if (qd_entity_set_bool(entity, "inProcess", addr->handler != 0) == 0 &&
+ if (qd_entity_set_bool(entity, "inProcess", addr->on_message != 0) == 0 &&
qd_entity_set_long(entity, "subscriberCount", subCount) == 0 &&
qd_entity_set_long(entity, "remoteCount", DEQ_SIZE(addr->rnodes)) == 0 &&
qd_entity_set_long(entity, "deliveriesIngress", addr->deliveries_ingress) == 0 &&
Modified: qpid/dispatch/trunk/src/router_config.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_config.c?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_config.c (original)
+++ qpid/dispatch/trunk/src/router_config.c Tue Mar 10 12:52:46 2015
@@ -219,10 +219,9 @@ qd_error_t qd_router_configure_lrp(qd_ro
//
qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
if (!addr) {
- addr = qd_address();
+ addr = qd_address(router_semantics_for_addr(router, iter, '\0', &unused));
qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
DEQ_INSERT_TAIL(router->addrs, addr);
- addr->semantics = router_semantics_for_addr(router, iter, '\0', &unused);
qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr);
}
@@ -272,6 +271,7 @@ void qd_router_configure_free(qd_router_
qd_address_semantics_t router_semantics_for_addr(qd_router_t *router, qd_field_iterator_t *iter,
char in_phase, char *out_phase)
{
+ const qd_iterator_view_t old_view = qd_address_iterator_get_view(iter);
qd_address_iterator_reset_view(iter, ITER_VIEW_NO_HOST);
qd_config_address_t *addr = DEQ_HEAD(router->config_addrs);
@@ -294,5 +294,6 @@ qd_address_semantics_t router_semantics_
}
}
+ qd_address_iterator_reset_view(iter, old_view);
return phase ? phase->semantics : QD_SEMANTICS_DEFAULT;
}
Added: qpid/dispatch/trunk/src/router_forwarders.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_forwarders.c?rev=1665514&view=auto
==============================================================================
--- qpid/dispatch/trunk/src/router_forwarders.c (added)
+++ qpid/dispatch/trunk/src/router_forwarders.c Tue Mar 10 12:52:46 2015
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "dispatch_private.h"
+
+/** defines a default set of forwarding behaviors based on the semantics of an
+ * address.
+ */
+
+/* Queue a copy of msg on addr's local subscriber links (only the head link when fanout is single). */
+static void forward_to_direct_subscribers_LH(qd_address_t *addr,
+ qd_delivery_t *delivery,
+ qd_message_t *msg,
+ int *fanout)
+{
+ qd_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks);
+ while (dest_link_ref) {
+ qd_routed_event_t *re = new_qd_routed_event_t();
+ DEQ_ITEM_INIT(re);
+ re->delivery = 0;
+ re->message = qd_message_copy(msg);
+ re->settle = 0;
+ re->disposition = 0;
+ DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re);
+ // Only the copy that takes *fanout from 0 to 1 carries the caller's delivery.
+ (*fanout)++;
+ if (*fanout == 1) {
+ re->delivery = delivery;
+ qd_delivery_fifo_enter_LH(delivery);
+ }
+
+ addr->deliveries_egress++;
+ qd_link_activate(dest_link_ref->link->link);
+
+ //
+ // If the fanout is single, exit the loop here. We only want to send one message copy.
+ //
+ if (QD_FANOUT(addr->semantics) == QD_FANOUT_SINGLE)
+ break;
+
+ dest_link_ref = DEQ_NEXT(dest_link_ref);
+ }
+
+ //
+ // If dest_link_ref is not null here, we exited after sending one message copy.
+ // If the number of local links is greater than one, rotate the head link to the
+ // tail so we balance the message deliveries.
+ //
+ if (dest_link_ref && DEQ_SIZE(addr->rlinks) > 1) {
+ assert(DEQ_HEAD(addr->rlinks) == dest_link_ref);
+ DEQ_REMOVE_HEAD(addr->rlinks);
+ DEQ_INSERT_TAIL(addr->rlinks, dest_link_ref);
+ }
+}
+
+/* Queue one copy of msg on each outgoing link leading to a remote router node of addr that accepts this origin. */
+static void forward_to_remote_subscribers_LH(qd_router_t *router,
+ qd_address_t *addr,
+ qd_delivery_t *delivery,
+ qd_message_t *msg,
+ int *fanout,
+ qd_field_iterator_t *ingress_iter)
+{
+ //
+ // Get the mask bit associated with the ingress router for the message.
+ // This will be compared against the "valid_origin" masks for each
+ // candidate destination router.
+ // With no ingress iterator, or QD_BYPASS_VALID_ORIGINS set, origin bit 0 is used.
+ int origin = -1;
+ if (ingress_iter && !(addr->semantics & QD_BYPASS_VALID_ORIGINS)) {
+ qd_address_iterator_reset_view(ingress_iter, ITER_VIEW_NODE_HASH);
+ qd_address_t *origin_addr; // NOTE(review): uninitialized; relies on qd_hash_retrieve() always storing a result - confirm
+ qd_hash_retrieve(router->addr_hash, ingress_iter, (void*) &origin_addr);
+ if (origin_addr && DEQ_SIZE(origin_addr->rnodes) == 1) {
+ qd_router_ref_t *rref = DEQ_HEAD(origin_addr->rnodes);
+ origin = rref->router->mask_bit;
+ }
+ } else
+ origin = 0;
+
+ // origin stays -1 (nothing forwarded) unless the ingress address maps to exactly one router node.
+ // Forward to the next-hops for remote destinations.
+ //
+ if (origin >= 0) {
+ qd_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes);
+ qd_router_link_t *dest_link;
+ qd_bitmask_t *link_set = qd_bitmask(0);
+
+ //
+ // Loop over the target nodes for this address. Build a set of outgoing links
+ // for which there are valid targets. We do this to avoid sending more than one
+ // message down a given link. It's possible that there are multiple destinations
+ // for this address that are all reachable over the same link. In this case, we
+ // will send only one copy of the message over the link and allow a downstream
+ // router to fan the message out.
+ //
+ while (dest_node_ref) {
+ if (dest_node_ref->router->next_hop)
+ dest_link = dest_node_ref->router->next_hop->peer_link;
+ else
+ dest_link = dest_node_ref->router->peer_link;
+ if (dest_link && qd_bitmask_value(dest_node_ref->router->valid_origins, origin))
+ qd_bitmask_set_bit(link_set, dest_link->mask_bit);
+ dest_node_ref = DEQ_NEXT(dest_node_ref);
+ }
+
+ //
+ // Send a copy of the message outbound on each identified link.
+ //
+ int link_bit;
+ while (qd_bitmask_first_set(link_set, &link_bit)) {
+ qd_bitmask_clear_bit(link_set, link_bit);
+ dest_link = router->out_links_by_mask_bit[link_bit];
+ if (dest_link) {
+ qd_routed_event_t *re = new_qd_routed_event_t();
+ DEQ_ITEM_INIT(re);
+ re->delivery = 0;
+ re->message = qd_message_copy(msg);
+ re->settle = 0;
+ re->disposition = 0;
+ DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+ // Only the copy that takes *fanout from 0 to 1 carries the caller's delivery.
+ (*fanout)++;
+ if (*fanout == 1) {
+ re->delivery = delivery;
+ qd_delivery_fifo_enter_LH(delivery);
+ }
+
+ addr->deliveries_transit++;
+ qd_link_activate(dest_link->link);
+ }
+ }
+
+ qd_bitmask_free(link_set);
+ }
+}
+
+
+/** Multicast forwarder: copies to the local subscriber links and, unless is_direct,
+ * to the remote subscribers as well. Returns true if at least one copy was queued. */
+static bool forwarder_multicast_LH(qd_router_forwarder_t *forwarder,
+ qd_router_t *router,
+ qd_message_t *msg,
+ qd_delivery_t *delivery,
+ qd_address_t *addr,
+ qd_field_iterator_t *ingress_iter,
+ bool is_direct)
+{
+ int fanout = 0;
+ //
+ // Forward to all of the local links receiving this address.
+ //
+ forward_to_direct_subscribers_LH(addr, delivery, msg, &fanout);
+
+ //
+ // If the address form is direct to this router node, don't relay it on to
+ // any other part of the network.
+ //
+ if (!is_direct)
+ forward_to_remote_subscribers_LH(router, addr, delivery, msg, &fanout, ingress_iter);
+
+ return fanout != 0;
+}
+
+
+/** Forward using the 'closest' bias: local subscribers first; remote ones only if
+ * nothing was queued locally and !is_direct. Returns true if a copy was queued. */
+static bool forwarder_anycast_closest_LH(qd_router_forwarder_t *forwarder,
+ qd_router_t *router,
+ qd_message_t *msg,
+ qd_delivery_t *delivery,
+ qd_address_t *addr,
+ qd_field_iterator_t *ingress_iter,
+ bool is_direct)
+{
+ int fanout = 0;
+ //
+ // First, try to find a directly connected consumer for the address. If
+ // there is none, then look for the closest remote consumer.
+ //
+ forward_to_direct_subscribers_LH(addr, delivery, msg, &fanout);
+ if (fanout == 0 && !is_direct)
+ forward_to_remote_subscribers_LH(router, addr, delivery, msg, &fanout, ingress_iter);
+
+ return fanout != 0;
+}
+
+
+/** Forwarding using a 'balanced' bias: flips addr->toggle on every call to alternate
+ * local-first and remote-first attempts. Returns true if a copy was queued. */
+static bool forwarder_anycast_balanced_LH(qd_router_forwarder_t *forwarder,
+ qd_router_t *router,
+ qd_message_t *msg,
+ qd_delivery_t *delivery,
+ qd_address_t *addr,
+ qd_field_iterator_t *ingress_iter,
+ bool is_direct)
+{
+ int fanout = 0;
+ //
+ // Alternate between looking first for a local consumer and looking first
+ // for a remote consumer.
+ //
+ addr->toggle = !addr->toggle;
+ if (addr->toggle) {
+ forward_to_direct_subscribers_LH(addr, delivery, msg, &fanout);
+ if (fanout == 0 && !is_direct)
+ forward_to_remote_subscribers_LH(router, addr, delivery, msg, &fanout, ingress_iter);
+ } else {
+ if (!is_direct)
+ forward_to_remote_subscribers_LH(router, addr, delivery, msg, &fanout, ingress_iter);
+ if (fanout == 0)
+ forward_to_direct_subscribers_LH(addr, delivery, msg, &fanout);
+ }
+
+ return fanout != 0;
+}
+
+
+/* release method shared by the default forwarders: there is nothing to free
+ * because each default forwarder is a statically allocated struct in this file. */
+static void forwarder_release(qd_router_forwarder_t *forwarder)
+{
+ // no-op - they're static singletons!
+}
+
+
+/* The default forwarders, one per forwarding strategy; handed out by
+ * qd_router_get_forwarder(). */
+static qd_router_forwarder_t multicast_forwarder = {
+ forwarder_multicast_LH, /* forward method */
+ forwarder_release, /* release method (no-op) */
+};
+static qd_router_forwarder_t anycast_closest_forwarder = {
+ forwarder_anycast_closest_LH, /* forward method */
+ forwarder_release, /* release method (no-op) */
+};
+static qd_router_forwarder_t anycast_balanced_forwarder = {
+ forwarder_anycast_balanced_LH, /* forward method */
+ forwarder_release, /* release method (no-op) */
+};
+
+
+/** Get the default forwarder for the given semantics: multiple fanout -> multicast,
+ * single fanout -> closest or spread (balanced) bias; anything else asserts / returns 0. */
+qd_router_forwarder_t *qd_router_get_forwarder(qd_address_semantics_t semantics)
+{
+ switch (QD_FANOUT(semantics)) {
+ case QD_FANOUT_MULTIPLE:
+ return &multicast_forwarder;
+ case QD_FANOUT_SINGLE:
+ switch (QD_BIAS(semantics)) {
+ case QD_BIAS_CLOSEST:
+ return &anycast_closest_forwarder;
+ case QD_BIAS_SPREAD:
+ return &anycast_balanced_forwarder;
+ }
+ }
+ assert(false); // invalid semantics? need new forwarder? (NOTE(review): returns 0 if assert is compiled out)
+ return 0;
+}
+
+
+
Propchange: qpid/dispatch/trunk/src/router_forwarders.c
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/dispatch/trunk/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_node.c?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_node.c (original)
+++ qpid/dispatch/trunk/src/router_node.c Tue Mar 10 12:52:46 2015
@@ -63,13 +63,17 @@ ALLOC_DEFINE(qd_router_lrp_ref_t);
ALLOC_DEFINE(qd_address_t);
ALLOC_DEFINE(qd_router_conn_t);
-qd_address_t* qd_address() {
+/* Allocate a zeroed qd_address_t with empty lrps/rlinks/rnodes lists, the given semantics and its default forwarder. */
+qd_address_t* qd_address(qd_address_semantics_t semantics)
+{
qd_address_t* addr = new_qd_address_t();
memset(addr, 0, sizeof(qd_address_t));
DEQ_ITEM_INIT(addr);
DEQ_INIT(addr->lrps);
DEQ_INIT(addr->rlinks);
DEQ_INIT(addr->rnodes);
+ addr->semantics = semantics;
+ addr->forwarder = qd_router_get_forwarder(semantics);
return addr;
}
@@ -162,9 +166,10 @@ void qd_router_check_addr(qd_router_t *r
sys_mutex_lock(router->lock);
//
- // If the address has no handlers or destinations, it should be deleted.
+ // If the address has no in-process consumer or destinations, it should be
+ // deleted.
//
- if (addr->handler == 0 &&
+ if (addr->on_message == 0 &&
DEQ_SIZE(addr->rlinks) == 0 && DEQ_SIZE(addr->rnodes) == 0 &&
!addr->waypoint && !addr->block_deletion)
to_delete = 1;
@@ -646,127 +651,6 @@ static void router_link_route_delivery_L
}
-static void router_forward_to_direct_subscribers_LH(qd_address_t *addr, qd_delivery_t *delivery, qd_message_t *msg, int *fanout)
-{
- qd_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks);
- while (dest_link_ref) {
- qd_routed_event_t *re = new_qd_routed_event_t();
- DEQ_ITEM_INIT(re);
- re->delivery = 0;
- re->message = qd_message_copy(msg);
- re->settle = 0;
- re->disposition = 0;
- DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re);
-
- (*fanout)++;
- if (*fanout == 1) {
- re->delivery = delivery;
- qd_delivery_fifo_enter_LH(delivery);
- }
-
- addr->deliveries_egress++;
- qd_link_activate(dest_link_ref->link->link);
-
- //
- // If the fanout is single, exit the loop here. We only want to send one message copy.
- //
- if (QD_FANOUT(addr->semantics) == QD_FANOUT_SINGLE)
- break;
-
- dest_link_ref = DEQ_NEXT(dest_link_ref);
- }
-
- //
- // If dest_link_ref is not null here, we exited after sending one message copy.
- // If the number of local links is greater than one, rotate the head link to the
- // tail so we balance the message deliveries.
- //
- if (dest_link_ref && DEQ_SIZE(addr->rlinks) > 1) {
- assert(DEQ_HEAD(addr->rlinks) == dest_link_ref);
- DEQ_REMOVE_HEAD(addr->rlinks);
- DEQ_INSERT_TAIL(addr->rlinks, dest_link_ref);
- }
-}
-
-
-static void router_forward_to_remote_subscribers_LH(qd_router_t *router, qd_address_t *addr, qd_delivery_t *delivery,
- qd_message_t *msg, int *fanout, qd_field_iterator_t *ingress_iter)
-{
- //
- // Get the mask bit associated with the ingress router for the message.
- // This will be compared against the "valid_origin" masks for each
- // candidate destination router.
- //
- int origin = -1;
- if (ingress_iter && !(addr->semantics & QD_BYPASS_VALID_ORIGINS)) {
- qd_address_iterator_reset_view(ingress_iter, ITER_VIEW_NODE_HASH);
- qd_address_t *origin_addr;
- qd_hash_retrieve(router->addr_hash, ingress_iter, (void*) &origin_addr);
- if (origin_addr && DEQ_SIZE(origin_addr->rnodes) == 1) {
- qd_router_ref_t *rref = DEQ_HEAD(origin_addr->rnodes);
- origin = rref->router->mask_bit;
- }
- } else
- origin = 0;
-
- //
- // Forward to the next-hops for remote destinations.
- //
- if (origin >= 0) {
- qd_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes);
- qd_router_link_t *dest_link;
- qd_bitmask_t *link_set = qd_bitmask(0);
-
- //
- // Loop over the target nodes for this address. Build a set of outgoing links
- // for which there are valid targets. We do this to avoid sending more than one
- // message down a given link. It's possible that there are multiple destinations
- // for this address that are all reachable over the same link. In this case, we
- // will send only one copy of the message over the link and allow a downstream
- // router to fan the message out.
- //
- while (dest_node_ref) {
- if (dest_node_ref->router->next_hop)
- dest_link = dest_node_ref->router->next_hop->peer_link;
- else
- dest_link = dest_node_ref->router->peer_link;
- if (dest_link && qd_bitmask_value(dest_node_ref->router->valid_origins, origin))
- qd_bitmask_set_bit(link_set, dest_link->mask_bit);
- dest_node_ref = DEQ_NEXT(dest_node_ref);
- }
-
- //
- // Send a copy of the message outbound on each identified link.
- //
- int link_bit;
- while (qd_bitmask_first_set(link_set, &link_bit)) {
- qd_bitmask_clear_bit(link_set, link_bit);
- dest_link = router->out_links_by_mask_bit[link_bit];
- if (dest_link) {
- qd_routed_event_t *re = new_qd_routed_event_t();
- DEQ_ITEM_INIT(re);
- re->delivery = 0;
- re->message = qd_message_copy(msg);
- re->settle = 0;
- re->disposition = 0;
- DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
-
- (*fanout)++;
- if (*fanout == 1) {
- re->delivery = delivery;
- qd_delivery_fifo_enter_LH(delivery);
- }
-
- addr->deliveries_transit++;
- qd_link_activate(dest_link->link);
- }
- }
-
- qd_bitmask_free(link_set);
- }
-}
-
-
/**
* Inbound Delivery Handler
*/
@@ -814,9 +698,9 @@ static void router_rx_handler(void* cont
//
// Validate the message through the Properties section so we can access the TO field.
//
- qd_message_t *in_process_copy = 0;
- qd_router_message_cb_t handler = 0;
- void *handler_context = 0;
+ qd_message_t *in_process_copy = 0;
+ qd_router_message_cb_t on_message = 0;
+ void *on_message_context = 0;
valid_message = qd_message_check(msg, QD_DEPTH_PROPERTIES);
@@ -824,9 +708,8 @@ static void router_rx_handler(void* cont
qd_parsed_field_t *in_ma = 0;
qd_field_iterator_t *iter = 0;
bool free_iter = true;
- qd_address_t *addr;
- int fanout = 0;
char *to_override = 0;
+ bool forwarded = false;
//
// Only respect the delivery annotations if the message came from another router.
@@ -874,8 +757,6 @@ static void router_rx_handler(void* cont
}
if (iter) {
- qd_address_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
-
//
// Note: This function is going to need to be refactored so we can put an
// asynchronous address lookup here. In the event there is a translation
@@ -884,11 +765,9 @@ static void router_rx_handler(void* cont
//
// Note that this lookup is only done for global/mobile class addresses.
//
-
- qd_hash_retrieve(router->addr_hash, iter, (void*) &addr);
- qd_address_iterator_reset_view(iter, ITER_VIEW_NO_HOST);
- int is_local = qd_field_iterator_prefix(iter, local_prefix);
- int is_direct = qd_field_iterator_prefix(iter, direct_prefix);
+ bool is_local;
+ bool is_direct;
+ qd_address_t *addr = qd_router_address_lookup_LH(router, iter, &is_local, &is_direct);
if (free_iter)
qd_field_iterator_free(iter);
@@ -910,80 +789,39 @@ static void router_rx_handler(void* cont
int drop = 0;
qd_field_iterator_t *ingress_iter = router_annotate_message(router, in_ma, msg, &drop, to_override);
- //
- // Forward to the in-process handler for this address if there is one. The
- // actual invocation of the handler will occur later after we've released
- // the lock.
- //
- if (!drop && addr->handler) {
- in_process_copy = qd_message_copy(msg);
- handler = addr->handler;
- handler_context = addr->handler_context;
- addr->deliveries_to_container++;
- }
+ if (!drop) {
+ //
+ // Forward a copy of the message to the in-process endpoint for
+ // this address if there is one. The actual invocation of the
+ // handler will occur later after we've released the lock.
+ //
+ if (addr->on_message) {
+ in_process_copy = qd_message_copy(msg);
+ on_message = addr->on_message;
+ on_message_context = addr->on_message_context;
+ addr->deliveries_to_container++;
+ }
- //
- // If the address form is local (i.e. is prefixed by _local), don't forward
- // outside of the router process.
- //
- if (!drop && !is_local && router->router_mode != QD_ROUTER_MODE_ENDPOINT) {
//
- // Handle the various fanout and bias cases:
+ // If the address form is local (i.e. is prefixed by _local), don't forward
+ // outside of the router process.
//
- if (QD_FANOUT(addr->semantics) == QD_FANOUT_MULTIPLE) {
- //
- // Forward to all of the local links receiving this address.
- //
- router_forward_to_direct_subscribers_LH(addr, delivery, msg, &fanout);
-
- //
- // If the address form is direct to this router node, don't relay it on
- // to any other part of the network.
- //
- if (!is_direct)
- router_forward_to_remote_subscribers_LH(router, addr, delivery, msg, &fanout, ingress_iter);
-
- } else if (QD_FANOUT(addr->semantics) == QD_FANOUT_SINGLE) {
- if (QD_BIAS(addr->semantics) == QD_BIAS_CLOSEST) {
- //
- // Bias is "closest". First, try to find a directly connected consumer for the address.
- // If there is none, then look for the closest remote consumer.
- //
- router_forward_to_direct_subscribers_LH(addr, delivery, msg, &fanout);
- if (fanout == 0 && !is_direct)
- router_forward_to_remote_subscribers_LH(router, addr, delivery, msg, &fanout, ingress_iter);
-
- } else if (QD_BIAS(addr->semantics) == QD_BIAS_SPREAD) {
- //
- // Bias is "spread". Alternate between looking first for a local consumer and looking
- // first for a remote consumer.
- //
- addr->toggle = !addr->toggle;
- if (addr->toggle) {
- router_forward_to_direct_subscribers_LH(addr, delivery, msg, &fanout);
- if (fanout == 0 && !is_direct)
- router_forward_to_remote_subscribers_LH(router, addr, delivery, msg, &fanout, ingress_iter);
- } else {
- if (!is_direct)
- router_forward_to_remote_subscribers_LH(router, addr, delivery, msg, &fanout, ingress_iter);
- if (fanout == 0)
- router_forward_to_direct_subscribers_LH(addr, delivery, msg, &fanout);
- }
- }
+ if (!is_local && router->router_mode != QD_ROUTER_MODE_ENDPOINT) {
+ qd_router_forwarder_t *f = addr->forwarder;
+ forwarded = f->forward(f, router, msg, delivery, addr, ingress_iter, is_direct);
}
}
}
+ }
- //
- // In message-routing mode, the handling of the incoming delivery depends on the
- // number of copies of the received message that were forwarded.
- //
- if (handler) {
+ if (!forwarded) {
+ if (on_message)
+ // our local in-process handler will accept it:
qd_delivery_free_LH(delivery, PN_ACCEPTED);
- } else if (fanout == 0) {
- qd_delivery_free_LH(delivery, PN_RELEASED);
- } else if (qd_delivery_settled(delivery)) {
- qd_delivery_free_LH(delivery, 0);
+ else {
+ // no one has accepted it, so inform sender
+ qd_delivery_set_undeliverable_LH(delivery);
+ qd_delivery_free_LH(delivery, PN_MODIFIED);
}
}
} else {
@@ -999,8 +837,8 @@ static void router_rx_handler(void* cont
//
// Invoke the in-process handler now that the lock is released.
//
- if (handler) {
- handler(handler_context, in_process_copy, rlink->mask_bit);
+ if (on_message) {
+ on_message(on_message_context, in_process_copy, rlink->mask_bit);
qd_message_free(in_process_copy);
}
}
@@ -1472,10 +1310,9 @@ static int router_outgoing_link_handler(
qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
if (!addr) {
- addr = qd_address();
+ addr = qd_address(semantics);
qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
DEQ_INSERT_TAIL(router->addrs, addr);
- addr->semantics = semantics;
qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr);
}
@@ -1917,9 +1754,9 @@ qd_router_t *qd_router(qd_dispatch_t *qd
// locally later in the initialization sequence.
//
if (router->router_mode == QD_ROUTER_MODE_INTERIOR) {
- router->router_addr = qd_router_register_address(qd, "qdrouter", 0, QD_SEMANTICS_ROUTER_CONTROL, false, 0);
- router->routerma_addr = qd_router_register_address(qd, "qdrouter.ma", 0, QD_SEMANTICS_DEFAULT, false, 0);
- router->hello_addr = qd_router_register_address(qd, "qdhello", 0, QD_SEMANTICS_ROUTER_CONTROL, false, 0);
+ router->router_addr = qd_router_register_address(qd, "qdrouter", 0, 0, QD_SEMANTICS_ROUTER_CONTROL, false, 0);
+ router->routerma_addr = qd_router_register_address(qd, "qdrouter.ma", 0, 0, QD_SEMANTICS_DEFAULT, false, 0);
+ router->hello_addr = qd_router_register_address(qd, "qdhello", 0, 0, QD_SEMANTICS_ROUTER_CONTROL, false, 0);
}
//
@@ -1996,10 +1833,11 @@ const char *qd_router_id(const qd_dispat
qd_address_t *qd_router_register_address(qd_dispatch_t *qd,
const char *address,
- qd_router_message_cb_t handler,
+ qd_router_message_cb_t on_message,
+ void *context,
qd_address_semantics_t semantics,
bool global,
- void *context)
+ qd_router_forwarder_t *forwarder)
{
char addr_string[1000];
qd_router_t *router = qd->router;
@@ -2012,8 +1850,7 @@ qd_address_t *qd_router_register_address
sys_mutex_lock(router->lock);
qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
if (!addr) {
- addr = qd_address();
- addr->semantics = semantics;
+ addr = qd_address(semantics);
qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
DEQ_ITEM_INIT(addr);
DEQ_INSERT_TAIL(router->addrs, addr);
@@ -2021,12 +1858,16 @@ qd_address_t *qd_router_register_address
}
qd_field_iterator_free(iter);
- addr->handler = handler;
- addr->handler_context = context;
+ addr->on_message = on_message;
+ addr->on_message_context = context;
+ if (forwarder) {
+ if (addr->forwarder) addr->forwarder->release(addr->forwarder);
+ addr->forwarder = forwarder;
+ }
sys_mutex_unlock(router->lock);
- if (handler)
+ if (on_message)
qd_log(router->log_source, QD_LOG_INFO, "In-Process Address Registered: %s", address);
assert(addr);
return addr;
@@ -2035,6 +1876,7 @@ qd_address_t *qd_router_register_address
void qd_router_unregister_address(qd_address_t *ad)
{
+ // if (ad->forwarder) ad->forwarder->release(ad->forwarder);
//free_qd_address_t(ad);
}
@@ -2057,6 +1899,20 @@ void qd_address_set_dynamic_cc(qd_addres
}
+qd_address_t *qd_router_address_lookup_LH(qd_router_t *router,
+ qd_field_iterator_t *addr_iter,
+ bool *is_local, bool *is_direct)
+{
+ qd_address_t *addr = 0;
+ qd_address_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_HASH);
+ qd_hash_retrieve(router->addr_hash, addr_iter, (void*) &addr);
+ qd_address_iterator_reset_view(addr_iter, ITER_VIEW_NO_HOST);
+ *is_local = (bool) qd_field_iterator_prefix(addr_iter, local_prefix);
+ *is_direct = (bool) qd_field_iterator_prefix(addr_iter, direct_prefix);
+ return addr;
+}
+
+
void qd_router_send(qd_dispatch_t *qd,
qd_field_iterator_t *address,
qd_message_t *msg)
Modified: qpid/dispatch/trunk/src/router_private.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_private.h?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_private.h (original)
+++ qpid/dispatch/trunk/src/router_private.h Tue Mar 10 12:52:46 2015
@@ -160,16 +160,15 @@ struct qd_router_conn_t {
ALLOC_DECLARE(qd_router_conn_t);
-
/** A router address */
struct qd_address_t {
DEQ_LINKS(qd_address_t);
- qd_router_message_cb_t handler; ///< In-Process Consumer
- void *handler_context; ///< In-Process Consumer context
- qd_router_lrp_ref_list_t lrps; ///< Local link-route destinations
- qd_router_link_ref_list_t rlinks; ///< Locally-Connected Consumers
- qd_router_ref_list_t rnodes; ///< Remotely-Connected Consumers
- qd_hash_handle_t *hash_handle; ///< Linkage back to the hash table entry
+ qd_router_message_cb_t on_message; ///< In-Process Message Consumer
+ void *on_message_context; ///< In-Process Consumer context
+ qd_router_lrp_ref_list_t lrps; ///< Local link-route destinations
+ qd_router_link_ref_list_t rlinks; ///< Locally-Connected Consumers
+ qd_router_ref_list_t rnodes; ///< Remotely-Connected Consumers
+ qd_hash_handle_t *hash_handle; ///< Linkage back to the hash table entry
qd_address_semantics_t semantics;
qd_address_t *redirect;
qd_address_t *static_cc;
@@ -177,6 +176,7 @@ struct qd_address_t {
bool toggle;
bool waypoint;
bool block_deletion;
+ qd_router_forwarder_t *forwarder;
//
// TODO - Add support for asynchronous address lookup:
@@ -198,7 +198,7 @@ ALLOC_DECLARE(qd_address_t);
DEQ_DECLARE(qd_address_t, qd_address_list_t);
/** Constructor for qd_address_t */
-qd_address_t* qd_address();
+qd_address_t* qd_address(qd_address_semantics_t semantics);
struct qd_config_phase_t {
DEQ_LINKS(qd_config_phase_t);
@@ -269,8 +269,6 @@ struct qd_router_t {
qd_waypoint_list_t waypoints;
};
-
-
void qd_router_check_addr(qd_router_t *router, qd_address_t *addr, int was_local);
void qd_router_add_link_ref_LH(qd_router_link_ref_list_t *ref_list, qd_router_link_t *link);
void qd_router_del_link_ref_LH(qd_router_link_ref_list_t *ref_list, qd_router_link_t *link);
@@ -287,4 +285,8 @@ void qd_router_link_lost(qd_router_t *ro
qd_address_semantics_t router_semantics_for_addr(qd_router_t *router, qd_field_iterator_t *iter,
char in_phase, char *out_phase);
+qd_address_t *qd_router_address_lookup_LH(qd_router_t *router,
+ qd_field_iterator_t *addr_iter,
+ bool *is_local, bool *is_direct);
+
#endif
Modified: qpid/dispatch/trunk/src/router_pynode.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_pynode.c?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_pynode.c (original)
+++ qpid/dispatch/trunk/src/router_pynode.c Tue Mar 10 12:52:46 2015
@@ -81,8 +81,7 @@ static PyObject *qd_add_router(PyObject
// This record will be found whenever a "foreign" topological address to this
// remote router is looked up.
//
- addr = qd_address();
- addr->semantics = router_addr_semantics;
+ addr = qd_address(router_addr_semantics);
qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
DEQ_INSERT_TAIL(router->addrs, addr);
qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr);
@@ -462,9 +461,8 @@ static PyObject* qd_map_destination(PyOb
sys_mutex_lock(router->lock);
qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
if (!addr) {
- addr = qd_address();
+ addr = qd_address(router_semantics_for_addr(router, iter, phase, &unused));
qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
- addr->semantics = router_semantics_for_addr(router, iter, phase, &unused);
DEQ_ITEM_INIT(addr);
DEQ_INSERT_TAIL(router->addrs, addr);
qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr);
Modified: qpid/dispatch/trunk/src/waypoint.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/waypoint.c?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/waypoint.c (original)
+++ qpid/dispatch/trunk/src/waypoint.c Tue Mar 10 12:52:46 2015
@@ -64,11 +64,10 @@ static void qd_waypoint_visit_sink_LH(qd
qd_hash_retrieve(router->addr_hash, iter, (void*) &addr);
if (!addr) {
- addr = qd_address();
+ addr = qd_address(router_semantics_for_addr(router, iter, wp->in_phase, &unused));
qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
DEQ_INSERT_TAIL(router->addrs, addr);
addr->waypoint = true;
- addr->semantics = router_semantics_for_addr(router, iter, wp->in_phase, &unused);
qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr);
}
@@ -138,11 +137,10 @@ static void qd_waypoint_visit_source_LH(
qd_hash_retrieve(router->addr_hash, iter, (void*) &addr);
if (!addr) {
- addr = qd_address();
+ addr = qd_address(router_semantics_for_addr(router, iter, wp->out_phase, &unused));
qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
DEQ_INSERT_TAIL(router->addrs, addr);
addr->waypoint = true;
- addr->semantics = router_semantics_for_addr(router, iter, wp->out_phase, &unused);
qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr);
}
Modified: qpid/dispatch/trunk/tests/system_tests_one_router.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_tests_one_router.py?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_tests_one_router.py (original)
+++ qpid/dispatch/trunk/tests/system_tests_one_router.py Tue Mar 10 12:52:46 2015
@@ -21,6 +21,13 @@ import unittest
from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED
from system_test import TestCase, Messenger, Qdrouterd, main_module
+# PROTON-828:
+try:
+ from proton import MODIFIED
+except ImportError:
+ from proton import PN_STATUS_MODIFIED as MODIFIED
+
+
class RouterTest(TestCase):
"""System tests involving a single router"""
@@ -327,7 +334,7 @@ class RouterTest(TestCase):
tx_tracker = M1.put(tm)
M1.send(0)
M1.flush()
- self.assertEqual(RELEASED, M1.status(tx_tracker))
+ self.assertEqual(MODIFIED, M1.status(tx_tracker))
M1.stop()
Modified: qpid/dispatch/trunk/tests/system_tests_two_routers.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_tests_two_routers.py?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_tests_two_routers.py (original)
+++ qpid/dispatch/trunk/tests/system_tests_two_routers.py Tue Mar 10 12:52:46 2015
@@ -21,6 +21,12 @@ import unittest, os
from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED, SSLDomain, SSLUnavailable
from system_test import TestCase, Qdrouterd, main_module
+# PROTON-828:
+try:
+ from proton import MODIFIED
+except ImportError:
+ from proton import PN_STATUS_MODIFIED as MODIFIED
+
class RouterTest(TestCase):
@classmethod
@@ -320,7 +326,7 @@ class RouterTest(TestCase):
tx_tracker = M1.put(tm)
M1.send(0)
M1.flush()
- self.assertEqual(RELEASED, M1.status(tx_tracker))
+ self.assertEqual(MODIFIED, M1.status(tx_tracker))
M1.stop()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org