You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2017/03/22 13:04:28 UTC

qpid-dispatch git commit: DISPATCH-357 - Added terminus_addr field on link to display the terminus addr of a link route

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 9ee07b1d6 -> adc3ca6a4


DISPATCH-357 - Added terminus_addr field on link to display the terminus addr of a link route


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/adc3ca6a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/adc3ca6a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/adc3ca6a

Branch: refs/heads/master
Commit: adc3ca6a4b98572a8f18f99fe75349ef0b84fb71
Parents: 9ee07b1
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Tue Mar 21 13:16:59 2017 -0400
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Wed Mar 22 08:56:13 2017 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/router_core.h   |   5 +-
 src/router_core/agent_link.c          |   6 +-
 src/router_core/connections.c         |  16 +++-
 src/router_core/forwarder.c           |   2 +
 src/router_core/router_core_private.h |   1 +
 src/router_node.c                     |  12 ++-
 tests/system_tests_link_routes.py     | 129 +++++++++++++++++++++++++++++
 7 files changed, 165 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/adc3ca6a/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 0d66d1c..2f749b7 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -481,13 +481,16 @@ const char *qdr_link_name(const qdr_link_t *link);
  * @param dir Direction of the new link, incoming or outgoing
  * @param source Source terminus of the attach
  * @param target Target terminus of the attach
+ * @param name Name of the new link
+ * @param terminus_addr Terminus address of the link, or NULL if there is none
  * @return A pointer to a new qdr_link_t object to track the link
  */
 qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
                                   qd_direction_t    dir,
                                   qdr_terminus_t   *source,
                                   qdr_terminus_t   *target,
-                                  const char       *name);
+                                  const char       *name,
+                                  const char       *terminus_addr);
 
 /**
  * qdr_link_second_attach

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/adc3ca6a/src/router_core/agent_link.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_link.c b/src/router_core/agent_link.c
index fca63c0..692931f 100644
--- a/src/router_core/agent_link.c
+++ b/src/router_core/agent_link.c
@@ -119,7 +119,11 @@ static void qdr_agent_write_column_CT(qd_composed_field_t *body, int col, qdr_li
         break;
 
     case QDR_LINK_OWNING_ADDR:
-        if (link->owning_addr)
+        if(link->terminus_addr)
+            qd_compose_insert_string(body, link->terminus_addr);
+        else if (link->connected_link && link->connected_link->terminus_addr)
+            qd_compose_insert_string(body, link->connected_link->terminus_addr);
+        else if (link->owning_addr)
             qd_compose_insert_string(body, address_key(link->owning_addr));
         else
             qd_compose_insert_null(body);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/adc3ca6a/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index abc6a4c..5d80bf1 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -377,7 +377,8 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
                                   qd_direction_t    dir,
                                   qdr_terminus_t   *source,
                                   qdr_terminus_t   *target,
-                                  const char       *name)
+                                  const char       *name,
+                                  const char       *terminus_addr)
 {
     qdr_action_t   *action         = qdr_action(qdr_link_inbound_first_attach_CT, "link_first_attach");
     qdr_link_t     *link           = new_qdr_link_t();
@@ -388,6 +389,15 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
     link->identity = qdr_identifier(conn->core);
     link->conn = conn;
     link->name = (char*) malloc(strlen(name) + 1);
+
+    if (terminus_addr) {
+         char *term_addr = malloc((strlen(terminus_addr) + 3) * sizeof(char));
+         term_addr[0] = '\0';
+         strcat(term_addr, "M0");
+         strcat(term_addr, terminus_addr);
+         link->terminus_addr = term_addr;
+    }
+
     strcpy(link->name, name);
     link->link_direction = dir;
     link->capacity       = conn->link_capacity;
@@ -729,9 +739,10 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
     sys_mutex_unlock(conn->work_lock);
 
     //
-    // Free the link's name
+    // Free the link's name and terminus_addr
     //
     free(link->name);
+    free(link->terminus_addr);
     link->name = 0;
 }
 
@@ -757,6 +768,7 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t       *core,
     link->link_direction = dir;
     link->capacity       = conn->link_capacity;
     link->name           = (char*) malloc(QDR_DISCRIMINATOR_SIZE + 8);
+    link->terminus_addr  = 0;
     qdr_generate_link_name("qdlink", link->name, QDR_DISCRIMINATOR_SIZE + 8);
     link->admin_enabled  = true;
     link->oper_status    = QDR_LINK_OPER_DOWN;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/adc3ca6a/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index b37b4b8..407404f 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -704,6 +704,8 @@ bool qdr_forward_link_balanced_CT(qdr_core_t     *core,
         out_link->link_type      = QD_LINK_ENDPOINT;
         out_link->link_direction = qdr_link_direction(in_link) == QD_OUTGOING ? QD_INCOMING : QD_OUTGOING;
         out_link->admin_enabled  = true;
+        out_link->terminus_addr  = 0;
+
         out_link->oper_status    = QDR_LINK_OPER_DOWN;
 
         out_link->name = (char*) malloc(strlen(in_link->name) + 1);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/adc3ca6a/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 414647b..c3c81c5 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -362,6 +362,7 @@ struct qdr_link_t {
     qd_direction_t           link_direction;
     qdr_link_work_list_t     work_list;
     char                    *name;
+    char                    *terminus_addr;
     int                      detach_count;       ///< 0, 1, or 2 depending on the state of the lifecycle
     qdr_address_t           *owning_addr;        ///< [ref] Address record that owns this link
     qdr_link_t              *connected_link;     ///< [ref] If this is a link-route, reference the connected link

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/adc3ca6a/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 5868570..e16ae33 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -509,10 +509,14 @@ static int AMQP_incoming_link_handler(void* context, qd_link_t *link)
 {
     qd_connection_t  *conn     = qd_link_connection(link);
     qdr_connection_t *qdr_conn = (qdr_connection_t*) qd_connection_get_context(conn);
+
+    char *terminus_addr = (char*)pn_terminus_get_address(pn_link_remote_target((pn_link_t  *)qd_link_pn(link)));
+
     qdr_link_t       *qdr_link = qdr_link_first_attach(qdr_conn, QD_INCOMING,
                                                        qdr_terminus(qd_link_remote_source(link)),
                                                        qdr_terminus(qd_link_remote_target(link)),
-                                                       pn_link_name(qd_link_pn(link)));
+                                                       pn_link_name(qd_link_pn(link)),
+                                                       terminus_addr);
     qdr_link_set_context(qdr_link, link);
     qd_link_set_context(link, qdr_link);
 
@@ -527,10 +531,14 @@ static int AMQP_outgoing_link_handler(void* context, qd_link_t *link)
 {
     qd_connection_t  *conn     = qd_link_connection(link);
     qdr_connection_t *qdr_conn = (qdr_connection_t*) qd_connection_get_context(conn);
+
+    char *terminus_addr = (char*)pn_terminus_get_address(pn_link_remote_source((pn_link_t  *)qd_link_pn(link)));
+
     qdr_link_t       *qdr_link = qdr_link_first_attach(qdr_conn, QD_OUTGOING,
                                                        qdr_terminus(qd_link_remote_source(link)),
                                                        qdr_terminus(qd_link_remote_target(link)),
-                                                       pn_link_name(qd_link_pn(link)));
+                                                       pn_link_name(qd_link_pn(link)),
+                                                       terminus_addr);
     qdr_link_set_context(qdr_link, link);
     qd_link_set_context(link, qdr_link);
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/adc3ca6a/tests/system_tests_link_routes.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index 5648920..7aed283 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -547,6 +547,24 @@ class LinkRouteTest(TestCase):
         drain_support.run()
         self.assertEqual(None, drain_support.error)
 
+    def test_link_route_terminus_address(self):
+        # The receiver attaches to a listener on router B that has a link route set up for address 'pulp.task'.
+        listening_address = self.routers[1].addresses[1]
+        # Run the query on a normal port
+        query_address_listening = self.routers[1].addresses[0]
+
+        # Sender is attaching to router C
+        sender_address = self.routers[2].addresses[0]
+        query_address_sending = self.routers[2].addresses[0]
+
+        test = TerminusAddrTest(sender_address, listening_address, query_address_sending, query_address_listening)
+        test.run()
+
+        self.assertTrue(test.in_receiver_found)
+        self.assertTrue(test.out_receiver_found)
+        self.assertTrue(test.in_sender_found)
+        self.assertTrue(test.out_sender_found)
+
     def test_dynamic_source(self):
         test = DynamicSourceTest(self.routers[1].addresses[0], self.routers[1].addresses[1])
         test.run()
@@ -937,6 +955,117 @@ class DetachMixedCloseTest(MessagingHandler):
     def run(self):
         Container(self).run()
 
+class TerminusAddrTest(MessagingHandler):
+    """
+    This test makes sure that the link route address is visible in the output of the qdstat -l command.
+
+    Sets up a sender on address pulp.task.terminusTestSender and a receiver on pulp.task.terminusTestReceiver.
+    Connects to the router to which the sender is attached and makes sure that the pulp.task.terminusTestSender address
+    shows up on both an 'in' link and an 'out' link.
+    Similarly connects to the router to which the receiver is attached and makes sure that the
+    pulp.task.terminusTestReceiver address shows up on both an 'in' link and an 'out' link.
+
+    """
+    def __init__(self, sender_address, listening_address, query_address_sending, query_address_listening):
+        super(TerminusAddrTest, self).__init__()
+        self.sender_address = sender_address
+        self.listening_address = listening_address
+        self.sender = None
+        self.receiver = None
+        self.message_received = False
+        self.receiver_connection = None
+        self.sender_connection = None
+        # We will run a query on the same router where the sender is attached
+        self.query_address_sending = query_address_sending
+
+        # We will run a query on the same router where the receiver is attached
+        self.query_address_listening = query_address_listening
+        self.count = 0
+
+        self.in_receiver_found = False
+        self.out_receiver_found = False
+        self.in_sender_found = False
+        self.out_sender_found = False
+
+        self.receiver_link_opened = False
+        self.sender_link_opened = False
+
+    def on_start(self, event):
+        self.receiver_connection = event.container.connect(self.listening_address)
+
+    def on_connection_remote_open(self, event):
+        if event.connection == self.receiver_connection:
+            continue_loop = True
+            # The following loop introduces a wait. It gives the router time
+            # so that the address Dpulp.task can report a remoteCount of 1.
+            i = 0
+            while continue_loop:
+                if i > 100: # If we have run the read command more than a hundred times and we still do not have
+                    # the remoteCount set to 1, there is a problem. Just exit out of the function instead
+                    # of looping forever.
+                    self.receiver_connection.close()
+                    return
+                local_node = Node.connect(self.query_address_sending, timeout=TIMEOUT)
+                out = local_node.read(type='org.apache.qpid.dispatch.router.address', name='Dpulp.task').remoteCount
+                if out == 1:
+                    continue_loop = False
+                i += 1
+                sleep(0.25)
+
+            self.sender_connection = event.container.connect(self.sender_address)
+
+            # Notice here that the receiver and sender attach to different addresses: the receiver to
+            # pulp.task.terminusTestReceiver and the sender to pulp.task.terminusTestSender.
+            self.receiver = event.container.create_receiver(self.receiver_connection, "pulp.task.terminusTestReceiver")
+            self.sender = event.container.create_sender(self.sender_connection, "pulp.task.terminusTestSender", options=AtMostOnce())
+
+    def on_link_opened(self, event):
+        if event.receiver == self.receiver:
+            self.receiver_link_opened = True
+
+            local_node = Node.connect(self.query_address_listening, timeout=TIMEOUT)
+            out = local_node.query(type='org.apache.qpid.dispatch.router.link')
+
+            link_dir_index = out.attribute_names.index("linkDir")
+            owning_addr_index = out.attribute_names.index("owningAddr")
+
+            # Make sure that the owningAddr M0pulp.task.terminusTestReceiver shows up on both in and out.
+            # The 'out' link is on address M0pulp.task.terminusTestReceiver outgoing from the router B to the receiver
+            # The 'in' link is on address M0pulp.task.terminusTestReceiver incoming from router C to router B
+            for result in out.results:
+                if result[link_dir_index] == 'in' and result[owning_addr_index] == 'M0pulp.task.terminusTestReceiver':
+                    self.in_receiver_found = True
+                if result[link_dir_index] == 'out' and result[owning_addr_index] == 'M0pulp.task.terminusTestReceiver':
+                    self.out_receiver_found = True
+
+        if event.sender == self.sender:
+            self.sender_link_opened = True
+
+            local_node = Node.connect(self.query_address_sending, timeout=TIMEOUT)
+            out = local_node.query(type='org.apache.qpid.dispatch.router.link')
+
+            link_dir_index = out.attribute_names.index("linkDir")
+            owning_addr_index = out.attribute_names.index("owningAddr")
+
+            # Make sure that the owningAddr M0pulp.task.terminusTestSender shows up on both in and out.
+            # The 'in' link is on address M0pulp.task.terminusTestSender incoming from sender to router
+            # The 'out' link is on address M0pulp.task.terminusTestSender outgoing from router C to router B
+            for result in out.results:
+                if result[link_dir_index] == 'in' and result[owning_addr_index] == 'M0pulp.task.terminusTestSender':
+                    self.in_sender_found = True
+                if result[link_dir_index] == 'out' and result[owning_addr_index] == 'M0pulp.task.terminusTestSender':
+                    self.out_sender_found = True
+
+        # Shutdown the connections only if the on_link_opened has been called for sender and receiver links.
+        if self.sender_link_opened and self.receiver_link_opened:
+            self.sender.close()
+            self.receiver.close()
+            self.sender_connection.close()
+            self.receiver_connection.close()
+
+    def run(self):
+        Container(self).run()
+
 
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org