You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2020/06/04 13:26:07 UTC
[qpid-dispatch] branch master updated: DISPATCH-1683 - Used the correct sender to send messages. Also waited till address propagated across routers. This closes #754
This is an automated email from the ASF dual-hosted git repository.
gmurthy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push:
new b0ba0ae DISPATCH-1683 - Used the correct sender to send messages. Also waited till address propagated across routers. This closes #754
b0ba0ae is described below
commit b0ba0ae05e5fdef0ca1cdc95865afc8dfd3d5565
Author: Ganesh Murthy <gm...@apache.org>
AuthorDate: Wed Jun 3 15:37:33 2020 -0400
DISPATCH-1683 - Used the correct sender to send messages. Also waited till address propagated across routers. This closes #754
---
tests/system_tests_distribution.py | 94 ++++++++++++++++++++++++++++----------
1 file changed, 71 insertions(+), 23 deletions(-)
diff --git a/tests/system_tests_distribution.py b/tests/system_tests_distribution.py
index 8404fa9..c9b3c9d 100644
--- a/tests/system_tests_distribution.py
+++ b/tests/system_tests_distribution.py
@@ -1867,13 +1867,22 @@ class DynamicReplyTo(MessagingHandler):
self.error = None
self.server_receiver = None
self.client_receiver = None
- self.sender = None
+ self.client_sender = None
self.server_sender = None
self.n_expected = 10
self.n_sent = 0
self.received_by_server = 0
self.received_by_client = 0
self.test_name = test_name
+ self.server_receiver_ready = False
+ self.client_receiver_ready = False
+ self.reply_to_addr = None
+ self.senders_created = False
+ self.addr_check_timer = None
+ self.addr_check_sender = None
+ self.container = None
+ self.num_attempts = 0
+ self.addr_check_receiver = None
def timeout(self):
@@ -1882,36 +1891,79 @@ class DynamicReplyTo(MessagingHandler):
self.client_connection.close()
self.server_connection.close()
+ def address_check_timeout(self):
+ self.addr_check_sender.send(self.addr_checker.make_address_query("M0" + self.dest))
- def on_start ( self, event ):
- self.timer = event.reactor.schedule ( TIMEOUT, TestTimeout(self) )
+ def bail(self):
+ self.timer.cancel()
+ self.server_receiver.close()
+ self.client_receiver.close()
+ self.addr_check_sender.close()
+ self.addr_check_receiver.close()
+ self.server_sender.close()
+ self.client_sender.close()
+ self.client_connection.close()
+ self.server_connection.close()
+
+ def on_start( self, event):
+ self.timer = event.reactor.schedule (TIMEOUT, TestTimeout(self))
# separate connections to simulate client and server.
self.client_connection = event.container.connect(self.client_addr)
self.server_connection = event.container.connect(self.server_addr)
-
- self.sender = event.container.create_sender(self.client_connection, self.dest)
- self.server_sender = event.container.create_sender(self.server_connection, None)
-
self.server_receiver = event.container.create_receiver(self.server_connection, self.dest)
self.client_receiver = event.container.create_receiver(self.client_connection, None, dynamic=True)
+ self.addr_check_sender = event.container.create_sender(self.client_connection, "$management")
+ self.container = event.container
+ self.addr_check_receiver = event.container.create_receiver(self.client_connection, dynamic=True)
+ def create_senders(self):
+ if not self.senders_created:
+ self.senders_created = True
+ self.client_sender = self.container.create_sender(self.client_connection, self.dest)
+ self.server_sender = self.container.create_sender(self.server_connection, None)
- def on_sendable(self, event):
- reply_to_addr = self.client_receiver.remote_source.address
+ def on_link_opened(self, event):
+ if event.receiver == self.addr_check_receiver:
+ self.addr_checker = AddressChecker(self.addr_check_receiver.remote_source.address)
+ if not self.server_receiver_ready and event.receiver == self.server_receiver:
+ self.server_receiver_ready = True
+ if not self.client_receiver_ready and event.receiver == self.client_receiver:
+ self.client_receiver_ready = True
+ if self.server_receiver_ready and self.client_receiver_ready:
+ if self.num_attempts == 0:
+ self.reply_to_addr = self.client_receiver.remote_source.address
+ self.num_attempts += 1
+ self.addr_check_timer = event.reactor.schedule(3, AddressCheckerTimeout(self))
- if reply_to_addr == None:
- return
+ def on_sendable(self, event):
+ if self.reply_to_addr == None:
+ return
- while event.sender.credit > 0 and self.n_sent < self.n_expected:
- # We send to server, and tell it how to reply to the client.
- request = Message ( body=self.n_sent,
- address=self.dest,
- reply_to = reply_to_addr )
- event.sender.send ( request )
- self.n_sent += 1
+ if event.sender == self.client_sender:
+ while event.sender.credit > 0 and self.n_sent < self.n_expected:
+ # We send to server, and tell it how to reply to the client.
+ request = Message ( body=self.n_sent,
+ address=self.dest,
+ reply_to=self.reply_to_addr )
+ event.sender.send ( request )
+ self.n_sent += 1
def on_message(self, event):
+ if event.receiver == self.addr_check_receiver:
+ response = self.addr_checker.parse_address_query_response(event.message)
+ # Create the senders if the address has propagated.
+ if response.status_code == 200 and response.remoteCount == 1:
+ self.create_senders()
+ else:
+ if self.num_attempts < 2:
+ self.num_attempts += 1
+ self.addr_check_timer = event.reactor.schedule(3, AddressCheckerTimeout(self))
+ else:
+ self.error = "Address %s did not propagate to the router to which the sender is attached" % self.dest
+ self.bail()
+ return
+
# Server gets a request and responds to
# the address that is embedded in the message.
if event.receiver == self.server_receiver :
@@ -1923,11 +1975,7 @@ class DynamicReplyTo(MessagingHandler):
elif event.receiver == self.client_receiver :
self.received_by_client += 1
if self.received_by_client == self.n_expected:
- self.timer.cancel()
- self.server_receiver.close()
- self.client_receiver.close()
- self.client_connection.close()
- self.server_connection.close()
+ self.bail()
def run(self):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org