You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@qpid.apache.org by gm...@apache.org on 2020/06/04 13:26:07 UTC

[qpid-dispatch] branch master updated: DISPATCH-1683 - Used the correct sender to send messages. Also waited till address propagated across routers. This closes #754

This is an automated email from the ASF dual-hosted git repository.

gmurthy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new b0ba0ae  DISPATCH-1683 - Used the correct sender to send messages. Also waited till address propagated across routers. This closes #754
b0ba0ae is described below

commit b0ba0ae05e5fdef0ca1cdc95865afc8dfd3d5565
Author: Ganesh Murthy <gm...@apache.org>
AuthorDate: Wed Jun 3 15:37:33 2020 -0400

    DISPATCH-1683 - Used the correct sender to send messages. Also waited till address propagated across routers. This closes #754
---
 tests/system_tests_distribution.py | 94 ++++++++++++++++++++++++++++----------
 1 file changed, 71 insertions(+), 23 deletions(-)

diff --git a/tests/system_tests_distribution.py b/tests/system_tests_distribution.py
index 8404fa9..c9b3c9d 100644
--- a/tests/system_tests_distribution.py
+++ b/tests/system_tests_distribution.py
@@ -1867,13 +1867,22 @@ class DynamicReplyTo(MessagingHandler):
         self.error              = None
         self.server_receiver    = None
         self.client_receiver    = None
-        self.sender             = None
+        self.client_sender      = None
         self.server_sender      = None
         self.n_expected         = 10
         self.n_sent             = 0
         self.received_by_server = 0
         self.received_by_client = 0
         self.test_name          = test_name
+        self.server_receiver_ready = False
+        self.client_receiver_ready = False
+        self.reply_to_addr = None
+        self.senders_created = False
+        self.addr_check_timer = None
+        self.addr_check_sender = None
+        self.container = None
+        self.num_attempts = 0
+        self.addr_check_receiver = None
 
 
     def timeout(self):
@@ -1882,36 +1891,79 @@ class DynamicReplyTo(MessagingHandler):
         self.client_connection.close()
         self.server_connection.close()
 
+    def address_check_timeout(self):
+        self.addr_check_sender.send(self.addr_checker.make_address_query("M0" + self.dest))
 
-    def on_start ( self, event ):
-        self.timer             = event.reactor.schedule ( TIMEOUT, TestTimeout(self) )
+    def bail(self):
+        self.timer.cancel()
+        self.server_receiver.close()
+        self.client_receiver.close()
+        self.addr_check_sender.close()
+        self.addr_check_receiver.close()
+        self.server_sender.close()
+        self.client_sender.close()
+        self.client_connection.close()
+        self.server_connection.close()
+
+    def on_start( self, event):
+        self.timer             = event.reactor.schedule (TIMEOUT, TestTimeout(self))
         # separate connections to simulate client and server.
         self.client_connection = event.container.connect(self.client_addr)
         self.server_connection = event.container.connect(self.server_addr)
-
-        self.sender            = event.container.create_sender(self.client_connection, self.dest)
-        self.server_sender     = event.container.create_sender(self.server_connection, None)
-
         self.server_receiver   = event.container.create_receiver(self.server_connection, self.dest)
         self.client_receiver   = event.container.create_receiver(self.client_connection, None, dynamic=True)
+        self.addr_check_sender = event.container.create_sender(self.client_connection, "$management")
+        self.container         = event.container
+        self.addr_check_receiver = event.container.create_receiver(self.client_connection, dynamic=True)
 
+    def create_senders(self):
+        if not self.senders_created:
+            self.senders_created = True
+            self.client_sender = self.container.create_sender(self.client_connection, self.dest)
+            self.server_sender = self.container.create_sender(self.server_connection, None)
 
-    def on_sendable(self, event):
-        reply_to_addr = self.client_receiver.remote_source.address
+    def on_link_opened(self, event):
+        if event.receiver == self.addr_check_receiver:
+            self.addr_checker = AddressChecker(self.addr_check_receiver.remote_source.address)
+        if not self.server_receiver_ready and event.receiver == self.server_receiver:
+            self.server_receiver_ready = True
+        if not self.client_receiver_ready and event.receiver == self.client_receiver:
+            self.client_receiver_ready = True
+        if self.server_receiver_ready and self.client_receiver_ready:
+            if self.num_attempts == 0:
+                self.reply_to_addr = self.client_receiver.remote_source.address
+                self.num_attempts += 1
+                self.addr_check_timer = event.reactor.schedule(3, AddressCheckerTimeout(self))
 
-        if reply_to_addr == None:
-          return
+    def on_sendable(self, event):
+        if self.reply_to_addr == None:
+            return
 
-        while event.sender.credit > 0 and self.n_sent < self.n_expected:
-            # We send to server, and tell it how to reply to the client.
-            request = Message ( body=self.n_sent,
-                                address=self.dest,
-                                reply_to = reply_to_addr )
-            event.sender.send ( request )
-            self.n_sent += 1
+        if event.sender == self.client_sender:
+            while event.sender.credit > 0 and self.n_sent < self.n_expected:
+                # We send to server, and tell it how to reply to the client.
+                request = Message ( body=self.n_sent,
+                                    address=self.dest,
+                                    reply_to=self.reply_to_addr )
+                event.sender.send ( request )
+                self.n_sent += 1
 
 
     def on_message(self, event):
+        if event.receiver == self.addr_check_receiver:
+            response = self.addr_checker.parse_address_query_response(event.message)
+            # Create the senders if the address has propagated.
+            if response.status_code == 200 and response.remoteCount == 1:
+                self.create_senders()
+            else:
+                if self.num_attempts < 2:
+                    self.num_attempts += 1
+                    self.addr_check_timer = event.reactor.schedule(3, AddressCheckerTimeout(self))
+                else:
+                    self.error = "Address %s did not propagate to the router to which the sender is attached" % self.dest
+                    self.bail()
+                    return
+
         # Server gets a request and responds to
         # the address that is embedded in the message.
         if event.receiver == self.server_receiver :
@@ -1923,11 +1975,7 @@ class DynamicReplyTo(MessagingHandler):
         elif event.receiver == self.client_receiver :
             self.received_by_client += 1
             if self.received_by_client == self.n_expected:
-                self.timer.cancel()
-                self.server_receiver.close()
-                self.client_receiver.close()
-                self.client_connection.close()
-                self.server_connection.close()
+                self.bail()
 
 
     def run(self):


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org