You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2018/08/24 19:41:32 UTC
qpid-dispatch git commit: DISPATCH-1103 - Added code to retry failed
auto links using the core timer API. This closes #366.
Repository: qpid-dispatch
Updated Branches:
refs/heads/master b652fd408 -> c65fbc9d8
DISPATCH-1103 - Added code to retry failed auto links using the core timer API. This closes #366.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/c65fbc9d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/c65fbc9d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/c65fbc9d
Branch: refs/heads/master
Commit: c65fbc9d82495348b5d5d0464852655847ebb5db
Parents: b652fd4
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Tue Aug 14 11:21:30 2018 -0400
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Fri Aug 24 15:39:32 2018 -0400
----------------------------------------------------------------------
src/router_core/connections.c | 8 ++
src/router_core/route_control.c | 64 +++++++++
src/router_core/route_control.h | 17 +++
src/router_core/router_core_private.h | 31 +++--
tests/system_tests_autolinks.py | 211 +++++++++++++++++++++++++++++
5 files changed, 317 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c65fbc9d/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 482f600..30166b1 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -1332,6 +1332,8 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo
while (link_ref) {
qdr_link_t *link = link_ref->link;
+ qdr_route_auto_link_closed_CT(core, link);
+
//
// Clean up the link and all its associated state.
//
@@ -1842,6 +1844,12 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b
link->auto_link->state = QDR_AUTO_LINK_STATE_FAILED;
free(link->auto_link->last_error);
link->auto_link->last_error = qdr_error_description(error);
+
+ //
+ // The auto link has failed. Periodically retry setting up the auto link until
+ // it succeeds.
+ //
+ qdr_route_auto_link_detached_CT(core, link);
}
if (link->link_direction == QD_INCOMING) {
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c65fbc9d/src/router_core/route_control.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c
index 9c08d91..6b5e62e 100644
--- a/src/router_core/route_control.c
+++ b/src/router_core/route_control.c
@@ -34,6 +34,9 @@ ALLOC_DEFINE(qdr_conn_identifier_t);
const char CONTAINER_PREFIX = 'C';
const char CONNECTION_PREFIX = 'L';
+static const int AUTO_LINK_FIRST_RETRY_INTERVAL = 2;  // seconds until the first retry of a failed auto link
+static const int AUTO_LINK_RETRY_INTERVAL = 5;        // seconds between subsequent retries
+
static qdr_conn_identifier_t *qdr_route_declare_id_CT(qdr_core_t *core,
qd_iterator_t *container,
@@ -263,6 +266,22 @@ static void qdr_auto_link_activate_CT(qdr_core_t *core, qdr_auto_link_t *al, qdr
}
+/**
+ * Core-thread timer callback that re-attempts activation of a failed auto link
+ * on every connection currently referenced by the auto link's connection
+ * identifier.
+ *
+ * @param core    Pointer to the router core.
+ * @param context The qdr_auto_link_t that was registered with the retry timer.
+ */
+static void qdr_route_attempt_auto_link_CT(qdr_core_t *core, void *context)
+{
+    qdr_auto_link_t *auto_link = (qdr_auto_link_t *) context;
+    for (qdr_connection_ref_t *ref = DEQ_HEAD(auto_link->conn_id->connection_refs); ref; ref = DEQ_NEXT(ref))
+        qdr_auto_link_activate_CT(core, auto_link, ref->conn);
+}
+
+
static void qdr_auto_link_deactivate_CT(qdr_core_t *core, qdr_auto_link_t *al, qdr_connection_t *conn)
{
qdr_route_log_CT(core, "Auto Link Deactivated", al->name, al->identity, conn);
@@ -386,6 +405,49 @@ qdr_link_route_t *qdr_route_add_link_route_CT(qdr_core_t *core,
}
+/**
+ * Schedule a retry of the auto link that owns 'link' (no-op if there is none)
+ * and log the failure. The first retry fires after AUTO_LINK_FIRST_RETRY_INTERVAL
+ * seconds, later ones after AUTO_LINK_RETRY_INTERVAL seconds.
+ */
+void qdr_route_auto_link_detached_CT(qdr_core_t *core, qdr_link_t *link)
+{
+    qdr_auto_link_t *al = link->auto_link;
+    if (!al)
+        return;
+
+    // One retry timer per auto link: created on the first failure, reused after.
+    if (!al->retry_timer)
+        al->retry_timer = qdr_core_timer_CT(core, qdr_route_attempt_auto_link_CT, (void *)al);
+
+    // Log text is prefix + last_error (which may be NULL). The buffer is sized
+    // for both parts plus the terminating NUL and is initialized by strcpy
+    // before strcat appends, so it is always a valid, in-bounds string.
+    const char *activation_failed = "Auto Link Activation Failed. ";
+    const char *last_error = al->last_error ? al->last_error : "";
+    size_t msg_size = strlen(activation_failed) + strlen(last_error) + 1;
+    char error_msg[msg_size];
+    strcpy(error_msg, activation_failed);
+    strcat(error_msg, last_error);
+
+    if (al->retry_attempts == 0) {
+        qdr_core_timer_schedule_CT(core, al->retry_timer, AUTO_LINK_FIRST_RETRY_INTERVAL);
+        al->retry_attempts += 1;
+    } else {
+        qdr_core_timer_schedule_CT(core, al->retry_timer, AUTO_LINK_RETRY_INTERVAL);
+    }
+
+    qdr_route_log_CT(core, error_msg, al->name, al->identity, link->conn);
+}
+
+
+void qdr_route_auto_link_closed_CT(qdr_core_t *core, qdr_link_t *link)
+{
+    qdr_auto_link_t *al = link->auto_link;  // may be NULL: not every link belongs to an auto link
+    if (al && al->retry_timer) { qdr_core_timer_cancel_CT(core, al->retry_timer); }  // drop any pending retry
+}
+
+
void qdr_route_del_link_route_CT(qdr_core_t *core, qdr_link_route_t *lr)
{
//
@@ -469,6 +531,7 @@ qdr_auto_link_t *qdr_route_add_auto_link_CT(qdr_core_t *core,
if (container_field || connection_field) {
al->conn_id = qdr_route_declare_id_CT(core, qd_parse_raw(container_field), qd_parse_raw(connection_field));
DEQ_INSERT_TAIL_N(REF, al->conn_id->auto_link_refs, al);
+
qdr_connection_ref_t * cref = DEQ_HEAD(al->conn_id->connection_refs);
while (cref) {
qdr_auto_link_activate_CT(core, al, cref->conn);
@@ -516,6 +579,7 @@ void qdr_route_del_auto_link_CT(qdr_core_t *core, qdr_auto_link_t *al)
DEQ_REMOVE(core->auto_links, al);
free(al->name);
free(al->external_addr);
+ qdr_core_timer_free_CT(core, al->retry_timer);
free_qdr_auto_link_t(al);
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c65fbc9d/src/router_core/route_control.h
----------------------------------------------------------------------
diff --git a/src/router_core/route_control.h b/src/router_core/route_control.h
index 291766c..a927948 100644
--- a/src/router_core/route_control.h
+++ b/src/router_core/route_control.h
@@ -55,4 +55,21 @@ void qdr_route_connection_closed_CT(qdr_core_t *core, qdr_connection_t *conn);
void qdr_link_route_map_pattern_CT(qdr_core_t *core, qd_iterator_t *address, qdr_address_t *addr);
void qdr_link_route_unmap_pattern_CT(qdr_core_t *core, qd_iterator_t *address);
+/**
+ * Actions to be performed when an auto link's link detaches: schedules a retry
+ * of the associated auto link using the core thread timer API and logs the
+ * failure. Does nothing if the link has no auto link.
+ *
+ * @param core Pointer to the core object returned by qd_core()
+ * @param link qdr_link_t reference. The attach on this link for an auto link was rejected.
+ */
+void qdr_route_auto_link_detached_CT(qdr_core_t *core, qdr_link_t *link);
+
+/**
+ * Cancels the pending retry timer, if any, of the auto link owning a closed link.
+ * @param core Pointer to the core object returned by qd_core()
+ * @param link qdr_link_t reference; ignored if it is not owned by an auto link.
+ */
+void qdr_route_auto_link_closed_CT(qdr_core_t *core, qdr_link_t *link);
+
#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c65fbc9d/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 28f6b76..e9d7af4 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -603,6 +603,21 @@ ALLOC_DECLARE(qdr_link_route_t);
DEQ_DECLARE(qdr_link_route_t, qdr_link_route_list_t);
void qdr_core_delete_link_route(qdr_core_t *core, qdr_link_route_t *lr);
+// Core timer related field/data structures: a timer pairs a callback with an opaque context and a delay in seconds.
+typedef void (*qdr_timer_cb_t)(qdr_core_t *core, void* context);  // handler signature; 'context' is the pointer supplied when the timer was created
+
+typedef struct qdr_core_timer_t {
+ DEQ_LINKS(struct qdr_core_timer_t);  // links for membership in a qdr_core_timer_list_t (declared below)
+ qdr_timer_cb_t handler;  // presumably the callback passed to qdr_core_timer_CT()
+ void *context;  // opaque argument handed back to 'handler'
+ uint32_t delta_time_seconds;  // NOTE(review): presumably the delay relative to the previous timer in the list - confirm
+ bool scheduled;  // NOTE(review): presumably true while the timer is queued to fire - confirm
+} qdr_core_timer_t;
+
+ALLOC_DECLARE(qdr_core_timer_t);
+DEQ_DECLARE(qdr_core_timer_t, qdr_core_timer_list_t);
+
+
typedef enum {
QDR_AUTO_LINK_STATE_INACTIVE,
QDR_AUTO_LINK_STATE_ATTACHING,
@@ -621,10 +636,12 @@ struct qdr_auto_link_t {
char *external_addr;
const char *internal_addr;
int phase;
+ int retry_attempts;
qd_direction_t dir;
qdr_conn_identifier_t *conn_id;
qdr_link_t *link;
qdr_auto_link_state_t state;
+ qdr_core_timer_t *retry_timer; // If the auto link attach fails or gets disconnected, this timer retries the attach.
char *last_error;
};
@@ -643,20 +660,6 @@ struct qdr_conn_identifier_t {
ALLOC_DECLARE(qdr_conn_identifier_t);
DEQ_DECLARE(qdr_exchange_t, qdr_exchange_list_t);
-// Core timer related field/data structures
-typedef void (*qdr_timer_cb_t)(qdr_core_t *core, void* context);
-
-typedef struct qdr_core_timer_t {
- DEQ_LINKS(struct qdr_core_timer_t);
- qdr_timer_cb_t handler;
- void *context;
- uint32_t delta_time_seconds;
- bool scheduled;
-} qdr_core_timer_t;
-
-ALLOC_DECLARE(qdr_core_timer_t);
-DEQ_DECLARE(qdr_core_timer_t, qdr_core_timer_list_t);
-
struct qdr_core_t {
qd_dispatch_t *qd;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c65fbc9d/tests/system_tests_autolinks.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_autolinks.py b/tests/system_tests_autolinks.py
index 2a7244f..0dd9c10 100644
--- a/tests/system_tests_autolinks.py
+++ b/tests/system_tests_autolinks.py
@@ -25,6 +25,7 @@ from __future__ import print_function
import unittest2 as unittest
import json
+from threading import Timer
from proton import Message
from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, Process
from proton.handlers import MessagingHandler
@@ -35,6 +36,216 @@ from qpid_dispatch.management.client import Node
CONNECTION_PROPERTIES = {u'connection': u'properties', u'int_property': 6451}
+class AutoLinkDetachAfterAttachTest(MessagingHandler):  # client handler run by DetachAfterAttachTest
+ def __init__(self, address, node_addr):
+ super(AutoLinkDetachAfterAttachTest, self).__init__(prefetch=0)
+ self.timer = None
+ self.error = None  # set to a description on failure; None means the test passed
+ self.conn = None
+ self.address = address
+ self.n_rx_attach = 0  # opened links where this client is the receiver
+ self.n_tx_attach = 0  # opened links where this client is the sender
+ self.node_addr = node_addr  # address every autoLink attach is expected to carry
+ self.sender = None
+ self.receiver = None
+ # Presumably invoked by Timeout(self), scheduled in on_start, once TIMEOUT expires.
+ def timeout(self):
+ self.error = "Timeout Expired: n_rx_attach=%d n_tx_attach=%d" % (self.n_rx_attach, self.n_tx_attach)
+ self.conn.close()
+ # Arm the overall test timeout, then connect to the router at self.address.
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+ self.conn = event.container.connect(self.address)
+ # This handler never creates links itself, so each opened link was attached by the router.
+ def on_link_opened(self, event):
+ if event.sender:
+ self.sender = event.sender
+ self.n_tx_attach += 1
+ if event.sender.remote_source.address != self.node_addr:
+ self.error = "Expected sender address '%s', got '%s'" % (self.node_addr, event.sender.remote_source.address)
+ self.timer.cancel()
+ self.conn.close()
+ elif event.receiver:
+ self.receiver = event.receiver
+ self.n_rx_attach += 1
+ if event.receiver.remote_target.address != self.node_addr:
+ self.error = "Expected receiver address '%s', got '%s'" % (self.node_addr, event.receiver.remote_target.address)
+ self.timer.cancel()
+ self.conn.close()
+
+ if self.n_tx_attach == 1 and self.n_rx_attach == 1:
+ # we have received 2 attaches from the router on the
+ # autolink address. Now close the sender and the receiver
+ # The router will retry establishing the autolinks.
+ self.sender.close()
+ self.receiver.close()
+
+ # The router will retry the auto link and the n_tx_attach and
+ # n_rx_attach will be 2
+ if self.n_tx_attach == 2 and self.n_rx_attach == 2:
+ # This if statement will fail if you comment out the call to
+ # qdr_route_auto_link_detached_CT(core, link) in
+ # qdr_link_inbound_detach_CT() (connections.c)
+ self.conn.close()
+ self.timer.cancel()
+
+ def run(self):
+ Container(self).run()
+
+
+class DetachAfterAttachTest(TestCase):
+ @classmethod
+ def setUpClass(cls):
+ super(DetachAfterAttachTest, cls).setUpClass()
+ name = "test-router"
+
+ config = Qdrouterd.Config([
+
+ ('router', {'mode': 'standalone', 'id': 'A'}),
+ ('listener', {'host': '127.0.0.1', 'role': 'normal',
+ 'port': cls.tester.get_port()}),
+
+ ('listener', {'role': 'route-container', 'name': 'myListener',
+ 'port': cls.tester.get_port()}),
+
+ ('autoLink', {'addr': 'myListener.1', 'connection': 'myListener',
+ 'direction': 'in'}),
+ ('autoLink', {'addr': 'myListener.1', 'connection': 'myListener',
+ 'direction': 'out'}),
+ ])
+
+ cls.router = cls.tester.qdrouterd(name, config)
+ cls.router.wait_ready()
+ cls.route_address = cls.router.addresses[1]
+
+ def test_auto_link_attach_detach_reattch(self):
+ test = AutoLinkDetachAfterAttachTest(self.route_address, 'myListener.1')
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+class AutoLinkRetryTest(TestCase):
+ inter_router_port = None
+
+ @classmethod
+ def router(cls, name, config):
+ config = Qdrouterd.Config(config)
+ cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+
+ @classmethod
+ def setUpClass(cls):
+ super(AutoLinkRetryTest, cls).setUpClass()
+ cls.routers = []
+
+ cls.inter_router_port = cls.tester.get_port()
+
+ cls.router('B',
+ [
+ ('router', {'mode': 'standalone', 'id': 'B'}),
+ ('listener', {'role': 'normal',
+ 'port': cls.tester.get_port()}),
+ ('listener', {'host': '127.0.0.1',
+ 'role': 'normal',
+ 'port': cls.inter_router_port}),
+ # Note here that the distribution of the address
+ # 'examples' is set to 'unavailable'
+ # This will ensure that any attach coming in for
+ # this address will be rejected.
+ ('address',
+ {'prefix': 'examples',
+ 'name': 'unavailable-address',
+ 'distribution': 'unavailable'}),
+ ])
+
+ cls.router('A', [
+ ('router', {'mode': 'standalone', 'id': 'A'}),
+ ('listener', {'host': '127.0.0.1', 'role': 'normal',
+ 'port': cls.tester.get_port()}),
+
+ ('connector', {'host': '127.0.0.1', 'name': 'connectorToB',
+ 'role': 'route-container',
+ 'port': cls.inter_router_port}),
+
+ ('autoLink', {'connection': 'connectorToB',
+ 'addr': 'examples', 'direction': 'in'}),
+ ('autoLink', {'connection': 'connectorToB',
+ 'addr': 'examples', 'direction': 'out'}),
+ ])
+
+ def __init__(self, test_method):
+ TestCase.__init__(self, test_method)
+ self.success = False
+ self.timer_delay = 6
+ self.max_attempts = 2
+ self.attempts = 0
+
+ def address(self):
+ return self.routers[1].addresses[0]
+
+ def check_auto_link(self):
+ long_type = 'org.apache.qpid.dispatch.router.config.autoLink'
+ query_command = 'QUERY --type=' + long_type
+ output = json.loads(self.run_qdmanage(query_command))
+
+ if output[0].get('operStatus') == "active":
+ self.success = True
+ else:
+ self.schedule_auto_link_reconnect_test()
+
+ self.attempts += 1
+
+ def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None):
+ p = self.popen(
+ ['qdmanage'] + cmd.split(' ') + ['--bus', address or self.address(), '--indent=-1', '--timeout', str(TIMEOUT)],
+ stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect,
+ universal_newlines=True)
+ out = p.communicate(input)[0]
+ try:
+ p.teardown()
+ except Exception as e:
+ raise Exception("%s\n%s" % (e, out))
+ return out
+
+ def can_terminate(self):
+ if self.attempts == self.max_attempts:
+ return True
+
+ if self.success:
+ return True
+
+ return False
+
+ def schedule_auto_link_reconnect_test(self):
+ if self.attempts < self.max_attempts:
+ if not self.success:
+ Timer(self.timer_delay, self.check_auto_link).start()
+
+ def test_auto_link_reattch(self):
+ long_type = 'org.apache.qpid.dispatch.router.config.autoLink'
+ query_command = 'QUERY --type=' + long_type
+ output = json.loads(self.run_qdmanage(query_command))
+
+ # Since the distribution of the autoLinked address 'examples'
+ # is set to unavailable, the link route will initially be in the
+ # failed state
+ self.assertEqual(output[0]['operStatus'], 'failed')
+ self.assertEqual(output[0]['lastError'], 'Node not found')
+
+ # Now, we delete the address 'examples' (it becomes available)
+ # The Router A must now
+ # re-attempt to establish the autoLink and once the autoLink
+ # is up, it should return to the 'active' state.
+ delete_command = 'DELETE --type=address --name=unavailable-address'
+ self.run_qdmanage(delete_command, address=self.routers[0].addresses[0])
+
+ self.schedule_auto_link_reconnect_test()
+
+ while not self.can_terminate():
+ pass
+
+ self.assertTrue(self.success)
+
+
class WaypointReceiverPhaseTest(TestCase):
inter_router_port = None
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org