You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/11/27 17:41:31 UTC

[1/2] qpid-dispatch git commit: DISPATCH-1177 - Added new option -e to qdstat. This will show only the edge connections and show which of those connections is active - Added code to make qdstat -e backward compatible - Added code to set the edge connec

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 5ede7da83 -> 2ac2c6ee5


DISPATCH-1177 - Added new option -e to qdstat. This will show only the edge connections and show which of those connections is active
 - Added code to make qdstat -e backward compatible
 - Added code to set the edge connection as active on the interior router
 - Mark edge connections from interior as active
This closes #420


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/187cd35a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/187cd35a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/187cd35a

Branch: refs/heads/master
Commit: 187cd35a364cf1cee8818ded9b21403e75c1b481
Parents: 5ede7da
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Tue Nov 13 12:12:38 2018 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue Nov 27 12:34:14 2018 -0500

----------------------------------------------------------------------
 src/router_core/agent_connection.c              |  31 +++-
 src/router_core/agent_connection.h              |   2 +-
 .../modules/edge_router/connection_manager.c    |   2 +
 src/router_core/router_core_private.h           |   1 +
 tests/system_tests_edge_router.py               | 181 +++++++++++++++++++
 tools/qdstat.in                                 |  60 ++++++
 6 files changed, 270 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/187cd35a/src/router_core/agent_connection.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_connection.c b/src/router_core/agent_connection.c
index 9ab6d7a..06bc9cf 100644
--- a/src/router_core/agent_connection.c
+++ b/src/router_core/agent_connection.c
@@ -40,6 +40,7 @@
 #define QDR_CONNECTION_TYPE             15
 #define QDR_CONNECTION_SSL              16
 #define QDR_CONNECTION_OPENED           17
+#define QDR_CONNECTION_ACTIVE           18
 
 const char * const QDR_CONNECTION_DIR_IN  = "in";
 const char * const QDR_CONNECTION_DIR_OUT = "out";
@@ -70,6 +71,7 @@ const char *qdr_connection_columns[] =
      "type",
      "ssl",
      "opened",
+     "active",
      0};
 
 const char *CONNECTION_TYPE = "org.apache.qpid.dispatch.connection";
@@ -97,7 +99,7 @@ static void qd_get_next_pn_data(pn_data_t **data, const char **d, int *d1)
     }
 
 
-static void qdr_connection_insert_column_CT(qdr_connection_t *conn, int col, qd_composed_field_t *body, bool as_map)
+static void qdr_connection_insert_column_CT(qdr_core_t *core, qdr_connection_t *conn, int col, qd_composed_field_t *body, bool as_map)
 {
     char id_str[100];
 
@@ -196,6 +198,23 @@ static void qdr_connection_insert_column_CT(qdr_connection_t *conn, int col, qd_
         qd_compose_insert_bool(body, conn->connection_info->opened);
         break;
 
+    case QDR_CONNECTION_ACTIVE:
+        if (conn->role == QDR_ROLE_EDGE_CONNECTION) {
+            if (core->router_mode == QD_ROUTER_MODE_INTERIOR) {
+                qd_compose_insert_bool(body, true);
+            }
+            else if (core->router_mode  == QD_ROUTER_MODE_EDGE){
+                if (core->active_edge_connection == conn)
+                    qd_compose_insert_bool(body, true);
+                else
+                    qd_compose_insert_bool(body, false);
+            }
+        }
+        else {
+            qd_compose_insert_bool(body, true);
+        }
+        break;
+
     case QDR_CONNECTION_PROPERTIES: {
         pn_data_t *data = conn->connection_info->connection_properties;
         qd_compose_start_map(body);
@@ -238,14 +257,14 @@ static void qdr_connection_insert_column_CT(qdr_connection_t *conn, int col, qd_
 }
 
 
-static void qdr_agent_write_connection_CT(qdr_query_t *query,  qdr_connection_t *conn)
+static void qdr_agent_write_connection_CT(qdr_core_t *core, qdr_query_t *query,  qdr_connection_t *conn)
 {
     qd_composed_field_t *body = query->body;
 
     qd_compose_start_list(body);
     int i = 0;
     while (query->columns[i] >= 0) {
-        qdr_connection_insert_column_CT(conn, query->columns[i], body, false);
+        qdr_connection_insert_column_CT(core, conn, query->columns[i], body, false);
         i++;
     }
     qd_compose_end_list(body);
@@ -287,7 +306,7 @@ void qdra_connection_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offs
     //
     // Write the columns of the object into the response body.
     //
-    qdr_agent_write_connection_CT(query, conn);
+    qdr_agent_write_connection_CT(core, query, conn);
 
     //
     // Advance to the next connection
@@ -315,7 +334,7 @@ void qdra_connection_get_next_CT(qdr_core_t *core, qdr_query_t *query)
         //
         // Write the columns of the connection entity into the response body.
         //
-        qdr_agent_write_connection_CT(query, conn);
+        qdr_agent_write_connection_CT(core, query, conn);
 
         //
         // Advance to the next object
@@ -339,7 +358,7 @@ static void qdr_manage_write_connection_map_CT(qdr_core_t          *core,
 
     for(int i = 0; i < QDR_CONNECTION_COLUMN_COUNT; i++) {
         qd_compose_insert_string(body, qdr_connection_columns[i]);
-        qdr_connection_insert_column_CT(conn, i, body, false);
+        qdr_connection_insert_column_CT(core, conn, i, body, false);
     }
 
     qd_compose_end_map(body);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/187cd35a/src/router_core/agent_connection.h
----------------------------------------------------------------------
diff --git a/src/router_core/agent_connection.h b/src/router_core/agent_connection.h
index c701fb0..7536969 100644
--- a/src/router_core/agent_connection.h
+++ b/src/router_core/agent_connection.h
@@ -30,7 +30,7 @@ void qdra_connection_get_CT(qdr_core_t          *core,
                             const char          *qdr_connection_columns[]);
 
 
-#define QDR_CONNECTION_COLUMN_COUNT 18
+#define QDR_CONNECTION_COLUMN_COUNT 19
 const char *qdr_connection_columns[QDR_CONNECTION_COLUMN_COUNT + 1];
 
 #endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/187cd35a/src/router_core/modules/edge_router/connection_manager.c
----------------------------------------------------------------------
diff --git a/src/router_core/modules/edge_router/connection_manager.c b/src/router_core/modules/edge_router/connection_manager.c
index ca39421..bb4ae1c 100644
--- a/src/router_core/modules/edge_router/connection_manager.c
+++ b/src/router_core/modules/edge_router/connection_manager.c
@@ -54,6 +54,7 @@ static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *c
         if (cm->active_edge_connection == 0 && conn->role == QDR_ROLE_EDGE_CONNECTION) {
             qd_log(cm->core->log, QD_LOG_INFO, "Edge connection (id=%"PRIu64") to interior established", conn->identity);
             cm->active_edge_connection = conn;
+            cm->core->active_edge_connection = conn;
             qdrc_event_conn_raise(cm->core, QDRC_EVENT_CONN_EDGE_ESTABLISHED, conn);
         }
         break;
@@ -69,6 +70,7 @@ static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *c
                        "Edge connection (id=%"PRIu64") to interior lost, activating alternate id=%"PRIu64"",
                        conn->identity, alternate->identity);
                 cm->active_edge_connection = alternate;
+                cm->core->active_edge_connection = alternate;
                 qdrc_event_conn_raise(cm->core, QDRC_EVENT_CONN_EDGE_ESTABLISHED, alternate);
             } else {
                 qd_log(cm->core->log, QD_LOG_INFO,

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/187cd35a/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 19a1d1e..91d0016 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -723,6 +723,7 @@ struct qdr_core_t {
     qd_timer_t              *work_timer;
 
     qdr_connection_list_t open_connections;
+    qdr_connection_t     *active_edge_connection;
     qdr_connection_list_t connections_to_activate;
     qdr_link_list_t       open_links;
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/187cd35a/tests/system_tests_edge_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py
index de6a723..04b9349 100644
--- a/tests/system_tests_edge_router.py
+++ b/tests/system_tests_edge_router.py
@@ -23,6 +23,7 @@ from __future__ import absolute_import
 from __future__ import print_function
 
 from time import sleep
+from threading import Timer
 
 import unittest2 as unittest
 from proton import Message, Timeout
@@ -34,6 +35,7 @@ from test_broker import FakeService
 from proton.handlers import MessagingHandler
 from proton.reactor import Container, DynamicNodeProperties
 from qpid_dispatch.management.client import Node
+from subprocess import PIPE, STDOUT
 
 
 class AddrTimer(object):
@@ -44,6 +46,185 @@ class AddrTimer(object):
             self.parent.check_address()
 
 
+class EdgeRouterTest(TestCase):
+
+    inter_router_port = None
+
+    @classmethod
+    def setUpClass(cls):
+        """Start a router"""
+        super(EdgeRouterTest, cls).setUpClass()
+
+        def router(name, mode, connection, extra=None):
+            config = [
+                ('router', {'mode': mode, 'id': name}),
+                ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}),
+                ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'multiTenant': 'yes'}),
+                ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'role': 'route-container'}),
+                ('linkRoute', {'prefix': '0.0.0.0/link', 'direction': 'in', 'containerId': 'LRC'}),
+                ('linkRoute', {'prefix': '0.0.0.0/link', 'direction': 'out', 'containerId': 'LRC'}),
+                ('autoLink', {'addr': '0.0.0.0/queue.waypoint', 'containerId': 'ALC', 'direction': 'in'}),
+                ('autoLink', {'addr': '0.0.0.0/queue.waypoint', 'containerId': 'ALC', 'direction': 'out'}),
+                ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+                ('address', {'prefix': 'spread', 'distribution': 'balanced'}),
+                ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+                ('address', {'prefix': '0.0.0.0/queue', 'waypoint': 'yes'}),
+                connection
+            ]
+
+            if extra:
+                config.append(extra)
+            config = Qdrouterd.Config(config)
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+
+        cls.routers = []
+
+        inter_router_port = cls.tester.get_port()
+        edge_port_A = cls.tester.get_port()
+        edge_port_B = cls.tester.get_port()
+
+        router('INT.A', 'interior', ('listener', {'role': 'inter-router', 'port': inter_router_port}),
+               ('listener', {'role': 'edge', 'port': edge_port_A}))
+        router('INT.B', 'interior', ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': inter_router_port}),
+               ('listener', {'role': 'edge', 'port': edge_port_B}))
+        router('EA1', 'edge', ('connector', {'name': 'edge', 'role': 'edge',
+                                             'port': edge_port_A}
+                               ),
+               ('connector', {'name': 'edge.1', 'role': 'edge',
+                              'port': edge_port_B}
+                )
+               )
+
+        cls.routers[0].wait_router_connected('INT.B')
+        cls.routers[1].wait_router_connected('INT.A')
+
+    def __init__(self, test_method):
+        TestCase.__init__(self, test_method)
+        self.success = False
+        self.timer_delay = 2
+        self.max_attempts = 3
+        self.attempts = 0
+
+    def run_qdstat(self, args, regexp=None, address=None):
+        p = self.popen(
+            ['qdstat', '--bus', str(address or self.router.addresses[0]),
+             '--timeout', str(TIMEOUT)] + args,
+            name='qdstat-' + self.id(), stdout=PIPE, expect=None,
+            universal_newlines=True)
+
+        out = p.communicate()[0]
+        assert p.returncode == 0, \
+            "qdstat exit status %s, output:\n%s" % (p.returncode, out)
+        if regexp: assert re.search(regexp, out,
+                                    re.I), "Can't find '%s' in '%s'" % (
+        regexp, out)
+        return out
+
+    def can_terminate(self):
+        if self.attempts == self.max_attempts:
+            return True
+
+        if self.success:
+            return True
+
+        return False
+
+    def run_int_b_edge_qdstat(self):
+        outs = self.run_qdstat(['--edge'],
+                               address=self.routers[2].addresses[0])
+        lines = outs.split("\n")
+        for line in lines:
+            if "INT.B" in line and "yes" in line:
+                self.success = True
+
+    def run_int_a_edge_qdstat(self):
+        outs = self.run_qdstat(['--edge'],
+                               address=self.routers[2].addresses[0])
+        lines = outs.split("\n")
+        for line in lines:
+            if "INT.A" in line and "yes" in line:
+                self.success = True
+
+    def schedule_int_a_qdstat_test(self):
+        if self.attempts < self.max_attempts:
+            if not self.success:
+                Timer(self.timer_delay, self.run_int_a_edge_qdstat).start()
+                self.attempts += 1
+
+    def schedule_int_b_qdstat_test(self):
+        if self.attempts < self.max_attempts:
+            if not self.success:
+                Timer(self.timer_delay, self.run_int_b_edge_qdstat).start()
+                self.attempts += 1
+
+    def test_01_active_flag(self):
+        """
+        In this test, we have one edge router connected to two interior
+        routers. One connection is to INT.A and another connection is to
+        INT.B . But only one of these connections is active. We use qdstat
+        to make sure that only one of these connections is active.
+        Then we kill the router with the active connection and make sure
+        that the other connection is now the active one
+        """
+        success = False
+        outs = self.run_qdstat(['--edge'],
+                               address=self.routers[0].addresses[0])
+        lines = outs.split("\n")
+        for line in lines:
+            if "EA1" in line and "yes" in line:
+                success = True
+        if not success:
+            self.fail("Active edge connection not found not found for "
+                      "interior router")
+
+        outs = self.run_qdstat(['--edge'],
+                               address=self.routers[2].addresses[0])
+        conn_map_edge = dict()
+        #
+        # We dont know which interior router the edge will connect to.
+        #
+        conn_map_edge["INT.A"] = False
+        conn_map_edge["INT.B"] = False
+        lines = outs.split("\n")
+        for line in lines:
+            if "INT.A" in line and "yes" in line:
+                conn_map_edge["INT.A"] = True
+            if "INT.B" in line and "yes" in line:
+                conn_map_edge["INT.B"] = True
+
+        if conn_map_edge["INT.A"] and conn_map_edge["INT.B"]:
+            self.fail("Edhe router has two active connections to interior "
+                      "routers. Should have only one")
+
+        if not conn_map_edge["INT.A"] and  not conn_map_edge["INT.B"]:
+            self.fail("There are no active aconnections to interior routers")
+
+        if conn_map_edge["INT.A"]:
+            #
+            # INT.A has the active connection. Let's kill INT.A and see
+            # if the other connection becomes active
+            #
+            EdgeRouterTest.routers[0].teardown()
+            self.schedule_int_b_qdstat_test()
+
+            while not self.can_terminate():
+                pass
+
+            self.assertTrue(self.success)
+
+        elif conn_map_edge["INT.B"]:
+            #
+            # INT.B has the active connection. Let's kill INT.B and see
+            # if the other connection becomes active
+            #
+            EdgeRouterTest.routers[1].teardown()
+            self.schedule_int_a_qdstat_test()
+
+            while not self.can_terminate():
+                pass
+
+            self.assertTrue(self.success)
+
 
 class RouterTest(TestCase):
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/187cd35a/tools/qdstat.in
----------------------------------------------------------------------
diff --git a/tools/qdstat.in b/tools/qdstat.in
index 18281f1..6e2f2ab 100755
--- a/tools/qdstat.in
+++ b/tools/qdstat.in
@@ -52,6 +52,7 @@ def parse_args(argv):
     parser.add_option("-c", "--connections", help="Show Connections",       action="store_const", const="c",   dest="show")
     parser.add_option("-l", "--links", help="Show Router Links",            action="store_const", const="l",   dest="show")
     parser.add_option("-n", "--nodes", help="Show Router Nodes",            action="store_const", const="n",   dest="show")
+    parser.add_option("-e", "--edge", help="Show edge connections",         action="store_const", const="e",   dest="show")
     parser.add_option("-a", "--address", help="Show Router Addresses",      action="store_const", const="a",   dest="show")
     parser.add_option("-m", "--memory", help="Show Router Memory Stats",    action="store_const", const="m",   dest="show")
     parser.add_option("--autolinks", help="Show Auto Links",                action="store_const", const="autolinks",  dest="show")
@@ -141,6 +142,64 @@ class BusManager(Node):
             return text[:-1]
         return text
 
+    def displayEdges(self):
+        disp = Display(prefix="  ")
+        heads = []
+        heads.append(Header("id"))
+        heads.append(Header("host"))
+        heads.append(Header("container"))
+        heads.append(Header("role"))
+        heads.append(Header("dir"))
+        heads.append(Header("security"))
+        heads.append(Header("authentication"))
+        heads.append(Header("tenant"))
+        
+        rows = []
+        objects = self.query('org.apache.qpid.dispatch.connection', limit=self.opts.limit)
+        
+        if not objects:
+            print("No Edge Router Connections")
+            return
+        has_active = False
+        
+        first = objects[0]
+        try:
+            if first:
+                active = first.active
+                has_active = True
+        except:
+            pass
+            
+        if has_active:
+            heads.append(Header("active"))
+        
+        for conn in objects:
+            #print (conn)
+            if conn.role == "edge":
+               row = []
+               row.append(conn.identity)
+               row.append(conn.host)
+               row.append(conn.container)
+               row.append(conn.role)
+               row.append(conn.dir)
+               row.append(self.connSecurity(conn))
+               row.append(self.connAuth(conn))
+               row.append(self.noTrailingSlash(get(conn, 'tenant')))
+               if has_active:
+                   if conn.active:
+                       row.append("yes")
+                   else:
+                       row.append("no")
+               rows.append(row)
+        
+        if rows:
+            title = "Connections"
+        else:
+            return
+        
+        dispRows = rows
+        disp.formattedTable(title, heads, dispRows)            
+        
     def displayConnections(self):
         disp = Display(prefix="  ")
         heads = []
@@ -557,6 +616,7 @@ class BusManager(Node):
         elif main == 'a': self.displayAddresses()
         elif main == 'm': self.displayMemory()
         elif main == 'g': self.displayGeneral()
+        elif main == 'e': self.displayEdges()
         elif main == 'c': self.displayConnections()
         elif main == 'autolinks': self.displayAutolinks()
         elif main == 'linkroutes': self.displayLinkRoutes()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-dispatch git commit: DISPATCH-1194 - Establish an address for the address-lookup feature

Posted by tr...@apache.org.
DISPATCH-1194 - Establish an address for the address-lookup feature


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/2ac2c6ee
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/2ac2c6ee
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/2ac2c6ee

Branch: refs/heads/master
Commit: 2ac2c6ee5e87a15a82b9453ef149ecd8f4243572
Parents: 187cd35
Author: Ted Ross <tr...@redhat.com>
Authored: Tue Nov 27 12:40:26 2018 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue Nov 27 12:40:26 2018 -0500

----------------------------------------------------------------------
 include/qpid/dispatch/amqp.h | 1 +
 src/amqp.c                   | 1 +
 2 files changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2ac2c6ee/include/qpid/dispatch/amqp.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h
index 6a92175..e62b7f7 100644
--- a/include/qpid/dispatch/amqp.h
+++ b/include/qpid/dispatch/amqp.h
@@ -151,6 +151,7 @@ extern const char * const QD_CONNECTION_PROPERTY_FAILOVER_HOSTNAME_KEY;
 /** @name Terminus Addresses */
 /// @{
 extern const char * const QD_TERMINUS_EDGE_ADDRESS_TRACKING;
+extern const char * const QD_TERMINUS_ADDRESS_LOOKUP;
 /// @}
 
 /** @name AMQP error codes. */

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2ac2c6ee/src/amqp.c
----------------------------------------------------------------------
diff --git a/src/amqp.c b/src/amqp.c
index 1492e4b..0169c68 100644
--- a/src/amqp.c
+++ b/src/amqp.c
@@ -51,6 +51,7 @@ const char * const QD_CONNECTION_PROPERTY_FAILOVER_SCHEME_KEY   = "scheme";
 const char * const QD_CONNECTION_PROPERTY_FAILOVER_HOSTNAME_KEY = "hostname";
 
 const char * const QD_TERMINUS_EDGE_ADDRESS_TRACKING = "_$qd.edge_addr_tracking";
+const char * const QD_TERMINUS_ADDRESS_LOOKUP        = "_$qd.addr_lookup";
 
 const qd_amqp_error_t QD_AMQP_OK = { 200, "OK" };
 const qd_amqp_error_t QD_AMQP_CREATED = { 201, "Created" };


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org