You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/06/03 19:40:03 UTC
[1/3] qpid-dispatch git commit: DISPATCH-341 - From Ganesh Murthy - Added some drain tests
Repository: qpid-dispatch
Updated Branches:
refs/heads/master b69283aa3 -> b6944e8af
DISPATCH-341 - From Ganesh Murthy - Added some drain tests
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/ccf9533e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/ccf9533e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/ccf9533e
Branch: refs/heads/master
Commit: ccf9533e68a989ef646d8c684bba08823b057fcc
Parents: b69283a
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Jun 3 10:15:10 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Fri Jun 3 10:15:10 2016 -0400
----------------------------------------------------------------------
tests/CMakeLists.txt | 1 +
tests/system_tests_drain.py | 55 +++++++++++++++++
tests/system_tests_drain_support.py | 101 +++++++++++++++++++++++++++++++
tests/system_tests_link_routes.py | 15 ++++-
4 files changed, 170 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ccf9533e/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index eef7900..1c1be1c 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -72,6 +72,7 @@ add_test(router_policy_test ${TEST_WRAP} -m unittest -v router_policy_test)
foreach(py_test_module
# system_tests_broker
system_tests_link_routes
+ system_tests_drain
system_tests_management
system_tests_one_router
system_tests_policy
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ccf9533e/tests/system_tests_drain.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_drain.py b/tests/system_tests_drain.py
new file mode 100644
index 0000000..ba503a1
--- /dev/null
+++ b/tests/system_tests_drain.py
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+
+from system_test import TestCase, Qdrouterd, main_module
+from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler
+
+class DrainSupportTest(TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ """Start a router and a messenger"""
+ super(DrainSupportTest, cls).setUpClass()
+ name = "test-router"
+ config = Qdrouterd.Config([
+ ('router', {'mode': 'standalone', 'id': 'QDR'}),
+
+ # Setting the linkCapacity to 10 will allow the sender to send a burst of 10 messages
+ ('listener', {'port': cls.tester.get_port(), 'linkCapacity': 10}),
+
+ ])
+
+ cls.router = cls.tester.qdrouterd(name, config)
+ cls.router.wait_ready()
+ cls.address = cls.router.addresses[0]
+
+ def test_drain_support_all_messages(self):
+ drain_support = DrainMessagesHandler(self.address)
+ drain_support.run()
+ self.assertTrue(drain_support.drain_successful)
+
+ def test_drain_support_one_message(self):
+ drain_support = DrainOneMessageHandler(self.address)
+ drain_support.run()
+ self.assertTrue(drain_support.drain_successful)
+
+if __name__ == '__main__':
+ unittest.main(main_module())
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ccf9533e/tests/system_tests_drain_support.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_drain_support.py b/tests/system_tests_drain_support.py
new file mode 100644
index 0000000..6192a7c
--- /dev/null
+++ b/tests/system_tests_drain_support.py
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton.handlers import MessagingHandler
+from proton.reactor import Container
+from proton import Message
+
+class DrainMessagesHandler(MessagingHandler):
+ def __init__(self, address):
+ # prefetch is set to zero so that proton does not automatically issue 10 credits.
+ super(DrainMessagesHandler, self).__init__(prefetch=0)
+ self.conn = None
+ self.sender = None
+ self.receiver = None
+ self.sent_count = 0
+ self.received_count = 0
+ self.address = address
+ self.drain_successful = False
+
+ def on_start(self, event):
+ self.conn = event.container.connect(self.address)
+
+ # Create a sender and a receiver. They are both listening on the same address
+ self.receiver = event.container.create_receiver(self.conn, "org.apache.dev")
+ self.sender = event.container.create_sender(self.conn, "org.apache.dev")
+ self.receiver.flow(1)
+
+ def on_sendable(self, event):
+ if self.sent_count < 10:
+ msg = Message(body="Hello World")
+ dlv = event.sender.send(msg)
+ dlv.settle()
+ self.sent_count += 1
+
+ def on_message(self, event):
+ if event.receiver == self.receiver:
+ if "Hello World" == event.message.body:
+ self.received_count += 1
+
+ if self.received_count < 4:
+ event.receiver.flow(1)
+ elif self.received_count == 4:
+ # We are issuing a drain of 20. This means that we will receive all the 10 messages
+ # that the sender is sending. The router will also send back a response flow frame with
+ # drain=True but I don't have any way of making sure that the response frame reached the
+ # receiver
+ event.receiver.drain(20)
+
+ # The fact that the event.link.credit is 0 means that the receiver will not be receiving any more
+ # messages. That along with 10 messages received indicates that the drain worked and we can
+ # declare that the test is successful
+ if self.received_count == 10 and event.link.credit == 0:
+ self.drain_successful = True
+ self.receiver.close()
+ self.sender.close()
+ self.conn.close()
+
+ def run(self):
+ Container(self).run()
+
+class DrainOneMessageHandler(DrainMessagesHandler):
+ def __init__(self, address):
+ super(DrainOneMessageHandler, self).__init__(address)
+
+ def on_message(self, event):
+ if event.receiver == self.receiver:
+ if "Hello World" == event.message.body:
+ self.received_count += 1
+
+ if self.received_count < 4:
+ event.receiver.flow(1)
+ elif self.received_count == 4:
+ # We are issuing a drain of 1 after we receive the 4th message.
+ # This means that going forward, we will receive only one more message.
+ event.receiver.drain(1)
+
+ # The fact that the event.link.credit is 0 means that the receiver will not be receiving any more
+ # messages. That along with 5 messages received (4 earlier messages and 1 extra message for drain=1)
+ # indicates that the drain worked and we can declare that the test is successful
+ if self.received_count == 5 and event.link.credit == 0:
+ self.drain_successful = True
+ self.receiver.close()
+ self.sender.close()
+ self.conn.close()
+
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ccf9533e/tests/system_tests_link_routes.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index 8f2ee8d..37317f9 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -28,6 +28,8 @@ from proton.handlers import MessagingHandler
from proton.reactor import AtMostOnce, Container
from proton.utils import BlockingConnection, LinkDetached
+from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler
+
from qpid_dispatch.management.client import Node
class LinkRoutePatternTest(TestCase):
@@ -443,6 +445,16 @@ class LinkRoutePatternTest(TestCase):
test.run()
self.assertEqual(None, test.error)
+ def test_www_drain_support_all_messages(self):
+ drain_support = DrainMessagesHandler(self.routers[2].addresses[1])
+ drain_support.run()
+ self.assertTrue(drain_support.drain_successful)
+
+ def test_www_drain_support_one_message(self):
+ drain_support = DrainOneMessageHandler(self.routers[2].addresses[1])
+ drain_support.run()
+ self.assertTrue(drain_support.drain_successful)
+
class DeliveryTagsTest(MessagingHandler):
def __init__(self, sender_address, listening_address, qdstat_address):
@@ -565,7 +577,6 @@ class CloseWithUnsettledTest(MessagingHandler):
def run(self):
Container(self).run()
-
-
if __name__ == '__main__':
unittest.main(main_module())
+
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/3] qpid-dispatch git commit: DISPATCH-341 - Added two tests for message-routed drain. Added timeouts for tests.
Posted by tr...@apache.org.
DISPATCH-341 - Added two tests for message-routed drain. Added timeouts for tests.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/ae561b76
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/ae561b76
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/ae561b76
Branch: refs/heads/master
Commit: ae561b760e60d6d08cb466ef00688494a54acc4d
Parents: ccf9533
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Jun 3 10:52:24 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Fri Jun 3 10:52:24 2016 -0400
----------------------------------------------------------------------
tests/system_tests_drain.py | 17 ++++-
tests/system_tests_drain_support.py | 105 ++++++++++++++++++++++++++++++-
2 files changed, 116 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ae561b76/tests/system_tests_drain.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_drain.py b/tests/system_tests_drain.py
index ba503a1..9747995 100644
--- a/tests/system_tests_drain.py
+++ b/tests/system_tests_drain.py
@@ -21,6 +21,7 @@ import unittest
from system_test import TestCase, Qdrouterd, main_module
from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler
+from system_tests_drain_support import DrainNoMessagesHandler, DrainNoMoreMessagesHandler
class DrainSupportTest(TestCase):
@@ -44,12 +45,22 @@ class DrainSupportTest(TestCase):
def test_drain_support_all_messages(self):
drain_support = DrainMessagesHandler(self.address)
drain_support.run()
- self.assertTrue(drain_support.drain_successful)
+ self.assertEqual(drain_support.error, None)
def test_drain_support_one_message(self):
drain_support = DrainOneMessageHandler(self.address)
drain_support.run()
- self.assertTrue(drain_support.drain_successful)
+ self.assertEqual(drain_support.error, None)
+
+ def test_drain_support_no_messages(self):
+ drain_support = DrainNoMessagesHandler(self.address)
+ drain_support.run()
+ self.assertEqual(drain_support.error, None)
+
+ def test_drain_support_no_more_messages(self):
+ drain_support = DrainNoMoreMessagesHandler(self.address)
+ drain_support.run()
+ self.assertEqual(drain_support.error, None)
if __name__ == '__main__':
- unittest.main(main_module())
\ No newline at end of file
+ unittest.main(main_module())
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ae561b76/tests/system_tests_drain_support.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_drain_support.py b/tests/system_tests_drain_support.py
index 6192a7c..f11b8b8 100644
--- a/tests/system_tests_drain_support.py
+++ b/tests/system_tests_drain_support.py
@@ -21,6 +21,14 @@ from proton.handlers import MessagingHandler
from proton.reactor import Container
from proton import Message
+class Timeout(object):
+ def __init__(self, parent):
+ self.parent = parent
+
+ def on_timer_task(self, event):
+ self.parent.timeout()
+
+
class DrainMessagesHandler(MessagingHandler):
def __init__(self, address):
# prefetch is set to zero so that proton does not automatically issue 10 credits.
@@ -31,9 +39,14 @@ class DrainMessagesHandler(MessagingHandler):
self.sent_count = 0
self.received_count = 0
self.address = address
- self.drain_successful = False
+ self.error = "Unexpected Exit"
+
+ def timeout(self):
+ self.error = "Timeout Expired"
+ self.conn.close()
def on_start(self, event):
+ self.timer = event.reactor.schedule(5, Timeout(self))
self.conn = event.container.connect(self.address)
# Create a sender and a receiver. They are both listening on the same address
@@ -66,7 +79,8 @@ class DrainMessagesHandler(MessagingHandler):
# messages. That along with 10 messages received indicates that the drain worked and we can
# declare that the test is successful
if self.received_count == 10 and event.link.credit == 0:
- self.drain_successful = True
+ self.error = None
+ self.timer.cancel()
self.receiver.close()
self.sender.close()
self.conn.close()
@@ -74,6 +88,7 @@ class DrainMessagesHandler(MessagingHandler):
def run(self):
Container(self).run()
+
class DrainOneMessageHandler(DrainMessagesHandler):
def __init__(self, address):
super(DrainOneMessageHandler, self).__init__(address)
@@ -94,8 +109,92 @@ class DrainOneMessageHandler(DrainMessagesHandler):
# messages. That along with 5 messages received (4 earlier messages and 1 extra message for drain=1)
# indicates that the drain worked and we can declare that the test is successful
if self.received_count == 5 and event.link.credit == 0:
- self.drain_successful = True
+ self.error = None
+ self.timer.cancel()
self.receiver.close()
self.sender.close()
self.conn.close()
+
+class DrainNoMessagesHandler(MessagingHandler):
+ def __init__(self, address):
+ # prefetch is set to zero so that proton does not automatically issue 10 credits.
+ super(DrainNoMessagesHandler, self).__init__(prefetch=0)
+ self.conn = None
+ self.sender = None
+ self.receiver = None
+ self.address = address
+ self.error = "Unexpected Exit"
+
+ def timeout(self):
+ self.error = "Timeout Expired"
+ self.conn.close()
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(5, Timeout(self))
+ self.conn = event.container.connect(self.address)
+
+ # Create a sender and a receiver. They are both listening on the same address
+ self.receiver = event.container.create_receiver(self.conn, "org.apache.dev")
+ self.sender = event.container.create_sender(self.conn, "org.apache.dev")
+ self.receiver.flow(1)
+
+ def on_sendable(self, event):
+ self.receiver.drain(1)
+
+ def on_drained(self, event):
+ if sender.credit == 0:
+ self.error = None
+ self.timer.cancel()
+ self.conn.close()
+
+ def run(self):
+ Container(self).run()
+
+
+class DrainNoMoreMessagesHandler(MessagingHandler):
+ def __init__(self, address):
+ # prefetch is set to zero so that proton does not automatically issue 10 credits.
+ super(DrainNoMoreMessagesHandler, self).__init__(prefetch=0)
+ self.conn = None
+ self.sender = None
+ self.receiver = None
+ self.address = address
+ self.sent = 0
+ self.rcvd = 0
+ self.error = "Unexpected Exit"
+
+ def timeout(self):
+ self.error = "Timeout Expired: sent=%d rcvd=%d" % (self.sent, self.rcvd)
+ self.conn.close()
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(5, Timeout(self))
+ self.conn = event.container.connect(self.address)
+
+ # Create a sender and a receiver. They are both listening on the same address
+ self.receiver = event.container.create_receiver(self.conn, "org.apache.dev")
+ self.sender = event.container.create_sender(self.conn, "org.apache.dev")
+ self.receiver.flow(1)
+
+ def on_sendable(self, event):
+ if self.sent == 0:
+ msg = Message(body="Hello World")
+ event.sender.send(msg)
+ self.sent += 1
+
+ def on_message(self, event):
+ self.rcvd += 1
+
+ def on_settled(self, event):
+ self.receiver.drain(1)
+
+ def on_drained(self, event):
+ if sender.credit == 0:
+ self.error = None
+ self.timer.cancel()
+ self.conn.close()
+
+ def run(self):
+ Container(self).run()
+
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/3] qpid-dispatch git commit: DISPATCH-341 - Drain now propagates across link routes and behaves correctly for router-terminated links.
Posted by tr...@apache.org.
DISPATCH-341 - Drain now propagates across link routes and behaves correctly for router-terminated links.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/b6944e8a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/b6944e8a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/b6944e8a
Branch: refs/heads/master
Commit: b6944e8af5d39823b75a6bad2d065fafac596b22
Parents: ae561b7
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Jun 3 15:38:26 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Fri Jun 3 15:38:26 2016 -0400
----------------------------------------------------------------------
include/qpid/dispatch/router_core.h | 2 ++
src/router_core/connections.c | 24 ++++++++-----
src/router_core/router_core_private.h | 4 ++-
src/router_core/transfer.c | 56 +++++++++++++++++++++---------
src/router_node.c | 13 +++++++
tests/system_tests_drain.py | 8 ++---
tests/system_tests_drain_support.py | 8 ++---
tests/system_tests_link_routes.py | 16 +++++++--
8 files changed, 94 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b6944e8a/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 459e1a3..8eaa7ef 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -527,6 +527,7 @@ typedef void (*qdr_link_detach_t) (void *context, qdr_link_t *link, qdr_e
typedef void (*qdr_link_flow_t) (void *context, qdr_link_t *link, int credit);
typedef void (*qdr_link_offer_t) (void *context, qdr_link_t *link, int delivery_count);
typedef void (*qdr_link_drained_t) (void *context, qdr_link_t *link);
+typedef void (*qdr_link_drain_t) (void *context, qdr_link_t *link, bool mode);
typedef void (*qdr_link_push_t) (void *context, qdr_link_t *link);
typedef void (*qdr_link_deliver_t) (void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled);
typedef void (*qdr_delivery_update_t) (void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled);
@@ -540,6 +541,7 @@ void qdr_connection_handlers(qdr_core_t *core,
qdr_link_flow_t flow,
qdr_link_offer_t offer,
qdr_link_drained_t drained,
+ qdr_link_drain_t drain,
qdr_link_push_t push,
qdr_link_deliver_t deliver,
qdr_delivery_update_t delivery_update);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b6944e8a/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index a9612ea..e0543ec 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -188,8 +188,14 @@ int qdr_connection_process(qdr_connection_t *conn)
sys_mutex_unlock(conn->work_lock);
if (link) {
- core->flow_handler(core->user_context, link, link->incremental_credit);
- link->incremental_credit = 0;
+ if (link->incremental_credit > 0) {
+ core->flow_handler(core->user_context, link, link->incremental_credit);
+ link->incremental_credit = 0;
+ }
+ if (link->drain_mode_changed) {
+ core->drain_handler(core->user_context, link, link->drain_mode);
+ link->drain_mode_changed = false;
+ }
event_count++;
}
} while (link);
@@ -331,6 +337,7 @@ void qdr_connection_handlers(qdr_core_t *core,
qdr_link_flow_t flow,
qdr_link_offer_t offer,
qdr_link_drained_t drained,
+ qdr_link_drain_t drain,
qdr_link_push_t push,
qdr_link_deliver_t deliver,
qdr_delivery_update_t delivery_update)
@@ -343,6 +350,7 @@ void qdr_connection_handlers(qdr_core_t *core,
core->flow_handler = flow;
core->offer_handler = offer;
core->drained_handler = drained;
+ core->drain_handler = drain;
core->push_handler = push;
core->deliver_handler = deliver;
core->delivery_update_handler = delivery_update;
@@ -991,7 +999,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
if (qdr_terminus_is_anonymous(target)) {
link->owning_addr = 0;
qdr_link_outbound_second_attach_CT(core, link, source, target);
- qdr_link_issue_credit_CT(core, link, link->capacity);
+ qdr_link_issue_credit_CT(core, link, link->capacity, false);
} else {
//
@@ -1032,7 +1040,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
// Issue the initial credit only if there are destinations for the address.
//
if (DEQ_SIZE(addr->subscriptions) || DEQ_SIZE(addr->rlinks) || qd_bitmask_cardinality(addr->rnodes))
- qdr_link_issue_credit_CT(core, link, link->capacity);
+ qdr_link_issue_credit_CT(core, link, link->capacity, false);
}
}
break;
@@ -1040,12 +1048,12 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
case QD_LINK_CONTROL:
qdr_link_outbound_second_attach_CT(core, link, source, target);
- qdr_link_issue_credit_CT(core, link, link->capacity);
+ qdr_link_issue_credit_CT(core, link, link->capacity, false);
break;
case QD_LINK_ROUTER:
qdr_link_outbound_second_attach_CT(core, link, source, target);
- qdr_link_issue_credit_CT(core, link, link->capacity);
+ qdr_link_issue_credit_CT(core, link, link->capacity, false);
break;
}
} else {
@@ -1147,12 +1155,12 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac
//
qdr_address_t *addr = link->owning_addr;
if (!addr || (DEQ_SIZE(addr->subscriptions) || DEQ_SIZE(addr->rlinks) || qd_bitmask_cardinality(addr->rnodes)))
- qdr_link_issue_credit_CT(core, link, link->capacity);
+ qdr_link_issue_credit_CT(core, link, link->capacity, false);
break;
case QD_LINK_CONTROL:
case QD_LINK_ROUTER:
- qdr_link_issue_credit_CT(core, link, link->capacity);
+ qdr_link_issue_credit_CT(core, link, link->capacity, false);
break;
}
} else {
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b6944e8a/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 58d73a2..0fd5546 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -267,6 +267,7 @@ struct qdr_link_t {
int incremental_credit;
bool flow_started; ///< for incoming, true iff initial credit has been granted
bool drain_mode;
+ bool drain_mode_changed;
int credit_to_core; ///< Number of the available credits incrementally given to the core
uint64_t total_deliveries;
};
@@ -545,6 +546,7 @@ struct qdr_core_t {
qdr_link_flow_t flow_handler;
qdr_link_offer_t offer_handler;
qdr_link_drained_t drained_handler;
+ qdr_link_drain_t drain_handler;
qdr_link_push_t push_handler;
qdr_link_deliver_t deliver_handler;
qdr_delivery_update_t delivery_update_handler;
@@ -589,7 +591,7 @@ void qdr_agent_setup_CT(qdr_core_t *core);
void qdr_forwarder_setup_CT(qdr_core_t *core);
qdr_action_t *qdr_action(qdr_action_handler_t action_handler, const char *label);
void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action);
-void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit);
+void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bool drain);
void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr);
void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv);
void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b6944e8a/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 4ac5e1a..b587baf 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -353,7 +353,7 @@ bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv)
//
if (moved && link->link_direction == QD_INCOMING &&
link->link_type != QD_LINK_ROUTER && !link->connected_link)
- qdr_link_issue_credit_CT(core, link, 1);
+ qdr_link_issue_credit_CT(core, link, 1, false);
return moved;
}
@@ -364,24 +364,39 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar
if (discard)
return;
- qdr_link_t *link = action->args.connection.link;
- int credit = action->args.connection.credit;
- bool drain = action->args.connection.drain;
- bool activate = false;
+ qdr_link_t *link = action->args.connection.link;
+ int credit = action->args.connection.credit;
+ bool drain = action->args.connection.drain;
+ bool activate = false;
+ bool drain_was_set = !link->drain_mode && drain;
+
+ link->drain_mode = drain;
//
// If this is an attach-routed link, propagate the flow data downrange.
// Note that the credit value is incremental.
//
- if (link->connected_link)
- qdr_link_issue_credit_CT(core, link->connected_link, credit);
+ if (link->connected_link) {
+ qdr_link_t *clink = link->connected_link;
+
+ if (clink->link_direction == QD_INCOMING)
+ qdr_link_issue_credit_CT(core, link->connected_link, credit, drain);
+ else {
+ sys_mutex_lock(clink->conn->work_lock);
+ qdr_add_link_ref(&clink->conn->links_with_deliveries, clink, QDR_LINK_LIST_CLASS_DELIVERY);
+ sys_mutex_unlock(clink->conn->work_lock);
+ qdr_connection_activate_CT(core, clink->conn);
+ }
+
+ return;
+ }
//
// Handle the replenishing of credit outbound
//
- if (link->link_direction == QD_OUTGOING && credit > 0) {
+ if (link->link_direction == QD_OUTGOING && (credit > 0 || drain_was_set)) {
sys_mutex_lock(link->conn->work_lock);
- if (DEQ_SIZE(link->undelivered) > 0) {
+ if (DEQ_SIZE(link->undelivered) > 0 || drain_was_set) {
qdr_add_link_ref(&link->conn->links_with_deliveries, link, QDR_LINK_LIST_CLASS_DELIVERY);
activate = true;
}
@@ -389,10 +404,8 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar
}
//
- // Record the drain mode for the link
+ // Activate the connection if we have deliveries to send or drain mode was set.
//
- link->drain_mode = drain;
-
if (activate)
qdr_connection_activate_CT(core, link->conn);
}
@@ -428,7 +441,7 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_
qdr_delivery_release_CT(core, dlv);
qdr_delivery_decref(dlv);
if (link->link_type == QD_LINK_ROUTER)
- qdr_link_issue_credit_CT(core, link, 1);
+ qdr_link_issue_credit_CT(core, link, 1, false);
}
} else if (fanout > 0) {
if (dlv->settled) {
@@ -436,7 +449,7 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_
// The delivery is settled. Keep it off the unsettled list and issue
// replacement credit for it now.
//
- qdr_link_issue_credit_CT(core, link, 1);
+ qdr_link_issue_credit_CT(core, link, 1, false);
//
// If the delivery has no more references, free it now.
@@ -457,7 +470,7 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_
// are many addresses sharing the link.
//
if (link->link_type == QD_LINK_ROUTER)
- qdr_link_issue_credit_CT(core, link, 1);
+ qdr_link_issue_credit_CT(core, link, 1, false);
}
}
@@ -629,8 +642,14 @@ static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool
* Check the link's accumulated credit. If the credit given to the connection thread
* has been issued to Proton, provide the next batch of credit to the connection thread.
*/
-void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit)
+void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bool drain)
{
+ bool drain_changed = link->drain_mode |= drain;
+ bool activate = drain_changed;
+
+ link->drain_mode = drain;
+ link->drain_mode_changed = drain_changed;
+
link->incremental_credit_CT += credit;
link->flow_started = true;
@@ -640,7 +659,10 @@ void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit)
//
link->incremental_credit = link->incremental_credit_CT;
link->incremental_credit_CT = 0;
+ activate = true;
+ }
+ if (activate) {
//
// Put this link on the connection's has-credit list.
//
@@ -682,7 +704,7 @@ void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr)
// Issue credit to stalled links
//
if (!link->flow_started)
- qdr_link_issue_credit_CT(core, link, link->capacity);
+ qdr_link_issue_credit_CT(core, link, link->capacity, false);
//
// Drain undelivered deliveries via the forwarder
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b6944e8a/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 10794ac..b8e8f0e 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -789,6 +789,18 @@ static void CORE_link_drained(void *context, qdr_link_t *link)
}
+static void CORE_link_drain(void *context, qdr_link_t *link, bool mode)
+{
+ qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link);
+ pn_link_t *plink = qd_link_pn(qlink);
+
+ if (plink) {
+ if (pn_link_is_receiver(plink))
+ pn_link_set_drain(plink, mode);
+ }
+}
+
+
static void CORE_link_push(void *context, qdr_link_t *link)
{
qd_router_t *router = (qd_router_t*) context;
@@ -875,6 +887,7 @@ void qd_router_setup_late(qd_dispatch_t *qd)
CORE_link_flow,
CORE_link_offer,
CORE_link_drained,
+ CORE_link_drain,
CORE_link_push,
CORE_link_deliver,
CORE_delivery_update);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b6944e8a/tests/system_tests_drain.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_drain.py b/tests/system_tests_drain.py
index 9747995..b379d27 100644
--- a/tests/system_tests_drain.py
+++ b/tests/system_tests_drain.py
@@ -42,22 +42,22 @@ class DrainSupportTest(TestCase):
cls.router.wait_ready()
cls.address = cls.router.addresses[0]
- def test_drain_support_all_messages(self):
+ def test_drain_support_1_all_messages(self):
drain_support = DrainMessagesHandler(self.address)
drain_support.run()
self.assertEqual(drain_support.error, None)
- def test_drain_support_one_message(self):
+ def test_drain_support_2_one_message(self):
drain_support = DrainOneMessageHandler(self.address)
drain_support.run()
self.assertEqual(drain_support.error, None)
- def test_drain_support_no_messages(self):
+ def test_drain_support_3_no_messages(self):
drain_support = DrainNoMessagesHandler(self.address)
drain_support.run()
self.assertEqual(drain_support.error, None)
- def test_drain_support_no_more_messages(self):
+ def test_drain_support_4_no_more_messages(self):
drain_support = DrainNoMoreMessagesHandler(self.address)
drain_support.run()
self.assertEqual(drain_support.error, None)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b6944e8a/tests/system_tests_drain_support.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_drain_support.py b/tests/system_tests_drain_support.py
index f11b8b8..aa3a23a 100644
--- a/tests/system_tests_drain_support.py
+++ b/tests/system_tests_drain_support.py
@@ -142,8 +142,8 @@ class DrainNoMessagesHandler(MessagingHandler):
def on_sendable(self, event):
self.receiver.drain(1)
- def on_drained(self, event):
- if sender.credit == 0:
+ def on_link_flow(self, event):
+ if self.receiver.credit == 0:
self.error = None
self.timer.cancel()
self.conn.close()
@@ -189,8 +189,8 @@ class DrainNoMoreMessagesHandler(MessagingHandler):
def on_settled(self, event):
self.receiver.drain(1)
- def on_drained(self, event):
- if sender.credit == 0:
+ def on_link_flow(self, event):
+ if self.receiver.credit == 0:
self.error = None
self.timer.cancel()
self.conn.close()
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b6944e8a/tests/system_tests_link_routes.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index 37317f9..420cc96 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -28,7 +28,7 @@ from proton.handlers import MessagingHandler
from proton.reactor import AtMostOnce, Container
from proton.utils import BlockingConnection, LinkDetached
-from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler
+from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler, DrainNoMessagesHandler, DrainNoMoreMessagesHandler
from qpid_dispatch.management.client import Node
@@ -448,12 +448,22 @@ class LinkRoutePatternTest(TestCase):
def test_www_drain_support_all_messages(self):
drain_support = DrainMessagesHandler(self.routers[2].addresses[1])
drain_support.run()
- self.assertTrue(drain_support.drain_successful)
+ self.assertEqual(None, drain_support.error)
def test_www_drain_support_one_message(self):
drain_support = DrainOneMessageHandler(self.routers[2].addresses[1])
drain_support.run()
- self.assertTrue(drain_support.drain_successful)
+ self.assertEqual(None, drain_support.error)
+
+ def test_www_drain_support_no_messages(self):
+ drain_support = DrainNoMessagesHandler(self.routers[2].addresses[1])
+ drain_support.run()
+ self.assertEqual(None, drain_support.error)
+
+ def test_www_drain_support_no_more_messages(self):
+ drain_support = DrainNoMoreMessagesHandler(self.routers[2].addresses[1])
+ drain_support.run()
+ self.assertEqual(None, drain_support.error)
class DeliveryTagsTest(MessagingHandler):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org