You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/11/27 17:41:31 UTC
[1/2] qpid-dispatch git commit: DISPATCH-1177 - Added new option -e
to qdstat. This will show only the edge connections and show which of those
connections is active - Added code to make qdstat -e backward compatible -
Added code to set the edge connection as active on the interior router
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 5ede7da83 -> 2ac2c6ee5
DISPATCH-1177 - Added new option -e to qdstat. This will show only the edge connections and show which of those connections is active
- Added code to make qdstat -e backward compatible
- Added code to set the edge connection as active on the interior router
- Mark edge connections from interior as active
This closes #420
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/187cd35a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/187cd35a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/187cd35a
Branch: refs/heads/master
Commit: 187cd35a364cf1cee8818ded9b21403e75c1b481
Parents: 5ede7da
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Tue Nov 13 12:12:38 2018 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue Nov 27 12:34:14 2018 -0500
----------------------------------------------------------------------
src/router_core/agent_connection.c | 31 +++-
src/router_core/agent_connection.h | 2 +-
.../modules/edge_router/connection_manager.c | 2 +
src/router_core/router_core_private.h | 1 +
tests/system_tests_edge_router.py | 181 +++++++++++++++++++
tools/qdstat.in | 60 ++++++
6 files changed, 270 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/187cd35a/src/router_core/agent_connection.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_connection.c b/src/router_core/agent_connection.c
index 9ab6d7a..06bc9cf 100644
--- a/src/router_core/agent_connection.c
+++ b/src/router_core/agent_connection.c
@@ -40,6 +40,7 @@
#define QDR_CONNECTION_TYPE 15
#define QDR_CONNECTION_SSL 16
#define QDR_CONNECTION_OPENED 17
+#define QDR_CONNECTION_ACTIVE 18
const char * const QDR_CONNECTION_DIR_IN = "in";
const char * const QDR_CONNECTION_DIR_OUT = "out";
@@ -70,6 +71,7 @@ const char *qdr_connection_columns[] =
"type",
"ssl",
"opened",
+ "active",
0};
const char *CONNECTION_TYPE = "org.apache.qpid.dispatch.connection";
@@ -97,7 +99,7 @@ static void qd_get_next_pn_data(pn_data_t **data, const char **d, int *d1)
}
-static void qdr_connection_insert_column_CT(qdr_connection_t *conn, int col, qd_composed_field_t *body, bool as_map)
+static void qdr_connection_insert_column_CT(qdr_core_t *core, qdr_connection_t *conn, int col, qd_composed_field_t *body, bool as_map)
{
char id_str[100];
@@ -196,6 +198,23 @@ static void qdr_connection_insert_column_CT(qdr_connection_t *conn, int col, qd_
qd_compose_insert_bool(body, conn->connection_info->opened);
break;
+ case QDR_CONNECTION_ACTIVE:
+ if (conn->role == QDR_ROLE_EDGE_CONNECTION) {
+ if (core->router_mode == QD_ROUTER_MODE_INTERIOR) {
+ qd_compose_insert_bool(body, true);
+ }
+ else if (core->router_mode == QD_ROUTER_MODE_EDGE){
+ if (core->active_edge_connection == conn)
+ qd_compose_insert_bool(body, true);
+ else
+ qd_compose_insert_bool(body, false);
+ }
+ }
+ else {
+ qd_compose_insert_bool(body, true);
+ }
+ break;
+
case QDR_CONNECTION_PROPERTIES: {
pn_data_t *data = conn->connection_info->connection_properties;
qd_compose_start_map(body);
@@ -238,14 +257,14 @@ static void qdr_connection_insert_column_CT(qdr_connection_t *conn, int col, qd_
}
-static void qdr_agent_write_connection_CT(qdr_query_t *query, qdr_connection_t *conn)
+static void qdr_agent_write_connection_CT(qdr_core_t *core, qdr_query_t *query, qdr_connection_t *conn)
{
qd_composed_field_t *body = query->body;
qd_compose_start_list(body);
int i = 0;
while (query->columns[i] >= 0) {
- qdr_connection_insert_column_CT(conn, query->columns[i], body, false);
+ qdr_connection_insert_column_CT(core, conn, query->columns[i], body, false);
i++;
}
qd_compose_end_list(body);
@@ -287,7 +306,7 @@ void qdra_connection_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offs
//
// Write the columns of the object into the response body.
//
- qdr_agent_write_connection_CT(query, conn);
+ qdr_agent_write_connection_CT(core, query, conn);
//
// Advance to the next connection
@@ -315,7 +334,7 @@ void qdra_connection_get_next_CT(qdr_core_t *core, qdr_query_t *query)
//
// Write the columns of the connection entity into the response body.
//
- qdr_agent_write_connection_CT(query, conn);
+ qdr_agent_write_connection_CT(core, query, conn);
//
// Advance to the next object
@@ -339,7 +358,7 @@ static void qdr_manage_write_connection_map_CT(qdr_core_t *core,
for(int i = 0; i < QDR_CONNECTION_COLUMN_COUNT; i++) {
qd_compose_insert_string(body, qdr_connection_columns[i]);
- qdr_connection_insert_column_CT(conn, i, body, false);
+ qdr_connection_insert_column_CT(core, conn, i, body, false);
}
qd_compose_end_map(body);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/187cd35a/src/router_core/agent_connection.h
----------------------------------------------------------------------
diff --git a/src/router_core/agent_connection.h b/src/router_core/agent_connection.h
index c701fb0..7536969 100644
--- a/src/router_core/agent_connection.h
+++ b/src/router_core/agent_connection.h
@@ -30,7 +30,7 @@ void qdra_connection_get_CT(qdr_core_t *core,
const char *qdr_connection_columns[]);
-#define QDR_CONNECTION_COLUMN_COUNT 18
+#define QDR_CONNECTION_COLUMN_COUNT 19
const char *qdr_connection_columns[QDR_CONNECTION_COLUMN_COUNT + 1];
#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/187cd35a/src/router_core/modules/edge_router/connection_manager.c
----------------------------------------------------------------------
diff --git a/src/router_core/modules/edge_router/connection_manager.c b/src/router_core/modules/edge_router/connection_manager.c
index ca39421..bb4ae1c 100644
--- a/src/router_core/modules/edge_router/connection_manager.c
+++ b/src/router_core/modules/edge_router/connection_manager.c
@@ -54,6 +54,7 @@ static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *c
if (cm->active_edge_connection == 0 && conn->role == QDR_ROLE_EDGE_CONNECTION) {
qd_log(cm->core->log, QD_LOG_INFO, "Edge connection (id=%"PRIu64") to interior established", conn->identity);
cm->active_edge_connection = conn;
+ cm->core->active_edge_connection = conn;
qdrc_event_conn_raise(cm->core, QDRC_EVENT_CONN_EDGE_ESTABLISHED, conn);
}
break;
@@ -69,6 +70,7 @@ static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *c
"Edge connection (id=%"PRIu64") to interior lost, activating alternate id=%"PRIu64"",
conn->identity, alternate->identity);
cm->active_edge_connection = alternate;
+ cm->core->active_edge_connection = alternate;
qdrc_event_conn_raise(cm->core, QDRC_EVENT_CONN_EDGE_ESTABLISHED, alternate);
} else {
qd_log(cm->core->log, QD_LOG_INFO,
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/187cd35a/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 19a1d1e..91d0016 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -723,6 +723,7 @@ struct qdr_core_t {
qd_timer_t *work_timer;
qdr_connection_list_t open_connections;
+ qdr_connection_t *active_edge_connection;
qdr_connection_list_t connections_to_activate;
qdr_link_list_t open_links;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/187cd35a/tests/system_tests_edge_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py
index de6a723..04b9349 100644
--- a/tests/system_tests_edge_router.py
+++ b/tests/system_tests_edge_router.py
@@ -23,6 +23,7 @@ from __future__ import absolute_import
from __future__ import print_function
from time import sleep
+from threading import Timer
import unittest2 as unittest
from proton import Message, Timeout
@@ -34,6 +35,7 @@ from test_broker import FakeService
from proton.handlers import MessagingHandler
from proton.reactor import Container, DynamicNodeProperties
from qpid_dispatch.management.client import Node
+from subprocess import PIPE, STDOUT
class AddrTimer(object):
@@ -44,6 +46,185 @@ class AddrTimer(object):
self.parent.check_address()
+class EdgeRouterTest(TestCase):
+
+ inter_router_port = None
+
+ @classmethod
+ def setUpClass(cls):
+ """Start a router"""
+ super(EdgeRouterTest, cls).setUpClass()
+
+ def router(name, mode, connection, extra=None):
+ config = [
+ ('router', {'mode': mode, 'id': name}),
+ ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}),
+ ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'multiTenant': 'yes'}),
+ ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'role': 'route-container'}),
+ ('linkRoute', {'prefix': '0.0.0.0/link', 'direction': 'in', 'containerId': 'LRC'}),
+ ('linkRoute', {'prefix': '0.0.0.0/link', 'direction': 'out', 'containerId': 'LRC'}),
+ ('autoLink', {'addr': '0.0.0.0/queue.waypoint', 'containerId': 'ALC', 'direction': 'in'}),
+ ('autoLink', {'addr': '0.0.0.0/queue.waypoint', 'containerId': 'ALC', 'direction': 'out'}),
+ ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+ ('address', {'prefix': 'spread', 'distribution': 'balanced'}),
+ ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+ ('address', {'prefix': '0.0.0.0/queue', 'waypoint': 'yes'}),
+ connection
+ ]
+
+ if extra:
+ config.append(extra)
+ config = Qdrouterd.Config(config)
+ cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+
+ cls.routers = []
+
+ inter_router_port = cls.tester.get_port()
+ edge_port_A = cls.tester.get_port()
+ edge_port_B = cls.tester.get_port()
+
+ router('INT.A', 'interior', ('listener', {'role': 'inter-router', 'port': inter_router_port}),
+ ('listener', {'role': 'edge', 'port': edge_port_A}))
+ router('INT.B', 'interior', ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': inter_router_port}),
+ ('listener', {'role': 'edge', 'port': edge_port_B}))
+ router('EA1', 'edge', ('connector', {'name': 'edge', 'role': 'edge',
+ 'port': edge_port_A}
+ ),
+ ('connector', {'name': 'edge.1', 'role': 'edge',
+ 'port': edge_port_B}
+ )
+ )
+
+ cls.routers[0].wait_router_connected('INT.B')
+ cls.routers[1].wait_router_connected('INT.A')
+
+ def __init__(self, test_method):
+ TestCase.__init__(self, test_method)
+ self.success = False
+ self.timer_delay = 2
+ self.max_attempts = 3
+ self.attempts = 0
+
+ def run_qdstat(self, args, regexp=None, address=None):
+ p = self.popen(
+ ['qdstat', '--bus', str(address or self.router.addresses[0]),
+ '--timeout', str(TIMEOUT)] + args,
+ name='qdstat-' + self.id(), stdout=PIPE, expect=None,
+ universal_newlines=True)
+
+ out = p.communicate()[0]
+ assert p.returncode == 0, \
+ "qdstat exit status %s, output:\n%s" % (p.returncode, out)
+ if regexp: assert re.search(regexp, out,
+ re.I), "Can't find '%s' in '%s'" % (
+ regexp, out)
+ return out
+
+ def can_terminate(self):
+ if self.attempts == self.max_attempts:
+ return True
+
+ if self.success:
+ return True
+
+ return False
+
+ def run_int_b_edge_qdstat(self):
+ outs = self.run_qdstat(['--edge'],
+ address=self.routers[2].addresses[0])
+ lines = outs.split("\n")
+ for line in lines:
+ if "INT.B" in line and "yes" in line:
+ self.success = True
+
+ def run_int_a_edge_qdstat(self):
+ outs = self.run_qdstat(['--edge'],
+ address=self.routers[2].addresses[0])
+ lines = outs.split("\n")
+ for line in lines:
+ if "INT.A" in line and "yes" in line:
+ self.success = True
+
+ def schedule_int_a_qdstat_test(self):
+ if self.attempts < self.max_attempts:
+ if not self.success:
+ Timer(self.timer_delay, self.run_int_a_edge_qdstat).start()
+ self.attempts += 1
+
+ def schedule_int_b_qdstat_test(self):
+ if self.attempts < self.max_attempts:
+ if not self.success:
+ Timer(self.timer_delay, self.run_int_b_edge_qdstat).start()
+ self.attempts += 1
+
+ def test_01_active_flag(self):
+ """
+ In this test, we have one edge router connected to two interior
+ routers. One connection is to INT.A and another connection is to
+ INT.B . But only one of these connections is active. We use qdstat
+ to make sure that only one of these connections is active.
+ Then we kill the router with the active connection and make sure
+ that the other connection is now the active one
+ """
+ success = False
+ outs = self.run_qdstat(['--edge'],
+ address=self.routers[0].addresses[0])
+ lines = outs.split("\n")
+ for line in lines:
+ if "EA1" in line and "yes" in line:
+ success = True
+ if not success:
+ self.fail("Active edge connection not found not found for "
+ "interior router")
+
+ outs = self.run_qdstat(['--edge'],
+ address=self.routers[2].addresses[0])
+ conn_map_edge = dict()
+ #
+ # We dont know which interior router the edge will connect to.
+ #
+ conn_map_edge["INT.A"] = False
+ conn_map_edge["INT.B"] = False
+ lines = outs.split("\n")
+ for line in lines:
+ if "INT.A" in line and "yes" in line:
+ conn_map_edge["INT.A"] = True
+ if "INT.B" in line and "yes" in line:
+ conn_map_edge["INT.B"] = True
+
+ if conn_map_edge["INT.A"] and conn_map_edge["INT.B"]:
+ self.fail("Edhe router has two active connections to interior "
+ "routers. Should have only one")
+
+ if not conn_map_edge["INT.A"] and not conn_map_edge["INT.B"]:
+ self.fail("There are no active aconnections to interior routers")
+
+ if conn_map_edge["INT.A"]:
+ #
+ # INT.A has the active connection. Let's kill INT.A and see
+ # if the other connection becomes active
+ #
+ EdgeRouterTest.routers[0].teardown()
+ self.schedule_int_b_qdstat_test()
+
+ while not self.can_terminate():
+ pass
+
+ self.assertTrue(self.success)
+
+ elif conn_map_edge["INT.B"]:
+ #
+ # INT.B has the active connection. Let's kill INT.B and see
+ # if the other connection becomes active
+ #
+ EdgeRouterTest.routers[1].teardown()
+ self.schedule_int_a_qdstat_test()
+
+ while not self.can_terminate():
+ pass
+
+ self.assertTrue(self.success)
+
class RouterTest(TestCase):
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/187cd35a/tools/qdstat.in
----------------------------------------------------------------------
diff --git a/tools/qdstat.in b/tools/qdstat.in
index 18281f1..6e2f2ab 100755
--- a/tools/qdstat.in
+++ b/tools/qdstat.in
@@ -52,6 +52,7 @@ def parse_args(argv):
parser.add_option("-c", "--connections", help="Show Connections", action="store_const", const="c", dest="show")
parser.add_option("-l", "--links", help="Show Router Links", action="store_const", const="l", dest="show")
parser.add_option("-n", "--nodes", help="Show Router Nodes", action="store_const", const="n", dest="show")
+ parser.add_option("-e", "--edge", help="Show edge connections", action="store_const", const="e", dest="show")
parser.add_option("-a", "--address", help="Show Router Addresses", action="store_const", const="a", dest="show")
parser.add_option("-m", "--memory", help="Show Router Memory Stats", action="store_const", const="m", dest="show")
parser.add_option("--autolinks", help="Show Auto Links", action="store_const", const="autolinks", dest="show")
@@ -141,6 +142,64 @@ class BusManager(Node):
return text[:-1]
return text
+ def displayEdges(self):
+ disp = Display(prefix=" ")
+ heads = []
+ heads.append(Header("id"))
+ heads.append(Header("host"))
+ heads.append(Header("container"))
+ heads.append(Header("role"))
+ heads.append(Header("dir"))
+ heads.append(Header("security"))
+ heads.append(Header("authentication"))
+ heads.append(Header("tenant"))
+
+ rows = []
+ objects = self.query('org.apache.qpid.dispatch.connection', limit=self.opts.limit)
+
+ if not objects:
+ print("No Edge Router Connections")
+ return
+ has_active = False
+
+ first = objects[0]
+ try:
+ if first:
+ active = first.active
+ has_active = True
+ except:
+ pass
+
+ if has_active:
+ heads.append(Header("active"))
+
+ for conn in objects:
+ #print (conn)
+ if conn.role == "edge":
+ row = []
+ row.append(conn.identity)
+ row.append(conn.host)
+ row.append(conn.container)
+ row.append(conn.role)
+ row.append(conn.dir)
+ row.append(self.connSecurity(conn))
+ row.append(self.connAuth(conn))
+ row.append(self.noTrailingSlash(get(conn, 'tenant')))
+ if has_active:
+ if conn.active:
+ row.append("yes")
+ else:
+ row.append("no")
+ rows.append(row)
+
+ if rows:
+ title = "Connections"
+ else:
+ return
+
+ dispRows = rows
+ disp.formattedTable(title, heads, dispRows)
+
def displayConnections(self):
disp = Display(prefix=" ")
heads = []
@@ -557,6 +616,7 @@ class BusManager(Node):
elif main == 'a': self.displayAddresses()
elif main == 'm': self.displayMemory()
elif main == 'g': self.displayGeneral()
+ elif main == 'e': self.displayEdges()
elif main == 'c': self.displayConnections()
elif main == 'autolinks': self.displayAutolinks()
elif main == 'linkroutes': self.displayLinkRoutes()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-dispatch git commit: DISPATCH-1194 - Establish an address
for the address-lookup feature
Posted by tr...@apache.org.
DISPATCH-1194 - Establish an address for the address-lookup feature
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/2ac2c6ee
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/2ac2c6ee
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/2ac2c6ee
Branch: refs/heads/master
Commit: 2ac2c6ee5e87a15a82b9453ef149ecd8f4243572
Parents: 187cd35
Author: Ted Ross <tr...@redhat.com>
Authored: Tue Nov 27 12:40:26 2018 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue Nov 27 12:40:26 2018 -0500
----------------------------------------------------------------------
include/qpid/dispatch/amqp.h | 1 +
src/amqp.c | 1 +
2 files changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2ac2c6ee/include/qpid/dispatch/amqp.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h
index 6a92175..e62b7f7 100644
--- a/include/qpid/dispatch/amqp.h
+++ b/include/qpid/dispatch/amqp.h
@@ -151,6 +151,7 @@ extern const char * const QD_CONNECTION_PROPERTY_FAILOVER_HOSTNAME_KEY;
/** @name Terminus Addresses */
/// @{
extern const char * const QD_TERMINUS_EDGE_ADDRESS_TRACKING;
+extern const char * const QD_TERMINUS_ADDRESS_LOOKUP;
/// @}
/** @name AMQP error codes. */
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2ac2c6ee/src/amqp.c
----------------------------------------------------------------------
diff --git a/src/amqp.c b/src/amqp.c
index 1492e4b..0169c68 100644
--- a/src/amqp.c
+++ b/src/amqp.c
@@ -51,6 +51,7 @@ const char * const QD_CONNECTION_PROPERTY_FAILOVER_SCHEME_KEY = "scheme";
const char * const QD_CONNECTION_PROPERTY_FAILOVER_HOSTNAME_KEY = "hostname";
const char * const QD_TERMINUS_EDGE_ADDRESS_TRACKING = "_$qd.edge_addr_tracking";
+const char * const QD_TERMINUS_ADDRESS_LOOKUP = "_$qd.addr_lookup";
const qd_amqp_error_t QD_AMQP_OK = { 200, "OK" };
const qd_amqp_error_t QD_AMQP_CREATED = { 201, "Created" };
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org