You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2018/08/24 19:41:32 UTC

qpid-dispatch git commit: DISPATCH-1103 - Added code to retry failed auto links using the core timer API. This closes #366.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master b652fd408 -> c65fbc9d8


DISPATCH-1103 - Added code to retry failed auto links using the core timer API. This closes #366.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/c65fbc9d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/c65fbc9d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/c65fbc9d

Branch: refs/heads/master
Commit: c65fbc9d82495348b5d5d0464852655847ebb5db
Parents: b652fd4
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Tue Aug 14 11:21:30 2018 -0400
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Fri Aug 24 15:39:32 2018 -0400

----------------------------------------------------------------------
 src/router_core/connections.c         |   8 ++
 src/router_core/route_control.c       |  64 +++++++++
 src/router_core/route_control.h       |  17 +++
 src/router_core/router_core_private.h |  31 +++--
 tests/system_tests_autolinks.py       | 211 +++++++++++++++++++++++++++++
 5 files changed, 317 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c65fbc9d/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 482f600..30166b1 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -1332,6 +1332,8 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo
     while (link_ref) {
         qdr_link_t *link = link_ref->link;
 
+        qdr_route_auto_link_closed_CT(core, link);
+
         //
         // Clean up the link and all its associated state.
         //
@@ -1842,6 +1844,12 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b
         link->auto_link->state = QDR_AUTO_LINK_STATE_FAILED;
         free(link->auto_link->last_error);
         link->auto_link->last_error = qdr_error_description(error);
+
+        //
+        // The auto link has failed. Periodically retry setting up the auto link until
+        // it succeeds.
+        //
+        qdr_route_auto_link_detached_CT(core, link);
     }
 
     if (link->link_direction == QD_INCOMING) {

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c65fbc9d/src/router_core/route_control.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c
index 9c08d91..6b5e62e 100644
--- a/src/router_core/route_control.c
+++ b/src/router_core/route_control.c
@@ -34,6 +34,9 @@ ALLOC_DEFINE(qdr_conn_identifier_t);
 const char CONTAINER_PREFIX = 'C';
 const char CONNECTION_PREFIX = 'L';
 
+static const int AUTO_LINK_FIRST_RETRY_INTERVAL = 2;  // seconds before the first auto link retry
+static const int AUTO_LINK_RETRY_INTERVAL = 5;        // seconds before each subsequent retry
+
 
 static qdr_conn_identifier_t *qdr_route_declare_id_CT(qdr_core_t    *core,
                                                       qd_iterator_t *container,
@@ -263,6 +266,22 @@ static void qdr_auto_link_activate_CT(qdr_core_t *core, qdr_auto_link_t *al, qdr
 }
 
 
+/**
+ * Core timer callback that retries a failed auto link.
+ * @param context The qdr_auto_link_t to retry; it is re-activated on every
+ *                connection currently referenced by its connection identifier.
+ */
+static void qdr_route_attempt_auto_link_CT(qdr_core_t *core, void *context)
+{
+    qdr_auto_link_t *auto_link = (qdr_auto_link_t *) context;
+
+    for (qdr_connection_ref_t *ref = DEQ_HEAD(auto_link->conn_id->connection_refs);
+         ref;
+         ref = DEQ_NEXT(ref))
+        qdr_auto_link_activate_CT(core, auto_link, ref->conn);
+}
+
+
 static void qdr_auto_link_deactivate_CT(qdr_core_t *core, qdr_auto_link_t *al, qdr_connection_t *conn)
 {
     qdr_route_log_CT(core, "Auto Link Deactivated", al->name, al->identity, conn);
@@ -386,6 +405,49 @@ qdr_link_route_t *qdr_route_add_link_route_CT(qdr_core_t             *core,
 }
 
 
+/**
+ * Called from qdr_link_inbound_detach_CT once an auto link has been marked FAILED:
+ * creates the auto link's retry timer (once), schedules it and logs the failure.
+ *
+ * @param link The detached link; nothing is done unless link->auto_link is set.
+ */
+void qdr_route_auto_link_detached_CT(qdr_core_t *core, qdr_link_t *link)
+{
+    qdr_auto_link_t *al = link->auto_link;
+    if (!al)
+        return;
+
+    if (!al->retry_timer)
+        al->retry_timer = qdr_core_timer_CT(core, qdr_route_attempt_auto_link_CT, (void *)al);
+
+    // Log "Auto Link Activation Failed. <last_error>". The VLA is sized to include the
+    // terminating NUL and is initialized by strcpy() before strcat() appends to it.
+    const char *activation_failed = "Auto Link Activation Failed. ";
+    const char *last_error        = al->last_error ? al->last_error : "";
+    char        error_msg[strlen(activation_failed) + strlen(last_error) + 1];
+    strcpy(error_msg, activation_failed);
+    strcat(error_msg, last_error);
+
+    if (al->retry_attempts == 0) {
+        // First retry in 2 seconds
+        qdr_core_timer_schedule_CT(core, al->retry_timer, AUTO_LINK_FIRST_RETRY_INTERVAL);
+        al->retry_attempts += 1;
+    } else {
+        // Successive retries every 5 seconds
+        qdr_core_timer_schedule_CT(core, al->retry_timer, AUTO_LINK_RETRY_INTERVAL);
+    }
+
+    qdr_route_log_CT(core, error_msg, al->name, al->identity, link->conn);
+}
+
+
+void qdr_route_auto_link_closed_CT(qdr_core_t *core, qdr_link_t *link)
+{
+    qdr_auto_link_t *al = link->auto_link;  // may be NULL: not every link belongs to an auto link
+    if (al && al->retry_timer) qdr_core_timer_cancel_CT(core, al->retry_timer);  // drop any pending retry
+}
+
+
 void qdr_route_del_link_route_CT(qdr_core_t *core, qdr_link_route_t *lr)
 {
     //
@@ -469,6 +531,7 @@ qdr_auto_link_t *qdr_route_add_auto_link_CT(qdr_core_t          *core,
     if (container_field || connection_field) {
         al->conn_id = qdr_route_declare_id_CT(core, qd_parse_raw(container_field), qd_parse_raw(connection_field));
         DEQ_INSERT_TAIL_N(REF, al->conn_id->auto_link_refs, al);
+
         qdr_connection_ref_t * cref = DEQ_HEAD(al->conn_id->connection_refs);
         while (cref) {
             qdr_auto_link_activate_CT(core, al, cref->conn);
@@ -516,6 +579,7 @@ void qdr_route_del_auto_link_CT(qdr_core_t *core, qdr_auto_link_t *al)
     DEQ_REMOVE(core->auto_links, al);
     free(al->name);
     free(al->external_addr);
+    qdr_core_timer_free_CT(core, al->retry_timer);
     free_qdr_auto_link_t(al);
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c65fbc9d/src/router_core/route_control.h
----------------------------------------------------------------------
diff --git a/src/router_core/route_control.h b/src/router_core/route_control.h
index 291766c..a927948 100644
--- a/src/router_core/route_control.h
+++ b/src/router_core/route_control.h
@@ -55,4 +55,21 @@ void qdr_route_connection_closed_CT(qdr_core_t *core, qdr_connection_t *conn);
 void qdr_link_route_map_pattern_CT(qdr_core_t *core, qd_iterator_t *address, qdr_address_t *addr);
 void qdr_link_route_unmap_pattern_CT(qdr_core_t *core, qd_iterator_t *address);
 
+/**
+ * Called from qdr_link_inbound_detach_CT after an auto link has been marked FAILED.
+ * Schedules a retry of that auto link using the core thread timer API
+ * (first retry after 2 seconds; retries after later failures after 5 seconds).
+ *
+ * @param core Pointer to the core object returned by qd_core()
+ * @param link The detached link; nothing is done unless link->auto_link is set.
+ */
+void qdr_route_auto_link_detached_CT(qdr_core_t *core, qdr_link_t *link);
+
+/**
+ * Called for each link of a closing connection; cancels the pending retry timer, if any.
+ * @param core Pointer to the core object returned by qd_core()
+ * @param link Link being cleaned up; nothing is done unless it belongs to an auto link.
+ */
+void qdr_route_auto_link_closed_CT(qdr_core_t *core, qdr_link_t *link);
+
 #endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c65fbc9d/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 28f6b76..e9d7af4 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -603,6 +603,21 @@ ALLOC_DECLARE(qdr_link_route_t);
 DEQ_DECLARE(qdr_link_route_t, qdr_link_route_list_t);
 void qdr_core_delete_link_route(qdr_core_t *core, qdr_link_route_t *lr);
 
+// Core timer related field/data structures. Declared ahead of struct qdr_auto_link_t, which holds a retry timer.
+typedef void (*qdr_timer_cb_t)(qdr_core_t *core, void* context);  // timer callback; receives the context given when the timer was created
+
+typedef struct qdr_core_timer_t {
+    DEQ_LINKS(struct qdr_core_timer_t);  // links for qdr_core_timer_list_t
+    qdr_timer_cb_t  handler;             // callback invoked with 'context' when the timer fires
+    void           *context;             // opaque argument for 'handler'
+    uint32_t        delta_time_seconds;  // NOTE(review): presumably a delay relative to the previous timer in the list; confirm in router_core.c
+    bool            scheduled;           // NOTE(review): presumably true while the timer is pending; confirm
+} qdr_core_timer_t;
+
+ALLOC_DECLARE(qdr_core_timer_t);
+DEQ_DECLARE(qdr_core_timer_t, qdr_core_timer_list_t);
+
+
 typedef enum {
     QDR_AUTO_LINK_STATE_INACTIVE,
     QDR_AUTO_LINK_STATE_ATTACHING,
@@ -621,10 +636,12 @@ struct qdr_auto_link_t {
     char                  *external_addr;
     const char            *internal_addr;
     int                    phase;
+    int                    retry_attempts;
     qd_direction_t         dir;
     qdr_conn_identifier_t *conn_id;
     qdr_link_t            *link;
     qdr_auto_link_state_t  state;
+    qdr_core_timer_t      *retry_timer; // If the auto link attach fails or gets disconnected, this timer retries the attach.
     char                  *last_error;
 };
 
@@ -643,20 +660,6 @@ struct qdr_conn_identifier_t {
 ALLOC_DECLARE(qdr_conn_identifier_t);
 DEQ_DECLARE(qdr_exchange_t, qdr_exchange_list_t);
 
-// Core timer related field/data structures
-typedef void (*qdr_timer_cb_t)(qdr_core_t *core, void* context);
-
-typedef struct qdr_core_timer_t {
-    DEQ_LINKS(struct qdr_core_timer_t);
-    qdr_timer_cb_t  handler;
-    void           *context;
-    uint32_t        delta_time_seconds;
-    bool            scheduled;
-} qdr_core_timer_t;
-
-ALLOC_DECLARE(qdr_core_timer_t);
-DEQ_DECLARE(qdr_core_timer_t, qdr_core_timer_list_t);
-
 
 struct qdr_core_t {
     qd_dispatch_t     *qd;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c65fbc9d/tests/system_tests_autolinks.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_autolinks.py b/tests/system_tests_autolinks.py
index 2a7244f..0dd9c10 100644
--- a/tests/system_tests_autolinks.py
+++ b/tests/system_tests_autolinks.py
@@ -25,6 +25,7 @@ from __future__ import print_function
 
 import unittest2 as unittest
 import json
+from threading import Timer
 from proton import Message
 from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, Process
 from proton.handlers import MessagingHandler
@@ -35,6 +36,216 @@ from qpid_dispatch.management.client import Node
 CONNECTION_PROPERTIES = {u'connection': u'properties', u'int_property': 6451}
 
 
+class AutoLinkDetachAfterAttachTest(MessagingHandler):
+    def __init__(self, address, node_addr):
+        super(AutoLinkDetachAfterAttachTest, self).__init__(prefetch=0)
+        self.timer = None
+        self.error = None
+        self.conn = None
+        self.address = address
+        self.n_rx_attach = 0
+        self.n_tx_attach = 0
+        self.node_addr = node_addr
+        self.sender = None
+        self.receiver = None
+
+    def timeout(self):
+        self.error = "Timeout Expired: n_rx_attach=%d n_tx_attach=%d" % (self.n_rx_attach, self.n_tx_attach)
+        self.conn.close()
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.conn = event.container.connect(self.address)
+
+    def on_link_opened(self, event):
+        if event.sender:
+            self.sender = event.sender
+            self.n_tx_attach += 1
+            if event.sender.remote_source.address != self.node_addr:
+                self.error = "Expected sender address '%s', got '%s'" % (self.node_addr, event.sender.remote_source.address)
+                self.timer.cancel()
+                self.conn.close()
+        elif event.receiver:
+            self.receiver = event.receiver
+            self.n_rx_attach += 1
+            if event.receiver.remote_target.address != self.node_addr:
+                self.error = "Expected receiver address '%s', got '%s'" % (self.node_addr, event.receiver.remote_target.address)
+                self.timer.cancel()
+                self.conn.close()
+
+        if self.n_tx_attach == 1 and self.n_rx_attach == 1:
+            # we have received 2 attaches from the router on the
+            # autolink address. Now close the sender and the receiver
+            # The router will retry establishing the autolinks.
+            self.sender.close()
+            self.receiver.close()
+
+        # The router will retry the auto link and the n_tx_attach and
+        # n_rx_attach will be 2
+        if self.n_tx_attach == 2 and self.n_rx_attach == 2:
+            # This if statement will fail if you comment out the call to
+            # qdr_route_auto_link_detached_CT(core, link) in
+            # qdr_link_inbound_detach_CT() (connections.c)
+            self.conn.close()
+            self.timer.cancel()
+
+    def run(self):
+        Container(self).run()
+
+
+class DetachAfterAttachTest(TestCase):
+    @classmethod
+    def setUpClass(cls):
+        super(DetachAfterAttachTest, cls).setUpClass()
+        name = "test-router"
+
+        config = Qdrouterd.Config([
+
+            ('router', {'mode': 'standalone', 'id': 'A'}),
+            ('listener', {'host': '127.0.0.1', 'role': 'normal',
+                          'port': cls.tester.get_port()}),
+
+            ('listener', {'role': 'route-container', 'name': 'myListener',
+                          'port': cls.tester.get_port()}),
+
+            ('autoLink', {'addr': 'myListener.1', 'connection': 'myListener',
+                          'direction': 'in'}),
+            ('autoLink', {'addr': 'myListener.1', 'connection': 'myListener',
+                          'direction': 'out'}),
+        ])
+
+        cls.router = cls.tester.qdrouterd(name, config)
+        cls.router.wait_ready()
+        cls.route_address = cls.router.addresses[1]
+
+    def test_auto_link_attach_detach_reattch(self):
+        test = AutoLinkDetachAfterAttachTest(self.route_address, 'myListener.1')
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+class AutoLinkRetryTest(TestCase):
+    inter_router_port = None
+
+    @classmethod
+    def router(cls, name, config):
+        config = Qdrouterd.Config(config)
+        cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+
+    @classmethod
+    def setUpClass(cls):
+        super(AutoLinkRetryTest, cls).setUpClass()
+        cls.routers = []
+
+        cls.inter_router_port = cls.tester.get_port()
+
+        cls.router('B',
+                    [
+                        ('router', {'mode': 'standalone', 'id': 'B'}),
+                        ('listener', {'role': 'normal',
+                                      'port': cls.tester.get_port()}),
+                        ('listener', {'host': '127.0.0.1',
+                                      'role': 'normal',
+                                      'port': cls.inter_router_port}),
+                        # Note here that the distribution of the address
+                        # 'examples' is set to 'unavailable'
+                        # This will ensure that any attach coming in for
+                        # this address will be rejected.
+                        ('address',
+                         {'prefix': 'examples',
+                          'name': 'unavailable-address',
+                          'distribution': 'unavailable'}),
+                    ])
+
+        cls.router('A', [
+                        ('router', {'mode': 'standalone', 'id': 'A'}),
+                        ('listener', {'host': '127.0.0.1', 'role': 'normal',
+                                      'port': cls.tester.get_port()}),
+
+            ('connector', {'host': '127.0.0.1', 'name': 'connectorToB',
+                           'role': 'route-container',
+                           'port': cls.inter_router_port}),
+
+                        ('autoLink', {'connection': 'connectorToB',
+                                      'addr': 'examples', 'direction': 'in'}),
+                        ('autoLink', {'connection': 'connectorToB',
+                                      'addr': 'examples', 'direction': 'out'}),
+        ])
+
+    def __init__(self, test_method):
+        TestCase.__init__(self, test_method)
+        self.success = False
+        self.timer_delay = 6
+        self.max_attempts = 2
+        self.attempts = 0
+
+    def address(self):
+        return self.routers[1].addresses[0]
+
+    def check_auto_link(self):
+        long_type = 'org.apache.qpid.dispatch.router.config.autoLink'
+        query_command = 'QUERY --type=' + long_type
+        output = json.loads(self.run_qdmanage(query_command))
+
+        if output[0].get('operStatus') == "active":
+            self.success = True
+        else:
+            self.schedule_auto_link_reconnect_test()
+
+        self.attempts += 1
+
+    def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None):
+        p = self.popen(
+            ['qdmanage'] + cmd.split(' ') + ['--bus', address or self.address(), '--indent=-1', '--timeout', str(TIMEOUT)],
+            stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect,
+            universal_newlines=True)
+        out = p.communicate(input)[0]
+        try:
+            p.teardown()
+        except Exception as e:
+            raise Exception("%s\n%s" % (e, out))
+        return out
+
+    def can_terminate(self):
+        if self.attempts == self.max_attempts:
+            return True
+
+        if self.success:
+            return True
+
+        return False
+
+    def schedule_auto_link_reconnect_test(self):
+        if self.attempts < self.max_attempts:
+            if not self.success:
+                Timer(self.timer_delay, self.check_auto_link).start()
+
+    def test_auto_link_reattch(self):
+        long_type = 'org.apache.qpid.dispatch.router.config.autoLink'
+        query_command = 'QUERY --type=' + long_type
+        output = json.loads(self.run_qdmanage(query_command))
+
+        # Since the distribution of the autoLinked address 'examples'
+        # is set to unavailable, the link route will initially be in the
+        # failed state
+        self.assertEqual(output[0]['operStatus'], 'failed')
+        self.assertEqual(output[0]['lastError'], 'Node not found')
+
+        # Now, we delete the address 'examples' (it becomes available)
+        # The Router A must now
+        # re-attempt to establish the autoLink and once the  autoLink
+        # is up, it should return to the 'active' state.
+        delete_command = 'DELETE --type=address --name=unavailable-address'
+        self.run_qdmanage(delete_command,  address=self.routers[0].addresses[0])
+
+        self.schedule_auto_link_reconnect_test()
+
+        while not self.can_terminate():
+            pass
+
+        self.assertTrue(self.success)
+
+
 class WaypointReceiverPhaseTest(TestCase):
     inter_router_port = None
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org