You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/09/02 21:16:38 UTC

[1/2] qpid-dispatch git commit: DISPATCH-496 - Issue credit to blocked senders when an autolink attaches the first consumer.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master ddc85fab8 -> bba79f3c6


DISPATCH-496 - Issue credit to blocked senders when an autolink attaches the first consumer.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/41b74076
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/41b74076
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/41b74076

Branch: refs/heads/master
Commit: 41b740761adf7fb24dd0de8a5aa0bef604240d5c
Parents: ddc85fa
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Sep 2 16:24:37 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Fri Sep 2 16:24:37 2016 -0400

----------------------------------------------------------------------
 src/router_core/connections.c | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/41b74076/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 229571e..634a07f 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -1224,6 +1224,7 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac
                         const char *key = (const char*) qd_hash_key_by_handle(link->auto_link->addr->hash_handle);
                         if (key && *key == 'M')
                             qdr_post_mobile_added_CT(core, key);
+                        qdr_addr_start_inlinks_CT(core, link->auto_link->addr);
                     }
                 }
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-dispatch git commit: DISPATCH-495 - Improved stability of autolinks and added a set of autolink tests.

Posted by tr...@apache.org.
DISPATCH-495 - Improved stability of autolinks and added a set of autolink tests.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/bba79f3c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/bba79f3c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/bba79f3c

Branch: refs/heads/master
Commit: bba79f3c68d771624a9b1b54d36ae96f368205b4
Parents: 41b7407
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Sep 2 17:15:48 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Fri Sep 2 17:15:48 2016 -0400

----------------------------------------------------------------------
 src/container.c                 |   6 +-
 src/server.c                    |   2 +-
 tests/CMakeLists.txt            |   1 +
 tests/system_tests_autolinks.py | 196 +++++++++++++++++++++++++++++++++++
 4 files changed, 202 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bba79f3c/src/container.c
----------------------------------------------------------------------
diff --git a/src/container.c b/src/container.c
index 1c9fec8..92c0e0e 100644
--- a/src/container.c
+++ b/src/container.c
@@ -309,13 +309,14 @@ static int close_handler(qd_container_t *container, void* conn_context, pn_conne
     pn_link_t *pn_link = pn_link_head(conn, 0);
     while (pn_link) {
         qd_link_t *link = (qd_link_t*) pn_link_get_context(pn_link);
+        pn_link_t *next = pn_link_next(pn_link, 0);
         if (link) {
             qd_node_t *node = link->node;
             if (node) {
                 node->ntype->link_detach_handler(node->context, link, QD_LOST);
             }
         }
-        pn_link = pn_link_next(pn_link, 0);
+        pn_link = next;
     }
 
     // close the connection
@@ -779,9 +780,10 @@ qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn, qd_direction_t dir, c
     link->close_sess_with_link = true;
 
     //
-    // Keep the borrowed link reference
+    // Keep the borrowed references
     //
     pn_incref(link->pn_link);
+    pn_incref(link->pn_sess);
 
     pn_link_set_context(link->pn_link, link);
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bba79f3c/src/server.c
----------------------------------------------------------------------
diff --git a/src/server.c b/src/server.c
index e74b879..aa08ae6 100644
--- a/src/server.c
+++ b/src/server.c
@@ -798,7 +798,7 @@ static int process_connector(qd_server_t *qd_server, qdpn_connector_t *cxtr)
 
                     if (ctx->connector) {
                         ce = QD_CONN_EVENT_CONNECTOR_OPEN;
-                        ctx->connector->delay = 0;
+                        ctx->connector->delay = 2000;  // Delay on re-connect in case there is a recurring error
                     } else
                         assert(ctx->listener);
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bba79f3c/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 1c1be1c..9d133a7 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -72,6 +72,7 @@ add_test(router_policy_test    ${TEST_WRAP} -m unittest -v router_policy_test)
 foreach(py_test_module
 #   system_tests_broker
     system_tests_link_routes
+    system_tests_autolinks
     system_tests_drain
     system_tests_management
     system_tests_one_router

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bba79f3c/tests/system_tests_autolinks.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_autolinks.py b/tests/system_tests_autolinks.py
new file mode 100644
index 0000000..cc79c64
--- /dev/null
+++ b/tests/system_tests_autolinks.py
@@ -0,0 +1,196 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+from proton import Message, Delivery, PENDING, ACCEPTED, REJECTED
+from system_test import TestCase, Qdrouterd, main_module
+from proton.handlers import MessagingHandler
+from proton.reactor import Container, AtMostOnce, AtLeastOnce
+from proton.utils import BlockingConnection, SyncRequestResponse
+from qpid_dispatch.management.client import Node
+
+CONNECTION_PROPERTIES = {u'connection': u'properties', u'int_property': 6451}
+
+class AutolinkTest(TestCase):
+    """System tests involving a single router"""
+    @classmethod
+    def setUpClass(cls):
+        """Start a router and a messenger"""
+        super(AutolinkTest, cls).setUpClass()
+        name = "test-router"
+        config = Qdrouterd.Config([
+            ('router', {'mode': 'standalone', 'id': 'QDR'}),
+
+            #
+            # Create a general-purpose listener for sending and receiving deliveries
+            #
+            ('listener', {'port': cls.tester.get_port()}),
+
+            #
+            # Create a route-container listener for the autolinks
+            #
+            ('listener', {'port': cls.tester.get_port(), 'role': 'route-container'}),
+
+            #
+            # Create a pair of default auto-links for 'node.1'
+            #
+            ('autoLink', {'addr': 'node.1', 'containerId': 'container.1', 'dir': 'in'}),
+            ('autoLink', {'addr': 'node.1', 'containerId': 'container.1', 'dir': 'out'}),
+            ('address',  {'prefix': 'node', 'waypoint': 'yes'}),
+
+            #
+            # Create a pair of auto-links on non-default phases for container-to-container transfers
+            #
+            ('autoLink', {'addr': 'xfer.2', 'containerId': 'container.2', 'dir': 'in',  'phase': '4'}),
+            ('autoLink', {'addr': 'xfer.2', 'containerId': 'container.3', 'dir': 'out', 'phase': '4'}),
+        ])
+
+        cls.router = cls.tester.qdrouterd(name, config)
+        cls.router.wait_ready()
+        cls.normal_address = cls.router.addresses[0]
+        cls.route_address  = cls.router.addresses[1]
+
+
+    def test_01_autolink_attach(self):
+        """
+        Create the route-container connection and verify that the appropriate links are attached.
+        Disconnect, reconnect, and verify that the links are re-attached.
+        """
+        test = AutolinkAttachTest(self.route_address)
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_02_autolink_credit(self):
+        """
+        Create a normal connection and a sender to the autolink address.  Then create the route-container
+        connection and ensure that the on_sendable did not arrive until after the autolinks were created.
+        """
+        test = AutolinkCreditTest(self.normal_address, self.route_address)
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+class Timeout(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.timeout()
+
+
+class AutolinkAttachTest(MessagingHandler):
+    def __init__(self, address):
+        super(AutolinkAttachTest, self).__init__(prefetch=0)
+        self.address  = address
+        self.error    = None
+        self.sender   = None
+        self.receiver = None
+
+        self.n_rx_attach = 0
+        self.n_tx_attach = 0
+
+    def timeout(self):
+        self.error = "Timeout Expired: n_rx_attach=%d n_tx_attach=%d" % (self.n_rx_attach, self.n_tx_attach)
+        self.conn.close()
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(5, Timeout(self))
+        self.conn  = event.container.connect(self.address)
+
+    def on_connection_closed(self, event):
+        if self.n_tx_attach == 1:
+            self.conn  = event.container.connect(self.address)
+
+    def on_link_opened(self, event):
+        if event.sender:
+            self.n_tx_attach += 1
+            if event.sender.remote_source.address != 'node.1':
+                self.error = "Expected sender address 'node.1', got '%s'" % event.sender.remote_source.address
+                self.timer.cancel()
+                self.conn.close()
+        elif event.receiver:
+            self.n_rx_attach += 1
+            if event.receiver.remote_target.address != 'node.1':
+                self.error = "Expected receiver address 'node.1', got '%s'" % event.receiver.remote_target.address
+                self.timer.cancel()
+                self.conn.close()
+        if self.n_tx_attach == 1 and self.n_rx_attach == 1:
+            self.conn.close()
+        if self.n_tx_attach == 2 and self.n_rx_attach == 2:
+            self.conn.close()
+            self.timer.cancel()
+
+    def run(self):
+        container = Container(self)
+        container.container_id = 'container.1'
+        container.run()
+
+
+class AutolinkCreditTest(MessagingHandler):
+    def __init__(self, normal_address, route_address):
+        super(AutolinkCreditTest, self).__init__(prefetch=0)
+        self.normal_address = normal_address
+        self.route_address  = route_address
+        self.dest           = 'node.1'
+        self.normal_conn    = None
+        self.route_conn     = None
+        self.error          = None
+        self.last_action    = "None"
+
+    def timeout(self):
+        self.error = "Timeout Expired: last_action=%s" % self.last_action
+        if self.normal_conn:
+            self.normal_conn.close()
+        if self.route_conn:
+            self.route_conn.close()
+
+    def on_start(self, event):
+        self.timer       = event.reactor.schedule(5, Timeout(self))
+        self.normal_conn = event.container.connect(self.normal_address)
+        self.sender      = event.container.create_sender(self.normal_conn, self.dest)
+        self.last_action = "Attached normal sender"
+
+    def on_link_opening(self, event):
+        if event.sender:
+            event.sender.source.address = event.sender.remote_source.address
+        if event.receiver:
+            event.receiver.target.address = event.receiver.remote_target.address
+
+    def on_link_opened(self, event):
+        if event.sender == self.sender:
+            self.route_conn = event.container.connect(self.route_address)
+            self.last_action = "Opened route connection"
+
+    def on_sendable(self, event):
+        if event.sender == self.sender:
+            if self.last_action != "Opened route connection":
+                self.error = "Events out of sequence:  last_action=%s" % self.last_action
+            self.timer.cancel()
+            self.route_conn.close()
+            self.normal_conn.close()
+
+    def run(self):
+        container = Container(self)
+        container.container_id = 'container.1'
+        container.run()
+
+
+if __name__ == '__main__':
+    unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org