You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2017/06/01 20:15:55 UTC

[2/7] qpid-dispatch git commit: DISPATCH-781 - Added management access to flow-control quantities

DISPATCH-781 - Added management access to flow-control quantities


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/ef2f8955
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/ef2f8955
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/ef2f8955

Branch: refs/heads/tross-dispatch-781-1
Commit: ef2f89550630f275acc030c331c9325128c296f9
Parents: d00cc61
Author: Ted Ross <tr...@redhat.com>
Authored: Tue May 30 15:09:22 2017 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Thu Jun 1 14:40:44 2017 -0400

----------------------------------------------------------------------
 python/qpid_dispatch/management/qdrouter.json | 20 +++++++++++++
 python/qpid_dispatch_internal/router/data.py  |  4 ++-
 src/router_core/agent_address.c               | 30 +++++++++++++++++++
 src/router_core/agent_address.h               |  2 +-
 src/router_core/connections.c                 | 12 ++++----
 src/router_core/router_core_private.h         | 11 +++++--
 tools/qdstat                                  | 34 ++++++++++++++++++++++
 7 files changed, 102 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ef2f8955/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index e80359f..19bca97 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1234,6 +1234,26 @@
                 "trackedDeliveries": {
                     "type": "integer",
                     "description": "Number of transit deliveries being tracked for this address (for balanced distribution)."
+                },
+                "localInLinks": {
+                    "type": "integer",
+                    "description": "Number of incoming links for this address attached to the local router."
+                },
+                "localOutCapacity": {
+                    "type": "integer",
+                    "description": "Total link capacity for outgoing links for this address attached to the local router."
+                },
+                "remoteInLinks": {
+                    "type": "integer",
+                    "description": "Number of incoming links for this address attached to remote routers."
+                },
+                "remoteOutCapacity": {
+                    "type": "integer",
+                    "description": "Total link capacity for outgoing links for this address attached to remote routers."
+                },
+                "targetInCredit": {
+                    "type": "integer",
+                    "description": "Calculated target credit for local incoming links for this address."
                 }
             }
         },

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ef2f8955/python/qpid_dispatch_internal/router/data.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/data.py b/python/qpid_dispatch_internal/router/data.py
index a2b669b..7e66a9e 100644
--- a/python/qpid_dispatch_internal/router/data.py
+++ b/python/qpid_dispatch_internal/router/data.py
@@ -21,7 +21,9 @@
 ## Define the current protocol version.  Any messages that do not contain version
 ## information shall be considered to be coming from routers using version 0.
 ##
-ProtocolVersion = 1L
+## Version 2 - Carry out-capacity and in-link-count per address in MAU
+##
+ProtocolVersion = 2L
 
 def getMandatory(data, key, cls=None):
     """

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ef2f8955/src/router_core/agent_address.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_address.c b/src/router_core/agent_address.c
index 43909b4..b19c58d 100644
--- a/src/router_core/agent_address.c
+++ b/src/router_core/agent_address.c
@@ -37,6 +37,11 @@
 #define QDR_ADDRESS_DELIVERIES_FROM_CONTAINER 14
 #define QDR_ADDRESS_TRANSIT_OUTSTANDING       15
 #define QDR_ADDRESS_TRACKED_DELIVERIES        16
+#define QDR_ADDRESS_LOCAL_IN_LINKS            17
+#define QDR_ADDRESS_LOCAL_OUT_CAPACITY        18
+#define QDR_ADDRESS_REMOTE_IN_LINKS           19
+#define QDR_ADDRESS_REMOTE_OUT_CAPACITY       20
+#define QDR_ADDRESS_TARGET_IN_CREDIT          21
 
 const char *qdr_address_columns[] =
     {"name",
@@ -56,6 +61,11 @@ const char *qdr_address_columns[] =
      "deliveriesFromContainer",
      "transitOutstanding",
      "trackedDeliveries",
+     "localInLinks",
+     "localOutCapacity",
+     "remoteInLinks",
+     "remoteOutCapacity",
+     "targetInCredit",
      0};
 
 
@@ -154,6 +164,26 @@ static void qdr_insert_address_columns_CT(qdr_core_t          *core,
         qd_compose_insert_long(body, addr->tracked_deliveries);
         break;
 
+    case QDR_ADDRESS_LOCAL_IN_LINKS:
+        qd_compose_insert_long(body, DEQ_SIZE(addr->inlinks));
+        break;
+
+    case QDR_ADDRESS_LOCAL_OUT_CAPACITY:
+        qd_compose_insert_long(body, addr->local_out_capacity);
+        break;
+
+    case QDR_ADDRESS_REMOTE_IN_LINKS:
+        qd_compose_insert_long(body, addr->remote_inlinks);
+        break;
+
+    case QDR_ADDRESS_REMOTE_OUT_CAPACITY:
+        qd_compose_insert_long(body, addr->remote_out_capacity);
+        break;
+
+    case QDR_ADDRESS_TARGET_IN_CREDIT:
+        qd_compose_insert_long(body, addr->target_in_credit);
+        break;
+
     default:
         qd_compose_insert_null(body);
         break;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ef2f8955/src/router_core/agent_address.h
----------------------------------------------------------------------
diff --git a/src/router_core/agent_address.h b/src/router_core/agent_address.h
index d9bde2c..91a96cf 100644
--- a/src/router_core/agent_address.h
+++ b/src/router_core/agent_address.h
@@ -31,7 +31,7 @@ void qdra_address_get_CT(qdr_core_t *core,
                       const char *qdr_address_columns[]);
 
 
-#define QDR_ADDRESS_COLUMN_COUNT 17
+#define QDR_ADDRESS_COLUMN_COUNT 22
 
 const char *qdr_address_columns[QDR_ADDRESS_COLUMN_COUNT + 1];
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ef2f8955/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 952682e..0c8166f 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -948,7 +948,7 @@ void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local)
             if (DEQ_SIZE(addr->rlinks) == 0)
                 qdr_post_mobile_removed_CT(core, key);
             else
-                qdr_post_mobile_update_CT(core, key, DEQ_SIZE(addr->inlinks), addr->out_capacity);
+                qdr_post_mobile_update_CT(core, key, DEQ_SIZE(addr->inlinks), addr->local_out_capacity);
         }
     }
 
@@ -1265,9 +1265,9 @@ static void qdr_outgoing_link_added_CT(qdr_core_t *core, qdr_address_t *addr, qd
     const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle);
     if (key && *key == 'M') {
         if (DEQ_SIZE(addr->rlinks) == 1)
-            qdr_post_mobile_added_CT(core, key, DEQ_SIZE(addr->inlinks), addr->out_capacity);
+            qdr_post_mobile_added_CT(core, key, DEQ_SIZE(addr->inlinks), addr->local_out_capacity);
         else
-            qdr_post_mobile_update_CT(core, key, DEQ_SIZE(addr->inlinks), addr->out_capacity);
+            qdr_post_mobile_update_CT(core, key, DEQ_SIZE(addr->inlinks), addr->local_out_capacity);
     }
 
     //
@@ -1424,7 +1424,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
                 //
                 // Add the link's capacity to the address's aggregate out_capacity
                 //
-                addr->out_capacity += link->capacity;
+                addr->local_out_capacity += link->capacity;
 
                 //
                 // Do all the action that is needed when an outgoing link is established
@@ -1522,7 +1522,7 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac
                     //
                     // Add the link's capacity to the address's aggregate out_capacity
                     //
-                    link->owning_addr->out_capacity += link->capacity;
+                    link->owning_addr->local_out_capacity += link->capacity;
 
                     //
                     // Do all the action that is needed when an outgoing link is established
@@ -1624,7 +1624,7 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b
         case QD_LINK_ENDPOINT:
             if (addr) {
                 qdr_del_link_ref(&addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
-                addr->out_capacity -= link->capacity;
+                addr->local_out_capacity -= link->capacity;
                 was_local = true;
             }
             break;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ef2f8955/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 6a24322..893dd6e 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -446,9 +446,14 @@ struct qdr_address_t {
     bool                       local;
     uint32_t                   tracked_deliveries;
     uint64_t                   cost_epoch;
-    uint32_t                   out_capacity;
-    uint32_t                   remote_inlinks;
-    uint32_t                   remote_out_capacity;
+
+    //
+    // State for outgoing-capacity-based-flow-control
+    //
+    uint32_t local_out_capacity;  ///< Total link capacity on local outgoing links
+    uint32_t remote_inlinks;      ///< Number of remote incoming links
+    uint32_t remote_out_capacity; ///< Total link capacity on remote outgoing links
+    uint32_t target_in_credit;    ///< Computed target credit for local incoming links
 
     //
     // State for "closest" treatment

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ef2f8955/tools/qdstat
----------------------------------------------------------------------
diff --git a/tools/qdstat b/tools/qdstat
index abd7e39..bf1d5c7 100755
--- a/tools/qdstat
+++ b/tools/qdstat
@@ -47,6 +47,7 @@ def parse_args(argv):
     parser.add_option("-l", "--links", help="Show Router Links",            action="store_const", const="l",   dest="show")
     parser.add_option("-n", "--nodes", help="Show Router Nodes",            action="store_const", const="n",   dest="show")
     parser.add_option("-a", "--address", help="Show Router Addresses",      action="store_const", const="a",   dest="show")
+    parser.add_option("-f", "--flowcontrol", help="Show Flow Control Data", action="store_const", const="f",   dest="show")
     parser.add_option("-m", "--memory", help="Show Router Memory Stats",    action="store_const", const="m",   dest="show")
     parser.add_option("--autolinks", help="Show Auto Links",                action="store_const", const="autolinks",  dest="show")
     parser.add_option("--linkroutes", help="Show Link Routes",              action="store_const", const="linkroutes", dest="show")
@@ -399,6 +400,38 @@ class BusManager(Node):
         dispRows = sorter.getSorted()
         disp.formattedTable(title, heads, dispRows)
 
+    def displayFlowControl(self):
+        disp = Display(prefix="  ")
+        heads = []
+        heads.append(Header("class"))
+        heads.append(Header("addr"))
+        heads.append(Header("phs"))
+        heads.append(Header("local-in-links", Header.COMMAS))
+        heads.append(Header("local-out-cap", Header.COMMAS))
+        heads.append(Header("remote-in-links", Header.COMMAS))
+        heads.append(Header("remote-out-cap", Header.COMMAS))
+        heads.append(Header("target-credit", Header.COMMAS))
+        rows = []
+        cols = ('localInLinks', 'localOutCapacity', 'remoteInLinks', 'remoteOutCapacity', 'targetInCredit', 'name')
+
+        objects = self.query('org.apache.qpid.dispatch.router.address', cols, limit=self.opts.limit)
+
+        for addr in objects:
+            row = []
+            row.append(self._addr_class(addr.name))
+            row.append(self._addr_text(addr.name))
+            row.append(self._addr_phase(addr.name))
+            row.append(addr.localInLinks)
+            row.append(addr.localOutCapacity)
+            row.append(addr.remoteInLinks)
+            row.append(addr.remoteOutCapacity)
+            row.append(addr.targetInCredit)
+            rows.append(row)
+        title = "Router Addresses"
+        sorter = Sorter(heads, rows, 'addr', 0, True)
+        dispRows = sorter.getSorted()
+        disp.formattedTable(title, heads, dispRows)
+
     def displayAutolinks(self):
         disp = Display(prefix="  ")
         heads = []
@@ -500,6 +533,7 @@ class BusManager(Node):
         if   main == 'l': self.displayRouterLinks()
         elif main == 'n': self.displayRouterNodes()
         elif main == 'a': self.displayAddresses()
+        elif main == 'f': self.displayFlowControl()
         elif main == 'm': self.displayMemory()
         elif main == 'g': self.displayGeneral()
         elif main == 'c': self.displayConnections()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org