You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2018/07/11 13:00:12 UTC

qpid-dispatch git commit: DISPATCH-1055: Fix credit propagation over link route after drain cycle

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master aaf5f2cb7 -> a6c4b94b8


DISPATCH-1055: Fix credit propagation over link route after drain cycle

Reset incremental credit given to core at end of drain cycle.
Fix logic computing drain_changed flag.

This closes #340


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/a6c4b94b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/a6c4b94b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/a6c4b94b

Branch: refs/heads/master
Commit: a6c4b94b87f4b11981b1a549ddee6f614ee6e0a3
Parents: aaf5f2c
Author: Chuck Rolke <cr...@redhat.com>
Authored: Wed Jul 11 08:47:07 2018 -0400
Committer: Chuck Rolke <cr...@redhat.com>
Committed: Wed Jul 11 08:47:07 2018 -0400

----------------------------------------------------------------------
 src/router_core/transfer.c          |  14 ++--
 tests/system_tests_drain.py         |  49 ++++++++++-
 tests/system_tests_drain_support.py | 139 +++++++++++++++++++++++++++++++
 3 files changed, 193 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a6c4b94b/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 5ee4ae3..a2374fb 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -233,10 +233,14 @@ void qdr_link_flow(qdr_core_t *core, qdr_link_t *link, int credit, bool drain_mo
     // incrementally to the router core.  i.e. convert absolute credit to
     // incremental credit.
     //
-    credit -= link->credit_to_core;
-    if (credit < 0)
-        credit = 0;
-    link->credit_to_core += credit;
+    if (link->drain_mode && !drain_mode) {
+        link->credit_to_core = 0;   // credit calc reset when coming out of drain mode
+    } else {
+        credit -= link->credit_to_core;
+        if (credit < 0)
+            credit = 0;
+        link->credit_to_core += credit;
+    }
 
     action->args.connection.link   = link;
     action->args.connection.credit = credit;
@@ -1247,7 +1251,7 @@ void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bo
 {
     assert(link->link_direction == QD_INCOMING);
 
-    bool drain_changed = link->drain_mode |= drain;
+    bool drain_changed = link->drain_mode ^ drain;
     link->drain_mode   = drain;
 
     if (link->credit_pending > 0)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a6c4b94b/tests/system_tests_drain.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_drain.py b/tests/system_tests_drain.py
index b9fd547..1b38aba 100644
--- a/tests/system_tests_drain.py
+++ b/tests/system_tests_drain.py
@@ -26,27 +26,58 @@ import unittest2 as unittest
 from system_test import TestCase, Qdrouterd, main_module
 from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler
 from system_tests_drain_support import DrainNoMessagesHandler, DrainNoMoreMessagesHandler
+from system_tests_drain_support import DrainMessagesMoreHandler
 
+from time import sleep
 
 class DrainSupportTest(TestCase):
 
     @classmethod
     def setUpClass(cls):
-        """Start a router and a messenger"""
+        """
+        Set up two routers:
+          Router 'test-router' is the system under test.
+          Router 'broker' acts as a link route sink/source.
+        The link route uses prefix 'abc'.
+        """
         super(DrainSupportTest, cls).setUpClass()
+
+        test_listener_port   = cls.tester.get_port()
+        broker_listener_port = cls.tester.get_port()
+
+        # Configure and start 'broker'
+        bname = "broker"
+        bconfig = Qdrouterd.Config([
+            ('router', {'mode': 'standalone', 'id': 'Broker'}),
+            ('listener', {'role': 'normal',
+                          'host': '0.0.0.0', 'port': broker_listener_port, 'linkCapacity': 100, 'saslMechanisms': 'ANONYMOUS'}),
+        ])
+        cls.broker = cls.tester.qdrouterd(bname, bconfig, wait=True)
+
+        # Configure and start test-router
         name = "test-router"
         config = Qdrouterd.Config([
             ('router', {'mode': 'standalone', 'id': 'QDR'}),
 
             # Setting the linkCapacity to 10 will allow the sender to send a burst of 10 messages
-            ('listener', {'port': cls.tester.get_port(), 'linkCapacity': 10}),
+            ('listener', {'role': 'normal',
+                          'host': '0.0.0.0', 'port': test_listener_port,
+                          'linkCapacity': 10, 'saslMechanisms': 'ANONYMOUS'}),
 
+            # The DrainMessagesMoreHandler accepts a src/tgt address that may be link-routed.
+            # This defines the link route to 'broker' and the 'abc' prefix.
+            ('connector', {'name': 'broker1-conn', 'role': 'route-container',
+                           'host': '0.0.0.0', 'port': broker_listener_port,
+                           'saslMechanisms': 'ANONYMOUS'}),
+            ('linkRoute', {'prefix': 'abc', 'direction': 'out', 'connection': 'broker1-conn'}),
+            ('linkRoute', {'prefix': 'abc', 'direction': 'in', 'connection': 'broker1-conn'}),
         ])
 
-        cls.router = cls.tester.qdrouterd(name, config)
-        cls.router.wait_ready()
+        cls.router = cls.tester.qdrouterd(name, config, wait=False)
         cls.address = cls.router.addresses[0]
 
+        sleep(4) # starting router with wait=True hangs. sleep for now
+
     def test_drain_support_1_all_messages(self):
         drain_support = DrainMessagesHandler(self.address)
         drain_support.run()
@@ -67,6 +98,16 @@ class DrainSupportTest(TestCase):
         drain_support.run()
         self.assertEqual(drain_support.error, None)
 
+    def test_drain_support_5_drain_then_more_messages_local(self):
+        drain_support = DrainMessagesMoreHandler(self.address, "org.apache.dev")
+        drain_support.run()
+        self.assertEqual(drain_support.error, None)
+
+    def test_drain_support_5_drain_then_more_messages_routed(self):
+        drain_support = DrainMessagesMoreHandler(self.address, "abc")
+        drain_support.run()
+        self.assertEqual(drain_support.error, None)
+
 
 if __name__ == '__main__':
     unittest.main(main_module())

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a6c4b94b/tests/system_tests_drain_support.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_drain_support.py b/tests/system_tests_drain_support.py
index 930a486..785cb16 100644
--- a/tests/system_tests_drain_support.py
+++ b/tests/system_tests_drain_support.py
@@ -212,5 +212,144 @@ class DrainNoMoreMessagesHandler(MessagingHandler):
         Container(self).run()
 
 
+class DrainMessagesMoreHandler(MessagingHandler):
+    """
+    Make sure the clients can send/receive after going through a drain cycle.
+
+      Send phase
+    1. Sender sending first 10 messages
+    2. Sender paused waiting for drain to finish
+    3. Sender is sending second 10 messages
+    4. Sender is done.
+
+      Receive phase
+    1. Receiver receiving first four messages; At #4 receiver issues drain 4,20
+    2. Receiver receives messages 5..10.
+       When 10 messages have been received and link credit = 0, the drain is done.
+       Receiver issues 10 credits.
+    3. Receiver receives messages 11..20.
+    4. Receiver is done
+
+    At issue in DISPATCH-1055 is that the 10 credits issued in Receive step 2
+    are never propagated across a link route to the 'broker'.
+
+    This code is instantiated with and without the link route to demonstrate that
+    it works properly when the 'test-router' is handling the drain by itself
+    and that it fails only on the link route.
+    """
+    def __init__(self, address, route_name):
+        # prefetch is set to zero so that proton does not automatically issue 10 credits.
+        super(DrainMessagesMoreHandler, self).__init__(prefetch=0)
+        self.conn = None
+        self.sender = None
+        self.receiver = None
+        self.sent_count = 0
+        self.received_count = 0
+        self.address = address
+        self.error = "Unexpected Exit"
+        self.send_phase = 1
+        self.recv_phase = 1
+        self.route_name = route_name
+        self.verbose_printing = False
+
+    def show_state(self):
+        return str("send_phase:" + str(self.send_phase) 
+                   + ", sent_count:" + str(self.sent_count) 
+                   + ", recv_phase:" + str(self.recv_phase) 
+                   + ", receive_count:" + str(self.received_count) 
+                   + ", receiver_credit:" + str(self.receiver.credit)
+                   + ", sender_credit:" + str(self.sender.credit))
+
+    def printme(self, str):
+        if (self.verbose_printing):
+            print (str + " " + self.show_state())
+
+    def timeout(self):
+        self.error = "Timeout Expired: sent: %d rcvd: %d" % (self.sent_count, self.received_count)
+        self.conn.close()
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.conn = event.container.connect(self.address)
+
+        # Create a sender and a receiver. They are both attached to the same address
+        self.receiver = event.container.create_receiver(self.conn, source=self.route_name)
+        self.sender = event.container.create_sender(self.conn, target=self.route_name)
+        self.receiver.flow(1)
+
+    def on_link_flow(self, event):
+        if event.link.is_sender and event.link.credit \
+           and event.link.state & Endpoint.LOCAL_ACTIVE \
+           and event.link.state & Endpoint.REMOTE_ACTIVE :
+            self.on_sendable(event)
+
+        # The fact that the event.link.credit is 0 means that the receiver will not be receiving any more
+        # messages. That along with 10 messages received indicates that the drain worked.
+        if self.send_phase == 2 and self.received_count == 10 and event.link.credit == 0:
+            self.printme ("sender transitions to phase 3 - drain completed, send new flow now")
+            self.receiver.flow(10)
+            self.send_phase = 3
+
+        if event.link.is_sender and event.link.credit \
+           and event.link.state & Endpoint.LOCAL_ACTIVE \
+           and event.link.state & Endpoint.REMOTE_ACTIVE :
+            self.on_sendable(event)
+        self.printme (("sender " if event.link.is_sender else "receiver ") + "exit on_link_flow:")
+
+    def on_sendable(self, event):
+        if event.link.is_sender and self.send_phase == 1 and self.sent_count < 10:
+            msg = Message(body="Hello World", properties={'seq': self.sent_count})
+            dlv = event.sender.send(msg)
+            dlv.settle()
+            self.sent_count += 1
+            if self.sent_count == 10:
+                self.printme ("sender transitions to phase 2 - wait for drain to finish")
+                self.send_phase = 2
+        elif event.link.is_sender and self.send_phase == 3 and self.sent_count < 20:
+            msg = Message(body="Hello World", properties={'seq': self.sent_count})
+            dlv = event.sender.send(msg)
+            dlv.settle()
+            self.sent_count += 1
+            if self.sent_count == 20:
+                self.printme ("sender transitions to phase 4 - done sending")
+                self.send_phase = 4
+        self.printme (("sender " if event.link.is_sender else "receiver ") + "exit on_sendable:")
+
+    def on_message(self, event):
+        if event.receiver == self.receiver:
+            if "Hello World" == event.message.body:
+                self.received_count += 1
+
+            if self.recv_phase == 1 and self.received_count < 4:
+                event.receiver.flow(1)
+            elif self.recv_phase == 1 and self.received_count == 4:
+                # We are issuing a drain of 20. This means that we will receive all the 10 messages
+                # that the sender is sending. The router will also send back a response flow frame with
+                # drain=True but I don't have any way of making sure that the response frame reached the
+                # receiver
+                self.printme ("receiver transitions to phase 2 - sending drain now")
+                event.receiver.drain(20)
+                self.recv_phase = 2
+            elif self.recv_phase == 2 and self.received_count == 10:
+                self.printme ("receiver transitions to phase 3")
+                self.recv_phase = 3
+                msg = Message(body="Hello World", properties={'seq': self.sent_count})
+                dlv = self.sender.send(msg)
+                dlv.settle()
+                self.sent_count += 1
+            elif self.recv_phase == 3 and self.received_count == 20:
+                self.printme ("receiver transitions to phase 4 - test is completed successfully")
+                self.recv_phase = 4
+                self.error = None
+                self.timer.cancel()
+                self.receiver.close()
+                self.sender.close()
+                self.conn.close()
+            self.printme ("exit on_message:")
+
+    def run(self):
+        Container(self).run()
+
+
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org