You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2018/05/01 13:25:32 UTC

[1/2] qpid-dispatch git commit: DISPATCH-983: take snapshot of current links with work when copying connection work

Repository: qpid-dispatch
Updated Branches:
  refs/heads/1.1.x 1a2bde109 -> c6fac91d5


DISPATCH-983: take snapshot of current links with work when copying connection work


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/57fbce9e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/57fbce9e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/57fbce9e

Branch: refs/heads/1.1.x
Commit: 57fbce9e8c831a36998d0be417b8d98f205f9493
Parents: 1a2bde1
Author: Gordon Sim <gs...@redhat.com>
Authored: Mon Apr 30 20:44:02 2018 +0100
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Tue May 1 09:18:19 2018 -0400

----------------------------------------------------------------------
 src/router_core/connections.c | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/57fbce9e/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 4598312..9608d79 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -205,6 +205,7 @@ const char *qdr_connection_get_tenant_space(const qdr_connection_t *conn, int *l
 int qdr_connection_process(qdr_connection_t *conn)
 {
     qdr_connection_work_list_t  work_list;
+    qdr_link_ref_list_t         links_with_work;
     qdr_core_t                 *core = conn->core;
 
     qdr_link_ref_t *ref;
@@ -215,6 +216,7 @@ int qdr_connection_process(qdr_connection_t *conn)
 
     sys_mutex_lock(conn->work_lock);
     DEQ_MOVE(conn->work_list, work_list);
+    DEQ_MOVE(conn->links_with_work, links_with_work);
     sys_mutex_unlock(conn->work_lock);
 
     event_count += DEQ_SIZE(work_list);
@@ -244,10 +246,10 @@ int qdr_connection_process(qdr_connection_t *conn)
         free_link = false;
 
         sys_mutex_lock(conn->work_lock);
-        ref = DEQ_HEAD(conn->links_with_work);
+        ref = DEQ_HEAD(links_with_work);
         if (ref) {
             link = ref->link;
-            qdr_del_link_ref(&conn->links_with_work, ref->link, QDR_LINK_LIST_CLASS_WORK);
+            qdr_del_link_ref(&links_with_work, ref->link, QDR_LINK_LIST_CLASS_WORK);
 
             link_work = DEQ_HEAD(link->work_list);
             if (link_work) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-dispatch git commit: DISPATCH-981: disambiguated routed link names at ingress

Posted by gm...@apache.org.
DISPATCH-981: disambiguated routed link names at ingress


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/c6fac91d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/c6fac91d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/c6fac91d

Branch: refs/heads/1.1.x
Commit: c6fac91d51cdbbeb348acd1e13a105664e6e8b8f
Parents: 57fbce9
Author: Gordon Sim <gs...@redhat.com>
Authored: Fri Apr 27 22:51:29 2018 +0100
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Tue May 1 09:18:47 2018 -0400

----------------------------------------------------------------------
 src/router_core/connections.c         |  20 +++++
 src/router_core/forwarder.c           |   4 +-
 src/router_core/router_core_private.h |   1 +
 tests/system_tests_link_routes.py     | 129 +++++++++++++++++++++++++++++
 4 files changed, 151 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c6fac91d/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 9608d79..0e655d3 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -840,6 +840,7 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
     // Free the link's name and terminus_addr
     //
     free(link->name);
+    free(link->disambiguated_name);
     free(link->terminus_addr);
     free(link->ingress_histogram);
     link->name = 0;
@@ -867,6 +868,7 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t       *core,
     link->link_direction = dir;
     link->capacity       = conn->link_capacity;
     link->name           = (char*) malloc(QDR_DISCRIMINATOR_SIZE + 8);
+    link->disambiguated_name = 0;
     link->terminus_addr  = 0;
     qdr_generate_link_name("qdlink", link->name, QDR_DISCRIMINATOR_SIZE + 8);
     link->admin_enabled  = true;
@@ -1346,6 +1348,18 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo
     qdr_connection_free(conn);
 }
 
+static char* disambiguated_link_name(qdr_connection_info_t *conn, char *original)
+{   // Returns a newly malloc'd "<original>@<conn->container>" string (caller owns it), or 0 if allocation fails.
+    size_t olen = strlen(original);
+    size_t clen = strlen(conn->container);
+    char *name = (char*) malloc(olen + clen + 2);          // +2: the '@' separator and the terminating NUL
+    if (!name) return 0;                                   // report the failure instead of writing through a null pointer
+    memcpy(name, original, olen);
+    name[olen] = '@';
+    memcpy(name + olen + 1, conn->container, clen + 1);    // clen + 1 copies the terminating NUL as well
+    return name;
+}
+
 static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
     if (discard)
@@ -1429,6 +1443,9 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
                         qdr_terminus_free(source);
                         qdr_terminus_free(target);
                     } else {
+                        if (conn->role != QDR_ROLE_INTER_ROUTER && conn->connection_info) {
+                            link->disambiguated_name = disambiguated_link_name(conn->connection_info, link->name);
+                        }
                         success = qdr_forward_attach_CT(core, addr, link, source, target);
 
                         if (!success) {
@@ -1522,6 +1539,9 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
                     qdr_terminus_free(source);
                     qdr_terminus_free(target);
                 } else {
+                    if (conn->role != QDR_ROLE_INTER_ROUTER && conn->connection_info) {
+                        link->disambiguated_name = disambiguated_link_name(conn->connection_info, link->name);
+                    }
                     bool success = qdr_forward_attach_CT(core, addr, link, source, target);
                     if (!success) {
                         qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c6fac91d/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index b8f2291..7ab8a46 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -675,7 +675,6 @@ int qdr_forward_balanced_CT(qdr_core_t      *core,
     return 0;
 }
 
-
 bool qdr_forward_link_balanced_CT(qdr_core_t     *core,
                                   qdr_address_t  *addr,
                                   qdr_link_t     *in_link,
@@ -750,8 +749,7 @@ bool qdr_forward_link_balanced_CT(qdr_core_t     *core,
 
         out_link->oper_status    = QDR_LINK_OPER_DOWN;
 
-        out_link->name = (char*) malloc(strlen(in_link->name) + 1);
-        strcpy(out_link->name, in_link->name);
+        out_link->name = strdup(in_link->disambiguated_name ? in_link->disambiguated_name : in_link->name);
 
         out_link->connected_link = in_link;
         in_link->connected_link  = out_link;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c6fac91d/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 0f22df5..b3c9798 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -384,6 +384,7 @@ struct qdr_link_t {
     qd_direction_t           link_direction;
     qdr_link_work_list_t     work_list;
     char                    *name;
+    char                    *disambiguated_name;
     char                    *terminus_addr;
     int                      detach_count;       ///< 0, 1, or 2 depending on the state of the lifecycle
     qdr_address_t           *owning_addr;        ///< [ref] Address record that owns this link

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c6fac91d/tests/system_tests_link_routes.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index 6134c0f..87fb448 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -713,6 +713,25 @@ class LinkRouteTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
+    def _multi_link_send_receive(self, send_host, receive_host, name):
+        addresses = ("org.apache.foo", "org.apache.bar")  # one sender URL and one receiver URL is built per address
+        handler = MultiLinkSendReceive(["%s/%s" % (send_host, a) for a in addresses],
+                                       ["%s/%s" % (receive_host, a) for a in addresses], name)
+        handler.run()
+        self.assertEqual(None, handler.error)
+
+    def test_same_name_route_receivers_through_B(self):  # senders use routers[0], receivers use routers[1]
+        self._multi_link_send_receive(send_host=self.routers[0].addresses[0], receive_host=self.routers[1].addresses[0], name="recv_through_B")
+
+    def test_same_name_route_senders_through_B(self):  # senders use routers[1], receivers use routers[0]
+        self._multi_link_send_receive(send_host=self.routers[1].addresses[0], receive_host=self.routers[0].addresses[0], name="send_through_B")
+
+    def test_same_name_route_receivers_through_C(self):  # senders use routers[0], receivers use routers[2]
+        self._multi_link_send_receive(send_host=self.routers[0].addresses[0], receive_host=self.routers[2].addresses[0], name="recv_through_C")
+
+    def test_same_name_route_senders_through_C(self):  # senders use routers[2], receivers use routers[0]
+        self._multi_link_send_receive(send_host=self.routers[2].addresses[0], receive_host=self.routers[0].addresses[0], name="send_through_C")
+
 
 class Timeout(object):
     def __init__(self, parent):
@@ -1194,6 +1213,116 @@ class TerminusAddrTest(MessagingHandler):
     def run(self):
         Container(self).run()
 
+class MultiLinkSendReceive(MessagingHandler):  # one message per send URL; finished once every sender is accepted and every receiver has a message
+    class SendState(object):  # per-sender bookkeeping
+        def __init__(self, link):
+            self.link = link
+            self.sent = False
+            self.accepted = False
+            self.done = False
+            self.closed = False
+
+        def send(self, subject, body):
+            if not self.sent:  # send at most one message per sender
+                self.link.send(Message(subject=subject,body=body,address=self.link.target.address))
+                self.sent = True
+
+        def on_accepted(self):
+            self.accepted = True
+            self.done = True
+
+        def close(self):
+            if not self.closed:  # close the link and its connection only once
+                self.closed = True
+                self.link.close()
+                self.link.connection.close()
+
+    class RecvState(object):  # per-receiver bookkeeping
+        def __init__(self, link):
+            self.link = link
+            self.received = False
+            self.done = False
+            self.closed = False
+
+        def on_message(self):
+            self.received = True
+            self.done = True
+
+        def close(self):
+            if not self.closed:  # close the link and its connection only once
+                self.closed = True
+                self.link.close()
+                self.link.connection.close()
+
+    def __init__(self, send_urls, recv_urls, name, message=None):
+        super(MultiLinkSendReceive, self).__init__()
+        self.send_urls = send_urls
+        self.recv_urls = recv_urls
+        self.senders = {}  # SendState objects keyed by connection container
+        self.receivers = {}  # RecvState objects keyed by connection container
+        self.message = message or "SendReceiveTest"  # body to send, and the body expected on receipt
+        self.sent = False
+        self.error = None  # stays None on success; callers assert on it after run()
+        self.name = name  # used as the link name for every sender and receiver, and as the message subject
+
+    def close(self):
+        for sender in self.senders.values():
+            sender.close()
+        for receiver in self.receivers.values():
+            receiver.close()
+
+    def all_done(self):
+        for sender in self.senders.values():
+            if not sender.done:
+                return False
+        for receiver in self.receivers.values():
+            if not receiver.done:
+                return False
+        return True
+
+    def timeout(self):
+        self.error = "Timeout Expired"
+        self.close()
+
+    def stop_if_all_done(self):
+        if self.all_done():
+            self.stop()
+
+    def stop(self):
+        self.close()
+        self.timer.cancel()
+
+    def on_start(self, event):
+        self.timer      = event.reactor.schedule(TIMEOUT, Timeout(self))
+        event.container.container_id = None  # NOTE(review): presumably so each connection gets its own container id (states are keyed by it) - confirm
+        for u in self.send_urls:
+            s = self.SendState(event.container.create_sender(u, name=self.name))
+            self.senders[s.link.connection.container] = s
+        for u in self.recv_urls:
+            r = self.RecvState(event.container.create_receiver(u, name=self.name))
+            self.receivers[r.link.connection.container] = r
+
+    def on_link_remote_open(self, event):
+        print("link opened: %s %s %s" % (event.link.source.address, event.link.target.address, event.connection.container))
+
+    def on_sendable(self, event):
+        print("sendable: %s %s" % (event.link.target.address, event.connection.container))
+        self.senders[event.connection.container].send(self.name, self.message)
+
+    def on_message(self, event):
+        print("message received: %s %s" % (event.link.source.address, event.connection.container))
+        if self.message != event.message.body:
+            self.error = "Incorrect message. Got %s, expected %s" % (event.message.body, self.message)  # record on self so the caller's check of .error sees it
+        self.receivers[event.connection.container].on_message()
+        self.stop_if_all_done()
+
+    def on_accepted(self, event):
+        print("accepted: %s %s" % (event.link.target.address, event.connection.container))
+        self.senders[event.connection.container].on_accepted()
+        self.stop_if_all_done()
+
+    def run(self):
+        Container(self).run()
 
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org