You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/02/28 14:00:04 UTC

qpid-dispatch git commit: DISPATCH-932 - Added counts to correlate the ingress and egress routers of deliveries. This closes #253

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master b6805c0ce -> 1134dbae4


DISPATCH-932 - Added counts to correlate the ingress and egress routers of deliveries.
This closes #253


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/1134dbae
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/1134dbae
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/1134dbae

Branch: refs/heads/master
Commit: 1134dbae47db0801062a3991553ab07a36b0a861
Parents: b6805c0
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Feb 28 08:57:26 2018 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Feb 28 08:57:26 2018 -0500

----------------------------------------------------------------------
 include/qpid/dispatch/router_core.h           |  5 +++--
 include/qpid/dispatch/trace_mask.h            |  3 ++-
 python/qpid_dispatch/management/qdrouter.json |  8 ++++++++
 python/qpid_dispatch_internal/router/node.py  |  2 ++
 src/router_core/agent_link.c                  | 12 ++++++++++++
 src/router_core/agent_link.h                  |  2 +-
 src/router_core/connections.c                 | 15 +++++++++++++++
 src/router_core/forwarder.c                   |  2 ++
 src/router_core/router_core_private.h         | 16 +++++++++-------
 src/router_core/transfer.c                    |  6 ++++--
 src/router_node.c                             | 13 +++++++------
 src/trace_mask.c                              | 16 +++++++++++-----
 tests/parse_test.c                            | 15 +++++++++++++--
 13 files changed, 89 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1134dbae/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 63e3f46..5b964e9 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -563,13 +563,14 @@ void qdr_link_delete(qdr_link_t *link);
  * @param link_exclusion If present, this is a bitmask of inter-router links that should not be used
  *                       to send this message.  This bitmask is created by the trace_mask module and
  *                       it built on the trace header from a received message.
+ * @param ingress_index The bitmask index of the router that this delivery entered the network through.
  * @return Pointer to the qdr_delivery that will track the lifecycle of this delivery on this link.
  */
 qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_iterator_t *ingress,
-                                 bool settled, qd_bitmask_t *link_exclusion);
+                                 bool settled, qd_bitmask_t *link_exclusion, int ingress_index);
 qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
                                     qd_iterator_t *ingress, qd_iterator_t *addr,
-                                    bool settled, qd_bitmask_t *link_exclusion);
+                                    bool settled, qd_bitmask_t *link_exclusion, int ingress_index);
 qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg, bool settled,
                                                 const uint8_t *tag, int tag_length,
                                                 uint64_t disposition, pn_data_t* disposition_state);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1134dbae/include/qpid/dispatch/trace_mask.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/trace_mask.h b/include/qpid/dispatch/trace_mask.h
index 9f1ec49..b92569f 100644
--- a/include/qpid/dispatch/trace_mask.h
+++ b/include/qpid/dispatch/trace_mask.h
@@ -89,9 +89,10 @@ void qd_tracemask_remove_link(qd_tracemask_t *tm, int router_maskbit);
  *
  * @param tm Tracemask created by qd_tracemask()
  * @param tracelist The parsed field from a message's trace header
+ * @param ingress_index (out) The mask-bit for the first router in the trace list (the ingress router)
  * @return A new bit mask with a set-bit for each neighbor router in the list.  This must be freed
  *         by the caller when the caller is done with it.
  */
-qd_bitmask_t *qd_tracemask_create(qd_tracemask_t *tm, qd_parsed_field_t *tracelist);
+qd_bitmask_t *qd_tracemask_create(qd_tracemask_t *tm, qd_parsed_field_t *tracelist, int *ingress_index);
 
 #endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1134dbae/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index 4da4ccf..ac9dda8 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1262,6 +1262,10 @@
                     "type": "integer",
                     "graph": true,
                     "description": "The total number of modified deliveries."
+                },
+                "ingressHistogram": {
+                    "type": "list",
+                    "description": "For outgoing links on connections with 'normal' role.  This histogram shows the number of settled deliveries on the link that ingressed the network at each interior router node."
                 }
             }
         },
@@ -1390,6 +1394,10 @@
                 "lastTopoChange": {
                      "description": "Timestamp showing the most recent change to this node's neighborhood.",
                      "type": "integer"
+                },
+                "index": {
+                    "description": "Index number used in statistics histograms for this router node.  This index is specific to this router.",
+                    "type": "integer"
                 }
 
             }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1134dbae/python/qpid_dispatch_internal/router/node.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/node.py b/python/qpid_dispatch_internal/router/node.py
index d5a2353..31bd3fe 100644
--- a/python/qpid_dispatch_internal/router/node.py
+++ b/python/qpid_dispatch_internal/router/node.py
@@ -55,6 +55,7 @@ class NodeTracker(object):
         """Refresh management attributes"""
         attributes.update({
             "id": self.my_id,
+            "index": 0,
             "protocolVersion": ProtocolVersion,
             "instance": self.container.instance, # Boot number, integer
             "linkState": [ls for ls in self.link_state.peers], # List of neighbour nodes
@@ -411,6 +412,7 @@ class RouterNode(object):
         """Refresh management attributes"""
         attributes.update({
             "id": self.id,
+            "index": self.maskbit,
             "protocolVersion": self.version,
             "instance": self.instance, # Boot number, integer
             "linkState": [ls for ls in self.link_state.peers], # List of neighbour nodes

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1134dbae/src/router_core/agent_link.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_link.c b/src/router_core/agent_link.c
index fc9981d..8908312 100644
--- a/src/router_core/agent_link.c
+++ b/src/router_core/agent_link.c
@@ -42,6 +42,7 @@
 #define QDR_LINK_REJECTED_COUNT           18
 #define QDR_LINK_RELEASED_COUNT           19
 #define QDR_LINK_MODIFIED_COUNT           20
+#define QDR_LINK_INGRESS_HISTOGRAM        21
 
 const char *qdr_link_columns[] =
     {"name",
@@ -65,6 +66,7 @@ const char *qdr_link_columns[] =
      "rejectedCount",
      "releasedCount",
      "modifiedCount",
+     "ingressHistogram",
      0};
 
 static const char *qd_link_type_name(qd_link_type_t lt)
@@ -208,6 +210,16 @@ static void qdr_agent_write_column_CT(qd_composed_field_t *body, int col, qdr_li
         qd_compose_insert_ulong(body, link->modified_deliveries);
         break;
 
+    case QDR_LINK_INGRESS_HISTOGRAM:
+        if (link->ingress_histogram) {
+            qd_compose_start_list(body);
+            for (int i = 0; i < qd_bitmask_width(); i++)
+                qd_compose_insert_ulong(body, link->ingress_histogram[i]);
+            qd_compose_end_list(body);
+        } else
+            qd_compose_insert_null(body);
+        break;
+
     default:
         qd_compose_insert_null(body);
         break;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1134dbae/src/router_core/agent_link.h
----------------------------------------------------------------------
diff --git a/src/router_core/agent_link.h b/src/router_core/agent_link.h
index 44be29d..6402949 100644
--- a/src/router_core/agent_link.h
+++ b/src/router_core/agent_link.h
@@ -29,7 +29,7 @@ void qdra_link_update_CT(qdr_core_t          *core,
                          qdr_query_t         *query,
                          qd_parsed_field_t   *in_body);
 
-#define QDR_LINK_COLUMN_COUNT  21
+#define QDR_LINK_COLUMN_COUNT  22
 
 const char *qdr_link_columns[QDR_LINK_COLUMN_COUNT + 1];
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1134dbae/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 00b0928..77efe7e 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -402,6 +402,16 @@ const char *qdr_link_name(const qdr_link_t *link)
 }
 
 
+static void qdr_link_setup_histogram(qdr_connection_t *conn, qd_direction_t dir, qdr_link_t *link)
+{
+    if (dir == QD_OUTGOING && conn->role == QDR_ROLE_NORMAL) {
+        link->ingress_histogram = NEW_ARRAY(uint64_t, qd_bitmask_width());
+        for (int i = 0; i < qd_bitmask_width(); i++)
+            link->ingress_histogram[i] = 0;
+    }
+}
+
+
 qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
                                   qd_direction_t    dir,
                                   qdr_terminus_t   *source,
@@ -441,6 +451,8 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
     else if (qdr_terminus_has_capability(local_terminus, QD_CAPABILITY_ROUTER_DATA))
         link->link_type = QD_LINK_ROUTER;
 
+    qdr_link_setup_histogram(conn, dir, link);
+
     action->args.connection.conn   = conn;
     action->args.connection.link   = link;
     action->args.connection.dir    = dir;
@@ -827,6 +839,7 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
     //
     free(link->name);
     free(link->terminus_addr);
+    free(link->ingress_histogram);
     link->name = 0;
 }
 
@@ -860,6 +873,8 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t       *core,
     link->strip_annotations_in  = conn->strip_annotations_in;
     link->strip_annotations_out = conn->strip_annotations_out;
 
+    qdr_link_setup_histogram(conn, dir, link);
+
     DEQ_INSERT_TAIL(core->open_links, link);
     qdr_add_link_ref(&conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION);
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1134dbae/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 3a0e3eb..ee66e0d 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -113,6 +113,8 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in
     out_dlv->tag_length = 8;
     out_dlv->error      = 0;
 
+    out_dlv->ingress_index = in_dlv ? in_dlv->ingress_index : -1;
+
     //
     // Add one to the message fanout. This will later be used in the qd_message_send function that sends out messages.
     //

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1134dbae/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 9b7ba23..389857b 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -346,6 +346,7 @@ struct qdr_delivery_t {
     qd_bitmask_t           *link_exclusion;
     qdr_address_t          *tracking_addr;
     int                     tracking_addr_bit;
+    int                     ingress_index;
     qdr_link_work_t        *link_work;         ///< Delivery work item for this delivery
     qdr_subscription_list_t subscriptions;
     qdr_delivery_ref_list_t peers;             /// Use this list if there if the delivery has more than one peer.
@@ -401,13 +402,14 @@ struct qdr_link_t {
     bool                     drain_mode;
     bool                     stalled_outbound;  ///< Indicates that this link is stalled on outbound buffer backpressure
 
-    uint64_t total_deliveries;
-    uint64_t presettled_deliveries;
-    uint64_t dropped_presettled_deliveries;
-    uint64_t accepted_deliveries;
-    uint64_t rejected_deliveries;
-    uint64_t released_deliveries;
-    uint64_t modified_deliveries;
+    uint64_t  total_deliveries;
+    uint64_t  presettled_deliveries;
+    uint64_t  dropped_presettled_deliveries;
+    uint64_t  accepted_deliveries;
+    uint64_t  rejected_deliveries;
+    uint64_t  released_deliveries;
+    uint64_t  modified_deliveries;
+    uint64_t *ingress_histogram;
 };
 
 ALLOC_DECLARE(qdr_link_t);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1134dbae/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 182082e..13d7462 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -42,7 +42,7 @@ void qdr_delivery_copy_extension_state(qdr_delivery_t *src, qdr_delivery_t *dest
 //==================================================================================
 
 qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_iterator_t *ingress,
-                                 bool settled, qd_bitmask_t *link_exclusion)
+                                 bool settled, qd_bitmask_t *link_exclusion, int ingress_index)
 {
     qdr_action_t   *action = qdr_action(qdr_link_deliver_CT, "link_deliver");
     qdr_delivery_t *dlv    = new_qdr_delivery_t();
@@ -55,6 +55,7 @@ qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_iterato
     dlv->settled        = settled;
     dlv->presettled     = settled;
     dlv->link_exclusion = link_exclusion;
+    dlv->ingress_index  = ingress_index;
     dlv->error          = 0;
 
     qdr_delivery_incref(dlv, "qdr_link_deliver - newly created delivery, add to action list");
@@ -68,7 +69,7 @@ qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_iterato
 
 qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
                                     qd_iterator_t *ingress, qd_iterator_t *addr,
-                                    bool settled, qd_bitmask_t *link_exclusion)
+                                    bool settled, qd_bitmask_t *link_exclusion, int ingress_index)
 {
     qdr_action_t   *action = qdr_action(qdr_link_deliver_CT, "link_deliver");
     qdr_delivery_t *dlv    = new_qdr_delivery_t();
@@ -81,6 +82,7 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
     dlv->settled        = settled;
     dlv->presettled     = settled;
     dlv->link_exclusion = link_exclusion;
+    dlv->ingress_index  = ingress_index;
     dlv->error          = 0;
 
     qdr_delivery_incref(dlv, "qdr_link_deliver_to - newly created delivery, add to action list");

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1134dbae/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index b8c4a6b..719af5a 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -172,11 +172,11 @@ static int AMQP_writable_conn_handler(void *type_context, qd_connection_t *conn,
 static qd_iterator_t *router_annotate_message(qd_router_t   *router,
                                               qd_message_t  *msg,
                                               qd_bitmask_t **link_exclusions,
-                                              uint32_t      *distance)
+                                              uint32_t      *distance,
+                                              int           *ingress_index)
 {
     qd_iterator_t *ingress_iter = 0;
 
-
     qd_parsed_field_t *trace   = qd_message_get_trace(msg);
     qd_parsed_field_t *ingress = qd_message_get_ingress(msg);
     qd_parsed_field_t *to      = qd_message_get_to_override(msg);
@@ -205,7 +205,7 @@ static qd_iterator_t *router_annotate_message(qd_router_t   *router,
             // contain a one-bit for each link that leads to a neighbor router that
             // the message has already passed through.
             //
-            *link_exclusions = qd_tracemask_create(router->tracemask, trace);
+            *link_exclusions = qd_tracemask_create(router->tracemask, trace, ingress_index);
 
             //
             // Append this router's ID to the trace.
@@ -454,8 +454,9 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
     qd_message_message_annotations(msg);
     qd_bitmask_t *link_exclusions;
     uint32_t      distance;
+    int           ingress_index = 0; // Default to _this_ router
 
-    qd_iterator_t *ingress_iter = router_annotate_message(router, msg, &link_exclusions, &distance);
+    qd_iterator_t *ingress_iter = router_annotate_message(router, msg, &link_exclusions, &distance, &ingress_index);
 
     //
     // If this delivery has traveled further than the known radius of the network topology (plus 1),
@@ -509,7 +510,7 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
             if (phase > 0)
                 qd_iterator_annotate_phase(addr_iter, '0' + (char) phase);
             delivery = qdr_link_deliver_to(rlink, msg, ingress_iter, addr_iter, pn_delivery_settled(pnd),
-                                           link_exclusions);
+                                           link_exclusions, ingress_index);
         }
     } else {
         //
@@ -533,7 +534,7 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
             if (phase != 0)
                 qd_message_set_phase_annotation(msg, phase);
         }
-        delivery = qdr_link_deliver(rlink, msg, ingress_iter, pn_delivery_settled(pnd), link_exclusions);
+        delivery = qdr_link_deliver(rlink, msg, ingress_iter, pn_delivery_settled(pnd), link_exclusions, ingress_index);
     }
 
     if (delivery) {

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1134dbae/src/trace_mask.c
----------------------------------------------------------------------
diff --git a/src/trace_mask.c b/src/trace_mask.c
index 76d0c1a..3adb8d5 100644
--- a/src/trace_mask.c
+++ b/src/trace_mask.c
@@ -124,10 +124,11 @@ void qd_tracemask_remove_link(qd_tracemask_t *tm, int router_maskbit)
 }
 
 
-qd_bitmask_t *qd_tracemask_create(qd_tracemask_t *tm, qd_parsed_field_t *tracelist)
+qd_bitmask_t *qd_tracemask_create(qd_tracemask_t *tm, qd_parsed_field_t *tracelist, int *ingress_index)
 {
-    qd_bitmask_t *bm  = qd_bitmask(0);
-    int           idx = 0;
+    qd_bitmask_t *bm    = qd_bitmask(0);
+    int           idx   = 0;
+    bool          first = true;
 
     assert(qd_parse_is_list(tracelist));
 
@@ -138,10 +139,15 @@ qd_bitmask_t *qd_tracemask_create(qd_tracemask_t *tm, qd_parsed_field_t *traceli
         qd_iterator_t *iter = qd_parse_raw(item);
         qd_iterator_reset_view(iter, ITER_VIEW_NODE_HASH);
         qd_hash_retrieve(tm->hash, iter, (void*) &router);
-        if (router && router->link_maskbit >= 0)
-            qd_bitmask_set_bit(bm, router->link_maskbit);
+        if (router) {
+            if (router->link_maskbit >= 0)
+                qd_bitmask_set_bit(bm, router->link_maskbit);
+            if (first)
+                *ingress_index = router->maskbit;
+        }
         idx++;
         item = qd_parse_sub_value(tracelist, idx);
+        first = false;
     }
     sys_rwlock_unlock(tm->lock);
     return bm;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1134dbae/tests/parse_test.c
----------------------------------------------------------------------
diff --git a/tests/parse_test.c b/tests/parse_test.c
index bc12efd..85ce936 100644
--- a/tests/parse_test.c
+++ b/tests/parse_test.c
@@ -302,11 +302,17 @@ static char *test_tracemask(void *context)
     qd_parsed_field_t *pf   = qd_parse(iter);
     qd_iterator_free(iter);
 
-    bm = qd_tracemask_create(tm, pf);
+    int ingress = -1;
+
+    bm = qd_tracemask_create(tm, pf, &ingress);
     if (qd_bitmask_cardinality(bm) != 3) {
         sprintf(error, "Expected cardinality of 3, got %d", qd_bitmask_cardinality(bm));
         return error;
     }
+    if (ingress != 0) {
+        sprintf(error, "(A) Expected ingress index of 0, got %d", ingress);
+        return error;
+    }
     int total = 0;
     int bit, c;
     for (QD_BITMASK_EACH(bm, bit, c)) {
@@ -321,12 +327,17 @@ static char *test_tracemask(void *context)
     qd_tracemask_del_router(tm, 3);
     qd_tracemask_remove_link(tm, 0);
 
-    bm = qd_tracemask_create(tm, pf);
+    ingress = -1;
+    bm = qd_tracemask_create(tm, pf, &ingress);
     qd_parse_free(pf);
     if (qd_bitmask_cardinality(bm) != 1) {
         sprintf(error, "Expected cardinality of 1, got %d", qd_bitmask_cardinality(bm));
         return error;
     }
+    if (ingress != 0) {
+        sprintf(error, "(B) Expected ingress index of 0, got %d", ingress);
+        return error;
+    }
 
     total = 0;
     for (QD_BITMASK_EACH(bm, bit, c)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org