You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2015/03/10 13:52:46 UTC

svn commit: r1665514 - in /qpid/dispatch/trunk: include/qpid/dispatch/ src/ tests/

Author: kgiusti
Date: Tue Mar 10 12:52:46 2015
New Revision: 1665514

URL: http://svn.apache.org/r1665514
Log:
DISPATCH-99: pluggable forwarding logic

Added:
    qpid/dispatch/trunk/src/router_forwarders.c   (with props)
Modified:
    qpid/dispatch/trunk/include/qpid/dispatch/container.h
    qpid/dispatch/trunk/include/qpid/dispatch/iterator.h
    qpid/dispatch/trunk/include/qpid/dispatch/router.h
    qpid/dispatch/trunk/src/CMakeLists.txt
    qpid/dispatch/trunk/src/container.c
    qpid/dispatch/trunk/src/iterator.c
    qpid/dispatch/trunk/src/lrp.c
    qpid/dispatch/trunk/src/python_embedded.c
    qpid/dispatch/trunk/src/router_agent.c
    qpid/dispatch/trunk/src/router_config.c
    qpid/dispatch/trunk/src/router_node.c
    qpid/dispatch/trunk/src/router_private.h
    qpid/dispatch/trunk/src/router_pynode.c
    qpid/dispatch/trunk/src/waypoint.c
    qpid/dispatch/trunk/tests/system_tests_one_router.py
    qpid/dispatch/trunk/tests/system_tests_two_routers.py

Modified: qpid/dispatch/trunk/include/qpid/dispatch/container.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/container.h?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/container.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/container.h Tue Mar 10 12:52:46 2015
@@ -183,6 +183,7 @@ bool qd_link_drain_changed(qd_link_t *li
  *            thrown.
  */
 qd_delivery_t *qd_delivery(qd_link_t *link, pn_delivery_tag_t tag);
+void qd_delivery_set_undeliverable_LH(qd_delivery_t *delivery);
 void qd_delivery_free_LH(qd_delivery_t *delivery, uint64_t final_disposition);
 void qd_delivery_link_peers_LH(qd_delivery_t *left, qd_delivery_t *right);
 void qd_delivery_unlink_LH(qd_delivery_t *delivery);

Modified: qpid/dispatch/trunk/include/qpid/dispatch/iterator.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/iterator.h?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/iterator.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/iterator.h Tue Mar 10 12:52:46 2015
@@ -169,6 +169,7 @@ void qd_field_iterator_reset(qd_field_it
 
 void qd_address_iterator_reset_view(qd_field_iterator_t *iter,
                                   qd_iterator_view_t   view);
+qd_iterator_view_t qd_address_iterator_get_view(const qd_field_iterator_t *iter);
 
 void qd_address_iterator_set_phase(qd_field_iterator_t *iter, char phase);
 

Modified: qpid/dispatch/trunk/include/qpid/dispatch/router.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/router.h?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/router.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/router.h Tue Mar 10 12:52:46 2015
@@ -32,6 +32,7 @@
 #include <qpid/dispatch/iterator.h>
 #include <stdbool.h>
 
+typedef struct qd_router_t  qd_router_t;
 typedef struct qd_address_t qd_address_t;
 typedef uint8_t             qd_address_semantics_t;
 
@@ -96,6 +97,44 @@ typedef uint8_t             qd_address_s
 #define QD_SEMANTICS_DEFAULT        (QD_FANOUT_MULTIPLE | QD_BIAS_NONE | QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS)
 ///@}
 
+/** Message forwarding descriptor
+ *
+ * Defines a forwarding method that can be associated with an address
+ * (qd_address_t).  This method is called for each message that matches the
+ * associated address.  The qd_router_forwarder_t is a 'base class' that can be
+ * subclassed to provide a per-forwarder context for custom forwarding
+ * algorithms.
+ */
+typedef struct qd_router_forwarder_t qd_router_forwarder_t;
+struct qd_router_forwarder_t {
+
+    /** Forwarding method, invoked for each message that matches the address.
+     *
+     * The forwarder is passed the descriptor itself as its first argument.
+     * The default forwarders (router_forwarders.c) use is_direct to suppress
+     * relaying to other routers, and ingress_iterator (which they allow to be
+     * null) to identify the ingress router of the message.
+     *
+     * Returns true if the message was forwarded or has been scheduled to be
+     * forwarded later (asynchronous forwarding requires copying the message).
+     * Returns false if unable to forward.  NOTE: ** Called with router lock held! **
+     */
+    bool (*forward)(qd_router_forwarder_t *forwarder,
+                    qd_router_t *router,
+                    qd_message_t *msg,
+                    qd_delivery_t *delivery,
+                    qd_address_t *addr,
+                    qd_field_iterator_t *ingress_iterator,
+                    bool is_direct);
+
+    /** Release the descriptor.
+     *
+     * Called when the associated qd_address_t is freed.  May be a no-op, as it
+     * is for the default forwarders.  NOTE: ** Called with router lock held! **
+     */
+    void (*release)(qd_router_forwarder_t *forwarder);
+};
+
 typedef void (*qd_router_message_cb_t)(void *context, qd_message_t *msg, int link_id);
 
 const char *qd_router_id(const qd_dispatch_t *qd);
@@ -103,17 +142,22 @@ const char *qd_router_id(const qd_dispat
 /** Register an address in the router's hash table.
  * @param qd Pointer to the dispatch instance.
  * @param address String form of address
- * @param handler Callback to be called when a message is received for the address.
+ * @param on_message Optional callback to be called when a message is received
+ * for the address.
+ * @param context Context to be passed to the on_message handler.
  * @param semantics Semantics for the address.
  * @param global True if the address is global.
- * @param context Context to be passed to the handler.
+ * @param forwarder Optional custom forwarder to use when a message is received
+ * for the address.  If null, a default forwarder based on the semantics will
+ * be used.
  */
 qd_address_t *qd_router_register_address(qd_dispatch_t          *qd,
                                          const char             *address,
-                                         qd_router_message_cb_t  handler,
+                                         qd_router_message_cb_t  on_message,
+                                         void                   *context,
                                          qd_address_semantics_t  semantics,
                                          bool                    global,
-                                         void                   *context);
+                                         qd_router_forwarder_t  *forwarder);
 
 void qd_router_unregister_address(qd_address_t *address);
 
@@ -138,6 +182,9 @@ void qd_router_build_node_list(qd_dispat
 /** String form of address for logging */
 const char* qd_address_logstr(qd_address_t* address);
 
+/** Retrieve the default forwarder for the given address semantics */
+qd_router_forwarder_t *qd_router_get_forwarder(qd_address_semantics_t s);
+
 ///@}
 
 #endif

Modified: qpid/dispatch/trunk/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/CMakeLists.txt?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/CMakeLists.txt (original)
+++ qpid/dispatch/trunk/src/CMakeLists.txt Tue Mar 10 12:52:46 2015
@@ -66,6 +66,7 @@ set(qpid_dispatch_SOURCES
   router_agent.c
   router_config.c
   router_node.c
+  router_forwarders.c
   router_pynode.c
   schema_enum.c
   server.c

Modified: qpid/dispatch/trunk/src/container.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/container.c?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/container.c (original)
+++ qpid/dispatch/trunk/src/container.c Tue Mar 10 12:52:46 2015
@@ -860,6 +860,17 @@ qd_delivery_t *qd_delivery(qd_link_t *li
     return delivery;
 }
 
+// Mark the delivery's local disposition 'undeliverable-here' so peers won't
+// re-forward it to us.  No-op if there is no proton delivery or disposition.
+void qd_delivery_set_undeliverable_LH(qd_delivery_t *delivery)
+{
+    if (delivery->pn_delivery) {
+        pn_disposition_t *dp = pn_delivery_local(delivery->pn_delivery);
+        if (dp) {
+            pn_disposition_set_undeliverable(dp, true);
+        }
+    }
+}
 
 void qd_delivery_free_LH(qd_delivery_t *delivery, uint64_t final_disposition)
 {

Modified: qpid/dispatch/trunk/src/iterator.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/iterator.c?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/iterator.c (original)
+++ qpid/dispatch/trunk/src/iterator.c Tue Mar 10 12:52:46 2015
@@ -385,6 +385,12 @@ void qd_address_iterator_reset_view(qd_f
 }
 
 
+qd_iterator_view_t qd_address_iterator_get_view(const qd_field_iterator_t *iter)
+{
+    return iter->view;  // lets a caller save the view and restore it after a temporary reset
+}
+
+
 void qd_address_iterator_set_phase(qd_field_iterator_t *iter, char phase)
 {
     iter->phase = phase;

Modified: qpid/dispatch/trunk/src/lrp.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/lrp.c?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/lrp.c (original)
+++ qpid/dispatch/trunk/src/lrp.c Tue Mar 10 12:52:46 2015
@@ -58,10 +58,9 @@ static void qd_lrpc_open_handler(void *c
         sys_mutex_lock(router->lock);
         qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
         if (!addr) {
-            addr = qd_address();
+            addr = qd_address(router_semantics_for_addr(router, iter, '\0', &unused));
             qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
             DEQ_INSERT_TAIL(router->addrs, addr);
-            addr->semantics = router_semantics_for_addr(router, iter, '\0', &unused);
             qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr);
         }
 

Modified: qpid/dispatch/trunk/src/python_embedded.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/python_embedded.c?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/python_embedded.c (original)
+++ qpid/dispatch/trunk/src/python_embedded.c Tue Mar 10 12:52:46 2015
@@ -539,7 +539,8 @@ static int IoAdapter_init(IoAdapter *sel
     if (!address) return -1;
     qd_error_clear();
     self->addr =
-        qd_router_register_address(self->qd, address, qd_io_rx_handler, py_semantics, global, self);
+        qd_router_register_address(self->qd, address, qd_io_rx_handler, self,
+                                   py_semantics, global, 0);
     if (qd_error_code()) {
         PyErr_SetString(PyExc_RuntimeError, qd_error_message());
         return -1;

Modified: qpid/dispatch/trunk/src/router_agent.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_agent.c?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_agent.c (original)
+++ qpid/dispatch/trunk/src/router_agent.c Tue Mar 10 12:52:46 2015
@@ -60,7 +60,7 @@ qd_error_t qd_entity_refresh_router_addr
     uint32_t      subCount = DEQ_SIZE(addr->rlinks);
     if (DEQ_SIZE(addr->lrps) > 0)
         subCount = DEQ_SIZE(addr->lrps);
-    if (qd_entity_set_bool(entity, "inProcess", addr->handler != 0) == 0 &&
+    if (qd_entity_set_bool(entity, "inProcess", addr->on_message != 0) == 0 &&
         qd_entity_set_long(entity, "subscriberCount", subCount) == 0 &&
         qd_entity_set_long(entity, "remoteCount", DEQ_SIZE(addr->rnodes)) == 0 &&
         qd_entity_set_long(entity, "deliveriesIngress", addr->deliveries_ingress) == 0 &&

Modified: qpid/dispatch/trunk/src/router_config.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_config.c?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_config.c (original)
+++ qpid/dispatch/trunk/src/router_config.c Tue Mar 10 12:52:46 2015
@@ -219,10 +219,9 @@ qd_error_t qd_router_configure_lrp(qd_ro
     //
     qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
     if (!addr) {
-        addr = qd_address();
+        addr = qd_address(router_semantics_for_addr(router, iter, '\0', &unused));
         qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
         DEQ_INSERT_TAIL(router->addrs, addr);
-        addr->semantics = router_semantics_for_addr(router, iter, '\0', &unused);
         qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr);
     }
 
@@ -272,6 +271,7 @@ void qd_router_configure_free(qd_router_
 qd_address_semantics_t router_semantics_for_addr(qd_router_t *router, qd_field_iterator_t *iter,
                                                  char in_phase, char *out_phase)
 {
+    const qd_iterator_view_t old_view = qd_address_iterator_get_view(iter);
     qd_address_iterator_reset_view(iter, ITER_VIEW_NO_HOST);
 
     qd_config_address_t *addr  = DEQ_HEAD(router->config_addrs);
@@ -294,5 +294,6 @@ qd_address_semantics_t router_semantics_
         }
     }
 
+    qd_address_iterator_reset_view(iter, old_view);
     return phase ? phase->semantics : QD_SEMANTICS_DEFAULT;
 }

Added: qpid/dispatch/trunk/src/router_forwarders.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_forwarders.c?rev=1665514&view=auto
==============================================================================
--- qpid/dispatch/trunk/src/router_forwarders.c (added)
+++ qpid/dispatch/trunk/src/router_forwarders.c Tue Mar 10 12:52:46 2015
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "dispatch_private.h"
+
+/** defines a default set of forwarding behaviors based on the semantics of an
+ * address.
+ */
+
+// Queue a copy of msg on the local subscriber links of addr; *fanout counts the copies queued.
+static void forward_to_direct_subscribers_LH(qd_address_t *addr,
+                                             qd_delivery_t *delivery,
+                                             qd_message_t *msg,
+                                             int *fanout)
+{
+    qd_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks);
+    while (dest_link_ref) {
+        qd_routed_event_t *re = new_qd_routed_event_t();
+        DEQ_ITEM_INIT(re);
+        re->delivery    = 0;
+        re->message     = qd_message_copy(msg);
+        re->settle      = 0;
+        re->disposition = 0;
+        DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re);
+        // Only the first copy queued for this message (fanout 1) carries the delivery.
+        (*fanout)++;
+        if (*fanout == 1) {
+            re->delivery = delivery;
+            qd_delivery_fifo_enter_LH(delivery);
+        }
+
+        addr->deliveries_egress++;
+        qd_link_activate(dest_link_ref->link->link);
+
+        //
+        // If the fanout is single, exit the loop here.  We only want to send one message copy.
+        //
+        if (QD_FANOUT(addr->semantics) == QD_FANOUT_SINGLE)
+            break;
+
+        dest_link_ref = DEQ_NEXT(dest_link_ref);
+    }
+
+    //
+    // If dest_link_ref is not null here, we exited after sending one message copy.
+    // If the number of local links is greater than one, rotate the head link to the
+    // tail so we balance the message deliveries.
+    //
+    if (dest_link_ref && DEQ_SIZE(addr->rlinks) > 1) {
+        assert(DEQ_HEAD(addr->rlinks) == dest_link_ref);
+        DEQ_REMOVE_HEAD(addr->rlinks);
+        DEQ_INSERT_TAIL(addr->rlinks, dest_link_ref);
+    }
+}
+
+// Queue one copy of msg on each outgoing link that leads to a remote destination router of addr.
+static void forward_to_remote_subscribers_LH(qd_router_t *router,
+                                             qd_address_t *addr,
+                                             qd_delivery_t *delivery,
+                                             qd_message_t *msg,
+                                             int *fanout,
+                                             qd_field_iterator_t *ingress_iter)
+{
+    // Get the mask bit associated with the ingress router for the message.
+    // This will be compared against the "valid_origin" masks for each
+    // candidate destination router.  origin stays -1 (nothing is forwarded) if
+    // the ingress router is not found with exactly one rnode; it is 0 when there
+    // is no ingress iterator or the address has QD_BYPASS_VALID_ORIGINS set.
+    int origin = -1;
+    if (ingress_iter && !(addr->semantics & QD_BYPASS_VALID_ORIGINS)) {
+        qd_address_iterator_reset_view(ingress_iter, ITER_VIEW_NODE_HASH);  // view is not restored afterwards
+        qd_address_t *origin_addr;
+        qd_hash_retrieve(router->addr_hash, ingress_iter, (void*) &origin_addr);
+        if (origin_addr && DEQ_SIZE(origin_addr->rnodes) == 1) {
+            qd_router_ref_t *rref = DEQ_HEAD(origin_addr->rnodes);
+            origin = rref->router->mask_bit;
+        }
+    } else
+        origin = 0;
+    // NOTE(review): origin_addr above is uninitialised; presumably qd_hash_retrieve nulls it on a miss - confirm.
+    //
+    // Forward to the next-hops for remote destinations.
+    //
+    if (origin >= 0) {
+        qd_router_ref_t  *dest_node_ref = DEQ_HEAD(addr->rnodes);
+        qd_router_link_t *dest_link;
+        qd_bitmask_t     *link_set = qd_bitmask(0);
+
+        //
+        // Loop over the target nodes for this address.  Build a set of outgoing links
+        // for which there are valid targets.  We do this to avoid sending more than one
+        // message down a given link.  It's possible that there are multiple destinations
+        // for this address that are all reachable over the same link.  In this case, we
+        // will send only one copy of the message over the link and allow a downstream
+        // router to fan the message out.
+        //
+        while (dest_node_ref) {
+            if (dest_node_ref->router->next_hop)
+                dest_link = dest_node_ref->router->next_hop->peer_link;
+            else
+                dest_link = dest_node_ref->router->peer_link;
+            if (dest_link && qd_bitmask_value(dest_node_ref->router->valid_origins, origin))
+                qd_bitmask_set_bit(link_set, dest_link->mask_bit);
+            dest_node_ref = DEQ_NEXT(dest_node_ref);
+        }
+
+        //
+        // Send a copy of the message outbound on each identified link.
+        //
+        int link_bit;
+        while (qd_bitmask_first_set(link_set, &link_bit)) {
+            qd_bitmask_clear_bit(link_set, link_bit);
+            dest_link = router->out_links_by_mask_bit[link_bit];
+            if (dest_link) {
+                qd_routed_event_t *re = new_qd_routed_event_t();
+                DEQ_ITEM_INIT(re);
+                re->delivery    = 0;
+                re->message     = qd_message_copy(msg);
+                re->settle      = 0;
+                re->disposition = 0;
+                DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+                // Only the first copy queued for this message (fanout 1) carries the delivery.
+                (*fanout)++;
+                if (*fanout == 1) {
+                    re->delivery = delivery;
+                    qd_delivery_fifo_enter_LH(delivery);
+                }
+
+                addr->deliveries_transit++;
+                qd_link_activate(dest_link->link);
+            }
+        }
+
+        qd_bitmask_free(link_set);
+    }
+}
+
+
+/** Multicast forwarder: forwards to the local subscriber links and, unless is_direct,
+ * also to remote subscribers.  Returns true if at least one message copy was queued. */
+static bool forwarder_multicast_LH(qd_router_forwarder_t *forwarder,
+                                   qd_router_t *router,
+                                   qd_message_t *msg,
+                                   qd_delivery_t *delivery,
+                                   qd_address_t *addr,
+                                   qd_field_iterator_t *ingress_iter,
+                                   bool is_direct)
+{
+    int fanout = 0;
+    //
+    // Forward to all of the local links receiving this address.
+    //
+    forward_to_direct_subscribers_LH(addr, delivery, msg, &fanout);
+
+    //
+    // If the address form is direct to this router node, don't relay it on to
+    // any other part of the network.
+    //
+    if (!is_direct)
+        forward_to_remote_subscribers_LH(router, addr, delivery, msg, &fanout, ingress_iter);
+
+    return fanout != 0;
+}
+
+
+/** Forward using the 'closest' bias: local subscribers are preferred; remote ones are
+ * tried only when no local copy was queued and the address form is not direct. */
+static bool forwarder_anycast_closest_LH(qd_router_forwarder_t *forwarder,
+                                         qd_router_t *router,
+                                         qd_message_t *msg,
+                                         qd_delivery_t *delivery,
+                                         qd_address_t *addr,
+                                         qd_field_iterator_t *ingress_iter,
+                                         bool is_direct)
+{
+    int fanout = 0;
+    //
+    // First, try to find a directly connected consumer for the address.  If
+    // there is none, then look for the closest remote consumer.
+    //
+    forward_to_direct_subscribers_LH(addr, delivery, msg, &fanout);
+    if (fanout == 0 && !is_direct)
+        forward_to_remote_subscribers_LH(router, addr, delivery, msg, &fanout, ingress_iter);
+
+    return fanout != 0;
+}
+
+
+/** Forwarding using a 'balanced' bias: addr->toggle flips on every call and selects
+ * whether local or remote subscribers are tried first; the other side is the fallback. */
+static bool forwarder_anycast_balanced_LH(qd_router_forwarder_t *forwarder,
+                                          qd_router_t *router,
+                                          qd_message_t *msg,
+                                          qd_delivery_t *delivery,
+                                          qd_address_t *addr,
+                                          qd_field_iterator_t *ingress_iter,
+                                          bool is_direct)
+{
+    int fanout = 0;
+    //
+    // Alternate between looking first for a local consumer and looking first
+    // for a remote consumer.
+    //
+    addr->toggle = !addr->toggle;
+    if (addr->toggle) {
+        forward_to_direct_subscribers_LH(addr, delivery, msg, &fanout);
+        if (fanout == 0 && !is_direct)
+            forward_to_remote_subscribers_LH(router, addr, delivery, msg, &fanout, ingress_iter);
+    } else {
+        if (!is_direct)
+            forward_to_remote_subscribers_LH(router, addr, delivery, msg, &fanout, ingress_iter);
+        if (fanout == 0)
+            forward_to_direct_subscribers_LH(addr, delivery, msg, &fanout);
+    }
+
+    return fanout != 0;
+}
+
+
+/* Release method shared by the default forwarders: nothing to free, because they
+ * are file-scope static objects rather than per-address allocations. */
+static void forwarder_release(qd_router_forwarder_t *forwarder)
+{
+    // no-op - they're static singletons!
+}
+
+
+/* The default forwarders: one shared static instance per forwarding behavior,
+ * each initialized positionally as { forward, release }. */
+static qd_router_forwarder_t multicast_forwarder = {
+    forwarder_multicast_LH,     /* forward method */
+    forwarder_release,
+};
+static qd_router_forwarder_t anycast_closest_forwarder = {
+    forwarder_anycast_closest_LH,     /* forward method */
+    forwarder_release,
+};
+static qd_router_forwarder_t anycast_balanced_forwarder = {
+    forwarder_anycast_balanced_LH,     /* forward method */
+    forwarder_release,
+};
+
+
+/** Get the default forwarder for the given semantics: multicast for QD_FANOUT_MULTIPLE;
+ * for QD_FANOUT_SINGLE, closest or balanced per the bias (QD_BIAS_CLOSEST / QD_BIAS_SPREAD). */
+qd_router_forwarder_t *qd_router_get_forwarder(qd_address_semantics_t semantics)
+{
+    switch (QD_FANOUT(semantics)) {
+    case QD_FANOUT_MULTIPLE:
+        return &multicast_forwarder;
+    case QD_FANOUT_SINGLE:
+        switch (QD_BIAS(semantics)) {
+        case QD_BIAS_CLOSEST:
+            return &anycast_closest_forwarder;
+        case QD_BIAS_SPREAD:
+            return &anycast_balanced_forwarder;
+        }
+    }
+    assert(false);  // invalid semantics? need new forwarder?  NOTE(review): returns 0 if asserts are compiled out
+    return 0;
+}
+
+
+

Propchange: qpid/dispatch/trunk/src/router_forwarders.c
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/dispatch/trunk/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_node.c?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_node.c (original)
+++ qpid/dispatch/trunk/src/router_node.c Tue Mar 10 12:52:46 2015
@@ -63,13 +63,17 @@ ALLOC_DEFINE(qd_router_lrp_ref_t);
 ALLOC_DEFINE(qd_address_t);
 ALLOC_DEFINE(qd_router_conn_t);
 
-qd_address_t* qd_address() {
+// Allocate a zero-filled address with the given semantics and the matching default forwarder.
+qd_address_t* qd_address(qd_address_semantics_t semantics)
+{
     qd_address_t* addr = new_qd_address_t();
     memset(addr, 0, sizeof(qd_address_t));
     DEQ_ITEM_INIT(addr);
     DEQ_INIT(addr->lrps);
     DEQ_INIT(addr->rlinks);
     DEQ_INIT(addr->rnodes);
+    addr->semantics = semantics;
+    addr->forwarder = qd_router_get_forwarder(semantics);
     return addr;
 }
 
@@ -162,9 +166,10 @@ void qd_router_check_addr(qd_router_t *r
     sys_mutex_lock(router->lock);
 
     //
-    // If the address has no handlers or destinations, it should be deleted.
+    // If the address has no in-process consumer or destinations, it should be
+    // deleted.
     //
-    if (addr->handler == 0 &&
+    if (addr->on_message == 0 &&
         DEQ_SIZE(addr->rlinks) == 0 && DEQ_SIZE(addr->rnodes) == 0 &&
         !addr->waypoint && !addr->block_deletion)
         to_delete = 1;
@@ -646,127 +651,6 @@ static void router_link_route_delivery_L
 }
 
 
-static void router_forward_to_direct_subscribers_LH(qd_address_t *addr, qd_delivery_t *delivery, qd_message_t *msg, int *fanout)
-{
-    qd_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks);
-    while (dest_link_ref) {
-        qd_routed_event_t *re = new_qd_routed_event_t();
-        DEQ_ITEM_INIT(re);
-        re->delivery    = 0;
-        re->message     = qd_message_copy(msg);
-        re->settle      = 0;
-        re->disposition = 0;
-        DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re);
-
-        (*fanout)++;
-        if (*fanout == 1) {
-            re->delivery = delivery;
-            qd_delivery_fifo_enter_LH(delivery);
-        }
-
-        addr->deliveries_egress++;
-        qd_link_activate(dest_link_ref->link->link);
-
-        //
-        // If the fanout is single, exit the loop here.  We only want to send one message copy.
-        //
-        if (QD_FANOUT(addr->semantics) == QD_FANOUT_SINGLE)
-            break;
-
-        dest_link_ref = DEQ_NEXT(dest_link_ref);
-    }
-
-    //
-    // If dest_link_ref is not null here, we exited after sending one message copy.
-    // If the number of local links is greater than one, rotate the head link to the
-    // tail so we balance the message deliveries.
-    //
-    if (dest_link_ref && DEQ_SIZE(addr->rlinks) > 1) {
-        assert(DEQ_HEAD(addr->rlinks) == dest_link_ref);
-        DEQ_REMOVE_HEAD(addr->rlinks);
-        DEQ_INSERT_TAIL(addr->rlinks, dest_link_ref);
-    }
-}
-
-
-static void router_forward_to_remote_subscribers_LH(qd_router_t *router, qd_address_t *addr, qd_delivery_t *delivery,
-                                                    qd_message_t *msg, int *fanout, qd_field_iterator_t *ingress_iter)
-{
-    //
-    // Get the mask bit associated with the ingress router for the message.
-    // This will be compared against the "valid_origin" masks for each
-    // candidate destination router.
-    //
-    int origin = -1;
-    if (ingress_iter && !(addr->semantics & QD_BYPASS_VALID_ORIGINS)) {
-        qd_address_iterator_reset_view(ingress_iter, ITER_VIEW_NODE_HASH);
-        qd_address_t *origin_addr;
-        qd_hash_retrieve(router->addr_hash, ingress_iter, (void*) &origin_addr);
-        if (origin_addr && DEQ_SIZE(origin_addr->rnodes) == 1) {
-            qd_router_ref_t *rref = DEQ_HEAD(origin_addr->rnodes);
-            origin = rref->router->mask_bit;
-        }
-    } else
-        origin = 0;
-
-    //
-    // Forward to the next-hops for remote destinations.
-    //
-    if (origin >= 0) {
-        qd_router_ref_t  *dest_node_ref = DEQ_HEAD(addr->rnodes);
-        qd_router_link_t *dest_link;
-        qd_bitmask_t     *link_set = qd_bitmask(0);
-
-        //
-        // Loop over the target nodes for this address.  Build a set of outgoing links
-        // for which there are valid targets.  We do this to avoid sending more than one
-        // message down a given link.  It's possible that there are multiple destinations
-        // for this address that are all reachable over the same link.  In this case, we
-        // will send only one copy of the message over the link and allow a downstream
-        // router to fan the message out.
-        //
-        while (dest_node_ref) {
-            if (dest_node_ref->router->next_hop)
-                dest_link = dest_node_ref->router->next_hop->peer_link;
-            else
-                dest_link = dest_node_ref->router->peer_link;
-            if (dest_link && qd_bitmask_value(dest_node_ref->router->valid_origins, origin))
-                qd_bitmask_set_bit(link_set, dest_link->mask_bit);
-            dest_node_ref = DEQ_NEXT(dest_node_ref);
-        }
-
-        //
-        // Send a copy of the message outbound on each identified link.
-        //
-        int link_bit;
-        while (qd_bitmask_first_set(link_set, &link_bit)) {
-            qd_bitmask_clear_bit(link_set, link_bit);
-            dest_link = router->out_links_by_mask_bit[link_bit];
-            if (dest_link) {
-                qd_routed_event_t *re = new_qd_routed_event_t();
-                DEQ_ITEM_INIT(re);
-                re->delivery    = 0;
-                re->message     = qd_message_copy(msg);
-                re->settle      = 0;
-                re->disposition = 0;
-                DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
-
-                (*fanout)++;
-                if (*fanout == 1) {
-                    re->delivery = delivery;
-                    qd_delivery_fifo_enter_LH(delivery);
-                }
-
-                addr->deliveries_transit++;
-                qd_link_activate(dest_link->link);
-            }
-        }
-
-        qd_bitmask_free(link_set);
-    }
-}
-
-
 /**
  * Inbound Delivery Handler
  */
@@ -814,9 +698,9 @@ static void router_rx_handler(void* cont
     //
     // Validate the message through the Properties section so we can access the TO field.
     //
-    qd_message_t           *in_process_copy = 0;
-    qd_router_message_cb_t  handler         = 0;
-    void                   *handler_context = 0;
+    qd_message_t           *in_process_copy    = 0;
+    qd_router_message_cb_t  on_message         = 0;
+    void                   *on_message_context = 0;
 
     valid_message = qd_message_check(msg, QD_DEPTH_PROPERTIES);
 
@@ -824,9 +708,8 @@ static void router_rx_handler(void* cont
         qd_parsed_field_t   *in_ma     = 0;
         qd_field_iterator_t *iter      = 0;
         bool                 free_iter = true;
-        qd_address_t        *addr;
-        int                  fanout       = 0;
         char                *to_override  = 0;
+        bool                 forwarded = false;
 
         //
         // Only respect the delivery annotations if the message came from another router.
@@ -874,8 +757,6 @@ static void router_rx_handler(void* cont
         }
 
         if (iter) {
-            qd_address_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
-
             //
             // Note: This function is going to need to be refactored so we can put an
             //       asynchronous address lookup here.  In the event there is a translation
@@ -884,11 +765,9 @@ static void router_rx_handler(void* cont
             //
             //       Note that this lookup is only done for global/mobile class addresses.
             //
-
-            qd_hash_retrieve(router->addr_hash, iter, (void*) &addr);
-            qd_address_iterator_reset_view(iter, ITER_VIEW_NO_HOST);
-            int is_local  = qd_field_iterator_prefix(iter, local_prefix);
-            int is_direct = qd_field_iterator_prefix(iter, direct_prefix);
+            bool is_local;
+            bool is_direct;
+            qd_address_t *addr = qd_router_address_lookup_LH(router, iter, &is_local, &is_direct);
             if (free_iter)
                 qd_field_iterator_free(iter);
 
@@ -910,80 +789,39 @@ static void router_rx_handler(void* cont
                 int drop = 0;
                 qd_field_iterator_t *ingress_iter = router_annotate_message(router, in_ma, msg, &drop, to_override);
 
-                //
-                // Forward to the in-process handler for this address if there is one.  The
-                // actual invocation of the handler will occur later after we've released
-                // the lock.
-                //
-                if (!drop && addr->handler) {
-                    in_process_copy = qd_message_copy(msg);
-                    handler         = addr->handler;
-                    handler_context = addr->handler_context;
-                    addr->deliveries_to_container++;
-                }
+                if (!drop) {
+                    //
+                    // Forward a copy of the message to the in-process endpoint for
+                    // this address if there is one.  The actual invocation of the
+                    // handler will occur later after we've released the lock.
+                    //
+                    if (addr->on_message) {
+                        in_process_copy = qd_message_copy(msg);
+                        on_message         = addr->on_message;
+                        on_message_context = addr->on_message_context;
+                        addr->deliveries_to_container++;
+                    }
 
-                //
-                // If the address form is local (i.e. is prefixed by _local), don't forward
-                // outside of the router process.
-                //
-                if (!drop && !is_local && router->router_mode != QD_ROUTER_MODE_ENDPOINT) {
                     //
-                    // Handle the various fanout and bias cases:
+                    // If the address form is local (i.e. is prefixed by _local), don't forward
+                    // outside of the router process.
                     //
-                    if (QD_FANOUT(addr->semantics) == QD_FANOUT_MULTIPLE) {
-                        //
-                        // Forward to all of the local links receiving this address.
-                        //
-                        router_forward_to_direct_subscribers_LH(addr, delivery, msg, &fanout);
-
-                        //
-                        // If the address form is direct to this router node, don't relay it on
-                        // to any other part of the network.
-                        //
-                        if (!is_direct)
-                            router_forward_to_remote_subscribers_LH(router, addr, delivery, msg, &fanout, ingress_iter);
-
-                    } else if (QD_FANOUT(addr->semantics) == QD_FANOUT_SINGLE) {
-                        if (QD_BIAS(addr->semantics) == QD_BIAS_CLOSEST) {
-                            //
-                            // Bias is "closest".  First, try to find a directly connected consumer for the address.
-                            // If there is none, then look for the closest remote consumer.
-                            //
-                            router_forward_to_direct_subscribers_LH(addr, delivery, msg, &fanout);
-                            if (fanout == 0 && !is_direct)
-                                router_forward_to_remote_subscribers_LH(router, addr, delivery, msg, &fanout, ingress_iter);
-
-                        } else if (QD_BIAS(addr->semantics) == QD_BIAS_SPREAD) {
-                            //
-                            // Bias is "spread".  Alternate between looking first for a local consumer and looking
-                            // first for a remote consumer.
-                            //
-                            addr->toggle = !addr->toggle;
-                            if (addr->toggle) {
-                                router_forward_to_direct_subscribers_LH(addr, delivery, msg, &fanout);
-                                if (fanout == 0 && !is_direct)
-                                    router_forward_to_remote_subscribers_LH(router, addr, delivery, msg, &fanout, ingress_iter);
-                            } else {
-                                if (!is_direct)
-                                    router_forward_to_remote_subscribers_LH(router, addr, delivery, msg, &fanout, ingress_iter);
-                                if (fanout == 0)
-                                    router_forward_to_direct_subscribers_LH(addr, delivery, msg, &fanout);
-                            }
-                        }
+                    if (!is_local && router->router_mode != QD_ROUTER_MODE_ENDPOINT) {
+                        qd_router_forwarder_t *f = addr->forwarder;
+                        forwarded = f->forward(f, router, msg, delivery, addr, ingress_iter, is_direct);
                     }
                 }
             }
+        }
 
-            //
-            // In message-routing mode, the handling of the incoming delivery depends on the
-            // number of copies of the received message that were forwarded.
-            //
-            if (handler) {
+        if (!forwarded) {
+            if (on_message)
+                // our local in-process handler will accept it:
                 qd_delivery_free_LH(delivery, PN_ACCEPTED);
-            } else if (fanout == 0) {
-                qd_delivery_free_LH(delivery, PN_RELEASED);
-            } else if (qd_delivery_settled(delivery)) {
-                qd_delivery_free_LH(delivery, 0);
+            else {
+                // no one has accepted it, so inform sender
+                qd_delivery_set_undeliverable_LH(delivery);
+                qd_delivery_free_LH(delivery, PN_MODIFIED);
             }
         }
     } else {
@@ -999,8 +837,8 @@ static void router_rx_handler(void* cont
     //
     // Invoke the in-process handler now that the lock is released.
     //
-    if (handler) {
-        handler(handler_context, in_process_copy, rlink->mask_bit);
+    if (on_message) {
+        on_message(on_message_context, in_process_copy, rlink->mask_bit);
         qd_message_free(in_process_copy);
     }
 }
@@ -1472,10 +1310,9 @@ static int router_outgoing_link_handler(
 
             qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
             if (!addr) {
-                addr = qd_address();
+                addr = qd_address(semantics);
                 qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
                 DEQ_INSERT_TAIL(router->addrs, addr);
-                addr->semantics = semantics;
                 qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr);
             }
 
@@ -1917,9 +1754,9 @@ qd_router_t *qd_router(qd_dispatch_t *qd
     // locally later in the initialization sequence.
     //
     if (router->router_mode == QD_ROUTER_MODE_INTERIOR) {
-        router->router_addr   = qd_router_register_address(qd, "qdrouter", 0, QD_SEMANTICS_ROUTER_CONTROL, false, 0);
-        router->routerma_addr = qd_router_register_address(qd, "qdrouter.ma", 0, QD_SEMANTICS_DEFAULT, false, 0);
-        router->hello_addr    = qd_router_register_address(qd, "qdhello", 0, QD_SEMANTICS_ROUTER_CONTROL, false, 0);
+        router->router_addr   = qd_router_register_address(qd, "qdrouter", 0, 0, QD_SEMANTICS_ROUTER_CONTROL, false, 0);
+        router->routerma_addr = qd_router_register_address(qd, "qdrouter.ma", 0, 0, QD_SEMANTICS_DEFAULT, false, 0);
+        router->hello_addr    = qd_router_register_address(qd, "qdhello", 0, 0, QD_SEMANTICS_ROUTER_CONTROL, false, 0);
     }
 
     //
@@ -1996,10 +1833,11 @@ const char *qd_router_id(const qd_dispat
 
 qd_address_t *qd_router_register_address(qd_dispatch_t          *qd,
                                          const char             *address,
-                                         qd_router_message_cb_t  handler,
+                                         qd_router_message_cb_t  on_message,
+                                         void                   *context,
                                          qd_address_semantics_t  semantics,
                                          bool                    global,
-                                         void                   *context)
+                                         qd_router_forwarder_t  *forwarder)
 {
     char                 addr_string[1000];
     qd_router_t         *router = qd->router;
@@ -2012,8 +1850,7 @@ qd_address_t *qd_router_register_address
     sys_mutex_lock(router->lock);
     qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
     if (!addr) {
-        addr = qd_address();
-        addr->semantics = semantics;
+        addr = qd_address(semantics);
         qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
         DEQ_ITEM_INIT(addr);
         DEQ_INSERT_TAIL(router->addrs, addr);
@@ -2021,12 +1858,16 @@ qd_address_t *qd_router_register_address
     }
     qd_field_iterator_free(iter);
 
-    addr->handler         = handler;
-    addr->handler_context = context;
+    addr->on_message         = on_message;
+    addr->on_message_context = context;
+    if (forwarder) {
+        if (addr->forwarder) addr->forwarder->release(addr->forwarder);
+        addr->forwarder = forwarder;
+    }
 
     sys_mutex_unlock(router->lock);
 
-    if (handler)
+    if (on_message)
         qd_log(router->log_source, QD_LOG_INFO, "In-Process Address Registered: %s", address);
     assert(addr);
     return addr;
@@ -2035,6 +1876,7 @@ qd_address_t *qd_router_register_address
 
 void qd_router_unregister_address(qd_address_t *ad)
 {
+    // if (ad->forwarder) ad->forwarder->release(ad->forwarder);
     //free_qd_address_t(ad);
 }
 
@@ -2057,6 +1899,20 @@ void qd_address_set_dynamic_cc(qd_addres
 }
 
 
+qd_address_t *qd_router_address_lookup_LH(qd_router_t *router,
+                                          qd_field_iterator_t *addr_iter,
+                                          bool *is_local, bool *is_direct)
+{
+    qd_address_t *addr = 0;
+    qd_address_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_HASH);
+    qd_hash_retrieve(router->addr_hash, addr_iter, (void*) &addr);
+    qd_address_iterator_reset_view(addr_iter, ITER_VIEW_NO_HOST);
+    *is_local  = (bool) qd_field_iterator_prefix(addr_iter, local_prefix);
+    *is_direct = (bool) qd_field_iterator_prefix(addr_iter, direct_prefix);
+    return addr;
+}
+
+
 void qd_router_send(qd_dispatch_t       *qd,
                     qd_field_iterator_t *address,
                     qd_message_t        *msg)

Modified: qpid/dispatch/trunk/src/router_private.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_private.h?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_private.h (original)
+++ qpid/dispatch/trunk/src/router_private.h Tue Mar 10 12:52:46 2015
@@ -160,16 +160,15 @@ struct qd_router_conn_t {
 
 ALLOC_DECLARE(qd_router_conn_t);
 
-
 /** A router address */
 struct qd_address_t {
     DEQ_LINKS(qd_address_t);
-    qd_router_message_cb_t     handler;          ///< In-Process Consumer
-    void                      *handler_context;  ///< In-Process Consumer context
-    qd_router_lrp_ref_list_t   lrps;             ///< Local link-route destinations
-    qd_router_link_ref_list_t  rlinks;           ///< Locally-Connected Consumers
-    qd_router_ref_list_t       rnodes;           ///< Remotely-Connected Consumers
-    qd_hash_handle_t          *hash_handle;      ///< Linkage back to the hash table entry
+    qd_router_message_cb_t     on_message;          ///< In-Process Message Consumer
+    void                      *on_message_context;  ///< In-Process Consumer context
+    qd_router_lrp_ref_list_t   lrps;              ///< Local link-route destinations
+    qd_router_link_ref_list_t  rlinks;            ///< Locally-Connected Consumers
+    qd_router_ref_list_t       rnodes;            ///< Remotely-Connected Consumers
+    qd_hash_handle_t          *hash_handle;       ///< Linkage back to the hash table entry
     qd_address_semantics_t     semantics;
     qd_address_t              *redirect;
     qd_address_t              *static_cc;
@@ -177,6 +176,7 @@ struct qd_address_t {
     bool                       toggle;
     bool                       waypoint;
     bool                       block_deletion;
+    qd_router_forwarder_t     *forwarder;
 
     //
     // TODO - Add support for asynchronous address lookup:
@@ -198,7 +198,7 @@ ALLOC_DECLARE(qd_address_t);
 DEQ_DECLARE(qd_address_t, qd_address_list_t);
 
 /** Constructor for qd_address_t */
-qd_address_t* qd_address();
+qd_address_t* qd_address(qd_address_semantics_t semantics);
 
 struct qd_config_phase_t {
     DEQ_LINKS(qd_config_phase_t);
@@ -269,8 +269,6 @@ struct qd_router_t {
     qd_waypoint_list_t        waypoints;
 };
 
-
-
 void qd_router_check_addr(qd_router_t *router, qd_address_t *addr, int was_local);
 void qd_router_add_link_ref_LH(qd_router_link_ref_list_t *ref_list, qd_router_link_t *link);
 void qd_router_del_link_ref_LH(qd_router_link_ref_list_t *ref_list, qd_router_link_t *link);
@@ -287,4 +285,8 @@ void qd_router_link_lost(qd_router_t *ro
 
 qd_address_semantics_t router_semantics_for_addr(qd_router_t *router, qd_field_iterator_t *iter,
                                                  char in_phase, char *out_phase);
+qd_address_t *qd_router_address_lookup_LH(qd_router_t *router,
+                                          qd_field_iterator_t *addr_iter,
+                                          bool *is_local, bool *is_direct);
+
 #endif

Modified: qpid/dispatch/trunk/src/router_pynode.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_pynode.c?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_pynode.c (original)
+++ qpid/dispatch/trunk/src/router_pynode.c Tue Mar 10 12:52:46 2015
@@ -81,8 +81,7 @@ static PyObject *qd_add_router(PyObject
         // This record will be found whenever a "foreign" topological address to this
         // remote router is looked up.
         //
-        addr = qd_address();
-        addr->semantics = router_addr_semantics;
+        addr = qd_address(router_addr_semantics);
         qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
         DEQ_INSERT_TAIL(router->addrs, addr);
         qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr);
@@ -462,9 +461,8 @@ static PyObject* qd_map_destination(PyOb
     sys_mutex_lock(router->lock);
     qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
     if (!addr) {
-        addr = qd_address();
+        addr = qd_address(router_semantics_for_addr(router, iter, phase, &unused));
         qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
-        addr->semantics = router_semantics_for_addr(router, iter, phase, &unused);
         DEQ_ITEM_INIT(addr);
         DEQ_INSERT_TAIL(router->addrs, addr);
         qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr);

Modified: qpid/dispatch/trunk/src/waypoint.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/waypoint.c?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/waypoint.c (original)
+++ qpid/dispatch/trunk/src/waypoint.c Tue Mar 10 12:52:46 2015
@@ -64,11 +64,10 @@ static void qd_waypoint_visit_sink_LH(qd
         qd_hash_retrieve(router->addr_hash, iter, (void*) &addr);
 
         if (!addr) {
-            addr = qd_address();
+            addr = qd_address(router_semantics_for_addr(router, iter, wp->in_phase, &unused));
             qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
             DEQ_INSERT_TAIL(router->addrs, addr);
             addr->waypoint  = true;
-            addr->semantics = router_semantics_for_addr(router, iter, wp->in_phase, &unused);
             qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr);
         }
 
@@ -138,11 +137,10 @@ static void qd_waypoint_visit_source_LH(
         qd_hash_retrieve(router->addr_hash, iter, (void*) &addr);
 
         if (!addr) {
-            addr = qd_address();
+            addr = qd_address(router_semantics_for_addr(router, iter, wp->out_phase, &unused));
             qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
             DEQ_INSERT_TAIL(router->addrs, addr);
             addr->waypoint  = true;
-            addr->semantics = router_semantics_for_addr(router, iter, wp->out_phase, &unused);
             qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr);
         }
 

Modified: qpid/dispatch/trunk/tests/system_tests_one_router.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_tests_one_router.py?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_tests_one_router.py (original)
+++ qpid/dispatch/trunk/tests/system_tests_one_router.py Tue Mar 10 12:52:46 2015
@@ -21,6 +21,13 @@ import unittest
 from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED
 from system_test import TestCase, Messenger, Qdrouterd, main_module
 
+# PROTON-828: use MODIFIED if this proton exports it, otherwise fall back to PN_STATUS_MODIFIED
+try:
+    from proton import MODIFIED
+except ImportError:
+    from proton import PN_STATUS_MODIFIED as MODIFIED
+
+
 class RouterTest(TestCase):
     """System tests involving a single router"""
 
@@ -327,7 +334,7 @@ class RouterTest(TestCase):
         tx_tracker = M1.put(tm)
         M1.send(0)
         M1.flush()
-        self.assertEqual(RELEASED, M1.status(tx_tracker))
+        self.assertEqual(MODIFIED, M1.status(tx_tracker))
 
         M1.stop()
 

Modified: qpid/dispatch/trunk/tests/system_tests_two_routers.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_tests_two_routers.py?rev=1665514&r1=1665513&r2=1665514&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_tests_two_routers.py (original)
+++ qpid/dispatch/trunk/tests/system_tests_two_routers.py Tue Mar 10 12:52:46 2015
@@ -21,6 +21,12 @@ import unittest, os
 from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED, SSLDomain, SSLUnavailable
 from system_test import TestCase, Qdrouterd, main_module
 
+# PROTON-828: use MODIFIED if this proton exports it, otherwise fall back to PN_STATUS_MODIFIED
+try:
+    from proton import MODIFIED
+except ImportError:
+    from proton import PN_STATUS_MODIFIED as MODIFIED
+
 
 class RouterTest(TestCase):
     @classmethod
@@ -320,7 +326,7 @@ class RouterTest(TestCase):
         tx_tracker = M1.put(tm)
         M1.send(0)
         M1.flush()
-        self.assertEqual(RELEASED, M1.status(tx_tracker))
+        self.assertEqual(MODIFIED, M1.status(tx_tracker))
 
         M1.stop()
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org