You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2017/03/22 13:04:28 UTC
qpid-dispatch git commit: DISPATCH-357 - Added terminus_addr field on
link to display the terminus addr of a link route
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 9ee07b1d6 -> adc3ca6a4
DISPATCH-357 - Added terminus_addr field on link to display the terminus addr of a link route
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/adc3ca6a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/adc3ca6a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/adc3ca6a
Branch: refs/heads/master
Commit: adc3ca6a4b98572a8f18f99fe75349ef0b84fb71
Parents: 9ee07b1
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Tue Mar 21 13:16:59 2017 -0400
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Wed Mar 22 08:56:13 2017 -0400
----------------------------------------------------------------------
include/qpid/dispatch/router_core.h | 5 +-
src/router_core/agent_link.c | 6 +-
src/router_core/connections.c | 16 +++-
src/router_core/forwarder.c | 2 +
src/router_core/router_core_private.h | 1 +
src/router_node.c | 12 ++-
tests/system_tests_link_routes.py | 129 +++++++++++++++++++++++++++++
7 files changed, 165 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/adc3ca6a/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 0d66d1c..2f749b7 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -481,13 +481,16 @@ const char *qdr_link_name(const qdr_link_t *link);
* @param dir Direction of the new link, incoming or outgoing
* @param source Source terminus of the attach
* @param target Target terminus of the attach
+ * @param name - name of the link
+ * @param terminus_addr - terminus address if any
* @return A pointer to a new qdr_link_t object to track the link
*/
qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
qd_direction_t dir,
qdr_terminus_t *source,
qdr_terminus_t *target,
- const char *name);
+ const char *name,
+ const char *terminus_addr);
/**
* qdr_link_second_attach
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/adc3ca6a/src/router_core/agent_link.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_link.c b/src/router_core/agent_link.c
index fca63c0..692931f 100644
--- a/src/router_core/agent_link.c
+++ b/src/router_core/agent_link.c
@@ -119,7 +119,11 @@ static void qdr_agent_write_column_CT(qd_composed_field_t *body, int col, qdr_li
break;
case QDR_LINK_OWNING_ADDR:
- if (link->owning_addr)
+ if(link->terminus_addr)
+ qd_compose_insert_string(body, link->terminus_addr);
+ else if (link->connected_link && link->connected_link->terminus_addr)
+ qd_compose_insert_string(body, link->connected_link->terminus_addr);
+ else if (link->owning_addr)
qd_compose_insert_string(body, address_key(link->owning_addr));
else
qd_compose_insert_null(body);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/adc3ca6a/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index abc6a4c..5d80bf1 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -377,7 +377,8 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
qd_direction_t dir,
qdr_terminus_t *source,
qdr_terminus_t *target,
- const char *name)
+ const char *name,
+ const char *terminus_addr)
{
qdr_action_t *action = qdr_action(qdr_link_inbound_first_attach_CT, "link_first_attach");
qdr_link_t *link = new_qdr_link_t();
@@ -388,6 +389,15 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
link->identity = qdr_identifier(conn->core);
link->conn = conn;
link->name = (char*) malloc(strlen(name) + 1);
+
+ if (terminus_addr) {
+ char *term_addr = malloc((strlen(terminus_addr) + 3) * sizeof(char));
+ term_addr[0] = '\0';
+ strcat(term_addr, "M0");
+ strcat(term_addr, terminus_addr);
+ link->terminus_addr = term_addr;
+ }
+
strcpy(link->name, name);
link->link_direction = dir;
link->capacity = conn->link_capacity;
@@ -729,9 +739,10 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
sys_mutex_unlock(conn->work_lock);
//
- // Free the link's name
+ // Free the link's name and terminus_addr
//
free(link->name);
+ free(link->terminus_addr);
link->name = 0;
}
@@ -757,6 +768,7 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core,
link->link_direction = dir;
link->capacity = conn->link_capacity;
link->name = (char*) malloc(QDR_DISCRIMINATOR_SIZE + 8);
+ link->terminus_addr = 0;
qdr_generate_link_name("qdlink", link->name, QDR_DISCRIMINATOR_SIZE + 8);
link->admin_enabled = true;
link->oper_status = QDR_LINK_OPER_DOWN;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/adc3ca6a/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index b37b4b8..407404f 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -704,6 +704,8 @@ bool qdr_forward_link_balanced_CT(qdr_core_t *core,
out_link->link_type = QD_LINK_ENDPOINT;
out_link->link_direction = qdr_link_direction(in_link) == QD_OUTGOING ? QD_INCOMING : QD_OUTGOING;
out_link->admin_enabled = true;
+ out_link->terminus_addr = 0;
+
out_link->oper_status = QDR_LINK_OPER_DOWN;
out_link->name = (char*) malloc(strlen(in_link->name) + 1);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/adc3ca6a/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 414647b..c3c81c5 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -362,6 +362,7 @@ struct qdr_link_t {
qd_direction_t link_direction;
qdr_link_work_list_t work_list;
char *name;
+ char *terminus_addr;
int detach_count; ///< 0, 1, or 2 depending on the state of the lifecycle
qdr_address_t *owning_addr; ///< [ref] Address record that owns this link
qdr_link_t *connected_link; ///< [ref] If this is a link-route, reference the connected link
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/adc3ca6a/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 5868570..e16ae33 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -509,10 +509,14 @@ static int AMQP_incoming_link_handler(void* context, qd_link_t *link)
{
qd_connection_t *conn = qd_link_connection(link);
qdr_connection_t *qdr_conn = (qdr_connection_t*) qd_connection_get_context(conn);
+
+ char *terminus_addr = (char*)pn_terminus_get_address(pn_link_remote_target((pn_link_t *)qd_link_pn(link)));
+
qdr_link_t *qdr_link = qdr_link_first_attach(qdr_conn, QD_INCOMING,
qdr_terminus(qd_link_remote_source(link)),
qdr_terminus(qd_link_remote_target(link)),
- pn_link_name(qd_link_pn(link)));
+ pn_link_name(qd_link_pn(link)),
+ terminus_addr);
qdr_link_set_context(qdr_link, link);
qd_link_set_context(link, qdr_link);
@@ -527,10 +531,14 @@ static int AMQP_outgoing_link_handler(void* context, qd_link_t *link)
{
qd_connection_t *conn = qd_link_connection(link);
qdr_connection_t *qdr_conn = (qdr_connection_t*) qd_connection_get_context(conn);
+
+ char *terminus_addr = (char*)pn_terminus_get_address(pn_link_remote_source((pn_link_t *)qd_link_pn(link)));
+
qdr_link_t *qdr_link = qdr_link_first_attach(qdr_conn, QD_OUTGOING,
qdr_terminus(qd_link_remote_source(link)),
qdr_terminus(qd_link_remote_target(link)),
- pn_link_name(qd_link_pn(link)));
+ pn_link_name(qd_link_pn(link)),
+ terminus_addr);
qdr_link_set_context(qdr_link, link);
qd_link_set_context(link, qdr_link);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/adc3ca6a/tests/system_tests_link_routes.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index 5648920..7aed283 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -547,6 +547,24 @@ class LinkRouteTest(TestCase):
drain_support.run()
self.assertEqual(None, drain_support.error)
+ def test_link_route_terminus_address(self):
+ # The receiver is attaching to router B to a listener that has link route for address 'pulp.task' setup.
+ listening_address = self.routers[1].addresses[1]
+ # Run the query on a normal port
+ query_address_listening = self.routers[1].addresses[0]
+
+ # Sender is attaching to router C
+ sender_address = self.routers[2].addresses[0]
+ query_address_sending = self.routers[2].addresses[0]
+
+ test = TerminusAddrTest(sender_address, listening_address, query_address_sending, query_address_listening)
+ test.run()
+
+ self.assertTrue(test.in_receiver_found)
+ self.assertTrue(test.out_receiver_found)
+ self.assertTrue(test.in_sender_found)
+ self.assertTrue(test.out_sender_found)
+
def test_dynamic_source(self):
test = DynamicSourceTest(self.routers[1].addresses[0], self.routers[1].addresses[1])
test.run()
@@ -937,6 +955,117 @@ class DetachMixedCloseTest(MessagingHandler):
def run(self):
Container(self).run()
+class TerminusAddrTest(MessagingHandler):
+ """
+ This tests makes sure that the link route address is visible in the output of qdstat -l command.
+
+ Sets up a sender on address pulp.task.terminusTestSender and a receiver on pulp.task.terminusTestReceiver.
+ Connects to the router to which the sender is attached and makes sure that the pulp.task.terminusTestSender address
+ shows up with an 'in' and 'out'
+ Similarly connects to the router to which the receiver is attached and makes sure that the
+ pulp.task.terminusTestReceiver address shows up with an 'in' and 'out'
+
+ """
+ def __init__(self, sender_address, listening_address, query_address_sending, query_address_listening):
+ super(TerminusAddrTest, self).__init__()
+ self.sender_address = sender_address
+ self.listening_address = listening_address
+ self.sender = None
+ self.receiver = None
+ self.message_received = False
+ self.receiver_connection = None
+ self.sender_connection = None
+ # We will run a query on the same router where the sender is attached
+ self.query_address_sending = query_address_sending
+
+ # We will run a query on the same router where the receiver is attached
+ self.query_address_listening = query_address_listening
+ self.count = 0
+
+ self.in_receiver_found = False
+ self.out_receiver_found = False
+ self.in_sender_found = False
+ self.out_sender_found = False
+
+ self.receiver_link_opened = False
+ self.sender_link_opened = False
+
+ def on_start(self, event):
+ self.receiver_connection = event.container.connect(self.listening_address)
+
+ def on_connection_remote_open(self, event):
+ if event.connection == self.receiver_connection:
+ continue_loop = True
+ # The following loops introduces a wait. It gives time to the
+ # router so that the address Dpulp.task can show up on the remoteCount
+ i = 0
+ while continue_loop:
+ if i > 100: # If we have run the read command for more than hundred times and we still do not have
+ # the remoteCount set to 1, there is a problem, just exit out of the function instead
+ # of looping to infinity.
+ self.receiver_connection.close()
+ return
+ local_node = Node.connect(self.query_address_sending, timeout=TIMEOUT)
+ out = local_node.read(type='org.apache.qpid.dispatch.router.address', name='Dpulp.task').remoteCount
+ if out == 1:
+ continue_loop = False
+ i += 1
+ sleep(0.25)
+
+ self.sender_connection = event.container.connect(self.sender_address)
+
+ # Notice here that the receiver and sender are listening on different addresses. Receiver on
+ # pulp.task.terminusTestReceiver and the sender on pulp.task.terminusTestSender
+ self.receiver = event.container.create_receiver(self.receiver_connection, "pulp.task.terminusTestReceiver")
+ self.sender = event.container.create_sender(self.sender_connection, "pulp.task.terminusTestSender", options=AtMostOnce())
+
+ def on_link_opened(self, event):
+ if event.receiver == self.receiver:
+ self.receiver_link_opened = True
+
+ local_node = Node.connect(self.query_address_listening, timeout=TIMEOUT)
+ out = local_node.query(type='org.apache.qpid.dispatch.router.link')
+
+ link_dir_index = out.attribute_names.index("linkDir")
+ owning_addr_index = out.attribute_names.index("owningAddr")
+
+ # Make sure that the owningAddr M0pulp.task.terminusTestReceiver shows up on both in and out.
+ # The 'out' link is on address M0pulp.task.terminusTestReceiver outgoing from the router B to the receiver
+ # The 'in' link is on address M0pulp.task.terminusTestReceiver incoming from router C to router B
+ for result in out.results:
+ if result[link_dir_index] == 'in' and result[owning_addr_index] == 'M0pulp.task.terminusTestReceiver':
+ self.in_receiver_found = True
+ if result[link_dir_index] == 'out' and result[owning_addr_index] == 'M0pulp.task.terminusTestReceiver':
+ self.out_receiver_found = True
+
+ if event.sender == self.sender:
+ self.sender_link_opened = True
+
+ local_node = Node.connect(self.query_address_sending, timeout=TIMEOUT)
+ out = local_node.query(type='org.apache.qpid.dispatch.router.link')
+
+ link_dir_index = out.attribute_names.index("linkDir")
+ owning_addr_index = out.attribute_names.index("owningAddr")
+
+ # Make sure that the owningAddr M0pulp.task.terminusTestSender shows up on both in and out.
+ # The 'in' link is on address M0pulp.task.terminusTestSender incoming from sender to router
+ # The 'out' link is on address M0pulp.task.terminusTestSender outgoing from router C to router B
+ for result in out.results:
+ if result[link_dir_index] == 'in' and result[owning_addr_index] == 'M0pulp.task.terminusTestSender':
+ self.in_sender_found = True
+ if result[link_dir_index] == 'out' and result[owning_addr_index] == 'M0pulp.task.terminusTestSender':
+ self.out_sender_found = True
+
+ # Shutdown the connections only if the on_link_opened has been called for sender and receiver links.
+ if self.sender_link_opened and self.receiver_link_opened:
+ self.sender.close()
+ self.receiver.close()
+ self.sender_connection.close()
+ self.receiver_connection.close()
+
+ def run(self):
+ Container(self).run()
+
if __name__ == '__main__':
unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org