You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2019/10/16 17:43:55 UTC

[qpid-dispatch] branch master updated: DISPATCH-1419 - Added connectionStatus and connectionMsg fields to the connector entity. These fields give more information about the state of the connection

This is an automated email from the ASF dual-hosted git repository.

gmurthy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new f619309  DISPATCH-1419 - Added connectionStatus and connectionMsg fields to the connector entity. These fields give more information about the state of the connection
f619309 is described below

commit f619309baa4353eff3f95afeb58200fb94a88ab0
Author: Ganesh Murthy <gm...@apache.org>
AuthorDate: Mon Sep 16 16:32:52 2019 -0400

    DISPATCH-1419 - Added connectionStatus and connectionMsg fields to the connector entity. These fields give more information about the state of the connection
---
 include/qpid/dispatch/log.h                   |   2 +
 python/qpid_dispatch/management/qdrouter.json |  10 ++
 src/connection_manager.c                      |  28 ++++-
 src/log.c                                     |   8 ++
 src/router_node.c                             |  10 ++
 src/server.c                                  |  18 +++-
 src/server_private.h                          |   1 +
 tests/CMakeLists.txt                          |   1 +
 tests/system_tests_connector_status.py        | 150 ++++++++++++++++++++++++++
 9 files changed, 224 insertions(+), 4 deletions(-)

diff --git a/include/qpid/dispatch/log.h b/include/qpid/dispatch/log.h
index 0d69346..8d2f499 100644
--- a/include/qpid/dispatch/log.h
+++ b/include/qpid/dispatch/log.h
@@ -74,4 +74,6 @@ void qd_vlog_impl(qd_log_source_t *source, qd_log_level_t level, const char *fil
 /** Maximum length for a log message */
 int qd_log_max_len();
 
+void qd_format_string(char* buf, int buf_size, const char *fmt, ...);
+
 #endif
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index 961106a..9f12a84 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1026,6 +1026,16 @@
                     "create": false
                     
                 },
+                "connectionStatus": {
+                    "type": "string",
+                    "description": "A read-only status of the connection. Could be one of CONNECTING, SUCCESS, FAILED",
+                    "create": false  
+                },
+                "connectionMsg": {
+                    "type": "string",
+                    "description": "A read-only connection message. Contains the connection message",
+                    "create": false  
+                },                                   
                 "policyVhost": {
                     "type": "string",
                     "required": false,
diff --git a/src/connection_manager.c b/src/connection_manager.c
index 9593198..4a8b0b2 100644
--- a/src/connection_manager.c
+++ b/src/connection_manager.c
@@ -811,7 +811,33 @@ qd_error_t qd_entity_refresh_connector(qd_entity_t* entity, void *impl)
             item = DEQ_HEAD(conn_info_list);
     }
 
-    if (qd_entity_set_string(entity, "failoverUrls", failover_info) == 0)
+    int state_length = 0;
+    char *state = 0;
+
+    if (ct->state == CXTR_STATE_CONNECTING) {
+        state_length = 13;
+        state = "CONNECTING\0";
+    }
+    else if (ct->state == CXTR_STATE_OPEN) {
+        state_length = 12;
+        state = "SUCCESS\0";
+    }
+    else if (ct->state == CXTR_STATE_FAILED) {
+        state_length = 9;
+        state = "FAILED\0";
+    }
+    else if (ct->state == CXTR_STATE_INIT) {
+        state_length = 15;
+        state = "INITIALIZING\0";
+    }
+
+    char state_info[state_length];
+    memset(state_info, 0, sizeof(state_length));;
+    strcat(state_info, state);
+
+    if (qd_entity_set_string(entity, "failoverUrls", failover_info) == 0
+            && qd_entity_set_string(entity, "connectionStatus", state_info) == 0
+            && qd_entity_set_string(entity, "connectionMsg", ct->conn_msg) == 0)
         return QD_ERROR_NONE;
 
     return qd_error_code();
diff --git a/src/log.c b/src/log.c
index 513b579..7ecea72 100644
--- a/src/log.c
+++ b/src/log.c
@@ -566,6 +566,14 @@ qd_error_t qd_log_entity(qd_entity_t *entity) {
     return qd_error_code();
 }
 
+void qd_format_string(char* buf, int buf_size, const char *fmt, ...)
+{
+    va_list args;
+    va_start(args, fmt);
+    vsnprintf(buf, buf_size, fmt, args);
+    va_end(args);
+}
+
 
 qd_error_t qd_entity_refresh_logStats(qd_entity_t* entity, void *impl)
 {
diff --git a/src/router_node.c b/src/router_node.c
index d634287..7613afd 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -1202,6 +1202,16 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
 
     pn_data_format(props, props_str, &props_len);
 
+    if (conn->connector) {
+        char conn_msg[300];
+        qd_format_string(conn_msg, 300, "[C%"PRIu64"] Connection Opened: dir=%s host=%s vhost=%s encrypted=%s"
+                " auth=%s user=%s container_id=%s",
+                connection_id, inbound ? "in" : "out", host, vhost ? vhost : "", encrypted ? proto : "no",
+                        authenticated ? mech : "no", (char*) user, container);
+        strcpy(conn->connector->conn_msg, conn_msg);
+    }
+
+
     qd_log(router->log_source, QD_LOG_INFO, "[C%"PRIu64"] Connection Opened: dir=%s host=%s vhost=%s encrypted=%s"
            " auth=%s user=%s container_id=%s props=%s",
            connection_id, inbound ? "in" : "out", host, vhost ? vhost : "", encrypted ? proto : "no",
diff --git a/src/server.c b/src/server.c
index a6c2548..7707efb 100644
--- a/src/server.c
+++ b/src/server.c
@@ -894,6 +894,7 @@ static void qd_increment_conn_index(qd_connection_t *ctx)
 
 }
 
+
 /* Events involving a connection or listener are serialized by the proactor so
  * only one event per connection / listener will be processed at a time.
  */
@@ -963,11 +964,18 @@ static bool handle(qd_server_t *qd_server, pn_event_t *e, pn_connection_t *pn_co
             if (ctx && ctx->connector) { /* Outgoing connection */
                 qd_increment_conn_index(ctx);
                 const qd_server_config_t *config = &ctx->connector->config;
+                ctx->connector->state = CXTR_STATE_FAILED;
+                char conn_msg[300];
                 if (condition  && pn_condition_is_set(condition)) {
-                    qd_log(qd_server->log_source, QD_LOG_INFO, "[C%"PRIu64"] Connection to %s failed: %s %s", ctx->connection_id, config->host_port,
-                           pn_condition_get_name(condition), pn_condition_get_description(condition));
+                    qd_format_string(conn_msg, 300, "[C%"PRIu64"] Connection to %s failed: %s %s", ctx->connection_id, config->host_port,
+                            pn_condition_get_name(condition), pn_condition_get_description(condition));
+                    strcpy(ctx->connector->conn_msg, conn_msg);
+
+                    qd_log(qd_server->log_source, QD_LOG_INFO, conn_msg);
                 } else {
-                    qd_log(qd_server->log_source, QD_LOG_INFO, "[C%"PRIu64"] Connection to %s failed", ctx->connection_id, config->host_port);
+                    qd_format_string(conn_msg, 300, "[C%"PRIu64"] Connection to %s failed", ctx->connection_id, config->host_port);
+                    strcpy(ctx->connector->conn_msg, conn_msg);
+                    qd_log(qd_server->log_source, QD_LOG_INFO, conn_msg);
                 }
             } else if (ctx && ctx->listener) { /* Incoming connection */
                 if (condition && pn_condition_is_set(condition)) {
@@ -1063,6 +1071,7 @@ static void try_open_lh(qd_connector_t *ct)
         qd_timer_schedule(ct->timer, ct->delay);
         return;
     }
+
     ctx->connector    = ct;
     const qd_server_config_t *config = &ct->config;
 
@@ -1454,6 +1463,8 @@ qd_connector_t *qd_server_connector(qd_server_t *server)
     ct->conn_index = 1;
     ct->state   = CXTR_STATE_INIT;
     ct->lock = sys_mutex();
+    ct->conn_msg = (char*) malloc(300);
+    memset(ct->conn_msg, 0, 300);
     ct->timer = qd_timer(ct->server->qd, try_open_cb, ct);
     if (!ct->lock || !ct->timer) {
         qd_connector_decref(ct);
@@ -1506,6 +1517,7 @@ bool qd_connector_decref(qd_connector_t* ct)
         }
         sys_mutex_free(ct->lock);
         if (ct->policy_vhost) free(ct->policy_vhost);
+        free(ct->conn_msg);
         free_qd_connector_t(ct);
         return true;
     }
diff --git a/src/server_private.h b/src/server_private.h
index f5a714b..7ce6dc6 100644
--- a/src/server_private.h
+++ b/src/server_private.h
@@ -118,6 +118,7 @@ struct qd_connector_t {
     /* Connector state and ctx can be modified in proactor or management threads. */
     sys_mutex_t              *lock;
     cxtr_state_t              state;
+    char                     *conn_msg;
     qd_connection_t          *ctx;
 
     /* This conn_list contains all the connection information needed to make a connection. It also includes failover connection information */
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index f36d05b..f7c86a1 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -126,6 +126,7 @@ foreach(py_test_module
     system_tests_bad_configuration
     system_tests_ssl
     system_tests_edge_router
+    system_tests_connector_status
     system_tests_core_endpoint
     ${SYSTEM_TESTS_HTTP}
     ${CONSOLE_TEST}
diff --git a/tests/system_tests_connector_status.py b/tests/system_tests_connector_status.py
new file mode 100644
index 0000000..6f3b6c0
--- /dev/null
+++ b/tests/system_tests_connector_status.py
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import json, re
+from threading import Timer
+
+from system_test import TestCase, Process, Qdrouterd, main_module, TIMEOUT, DIR
+from subprocess import PIPE, STDOUT
+
+class ConnectorStatusTest(TestCase):
+
+    inter_router_port = None
+    mgmt_port_a = None
+    mgmt_port_b = None
+
+    @classmethod
+    def setUpClass(cls):
+        super(ConnectorStatusTest, cls).setUpClass()
+
+        def router(name, config):
+            config = Qdrouterd.Config(config)
+            cls.routers.append(cls.tester.qdrouterd(name, config))
+
+        cls.routers = []
+        inter_router_port = cls.tester.get_port()
+        mgmt_port_a = cls.tester.get_port()
+        mgmt_port_b = cls.tester.get_port()
+
+        config_a = [
+            ('router', {'mode': 'interior', 'id': 'QDR.A'}),
+            ('listener', {'port': mgmt_port_a, 'host': '0.0.0.0'}),
+            ('listener', {'role': 'inter-router', 'port': inter_router_port, 'host': '0.0.0.0'}),
+        ]
+
+        router('QDR.A', config_a)
+
+        config_b = [
+            ('router', {'mode': 'interior', 'id': 'QDR.B'}),
+            ('listener', {'port': mgmt_port_b, 'host': '0.0.0.0'}),
+            ('connector', {'name': 'connectorToA', 'role': 'inter-router',
+                           'port': inter_router_port}),
+        ]
+
+
+        router('QDR.B', config_b)
+
+        cls.routers[0].wait_ports()
+        cls.routers[1].wait_ports()
+
+    def __init__(self, test_method):
+        TestCase.__init__(self, test_method)
+        self.success = False
+        self.timer_delay = 2
+        self.max_attempts = 5
+        self.attempts = 0
+
+    def address(self):
+        return self.routers[1].addresses[0]
+
+    def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None):
+        p = self.popen(
+            ['qdmanage'] + cmd.split(' ') + ['--bus', address or self.address(), '--indent=-1', '--timeout', str(TIMEOUT)],
+            stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect,
+            universal_newlines=True)
+        out = p.communicate(input)[0]
+        try:
+            p.teardown()
+        except Exception as e:
+            raise Exception("%s\n%s" % (e, out))
+        return out
+
+    def can_terminate(self):
+        if self.attempts == self.max_attempts:
+            return True
+
+        if self.success:
+            return True
+
+        return False
+
+    def schedule_B_connector_test(self):
+        if self.attempts < self.max_attempts:
+            if not self.success:
+                Timer(self.timer_delay, self.check_B_connector).start()
+                self.attempts += 1
+
+    def check_B_connector(self):
+        # Router A should now try to connect to Router B again since we killed Router C.
+        long_type = 'org.apache.qpid.dispatch.connector'
+        query_command = 'QUERY --type=' + long_type
+        output = json.loads(self.run_qdmanage(query_command, address=self.address()))
+
+        conn_status = output[0].get('connectionStatus')
+        conn_msg = output[0].get('connectionMsg')
+
+        if conn_status == 'CONNECTING' and "Connection" in conn_msg and "failed" in conn_msg:
+            self.success = True
+        else:
+            self.schedule_B_connector_test()
+
+
+    def test_conn_status_before_connect(self):
+        # The routers have connected and begun talking to each other
+        # Verify that the connectionStatus field of the connector is set to SUCCESS.
+        # Also make sure that the connectionMsg field of the connector has "Connection opened" in it.
+        long_type = 'org.apache.qpid.dispatch.connector'
+        query_command = 'QUERY --type=' + long_type
+        output = json.loads(self.run_qdmanage(query_command))
+        connection_msg = output[0][u'connectionMsg']
+        self.assertEqual('SUCCESS', output[0][u'connectionStatus'])
+        conn_opened = False
+        if "Connection Opened: dir=out" in connection_msg:
+            conn_opened = True
+        self.assertEqual(True, conn_opened)
+
+        # Now tear down Router QDR.A. On doing this, router QDR.B will lose connectivity to
+        # QDR.A.
+        # QDR.B will continually attempy to connect to QDR.A but will be unsuccessful.
+        # The status of the connector of B must now be 'CONNECTING'.
+        ConnectorStatusTest.routers[0].teardown()
+
+        self.schedule_B_connector_test()
+
+        while not self.can_terminate():
+            pass
+
+        self.assertTrue(self.success)
+
+        # NOTE: Since the router continually tries the re-establish a connection
+        # if it is lost, the router connection status goes between FAILED and
+        # SUCCESS. There is no good way to test if the connection status ever
+        # reaches the FAILED state because the router immediately tries to
+        # re-connect thus setting the status to CONNECTING in the process.
+
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org