You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/09/02 21:16:38 UTC
[1/2] qpid-dispatch git commit: DISPATCH-496 - Issue credit to
blocked senders when an autolink attaches the first consumer.
Repository: qpid-dispatch
Updated Branches:
refs/heads/master ddc85fab8 -> bba79f3c6
DISPATCH-496 - Issue credit to blocked senders when an autolink attaches the first consumer.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/41b74076
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/41b74076
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/41b74076
Branch: refs/heads/master
Commit: 41b740761adf7fb24dd0de8a5aa0bef604240d5c
Parents: ddc85fa
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Sep 2 16:24:37 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Fri Sep 2 16:24:37 2016 -0400
----------------------------------------------------------------------
src/router_core/connections.c | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/41b74076/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 229571e..634a07f 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -1224,6 +1224,7 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac
const char *key = (const char*) qd_hash_key_by_handle(link->auto_link->addr->hash_handle);
if (key && *key == 'M')
qdr_post_mobile_added_CT(core, key);
+ qdr_addr_start_inlinks_CT(core, link->auto_link->addr);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-dispatch git commit: DISPATCH-495 - Improved stability of
autolinks and added a set of autolink tests.
Posted by tr...@apache.org.
DISPATCH-495 - Improved stability of autolinks and added a set of autolink tests.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/bba79f3c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/bba79f3c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/bba79f3c
Branch: refs/heads/master
Commit: bba79f3c68d771624a9b1b54d36ae96f368205b4
Parents: 41b7407
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Sep 2 17:15:48 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Fri Sep 2 17:15:48 2016 -0400
----------------------------------------------------------------------
src/container.c | 6 +-
src/server.c | 2 +-
tests/CMakeLists.txt | 1 +
tests/system_tests_autolinks.py | 196 +++++++++++++++++++++++++++++++++++
4 files changed, 202 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bba79f3c/src/container.c
----------------------------------------------------------------------
diff --git a/src/container.c b/src/container.c
index 1c9fec8..92c0e0e 100644
--- a/src/container.c
+++ b/src/container.c
@@ -309,13 +309,14 @@ static int close_handler(qd_container_t *container, void* conn_context, pn_conne
pn_link_t *pn_link = pn_link_head(conn, 0);
while (pn_link) {
qd_link_t *link = (qd_link_t*) pn_link_get_context(pn_link);
+ pn_link_t *next = pn_link_next(pn_link, 0);
if (link) {
qd_node_t *node = link->node;
if (node) {
node->ntype->link_detach_handler(node->context, link, QD_LOST);
}
}
- pn_link = pn_link_next(pn_link, 0);
+ pn_link = next;
}
// close the connection
@@ -779,9 +780,10 @@ qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn, qd_direction_t dir, c
link->close_sess_with_link = true;
//
- // Keep the borrowed link reference
+ // Keep the borrowed references
//
pn_incref(link->pn_link);
+ pn_incref(link->pn_sess);
pn_link_set_context(link->pn_link, link);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bba79f3c/src/server.c
----------------------------------------------------------------------
diff --git a/src/server.c b/src/server.c
index e74b879..aa08ae6 100644
--- a/src/server.c
+++ b/src/server.c
@@ -798,7 +798,7 @@ static int process_connector(qd_server_t *qd_server, qdpn_connector_t *cxtr)
if (ctx->connector) {
ce = QD_CONN_EVENT_CONNECTOR_OPEN;
- ctx->connector->delay = 0;
+ ctx->connector->delay = 2000; // Delay on re-connect in case there is a recurring error
} else
assert(ctx->listener);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bba79f3c/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 1c1be1c..9d133a7 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -72,6 +72,7 @@ add_test(router_policy_test ${TEST_WRAP} -m unittest -v router_policy_test)
foreach(py_test_module
# system_tests_broker
system_tests_link_routes
+ system_tests_autolinks
system_tests_drain
system_tests_management
system_tests_one_router
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bba79f3c/tests/system_tests_autolinks.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_autolinks.py b/tests/system_tests_autolinks.py
new file mode 100644
index 0000000..cc79c64
--- /dev/null
+++ b/tests/system_tests_autolinks.py
@@ -0,0 +1,196 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+from proton import Message, Delivery, PENDING, ACCEPTED, REJECTED
+from system_test import TestCase, Qdrouterd, main_module
+from proton.handlers import MessagingHandler
+from proton.reactor import Container, AtMostOnce, AtLeastOnce
+from proton.utils import BlockingConnection, SyncRequestResponse
+from qpid_dispatch.management.client import Node
+
+CONNECTION_PROPERTIES = {u'connection': u'properties', u'int_property': 6451}
+
+class AutolinkTest(TestCase):
+ """System tests for autolinks on a single standalone router"""
+ @classmethod
+ def setUpClass(cls):
+ """Start a standalone router with a normal listener and a route-container listener"""
+ super(AutolinkTest, cls).setUpClass()
+ name = "test-router"
+ config = Qdrouterd.Config([
+ ('router', {'mode': 'standalone', 'id': 'QDR'}),
+
+ #
+ # Create a general-purpose listener for sending and receiving deliveries
+ #
+ ('listener', {'port': cls.tester.get_port()}),
+
+ #
+ # Create a route-container listener for the autolinks
+ #
+ ('listener', {'port': cls.tester.get_port(), 'role': 'route-container'}),
+
+ #
+ # Create a pair of default auto-links for 'node.1'
+ #
+ ('autoLink', {'addr': 'node.1', 'containerId': 'container.1', 'dir': 'in'}),
+ ('autoLink', {'addr': 'node.1', 'containerId': 'container.1', 'dir': 'out'}),
+ ('address', {'prefix': 'node', 'waypoint': 'yes'}),
+
+ #
+ # Create a pair of auto-links on non-default phases for container-to-container transfers
+ #
+ ('autoLink', {'addr': 'xfer.2', 'containerId': 'container.2', 'dir': 'in', 'phase': '4'}),
+ ('autoLink', {'addr': 'xfer.2', 'containerId': 'container.3', 'dir': 'out', 'phase': '4'}),
+ ])
+
+ cls.router = cls.tester.qdrouterd(name, config)
+ cls.router.wait_ready()
+ cls.normal_address = cls.router.addresses[0]
+ cls.route_address = cls.router.addresses[1]
+
+
+ def test_01_autolink_attach(self):
+ """
+ Create the route-container connection and verify that the appropriate links are attached.
+ Disconnect, reconnect, and verify that the links are re-attached.
+ """
+ test = AutolinkAttachTest(self.route_address)
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+ def test_02_autolink_credit(self):
+ """
+ Create a normal connection and a sender to the autolink address. Then create the route-container
+ connection and ensure that the on_sendable did not arrive until after the autolinks were created.
+ """
+ test = AutolinkCreditTest(self.normal_address, self.route_address)
+ test.run()
+ self.assertEqual(None, test.error)
+
+
+class Timeout(object):
+ def __init__(self, parent):
+ self.parent = parent
+
+ def on_timer_task(self, event):
+ self.parent.timeout()
+
+
+class AutolinkAttachTest(MessagingHandler):
+ def __init__(self, address):
+ super(AutolinkAttachTest, self).__init__(prefetch=0)
+ self.address = address
+ self.error = None
+ self.sender = None
+ self.receiver = None
+
+ self.n_rx_attach = 0
+ self.n_tx_attach = 0
+
+ def timeout(self):
+ self.error = "Timeout Expired: n_rx_attach=%d n_tx_attach=%d" % (self.n_rx_attach, self.n_tx_attach)
+ self.conn.close()
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(5, Timeout(self))
+ self.conn = event.container.connect(self.address)
+
+ def on_connection_closed(self, event):
+ if self.n_tx_attach == 1:
+ self.conn = event.container.connect(self.address)
+
+ def on_link_opened(self, event):
+ if event.sender:
+ self.n_tx_attach += 1
+ if event.sender.remote_source.address != 'node.1':
+ self.error = "Expected sender address 'node.1', got '%s'" % event.sender.remote_source.address
+ self.timer.cancel()
+ self.conn.close()
+ elif event.receiver:
+ self.n_rx_attach += 1
+ if event.receiver.remote_target.address != 'node.1':
+ self.error = "Expected receiver address 'node.1', got '%s'" % event.receiver.remote_target.address
+ self.timer.cancel()
+ self.conn.close()
+ if self.n_tx_attach == 1 and self.n_rx_attach == 1:
+ self.conn.close()
+ if self.n_tx_attach == 2 and self.n_rx_attach == 2:
+ self.conn.close()
+ self.timer.cancel()
+
+ def run(self):
+ container = Container(self)
+ container.container_id = 'container.1'
+ container.run()
+
+
+class AutolinkCreditTest(MessagingHandler):
+ def __init__(self, normal_address, route_address):
+ super(AutolinkCreditTest, self).__init__(prefetch=0)
+ self.normal_address = normal_address
+ self.route_address = route_address
+ self.dest = 'node.1'
+ self.normal_conn = None
+ self.route_conn = None
+ self.error = None
+ self.last_action = "None"
+
+ def timeout(self):
+ self.error = "Timeout Expired: last_action=%s" % self.last_action
+ if self.normal_conn:
+ self.normal_conn.close()
+ if self.route_conn:
+ self.route_conn.close()
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(5, Timeout(self))
+ self.normal_conn = event.container.connect(self.normal_address)
+ self.sender = event.container.create_sender(self.normal_conn, self.dest)
+ self.last_action = "Attached normal sender"
+
+ def on_link_opening(self, event):
+ if event.sender:
+ event.sender.source.address = event.sender.remote_source.address
+ if event.receiver:
+ event.receiver.target.address = event.receiver.remote_target.address
+
+ def on_link_opened(self, event):
+ if event.sender == self.sender:
+ self.route_conn = event.container.connect(self.route_address)
+ self.last_action = "Opened route connection"
+
+ def on_sendable(self, event):
+ if event.sender == self.sender:
+ if self.last_action != "Opened route connection":
+ self.error = "Events out of sequence: last_action=%s" % self.last_action
+ self.timer.cancel()
+ self.route_conn.close()
+ self.normal_conn.close()
+
+ def run(self):
+ container = Container(self)
+ container.container_id = 'container.1'
+ container.run()
+
+
+if __name__ == '__main__':
+ unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org