You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2017/06/01 20:15:55 UTC
[2/7] qpid-dispatch git commit: DISPATCH-781 - Added management access to flow-control quantities
DISPATCH-781 - Added management access to flow-control quantities
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/ef2f8955
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/ef2f8955
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/ef2f8955
Branch: refs/heads/tross-dispatch-781-1
Commit: ef2f89550630f275acc030c331c9325128c296f9
Parents: d00cc61
Author: Ted Ross <tr...@redhat.com>
Authored: Tue May 30 15:09:22 2017 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Thu Jun 1 14:40:44 2017 -0400
----------------------------------------------------------------------
python/qpid_dispatch/management/qdrouter.json | 20 +++++++++++++
python/qpid_dispatch_internal/router/data.py | 4 ++-
src/router_core/agent_address.c | 30 +++++++++++++++++++
src/router_core/agent_address.h | 2 +-
src/router_core/connections.c | 12 ++++----
src/router_core/router_core_private.h | 11 +++++--
tools/qdstat | 34 ++++++++++++++++++++++
7 files changed, 102 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ef2f8955/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index e80359f..19bca97 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1234,6 +1234,26 @@
"trackedDeliveries": {
"type": "integer",
"description": "Number of transit deliveries being tracked for this address (for balanced distribution)."
+ },
+ "localInLinks": {
+ "type": "integer",
+ "description": "Number of incoming links for this address attached to the local router."
+ },
+ "localOutCapacity": {
+ "type": "integer",
+ "description": "Total link capacity for outgoing links for this address attached to the local router."
+ },
+ "remoteInLinks": {
+ "type": "integer",
+ "description": "Number of incoming links for this address attached to remote routers."
+ },
+ "remoteOutCapacity": {
+ "type": "integer",
+ "description": "Total link capacity for outgoing links for this address attached to remote routers."
+ },
+ "targetInCredit": {
+ "type": "integer",
+ "description": "Calculated target credit for local incoming links for this address."
}
}
},
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ef2f8955/python/qpid_dispatch_internal/router/data.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/data.py b/python/qpid_dispatch_internal/router/data.py
index a2b669b..7e66a9e 100644
--- a/python/qpid_dispatch_internal/router/data.py
+++ b/python/qpid_dispatch_internal/router/data.py
@@ -21,7 +21,9 @@
## Define the current protocol version. Any messages that do not contain version
## information shall be considered to be coming from routers using version 0.
##
-ProtocolVersion = 1L
+## Version 2 - Carry out-capacity and in-link-count per address in MAU
+##
+ProtocolVersion = 2L
def getMandatory(data, key, cls=None):
"""
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ef2f8955/src/router_core/agent_address.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_address.c b/src/router_core/agent_address.c
index 43909b4..b19c58d 100644
--- a/src/router_core/agent_address.c
+++ b/src/router_core/agent_address.c
@@ -37,6 +37,11 @@
#define QDR_ADDRESS_DELIVERIES_FROM_CONTAINER 14
#define QDR_ADDRESS_TRANSIT_OUTSTANDING 15
#define QDR_ADDRESS_TRACKED_DELIVERIES 16
+#define QDR_ADDRESS_LOCAL_IN_LINKS 17
+#define QDR_ADDRESS_LOCAL_OUT_CAPACITY 18
+#define QDR_ADDRESS_REMOTE_IN_LINKS 19
+#define QDR_ADDRESS_REMOTE_OUT_CAPACITY 20
+#define QDR_ADDRESS_TARGET_IN_CREDIT 21
const char *qdr_address_columns[] =
{"name",
@@ -56,6 +61,11 @@ const char *qdr_address_columns[] =
"deliveriesFromContainer",
"transitOutstanding",
"trackedDeliveries",
+ "localInLinks",
+ "localOutCapacity",
+ "remoteInLinks",
+ "remoteOutCapacity",
+ "targetInCredit",
0};
@@ -154,6 +164,26 @@ static void qdr_insert_address_columns_CT(qdr_core_t *core,
qd_compose_insert_long(body, addr->tracked_deliveries);
break;
+ case QDR_ADDRESS_LOCAL_IN_LINKS:
+ qd_compose_insert_long(body, DEQ_SIZE(addr->inlinks));
+ break;
+
+ case QDR_ADDRESS_LOCAL_OUT_CAPACITY:
+ qd_compose_insert_long(body, addr->local_out_capacity);
+ break;
+
+ case QDR_ADDRESS_REMOTE_IN_LINKS:
+ qd_compose_insert_long(body, addr->remote_inlinks);
+ break;
+
+ case QDR_ADDRESS_REMOTE_OUT_CAPACITY:
+ qd_compose_insert_long(body, addr->remote_out_capacity);
+ break;
+
+ case QDR_ADDRESS_TARGET_IN_CREDIT:
+ qd_compose_insert_long(body, addr->target_in_credit);
+ break;
+
default:
qd_compose_insert_null(body);
break;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ef2f8955/src/router_core/agent_address.h
----------------------------------------------------------------------
diff --git a/src/router_core/agent_address.h b/src/router_core/agent_address.h
index d9bde2c..91a96cf 100644
--- a/src/router_core/agent_address.h
+++ b/src/router_core/agent_address.h
@@ -31,7 +31,7 @@ void qdra_address_get_CT(qdr_core_t *core,
const char *qdr_address_columns[]);
-#define QDR_ADDRESS_COLUMN_COUNT 17
+#define QDR_ADDRESS_COLUMN_COUNT 22
const char *qdr_address_columns[QDR_ADDRESS_COLUMN_COUNT + 1];
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ef2f8955/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 952682e..0c8166f 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -948,7 +948,7 @@ void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local)
if (DEQ_SIZE(addr->rlinks) == 0)
qdr_post_mobile_removed_CT(core, key);
else
- qdr_post_mobile_update_CT(core, key, DEQ_SIZE(addr->inlinks), addr->out_capacity);
+ qdr_post_mobile_update_CT(core, key, DEQ_SIZE(addr->inlinks), addr->local_out_capacity);
}
}
@@ -1265,9 +1265,9 @@ static void qdr_outgoing_link_added_CT(qdr_core_t *core, qdr_address_t *addr, qd
const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle);
if (key && *key == 'M') {
if (DEQ_SIZE(addr->rlinks) == 1)
- qdr_post_mobile_added_CT(core, key, DEQ_SIZE(addr->inlinks), addr->out_capacity);
+ qdr_post_mobile_added_CT(core, key, DEQ_SIZE(addr->inlinks), addr->local_out_capacity);
else
- qdr_post_mobile_update_CT(core, key, DEQ_SIZE(addr->inlinks), addr->out_capacity);
+ qdr_post_mobile_update_CT(core, key, DEQ_SIZE(addr->inlinks), addr->local_out_capacity);
}
//
@@ -1424,7 +1424,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
//
// Add the link's capacity to the address's aggregate out_capacity
//
- addr->out_capacity += link->capacity;
+ addr->local_out_capacity += link->capacity;
//
// Do all the action that is needed when an outgoing link is established
@@ -1522,7 +1522,7 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac
//
// Add the link's capacity to the address's aggregate out_capacity
//
- link->owning_addr->out_capacity += link->capacity;
+ link->owning_addr->local_out_capacity += link->capacity;
//
// Do all the action that is needed when an outgoing link is established
@@ -1624,7 +1624,7 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b
case QD_LINK_ENDPOINT:
if (addr) {
qdr_del_link_ref(&addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
- addr->out_capacity -= link->capacity;
+ addr->local_out_capacity -= link->capacity;
was_local = true;
}
break;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ef2f8955/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 6a24322..893dd6e 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -446,9 +446,14 @@ struct qdr_address_t {
bool local;
uint32_t tracked_deliveries;
uint64_t cost_epoch;
- uint32_t out_capacity;
- uint32_t remote_inlinks;
- uint32_t remote_out_capacity;
+
+ //
+ // State for outgoing-capacity-based-flow-control
+ //
+ uint32_t local_out_capacity; ///< Total link capacity on local outgoing links
+ uint32_t remote_inlinks; ///< Number of remote incoming links
+ uint32_t remote_out_capacity; ///< Total link capacity on remote outgoing links
+ uint32_t target_in_credit; ///< Computed target credit for local incoming links
//
// State for "closest" treatment
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ef2f8955/tools/qdstat
----------------------------------------------------------------------
diff --git a/tools/qdstat b/tools/qdstat
index abd7e39..bf1d5c7 100755
--- a/tools/qdstat
+++ b/tools/qdstat
@@ -47,6 +47,7 @@ def parse_args(argv):
parser.add_option("-l", "--links", help="Show Router Links", action="store_const", const="l", dest="show")
parser.add_option("-n", "--nodes", help="Show Router Nodes", action="store_const", const="n", dest="show")
parser.add_option("-a", "--address", help="Show Router Addresses", action="store_const", const="a", dest="show")
+ parser.add_option("-f", "--flowcontrol", help="Show Flow Control Data", action="store_const", const="f", dest="show")
parser.add_option("-m", "--memory", help="Show Router Memory Stats", action="store_const", const="m", dest="show")
parser.add_option("--autolinks", help="Show Auto Links", action="store_const", const="autolinks", dest="show")
parser.add_option("--linkroutes", help="Show Link Routes", action="store_const", const="linkroutes", dest="show")
@@ -399,6 +400,38 @@ class BusManager(Node):
dispRows = sorter.getSorted()
disp.formattedTable(title, heads, dispRows)
+ def displayFlowControl(self):
+ disp = Display(prefix=" ")
+ heads = []
+ heads.append(Header("class"))
+ heads.append(Header("addr"))
+ heads.append(Header("phs"))
+ heads.append(Header("local-in-links", Header.COMMAS))
+ heads.append(Header("local-out-cap", Header.COMMAS))
+ heads.append(Header("remote-in-links", Header.COMMAS))
+ heads.append(Header("remote-out-cap", Header.COMMAS))
+ heads.append(Header("target-credit", Header.COMMAS))
+ rows = []
+ cols = ('localInLinks', 'localOutCapacity', 'remoteInLinks', 'remoteOutCapacity', 'targetInCredit', 'name')
+
+ objects = self.query('org.apache.qpid.dispatch.router.address', cols, limit=self.opts.limit)
+
+ for addr in objects:
+ row = []
+ row.append(self._addr_class(addr.name))
+ row.append(self._addr_text(addr.name))
+ row.append(self._addr_phase(addr.name))
+ row.append(addr.localInLinks)
+ row.append(addr.localOutCapacity)
+ row.append(addr.remoteInLinks)
+ row.append(addr.remoteOutCapacity)
+ row.append(addr.targetInCredit)
+ rows.append(row)
+ title = "Router Addresses"
+ sorter = Sorter(heads, rows, 'addr', 0, True)
+ dispRows = sorter.getSorted()
+ disp.formattedTable(title, heads, dispRows)
+
def displayAutolinks(self):
disp = Display(prefix=" ")
heads = []
@@ -500,6 +533,7 @@ class BusManager(Node):
if main == 'l': self.displayRouterLinks()
elif main == 'n': self.displayRouterNodes()
elif main == 'a': self.displayAddresses()
+ elif main == 'f': self.displayFlowControl()
elif main == 'm': self.displayMemory()
elif main == 'g': self.displayGeneral()
elif main == 'c': self.displayConnections()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org