You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/02/28 14:00:04 UTC
qpid-dispatch git commit: DISPATCH-932 - Added counts to correlate
the ingress and egress routers of deliveries. This closes #253
Repository: qpid-dispatch
Updated Branches:
refs/heads/master b6805c0ce -> 1134dbae4
DISPATCH-932 - Added counts to correlate the ingress and egress routers of deliveries.
This closes #253
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/1134dbae
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/1134dbae
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/1134dbae
Branch: refs/heads/master
Commit: 1134dbae47db0801062a3991553ab07a36b0a861
Parents: b6805c0
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Feb 28 08:57:26 2018 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Feb 28 08:57:26 2018 -0500
----------------------------------------------------------------------
include/qpid/dispatch/router_core.h | 5 +++--
include/qpid/dispatch/trace_mask.h | 3 ++-
python/qpid_dispatch/management/qdrouter.json | 8 ++++++++
python/qpid_dispatch_internal/router/node.py | 2 ++
src/router_core/agent_link.c | 12 ++++++++++++
src/router_core/agent_link.h | 2 +-
src/router_core/connections.c | 15 +++++++++++++++
src/router_core/forwarder.c | 2 ++
src/router_core/router_core_private.h | 16 +++++++++-------
src/router_core/transfer.c | 6 ++++--
src/router_node.c | 13 +++++++------
src/trace_mask.c | 16 +++++++++++-----
tests/parse_test.c | 15 +++++++++++++--
13 files changed, 89 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1134dbae/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 63e3f46..5b964e9 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -563,13 +563,14 @@ void qdr_link_delete(qdr_link_t *link);
* @param link_exclusion If present, this is a bitmask of inter-router links that should not be used
* to send this message. This bitmask is created by the trace_mask module and
* it built on the trace header from a received message.
+ * @param ingress_index The bitmask index of the router that this delivery entered the network through.
* @return Pointer to the qdr_delivery that will track the lifecycle of this delivery on this link.
*/
qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_iterator_t *ingress,
- bool settled, qd_bitmask_t *link_exclusion);
+ bool settled, qd_bitmask_t *link_exclusion, int ingress_index);
qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
qd_iterator_t *ingress, qd_iterator_t *addr,
- bool settled, qd_bitmask_t *link_exclusion);
+ bool settled, qd_bitmask_t *link_exclusion, int ingress_index);
qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg, bool settled,
const uint8_t *tag, int tag_length,
uint64_t disposition, pn_data_t* disposition_state);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1134dbae/include/qpid/dispatch/trace_mask.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/trace_mask.h b/include/qpid/dispatch/trace_mask.h
index 9f1ec49..b92569f 100644
--- a/include/qpid/dispatch/trace_mask.h
+++ b/include/qpid/dispatch/trace_mask.h
@@ -89,9 +89,10 @@ void qd_tracemask_remove_link(qd_tracemask_t *tm, int router_maskbit);
*
* @param tm Tracemask created by qd_tracemask()
* @param tracelist The parsed field from a message's trace header
+ * @param ingress_index (out) The mask-bit for the first router in the trace list (the ingress router)
* @return A new bit mask with a set-bit for each neighbor router in the list. This must be freed
* by the caller when the caller is done with it.
*/
-qd_bitmask_t *qd_tracemask_create(qd_tracemask_t *tm, qd_parsed_field_t *tracelist);
+qd_bitmask_t *qd_tracemask_create(qd_tracemask_t *tm, qd_parsed_field_t *tracelist, int *ingress_index);
#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1134dbae/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index 4da4ccf..ac9dda8 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1262,6 +1262,10 @@
"type": "integer",
"graph": true,
"description": "The total number of modified deliveries."
+ },
+ "ingressHistogram": {
+ "type": "list",
+ "description": "For outgoing links on connections with 'normal' role. This histogram shows the number of settled deliveries on the link that ingressed the network at each interior router node."
}
}
},
@@ -1390,6 +1394,10 @@
"lastTopoChange": {
"description": "Timestamp showing the most recent change to this node's neighborhood.",
"type": "integer"
+ },
+ "index": {
+ "description": "Index number used in statistics histograms for this router node. This index is specific to this router.",
+ "type": "integer"
}
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1134dbae/python/qpid_dispatch_internal/router/node.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/node.py b/python/qpid_dispatch_internal/router/node.py
index d5a2353..31bd3fe 100644
--- a/python/qpid_dispatch_internal/router/node.py
+++ b/python/qpid_dispatch_internal/router/node.py
@@ -55,6 +55,7 @@ class NodeTracker(object):
"""Refresh management attributes"""
attributes.update({
"id": self.my_id,
+ "index": 0,
"protocolVersion": ProtocolVersion,
"instance": self.container.instance, # Boot number, integer
"linkState": [ls for ls in self.link_state.peers], # List of neighbour nodes
@@ -411,6 +412,7 @@ class RouterNode(object):
"""Refresh management attributes"""
attributes.update({
"id": self.id,
+ "index": self.maskbit,
"protocolVersion": self.version,
"instance": self.instance, # Boot number, integer
"linkState": [ls for ls in self.link_state.peers], # List of neighbour nodes
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1134dbae/src/router_core/agent_link.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_link.c b/src/router_core/agent_link.c
index fc9981d..8908312 100644
--- a/src/router_core/agent_link.c
+++ b/src/router_core/agent_link.c
@@ -42,6 +42,7 @@
#define QDR_LINK_REJECTED_COUNT 18
#define QDR_LINK_RELEASED_COUNT 19
#define QDR_LINK_MODIFIED_COUNT 20
+#define QDR_LINK_INGRESS_HISTOGRAM 21
const char *qdr_link_columns[] =
{"name",
@@ -65,6 +66,7 @@ const char *qdr_link_columns[] =
"rejectedCount",
"releasedCount",
"modifiedCount",
+ "ingressHistogram",
0};
static const char *qd_link_type_name(qd_link_type_t lt)
@@ -208,6 +210,16 @@ static void qdr_agent_write_column_CT(qd_composed_field_t *body, int col, qdr_li
qd_compose_insert_ulong(body, link->modified_deliveries);
break;
+ case QDR_LINK_INGRESS_HISTOGRAM:
+ if (link->ingress_histogram) {
+ qd_compose_start_list(body);
+ for (int i = 0; i < qd_bitmask_width(); i++)
+ qd_compose_insert_ulong(body, link->ingress_histogram[i]);
+ qd_compose_end_list(body);
+ } else
+ qd_compose_insert_null(body);
+ break;
+
default:
qd_compose_insert_null(body);
break;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1134dbae/src/router_core/agent_link.h
----------------------------------------------------------------------
diff --git a/src/router_core/agent_link.h b/src/router_core/agent_link.h
index 44be29d..6402949 100644
--- a/src/router_core/agent_link.h
+++ b/src/router_core/agent_link.h
@@ -29,7 +29,7 @@ void qdra_link_update_CT(qdr_core_t *core,
qdr_query_t *query,
qd_parsed_field_t *in_body);
-#define QDR_LINK_COLUMN_COUNT 21
+#define QDR_LINK_COLUMN_COUNT 22
const char *qdr_link_columns[QDR_LINK_COLUMN_COUNT + 1];
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1134dbae/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 00b0928..77efe7e 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -402,6 +402,16 @@ const char *qdr_link_name(const qdr_link_t *link)
}
+static void qdr_link_setup_histogram(qdr_connection_t *conn, qd_direction_t dir, qdr_link_t *link)
+{
+ if (dir == QD_OUTGOING && conn->role == QDR_ROLE_NORMAL) {
+ link->ingress_histogram = NEW_ARRAY(uint64_t, qd_bitmask_width());
+ for (int i = 0; i < qd_bitmask_width(); i++)
+ link->ingress_histogram[i] = 0;
+ }
+}
+
+
qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
qd_direction_t dir,
qdr_terminus_t *source,
@@ -441,6 +451,8 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
else if (qdr_terminus_has_capability(local_terminus, QD_CAPABILITY_ROUTER_DATA))
link->link_type = QD_LINK_ROUTER;
+ qdr_link_setup_histogram(conn, dir, link);
+
action->args.connection.conn = conn;
action->args.connection.link = link;
action->args.connection.dir = dir;
@@ -827,6 +839,7 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
//
free(link->name);
free(link->terminus_addr);
+ free(link->ingress_histogram);
link->name = 0;
}
@@ -860,6 +873,8 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core,
link->strip_annotations_in = conn->strip_annotations_in;
link->strip_annotations_out = conn->strip_annotations_out;
+ qdr_link_setup_histogram(conn, dir, link);
+
DEQ_INSERT_TAIL(core->open_links, link);
qdr_add_link_ref(&conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1134dbae/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 3a0e3eb..ee66e0d 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -113,6 +113,8 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in
out_dlv->tag_length = 8;
out_dlv->error = 0;
+ out_dlv->ingress_index = in_dlv ? in_dlv->ingress_index : -1;
+
//
// Add one to the message fanout. This will later be used in the qd_message_send function that sends out messages.
//
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1134dbae/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 9b7ba23..389857b 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -346,6 +346,7 @@ struct qdr_delivery_t {
qd_bitmask_t *link_exclusion;
qdr_address_t *tracking_addr;
int tracking_addr_bit;
+ int ingress_index;
qdr_link_work_t *link_work; ///< Delivery work item for this delivery
qdr_subscription_list_t subscriptions;
qdr_delivery_ref_list_t peers; /// Use this list if there if the delivery has more than one peer.
@@ -401,13 +402,14 @@ struct qdr_link_t {
bool drain_mode;
bool stalled_outbound; ///< Indicates that this link is stalled on outbound buffer backpressure
- uint64_t total_deliveries;
- uint64_t presettled_deliveries;
- uint64_t dropped_presettled_deliveries;
- uint64_t accepted_deliveries;
- uint64_t rejected_deliveries;
- uint64_t released_deliveries;
- uint64_t modified_deliveries;
+ uint64_t total_deliveries;
+ uint64_t presettled_deliveries;
+ uint64_t dropped_presettled_deliveries;
+ uint64_t accepted_deliveries;
+ uint64_t rejected_deliveries;
+ uint64_t released_deliveries;
+ uint64_t modified_deliveries;
+ uint64_t *ingress_histogram;
};
ALLOC_DECLARE(qdr_link_t);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1134dbae/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 182082e..13d7462 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -42,7 +42,7 @@ void qdr_delivery_copy_extension_state(qdr_delivery_t *src, qdr_delivery_t *dest
//==================================================================================
qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_iterator_t *ingress,
- bool settled, qd_bitmask_t *link_exclusion)
+ bool settled, qd_bitmask_t *link_exclusion, int ingress_index)
{
qdr_action_t *action = qdr_action(qdr_link_deliver_CT, "link_deliver");
qdr_delivery_t *dlv = new_qdr_delivery_t();
@@ -55,6 +55,7 @@ qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_iterato
dlv->settled = settled;
dlv->presettled = settled;
dlv->link_exclusion = link_exclusion;
+ dlv->ingress_index = ingress_index;
dlv->error = 0;
qdr_delivery_incref(dlv, "qdr_link_deliver - newly created delivery, add to action list");
@@ -68,7 +69,7 @@ qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_iterato
qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
qd_iterator_t *ingress, qd_iterator_t *addr,
- bool settled, qd_bitmask_t *link_exclusion)
+ bool settled, qd_bitmask_t *link_exclusion, int ingress_index)
{
qdr_action_t *action = qdr_action(qdr_link_deliver_CT, "link_deliver");
qdr_delivery_t *dlv = new_qdr_delivery_t();
@@ -81,6 +82,7 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
dlv->settled = settled;
dlv->presettled = settled;
dlv->link_exclusion = link_exclusion;
+ dlv->ingress_index = ingress_index;
dlv->error = 0;
qdr_delivery_incref(dlv, "qdr_link_deliver_to - newly created delivery, add to action list");
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1134dbae/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index b8c4a6b..719af5a 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -172,11 +172,11 @@ static int AMQP_writable_conn_handler(void *type_context, qd_connection_t *conn,
static qd_iterator_t *router_annotate_message(qd_router_t *router,
qd_message_t *msg,
qd_bitmask_t **link_exclusions,
- uint32_t *distance)
+ uint32_t *distance,
+ int *ingress_index)
{
qd_iterator_t *ingress_iter = 0;
-
qd_parsed_field_t *trace = qd_message_get_trace(msg);
qd_parsed_field_t *ingress = qd_message_get_ingress(msg);
qd_parsed_field_t *to = qd_message_get_to_override(msg);
@@ -205,7 +205,7 @@ static qd_iterator_t *router_annotate_message(qd_router_t *router,
// contain a one-bit for each link that leads to a neighbor router that
// the message has already passed through.
//
- *link_exclusions = qd_tracemask_create(router->tracemask, trace);
+ *link_exclusions = qd_tracemask_create(router->tracemask, trace, ingress_index);
//
// Append this router's ID to the trace.
@@ -454,8 +454,9 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
qd_message_message_annotations(msg);
qd_bitmask_t *link_exclusions;
uint32_t distance;
+ int ingress_index = 0; // Default to _this_ router
- qd_iterator_t *ingress_iter = router_annotate_message(router, msg, &link_exclusions, &distance);
+ qd_iterator_t *ingress_iter = router_annotate_message(router, msg, &link_exclusions, &distance, &ingress_index);
//
// If this delivery has traveled further than the known radius of the network topology (plus 1),
@@ -509,7 +510,7 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
if (phase > 0)
qd_iterator_annotate_phase(addr_iter, '0' + (char) phase);
delivery = qdr_link_deliver_to(rlink, msg, ingress_iter, addr_iter, pn_delivery_settled(pnd),
- link_exclusions);
+ link_exclusions, ingress_index);
}
} else {
//
@@ -533,7 +534,7 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
if (phase != 0)
qd_message_set_phase_annotation(msg, phase);
}
- delivery = qdr_link_deliver(rlink, msg, ingress_iter, pn_delivery_settled(pnd), link_exclusions);
+ delivery = qdr_link_deliver(rlink, msg, ingress_iter, pn_delivery_settled(pnd), link_exclusions, ingress_index);
}
if (delivery) {
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1134dbae/src/trace_mask.c
----------------------------------------------------------------------
diff --git a/src/trace_mask.c b/src/trace_mask.c
index 76d0c1a..3adb8d5 100644
--- a/src/trace_mask.c
+++ b/src/trace_mask.c
@@ -124,10 +124,11 @@ void qd_tracemask_remove_link(qd_tracemask_t *tm, int router_maskbit)
}
-qd_bitmask_t *qd_tracemask_create(qd_tracemask_t *tm, qd_parsed_field_t *tracelist)
+qd_bitmask_t *qd_tracemask_create(qd_tracemask_t *tm, qd_parsed_field_t *tracelist, int *ingress_index)
{
- qd_bitmask_t *bm = qd_bitmask(0);
- int idx = 0;
+ qd_bitmask_t *bm = qd_bitmask(0);
+ int idx = 0;
+ bool first = true;
assert(qd_parse_is_list(tracelist));
@@ -138,10 +139,15 @@ qd_bitmask_t *qd_tracemask_create(qd_tracemask_t *tm, qd_parsed_field_t *traceli
qd_iterator_t *iter = qd_parse_raw(item);
qd_iterator_reset_view(iter, ITER_VIEW_NODE_HASH);
qd_hash_retrieve(tm->hash, iter, (void*) &router);
- if (router && router->link_maskbit >= 0)
- qd_bitmask_set_bit(bm, router->link_maskbit);
+ if (router) {
+ if (router->link_maskbit >= 0)
+ qd_bitmask_set_bit(bm, router->link_maskbit);
+ if (first)
+ *ingress_index = router->maskbit;
+ }
idx++;
item = qd_parse_sub_value(tracelist, idx);
+ first = false;
}
sys_rwlock_unlock(tm->lock);
return bm;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1134dbae/tests/parse_test.c
----------------------------------------------------------------------
diff --git a/tests/parse_test.c b/tests/parse_test.c
index bc12efd..85ce936 100644
--- a/tests/parse_test.c
+++ b/tests/parse_test.c
@@ -302,11 +302,17 @@ static char *test_tracemask(void *context)
qd_parsed_field_t *pf = qd_parse(iter);
qd_iterator_free(iter);
- bm = qd_tracemask_create(tm, pf);
+ int ingress = -1;
+
+ bm = qd_tracemask_create(tm, pf, &ingress);
if (qd_bitmask_cardinality(bm) != 3) {
sprintf(error, "Expected cardinality of 3, got %d", qd_bitmask_cardinality(bm));
return error;
}
+ if (ingress != 0) {
+ sprintf(error, "(A) Expected ingress index of 0, got %d", ingress);
+ return error;
+ }
int total = 0;
int bit, c;
for (QD_BITMASK_EACH(bm, bit, c)) {
@@ -321,12 +327,17 @@ static char *test_tracemask(void *context)
qd_tracemask_del_router(tm, 3);
qd_tracemask_remove_link(tm, 0);
- bm = qd_tracemask_create(tm, pf);
+ ingress = -1;
+ bm = qd_tracemask_create(tm, pf, &ingress);
qd_parse_free(pf);
if (qd_bitmask_cardinality(bm) != 1) {
sprintf(error, "Expected cardinality of 1, got %d", qd_bitmask_cardinality(bm));
return error;
}
+ if (ingress != 0) {
+ sprintf(error, "(B) Expected ingress index of 0, got %d", ingress);
+ return error;
+ }
total = 0;
for (QD_BITMASK_EACH(bm, bit, c)) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org