Posted to dev@qpid.apache.org by GitBox <gi...@apache.org> on 2021/11/09 15:50:03 UTC

[GitHub] [qpid-dispatch] jiridanek commented on a change in pull request #1430: Dispatch 2275

jiridanek commented on a change in pull request #1430:
URL: https://github.com/apache/qpid-dispatch/pull/1430#discussion_r745695705



##########
File path: tests/system_tests_topology_disposition.py
##########
@@ -24,6 +24,7 @@
 from subprocess import PIPE, STDOUT
 
 from proton import Message, Timeout
+from proton import VERSION as PROTON_VERSION

Review comment:
       I have a personal dislike of renaming identifiers on import, so I'd prefer `import proton` and `proton.VERSION > ...`, but I realize that is not the way Dispatch does things.

##########
File path: tests/system_tests_link_routes.py
##########
@@ -1797,110 +1814,227 @@ def _fake_broker(self, cls):
         self.router.wait_connectors()
         return fake_broker
 
+    def _find_frame(self, data, code) -> Optional[list]:
+        """Scan a byte sequence for performatives that match code.
+        Return the frame body (list) if match else None
+        """
+        while data:
+            # starts at frame header (8 bytes)
+            frame_len = int.from_bytes(data[:4], "big")
+            if frame_len == 0 or frame_len > len(data):
+                return None
+            desc = Data()
+            desc.decode(data[8:frame_len])  # skip frame header
+            data = data[frame_len:]    # advance to next frame
+            desc.rewind()
+            if desc.next() is None:
+                return None
+            if not desc.is_described():
+                return None
+            py_desc = desc.get_py_described()
+            if py_desc.descriptor == code:
+                return py_desc.value
+        return None
+
+    def _send_frame(self, frame: Data, sock: socket.socket):
+        """Encode and send frame over sock
+        """
+        frame.rewind()
+        fbytes = frame.encode()
+        flen = len(fbytes) + 8
+        # AMQP FRAME HEADER: 4 byte length, DOFF, TYPE, CHANNEL
+        sock.sendall(flen.to_bytes(4, "big"))
+        sock.sendall(bytes([2, 0, 0, 0]))
+        sock.sendall(fbytes)
+
+    def _construct_transfer(self, delivery_id, tag, more=False, add_ma=False,

Review comment:
       I can imagine that these functions will eventually need to move to a shared scope so that other tests can use them too. They look a bit complicated, maybe even enough to deserve their own test, if only a small one?
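
A small test along those lines might look like the sketch below. To stay self-contained it exercises only the 8-byte frame header logic, using hypothetical pure-Python stand-ins (`encode_frame`, `split_frames`) rather than the proton `Data` codec the PR uses, so treat it as an illustration, not as the PR's helpers.

```python
import unittest
from typing import List


def encode_frame(body: bytes, channel: int = 0) -> bytes:
    """Prefix body with an 8-byte AMQP frame header (hypothetical helper)."""
    size = len(body) + 8
    # 4-byte size, DOFF=2 (header is 2 * 4 bytes), TYPE=0 (AMQP), 2-byte channel
    return size.to_bytes(4, "big") + bytes([2, 0]) + channel.to_bytes(2, "big") + body


def split_frames(data: bytes) -> List[bytes]:
    """Return the bodies of all complete frames in data (hypothetical helper)."""
    bodies = []
    while len(data) >= 8:
        size = int.from_bytes(data[:4], "big")
        if size < 8 or size > len(data):
            break  # malformed or truncated frame: stop scanning
        bodies.append(data[8:size])  # skip the 8-byte header
        data = data[size:]           # advance to the next frame
    return bodies


class FrameHelperTest(unittest.TestCase):
    def test_round_trip(self):
        wire = encode_frame(b"one") + encode_frame(b"two")
        self.assertEqual(split_frames(wire), [b"one", b"two"])

    def test_truncated_frame_is_ignored(self):
        wire = encode_frame(b"one") + encode_frame(b"two")[:-1]
        self.assertEqual(split_frames(wire), [b"one"])
```

Testing the framing separately from the performative decoding keeps such a test independent of the installed proton version.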

##########
File path: tests/system_tests_link_routes.py
##########
@@ -1797,110 +1814,227 @@ def _fake_broker(self, cls):
         self.router.wait_connectors()
         return fake_broker
 
+    def _find_frame(self, data, code) -> Optional[list]:

Review comment:
       This could be typed more precisely. The type of `data` is fairly obvious (the docstring calls it a byte sequence). I have little idea what the type of the `code` parameter is (I suspect `int`), and I am not sure what the returned list contains; I'd venture a guess that it is `int` as well.
   
   ```suggestion
       def _find_frame(self, data: bytes, code: int) -> Optional[List[int]]:
   ```

##########
File path: tests/system_tests_topology_disposition.py
##########
@@ -406,6 +407,9 @@ def test_03_connection_id_propagation(self):
             self.assertIsNone(error)
 
     def test_04_scraper_tool(self):
+        if PROTON_VERSION > (0, 36, 0):
+            self.skipTest("Test skipped - see DISPATCH-2276")

Review comment:
       This would be better as a decorator. That way, the test body does not even start executing.
   
   ```
   @unittest.skipIf(PROTON_VERSION > (0, 36, 0), "Test skipped - see DISPATCH-2276")
   ```
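
For reference, a minimal runnable sketch of the decorator form. `PROTON_VERSION` here is a hypothetical stand-in tuple (the diff compares the real `proton.VERSION` against a tuple in the same way), and `ScraperTest` and `run_suite` are made-up names for illustration. Note that `unittest.skipIf` takes the reason as a required second argument.

```python
import unittest

# Hypothetical stand-in for `proton.VERSION`; assumed to be a version tuple.
PROTON_VERSION = (0, 37, 0)


class ScraperTest(unittest.TestCase):
    @unittest.skipIf(PROTON_VERSION > (0, 36, 0), "see DISPATCH-2276")
    def test_04_scraper_tool(self):
        # Never reached while the skip condition holds.
        self.fail("test body executed")


def run_suite() -> unittest.TestResult:
    """Run ScraperTest and return the collected result."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(ScraperTest)
    result = unittest.TestResult()
    suite.run(result)
    return result
```

The condition is evaluated once, when the class body is executed, so the version must be known at import time.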

##########
File path: tests/system_tests_link_routes.py
##########
@@ -1797,110 +1814,227 @@ def _fake_broker(self, cls):
         self.router.wait_connectors()
         return fake_broker
 
+    def _find_frame(self, data, code) -> Optional[list]:
+        """Scan a byte sequence for performatives that match code.
+        Return the frame body (list) if match else None
+        """
+        while data:
+            # starts at frame header (8 bytes)
+            frame_len = int.from_bytes(data[:4], "big")
+            if frame_len == 0 or frame_len > len(data):
+                return None
+            desc = Data()
+            desc.decode(data[8:frame_len])  # skip frame header
+            data = data[frame_len:]    # advance to next frame
+            desc.rewind()
+            if desc.next() is None:
+                return None
+            if not desc.is_described():
+                return None
+            py_desc = desc.get_py_described()
+            if py_desc.descriptor == code:
+                return py_desc.value
+        return None
+
+    def _send_frame(self, frame: Data, sock: socket.socket):
+        """Encode and send frame over sock
+        """
+        frame.rewind()
+        fbytes = frame.encode()
+        flen = len(fbytes) + 8
+        # AMQP FRAME HEADER: 4 byte length, DOFF, TYPE, CHANNEL
+        sock.sendall(flen.to_bytes(4, "big"))
+        sock.sendall(bytes([2, 0, 0, 0]))
+        sock.sendall(fbytes)
+
+    def _construct_transfer(self, delivery_id, tag, more=False, add_ma=False,
+                            add_body=False) -> Data:
+        """Construct a Transfer frame in a proton Data object
+        """
+        t1_frame = Data()
+        t1_frame.put_described()
+        t1_frame.enter()
+        t1_frame.put_ulong(self.TRANSFER_DESCRIPTOR)
+        t1_frame.put_list()
+        t1_frame.enter()
+        t1_frame.put_uint(0)  # handle
+        t1_frame.put_uint(delivery_id)
+        t1_frame.put_binary(tag)
+        t1_frame.put_uint(0)           # msg format
+        t1_frame.put_bool(False)       # settled
+        t1_frame.put_bool(more)
+        t1_frame.exit()   # transfer list
+        t1_frame.exit()   # transfer described type
+        if add_ma:
+            t1_frame.put_described()
+            t1_frame.enter()
+            t1_frame.put_ulong(self.MA_SECTION_DESCRIPTOR)
+            t1_frame.put_list()
+            t1_frame.enter()
+            t1_frame.put_ulong(9)
+            t1_frame.exit()  # list
+            t1_frame.exit()  # described
+        if add_body:
+            t1_frame.put_described()
+            t1_frame.enter()
+            t1_frame.put_ulong(self.BODY_SECTION_DESCRIPTOR)
+            t1_frame.put_string("I'm a small body!")
+            t1_frame.exit()
+            t1_frame.exit()
+
+        return t1_frame
+
+    def _get_outcome(self, dispo_frame: list) -> Optional[int]:
+        """Extract the outcome from a raw disposition frame"""
+        outcome = None
+        if len(dispo_frame) >= 5:  # list[5] == state
+            if isinstance(dispo_frame[4], Described):
+                outcome = dispo_frame[4].descriptor
+        return outcome
+
     def test_DISPATCH_1988(self):
         fake_broker = self._fake_broker(FakeBroker)
-        AMQP_OPEN_BEGIN_ATTACH = bytearray(
-            b'\x41\x4d\x51\x50\x00\x01\x00\x00\x00\x00\x00\x21\x02\x00\x00'
-            b'\x00\x00\x53\x10\xd0\x00\x00\x00\x11\x00\x00\x00\x04\xa1\x06'
-            b'\x2e\x2f\x73\x65\x6e\x64\x40\x40\x60\x7f\xff\x00\x00\x00\x21'
-            b'\x02\x00\x00\x00\x00\x53\x11\xd0\x00\x00\x00\x11\x00\x00\x00'
-            b'\x04\x40\x52\x00\x70\x7f\xff\xff\xff\x70\x7f\xff\xff\xff\x00'
-            b'\x00\x00\x5b\x02\x00\x00\x00\x00\x53\x12\xd0\x00\x00\x00\x4b'
-            b'\x00\x00\x00\x0b\xa1\x09\x6d\x79\x5f\x73\x65\x6e\x64\x65\x72'
-            b'\x52\x00\x42\x50\x02\x50\x00\x00\x53\x28\xd0\x00\x00\x00\x0b'
-            b'\x00\x00\x00\x05\x40\x52\x00\x40\x52\x00\x42\x00\x53\x29\xd0'
-            b'\x00\x00\x00\x14\x00\x00\x00\x05\xa1\x08\x65\x78\x61\x6d\x70'
-            b'\x6c\x65\x73\x52\x00\x40\x52\x00\x42\x40\x40\x52\x00\x53\x00')
 
+        self.router.wait_ready()
         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         # Connect to the router listening port and send an amqp, open,
         # begin, attach. The attach is sent on the link
         # routed address, "examples"
-        s.connect(("0.0.0.0", EmptyTransferTest.ROUTER_LISTEN_PORT))
-        s.sendall(AMQP_OPEN_BEGIN_ATTACH)
+        s.connect(("127.0.0.1", EmptyTransferTest.ROUTER_LISTEN_PORT))
+
+        # send 'AMQP 1 0' preamble
+        s.sendall(b'\x41\x4d\x51\x50\x00\x01\x00\x00')
+
+        # send Open/Begin/Attach
+        open_frame = Data()
+        open_frame.put_described()
+        open_frame.enter()
+        open_frame.put_ulong(self.OPEN_DESCRIPTOR)
+        open_frame.put_list()
+        open_frame.enter()
+        open_frame.put_string("TestContainer")
+        open_frame.exit()
+        open_frame.exit()
+        open_frame.rewind()
+
+        begin_frame = Data()
+        begin_frame.put_described()
+        begin_frame.enter()
+        begin_frame.put_ulong(self.BEGIN_DESCRIPTOR)
+        begin_frame.put_list()
+        begin_frame.enter()
+        begin_frame.put_null()
+        begin_frame.put_uint(0)   # next out id
+        begin_frame.put_uint(0xfffff)  # in/out window
+        begin_frame.put_uint(0xfffff)
+        begin_frame.exit()
+        begin_frame.exit()
+        begin_frame.rewind()
+
+        attach_frame = Data()
+        attach_frame.put_described()
+        attach_frame.enter()
+        attach_frame.put_ulong(self.ATTACH_DESCRIPTOR)
+        attach_frame.put_list()
+        attach_frame.enter()
+        attach_frame.put_string("test-link-name")
+        attach_frame.put_uint(0)      # handle
+        attach_frame.put_bool(False)  # sender
+        attach_frame.put_null()
+        attach_frame.put_null()
+        attach_frame.put_null()
+        # target:
+        attach_frame.put_described()
+        attach_frame.enter()
+        attach_frame.put_ulong(self.TARGET_DESCRIPTOR)
+        attach_frame.put_list()
+        attach_frame.enter()
+        attach_frame.put_string("examples/foo")
+        attach_frame.exit()  # target list
+        attach_frame.exit()  # target descriptor
+        attach_frame.exit()    # attach list
+        attach_frame.exit()    # attach descriptor
+        attach_frame.rewind()
+
+        for frame in [open_frame, begin_frame, attach_frame]:
+            self._send_frame(frame, s)
 
         # Give a second for the attach to propagate to the broker and
-        # for the broker to send a response attach
+        # for the broker to send a response attach and flow:
         sleep(1)
-        data = s.recv(2048)
-        self.assertIn("examples", repr(data))
-
-        # First send a message on link routed address "examples" with
-        # message body of "message 0"
-        # Verify the the sent message has been accepted.
-        TRANSFER_1 = bytearray(b'\x00\x00\x00\x31\x02\x00\x00\x00'
-                               + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x01'
-                               + b'\xa0\x01\x01\x43\x42'
-                               + b'\x40\x40\x40\x40\x40\x42\x00\x53'
-                               + b'\x73\xc0\x02\x01\x44\x00\x53\x77'
-                               + b'\xa1\x09\x6d\x65\x73\x73\x61\x67'
-                               + b'\x65\x20\x30')
-        s.sendall(TRANSFER_1)
+        data = s.recv(4096)
+        self.assertEqual(data[:8], b'AMQP\x00\x01\x00\x00')
+        # expect that the connection was accepted: check for a flow frame:
+        flow_frame = self._find_frame(data[8:], self.FLOW_DESCRIPTOR)
+        self.assertIsNotNone(flow_frame)
+
+        # First send a message on link routed address "examples" with a small
+        # message body. Verify the sent message has been accepted.
+        t1_frame = self._construct_transfer(0, b'\x01', add_ma=True, add_body=True)
+        self._send_frame(t1_frame, s)
+
+        # We expect to get a disposition frame that accepted the message
         sleep(0.5)
         data = s.recv(1024)
-        # The delivery has been accepted.
-        self.assertIn("x00S$E", repr(data))
+        dispo_frame = self._find_frame(data, self.DISPO_DESCRIPTOR)
+        self.assertIsNotNone(dispo_frame, "expected a disposition (none arrived!)")

Review comment:
       This is fragile if `s.recv()` returns, say, only half of an AMQP frame on the first call and the other half on the second, so that the expected frame is split across the two reads. In that case my retry suggestion on the `sleep(0.5)` line does not apply as is; it would make the test flaky here. (The `s.recv()` calls would have to append to a buffer that is checked in its entirety each time, I guess, so solutions exist.)
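
A rough sketch of the buffer-appending approach, assuming only the 4-byte big-endian length prefix of the AMQP frame header. `first_complete_frame` and `recv_frame` are hypothetical names, not existing helpers in the test suite.

```python
import socket
import time
from typing import Optional


def first_complete_frame(buf: bytes) -> Optional[bytes]:
    """Return the first complete frame in buf (header included), else None."""
    if len(buf) < 4:
        return None  # length prefix not fully received yet
    size = int.from_bytes(buf[:4], "big")
    if size < 8:
        raise ValueError(f"frame size {size} is smaller than the 8-byte header")
    if size > len(buf):
        return None  # frame is still incomplete
    return buf[:size]


def recv_frame(sock: socket.socket, timeout: float = 5.0) -> bytes:
    """Keep reading into one buffer until a whole frame has arrived."""
    deadline = time.monotonic() + timeout
    buf = b""
    while True:
        frame = first_complete_frame(buf)
        if frame is not None:
            return frame
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"incomplete frame after {timeout}s: {buf!r}")
        sock.settimeout(remaining)
        try:
            chunk = sock.recv(4096)
        except socket.timeout:
            continue  # the deadline check above reports the timeout
        if not chunk:
            raise ConnectionError(f"peer closed with partial data: {buf!r}")
        buf += chunk  # re-check the whole buffer on the next iteration
```

Because the whole buffer is re-examined after every read, it does not matter where the TCP stream happens to split the frame. This sketch discards any bytes after the first frame; a real helper would need to keep them for the next call.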

##########
File path: tests/system_tests_link_routes.py
##########
@@ -1797,110 +1814,227 @@ def _fake_broker(self, cls):
         self.router.wait_connectors()
         return fake_broker
 
+    def _find_frame(self, data, code) -> Optional[list]:
+        """Scan a byte sequence for performatives that match code.
+        Return the frame body (list) if match else None
+        """
+        while data:
+            # starts at frame header (8 bytes)
+            frame_len = int.from_bytes(data[:4], "big")
+            if frame_len == 0 or frame_len > len(data):
+                return None
+            desc = Data()
+            desc.decode(data[8:frame_len])  # skip frame header
+            data = data[frame_len:]    # advance to next frame
+            desc.rewind()
+            if desc.next() is None:
+                return None
+            if not desc.is_described():
+                return None
+            py_desc = desc.get_py_described()
+            if py_desc.descriptor == code:
+                return py_desc.value
+        return None
+
+    def _send_frame(self, frame: Data, sock: socket.socket):
+        """Encode and send frame over sock
+        """
+        frame.rewind()
+        fbytes = frame.encode()
+        flen = len(fbytes) + 8
+        # AMQP FRAME HEADER: 4 byte length, DOFF, TYPE, CHANNEL
+        sock.sendall(flen.to_bytes(4, "big"))
+        sock.sendall(bytes([2, 0, 0, 0]))
+        sock.sendall(fbytes)
+
+    def _construct_transfer(self, delivery_id, tag, more=False, add_ma=False,
+                            add_body=False) -> Data:
+        """Construct a Transfer frame in a proton Data object
+        """
+        t1_frame = Data()
+        t1_frame.put_described()
+        t1_frame.enter()
+        t1_frame.put_ulong(self.TRANSFER_DESCRIPTOR)
+        t1_frame.put_list()
+        t1_frame.enter()
+        t1_frame.put_uint(0)  # handle
+        t1_frame.put_uint(delivery_id)
+        t1_frame.put_binary(tag)
+        t1_frame.put_uint(0)           # msg format
+        t1_frame.put_bool(False)       # settled
+        t1_frame.put_bool(more)
+        t1_frame.exit()   # transfer list
+        t1_frame.exit()   # transfer described type
+        if add_ma:
+            t1_frame.put_described()
+            t1_frame.enter()
+            t1_frame.put_ulong(self.MA_SECTION_DESCRIPTOR)
+            t1_frame.put_list()
+            t1_frame.enter()
+            t1_frame.put_ulong(9)
+            t1_frame.exit()  # list
+            t1_frame.exit()  # described
+        if add_body:
+            t1_frame.put_described()
+            t1_frame.enter()
+            t1_frame.put_ulong(self.BODY_SECTION_DESCRIPTOR)
+            t1_frame.put_string("I'm a small body!")
+            t1_frame.exit()
+            t1_frame.exit()
+
+        return t1_frame
+
+    def _get_outcome(self, dispo_frame: list) -> Optional[int]:
+        """Extract the outcome from a raw disposition frame"""
+        outcome = None
+        if len(dispo_frame) >= 5:  # list[5] == state
+            if isinstance(dispo_frame[4], Described):
+                outcome = dispo_frame[4].descriptor
+        return outcome
+
     def test_DISPATCH_1988(self):
         fake_broker = self._fake_broker(FakeBroker)
-        AMQP_OPEN_BEGIN_ATTACH = bytearray(
-            b'\x41\x4d\x51\x50\x00\x01\x00\x00\x00\x00\x00\x21\x02\x00\x00'
-            b'\x00\x00\x53\x10\xd0\x00\x00\x00\x11\x00\x00\x00\x04\xa1\x06'
-            b'\x2e\x2f\x73\x65\x6e\x64\x40\x40\x60\x7f\xff\x00\x00\x00\x21'
-            b'\x02\x00\x00\x00\x00\x53\x11\xd0\x00\x00\x00\x11\x00\x00\x00'
-            b'\x04\x40\x52\x00\x70\x7f\xff\xff\xff\x70\x7f\xff\xff\xff\x00'
-            b'\x00\x00\x5b\x02\x00\x00\x00\x00\x53\x12\xd0\x00\x00\x00\x4b'
-            b'\x00\x00\x00\x0b\xa1\x09\x6d\x79\x5f\x73\x65\x6e\x64\x65\x72'
-            b'\x52\x00\x42\x50\x02\x50\x00\x00\x53\x28\xd0\x00\x00\x00\x0b'
-            b'\x00\x00\x00\x05\x40\x52\x00\x40\x52\x00\x42\x00\x53\x29\xd0'
-            b'\x00\x00\x00\x14\x00\x00\x00\x05\xa1\x08\x65\x78\x61\x6d\x70'
-            b'\x6c\x65\x73\x52\x00\x40\x52\x00\x42\x40\x40\x52\x00\x53\x00')
 
+        self.router.wait_ready()
         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         # Connect to the router listening port and send an amqp, open,
         # begin, attach. The attach is sent on the link
         # routed address, "examples"
-        s.connect(("0.0.0.0", EmptyTransferTest.ROUTER_LISTEN_PORT))
-        s.sendall(AMQP_OPEN_BEGIN_ATTACH)
+        s.connect(("127.0.0.1", EmptyTransferTest.ROUTER_LISTEN_PORT))
+
+        # send 'AMQP 1 0' preamble
+        s.sendall(b'\x41\x4d\x51\x50\x00\x01\x00\x00')
+
+        # send Open/Begin/Attach
+        open_frame = Data()
+        open_frame.put_described()
+        open_frame.enter()
+        open_frame.put_ulong(self.OPEN_DESCRIPTOR)
+        open_frame.put_list()
+        open_frame.enter()
+        open_frame.put_string("TestContainer")
+        open_frame.exit()
+        open_frame.exit()
+        open_frame.rewind()
+
+        begin_frame = Data()
+        begin_frame.put_described()
+        begin_frame.enter()
+        begin_frame.put_ulong(self.BEGIN_DESCRIPTOR)
+        begin_frame.put_list()
+        begin_frame.enter()
+        begin_frame.put_null()
+        begin_frame.put_uint(0)   # next out id
+        begin_frame.put_uint(0xfffff)  # in/out window
+        begin_frame.put_uint(0xfffff)
+        begin_frame.exit()
+        begin_frame.exit()
+        begin_frame.rewind()
+
+        attach_frame = Data()
+        attach_frame.put_described()
+        attach_frame.enter()
+        attach_frame.put_ulong(self.ATTACH_DESCRIPTOR)
+        attach_frame.put_list()
+        attach_frame.enter()
+        attach_frame.put_string("test-link-name")
+        attach_frame.put_uint(0)      # handle
+        attach_frame.put_bool(False)  # sender
+        attach_frame.put_null()
+        attach_frame.put_null()
+        attach_frame.put_null()
+        # target:
+        attach_frame.put_described()
+        attach_frame.enter()
+        attach_frame.put_ulong(self.TARGET_DESCRIPTOR)
+        attach_frame.put_list()
+        attach_frame.enter()
+        attach_frame.put_string("examples/foo")
+        attach_frame.exit()  # target list
+        attach_frame.exit()  # target descriptor
+        attach_frame.exit()    # attach list
+        attach_frame.exit()    # attach descriptor
+        attach_frame.rewind()
+
+        for frame in [open_frame, begin_frame, attach_frame]:
+            self._send_frame(frame, s)
 
         # Give a second for the attach to propagate to the broker and
-        # for the broker to send a response attach
+        # for the broker to send a response attach and flow:
         sleep(1)
-        data = s.recv(2048)
-        self.assertIn("examples", repr(data))
-
-        # First send a message on link routed address "examples" with
-        # message body of "message 0"
-        # Verify the the sent message has been accepted.
-        TRANSFER_1 = bytearray(b'\x00\x00\x00\x31\x02\x00\x00\x00'
-                               + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x01'
-                               + b'\xa0\x01\x01\x43\x42'
-                               + b'\x40\x40\x40\x40\x40\x42\x00\x53'
-                               + b'\x73\xc0\x02\x01\x44\x00\x53\x77'
-                               + b'\xa1\x09\x6d\x65\x73\x73\x61\x67'
-                               + b'\x65\x20\x30')
-        s.sendall(TRANSFER_1)
+        data = s.recv(4096)
+        self.assertEqual(data[:8], b'AMQP\x00\x01\x00\x00')
+        # expect that the connection was accepted: check for a flow frame:
+        flow_frame = self._find_frame(data[8:], self.FLOW_DESCRIPTOR)
+        self.assertIsNotNone(flow_frame)
+
+        # First send a message on link routed address "examples" with a small
+        # message body. Verify the sent message has been accepted.
+        t1_frame = self._construct_transfer(0, b'\x01', add_ma=True, add_body=True)
+        self._send_frame(t1_frame, s)
+
+        # We expect to get a disposition frame that accepted the message
         sleep(0.5)
         data = s.recv(1024)
-        # The delivery has been accepted.
-        self.assertIn("x00S$E", repr(data))
+        dispo_frame = self._find_frame(data, self.DISPO_DESCRIPTOR)
+        self.assertIsNotNone(dispo_frame, "expected a disposition (none arrived!)")
+
+        outcome = self._get_outcome(dispo_frame)
+        self.assertEqual(self.ACCEPTED_OUTCOME, outcome,
+                         "Transfer not accepted (unexpected!)")
 
         # Test case 1
-        # Send an empty transfer frame to the router and you should
-        # receive a rejected disposition from the router.
-        # Without the fix for DISPATCH_1988,
-        # upon sending this EMPTY_TRANSFER
-        # the router crashes with the following assert
+        #
+        # Send an empty transfer frame to the router and you should receive a
+        # rejected disposition from the router.  Without the fix for
+        # DISPATCH_1988, upon sending this EMPTY_TRANSFER the router crashes
+        # with the following assert
+        #
         # qpid-dispatch/src/message.c:1260: qd_message_add_fanout: Assertion `content->pending && qd_buffer_size(content->pending) > 0' failed.
-        # This is the empty transfer frame that is sent to the router.
-        # [0x614000030050]: AMQP:FRAME:0 <- @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"\x01", message-format=0, settled=false, batchable=false]
-        EMPTY_TRANSFER = bytearray(b'\x00\x00\x00\x1c\x02\x00\x00\x00'
-                                   + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52'
-                                   + b'\x02\xa0\x01\x02\x43\x42'
-                                   + b'\x42\x40\x40\x40\x40\x42')
-        s.sendall(EMPTY_TRANSFER)
-        sleep(1)
-        data = s.recv(1024)
-        # The delivery has been rejected.
-        self.assertIn("x00S%E", repr(data))
-
-        # Let's send another transfer to make sure that the
-        # router has not crashed.
-        TRANSFER_1 = bytearray(b'\x00\x00\x00\x31\x02\x00\x00\x00'
-                               + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x03'
-                               + b'\xa0\x01\x03\x43\x42'
-                               + b'\x40\x40\x40\x40\x40\x42\x00\x53'
-                               + b'\x73\xc0\x02\x01\x44\x00\x53\x77'
-                               + b'\xa1\x09\x6d\x65\x73\x73\x61\x67'
-                               + b'\x65\x20\x30')
-        s.sendall(TRANSFER_1)
+
+        t2_frame = self._construct_transfer(1, b'\x02')
+        self._send_frame(t2_frame, s)
+
         sleep(0.5)
         data = s.recv(1024)
-        # The delivery has been accepted.
-        self.assertIn("x00S$E", repr(data))
+        dispo_frame = self._find_frame(data, self.DISPO_DESCRIPTOR)
+        self.assertIsNotNone(dispo_frame, "expected a disposition (none arrived!)")

Review comment:
       The disadvantage of asserting like this is that when the assertion fails, the resulting error message is not all that helpful. AFAIK pytest can print the contents of local variables in those terribly verbose failure reports it produces (with `--showlocals`), so the actual content of `data` would be visible there.
   
   The assertion will probably never fail again anyway, and pytest can help, so it is probably not a good use of time to worry about this.
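
If it ever did become worth the effort, one option would be to put the received bytes into the assertion message itself. A minimal sketch, where `DispositionTest` and `check_frame` are hypothetical names used only for illustration:

```python
import unittest
from typing import Optional


class DispositionTest(unittest.TestCase):
    def check_frame(self, data: bytes, frame: Optional[list]) -> None:
        # Include the raw bytes so the failure report explains itself,
        # whichever test runner is in use.
        self.assertIsNotNone(
            frame,
            f"expected a disposition, none found in {len(data)} bytes: {data.hex()}")
```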

##########
File path: tests/system_tests_link_routes.py
##########
@@ -1797,110 +1814,227 @@ def _fake_broker(self, cls):
         self.router.wait_connectors()
         return fake_broker
 
+    def _find_frame(self, data, code) -> Optional[list]:
+        """Scan a byte sequence for performatives that match code.
+        Return the frame body (list) if match else None
+        """
+        while data:
+            # starts at frame header (8 bytes)
+            frame_len = int.from_bytes(data[:4], "big")
+            if frame_len == 0 or frame_len > len(data):
+                return None
+            desc = Data()
+            desc.decode(data[8:frame_len])  # skip frame header
+            data = data[frame_len:]    # advance to next frame
+            desc.rewind()
+            if desc.next() is None:
+                return None
+            if not desc.is_described():
+                return None
+            py_desc = desc.get_py_described()
+            if py_desc.descriptor == code:
+                return py_desc.value
+        return None
+
+    def _send_frame(self, frame: Data, sock: socket.socket):
+        """Encode and send frame over sock
+        """
+        frame.rewind()
+        fbytes = frame.encode()
+        flen = len(fbytes) + 8
+        # AMQP FRAME HEADER: 4 byte length, DOFF, TYPE, CHANNEL
+        sock.sendall(flen.to_bytes(4, "big"))
+        sock.sendall(bytes([2, 0, 0, 0]))
+        sock.sendall(fbytes)
+
+    def _construct_transfer(self, delivery_id, tag, more=False, add_ma=False,
+                            add_body=False) -> Data:
+        """Construct a Transfer frame in a proton Data object
+        """
+        t1_frame = Data()
+        t1_frame.put_described()
+        t1_frame.enter()
+        t1_frame.put_ulong(self.TRANSFER_DESCRIPTOR)
+        t1_frame.put_list()
+        t1_frame.enter()
+        t1_frame.put_uint(0)  # handle
+        t1_frame.put_uint(delivery_id)
+        t1_frame.put_binary(tag)
+        t1_frame.put_uint(0)           # msg format
+        t1_frame.put_bool(False)       # settled
+        t1_frame.put_bool(more)
+        t1_frame.exit()   # transfer list
+        t1_frame.exit()   # transfer described type
+        if add_ma:
+            t1_frame.put_described()
+            t1_frame.enter()
+            t1_frame.put_ulong(self.MA_SECTION_DESCRIPTOR)
+            t1_frame.put_list()
+            t1_frame.enter()
+            t1_frame.put_ulong(9)
+            t1_frame.exit()  # list
+            t1_frame.exit()  # described
+        if add_body:
+            t1_frame.put_described()
+            t1_frame.enter()
+            t1_frame.put_ulong(self.BODY_SECTION_DESCRIPTOR)
+            t1_frame.put_string("I'm a small body!")
+            t1_frame.exit()
+            t1_frame.exit()
+
+        return t1_frame
+
+    def _get_outcome(self, dispo_frame: list) -> Optional[int]:
+        """Extract the outcome from a raw disposition frame"""
+        outcome = None
+        if len(dispo_frame) >= 5:  # list[5] == state
+            if isinstance(dispo_frame[4], Described):
+                outcome = dispo_frame[4].descriptor
+        return outcome
+
     def test_DISPATCH_1988(self):
         fake_broker = self._fake_broker(FakeBroker)
-        AMQP_OPEN_BEGIN_ATTACH = bytearray(
-            b'\x41\x4d\x51\x50\x00\x01\x00\x00\x00\x00\x00\x21\x02\x00\x00'
-            b'\x00\x00\x53\x10\xd0\x00\x00\x00\x11\x00\x00\x00\x04\xa1\x06'
-            b'\x2e\x2f\x73\x65\x6e\x64\x40\x40\x60\x7f\xff\x00\x00\x00\x21'
-            b'\x02\x00\x00\x00\x00\x53\x11\xd0\x00\x00\x00\x11\x00\x00\x00'
-            b'\x04\x40\x52\x00\x70\x7f\xff\xff\xff\x70\x7f\xff\xff\xff\x00'
-            b'\x00\x00\x5b\x02\x00\x00\x00\x00\x53\x12\xd0\x00\x00\x00\x4b'
-            b'\x00\x00\x00\x0b\xa1\x09\x6d\x79\x5f\x73\x65\x6e\x64\x65\x72'
-            b'\x52\x00\x42\x50\x02\x50\x00\x00\x53\x28\xd0\x00\x00\x00\x0b'
-            b'\x00\x00\x00\x05\x40\x52\x00\x40\x52\x00\x42\x00\x53\x29\xd0'
-            b'\x00\x00\x00\x14\x00\x00\x00\x05\xa1\x08\x65\x78\x61\x6d\x70'
-            b'\x6c\x65\x73\x52\x00\x40\x52\x00\x42\x40\x40\x52\x00\x53\x00')
 
+        self.router.wait_ready()
         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         # Connect to the router listening port and send an amqp, open,
         # begin, attach. The attach is sent on the link
         # routed address, "examples"
-        s.connect(("0.0.0.0", EmptyTransferTest.ROUTER_LISTEN_PORT))
-        s.sendall(AMQP_OPEN_BEGIN_ATTACH)
+        s.connect(("127.0.0.1", EmptyTransferTest.ROUTER_LISTEN_PORT))
+
+        # send 'AMQP 1 0' preamble
+        s.sendall(b'\x41\x4d\x51\x50\x00\x01\x00\x00')
+
+        # send Open/Begin/Attach
+        open_frame = Data()
+        open_frame.put_described()
+        open_frame.enter()
+        open_frame.put_ulong(self.OPEN_DESCRIPTOR)
+        open_frame.put_list()
+        open_frame.enter()
+        open_frame.put_string("TestContainer")
+        open_frame.exit()
+        open_frame.exit()
+        open_frame.rewind()
+
+        begin_frame = Data()
+        begin_frame.put_described()
+        begin_frame.enter()
+        begin_frame.put_ulong(self.BEGIN_DESCRIPTOR)
+        begin_frame.put_list()
+        begin_frame.enter()
+        begin_frame.put_null()
+        begin_frame.put_uint(0)   # next out id
+        begin_frame.put_uint(0xfffff)  # in/out window
+        begin_frame.put_uint(0xfffff)
+        begin_frame.exit()
+        begin_frame.exit()
+        begin_frame.rewind()
+
+        attach_frame = Data()
+        attach_frame.put_described()
+        attach_frame.enter()
+        attach_frame.put_ulong(self.ATTACH_DESCRIPTOR)
+        attach_frame.put_list()
+        attach_frame.enter()
+        attach_frame.put_string("test-link-name")
+        attach_frame.put_uint(0)      # handle
+        attach_frame.put_bool(False)  # sender
+        attach_frame.put_null()
+        attach_frame.put_null()
+        attach_frame.put_null()
+        # target:
+        attach_frame.put_described()
+        attach_frame.enter()
+        attach_frame.put_ulong(self.TARGET_DESCRIPTOR)
+        attach_frame.put_list()
+        attach_frame.enter()
+        attach_frame.put_string("examples/foo")
+        attach_frame.exit()  # target list
+        attach_frame.exit()  # target descriptor
+        attach_frame.exit()    # attach list
+        attach_frame.exit()    # attach descriptor
+        attach_frame.rewind()
+
+        for frame in [open_frame, begin_frame, attach_frame]:
+            self._send_frame(frame, s)
 
         # Give a second for the attach to propagate to the broker and
-        # for the broker to send a response attach
+        # for the broker to send a response attach and flow:
         sleep(1)
-        data = s.recv(2048)
-        self.assertIn("examples", repr(data))
-
-        # First send a message on link routed address "examples" with
-        # message body of "message 0"
-        # Verify the the sent message has been accepted.
-        TRANSFER_1 = bytearray(b'\x00\x00\x00\x31\x02\x00\x00\x00'
-                               + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x01'
-                               + b'\xa0\x01\x01\x43\x42'
-                               + b'\x40\x40\x40\x40\x40\x42\x00\x53'
-                               + b'\x73\xc0\x02\x01\x44\x00\x53\x77'
-                               + b'\xa1\x09\x6d\x65\x73\x73\x61\x67'
-                               + b'\x65\x20\x30')
-        s.sendall(TRANSFER_1)
+        data = s.recv(4096)
+        self.assertEqual(data[:8], b'AMQP\x00\x01\x00\x00')
+        # expect that the connection was accepted: check for a flow frame:
+        flow_frame = self._find_frame(data[8:], self.FLOW_DESCRIPTOR)
+        self.assertIsNotNone(flow_frame)
+
+        # First send a message on link routed address "examples" with a small
+        # message body. Verify the sent message has been accepted.
+        t1_frame = self._construct_transfer(0, b'\x01', add_ma=True, add_body=True)
+        self._send_frame(t1_frame, s)
+
+        # We expect to get a disposition frame that accepted the message
         sleep(0.5)

Review comment:
       Maybe use a retry function? Alan added some back in the day. One takes a lambda and retries until the lambda returns True. There was also one that keeps retrying while an expected exception is being thrown. So maybe use it to repeat `self.assertIsNotNone(flow_frame)` in a loop for as long as AssertionError is being thrown and the 0.5 seconds are not yet up.
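
To illustrate the idea only: the sketch below is a hypothetical `retry_assertion` helper written for this comment, not the existing retry functions from the Dispatch test library, whose names and signatures may differ.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_assertion(check: Callable[[], T], timeout: float = 0.5,
                    delay: float = 0.01) -> T:
    """Call check() until it stops raising AssertionError or timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            return check()
        except AssertionError:
            if time.monotonic() >= deadline:
                raise  # out of time: surface the last failure
            time.sleep(delay)
```

The test returns as soon as the check passes, so the fixed `sleep(0.5)` becomes an upper bound instead of a cost paid on every run.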




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


