You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/01/19 21:02:37 UTC
[4/4] qpid-dispatch git commit: DISPATCH-203 - Added linkage into the
rest of the system to resolve the issue.
DISPATCH-203 - Added linkage into the rest of the system to resolve the issue.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/b1c8cf5a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/b1c8cf5a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/b1c8cf5a
Branch: refs/heads/tross-DISPATCH-179-1
Commit: b1c8cf5a06867c6c7795ea1fe2c80a69ef560e85
Parents: d4f8343
Author: Ted Ross <tr...@redhat.com>
Authored: Tue Jan 19 14:59:17 2016 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue Jan 19 14:59:17 2016 -0500
----------------------------------------------------------------------
include/qpid/dispatch/router_core.h | 9 ++++++--
src/router_core/forwarder.c | 32 +++++++++++++++++++---------
src/router_core/router_core_private.h | 3 ++-
src/router_core/transfer.c | 21 ++++++++++++------
src/router_node.c | 34 ++++++++++++++++++------------
src/router_private.h | 2 ++
src/router_pynode.c | 4 ++++
7 files changed, 71 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1c8cf5a/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 9cd97e2..70b9294 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -475,11 +475,16 @@ void qdr_link_detach(qdr_link_t *link, qd_detach_type_t dt, qdr_error_t *error);
* iterator is assumed to reference content in the message that will stay valid
* through the lifetime of the message.
* @param settled True iff the delivery is pre-settled.
+ * @param link_exclusion If present, this is a bitmask of inter-router links that should not be used
+ * to send this message. This bitmask is created by the trace_mask module and
+ * it is built from the trace header of a received message.
* @return Pointer to the qdr_delivery that will track the lifecycle of this delivery on this link.
*/
-qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_field_iterator_t *ingress, bool settled);
+qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_field_iterator_t *ingress,
+ bool settled, qd_bitmask_t *link_exclusion);
qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
- qd_field_iterator_t *ingress, qd_field_iterator_t *addr, bool settled);
+ qd_field_iterator_t *ingress, qd_field_iterator_t *addr,
+ bool settled, qd_bitmask_t *link_exclusion);
qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg);
void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1c8cf5a/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index d7dc176..865b4da 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -30,7 +30,8 @@ typedef int (*qdr_forward_message_t) (qdr_core_t *core,
qd_message_t *msg,
qdr_delivery_t *in_delivery,
bool exclude_inprocess,
- bool control);
+ bool control,
+ qd_bitmask_t *link_exclusion);
typedef void (*qdr_forward_attach_t) (qdr_core_t *core,
qdr_forwarder_t *forw,
@@ -105,7 +106,8 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
qd_message_t *msg,
qdr_delivery_t *in_delivery,
bool exclude_inprocess,
- bool control)
+ bool control,
+ qd_bitmask_t *link_exclusion)
{
bool bypass_valid_origins = addr->forwarder->bypass_valid_origins;
int fanout = 0;
@@ -187,7 +189,7 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
dest_link = control ?
core->control_links_by_mask_bit[link_bit] :
core->data_links_by_mask_bit[link_bit];
- if (dest_link) {
+ if (dest_link && (!link_exclusion || qd_bitmask_value(link_exclusion, link_bit) == 0)) {
qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, dest_link, msg);
qdr_forward_deliver_CT(core, dest_link, out_delivery);
fanout++;
@@ -211,6 +213,8 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
}
}
+ if (link_exclusion)
+ qd_bitmask_free(link_exclusion);
return fanout;
}
@@ -220,10 +224,17 @@ int qdr_forward_closest_CT(qdr_core_t *core,
qd_message_t *msg,
qdr_delivery_t *in_delivery,
bool exclude_inprocess,
- bool control)
+ bool control,
+ qd_bitmask_t *link_exclusion)
{
//
- // Forward to an in-process subscriber if there is one
+ // The Anycast forwarders don't respect link exclusions.
+ //
+ if (link_exclusion)
+ qd_bitmask_free(link_exclusion);
+
+ //
+ // Forward to an in-process subscriber if there is one.
//
if (!exclude_inprocess) {
qdr_subscription_t *sub = DEQ_HEAD(addr->subscriptions);
@@ -231,7 +242,7 @@ int qdr_forward_closest_CT(qdr_core_t *core,
qdr_forward_on_message_CT(core, sub, in_delivery ? in_delivery->link : 0, msg);
//
- // Rotate this subscription to the end of the list to get round-robin distribution
+ // Rotate this subscription to the end of the list to get round-robin distribution.
//
if (DEQ_SIZE(addr->subscriptions) > 1) {
DEQ_REMOVE_HEAD(addr->subscriptions);
@@ -243,7 +254,7 @@ int qdr_forward_closest_CT(qdr_core_t *core,
}
//
- // Forward to a local subscriber
+ // Forward to a local subscriber.
//
qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks);
if (link_ref) {
@@ -278,7 +289,8 @@ int qdr_forward_balanced_CT(qdr_core_t *core,
qd_message_t *msg,
qdr_delivery_t *in_delivery,
bool exclude_inprocess,
- bool control)
+ bool control,
+ qd_bitmask_t *link_exclusion)
{
return 0;
}
@@ -333,10 +345,10 @@ qdr_forwarder_t *qdr_forwarder_CT(qdr_core_t *core, qd_address_semantics_t seman
int qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *msg, qdr_delivery_t *in_delivery,
- bool exclude_inprocess, bool control)
+ bool exclude_inprocess, bool control, qd_bitmask_t *link_exclusion)
{
if (addr->forwarder)
- return addr->forwarder->forward_message(core, addr, msg, in_delivery, exclude_inprocess, control);
+ return addr->forwarder->forward_message(core, addr, msg, in_delivery, exclude_inprocess, control, link_exclusion);
// TODO - Deal with this delivery's disposition
return 0;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1c8cf5a/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 691ddb0..57cfa8a 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -35,7 +35,7 @@ typedef struct qdr_forwarder_t qdr_forwarder_t;
qdr_forwarder_t *qdr_forwarder_CT(qdr_core_t *core, qd_address_semantics_t semantics);
int qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *msg, qdr_delivery_t *in_delivery,
- bool exclude_inprocess, bool control);
+ bool exclude_inprocess, bool control, qd_bitmask_t *link_exclusion);
void qdr_forward_attach_CT(qdr_core_t *core, qdr_forwarder_t *forwarder, qdr_link_t *in_link);
/**
@@ -86,6 +86,7 @@ struct qdr_action_t {
qdr_terminus_t *target;
qdr_error_t *error;
qd_detach_type_t dt;
+ qd_bitmask_t *link_exclusion;
} connection;
//
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1c8cf5a/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index a6a412b..0cdd21e 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -35,7 +35,8 @@ static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
// Interface Functions
//==================================================================================
-qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_field_iterator_t *ingress, bool settled)
+qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_field_iterator_t *ingress,
+ bool settled, qd_bitmask_t *link_exclusion)
{
qdr_action_t *action = qdr_action(qdr_link_deliver_CT, "link_deliver");
qdr_delivery_t *dlv = new_qdr_delivery_t();
@@ -48,13 +49,15 @@ qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_field_i
dlv->settled = settled;
action->args.connection.delivery = dlv;
+ action->args.connection.link_exclusion = link_exclusion;
qdr_action_enqueue(link->core, action);
return dlv;
}
qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
- qd_field_iterator_t *ingress, qd_field_iterator_t *addr, bool settled)
+ qd_field_iterator_t *ingress, qd_field_iterator_t *addr,
+ bool settled, qd_bitmask_t *link_exclusion)
{
qdr_action_t *action = qdr_action(qdr_link_deliver_CT, "link_deliver");
qdr_delivery_t *dlv = new_qdr_delivery_t();
@@ -67,6 +70,7 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
dlv->settled = settled;
action->args.connection.delivery = dlv;
+ action->args.connection.link_exclusion = link_exclusion;
qdr_action_enqueue(link->core, action);
return dlv;
}
@@ -232,9 +236,10 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
if (discard)
return;
- qdr_delivery_t *dlv = action->args.connection.delivery;
- qdr_link_t *link = dlv->link;
- int count = 0;
+ qdr_delivery_t *dlv = action->args.connection.delivery;
+ qd_bitmask_t *link_exclude = action->args.connection.link_exclusion;
+ qdr_link_t *link = dlv->link;
+ int count = 0;
//
// NOTE: The link->undelivered list does not need to be protected by the
@@ -247,7 +252,8 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
if (!addr && dlv->to_addr) {
qd_hash_retrieve(core->addr_hash, dlv->to_addr, (void**) &addr);
if (addr)
- count = qdr_forward_message_CT(core, addr, dlv->msg, dlv, false, link->link_type == QD_LINK_CONTROL);
+ count = qdr_forward_message_CT(core, addr, dlv->msg, dlv, false,
+ link->link_type == QD_LINK_CONTROL, link_exclude);
}
}
@@ -291,7 +297,8 @@ static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
//
// Forward the message. We don't care what the fanout count is.
//
- (void) qdr_forward_message_CT(core, addr, msg, 0, action->args.io.exclude_inprocess, action->args.io.control);
+ (void) qdr_forward_message_CT(core, addr, msg, 0, action->args.io.exclude_inprocess,
+ action->args.io.control, 0);
else
qd_log(core->log, QD_LOG_DEBUG, "In-process send to an unknown address");
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1c8cf5a/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 8e2dfab..7696754 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -106,7 +106,7 @@ static int router_writable_conn_handler(void *type_context, qd_connection_t *con
static qd_field_iterator_t *router_annotate_message(qd_router_t *router,
qd_parsed_field_t *in_ma,
qd_message_t *msg,
- bool *drop,
+ qd_bitmask_t **link_exclusions,
bool strip_inbound_annotations)
{
qd_field_iterator_t *ingress_iter = 0;
@@ -115,6 +115,8 @@ static qd_field_iterator_t *router_annotate_message(qd_router_t *router,
qd_parsed_field_t *ingress = 0;
qd_parsed_field_t *to = 0;
+ *link_exclusions = 0;
+
if (in_ma && !strip_inbound_annotations) {
uint32_t count = qd_parse_sub_count(in_ma);
bool done = false;
@@ -147,14 +149,20 @@ static qd_field_iterator_t *router_annotate_message(qd_router_t *router,
qd_compose_start_list(trace_field);
if (trace) {
if (qd_parse_is_list(trace)) {
+ //
+ // Create a link-exclusion map for the items in the trace. This map will
+ // contain a one-bit for each link that leads to a neighbor router that
+ // the message has already passed through.
+ //
+ *link_exclusions = qd_tracemask_create(router->tracemask, trace);
+
+ //
+ // Append this router's ID to the trace.
+ //
uint32_t idx = 0;
qd_parsed_field_t *trace_item = qd_parse_sub_value(trace, idx);
while (trace_item) {
qd_field_iterator_t *iter = qd_parse_raw(trace_item);
- if (qd_field_iterator_equal(iter, (unsigned char*) node_id)) {
- *drop = 1;
- return 0; // no further processing necessary
- }
qd_field_iterator_reset(iter);
qd_compose_insert_string_iterator(trace_field, iter);
idx++;
@@ -261,14 +269,9 @@ static void router_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd
if (valid_message) {
qd_parsed_field_t *in_ma = qd_message_message_annotations(msg);
- bool drop = false;
+ qd_bitmask_t *link_exclusions;
bool strip = qdr_link_strip_annotations_in(rlink);
- qd_field_iterator_t *ingress_iter = router_annotate_message(router, in_ma, msg, &drop, strip);
-
- if (drop) {
- qd_message_free(msg);
- return;
- }
+ qd_field_iterator_t *ingress_iter = router_annotate_message(router, in_ma, msg, &link_exclusions, strip);
if (anonymous_link) {
qd_field_iterator_t *addr_iter = 0;
@@ -290,10 +293,11 @@ static void router_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd
if (addr_iter) {
qd_address_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_HASH);
- delivery = qdr_link_deliver_to(rlink, msg, ingress_iter, addr_iter, pn_delivery_settled(pnd));
+ delivery = qdr_link_deliver_to(rlink, msg, ingress_iter, addr_iter, pn_delivery_settled(pnd),
+ link_exclusions);
}
} else
- delivery = qdr_link_deliver(rlink, msg, ingress_iter, pn_delivery_settled(pnd));
+ delivery = qdr_link_deliver(rlink, msg, ingress_iter, pn_delivery_settled(pnd), link_exclusions);
if (delivery) {
pn_delivery_set_context(pnd, delivery);
@@ -676,6 +680,7 @@ static void qd_router_link_deliver(void *context, qdr_link_t *link, qdr_delivery
void qd_router_setup_late(qd_dispatch_t *qd)
{
+ qd->router->tracemask = qd_tracemask();
qd->router->router_core = qdr_core(qd, qd->router->router_area, qd->router->router_id);
qdr_connection_handlers(qd->router->router_core, (void*) qd->router,
@@ -700,6 +705,7 @@ void qd_router_free(qd_router_t *router)
qd_container_set_default_node_type(router->qd, 0, 0, QD_DIST_BOTH);
qdr_core_free(router->router_core);
+ qd_tracemask_free(router->tracemask);
qd_timer_free(router->timer);
sys_mutex_free(router->lock);
qd_router_configure_free(router);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1c8cf5a/src/router_private.h
----------------------------------------------------------------------
diff --git a/src/router_private.h b/src/router_private.h
index 760ffaf..ee77671 100644
--- a/src/router_private.h
+++ b/src/router_private.h
@@ -29,6 +29,7 @@
#include <qpid/dispatch/router_core.h>
#include <qpid/dispatch/message.h>
#include <qpid/dispatch/bitmask.h>
+#include <qpid/dispatch/trace_mask.h>
#include <qpid/dispatch/hash.h>
#include <qpid/dispatch/log.h>
#include "dispatch_private.h"
@@ -219,6 +220,7 @@ DEQ_DECLARE(qd_waypoint_t, qd_waypoint_list_t);
struct qd_router_t {
qd_dispatch_t *qd;
qdr_core_t *router_core;
+ qd_tracemask_t *tracemask;
qd_log_source_t *log_source;
qd_router_mode_t router_mode;
const char *router_area;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1c8cf5a/src/router_pynode.c
----------------------------------------------------------------------
diff --git a/src/router_pynode.c b/src/router_pynode.c
index 3cdc98f..4d2c819 100644
--- a/src/router_pynode.c
+++ b/src/router_pynode.c
@@ -52,6 +52,7 @@ static PyObject *qd_add_router(PyObject *self, PyObject *args)
return 0;
qdr_core_add_router(router->router_core, address, router_maskbit);
+ qd_tracemask_add_router(router->tracemask, address, router_maskbit);
Py_INCREF(Py_None);
return Py_None;
@@ -68,6 +69,7 @@ static PyObject* qd_del_router(PyObject *self, PyObject *args)
return 0;
qdr_core_del_router(router->router_core, router_maskbit);
+ qd_tracemask_del_router(router->tracemask, router_maskbit);
Py_INCREF(Py_None);
return Py_None;
@@ -85,6 +87,7 @@ static PyObject* qd_set_link(PyObject *self, PyObject *args)
return 0;
qdr_core_set_link(router->router_core, router_maskbit, link_maskbit);
+ qd_tracemask_set_link(router->tracemask, router_maskbit, link_maskbit);
Py_INCREF(Py_None);
return Py_None;
@@ -101,6 +104,7 @@ static PyObject* qd_remove_link(PyObject *self, PyObject *args)
return 0;
qdr_core_remove_link(router->router_core, router_maskbit);
+ qd_tracemask_remove_link(router->tracemask, router_maskbit);
Py_INCREF(Py_None);
return Py_None;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org