You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/01/19 21:02:37 UTC

[4/4] qpid-dispatch git commit: DISPATCH-203 - Added linkage into the rest of the system to resolve the issue.

DISPATCH-203 - Added linkage into the rest of the system to resolve the issue.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/b1c8cf5a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/b1c8cf5a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/b1c8cf5a

Branch: refs/heads/tross-DISPATCH-179-1
Commit: b1c8cf5a06867c6c7795ea1fe2c80a69ef560e85
Parents: d4f8343
Author: Ted Ross <tr...@redhat.com>
Authored: Tue Jan 19 14:59:17 2016 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue Jan 19 14:59:17 2016 -0500

----------------------------------------------------------------------
 include/qpid/dispatch/router_core.h   |  9 ++++++--
 src/router_core/forwarder.c           | 32 +++++++++++++++++++---------
 src/router_core/router_core_private.h |  3 ++-
 src/router_core/transfer.c            | 21 ++++++++++++------
 src/router_node.c                     | 34 ++++++++++++++++++------------
 src/router_private.h                  |  2 ++
 src/router_pynode.c                   |  4 ++++
 7 files changed, 71 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1c8cf5a/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 9cd97e2..70b9294 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -475,11 +475,16 @@ void qdr_link_detach(qdr_link_t *link, qd_detach_type_t dt, qdr_error_t *error);
  *                iterator is assumed to reference content in the message that will stay valid
  *                through the lifetime of the message.
  * @param settled True iff the delivery is pre-settled.
+ * @param link_exclusion If present, this is a bitmask of inter-router links that should not be used
+ *                       to send this message.  This bitmask is created by the trace_mask module and
+ *                       is built on the trace header from a received message.
  * @return Pointer to the qdr_delivery that will track the lifecycle of this delivery on this link.
  */
-qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_field_iterator_t *ingress, bool settled);
+qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_field_iterator_t *ingress,
+                                 bool settled, qd_bitmask_t *link_exclusion);
 qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
-                                    qd_field_iterator_t *ingress, qd_field_iterator_t *addr, bool settled);
+                                    qd_field_iterator_t *ingress, qd_field_iterator_t *addr,
+                                    bool settled, qd_bitmask_t *link_exclusion);
 qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg);
 
 void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1c8cf5a/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index d7dc176..865b4da 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -30,7 +30,8 @@ typedef int (*qdr_forward_message_t) (qdr_core_t      *core,
                                       qd_message_t    *msg,
                                       qdr_delivery_t  *in_delivery,
                                       bool             exclude_inprocess,
-                                      bool             control);
+                                      bool             control,
+                                      qd_bitmask_t    *link_exclusion);
 
 typedef void (*qdr_forward_attach_t) (qdr_core_t      *core,
                                       qdr_forwarder_t *forw,
@@ -105,7 +106,8 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
                              qd_message_t    *msg,
                              qdr_delivery_t  *in_delivery,
                              bool             exclude_inprocess,
-                             bool             control)
+                             bool             control,
+                             qd_bitmask_t    *link_exclusion)
 {
     bool bypass_valid_origins = addr->forwarder->bypass_valid_origins;
     int  fanout = 0;
@@ -187,7 +189,7 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
             dest_link = control ?
                 core->control_links_by_mask_bit[link_bit] :
                 core->data_links_by_mask_bit[link_bit];
-            if (dest_link) {
+            if (dest_link && (!link_exclusion || qd_bitmask_value(link_exclusion, link_bit) == 0)) {
                 qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, dest_link, msg);
                 qdr_forward_deliver_CT(core, dest_link, out_delivery);
                 fanout++;
@@ -211,6 +213,8 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
         }
     }
 
+    if (link_exclusion)
+        qd_bitmask_free(link_exclusion);
     return fanout;
 }
 
@@ -220,10 +224,17 @@ int qdr_forward_closest_CT(qdr_core_t      *core,
                            qd_message_t    *msg,
                            qdr_delivery_t  *in_delivery,
                            bool             exclude_inprocess,
-                           bool             control)
+                           bool             control,
+                           qd_bitmask_t    *link_exclusion)
 {
     //
-    // Forward to an in-process subscriber if there is one
+    // The Anycast forwarders don't respect link exclusions.
+    //
+    if (link_exclusion)
+        qd_bitmask_free(link_exclusion);
+
+    //
+    // Forward to an in-process subscriber if there is one.
     //
     if (!exclude_inprocess) {
         qdr_subscription_t *sub = DEQ_HEAD(addr->subscriptions);
@@ -231,7 +242,7 @@ int qdr_forward_closest_CT(qdr_core_t      *core,
             qdr_forward_on_message_CT(core, sub, in_delivery ? in_delivery->link : 0, msg);
 
             //
-            // Rotate this subscription to the end of the list to get round-robin distribution
+            // Rotate this subscription to the end of the list to get round-robin distribution.
             //
             if (DEQ_SIZE(addr->subscriptions) > 1) {
                 DEQ_REMOVE_HEAD(addr->subscriptions);
@@ -243,7 +254,7 @@ int qdr_forward_closest_CT(qdr_core_t      *core,
     }
 
     //
-    // Forward to a local subscriber
+    // Forward to a local subscriber.
     //
     qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks);
     if (link_ref) {
@@ -278,7 +289,8 @@ int qdr_forward_balanced_CT(qdr_core_t      *core,
                             qd_message_t    *msg,
                             qdr_delivery_t  *in_delivery,
                             bool             exclude_inprocess,
-                            bool             control)
+                            bool             control,
+                            qd_bitmask_t    *link_exclusion)
 {
     return 0;
 }
@@ -333,10 +345,10 @@ qdr_forwarder_t *qdr_forwarder_CT(qdr_core_t *core, qd_address_semantics_t seman
 
 
 int qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *msg, qdr_delivery_t *in_delivery,
-                           bool exclude_inprocess, bool control)
+                           bool exclude_inprocess, bool control, qd_bitmask_t *link_exclusion)
 {
     if (addr->forwarder)
-        return addr->forwarder->forward_message(core, addr, msg, in_delivery, exclude_inprocess, control);
+        return addr->forwarder->forward_message(core, addr, msg, in_delivery, exclude_inprocess, control, link_exclusion);
 
     // TODO - Deal with this delivery's disposition
     return 0;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1c8cf5a/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 691ddb0..57cfa8a 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -35,7 +35,7 @@ typedef struct qdr_forwarder_t   qdr_forwarder_t;
 
 qdr_forwarder_t *qdr_forwarder_CT(qdr_core_t *core, qd_address_semantics_t semantics);
 int qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *msg, qdr_delivery_t *in_delivery,
-                           bool exclude_inprocess, bool control);
+                           bool exclude_inprocess, bool control, qd_bitmask_t *link_exclusion);
 void qdr_forward_attach_CT(qdr_core_t *core, qdr_forwarder_t *forwarder, qdr_link_t *in_link);
 
 /**
@@ -86,6 +86,7 @@ struct qdr_action_t {
             qdr_terminus_t   *target;
             qdr_error_t      *error;
             qd_detach_type_t  dt;
+            qd_bitmask_t     *link_exclusion;
         } connection;
 
         //

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1c8cf5a/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index a6a412b..0cdd21e 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -35,7 +35,8 @@ static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 // Interface Functions
 //==================================================================================
 
-qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_field_iterator_t *ingress, bool settled)
+qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_field_iterator_t *ingress,
+                                 bool settled, qd_bitmask_t *link_exclusion)
 {
     qdr_action_t   *action = qdr_action(qdr_link_deliver_CT, "link_deliver");
     qdr_delivery_t *dlv    = new_qdr_delivery_t();
@@ -48,13 +49,15 @@ qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_field_i
     dlv->settled = settled;
 
     action->args.connection.delivery = dlv;
+    action->args.connection.link_exclusion = link_exclusion;
     qdr_action_enqueue(link->core, action);
     return dlv;
 }
 
 
 qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
-                                    qd_field_iterator_t *ingress, qd_field_iterator_t *addr, bool settled)
+                                    qd_field_iterator_t *ingress, qd_field_iterator_t *addr,
+                                    bool settled, qd_bitmask_t *link_exclusion)
 {
     qdr_action_t   *action = qdr_action(qdr_link_deliver_CT, "link_deliver");
     qdr_delivery_t *dlv    = new_qdr_delivery_t();
@@ -67,6 +70,7 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
     dlv->settled = settled;
 
     action->args.connection.delivery = dlv;
+    action->args.connection.link_exclusion = link_exclusion;
     qdr_action_enqueue(link->core, action);
     return dlv;
 }
@@ -232,9 +236,10 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
     if (discard)
         return;
 
-    qdr_delivery_t *dlv   = action->args.connection.delivery;
-    qdr_link_t     *link  = dlv->link;
-    int             count = 0;
+    qdr_delivery_t *dlv          = action->args.connection.delivery;
+    qd_bitmask_t   *link_exclude = action->args.connection.link_exclusion;
+    qdr_link_t     *link         = dlv->link;
+    int             count        = 0;
 
     //
     // NOTE: The link->undelivered list does not need to be protected by the
@@ -247,7 +252,8 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
         if (!addr && dlv->to_addr) {
             qd_hash_retrieve(core->addr_hash, dlv->to_addr, (void**) &addr);
             if (addr)
-                count = qdr_forward_message_CT(core, addr, dlv->msg, dlv, false, link->link_type == QD_LINK_CONTROL);
+                count = qdr_forward_message_CT(core, addr, dlv->msg, dlv, false,
+                                               link->link_type == QD_LINK_CONTROL, link_exclude);
         }
     }
 
@@ -291,7 +297,8 @@ static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
             //
             // Forward the message.  We don't care what the fanout count is.
             //
-            (void) qdr_forward_message_CT(core, addr, msg, 0, action->args.io.exclude_inprocess, action->args.io.control);
+            (void) qdr_forward_message_CT(core, addr, msg, 0, action->args.io.exclude_inprocess,
+                                          action->args.io.control, 0);
         else
             qd_log(core->log, QD_LOG_DEBUG, "In-process send to an unknown address");
     }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1c8cf5a/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 8e2dfab..7696754 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -106,7 +106,7 @@ static int router_writable_conn_handler(void *type_context, qd_connection_t *con
 static qd_field_iterator_t *router_annotate_message(qd_router_t       *router,
                                                     qd_parsed_field_t *in_ma,
                                                     qd_message_t      *msg,
-                                                    bool              *drop,
+                                                    qd_bitmask_t     **link_exclusions,
                                                     bool               strip_inbound_annotations)
 {
     qd_field_iterator_t *ingress_iter = 0;
@@ -115,6 +115,8 @@ static qd_field_iterator_t *router_annotate_message(qd_router_t       *router,
     qd_parsed_field_t *ingress = 0;
     qd_parsed_field_t *to      = 0;
 
+    *link_exclusions = 0;
+
     if (in_ma && !strip_inbound_annotations) {
         uint32_t count = qd_parse_sub_count(in_ma);
         bool done = false;
@@ -147,14 +149,20 @@ static qd_field_iterator_t *router_annotate_message(qd_router_t       *router,
     qd_compose_start_list(trace_field);
     if (trace) {
         if (qd_parse_is_list(trace)) {
+            //
+            // Create a link-exclusion map for the items in the trace.  This map will
+            // contain a one-bit for each link that leads to a neighbor router that
+            // the message has already passed through.
+            //
+            *link_exclusions = qd_tracemask_create(router->tracemask, trace);
+
+            //
+            // Append this router's ID to the trace.
+            //
             uint32_t idx = 0;
             qd_parsed_field_t *trace_item = qd_parse_sub_value(trace, idx);
             while (trace_item) {
                 qd_field_iterator_t *iter = qd_parse_raw(trace_item);
-                if (qd_field_iterator_equal(iter, (unsigned char*) node_id)) {
-                    *drop = 1;
-                    return 0;  // no further processing necessary
-                }
                 qd_field_iterator_reset(iter);
                 qd_compose_insert_string_iterator(trace_field, iter);
                 idx++;
@@ -261,14 +269,9 @@ static void router_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd
 
     if (valid_message) {
         qd_parsed_field_t   *in_ma        = qd_message_message_annotations(msg);
-        bool                 drop         = false;
+        qd_bitmask_t        *link_exclusions;
         bool                 strip        = qdr_link_strip_annotations_in(rlink);
-        qd_field_iterator_t *ingress_iter = router_annotate_message(router, in_ma, msg, &drop, strip);
-
-        if (drop) {
-            qd_message_free(msg);
-            return;
-        }
+        qd_field_iterator_t *ingress_iter = router_annotate_message(router, in_ma, msg, &link_exclusions, strip);
 
         if (anonymous_link) {
             qd_field_iterator_t *addr_iter = 0;
@@ -290,10 +293,11 @@ static void router_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd
 
             if (addr_iter) {
                 qd_address_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_HASH);
-                delivery = qdr_link_deliver_to(rlink, msg, ingress_iter, addr_iter, pn_delivery_settled(pnd));
+                delivery = qdr_link_deliver_to(rlink, msg, ingress_iter, addr_iter, pn_delivery_settled(pnd),
+                                               link_exclusions);
             }
         } else
-            delivery = qdr_link_deliver(rlink, msg, ingress_iter, pn_delivery_settled(pnd));
+            delivery = qdr_link_deliver(rlink, msg, ingress_iter, pn_delivery_settled(pnd), link_exclusions);
 
         if (delivery) {
             pn_delivery_set_context(pnd, delivery);
@@ -676,6 +680,7 @@ static void qd_router_link_deliver(void *context, qdr_link_t *link, qdr_delivery
 
 void qd_router_setup_late(qd_dispatch_t *qd)
 {
+    qd->router->tracemask   = qd_tracemask();
     qd->router->router_core = qdr_core(qd, qd->router->router_area, qd->router->router_id);
 
     qdr_connection_handlers(qd->router->router_core, (void*) qd->router,
@@ -700,6 +705,7 @@ void qd_router_free(qd_router_t *router)
     qd_container_set_default_node_type(router->qd, 0, 0, QD_DIST_BOTH);
 
     qdr_core_free(router->router_core);
+    qd_tracemask_free(router->tracemask);
     qd_timer_free(router->timer);
     sys_mutex_free(router->lock);
     qd_router_configure_free(router);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1c8cf5a/src/router_private.h
----------------------------------------------------------------------
diff --git a/src/router_private.h b/src/router_private.h
index 760ffaf..ee77671 100644
--- a/src/router_private.h
+++ b/src/router_private.h
@@ -29,6 +29,7 @@
 #include <qpid/dispatch/router_core.h>
 #include <qpid/dispatch/message.h>
 #include <qpid/dispatch/bitmask.h>
+#include <qpid/dispatch/trace_mask.h>
 #include <qpid/dispatch/hash.h>
 #include <qpid/dispatch/log.h>
 #include "dispatch_private.h"
@@ -219,6 +220,7 @@ DEQ_DECLARE(qd_waypoint_t, qd_waypoint_list_t);
 struct qd_router_t {
     qd_dispatch_t            *qd;
     qdr_core_t               *router_core;
+    qd_tracemask_t           *tracemask;
     qd_log_source_t          *log_source;
     qd_router_mode_t          router_mode;
     const char               *router_area;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1c8cf5a/src/router_pynode.c
----------------------------------------------------------------------
diff --git a/src/router_pynode.c b/src/router_pynode.c
index 3cdc98f..4d2c819 100644
--- a/src/router_pynode.c
+++ b/src/router_pynode.c
@@ -52,6 +52,7 @@ static PyObject *qd_add_router(PyObject *self, PyObject *args)
         return 0;
 
     qdr_core_add_router(router->router_core, address, router_maskbit);
+    qd_tracemask_add_router(router->tracemask, address, router_maskbit);
 
     Py_INCREF(Py_None);
     return Py_None;
@@ -68,6 +69,7 @@ static PyObject* qd_del_router(PyObject *self, PyObject *args)
         return 0;
 
     qdr_core_del_router(router->router_core, router_maskbit);
+    qd_tracemask_del_router(router->tracemask, router_maskbit);
 
     Py_INCREF(Py_None);
     return Py_None;
@@ -85,6 +87,7 @@ static PyObject* qd_set_link(PyObject *self, PyObject *args)
         return 0;
 
     qdr_core_set_link(router->router_core, router_maskbit, link_maskbit);
+    qd_tracemask_set_link(router->tracemask, router_maskbit, link_maskbit);
 
     Py_INCREF(Py_None);
     return Py_None;
@@ -101,6 +104,7 @@ static PyObject* qd_remove_link(PyObject *self, PyObject *args)
         return 0;
 
     qdr_core_remove_link(router->router_core, router_maskbit);
+    qd_tracemask_remove_link(router->tracemask, router_maskbit);
 
     Py_INCREF(Py_None);
     return Py_None;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org