You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2018/05/01 13:12:36 UTC
qpid-dispatch git commit: DISPATCH-981: disambiguated routed link names at ingress
Repository: qpid-dispatch
Updated Branches:
refs/heads/master c685d046d -> 78cbc5e67
DISPATCH-981: disambiguated routed link names at ingress
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/78cbc5e6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/78cbc5e6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/78cbc5e6
Branch: refs/heads/master
Commit: 78cbc5e6799fc8ed65e8a869a8fcfa3291862bdc
Parents: c685d04
Author: Gordon Sim <gs...@redhat.com>
Authored: Fri Apr 27 22:51:29 2018 +0100
Committer: Gordon Sim <gs...@redhat.com>
Committed: Tue May 1 14:11:56 2018 +0100
----------------------------------------------------------------------
src/router_core/connections.c | 20 +++++
src/router_core/forwarder.c | 4 +-
src/router_core/router_core_private.h | 1 +
tests/system_tests_link_routes.py | 129 +++++++++++++++++++++++++++++
4 files changed, 151 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/78cbc5e6/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 9608d79..0e655d3 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -840,6 +840,7 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
// Free the link's name and terminus_addr
//
free(link->name);
+ free(link->disambiguated_name);
free(link->terminus_addr);
free(link->ingress_histogram);
link->name = 0;
@@ -867,6 +868,7 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core,
link->link_direction = dir;
link->capacity = conn->link_capacity;
link->name = (char*) malloc(QDR_DISCRIMINATOR_SIZE + 8);
+ link->disambiguated_name = 0;
link->terminus_addr = 0;
qdr_generate_link_name("qdlink", link->name, QDR_DISCRIMINATOR_SIZE + 8);
link->admin_enabled = true;
@@ -1346,6 +1348,18 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo
qdr_connection_free(conn);
}
+/**
+ * Build the string "<original>@<container>" in a newly allocated buffer,
+ * where <container> is conn->container.
+ *
+ * @param conn      connection info supplying the container string
+ * @param original  link name to be qualified
+ * @return heap-allocated, NUL-terminated name that the caller must free(),
+ *         or 0 if an input string is missing or the allocation fails.
+ *
+ * A 0 result is safe to store in link->disambiguated_name: the forwarder
+ * uses link->name when disambiguated_name is null, and link cleanup passes
+ * the pointer straight to free().
+ */
+static char* disambiguated_link_name(qdr_connection_info_t *conn, char *original)
+{
+    if (!conn || !conn->container || !original)
+        return 0;
+
+    size_t olen = strlen(original);
+    size_t clen = strlen(conn->container);
+
+    // original + '@' + container + terminating NUL
+    char *name = (char*) malloc(olen + clen + 2);
+    if (!name)
+        return 0;
+
+    memcpy(name, original, olen);
+    name[olen] = '@';
+    // clen + 1 copies the container's terminating NUL as well
+    memcpy(name + olen + 1, conn->container, clen + 1);
+    return name;
+}
+
static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
if (discard)
@@ -1429,6 +1443,9 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
qdr_terminus_free(source);
qdr_terminus_free(target);
} else {
+ if (conn->role != QDR_ROLE_INTER_ROUTER && conn->connection_info) {
+ link->disambiguated_name = disambiguated_link_name(conn->connection_info, link->name);
+ }
success = qdr_forward_attach_CT(core, addr, link, source, target);
if (!success) {
@@ -1522,6 +1539,9 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
qdr_terminus_free(source);
qdr_terminus_free(target);
} else {
+ if (conn->role != QDR_ROLE_INTER_ROUTER && conn->connection_info) {
+ link->disambiguated_name = disambiguated_link_name(conn->connection_info, link->name);
+ }
bool success = qdr_forward_attach_CT(core, addr, link, source, target);
if (!success) {
qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/78cbc5e6/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index b8f2291..7ab8a46 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -675,7 +675,6 @@ int qdr_forward_balanced_CT(qdr_core_t *core,
return 0;
}
-
bool qdr_forward_link_balanced_CT(qdr_core_t *core,
qdr_address_t *addr,
qdr_link_t *in_link,
@@ -750,8 +749,7 @@ bool qdr_forward_link_balanced_CT(qdr_core_t *core,
out_link->oper_status = QDR_LINK_OPER_DOWN;
- out_link->name = (char*) malloc(strlen(in_link->name) + 1);
- strcpy(out_link->name, in_link->name);
+ out_link->name = strdup(in_link->disambiguated_name ? in_link->disambiguated_name : in_link->name);
out_link->connected_link = in_link;
in_link->connected_link = out_link;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/78cbc5e6/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 0f22df5..b3c9798 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -384,6 +384,7 @@ struct qdr_link_t {
qd_direction_t link_direction;
qdr_link_work_list_t work_list;
char *name;
+ char *disambiguated_name;
char *terminus_addr;
int detach_count; ///< 0, 1, or 2 depending on the state of the lifecycle
qdr_address_t *owning_addr; ///< [ref] Address record that owns this link
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/78cbc5e6/tests/system_tests_link_routes.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index 6134c0f..87fb448 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -713,6 +713,25 @@ class LinkRouteTest(TestCase):
test.run()
self.assertEqual(None, test.error)
+ def _multi_link_send_receive(self, send_host, receive_host, name):
+ senders = ["%s/%s" % (send_host, address) for address in ["org.apache.foo", "org.apache.bar"]]
+ receivers = ["%s/%s" % (receive_host, address) for address in ["org.apache.foo", "org.apache.bar"]]
+ test = MultiLinkSendReceive(senders, receivers, name)
+ test.run()
+ self.assertEqual(None, test.error)
+
+ def test_same_name_route_receivers_through_B(self):
+ self._multi_link_send_receive(self.routers[0].addresses[0], self.routers[1].addresses[0], "recv_through_B")
+
+ def test_same_name_route_senders_through_B(self):
+ self._multi_link_send_receive(self.routers[1].addresses[0], self.routers[0].addresses[0], "send_through_B")
+
+ def test_same_name_route_receivers_through_C(self):
+ self._multi_link_send_receive(self.routers[0].addresses[0], self.routers[2].addresses[0], "recv_through_C")
+
+ def test_same_name_route_senders_through_C(self):
+ self._multi_link_send_receive(self.routers[2].addresses[0], self.routers[0].addresses[0], "send_through_C")
+
class Timeout(object):
def __init__(self, parent):
@@ -1194,6 +1213,116 @@ class TerminusAddrTest(MessagingHandler):
def run(self):
Container(self).run()
+class MultiLinkSendReceive(MessagingHandler):
+ class SendState(object):
+ def __init__(self, link):
+ self.link = link
+ self.sent = False
+ self.accepted = False
+ self.done = False
+ self.closed = False
+
+ def send(self, subject, body):
+ if not self.sent:
+ self.link.send(Message(subject=subject,body=body,address=self.link.target.address))
+ self.sent = True
+
+ def on_accepted(self):
+ self.accepted = True
+ self.done = True
+
+ def close(self):
+ if not self.closed:
+ self.closed = True
+ self.link.close()
+ self.link.connection.close()
+
+ class RecvState(object):
+ def __init__(self, link):
+ self.link = link
+ self.received = False
+ self.done = False
+ self.closed = False
+
+ def on_message(self):
+ self.received = True
+ self.done = True
+
+ def close(self):
+ if not self.closed:
+ self.closed = True
+ self.link.close()
+ self.link.connection.close()
+
+ def __init__(self, send_urls, recv_urls, name, message=None):
+ super(MultiLinkSendReceive, self).__init__()
+ self.send_urls = send_urls
+ self.recv_urls = recv_urls
+ self.senders = {}
+ self.receivers = {}
+ self.message = message or "SendReceiveTest"
+ self.sent = False
+ self.error = None
+ self.name = name
+
+ def close(self):
+ for sender in self.senders.values():
+ sender.close()
+ for receiver in self.receivers.values():
+ receiver.close()
+
+ def all_done(self):
+ for sender in self.senders.values():
+ if not sender.done:
+ return False
+ for receiver in self.receivers.values():
+ if not receiver.done:
+ return False
+ return True
+
+ def timeout(self):
+ self.error = "Timeout Expired"
+ self.close()
+
+ def stop_if_all_done(self):
+ if self.all_done():
+ self.stop()
+
+ def stop(self):
+ self.close()
+ self.timer.cancel()
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+ event.container.container_id = None
+ for u in self.send_urls:
+ s = self.SendState(event.container.create_sender(u, name=self.name))
+ self.senders[s.link.connection.container] = s
+ for u in self.recv_urls:
+ r = self.RecvState(event.container.create_receiver(u, name=self.name))
+ self.receivers[r.link.connection.container] = r
+
+ def on_link_remote_open(self, event):
+ print("link opened: %s %s %s" % (event.link.source.address, event.link.target.address, event.connection.container))
+
+ def on_sendable(self, event):
+ print("sendable: %s %s" % (event.link.target.address, event.connection.container))
+ self.senders[event.connection.container].send(self.name, self.message)
+
+ def on_message(self, event):
+ print("message received: %s %s" % (event.link.source.address, event.connection.container))
+ if self.message != event.message.body:
+ error = "Incorrect message. Got %s, expected %s" % (event.message.body, self.message.body)
+ self.receivers[event.connection.container].on_message()
+ self.stop_if_all_done()
+
+ def on_accepted(self, event):
+ print("accepted: %s %s" % (event.link.target.address, event.connection.container))
+ self.senders[event.connection.container].on_accepted()
+ self.stop_if_all_done()
+
+ def run(self):
+ Container(self).run()
if __name__ == '__main__':
unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org