You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/01/07 22:56:45 UTC

qpid-dispatch git commit: DISPATCH-179 - Added multicast forwarder into core.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/tross-DISPATCH-179-1 77281d629 -> 1aa7fece4


DISPATCH-179 - Added multicast forwarder into core.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/1aa7fece
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/1aa7fece
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/1aa7fece

Branch: refs/heads/tross-DISPATCH-179-1
Commit: 1aa7fece432fd2572f45f0379c84db2ebb69e88a
Parents: 77281d6
Author: Ted Ross <tr...@redhat.com>
Authored: Thu Jan 7 16:56:08 2016 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Thu Jan 7 16:56:08 2016 -0500

----------------------------------------------------------------------
 include/qpid/dispatch/router_core.h   |  12 +-
 src/router_core/agent_link.c          |   4 +-
 src/router_core/connections.c         |   4 +-
 src/router_core/forwarder.c           | 174 ++++++++++++++++++++++++-----
 src/router_core/router_core.c         |   1 +
 src/router_core/router_core_private.h |  46 ++++----
 src/router_core/transfer.c            |  76 +++++++++++--
 src/router_node.c                     |  12 +-
 8 files changed, 259 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1aa7fece/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 25727c2..20840cd 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -474,11 +474,12 @@ void qdr_link_detach(qdr_link_t *link, qd_detach_type_t dt, qdr_error_t *error);
  * @param ingress Field iterator referencing the value of the ingress-router header.  NOTE: This
  *                iterator is assumed to reference content in the message that will stay valid
  *                through the lifetime of the message.
+ * @param settled True iff the delivery is pre-settled.
  * @return Pointer to the qdr_delivery that will track the lifecycle of this delivery on this link.
  */
-qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_field_iterator_t *ingress);
+qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_field_iterator_t *ingress, bool settled);
 qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
-                                    qd_field_iterator_t *ingress, qd_field_iterator_t *addr);
+                                    qd_field_iterator_t *ingress, qd_field_iterator_t *addr, bool settled);
 qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg);
 
 typedef void (*qdr_link_first_attach_t)  (void *context, qdr_connection_t *conn, qdr_link_t *link, 
@@ -486,13 +487,15 @@ typedef void (*qdr_link_first_attach_t)  (void *context, qdr_connection_t *conn,
 typedef void (*qdr_link_second_attach_t) (void *context, qdr_link_t *link,
                                           qdr_terminus_t *source, qdr_terminus_t *target);
 typedef void (*qdr_link_detach_t)        (void *context, qdr_link_t *link, qdr_error_t *error);
+typedef void (*qdr_link_flow_t)          (void *context, qdr_link_t *link);
 
 void qdr_connection_handlers(qdr_core_t                *core,
                              void                      *context,
                              qdr_connection_activate_t  activate,
                              qdr_link_first_attach_t    first_attach,
                              qdr_link_second_attach_t   second_attach,
-                             qdr_link_detach_t          detach);
+                             qdr_link_detach_t          detach,
+                             qdr_link_flow_t            flow);
 
 /**
  ******************************************************************************
@@ -501,6 +504,9 @@ void qdr_connection_handlers(qdr_core_t                *core,
  */
 void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context);
 void *qdr_delivery_get_context(qdr_delivery_t *delivery);
+uint64_t qdr_delivery_disposition(const qdr_delivery_t *delivery);
+bool qdr_delivery_is_settled(const qdr_delivery_t *delivery);
+
 void qdr_delivery_update_disposition(qdr_delivery_t *delivery);
 void qdr_delivery_update_flow(qdr_delivery_t *delivery);
 void qdr_delivery_process(qdr_delivery_t *delivery);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1aa7fece/src/router_core/agent_link.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_link.c b/src/router_core/agent_link.c
index 350a717..e3cc484 100644
--- a/src/router_core/agent_link.c
+++ b/src/router_core/agent_link.c
@@ -75,11 +75,11 @@ static void qdr_agent_write_link_CT(qdr_query_t *query,  qdr_link_t *link )
             break;
 
         case QDR_LINK_MSG_FIFO_DEPTH:
-            qd_compose_insert_ulong(body, DEQ_SIZE(link->msg_fifo));
+            qd_compose_insert_ulong(body, 0); // FIXME
             break;
 
         case QDR_LINK_EVENT_FIFO_DEPTH:
-            qd_compose_insert_ulong(body, DEQ_SIZE(link->event_fifo));
+            qd_compose_insert_ulong(body, 0); // FIXME
             break;
 
         default:

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1aa7fece/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 01b658b..f93051e 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -267,13 +267,15 @@ void qdr_connection_handlers(qdr_core_t                *core,
                              qdr_connection_activate_t  activate,
                              qdr_link_first_attach_t    first_attach,
                              qdr_link_second_attach_t   second_attach,
-                             qdr_link_detach_t          detach)
+                             qdr_link_detach_t          detach,
+                             qdr_link_flow_t            flow)
 {
     core->user_context          = context;
     core->activate_handler      = activate;
     core->first_attach_handler  = first_attach;
     core->second_attach_handler = second_attach;
     core->detach_handler        = detach;
+    core->flow_handler          = flow;
 }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1aa7fece/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 29d4c42..a7400af 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -21,12 +21,16 @@
 #include <qpid/dispatch/amqp.h>
 #include <stdio.h>
 
-typedef void (*qdr_forward_message_t) (qdr_core_t      *core,
-                                       qdr_address_t   *addr,
-                                       qd_message_t    *msg,
-                                       qdr_delivery_t  *in_delivery,
-                                       bool             exclude_inprocess,
-                                       bool             control);
+//
+// NOTE: If the in_delivery argument is NULL, the resulting out deliveries
+//       shall be pre-settled.
+//
+typedef int (*qdr_forward_message_t) (qdr_core_t      *core,
+                                      qdr_address_t   *addr,
+                                      qd_message_t    *msg,
+                                      qdr_delivery_t  *in_delivery,
+                                      bool             exclude_inprocess,
+                                      bool             control);
 
 typedef void (*qdr_forward_attach_t) (qdr_core_t      *core,
                                       qdr_forwarder_t *forw,
@@ -42,34 +46,148 @@ struct qdr_forwarder_t {
 // Built-in Forwarders
 //==================================================================================
 
-void qdr_forward_multicast_CT(qdr_core_t      *core,
-                              qdr_address_t   *addr,
-                              qd_message_t    *msg,
-                              qdr_delivery_t  *in_delivery,
-                              bool             exclude_inprocess,
-                              bool             control)
+
+qdr_delivery_t *qdr_forward_new_delivery(qdr_delivery_t *peer, qdr_link_t *link, qd_message_t *msg)
 {
-    //bool bypass_valid_origins = addr->forwarder->bypass_valid_origins;
-}
+    qdr_delivery_t *dlv = new_qdr_delivery_t();
 
+    ZERO(dlv);
+    dlv->link    = link;
+    dlv->peer    = peer;
+    dlv->msg     = qd_message_copy(msg);
+    dlv->settled = !peer || peer->settled;  // no in-delivery means the out-delivery is pre-settled
 
-void qdr_forward_closest_CT(qdr_core_t      *core,
-                            qdr_address_t   *addr,
-                            qd_message_t    *msg,
-                            qdr_delivery_t  *in_delivery,
-                            bool             exclude_inprocess,
-                            bool             control)
-{
+    if (peer && peer->peer == 0)  // peer may be NULL (forwarders accept a NULL in_delivery); never dereference it then
+        peer->peer = dlv;  // TODO - make this a back-list for multicast tracking
+
+    return dlv;
 }
 
 
-void qdr_forward_balanced_CT(qdr_core_t      *core,
+int qdr_forward_multicast_CT(qdr_core_t      *core,
                              qdr_address_t   *addr,
                              qd_message_t    *msg,
                              qdr_delivery_t  *in_delivery,
                              bool             exclude_inprocess,
                              bool             control)
 {
+    bool bypass_valid_origins = addr->forwarder->bypass_valid_origins;
+    int  fanout = 0;
+
+    //
+    // Forward to local subscribers
+    //
+    qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks);
+    while (link_ref) {
+        qdr_link_t     *out_link     = link_ref->link;
+        qdr_delivery_t *out_delivery = qdr_forward_new_delivery(in_delivery, out_link, msg);
+        DEQ_INSERT_TAIL(out_link->undelivered, out_delivery); // TODO - check locking on this list
+        // TODO activate the connection for the out link
+        fanout++;
+        link_ref = DEQ_NEXT(link_ref);
+    }
+
+    //
+    // Forward to remote routers with subscribers using the appropriate
+    // link for the traffic class: control or data
+    //
+    //
+    // Get the mask bit associated with the ingress router for the message.
+    // This will be compared against the "valid_origin" masks for each
+    // candidate destination router.
+    //
+    int origin = -1;
+    qd_field_iterator_t *ingress_iter = in_delivery ? in_delivery->origin : 0;
+
+    if (ingress_iter && !bypass_valid_origins) {
+        qd_address_iterator_reset_view(ingress_iter, ITER_VIEW_NODE_HASH);
+        qdr_address_t *origin_addr;
+        qd_hash_retrieve(core->addr_hash, ingress_iter, (void*) &origin_addr);
+        if (origin_addr && DEQ_SIZE(origin_addr->rnodes) == 1) {
+            qdr_router_ref_t *rref = DEQ_HEAD(origin_addr->rnodes);
+            origin = rref->router->mask_bit;
+        }
+    } else
+        origin = 0;
+
+    //
+    // Forward to the next-hops for remote destinations.
+    //
+    if (origin >= 0) {
+        qdr_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes);
+        qdr_link_t       *dest_link;
+        qdr_node_t       *next_node;
+        qd_bitmask_t     *link_set = qd_bitmask(0);
+
+        //
+        // Loop over the target nodes for this address.  Build a set of outgoing links
+        // for which there are valid targets.  We do this to avoid sending more than one
+        // message down a given link.  It's possible that there are multiple destinations
+        // for this address that are all reachable over the same link.  In this case, we
+        // will send only one copy of the message over the link and allow a downstream
+        // router to fan the message out.
+        //
+        while (dest_node_ref) {
+            if (dest_node_ref->router->next_hop)
+                next_node = dest_node_ref->router->next_hop;
+            else
+                next_node = dest_node_ref->router;
+            dest_link = control ? next_node->peer_control_link : next_node->peer_data_link;
+            if (dest_link && qd_bitmask_value(dest_node_ref->router->valid_origins, origin))
+                qd_bitmask_set_bit(link_set, dest_link->conn->mask_bit);
+            dest_node_ref = DEQ_NEXT(dest_node_ref);
+        }
+
+        //
+        // Send a copy of the message outbound on each identified link.
+        //
+        int link_bit;
+        while (qd_bitmask_first_set(link_set, &link_bit)) {
+            qd_bitmask_clear_bit(link_set, link_bit);
+            dest_link = control ?
+                core->control_links_by_mask_bit[link_bit] :
+                core->data_links_by_mask_bit[link_bit];
+            if (dest_link) {
+                qdr_delivery_t *out_delivery = qdr_forward_new_delivery(in_delivery, dest_link, msg);
+                DEQ_INSERT_TAIL(dest_link->undelivered, out_delivery); // TODO - check locking on this list
+                fanout++;
+                addr->deliveries_transit++;
+                // TODO - Activate link's connection
+            }
+        }
+
+        qd_bitmask_free(link_set);
+    }
+
+    if (!exclude_inprocess) {
+        //
+        // Forward to in-process subscribers
+        //
+    }
+
+    return fanout;
+}
+
+
+int qdr_forward_closest_CT(qdr_core_t      *core,
+                           qdr_address_t   *addr,
+                           qd_message_t    *msg,
+                           qdr_delivery_t  *in_delivery,
+                           bool             exclude_inprocess,
+                           bool             control)
+{
+    return 0;
+}
+
+
+int qdr_forward_balanced_CT(qdr_core_t      *core,
+                            qdr_address_t   *addr,
+                            qd_message_t    *msg,
+                            qdr_delivery_t  *in_delivery,
+                            bool             exclude_inprocess,
+                            bool             control)
+{
+    return 0;
 }
 
 
@@ -121,14 +239,14 @@ qdr_forwarder_t *qdr_forwarder_CT(qdr_core_t *core, qd_address_semantics_t seman
 }
 
 
-void qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *msg, qdr_delivery_t *in_delivery,
-                            bool exclude_inprocess, bool control)
+int qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *msg, qdr_delivery_t *in_delivery,
+                           bool exclude_inprocess, bool control)
 {
     if (addr->forwarder)
-        addr->forwarder->forward_message(core, addr, msg, in_delivery, exclude_inprocess, control);
-    else {
-        // TODO - Deal with this delivery's disposition
-    }
+        return addr->forwarder->forward_message(core, addr, msg, in_delivery, exclude_inprocess, control);
+
+    // TODO - Deal with this delivery's disposition
+    return 0;
 }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1aa7fece/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index c043e09..359a192 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -23,6 +23,7 @@
 ALLOC_DEFINE(qdr_query_t);
 ALLOC_DEFINE(qdr_address_t);
 ALLOC_DEFINE(qdr_node_t);
+ALLOC_DEFINE(qdr_delivery_t);
 ALLOC_DEFINE(qdr_link_t);
 ALLOC_DEFINE(qdr_router_ref_t);
 ALLOC_DEFINE(qdr_link_ref_t);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1aa7fece/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index bc94b68..aa9f3ee 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -34,8 +34,8 @@ typedef struct qdr_lrp_ref_t     qdr_lrp_ref_t;
 typedef struct qdr_forwarder_t   qdr_forwarder_t;
 
 qdr_forwarder_t *qdr_forwarder_CT(qdr_core_t *core, qd_address_semantics_t semantics);
-void qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *msg, qdr_delivery_t *in_delivery,
-                            bool exclude_inprocess, bool control);
+int qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *msg, qdr_delivery_t *in_delivery,
+                           bool exclude_inprocess, bool control);
 void qdr_forward_attach_CT(qdr_core_t *core, qdr_forwarder_t *forwarder, qdr_link_t *in_link);
 
 /**
@@ -165,11 +165,14 @@ DEQ_DECLARE(qdr_router_ref_t, qdr_router_ref_list_t);
 
 struct qdr_delivery_t {
     DEQ_LINKS(qdr_delivery_t);
-    void           *context;
-    qdr_link_t     *link;
-    qdr_delivery_t *peer;
-    uint64_t        disposition;
-    bool            settled;
+    void                *context;
+    qdr_link_t          *link;
+    qdr_delivery_t      *peer;
+    qd_message_t        *msg;
+    qd_field_iterator_t *to_addr;
+    qd_field_iterator_t *origin;
+    uint64_t             disposition;
+    bool                 settled;
 };
 
 ALLOC_DECLARE(qdr_delivery_t);
@@ -178,21 +181,19 @@ DEQ_DECLARE(qdr_delivery_t, qdr_delivery_list_t);
 
 struct qdr_link_t {
     DEQ_LINKS(qdr_link_t);
-    qdr_core_t               *core;
-    void                     *user_context;
-    qdr_connection_t         *conn;            ///< [ref] Connection that owns this link
-    qd_link_type_t            link_type;
-    qd_direction_t            link_direction;
-    char                     *name;
-    qdr_address_t            *owning_addr;     ///< [ref] Address record that owns this link
-    //qd_waypoint_t            *waypoint;        ///< [ref] Waypoint that owns this link
-    qdr_link_t               *connected_link;  ///< [ref] If this is a link-route, reference the connected link
-    qdr_link_ref_t           *ref;             ///< Pointer to a containing reference object
-    qd_routed_event_list_t    event_fifo;      ///< FIFO of outgoing delivery/link events (no messages)
-    qd_routed_event_list_t    msg_fifo;        ///< FIFO of incoming or outgoing message deliveries
-    qd_router_delivery_list_t deliveries;      ///< [own] outstanding unsettled deliveries
-    bool                      strip_annotations_in;
-    bool                      strip_annotations_out;
+    qdr_core_t          *core;
+    void                *user_context;
+    qdr_connection_t    *conn;            ///< [ref] Connection that owns this link
+    qd_link_type_t       link_type;
+    qd_direction_t       link_direction;
+    char                *name;
+    qdr_address_t       *owning_addr;     ///< [ref] Address record that owns this link
+    qdr_link_t          *connected_link;  ///< [ref] If this is a link-route, reference the connected link
+    qdr_link_ref_t      *ref;             ///< Pointer to a containing reference object (TODO - check this!)
+    qdr_delivery_list_t  undelivered;     ///< Deliveries to be forwarded or sent
+    qdr_delivery_list_t  unsettled;       ///< Unsettled deliveries
+    bool                 strip_annotations_in;
+    bool                 strip_annotations_out;
 };
 
 ALLOC_DECLARE(qdr_link_t);
@@ -380,6 +381,7 @@ struct qdr_core_t {
     qdr_link_first_attach_t    first_attach_handler;
     qdr_link_second_attach_t   second_attach_handler;
     qdr_link_detach_t          detach_handler;
+    qdr_link_flow_t            flow_handler;
 
     const char *router_area;
     const char *router_id;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1aa7fece/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 48d3446..1c56f13 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -21,10 +21,8 @@
 #include <qpid/dispatch/amqp.h>
 #include <stdio.h>
 
-ALLOC_DEFINE(qdr_delivery_t);
 
 static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdr_link_deliver_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
 static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
 
 
@@ -37,22 +35,38 @@ static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 // Interface Functions
 //==================================================================================
 
-qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_field_iterator_t *ingress)
+qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_field_iterator_t *ingress, bool settled)
 {
     qdr_action_t   *action = qdr_action(qdr_link_deliver_CT, "link_deliver");
     qdr_delivery_t *dlv    = new_qdr_delivery_t();
 
+    ZERO(dlv);
+    dlv->link    = link;
+    dlv->msg     = msg;
+    dlv->to_addr = 0;
+    dlv->origin  = ingress;
+    dlv->settled = settled;
+
+    action->args.connection.delivery = dlv;
     qdr_action_enqueue(link->core, action);
     return dlv;
 }
 
 
 qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
-                                    qd_field_iterator_t *ingress, qd_field_iterator_t *addr)
+                                    qd_field_iterator_t *ingress, qd_field_iterator_t *addr, bool settled)
 {
-    qdr_action_t   *action = qdr_action(qdr_link_deliver_to_CT, "link_deliver_to");
+    qdr_action_t   *action = qdr_action(qdr_link_deliver_CT, "link_deliver");
     qdr_delivery_t *dlv    = new_qdr_delivery_t();
 
+    ZERO(dlv);
+    dlv->link    = link;
+    dlv->msg     = msg;
+    dlv->to_addr = addr;
+    dlv->origin  = ingress;
+    dlv->settled = settled;
+
+    action->args.connection.delivery = dlv;
     qdr_action_enqueue(link->core, action);
     return dlv;
 }
@@ -101,6 +115,18 @@ void *qdr_delivery_get_context(qdr_delivery_t *delivery)
 }
 
 
+uint64_t qdr_delivery_disposition(const qdr_delivery_t *delivery)
+{
+    return delivery->disposition;
+}
+
+
+bool qdr_delivery_is_settled(const qdr_delivery_t *delivery)
+{
+    return delivery->settled;
+}
+
+
 //==================================================================================
 // In-Thread Functions
 //==================================================================================
@@ -109,13 +135,41 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
 {
     if (discard)
         return;
-}
 
+    qdr_delivery_t *dlv   = action->args.connection.delivery;
+    qdr_link_t     *link  = dlv->link;
+    int             count = 0;
+
+    if (DEQ_IS_EMPTY(link->undelivered)) {
+        qdr_address_t *addr = link->owning_addr;
+        // Anonymous links have no owning address; resolve it from the delivery's to-address.
+        if (!addr && dlv->to_addr)
+            qd_hash_retrieve(core->addr_hash, dlv->to_addr, (void**) &addr);
+        if (addr)
+            count = qdr_forward_message_CT(core, addr, dlv->msg, dlv, false, link->link_type == QD_LINK_CONTROL);
+    }
 
-
-static void qdr_link_deliver_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
-{
-    if (discard)
-        return;
+    if (count == 0) {
+        if (link->owning_addr)
+            //
+            // Message was not delivered and the link is not anonymous.
+            // Queue the message for later delivery (when the address gets
+            // a valid destination).
+            //
+            DEQ_INSERT_TAIL(link->undelivered, dlv);
+        else {
+            //
+            // TODO - Release the delivery
+            //
+        }
+    } else if (count == 1) {
+        if (!qdr_delivery_is_settled(dlv))  // only unsettled deliveries belong on the unsettled list
+            DEQ_INSERT_TAIL(link->unsettled, dlv);
+    } else {
+        //
+        // The count is greater than one.  Do something!  TODO
+        //
+    }
 }
 
 
@@ -129,7 +183,7 @@ static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
         qd_address_iterator_reset_view(addr_field->iterator, ITER_VIEW_ADDRESS_HASH);
         qd_hash_retrieve(core->addr_hash, addr_field->iterator, (void**) &addr);
         if (addr)
-            qdr_forward_message_CT(core, addr, msg, 0, action->args.io.exclude_inprocess, action->args.io.control);
+            (void) qdr_forward_message_CT(core, addr, msg, 0, action->args.io.exclude_inprocess, action->args.io.control);
         else
             qd_log(core->log, QD_LOG_DEBUG, "In-process send to an unknown address");
     }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1aa7fece/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index c775434..08da120 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -286,9 +286,9 @@ static void router_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd
                 addr_iter = qd_message_field_iterator(msg, QD_FIELD_TO);
 
             if (addr_iter)
-                delivery = qdr_link_deliver_to(rlink, msg, ingress_iter, addr_iter);
+                delivery = qdr_link_deliver_to(rlink, msg, ingress_iter, addr_iter, pn_delivery_settled(pnd));
         } else
-            delivery = qdr_link_deliver(rlink, msg, ingress_iter);
+            delivery = qdr_link_deliver(rlink, msg, ingress_iter, pn_delivery_settled(pnd));
 
         if (delivery) {
             pn_delivery_set_context(pnd, delivery);
@@ -612,6 +612,11 @@ static void qd_router_link_detach(void *context, qdr_link_t *link, qdr_error_t *
 }
 
 
+static void qd_router_link_flow(void *context, qdr_link_t *link)
+{
+}
+
+
 void qd_router_setup_late(qd_dispatch_t *qd)
 {
     qd->router->router_core = qdr_core(qd, qd->router->router_area, qd->router->router_id);
@@ -620,7 +625,8 @@ void qd_router_setup_late(qd_dispatch_t *qd)
                             qd_router_connection_activate,
                             qd_router_link_first_attach,
                             qd_router_link_second_attach,
-                            qd_router_link_detach);
+                            qd_router_link_detach,
+                            qd_router_link_flow);
 
     qd_router_python_setup(qd->router);
     qd_timer_schedule(qd->router->timer, 1000);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org