You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2017/02/16 19:44:18 UTC
qpid-dispatch git commit: DISPATCH-634 - Added support for dynamic
targets in inbound links.
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 6f3ef6771 -> 3c8f95c8d
DISPATCH-634 - Added support for dynamic targets in inbound links.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/3c8f95c8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/3c8f95c8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/3c8f95c8
Branch: refs/heads/master
Commit: 3c8f95c8dd21c1a9274e43d4b2c3e1bf43a62461
Parents: 6f3ef67
Author: Ted Ross <tr...@redhat.com>
Authored: Thu Feb 16 14:33:18 2017 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Thu Feb 16 14:33:18 2017 -0500
----------------------------------------------------------------------
src/router_core/connections.c | 22 ++-
src/router_core/terminus.c | 2 +-
tests/CMakeLists.txt | 1 +
tests/system_tests_dynamic_terminus.py | 243 ++++++++++++++++++++++++++++
tests/system_tests_link_routes.py | 65 +++++++-
5 files changed, 328 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3c8f95c8/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 2f5316c..2abb7fc 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -501,6 +501,18 @@ static void qdr_generate_temp_addr(qdr_core_t *core, char *buffer, size_t length
/**
+ * Generate a temporary mobile address for a producer connected to this router
+ * node: writes "amqp:/_$temp.<discriminator>" into buffer (at most length bytes).
+ */
+static void qdr_generate_mobile_addr(qdr_core_t *core, char *buffer, size_t length)
+{
+ char discriminator[QDR_DISCRIMINATOR_SIZE];
+ qdr_generate_discriminator(discriminator);
+ snprintf(buffer, length, "amqp:/_$temp.%s", discriminator);
+}
+
+
+/**
* Generate a link name
*/
static void qdr_generate_link_name(const char *label, char *buffer, size_t length)
@@ -953,11 +965,15 @@ static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t *core,
// address collides with a previously generated address (this should be _highly_
// unlikely).
//
- qdr_generate_temp_addr(core, temp_addr, 200);
+ if (dir == QD_OUTGOING)
+ qdr_generate_temp_addr(core, temp_addr, 200);
+ else
+ qdr_generate_mobile_addr(core, temp_addr, 200);
+
qd_iterator_t *temp_iter = qd_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH);
qd_hash_retrieve(core->addr_hash, temp_iter, (void**) &addr);
if (!addr) {
- addr = qdr_address_CT(core, QD_TREATMENT_ANYCAST_CLOSEST);
+ addr = qdr_address_CT(core, QD_TREATMENT_ANYCAST_BALANCED);
qd_hash_insert(core->addr_hash, temp_iter, addr, &addr->hash_handle);
DEQ_INSERT_TAIL(core->addrs, addr);
qdr_terminus_set_address(terminus, temp_addr);
@@ -1222,7 +1238,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
// This link has a target address
//
bool link_route;
- qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, conn, target, true, false, &link_route);
+ qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, conn, target, true, true, &link_route);
if (!addr) {
//
// No route to this destination, reject the link
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3c8f95c8/src/router_core/terminus.c
----------------------------------------------------------------------
diff --git a/src/router_core/terminus.c b/src/router_core/terminus.c
index aae690f..5564cc4 100644
--- a/src/router_core/terminus.c
+++ b/src/router_core/terminus.c
@@ -141,7 +141,7 @@ bool qdr_terminus_has_capability(qdr_terminus_t *term, const char *capability)
bool qdr_terminus_is_anonymous(qdr_terminus_t *term)
{
- return term == 0 || term->address == 0;
+ return term == 0 || (term->address == 0 && !term->dynamic);
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3c8f95c8/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index dc72e85..4d95dbc 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -91,6 +91,7 @@ foreach(py_test_module
system_tests_two_routers
system_tests_three_routers
system_tests_multi_tenancy
+ system_tests_dynamic_terminus
${SYSTEM_TESTS_HTTP}
)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3c8f95c8/tests/system_tests_dynamic_terminus.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_dynamic_terminus.py b/tests/system_tests_dynamic_terminus.py
new file mode 100644
index 0000000..39606d3
--- /dev/null
+++ b/tests/system_tests_dynamic_terminus.py
@@ -0,0 +1,243 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest, os, json
+from subprocess import PIPE, STDOUT
+from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED, SSLDomain, SSLUnavailable, Timeout
+from system_test import TestCase, Qdrouterd, main_module, DIR, TIMEOUT, Process
+from proton.handlers import MessagingHandler
+from proton.reactor import Container, DynamicNodeProperties, LinkOption
+
+# PROTON-828:
+try:
+ from proton import MODIFIED
+except ImportError:
+ from proton import PN_STATUS_MODIFIED as MODIFIED
+
+
+class RouterTest(TestCase):
+
+ inter_router_port = None
+
+ @classmethod
+ def setUpClass(cls):
+ """Start a router"""
+ super(RouterTest, cls).setUpClass()
+
+ def router(name, connection):
+
+ config = [
+ ('router', {'mode': 'interior', 'id': name}),
+ ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}),
+ ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+ ('address', {'prefix': 'spread', 'distribution': 'balanced'}),
+ ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+ connection
+ ]
+
+ config = Qdrouterd.Config(config)
+
+ cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+
+ cls.routers = []
+
+ inter_router_port = cls.tester.get_port()
+
+ router('A', ('listener', {'role': 'inter-router', 'port': inter_router_port}))
+ router('B', ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': inter_router_port, 'verifyHostName': 'no'}))
+
+ cls.routers[0].wait_router_connected('B')
+ cls.routers[1].wait_router_connected('A')
+
+
+ def test_01_dynamic_source_test(self):
+ test = DynamicSourceTest(self.routers[0].addresses[0], self.routers[1].addresses[0])
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_02_dynamic_target_one_router_test(self):
+ test = DynamicTargetTest(self.routers[0].addresses[0], self.routers[0].addresses[0])
+ test.run()
+ if test.skip:
+ self.skipTest(test.skip)
+ self.assertEqual(None, test.error)
+
+ def test_03_dynamic_target_two_router_test(self):
+ test = DynamicTargetTest(self.routers[0].addresses[0], self.routers[1].addresses[0])
+ test.run()
+ if test.skip:
+ self.skipTest(test.skip)
+ self.assertEqual(None, test.error)
+
+
+class Timeout(object):  # timer handler; NOTE(review): shadows the Timeout name imported from proton above
+ def __init__(self, parent):
+ self.parent = parent  # object whose timeout() is invoked when the timer fires
+
+ def on_timer_task(self, event):
+ self.parent.timeout()
+
+
+class DynamicSourceTest(MessagingHandler):
+ def __init__(self, router_1_host, router_2_host):
+ super(DynamicSourceTest, self).__init__()
+ self.router_1_host = router_1_host
+ self.router_2_host = router_2_host
+
+ self.error = None
+ self.receiver_conn = None
+ self.sender_1_conn = None
+ self.sender_2_conn = None
+ self.sender_1 = None
+ self.sender_2 = None
+ self.receiver = None
+ self.address = None
+
+ self.count = 10
+ self.n_sent_1 = 0
+ self.n_sent_2 = 0
+ self.n_rcvd = 0
+ self.n_accepted = 0
+
+ def timeout(self):
+ self.error = "Timeout Expired: n_sent_1=%d n_sent_2=%d n_rcvd=%d n_accepted=%d" %\
+ (self.n_sent_1, self.n_sent_2, self.n_rcvd, self.n_accepted)
+ self.sender_1_conn.close()
+ self.sender_2_conn.close()
+ self.receiver_conn.close()
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(5, Timeout(self))
+
+ self.sender_1_conn = event.container.connect(self.router_1_host)
+ self.sender_2_conn = event.container.connect(self.router_2_host)
+ self.receiver_conn = event.container.connect(self.router_1_host)
+
+ self.receiver = event.container.create_receiver(self.receiver_conn, dynamic=True)
+ self.sender_1 = event.container.create_sender(self.sender_1_conn)
+ self.sender_2 = event.container.create_sender(self.sender_2_conn)
+
+ def send(self):
+ if self.address == None:
+ return
+
+ while self.sender_1.credit > 0 and self.n_sent_1 < self.count:
+ self.n_sent_1 += 1
+ m = Message(address=self.address, body="Message %d of %d (via 1)" % (self.n_sent_1, self.count))
+ self.sender_1.send(m)
+
+ while self.sender_2.credit > 0 and self.n_sent_2 < self.count:
+ self.n_sent_2 += 1
+ m = Message(address=self.address, body="Message %d of %d (via 2)" % (self.n_sent_2, self.count))
+ self.sender_2.send(m)
+
+ def on_link_opened(self, event):
+ if event.receiver == self.receiver:
+ self.address = self.receiver.remote_source.address
+ self.send()
+
+ def on_sendable(self, event):
+ self.send()
+
+ def on_message(self, event):
+ if event.receiver == self.receiver:
+ self.n_rcvd += 1
+
+ def on_accepted(self, event):
+ self.n_accepted += 1
+ if self.n_accepted == self.count * 2:
+ self.sender_1_conn.close()
+ self.sender_2_conn.close()
+ self.receiver_conn.close()
+ self.timer.cancel()
+
+ def run(self):
+ Container(self).run()
+
+
+class DynamicTarget(LinkOption):  # link option that marks the link's target as dynamic
+ def apply(self, link):
+ link.target.dynamic = True
+ link.target.address = None  # no address is supplied by the client for the target
+
+
+class DynamicTargetTest(MessagingHandler):
+ def __init__(self, sender_host, receiver_host):
+ super(DynamicTargetTest, self).__init__()
+ self.sender_host = sender_host
+ self.receiver_host = receiver_host
+
+ self.error = None
+ self.skip = None
+ self.receiver_conn = None
+ self.sender_conn = None
+ self.sender = None
+ self.receiver = None
+ self.address = None
+
+ self.count = 10
+ self.n_sent = 0
+ self.n_rcvd = 0
+ self.n_accepted = 0
+
+ def timeout(self):
+ self.error = "Timeout Expired: n_sent=%d n_rcvd=%d n_accepted=%d" %\
+ (self.n_sent, self.n_rcvd, self.n_accepted)
+ self.sender_conn.close()
+ self.receiver_conn.close()
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(5, Timeout(self))
+
+ self.sender_conn = event.container.connect(self.sender_host)
+ self.receiver_conn = event.container.connect(self.receiver_host)
+
+ self.sender = event.container.create_sender(self.sender_conn, options=DynamicTarget())
+
+ def send(self):
+ while self.sender.credit > 0 and self.n_sent < self.count:
+ self.n_sent += 1
+ m = Message(address=self.address, body="Message %d of %d" % (self.n_sent, self.count))
+ self.sender.send(m)
+
+ def on_link_opened(self, event):
+ if event.sender == self.sender:
+ self.address = self.sender.remote_target.address
+ self.receiver = event.container.create_receiver(self.receiver_conn, self.address)
+
+ def on_sendable(self, event):
+ self.send()
+
+ def on_message(self, event):
+ if event.receiver == self.receiver:
+ self.n_rcvd += 1
+
+ def on_accepted(self, event):
+ self.n_accepted += 1
+ if self.n_accepted == self.count:
+ self.sender_conn.close()
+ self.receiver_conn.close()
+ self.timer.cancel()
+
+ def run(self):
+ Container(self).run()
+
+
+if __name__ == '__main__':
+ unittest.main(main_module())
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3c8f95c8/tests/system_tests_link_routes.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index 15749dc..c74a629 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -25,7 +25,7 @@ from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, Process
from proton import Message, Endpoint
from proton.handlers import MessagingHandler
-from proton.reactor import AtMostOnce, Container, DynamicNodeProperties
+from proton.reactor import AtMostOnce, Container, DynamicNodeProperties, LinkOption
from proton.utils import BlockingConnection, LinkDetached
from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler, DrainNoMessagesHandler, DrainNoMoreMessagesHandler
@@ -551,6 +551,11 @@ class LinkRouteTest(TestCase):
test.run()
self.assertEqual(None, test.error)
+ def test_dynamic_target(self):
+ test = DynamicTargetTest(self.routers[1].addresses[0], self.routers[1].addresses[1])
+ test.run()
+ self.assertEqual(None, test.error)
+
def test_detach_without_close(self):
test = DetachNoCloseTest(self.routers[1].addresses[0], self.routers[1].addresses[1])
test.run()
@@ -748,6 +753,64 @@ class DynamicSourceTest(MessagingHandler):
Container(self).run()
+class DynamicTarget(LinkOption):  # link option that marks the link's target as dynamic
+ def apply(self, link):
+ link.target.dynamic = True
+ link.target.address = None  # no address is supplied by the client for the target
+
+
+class DynamicTargetTest(MessagingHandler):
+ ##
+ ## This test verifies that a dynamic source can be propagated via link-route to
+ ## a route-container.
+ ##
+ def __init__(self, normal_addr, route_addr):
+ super(DynamicTargetTest, self).__init__(prefetch=0, auto_accept=False)
+ self.normal_addr = normal_addr
+ self.route_addr = route_addr
+ self.dest = "pulp.task.DynamicTarget"
+ self.address = "DynamicTargetAddress"
+ self.error = None
+
+ def timeout(self):
+ self.error = "Timeout Expired - Check for cores"
+ self.conn_normal.close()
+ self.conn_route.close()
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(5, Timeout(self))
+ self.conn_route = event.container.connect(self.route_addr)
+
+ def on_connection_opened(self, event):
+ if event.connection == self.conn_route:
+ self.conn_normal = event.container.connect(self.normal_addr)
+ elif event.connection == self.conn_normal:
+ self.sender = event.container.create_sender(self.conn_normal, None, options=\
+ [DynamicTarget(), DynamicNodeProperties({"x-opt-qd.address":u"pulp.task.abc"})])
+
+ def on_link_opened(self, event):
+ if event.sender == self.sender:
+ if self.sender.remote_target.address != self.address:
+ self.error = "Expected %s, got %s" % (self.address, self.receiver.remote_source.address)
+ self.conn_normal.close()
+ self.conn_route.close()
+ self.timer.cancel()
+
+ def on_link_opening(self, event):
+ if event.receiver:
+ self.receiver = event.receiver
+ if not self.receiver.remote_target.dynamic:
+ self.error = "Expected receiver with dynamic source"
+ self.conn_normal.close()
+ self.conn_route.close()
+ self.timer.cancel()
+ self.receiver.target.address = self.address
+ self.receiver.open()
+
+ def run(self):
+ Container(self).run()
+
+
class DetachNoCloseTest(MessagingHandler):
##
## This test verifies that link-detach (not close) is propagated properly
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org