You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2019/10/16 17:43:55 UTC
[qpid-dispatch] branch master updated: DISPATCH-1419 - Added
connectionStatus and connectionMsg fields to the connector entity. These
fields give more information about the state of the connection
This is an automated email from the ASF dual-hosted git repository.
gmurthy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push:
new f619309 DISPATCH-1419 - Added connectionStatus and connectionMsg fields to the connector entity. These fields give more information about the state of the connection
f619309 is described below
commit f619309baa4353eff3f95afeb58200fb94a88ab0
Author: Ganesh Murthy <gm...@apache.org>
AuthorDate: Mon Sep 16 16:32:52 2019 -0400
DISPATCH-1419 - Added connectionStatus and connectionMsg fields to the connector entity. These fields give more information about the state of the connection
---
include/qpid/dispatch/log.h | 2 +
python/qpid_dispatch/management/qdrouter.json | 10 ++
src/connection_manager.c | 28 ++++-
src/log.c | 8 ++
src/router_node.c | 10 ++
src/server.c | 18 +++-
src/server_private.h | 1 +
tests/CMakeLists.txt | 1 +
tests/system_tests_connector_status.py | 150 ++++++++++++++++++++++++++
9 files changed, 224 insertions(+), 4 deletions(-)
diff --git a/include/qpid/dispatch/log.h b/include/qpid/dispatch/log.h
index 0d69346..8d2f499 100644
--- a/include/qpid/dispatch/log.h
+++ b/include/qpid/dispatch/log.h
@@ -74,4 +74,6 @@ void qd_vlog_impl(qd_log_source_t *source, qd_log_level_t level, const char *fil
/** Maximum length for a log message */
int qd_log_max_len();
+void qd_format_string(char* buf, int buf_size, const char *fmt, ...);
+
#endif
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index 961106a..9f12a84 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1026,6 +1026,16 @@
"create": false
},
+ "connectionStatus": {
+ "type": "string",
+ "description": "A read-only status of the connection. One of INITIALIZING, CONNECTING, SUCCESS, FAILED.",
+ "create": false
+ },
+ "connectionMsg": {
+ "type": "string",
+ "description": "A read-only message describing the most recent connection event: the details of the opened connection, or the reason the connection attempt failed.",
+ "create": false
+ },
"policyVhost": {
"type": "string",
"required": false,
diff --git a/src/connection_manager.c b/src/connection_manager.c
index 9593198..4a8b0b2 100644
--- a/src/connection_manager.c
+++ b/src/connection_manager.c
@@ -811,7 +811,33 @@ qd_error_t qd_entity_refresh_connector(qd_entity_t* entity, void *impl)
item = DEQ_HEAD(conn_info_list);
}
- if (qd_entity_set_string(entity, "failoverUrls", failover_info) == 0)
+ int state_length = 0;
+ char *state = 0;
+
+ if (ct->state == CXTR_STATE_CONNECTING) {
+ state_length = 13;
+ state = "CONNECTING\0";
+ }
+ else if (ct->state == CXTR_STATE_OPEN) {
+ state_length = 12;
+ state = "SUCCESS\0";
+ }
+ else if (ct->state == CXTR_STATE_FAILED) {
+ state_length = 9;
+ state = "FAILED\0";
+ }
+ else if (ct->state == CXTR_STATE_INIT) {
+ state_length = 15;
+ state = "INITIALIZING\0";
+ }
+
+ char state_info[state_length];
+ memset(state_info, 0, sizeof(state_length));;
+ strcat(state_info, state);
+
+ if (qd_entity_set_string(entity, "failoverUrls", failover_info) == 0
+ && qd_entity_set_string(entity, "connectionStatus", state_info) == 0
+ && qd_entity_set_string(entity, "connectionMsg", ct->conn_msg) == 0)
return QD_ERROR_NONE;
return qd_error_code();
diff --git a/src/log.c b/src/log.c
index 513b579..7ecea72 100644
--- a/src/log.c
+++ b/src/log.c
@@ -566,6 +566,14 @@ qd_error_t qd_log_entity(qd_entity_t *entity) {
return qd_error_code();
}
+/**
+ * Format a printf-style message into a caller-supplied buffer.
+ *
+ * The result is always NUL-terminated when buf is non-NULL and buf_size is
+ * positive; output that does not fit is truncated.
+ *
+ * @param buf      Destination buffer. Nothing is done if this is NULL.
+ * @param buf_size Size of buf in bytes. Nothing is done if this is <= 0
+ *                 (a negative value would otherwise be converted to a huge
+ *                 size_t and let vsnprintf write past the end of buf).
+ * @param fmt      printf-style format string followed by its arguments.
+ *                 A NULL fmt yields an empty string.
+ */
+void qd_format_string(char* buf, int buf_size, const char *fmt, ...)
+{
+    if (!buf || buf_size <= 0)
+        return;
+
+    if (!fmt) {
+        buf[0] = '\0';
+        return;
+    }
+
+    va_list args;
+    va_start(args, fmt);
+    int written = vsnprintf(buf, (size_t) buf_size, fmt, args);
+    va_end(args);
+
+    // On an output error the buffer contents are unspecified; callers copy
+    // the buffer as a C string, so make sure it is a valid (empty) one.
+    if (written < 0)
+        buf[0] = '\0';
+}
+
qd_error_t qd_entity_refresh_logStats(qd_entity_t* entity, void *impl)
{
diff --git a/src/router_node.c b/src/router_node.c
index d634287..7613afd 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -1202,6 +1202,16 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
pn_data_format(props, props_str, &props_len);
+ if (conn->connector) {
+ char conn_msg[300];
+ qd_format_string(conn_msg, 300, "[C%"PRIu64"] Connection Opened: dir=%s host=%s vhost=%s encrypted=%s"
+ " auth=%s user=%s container_id=%s",
+ connection_id, inbound ? "in" : "out", host, vhost ? vhost : "", encrypted ? proto : "no",
+ authenticated ? mech : "no", (char*) user, container);
+ strcpy(conn->connector->conn_msg, conn_msg);
+ }
+
+
qd_log(router->log_source, QD_LOG_INFO, "[C%"PRIu64"] Connection Opened: dir=%s host=%s vhost=%s encrypted=%s"
" auth=%s user=%s container_id=%s props=%s",
connection_id, inbound ? "in" : "out", host, vhost ? vhost : "", encrypted ? proto : "no",
diff --git a/src/server.c b/src/server.c
index a6c2548..7707efb 100644
--- a/src/server.c
+++ b/src/server.c
@@ -894,6 +894,7 @@ static void qd_increment_conn_index(qd_connection_t *ctx)
}
+
/* Events involving a connection or listener are serialized by the proactor so
* only one event per connection / listener will be processed at a time.
*/
@@ -963,11 +964,18 @@ static bool handle(qd_server_t *qd_server, pn_event_t *e, pn_connection_t *pn_co
if (ctx && ctx->connector) { /* Outgoing connection */
qd_increment_conn_index(ctx);
const qd_server_config_t *config = &ctx->connector->config;
+ ctx->connector->state = CXTR_STATE_FAILED;
+ char conn_msg[300];
if (condition && pn_condition_is_set(condition)) {
- qd_log(qd_server->log_source, QD_LOG_INFO, "[C%"PRIu64"] Connection to %s failed: %s %s", ctx->connection_id, config->host_port,
- pn_condition_get_name(condition), pn_condition_get_description(condition));
+ qd_format_string(conn_msg, 300, "[C%"PRIu64"] Connection to %s failed: %s %s", ctx->connection_id, config->host_port,
+ pn_condition_get_name(condition), pn_condition_get_description(condition));
+ strcpy(ctx->connector->conn_msg, conn_msg);
+
+ qd_log(qd_server->log_source, QD_LOG_INFO, conn_msg);
} else {
- qd_log(qd_server->log_source, QD_LOG_INFO, "[C%"PRIu64"] Connection to %s failed", ctx->connection_id, config->host_port);
+ qd_format_string(conn_msg, 300, "[C%"PRIu64"] Connection to %s failed", ctx->connection_id, config->host_port);
+ strcpy(ctx->connector->conn_msg, conn_msg);
+ qd_log(qd_server->log_source, QD_LOG_INFO, conn_msg);
}
} else if (ctx && ctx->listener) { /* Incoming connection */
if (condition && pn_condition_is_set(condition)) {
@@ -1063,6 +1071,7 @@ static void try_open_lh(qd_connector_t *ct)
qd_timer_schedule(ct->timer, ct->delay);
return;
}
+
ctx->connector = ct;
const qd_server_config_t *config = &ct->config;
@@ -1454,6 +1463,8 @@ qd_connector_t *qd_server_connector(qd_server_t *server)
ct->conn_index = 1;
ct->state = CXTR_STATE_INIT;
ct->lock = sys_mutex();
+ ct->conn_msg = (char*) malloc(300);
+ memset(ct->conn_msg, 0, 300);
ct->timer = qd_timer(ct->server->qd, try_open_cb, ct);
if (!ct->lock || !ct->timer) {
qd_connector_decref(ct);
@@ -1506,6 +1517,7 @@ bool qd_connector_decref(qd_connector_t* ct)
}
sys_mutex_free(ct->lock);
if (ct->policy_vhost) free(ct->policy_vhost);
+ free(ct->conn_msg);
free_qd_connector_t(ct);
return true;
}
diff --git a/src/server_private.h b/src/server_private.h
index f5a714b..7ce6dc6 100644
--- a/src/server_private.h
+++ b/src/server_private.h
@@ -118,6 +118,7 @@ struct qd_connector_t {
/* Connector state and ctx can be modified in proactor or management threads. */
sys_mutex_t *lock;
cxtr_state_t state;
+ char *conn_msg;
qd_connection_t *ctx;
/* This conn_list contains all the connection information needed to make a connection. It also includes failover connection information */
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index f36d05b..f7c86a1 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -126,6 +126,7 @@ foreach(py_test_module
system_tests_bad_configuration
system_tests_ssl
system_tests_edge_router
+ system_tests_connector_status
system_tests_core_endpoint
${SYSTEM_TESTS_HTTP}
${CONSOLE_TEST}
diff --git a/tests/system_tests_connector_status.py b/tests/system_tests_connector_status.py
new file mode 100644
index 0000000..6f3b6c0
--- /dev/null
+++ b/tests/system_tests_connector_status.py
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import json, re
+from threading import Timer
+
+from system_test import TestCase, Process, Qdrouterd, main_module, TIMEOUT, DIR
+from subprocess import PIPE, STDOUT
+
+class ConnectorStatusTest(TestCase):
+
+ inter_router_port = None
+ mgmt_port_a = None
+ mgmt_port_b = None
+
+ @classmethod
+ def setUpClass(cls):
+ super(ConnectorStatusTest, cls).setUpClass()
+
+ def router(name, config):
+ config = Qdrouterd.Config(config)
+ cls.routers.append(cls.tester.qdrouterd(name, config))
+
+ cls.routers = []
+ inter_router_port = cls.tester.get_port()
+ mgmt_port_a = cls.tester.get_port()
+ mgmt_port_b = cls.tester.get_port()
+
+ config_a = [
+ ('router', {'mode': 'interior', 'id': 'QDR.A'}),
+ ('listener', {'port': mgmt_port_a, 'host': '0.0.0.0'}),
+ ('listener', {'role': 'inter-router', 'port': inter_router_port, 'host': '0.0.0.0'}),
+ ]
+
+ router('QDR.A', config_a)
+
+ config_b = [
+ ('router', {'mode': 'interior', 'id': 'QDR.B'}),
+ ('listener', {'port': mgmt_port_b, 'host': '0.0.0.0'}),
+ ('connector', {'name': 'connectorToA', 'role': 'inter-router',
+ 'port': inter_router_port}),
+ ]
+
+
+ router('QDR.B', config_b)
+
+ cls.routers[0].wait_ports()
+ cls.routers[1].wait_ports()
+
+ def __init__(self, test_method):
+ TestCase.__init__(self, test_method)
+ self.success = False
+ self.timer_delay = 2
+ self.max_attempts = 5
+ self.attempts = 0
+
+ def address(self):
+ return self.routers[1].addresses[0]
+
+ def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None):
+ p = self.popen(
+ ['qdmanage'] + cmd.split(' ') + ['--bus', address or self.address(), '--indent=-1', '--timeout', str(TIMEOUT)],
+ stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect,
+ universal_newlines=True)
+ out = p.communicate(input)[0]
+ try:
+ p.teardown()
+ except Exception as e:
+ raise Exception("%s\n%s" % (e, out))
+ return out
+
+ def can_terminate(self):
+ if self.attempts == self.max_attempts:
+ return True
+
+ if self.success:
+ return True
+
+ return False
+
+ def schedule_B_connector_test(self):
+ if self.attempts < self.max_attempts:
+ if not self.success:
+ Timer(self.timer_delay, self.check_B_connector).start()
+ self.attempts += 1
+
+ def check_B_connector(self):
+ # Router A should now try to connect to Router B again since we killed Router C.
+ long_type = 'org.apache.qpid.dispatch.connector'
+ query_command = 'QUERY --type=' + long_type
+ output = json.loads(self.run_qdmanage(query_command, address=self.address()))
+
+ conn_status = output[0].get('connectionStatus')
+ conn_msg = output[0].get('connectionMsg')
+
+ if conn_status == 'CONNECTING' and "Connection" in conn_msg and "failed" in conn_msg:
+ self.success = True
+ else:
+ self.schedule_B_connector_test()
+
+
+ def test_conn_status_before_connect(self):
+ # The routers have connected and begun talking to each other
+ # Verify that the connectionStatus field of the connector is set to SUCCESS.
+ # Also make sure that the connectionMsg field of the connector has "Connection opened" in it.
+ long_type = 'org.apache.qpid.dispatch.connector'
+ query_command = 'QUERY --type=' + long_type
+ output = json.loads(self.run_qdmanage(query_command))
+ connection_msg = output[0][u'connectionMsg']
+ self.assertEqual('SUCCESS', output[0][u'connectionStatus'])
+ conn_opened = False
+ if "Connection Opened: dir=out" in connection_msg:
+ conn_opened = True
+ self.assertEqual(True, conn_opened)
+
+ # Now tear down Router QDR.A. On doing this, router QDR.B will lose connectivity to
+ # QDR.A.
+ # QDR.B will continually attempy to connect to QDR.A but will be unsuccessful.
+ # The status of the connector of B must now be 'CONNECTING'.
+ ConnectorStatusTest.routers[0].teardown()
+
+ self.schedule_B_connector_test()
+
+ while not self.can_terminate():
+ pass
+
+ self.assertTrue(self.success)
+
+ # NOTE: Since the router continually tries the re-establish a connection
+ # if it is lost, the router connection status goes between FAILED and
+ # SUCCESS. There is no good way to test if the connection status ever
+ # reaches the FAILED state because the router immediately tries to
+ # re-connect thus setting the status to CONNECTING in the process.
+
+
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org