You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2019/10/31 18:24:04 UTC
[qpid-dispatch] 04/05: DISPATCH-1409 - Added test case,
fixed accounting bug found by the test case.
This is an automated email from the ASF dual-hosted git repository.
tross pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 1837ece08add3cfd1af377be4dfeb3203dc32339
Author: Ted Ross <tr...@redhat.com>
AuthorDate: Thu Oct 31 12:17:21 2019 -0400
DISPATCH-1409 - Added test case, fixed accounting bug found by the test case.
---
src/router_core/connections.c | 10 ++-
tests/system_tests_stuck_deliveries.py | 136 +++++++++++++++++++++++++++++++++
2 files changed, 142 insertions(+), 4 deletions(-)
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index a9229d1..984a7c2 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -233,17 +233,19 @@ void qdr_record_link_credit(qdr_core_t *core, qdr_link_t *link)
if (link->credit_reported > 0 && pn_credit == 0) {
//
// The link has transitioned from positive credit to zero credit.
- // Mark it as eligible for logging and record the time.
//
- link->reported_as_blocked = false;
link->zero_credit_time = core->uptime_ticks;
- core->links_blocked--;
- } else if (link->credit_reported == 0 && pn_credit > 0)
+ } else if (link->credit_reported == 0 && pn_credit > 0) {
//
// The link has transitioned from zero credit to positive credit.
// Clear the recorded time.
//
link->zero_credit_time = 0;
+ if (link->reported_as_blocked) {
+ link->reported_as_blocked = false;
+ core->links_blocked--;
+ }
+ }
link->credit_reported = pn_credit;
}
diff --git a/tests/system_tests_stuck_deliveries.py b/tests/system_tests_stuck_deliveries.py
index c4f8b21..c81c001 100644
--- a/tests/system_tests_stuck_deliveries.py
+++ b/tests/system_tests_stuck_deliveries.py
@@ -153,6 +153,11 @@ class RouterTest(TestCase):
test.run()
self.assertEqual(None, test.error)
+ def test_09_receiver_link_credit_test(self):
+ test = RxLinkCreditTest(self.routers[0].addresses[0])
+ test.run()
+ self.assertEqual(None, test.error)
+
class Timeout(object):
def __init__(self, parent):
@@ -270,5 +275,136 @@ class DelayedSettlementTest(MessagingHandler):
Container(self).run()
+class RxLinkCreditTest(MessagingHandler):
+ def __init__(self, host):
+ super(RxLinkCreditTest, self).__init__(prefetch = 0)
+ self.host = host
+
+ self.receiver_conn = None
+ self.query_conn = None
+ self.addr = "rx/link/credit/test"
+ self.credit_issued = 0
+ self.error = None
+
+ self.stages = ['Setup', 'LinkBlocked', 'LinkUnblocked', '10Credits', '20Credits']
+ self.stage = 0
+
+ def timeout(self):
+ self.error = "Timeout Expired - stage: %s" % self.stages[self.stage]
+ self.receiver_conn.close()
+ self.query_conn.close()
+ if self.poll_timer:
+ self.poll_timer.cancel()
+
+ def fail(self, error):
+ self.error = error
+ self.receiver_conn.close()
+ self.query_conn.close()
+ if self.poll_timer:
+ self.poll_timer.cancel()
+ self.timer.cancel()
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(30.0, Timeout(self))
+ self.poll_timer = None
+ self.receiver_conn = event.container.connect(self.host)
+ self.query_conn = event.container.connect(self.host)
+ self.reply_receiver = event.container.create_receiver(self.query_conn, None, dynamic=True)
+ self.query_sender = event.container.create_sender(self.query_conn, "$management")
+ self.receiver = None
+
+ def on_link_opened(self, event):
+ if event.receiver == self.reply_receiver:
+ self.reply_addr = event.receiver.remote_source.address
+ self.proxy = MgmtMsgProxy(self.reply_addr)
+ self.receiver = event.container.create_receiver(self.receiver_conn, self.addr)
+ self.reply_receiver.flow(1)
+ elif event.receiver == self.receiver:
+ self.stage = 1
+ self.process()
+
+ def process(self):
+ if self.stage == 1:
+ #
+ # LinkBlocked
+ #
+ msg = self.proxy.query_router()
+ self.query_sender.send(msg)
+
+ elif self.stage == 2:
+ #
+ # LinkUnblocked
+ #
+ msg = self.proxy.query_router()
+ self.query_sender.send(msg)
+
+ elif self.stage == 3:
+ #
+ # 10Credits
+ #
+ msg = self.proxy.query_links()
+ self.query_sender.send(msg)
+
+ elif self.stage == 4:
+ #
+ # 20Credits
+ #
+ msg = self.proxy.query_links()
+ self.query_sender.send(msg)
+
+ def on_message(self, event):
+ if event.receiver == self.reply_receiver:
+ response = self.proxy.response(event.message)
+ self.reply_receiver.flow(1)
+ if self.stage == 1:
+ #
+ # LinkBlocked
+ #
+ if response.results[0].linksBlocked == 1:
+ self.receiver.flow(10)
+ self.stage = 2
+ self.process()
+ return
+
+ elif self.stage == 2:
+ #
+ # LinkUnblocked
+ #
+ if response.results[0].linksBlocked == 0:
+ self.stage = 3
+ self.process()
+ return
+
+ elif self.stage == 3:
+ #
+ # 10Credits
+ #
+ for link in response.results:
+ if 'M0' + self.addr == link.owningAddr:
+ if link.creditAvailable == 10:
+ self.receiver.flow(10)
+ self.stage = 4
+ self.process()
+ return
+
+ elif self.stage == 4:
+ #
+ # 20Credits
+ #
+ for link in response.results:
+ if 'M0' + self.addr == link.owningAddr:
+ if link.creditAvailable == 20:
+ self.fail(None)
+ return
+
+ self.poll_timer = event.reactor.schedule(0.5, PollTimeout(self))
+
+ def poll_timeout(self):
+ self.process()
+
+ def run(self):
+ Container(self).run()
+
+
if __name__== '__main__':
unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org