You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2018/07/11 13:00:12 UTC
qpid-dispatch git commit: DISPATCH-1055: Fix credit propagation over
link route after drain cycle
Repository: qpid-dispatch
Updated Branches:
refs/heads/master aaf5f2cb7 -> a6c4b94b8
DISPATCH-1055: Fix credit propagation over link route after drain cycle
Reset incremental credit given to core at end of drain cycle.
Fix logic computing drain_changed flag.
This closes #340
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/a6c4b94b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/a6c4b94b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/a6c4b94b
Branch: refs/heads/master
Commit: a6c4b94b87f4b11981b1a549ddee6f614ee6e0a3
Parents: aaf5f2c
Author: Chuck Rolke <cr...@redhat.com>
Authored: Wed Jul 11 08:47:07 2018 -0400
Committer: Chuck Rolke <cr...@redhat.com>
Committed: Wed Jul 11 08:47:07 2018 -0400
----------------------------------------------------------------------
src/router_core/transfer.c | 14 ++--
tests/system_tests_drain.py | 49 ++++++++++-
tests/system_tests_drain_support.py | 139 +++++++++++++++++++++++++++++++
3 files changed, 193 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a6c4b94b/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 5ee4ae3..a2374fb 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -233,10 +233,14 @@ void qdr_link_flow(qdr_core_t *core, qdr_link_t *link, int credit, bool drain_mo
// incrementally to the router core. i.e. convert absolute credit to
// incremental credit.
//
- credit -= link->credit_to_core;
- if (credit < 0)
- credit = 0;
- link->credit_to_core += credit;
+ if (link->drain_mode && !drain_mode) {
+ link->credit_to_core = 0; // credit calc reset when coming out of drain mode
+ } else {
+ credit -= link->credit_to_core;
+ if (credit < 0)
+ credit = 0;
+ link->credit_to_core += credit;
+ }
action->args.connection.link = link;
action->args.connection.credit = credit;
@@ -1247,7 +1251,7 @@ void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bo
{
assert(link->link_direction == QD_INCOMING);
- bool drain_changed = link->drain_mode |= drain;
+ bool drain_changed = link->drain_mode ^ drain;
link->drain_mode = drain;
if (link->credit_pending > 0)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a6c4b94b/tests/system_tests_drain.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_drain.py b/tests/system_tests_drain.py
index b9fd547..1b38aba 100644
--- a/tests/system_tests_drain.py
+++ b/tests/system_tests_drain.py
@@ -26,27 +26,58 @@ import unittest2 as unittest
from system_test import TestCase, Qdrouterd, main_module
from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler
from system_tests_drain_support import DrainNoMessagesHandler, DrainNoMoreMessagesHandler
+from system_tests_drain_support import DrainMessagesMoreHandler
+from time import sleep
class DrainSupportTest(TestCase):
@classmethod
def setUpClass(cls):
- """Start a router and a messenger"""
+ """
+ Set up two routers:
+ Router 'test-router' is the system under test.
+ Router 'broker' acts as a link route sink/source.
+ The link route uses prefix 'abc'.
+ """
super(DrainSupportTest, cls).setUpClass()
+
+ test_listener_port = cls.tester.get_port()
+ broker_listener_port = cls.tester.get_port()
+
+ # Configure and start 'broker'
+ bname = "broker"
+ bconfig = Qdrouterd.Config([
+ ('router', {'mode': 'standalone', 'id': 'Broker'}),
+ ('listener', {'role': 'normal',
+ 'host': '0.0.0.0', 'port': broker_listener_port, 'linkCapacity': 100, 'saslMechanisms': 'ANONYMOUS'}),
+ ])
+ cls.broker = cls.tester.qdrouterd(bname, bconfig, wait=True)
+
+ # Configure and start test-router
name = "test-router"
config = Qdrouterd.Config([
('router', {'mode': 'standalone', 'id': 'QDR'}),
# Setting the linkCapacity to 10 will allow the sender to send a burst of 10 messages
- ('listener', {'port': cls.tester.get_port(), 'linkCapacity': 10}),
+ ('listener', {'role': 'normal',
+ 'host': '0.0.0.0', 'port': test_listener_port,
+ 'linkCapacity': 10, 'saslMechanisms': 'ANONYMOUS'}),
+ # The DrainMessagesMoreHandler accepts a src/tgt address that may be link-routed.
+ # This defines the link route to 'broker' and the 'abc' prefix.
+ ('connector', {'name': 'broker1-conn', 'role': 'route-container',
+ 'host': '0.0.0.0', 'port': broker_listener_port,
+ 'saslMechanisms': 'ANONYMOUS'}),
+ ('linkRoute', {'prefix': 'abc', 'direction': 'out', 'connection': 'broker1-conn'}),
+ ('linkRoute', {'prefix': 'abc', 'direction': 'in', 'connection': 'broker1-conn'}),
])
- cls.router = cls.tester.qdrouterd(name, config)
- cls.router.wait_ready()
+ cls.router = cls.tester.qdrouterd(name, config, wait=False)
cls.address = cls.router.addresses[0]
+ sleep(4) # starting router with wait=True hangs. sleep for now
+
def test_drain_support_1_all_messages(self):
drain_support = DrainMessagesHandler(self.address)
drain_support.run()
@@ -67,6 +98,16 @@ class DrainSupportTest(TestCase):
drain_support.run()
self.assertEqual(drain_support.error, None)
+ def test_drain_support_5_drain_then_more_messages_local(self):
+ drain_support = DrainMessagesMoreHandler(self.address, "org.apache.dev")
+ drain_support.run()
+ self.assertEqual(drain_support.error, None)
+
+ def test_drain_support_5_drain_then_more_messages_routed(self):
+ drain_support = DrainMessagesMoreHandler(self.address, "abc")
+ drain_support.run()
+ self.assertEqual(drain_support.error, None)
+
if __name__ == '__main__':
unittest.main(main_module())
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a6c4b94b/tests/system_tests_drain_support.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_drain_support.py b/tests/system_tests_drain_support.py
index 930a486..785cb16 100644
--- a/tests/system_tests_drain_support.py
+++ b/tests/system_tests_drain_support.py
@@ -212,5 +212,144 @@ class DrainNoMoreMessagesHandler(MessagingHandler):
Container(self).run()
+class DrainMessagesMoreHandler(MessagingHandler):
+ """
+ Make sure the clients can send/receive after going through a drain cycle.
+
+ Send phase
+ 1. Sender sending first 10 messages
+ 2. Sender paused waiting for drain to finish
+ 3. Sender is sending second 10 messages
+ 4. Sender is done.
+
+ Receive phase
+ 1. Receiver receiving first four messages; At #4 receiver issues drain 4,20
+ 2. Receiver receives messages 5..10.
+ When 10 messages have been received and link credit =0 the drain is done
+ Receiver issues 10 credits
+ 3. Receiver receives messages 11..20.
+ 4. Receiver is done
+
+ At issue in DISPATCH-1055 is that the 10 credits issued in Receive step 2
+ are never propagated across a link route to the 'broker'.
+
+ This code is instantiated with and without the link route to demonstrate that
+ it works properly when the 'test-router' is handling the drain by itself
+ and that it fails only on the link route.
+ """
+ def __init__(self, address, route_name):
+ # prefetch is set to zero so that proton does not automatically issue 10 credits.
+ super(DrainMessagesMoreHandler, self).__init__(prefetch=0)
+ self.conn = None
+ self.sender = None
+ self.receiver = None
+ self.sent_count = 0
+ self.received_count = 0
+ self.address = address
+ self.error = "Unexpected Exit"
+ self.send_phase = 1
+ self.recv_phase = 1
+ self.route_name = route_name
+ self.verbose_printing = False
+
+ def show_state(self):
+ return str("send_phase:" + str(self.send_phase)
+ + ", sent_count:" + str(self.sent_count)
+ + ", recv_phase:" + str(self.recv_phase)
+ + ", receive_count:" + str(self.received_count)
+ + ", receiver_credit:" + str(self.receiver.credit)
+ + ", sender_credit:" + str(self.sender.credit))
+
+ def printme(self, str):
+ if (self.verbose_printing):
+ print (str + " " + self.show_state())
+
+ def timeout(self):
+ self.error = "Timeout Expired: sent: %d rcvd: %d" % (self.sent_count, self.received_count)
+ self.conn.close()
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+ self.conn = event.container.connect(self.address)
+
+ # Create a sender and a receiver. They are both attached to the same address
+ self.receiver = event.container.create_receiver(self.conn, source=self.route_name)
+ self.sender = event.container.create_sender(self.conn, target=self.route_name)
+ self.receiver.flow(1)
+
+ def on_link_flow(self, event):
+ if event.link.is_sender and event.link.credit \
+ and event.link.state & Endpoint.LOCAL_ACTIVE \
+ and event.link.state & Endpoint.REMOTE_ACTIVE :
+ self.on_sendable(event)
+
+ # The fact that the event.link.credit is 0 means that the receiver will not be receiving any more
+ # messages. That along with 10 messages received indicates that the drain worked.
+ if self.send_phase == 2 and self.received_count == 10 and event.link.credit == 0:
+ self.printme ("sender transitions to phase 3 - drain completed, send new flow now")
+ self.receiver.flow(10)
+ self.send_phase = 3
+
+ if event.link.is_sender and event.link.credit \
+ and event.link.state & Endpoint.LOCAL_ACTIVE \
+ and event.link.state & Endpoint.REMOTE_ACTIVE :
+ self.on_sendable(event)
+ self.printme (("sender " if event.link.is_sender else "receiver ") + "exit on_link_flow:")
+
+ def on_sendable(self, event):
+ if event.link.is_sender and self.send_phase == 1 and self.sent_count < 10:
+ msg = Message(body="Hello World", properties={'seq': self.sent_count})
+ dlv = event.sender.send(msg)
+ dlv.settle()
+ self.sent_count += 1
+ if self.sent_count == 10:
+ self.printme ("sender transitions to phase 2 - wait for drain to finish")
+ self.send_phase = 2
+ elif event.link.is_sender and self.send_phase == 3 and self.sent_count < 20:
+ msg = Message(body="Hello World", properties={'seq': self.sent_count})
+ dlv = event.sender.send(msg)
+ dlv.settle()
+ self.sent_count += 1
+ if self.sent_count == 20:
+ self.printme ("sender transitions to phase 4 - done sending")
+ self.send_phase = 4
+ self.printme (("sender " if event.link.is_sender else "receiver ") + "exit on_sendable:")
+
+ def on_message(self, event):
+ if event.receiver == self.receiver:
+ if "Hello World" == event.message.body:
+ self.received_count += 1
+
+ if self.recv_phase == 1 and self.received_count < 4:
+ event.receiver.flow(1)
+ elif self.recv_phase == 1 and self.received_count == 4:
+ # We are issuing a drain of 20. This means that we will receive all the 10 messages
+ # that the sender is sending. The router will also send back a response flow frame with
+ # drain=True but I don't have any way of making sure that the response frame reached the
+ # receiver
+ self.printme ("receiver transitions to phase 2 - sending drain now")
+ event.receiver.drain(20)
+ self.recv_phase = 2
+ elif self.recv_phase == 2 and self.received_count == 10:
+ self.printme ("receiver transitions to phase 3")
+ self.recv_phase = 3
+ msg = Message(body="Hello World", properties={'seq': self.sent_count})
+ dlv = self.sender.send(msg)
+ dlv.settle()
+ self.sent_count += 1
+ elif self.recv_phase == 3 and self.received_count == 20:
+ self.printme ("receiver transitions to phase 4 - test is completed successfully")
+ self.recv_phase = 4
+ self.error = None
+ self.timer.cancel()
+ self.receiver.close()
+ self.sender.close()
+ self.conn.close()
+ self.printme ("exit on_message:")
+
+ def run(self):
+ Container(self).run()
+
+
if __name__ == '__main__':
unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org