You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2017/02/16 19:44:18 UTC

qpid-dispatch git commit: DISPATCH-634 - Added support for dynamic targets in inbound links.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 6f3ef6771 -> 3c8f95c8d


DISPATCH-634 - Added support for dynamic targets in inbound links.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/3c8f95c8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/3c8f95c8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/3c8f95c8

Branch: refs/heads/master
Commit: 3c8f95c8dd21c1a9274e43d4b2c3e1bf43a62461
Parents: 6f3ef67
Author: Ted Ross <tr...@redhat.com>
Authored: Thu Feb 16 14:33:18 2017 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Thu Feb 16 14:33:18 2017 -0500

----------------------------------------------------------------------
 src/router_core/connections.c          |  22 ++-
 src/router_core/terminus.c             |   2 +-
 tests/CMakeLists.txt                   |   1 +
 tests/system_tests_dynamic_terminus.py | 243 ++++++++++++++++++++++++++++
 tests/system_tests_link_routes.py      |  65 +++++++-
 5 files changed, 328 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3c8f95c8/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 2f5316c..2abb7fc 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -501,6 +501,18 @@ static void qdr_generate_temp_addr(qdr_core_t *core, char *buffer, size_t length
 
 
 /**
+ * Write a temporary mobile address, "amqp:/_$temp.<discriminator>", into
+ * buffer (at most length bytes) for a producer connected to this router node.
+ */
+static void qdr_generate_mobile_addr(qdr_core_t *core, char *buffer, size_t length)
+{
+    char discriminator[QDR_DISCRIMINATOR_SIZE];
+    qdr_generate_discriminator(discriminator);
+    snprintf(buffer, length, "amqp:/_$temp.%s", discriminator);
+}
+
+
+/**
  * Generate a link name
  */
 static void qdr_generate_link_name(const char *label, char *buffer, size_t length)
@@ -953,11 +965,15 @@ static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t       *core,
             // address collides with a previously generated address (this should be _highly_
             // unlikely).
             //
-            qdr_generate_temp_addr(core, temp_addr, 200);
+            if (dir == QD_OUTGOING)
+                qdr_generate_temp_addr(core, temp_addr, 200);
+            else
+                qdr_generate_mobile_addr(core, temp_addr, 200);
+
             qd_iterator_t *temp_iter = qd_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH);
             qd_hash_retrieve(core->addr_hash, temp_iter, (void**) &addr);
             if (!addr) {
-                addr = qdr_address_CT(core, QD_TREATMENT_ANYCAST_CLOSEST);
+                addr = qdr_address_CT(core, QD_TREATMENT_ANYCAST_BALANCED);
                 qd_hash_insert(core->addr_hash, temp_iter, addr, &addr->hash_handle);
                 DEQ_INSERT_TAIL(core->addrs, addr);
                 qdr_terminus_set_address(terminus, temp_addr);
@@ -1222,7 +1238,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
                 // This link has a target address
                 //
                 bool           link_route;
-                qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, conn, target, true, false, &link_route);
+                qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, conn, target, true, true, &link_route);
                 if (!addr) {
                     //
                     // No route to this destination, reject the link

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3c8f95c8/src/router_core/terminus.c
----------------------------------------------------------------------
diff --git a/src/router_core/terminus.c b/src/router_core/terminus.c
index aae690f..5564cc4 100644
--- a/src/router_core/terminus.c
+++ b/src/router_core/terminus.c
@@ -141,7 +141,7 @@ bool qdr_terminus_has_capability(qdr_terminus_t *term, const char *capability)
 
 bool qdr_terminus_is_anonymous(qdr_terminus_t *term)
 {
-    return term == 0 || term->address == 0;
+    return term == 0 || (term->address == 0 && !term->dynamic);
 }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3c8f95c8/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index dc72e85..4d95dbc 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -91,6 +91,7 @@ foreach(py_test_module
     system_tests_two_routers
     system_tests_three_routers
     system_tests_multi_tenancy
+    system_tests_dynamic_terminus
     ${SYSTEM_TESTS_HTTP}
     )
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3c8f95c8/tests/system_tests_dynamic_terminus.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_dynamic_terminus.py b/tests/system_tests_dynamic_terminus.py
new file mode 100644
index 0000000..39606d3
--- /dev/null
+++ b/tests/system_tests_dynamic_terminus.py
@@ -0,0 +1,243 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest, os, json
+from subprocess import PIPE, STDOUT
+from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED, SSLDomain, SSLUnavailable, Timeout
+from system_test import TestCase, Qdrouterd, main_module, DIR, TIMEOUT, Process
+from proton.handlers import MessagingHandler
+from proton.reactor import Container, DynamicNodeProperties, LinkOption
+
+# PROTON-828:
+try:
+    from proton import MODIFIED
+except ImportError:
+    from proton import PN_STATUS_MODIFIED as MODIFIED
+
+    
+class RouterTest(TestCase):
+
+    inter_router_port = None
+
+    @classmethod
+    def setUpClass(cls):
+        """Start a router"""
+        super(RouterTest, cls).setUpClass()
+
+        def router(name, connection):
+            
+            config = [
+                ('router', {'mode': 'interior', 'id': name}),
+                ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}),
+                ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+                ('address', {'prefix': 'spread', 'distribution': 'balanced'}),
+                ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+                connection
+            ]
+            
+            config = Qdrouterd.Config(config)
+
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+
+        cls.routers = []
+        
+        inter_router_port = cls.tester.get_port()
+        
+        router('A', ('listener', {'role': 'inter-router', 'port': inter_router_port}))
+        router('B', ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': inter_router_port, 'verifyHostName': 'no'}))
+
+        cls.routers[0].wait_router_connected('B')
+        cls.routers[1].wait_router_connected('A')
+
+
+    def test_01_dynamic_source_test(self):
+        test = DynamicSourceTest(self.routers[0].addresses[0], self.routers[1].addresses[0])
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_02_dynamic_target_one_router_test(self):
+        test = DynamicTargetTest(self.routers[0].addresses[0], self.routers[0].addresses[0])
+        test.run()
+        if test.skip:
+            self.skipTest(test.skip)
+        self.assertEqual(None, test.error)
+
+    def test_03_dynamic_target_two_router_test(self):
+        test = DynamicTargetTest(self.routers[0].addresses[0], self.routers[1].addresses[0])
+        test.run()
+        if test.skip:
+            self.skipTest(test.skip)
+        self.assertEqual(None, test.error)
+
+
+class Timeout(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.timeout()
+
+
+class DynamicSourceTest(MessagingHandler):
+    def __init__(self, router_1_host, router_2_host):
+        super(DynamicSourceTest, self).__init__()
+        self.router_1_host = router_1_host
+        self.router_2_host = router_2_host
+
+        self.error         = None
+        self.receiver_conn = None
+        self.sender_1_conn = None
+        self.sender_2_conn = None
+        self.sender_1      = None
+        self.sender_2      = None
+        self.receiver      = None
+        self.address       = None
+
+        self.count      = 10
+        self.n_sent_1   = 0
+        self.n_sent_2   = 0
+        self.n_rcvd     = 0
+        self.n_accepted = 0
+
+    def timeout(self):
+        self.error = "Timeout Expired: n_sent_1=%d n_sent_2=%d n_rcvd=%d n_accepted=%d" %\
+        (self.n_sent_1, self.n_sent_2, self.n_rcvd, self.n_accepted)
+        self.sender_1_conn.close()
+        self.sender_2_conn.close()
+        self.receiver_conn.close()
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(5, Timeout(self))
+
+        self.sender_1_conn = event.container.connect(self.router_1_host)
+        self.sender_2_conn = event.container.connect(self.router_2_host)
+        self.receiver_conn = event.container.connect(self.router_1_host)
+
+        self.receiver = event.container.create_receiver(self.receiver_conn, dynamic=True)
+        self.sender_1 = event.container.create_sender(self.sender_1_conn)
+        self.sender_2 = event.container.create_sender(self.sender_2_conn)
+
+    def send(self):  # send up to self.count messages on each sender, as credit allows
+        if self.address is None:  # dynamic source address not yet learned from the receiver
+            return
+
+        while self.sender_1.credit > 0 and self.n_sent_1 < self.count:
+            self.n_sent_1 += 1
+            m = Message(address=self.address, body="Message %d of %d (via 1)" % (self.n_sent_1, self.count))
+            self.sender_1.send(m)
+
+        while self.sender_2.credit > 0 and self.n_sent_2 < self.count:
+            self.n_sent_2 += 1
+            m = Message(address=self.address, body="Message %d of %d (via 2)" % (self.n_sent_2, self.count))
+            self.sender_2.send(m)
+
+    def on_link_opened(self, event):
+        if event.receiver == self.receiver:
+            self.address = self.receiver.remote_source.address
+            self.send()
+
+    def on_sendable(self, event):
+        self.send()
+
+    def on_message(self, event):
+        if event.receiver == self.receiver:
+            self.n_rcvd += 1
+            
+    def on_accepted(self, event):
+        self.n_accepted += 1
+        if self.n_accepted == self.count * 2:
+            self.sender_1_conn.close()
+            self.sender_2_conn.close()
+            self.receiver_conn.close()
+            self.timer.cancel()
+
+    def run(self):
+        Container(self).run()
+
+
+class DynamicTarget(LinkOption):
+    def apply(self, link):
+        link.target.dynamic = True
+        link.target.address = None
+
+
+class DynamicTargetTest(MessagingHandler):
+    def __init__(self, sender_host, receiver_host):
+        super(DynamicTargetTest, self).__init__()
+        self.sender_host   = sender_host
+        self.receiver_host = receiver_host
+
+        self.error         = None
+        self.skip          = None
+        self.receiver_conn = None
+        self.sender_conn   = None
+        self.sender        = None
+        self.receiver      = None
+        self.address       = None
+
+        self.count      = 10
+        self.n_sent     = 0
+        self.n_rcvd     = 0
+        self.n_accepted = 0
+
+    def timeout(self):
+        self.error = "Timeout Expired: n_sent=%d n_rcvd=%d n_accepted=%d" %\
+        (self.n_sent, self.n_rcvd, self.n_accepted)
+        self.sender_conn.close()
+        self.receiver_conn.close()
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(5, Timeout(self))
+
+        self.sender_conn   = event.container.connect(self.sender_host)
+        self.receiver_conn = event.container.connect(self.receiver_host)
+
+        self.sender = event.container.create_sender(self.sender_conn, options=DynamicTarget())
+
+    def send(self):
+        while self.sender.credit > 0 and self.n_sent < self.count:
+            self.n_sent += 1
+            m = Message(address=self.address, body="Message %d of %d" % (self.n_sent, self.count))
+            self.sender.send(m)
+
+    def on_link_opened(self, event):
+        if event.sender == self.sender:
+            self.address = self.sender.remote_target.address
+            self.receiver = event.container.create_receiver(self.receiver_conn, self.address)
+
+    def on_sendable(self, event):
+        self.send()
+
+    def on_message(self, event):
+        if event.receiver == self.receiver:
+            self.n_rcvd += 1
+            
+    def on_accepted(self, event):
+        self.n_accepted += 1
+        if self.n_accepted == self.count:
+            self.sender_conn.close()
+            self.receiver_conn.close()
+            self.timer.cancel()
+
+    def run(self):
+        Container(self).run()
+
+
+if __name__ == '__main__':
+    unittest.main(main_module())

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3c8f95c8/tests/system_tests_link_routes.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index 15749dc..c74a629 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -25,7 +25,7 @@ from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, Process
 
 from proton import Message, Endpoint
 from proton.handlers import MessagingHandler
-from proton.reactor import AtMostOnce, Container, DynamicNodeProperties
+from proton.reactor import AtMostOnce, Container, DynamicNodeProperties, LinkOption
 from proton.utils import BlockingConnection, LinkDetached
 
 from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler, DrainNoMessagesHandler, DrainNoMoreMessagesHandler
@@ -551,6 +551,11 @@ class LinkRouteTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
+    def test_dynamic_target(self):
+        test = DynamicTargetTest(self.routers[1].addresses[0], self.routers[1].addresses[1])
+        test.run()
+        self.assertEqual(None, test.error)
+
     def test_detach_without_close(self):
         test = DetachNoCloseTest(self.routers[1].addresses[0], self.routers[1].addresses[1])
         test.run()
@@ -748,6 +753,64 @@ class DynamicSourceTest(MessagingHandler):
         Container(self).run()
 
 
+class DynamicTarget(LinkOption):
+    def apply(self, link):
+        link.target.dynamic = True
+        link.target.address = None
+
+
+class DynamicTargetTest(MessagingHandler):
+    ##
+    ## This test verifies that a dynamic target can be propagated via link-route to
+    ## a route-container.
+    ##
+    def __init__(self, normal_addr, route_addr):
+        super(DynamicTargetTest, self).__init__(prefetch=0, auto_accept=False)
+        self.normal_addr = normal_addr
+        self.route_addr  = route_addr
+        self.dest = "pulp.task.DynamicTarget"
+        self.address = "DynamicTargetAddress"
+        self.error = None
+
+    def timeout(self):
+        self.error = "Timeout Expired - Check for cores"
+        self.conn_normal.close()
+        self.conn_route.close()
+
+    def on_start(self, event):
+        self.timer      = event.reactor.schedule(5, Timeout(self))
+        self.conn_route = event.container.connect(self.route_addr)
+
+    def on_connection_opened(self, event):
+        if event.connection == self.conn_route:
+            self.conn_normal = event.container.connect(self.normal_addr)
+        elif event.connection == self.conn_normal:
+            self.sender = event.container.create_sender(self.conn_normal, None, options=\
+                                                        [DynamicTarget(), DynamicNodeProperties({"x-opt-qd.address":u"pulp.task.abc"})])
+
+    def on_link_opened(self, event):  # sender link is open: check the target address handed back
+        if event.sender == self.sender:
+            if self.sender.remote_target.address != self.address:
+                self.error = "Expected %s, got %s" % (self.address, self.sender.remote_target.address)  # report the value actually compared
+            self.conn_normal.close()
+            self.conn_route.close()
+            self.timer.cancel()
+
+    def on_link_opening(self, event):  # incoming attach: answer it with a concrete target address
+        if event.receiver:
+            self.receiver = event.receiver
+            if not self.receiver.remote_target.dynamic:  # the attach must carry a dynamic target
+                self.error = "Expected receiver with dynamic target"
+                self.conn_normal.close()
+                self.conn_route.close()
+                self.timer.cancel()
+            self.receiver.target.address = self.address  # supply the address this test expects to see echoed to the sender
+            self.receiver.open()
+
+    def run(self):
+        Container(self).run()
+
+
 class DetachNoCloseTest(MessagingHandler):
     ##
     ## This test verifies that link-detach (not close) is propagated properly


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org