You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/06/03 19:40:03 UTC

[1/3] qpid-dispatch git commit: DISPATCH-341 - From Ganesh Murthy - Added some drain tests

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master b69283aa3 -> b6944e8af


DISPATCH-341 - From Ganesh Murthy - Added some drain tests


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/ccf9533e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/ccf9533e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/ccf9533e

Branch: refs/heads/master
Commit: ccf9533e68a989ef646d8c684bba08823b057fcc
Parents: b69283a
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Jun 3 10:15:10 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Fri Jun 3 10:15:10 2016 -0400

----------------------------------------------------------------------
 tests/CMakeLists.txt                |   1 +
 tests/system_tests_drain.py         |  55 +++++++++++++++++
 tests/system_tests_drain_support.py | 101 +++++++++++++++++++++++++++++++
 tests/system_tests_link_routes.py   |  15 ++++-
 4 files changed, 170 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ccf9533e/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index eef7900..1c1be1c 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -72,6 +72,7 @@ add_test(router_policy_test    ${TEST_WRAP} -m unittest -v router_policy_test)
 foreach(py_test_module
 #   system_tests_broker
     system_tests_link_routes
+    system_tests_drain
     system_tests_management
     system_tests_one_router
     system_tests_policy

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ccf9533e/tests/system_tests_drain.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_drain.py b/tests/system_tests_drain.py
new file mode 100644
index 0000000..ba503a1
--- /dev/null
+++ b/tests/system_tests_drain.py
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+
+from system_test import TestCase, Qdrouterd, main_module
+from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler
+
+class DrainSupportTest(TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        """Start a router and a messenger"""
+        super(DrainSupportTest, cls).setUpClass()
+        name = "test-router"
+        config = Qdrouterd.Config([
+            ('router', {'mode': 'standalone', 'id': 'QDR'}),
+
+            # Setting the linkCapacity to 10 will allow the sender to send a burst of 10 messages
+            ('listener', {'port': cls.tester.get_port(), 'linkCapacity': 10}),
+
+        ])
+
+        cls.router = cls.tester.qdrouterd(name, config)
+        cls.router.wait_ready()
+        cls.address = cls.router.addresses[0]
+
+    def test_drain_support_all_messages(self):
+        drain_support = DrainMessagesHandler(self.address)
+        drain_support.run()
+        self.assertTrue(drain_support.drain_successful)
+
+    def test_drain_support_one_message(self):
+        drain_support = DrainOneMessageHandler(self.address)
+        drain_support.run()
+        self.assertTrue(drain_support.drain_successful)
+
+if __name__ == '__main__':
+    unittest.main(main_module())
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ccf9533e/tests/system_tests_drain_support.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_drain_support.py b/tests/system_tests_drain_support.py
new file mode 100644
index 0000000..6192a7c
--- /dev/null
+++ b/tests/system_tests_drain_support.py
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton.handlers import MessagingHandler
+from proton.reactor import Container
+from proton import Message
+
+class DrainMessagesHandler(MessagingHandler):
+    def __init__(self, address):
+        # prefetch is set to zero so that proton does not automatically issue 10 credits.
+        super(DrainMessagesHandler, self).__init__(prefetch=0)
+        self.conn = None
+        self.sender = None
+        self.receiver = None
+        self.sent_count = 0
+        self.received_count = 0
+        self.address = address
+        self.drain_successful = False
+
+    def on_start(self, event):
+        self.conn = event.container.connect(self.address)
+
+        # Create a sender and a receiver. They are both listening on the same address
+        self.receiver = event.container.create_receiver(self.conn, "org.apache.dev")
+        self.sender = event.container.create_sender(self.conn, "org.apache.dev")
+        self.receiver.flow(1)
+
+    def on_sendable(self, event):
+        if self.sent_count < 10:
+            msg = Message(body="Hello World")
+            dlv = event.sender.send(msg)
+            dlv.settle()
+            self.sent_count += 1
+
+    def on_message(self, event):
+        if event.receiver == self.receiver:
+            if "Hello World" == event.message.body:
+                self.received_count += 1
+
+            if self.received_count < 4:
+                event.receiver.flow(1)
+            elif self.received_count == 4:
+                # We are issuing a drain of 20. This means that we will receive all the 10 messages
+                # that the sender is sending. The router will also send back a response flow frame with
+                # drain=True but I don't have any way of making sure that the response frame reached the
+                # receiver
+                event.receiver.drain(20)
+
+            # The fact that the event.link.credit is 0 means that the receiver will not be receiving any more
+            # messages. That along with 10 messages received indicates that the drain worked and we can
+            # declare that the test is successful
+            if self.received_count == 10 and event.link.credit == 0:
+                self.drain_successful = True
+                self.receiver.close()
+                self.sender.close()
+                self.conn.close()
+
+    def run(self):
+        Container(self).run()
+
+class DrainOneMessageHandler(DrainMessagesHandler):
+    def __init__(self, address):
+        super(DrainOneMessageHandler, self).__init__(address)
+
+    def on_message(self, event):
+        if event.receiver == self.receiver:
+            if "Hello World" == event.message.body:
+                self.received_count += 1
+
+            if self.received_count < 4:
+                event.receiver.flow(1)
+            elif self.received_count == 4:
+                # We are issuing a drain of 1 after we receive the 4th message.
+                # This means that going forward, we will receive only one more message.
+                event.receiver.drain(1)
+
+            # The fact that the event.link.credit is 0 means that the receiver will not be receiving any more
+            # messages. That along with 5 messages received (4 earlier messages and 1 extra message for drain=1)
+            # indicates that the drain worked and we can declare that the test is successful
+            if self.received_count == 5 and event.link.credit == 0:
+                self.drain_successful = True
+                self.receiver.close()
+                self.sender.close()
+                self.conn.close()
+

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ccf9533e/tests/system_tests_link_routes.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index 8f2ee8d..37317f9 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -28,6 +28,8 @@ from proton.handlers import MessagingHandler
 from proton.reactor import AtMostOnce, Container
 from proton.utils import BlockingConnection, LinkDetached
 
+from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler
+
 from qpid_dispatch.management.client import Node
 
 class LinkRoutePatternTest(TestCase):
@@ -443,6 +445,16 @@ class LinkRoutePatternTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
+    def test_www_drain_support_all_messages(self):
+        drain_support = DrainMessagesHandler(self.routers[2].addresses[1])
+        drain_support.run()
+        self.assertTrue(drain_support.drain_successful)
+
+    def test_www_drain_support_one_message(self):
+        drain_support = DrainOneMessageHandler(self.routers[2].addresses[1])
+        drain_support.run()
+        self.assertTrue(drain_support.drain_successful)
+
 
 class DeliveryTagsTest(MessagingHandler):
     def __init__(self, sender_address, listening_address, qdstat_address):
@@ -565,7 +577,6 @@ class CloseWithUnsettledTest(MessagingHandler):
     def run(self):
         Container(self).run()
 
-
-
 if __name__ == '__main__':
     unittest.main(main_module())
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/3] qpid-dispatch git commit: DISPATCH-341 - Added two tests for message-routed drain. Added timeouts for tests.

Posted by tr...@apache.org.
DISPATCH-341 - Added two tests for message-routed drain.  Added timeouts for tests.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/ae561b76
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/ae561b76
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/ae561b76

Branch: refs/heads/master
Commit: ae561b760e60d6d08cb466ef00688494a54acc4d
Parents: ccf9533
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Jun 3 10:52:24 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Fri Jun 3 10:52:24 2016 -0400

----------------------------------------------------------------------
 tests/system_tests_drain.py         |  17 ++++-
 tests/system_tests_drain_support.py | 105 ++++++++++++++++++++++++++++++-
 2 files changed, 116 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ae561b76/tests/system_tests_drain.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_drain.py b/tests/system_tests_drain.py
index ba503a1..9747995 100644
--- a/tests/system_tests_drain.py
+++ b/tests/system_tests_drain.py
@@ -21,6 +21,7 @@ import unittest
 
 from system_test import TestCase, Qdrouterd, main_module
 from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler
+from system_tests_drain_support import DrainNoMessagesHandler, DrainNoMoreMessagesHandler
 
 class DrainSupportTest(TestCase):
 
@@ -44,12 +45,22 @@ class DrainSupportTest(TestCase):
     def test_drain_support_all_messages(self):
         drain_support = DrainMessagesHandler(self.address)
         drain_support.run()
-        self.assertTrue(drain_support.drain_successful)
+        self.assertEqual(drain_support.error, None)
 
     def test_drain_support_one_message(self):
         drain_support = DrainOneMessageHandler(self.address)
         drain_support.run()
-        self.assertTrue(drain_support.drain_successful)
+        self.assertEqual(drain_support.error, None)
+
+    def test_drain_support_no_messages(self):
+        drain_support = DrainNoMessagesHandler(self.address)
+        drain_support.run()
+        self.assertEqual(drain_support.error, None)
+
+    def test_drain_support_no_more_messages(self):
+        drain_support = DrainNoMoreMessagesHandler(self.address)
+        drain_support.run()
+        self.assertEqual(drain_support.error, None)
 
 if __name__ == '__main__':
-    unittest.main(main_module())
\ No newline at end of file
+    unittest.main(main_module())

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ae561b76/tests/system_tests_drain_support.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_drain_support.py b/tests/system_tests_drain_support.py
index 6192a7c..f11b8b8 100644
--- a/tests/system_tests_drain_support.py
+++ b/tests/system_tests_drain_support.py
@@ -21,6 +21,14 @@ from proton.handlers import MessagingHandler
 from proton.reactor import Container
 from proton import Message
 
+class Timeout(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.timeout()
+
+
 class DrainMessagesHandler(MessagingHandler):
     def __init__(self, address):
         # prefetch is set to zero so that proton does not automatically issue 10 credits.
@@ -31,9 +39,14 @@ class DrainMessagesHandler(MessagingHandler):
         self.sent_count = 0
         self.received_count = 0
         self.address = address
-        self.drain_successful = False
+        self.error = "Unexpected Exit"
+
+    def timeout(self):
+        self.error = "Timeout Expired"
+        self.conn.close()
 
     def on_start(self, event):
+        self.timer = event.reactor.schedule(5, Timeout(self))
         self.conn = event.container.connect(self.address)
 
         # Create a sender and a receiver. They are both listening on the same address
@@ -66,7 +79,8 @@ class DrainMessagesHandler(MessagingHandler):
             # messages. That along with 10 messages received indicates that the drain worked and we can
             # declare that the test is successful
             if self.received_count == 10 and event.link.credit == 0:
-                self.drain_successful = True
+                self.error = None
+                self.timer.cancel()
                 self.receiver.close()
                 self.sender.close()
                 self.conn.close()
@@ -74,6 +88,7 @@ class DrainMessagesHandler(MessagingHandler):
     def run(self):
         Container(self).run()
 
+
 class DrainOneMessageHandler(DrainMessagesHandler):
     def __init__(self, address):
         super(DrainOneMessageHandler, self).__init__(address)
@@ -94,8 +109,92 @@ class DrainOneMessageHandler(DrainMessagesHandler):
             # messages. That along with 5 messages received (4 earlier messages and 1 extra message for drain=1)
             # indicates that the drain worked and we can declare that the test is successful
             if self.received_count == 5 and event.link.credit == 0:
-                self.drain_successful = True
+                self.error = None
+                self.timer.cancel()
                 self.receiver.close()
                 self.sender.close()
                 self.conn.close()
 
+
+class DrainNoMessagesHandler(MessagingHandler):
+    def __init__(self, address):
+        # prefetch is set to zero so that proton does not automatically issue 10 credits.
+        super(DrainNoMessagesHandler, self).__init__(prefetch=0)
+        self.conn = None
+        self.sender = None
+        self.receiver = None
+        self.address = address
+        self.error = "Unexpected Exit"
+
+    def timeout(self):
+        self.error = "Timeout Expired"
+        self.conn.close()
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(5, Timeout(self))
+        self.conn = event.container.connect(self.address)
+
+        # Create a sender and a receiver. They are both listening on the same address
+        self.receiver = event.container.create_receiver(self.conn, "org.apache.dev")
+        self.sender = event.container.create_sender(self.conn, "org.apache.dev")
+        self.receiver.flow(1)
+
+    def on_sendable(self, event):
+        self.receiver.drain(1)
+
+    def on_drained(self, event):
+        if sender.credit == 0:
+            self.error = None
+            self.timer.cancel()
+            self.conn.close()
+
+    def run(self):
+        Container(self).run()
+
+
+class DrainNoMoreMessagesHandler(MessagingHandler):
+    def __init__(self, address):
+        # prefetch is set to zero so that proton does not automatically issue 10 credits.
+        super(DrainNoMoreMessagesHandler, self).__init__(prefetch=0)
+        self.conn = None
+        self.sender = None
+        self.receiver = None
+        self.address = address
+        self.sent = 0
+        self.rcvd = 0
+        self.error = "Unexpected Exit"
+
+    def timeout(self):
+        self.error = "Timeout Expired: sent=%d rcvd=%d" % (self.sent, self.rcvd)
+        self.conn.close()
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(5, Timeout(self))
+        self.conn = event.container.connect(self.address)
+
+        # Create a sender and a receiver. They are both listening on the same address
+        self.receiver = event.container.create_receiver(self.conn, "org.apache.dev")
+        self.sender = event.container.create_sender(self.conn, "org.apache.dev")
+        self.receiver.flow(1)
+
+    def on_sendable(self, event):
+        if self.sent == 0:
+            msg = Message(body="Hello World")
+            event.sender.send(msg)
+            self.sent += 1
+
+    def on_message(self, event):
+        self.rcvd += 1
+
+    def on_settled(self, event):
+        self.receiver.drain(1)
+
+    def on_drained(self, event):
+        if sender.credit == 0:
+            self.error = None
+            self.timer.cancel()
+            self.conn.close()
+
+    def run(self):
+        Container(self).run()
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/3] qpid-dispatch git commit: DISPATCH-341 - Drain now propagates across link routes and behaves correctly for router-terminated links.

Posted by tr...@apache.org.
DISPATCH-341 - Drain now propagates across link routes and behaves correctly for router-terminated links.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/b6944e8a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/b6944e8a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/b6944e8a

Branch: refs/heads/master
Commit: b6944e8af5d39823b75a6bad2d065fafac596b22
Parents: ae561b7
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Jun 3 15:38:26 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Fri Jun 3 15:38:26 2016 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/router_core.h   |  2 ++
 src/router_core/connections.c         | 24 ++++++++-----
 src/router_core/router_core_private.h |  4 ++-
 src/router_core/transfer.c            | 56 +++++++++++++++++++++---------
 src/router_node.c                     | 13 +++++++
 tests/system_tests_drain.py           |  8 ++---
 tests/system_tests_drain_support.py   |  8 ++---
 tests/system_tests_link_routes.py     | 16 +++++++--
 8 files changed, 94 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b6944e8a/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 459e1a3..8eaa7ef 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -527,6 +527,7 @@ typedef void (*qdr_link_detach_t)        (void *context, qdr_link_t *link, qdr_e
 typedef void (*qdr_link_flow_t)          (void *context, qdr_link_t *link, int credit);
 typedef void (*qdr_link_offer_t)         (void *context, qdr_link_t *link, int delivery_count);
 typedef void (*qdr_link_drained_t)       (void *context, qdr_link_t *link);
+typedef void (*qdr_link_drain_t)         (void *context, qdr_link_t *link, bool mode);
 typedef void (*qdr_link_push_t)          (void *context, qdr_link_t *link);
 typedef void (*qdr_link_deliver_t)       (void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled);
 typedef void (*qdr_delivery_update_t)    (void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled);
@@ -540,6 +541,7 @@ void qdr_connection_handlers(qdr_core_t                *core,
                              qdr_link_flow_t            flow,
                              qdr_link_offer_t           offer,
                              qdr_link_drained_t         drained,
+                             qdr_link_drain_t           drain,
                              qdr_link_push_t            push,
                              qdr_link_deliver_t         deliver,
                              qdr_delivery_update_t      delivery_update);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b6944e8a/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index a9612ea..e0543ec 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -188,8 +188,14 @@ int qdr_connection_process(qdr_connection_t *conn)
         sys_mutex_unlock(conn->work_lock);
 
         if (link) {
-            core->flow_handler(core->user_context, link, link->incremental_credit);
-            link->incremental_credit = 0;
+            if (link->incremental_credit > 0) {
+                core->flow_handler(core->user_context, link, link->incremental_credit);
+                link->incremental_credit = 0;
+            }
+            if (link->drain_mode_changed) {
+                core->drain_handler(core->user_context, link, link->drain_mode);
+                link->drain_mode_changed = false;
+            }
             event_count++;
         }
     } while (link);
@@ -331,6 +337,7 @@ void qdr_connection_handlers(qdr_core_t                *core,
                              qdr_link_flow_t            flow,
                              qdr_link_offer_t           offer,
                              qdr_link_drained_t         drained,
+                             qdr_link_drain_t           drain,
                              qdr_link_push_t            push,
                              qdr_link_deliver_t         deliver,
                              qdr_delivery_update_t      delivery_update)
@@ -343,6 +350,7 @@ void qdr_connection_handlers(qdr_core_t                *core,
     core->flow_handler            = flow;
     core->offer_handler           = offer;
     core->drained_handler         = drained;
+    core->drain_handler           = drain;
     core->push_handler            = push;
     core->deliver_handler         = deliver;
     core->delivery_update_handler = delivery_update;
@@ -991,7 +999,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
             if (qdr_terminus_is_anonymous(target)) {
                 link->owning_addr = 0;
                 qdr_link_outbound_second_attach_CT(core, link, source, target);
-                qdr_link_issue_credit_CT(core, link, link->capacity);
+                qdr_link_issue_credit_CT(core, link, link->capacity, false);
 
             } else {
                 //
@@ -1032,7 +1040,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
                     // Issue the initial credit only if there are destinations for the address.
                     //
                     if (DEQ_SIZE(addr->subscriptions) || DEQ_SIZE(addr->rlinks) || qd_bitmask_cardinality(addr->rnodes))
-                        qdr_link_issue_credit_CT(core, link, link->capacity);
+                        qdr_link_issue_credit_CT(core, link, link->capacity, false);
                 }
             }
             break;
@@ -1040,12 +1048,12 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
 
         case QD_LINK_CONTROL:
             qdr_link_outbound_second_attach_CT(core, link, source, target);
-            qdr_link_issue_credit_CT(core, link, link->capacity);
+            qdr_link_issue_credit_CT(core, link, link->capacity, false);
             break;
 
         case QD_LINK_ROUTER:
             qdr_link_outbound_second_attach_CT(core, link, source, target);
-            qdr_link_issue_credit_CT(core, link, link->capacity);
+            qdr_link_issue_credit_CT(core, link, link->capacity, false);
             break;
         }
     } else {
@@ -1147,12 +1155,12 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac
             //
             qdr_address_t *addr = link->owning_addr;
             if (!addr || (DEQ_SIZE(addr->subscriptions) || DEQ_SIZE(addr->rlinks) || qd_bitmask_cardinality(addr->rnodes)))
-                qdr_link_issue_credit_CT(core, link, link->capacity);
+                qdr_link_issue_credit_CT(core, link, link->capacity, false);
             break;
 
         case QD_LINK_CONTROL:
         case QD_LINK_ROUTER:
-            qdr_link_issue_credit_CT(core, link, link->capacity);
+            qdr_link_issue_credit_CT(core, link, link->capacity, false);
             break;
         }
     } else {

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b6944e8a/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 58d73a2..0fd5546 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -267,6 +267,7 @@ struct qdr_link_t {
     int                      incremental_credit;
     bool                     flow_started;   ///< for incoming, true iff initial credit has been granted
     bool                     drain_mode;
+    bool                     drain_mode_changed;
     int                      credit_to_core; ///< Number of the available credits incrementally given to the core
     uint64_t                 total_deliveries;
 };
@@ -545,6 +546,7 @@ struct qdr_core_t {
     qdr_link_flow_t            flow_handler;
     qdr_link_offer_t           offer_handler;
     qdr_link_drained_t         drained_handler;
+    qdr_link_drain_t           drain_handler;
     qdr_link_push_t            push_handler;
     qdr_link_deliver_t         deliver_handler;
     qdr_delivery_update_t      delivery_update_handler;
@@ -589,7 +591,7 @@ void  qdr_agent_setup_CT(qdr_core_t *core);
 void  qdr_forwarder_setup_CT(qdr_core_t *core);
 qdr_action_t *qdr_action(qdr_action_handler_t action_handler, const char *label);
 void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action);
-void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit);
+void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bool drain);
 void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr);
 void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv);
 void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b6944e8a/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 4ac5e1a..b587baf 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -353,7 +353,7 @@ bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv)
     //
     if (moved && link->link_direction == QD_INCOMING &&
         link->link_type != QD_LINK_ROUTER && !link->connected_link)
-        qdr_link_issue_credit_CT(core, link, 1);
+        qdr_link_issue_credit_CT(core, link, 1, false);
 
     return moved;
 }
@@ -364,24 +364,39 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar
     if (discard)
         return;
 
-    qdr_link_t *link = action->args.connection.link;
-    int  credit      = action->args.connection.credit;
-    bool drain       = action->args.connection.drain;
-    bool activate    = false;
+    qdr_link_t *link   = action->args.connection.link;
+    int  credit        = action->args.connection.credit;
+    bool drain         = action->args.connection.drain;
+    bool activate      = false;
+    bool drain_was_set = !link->drain_mode && drain;
+
+    link->drain_mode = drain;
 
     //
     // If this is an attach-routed link, propagate the flow data downrange.
     // Note that the credit value is incremental.
     //
-    if (link->connected_link)
-        qdr_link_issue_credit_CT(core, link->connected_link, credit);
+    if (link->connected_link) {
+        qdr_link_t *clink = link->connected_link;
+
+        if (clink->link_direction == QD_INCOMING)
+            qdr_link_issue_credit_CT(core, link->connected_link, credit, drain);
+        else {
+            sys_mutex_lock(clink->conn->work_lock);
+            qdr_add_link_ref(&clink->conn->links_with_deliveries, clink, QDR_LINK_LIST_CLASS_DELIVERY);
+            sys_mutex_unlock(clink->conn->work_lock);
+            qdr_connection_activate_CT(core, clink->conn);
+        }
+
+        return;
+    }
 
     //
     // Handle the replenishing of credit outbound
     //
-    if (link->link_direction == QD_OUTGOING && credit > 0) {
+    if (link->link_direction == QD_OUTGOING && (credit > 0 || drain_was_set)) {
         sys_mutex_lock(link->conn->work_lock);
-        if (DEQ_SIZE(link->undelivered) > 0) {
+        if (DEQ_SIZE(link->undelivered) > 0 || drain_was_set) {
             qdr_add_link_ref(&link->conn->links_with_deliveries, link, QDR_LINK_LIST_CLASS_DELIVERY);
             activate = true;
         }
@@ -389,10 +404,8 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar
     }
 
     //
-    // Record the drain mode for the link
+    // Activate the connection if we have deliveries to send or drain mode was set.
     //
-    link->drain_mode = drain;
-
     if (activate)
         qdr_connection_activate_CT(core, link->conn);
 }
@@ -428,7 +441,7 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_
             qdr_delivery_release_CT(core, dlv);
             qdr_delivery_decref(dlv);
             if (link->link_type == QD_LINK_ROUTER)
-                qdr_link_issue_credit_CT(core, link, 1);
+                qdr_link_issue_credit_CT(core, link, 1, false);
         }
     } else if (fanout > 0) {
         if (dlv->settled) {
@@ -436,7 +449,7 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_
             // The delivery is settled.  Keep it off the unsettled list and issue
             // replacement credit for it now.
             //
-            qdr_link_issue_credit_CT(core, link, 1);
+            qdr_link_issue_credit_CT(core, link, 1, false);
 
             //
             // If the delivery has no more references, free it now.
@@ -457,7 +470,7 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_
             // are many addresses sharing the link.
             //
             if (link->link_type == QD_LINK_ROUTER)
-                qdr_link_issue_credit_CT(core, link, 1);
+                qdr_link_issue_credit_CT(core, link, 1, false);
         }
     }
 
@@ -629,8 +642,14 @@ static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool
  * Check the link's accumulated credit.  If the credit given to the connection thread
  * has been issued to Proton, provide the next batch of credit to the connection thread.
  */
-void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit)
+void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bool drain)
 {
+    bool drain_changed = link->drain_mode |= drain;
+    bool activate      = drain_changed;
+
+    link->drain_mode = drain;
+    link->drain_mode_changed = drain_changed;
+
     link->incremental_credit_CT += credit;
     link->flow_started = true;
 
@@ -640,7 +659,10 @@ void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit)
         //
         link->incremental_credit    = link->incremental_credit_CT;
         link->incremental_credit_CT = 0;
+        activate = true;
+    }
 
+    if (activate) {
         //
         // Put this link on the connection's has-credit list.
         //
@@ -682,7 +704,7 @@ void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr)
             // Issue credit to stalled links
             //
             if (!link->flow_started)
-                qdr_link_issue_credit_CT(core, link, link->capacity);
+                qdr_link_issue_credit_CT(core, link, link->capacity, false);
 
             //
             // Drain undelivered deliveries via the forwarder

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b6944e8a/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 10794ac..b8e8f0e 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -789,6 +789,18 @@ static void CORE_link_drained(void *context, qdr_link_t *link)
 }
 
 
+static void CORE_link_drain(void *context, qdr_link_t *link, bool mode)
+{
+    qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link);
+    pn_link_t *plink = qd_link_pn(qlink);
+
+    if (plink) {
+        if (pn_link_is_receiver(plink))
+            pn_link_set_drain(plink, mode);
+    }
+}
+
+
 static void CORE_link_push(void *context, qdr_link_t *link)
 {
     qd_router_t *router      = (qd_router_t*) context;
@@ -875,6 +887,7 @@ void qd_router_setup_late(qd_dispatch_t *qd)
                             CORE_link_flow,
                             CORE_link_offer,
                             CORE_link_drained,
+                            CORE_link_drain,
                             CORE_link_push,
                             CORE_link_deliver,
                             CORE_delivery_update);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b6944e8a/tests/system_tests_drain.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_drain.py b/tests/system_tests_drain.py
index 9747995..b379d27 100644
--- a/tests/system_tests_drain.py
+++ b/tests/system_tests_drain.py
@@ -42,22 +42,22 @@ class DrainSupportTest(TestCase):
         cls.router.wait_ready()
         cls.address = cls.router.addresses[0]
 
-    def test_drain_support_all_messages(self):
+    def test_drain_support_1_all_messages(self):
         drain_support = DrainMessagesHandler(self.address)
         drain_support.run()
         self.assertEqual(drain_support.error, None)
 
-    def test_drain_support_one_message(self):
+    def test_drain_support_2_one_message(self):
         drain_support = DrainOneMessageHandler(self.address)
         drain_support.run()
         self.assertEqual(drain_support.error, None)
 
-    def test_drain_support_no_messages(self):
+    def test_drain_support_3_no_messages(self):
         drain_support = DrainNoMessagesHandler(self.address)
         drain_support.run()
         self.assertEqual(drain_support.error, None)
 
-    def test_drain_support_no_more_messages(self):
+    def test_drain_support_4_no_more_messages(self):
         drain_support = DrainNoMoreMessagesHandler(self.address)
         drain_support.run()
         self.assertEqual(drain_support.error, None)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b6944e8a/tests/system_tests_drain_support.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_drain_support.py b/tests/system_tests_drain_support.py
index f11b8b8..aa3a23a 100644
--- a/tests/system_tests_drain_support.py
+++ b/tests/system_tests_drain_support.py
@@ -142,8 +142,8 @@ class DrainNoMessagesHandler(MessagingHandler):
     def on_sendable(self, event):
         self.receiver.drain(1)
 
-    def on_drained(self, event):
-        if sender.credit == 0:
+    def on_link_flow(self, event):
+        if self.receiver.credit == 0:
             self.error = None
             self.timer.cancel()
             self.conn.close()
@@ -189,8 +189,8 @@ class DrainNoMoreMessagesHandler(MessagingHandler):
     def on_settled(self, event):
         self.receiver.drain(1)
 
-    def on_drained(self, event):
-        if sender.credit == 0:
+    def on_link_flow(self, event):
+        if self.receiver.credit == 0:
             self.error = None
             self.timer.cancel()
             self.conn.close()

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b6944e8a/tests/system_tests_link_routes.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index 37317f9..420cc96 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -28,7 +28,7 @@ from proton.handlers import MessagingHandler
 from proton.reactor import AtMostOnce, Container
 from proton.utils import BlockingConnection, LinkDetached
 
-from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler
+from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler, DrainNoMessagesHandler, DrainNoMoreMessagesHandler
 
 from qpid_dispatch.management.client import Node
 
@@ -448,12 +448,22 @@ class LinkRoutePatternTest(TestCase):
     def test_www_drain_support_all_messages(self):
         drain_support = DrainMessagesHandler(self.routers[2].addresses[1])
         drain_support.run()
-        self.assertTrue(drain_support.drain_successful)
+        self.assertEqual(None, drain_support.error)
 
     def test_www_drain_support_one_message(self):
         drain_support = DrainOneMessageHandler(self.routers[2].addresses[1])
         drain_support.run()
-        self.assertTrue(drain_support.drain_successful)
+        self.assertEqual(None, drain_support.error)
+
+    def test_www_drain_support_no_messages(self):
+        drain_support = DrainNoMessagesHandler(self.routers[2].addresses[1])
+        drain_support.run()
+        self.assertEqual(None, drain_support.error)
+
+    def test_www_drain_support_no_more_messages(self):
+        drain_support = DrainNoMoreMessagesHandler(self.routers[2].addresses[1])
+        drain_support.run()
+        self.assertEqual(None, drain_support.error)
 
 
 class DeliveryTagsTest(MessagingHandler):


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org