You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2018/03/22 18:32:13 UTC

qpid-dispatch git commit: DISPATCH-947 - Modified some tests in system_tests_two_routers to use proton reactor instead of messenger

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 172629e3d -> 627616850


DISPATCH-947 - Modified some tests in system_tests_two_routers to use proton reactor instead of messenger


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/62761685
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/62761685
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/62761685

Branch: refs/heads/master
Commit: 6276168502f17124239c90fc01e93aaa6eb6acd3
Parents: 172629e
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Thu Mar 22 13:48:58 2018 -0400
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Thu Mar 22 14:29:01 2018 -0400

----------------------------------------------------------------------
 tests/system_tests_two_routers.py | 1780 +++++++++++++++-----------------
 1 file changed, 856 insertions(+), 924 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/62761685/tests/system_tests_two_routers.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_two_routers.py b/tests/system_tests_two_routers.py
index b53cfa7..c317f9b 100644
--- a/tests/system_tests_two_routers.py
+++ b/tests/system_tests_two_routers.py
@@ -94,184 +94,425 @@ class TwoRouterTest(TestCase):
         cls.routers[1].wait_router_connected('QDR.A')
 
     def test_01_pre_settled(self):
-        addr = "amqp:/pre_settled.1"
+        test = DeliveriesInTransit(self.routers[0].addresses[0], self.routers[1].addresses[0])
+        test.run()
+        self.assertEqual(None, test.error)
+
+        local_node = Node.connect(self.routers[0].addresses[0], timeout=TIMEOUT)
+        outs = local_node.query(type='org.apache.qpid.dispatch.routerStats')
+
+        # deliveriesTransit must most surely be greater than num_msgs
+        pos = outs.attribute_names.index("deliveriesTransit")
+        results = outs.results[0]
+        self.assertTrue(results[pos] > 104)
+
+    def test_02a_multicast_unsettled(self):
+        test = MulticastUnsettled(self.routers[0].addresses[0])
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_02c_sender_settles_first(self):
+        test = SenderSettlesFirst(self.routers[0].addresses[0], self.routers[1].addresses[0])
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_03_message_annotations(self):
+        test = MessageAnnotationsTest(self.routers[0].addresses[0], self.routers[1].addresses[0])
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_03a_test_strip_message_annotations_no(self):
+        test = MessageAnnotationsStripTest(self.routers[0].addresses[1], self.routers[1].addresses[1])
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_03a_test_strip_message_annotations_no_add_trace(self):
+        test = MessageAnnotationsStripAddTraceTest(self.routers[0].addresses[1], self.routers[1].addresses[1])
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_03a_test_strip_message_annotations_both_add_ingress_trace(self):
+        test = MessageAnnotationsStripBothAddIngressTrace(self.routers[0].addresses[2], self.routers[1].addresses[2])
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_03a_test_strip_message_annotations_out(self):
+        test = MessageAnnotationsStripMessageAnnotationsOut(self.routers[0].addresses[3], self.routers[1].addresses[3])
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_03a_test_strip_message_annotations_in(self):
+        test = MessageAnnotationSstripMessageAnnotationsInn(self.routers[0].addresses[4], self.routers[1].addresses[4])
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_04_management(self):
+        M = self.messenger()
+        M.start()
+        M.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+        sub = M.subscribe("amqp:/#")
+        reply = sub.address
+
+        request  = Message()
+        response = Message()
+
+        request.address    = "amqp:/_local/$management"
+        request.reply_to   = reply
+        request.properties = {u'type':u'org.amqp.management', u'name':u'self', u'operation':u'GET-MGMT-NODES'}
+
+        M.put(request)
+        M.send()
+        M.recv()
+        M.get(response)
+
+        assert response.properties['statusCode'] == 200, response.properties['statusDescription']
+        self.assertIn('amqp:/_topo/0/QDR.B/$management', response.body)
+
+        request.address    = "amqp:/_topo/0/QDR.B/$management"
+        request.reply_to   = reply
+        request.properties = {u'type':u'org.amqp.management', u'name':u'self', u'operation':u'GET-MGMT-NODES'}
+
+        M.put(request)
+        M.send()
+        M.recv()
+        M.get(response)
+
+        self.assertEqual(response.properties['statusCode'], 200)
+        self.assertTrue('amqp:/_topo/0/QDR.A/$management' in response.body)
+
+        M.stop()
+
+    def test_05_semantics_multicast(self):
+        addr = "amqp:/multicast.1"
         M1 = self.messenger()
         M2 = self.messenger()
-
-        # Why 104 ? Choose a random number and use it to test later on.
-        num_msgs = 104
+        M3 = self.messenger()
+        M4 = self.messenger()
 
         M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
         M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
+        M3.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+        M4.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
+
+        M1.start()
+        M2.start()
+        M3.start()
+        M4.start()
 
         M2.subscribe(addr)
+        M3.subscribe(addr)
+        M4.subscribe(addr)
+        self.routers[0].wait_address("multicast.1", 1, 1)
 
         tm = Message()
         rm = Message()
 
-        self.routers[0].wait_address("pre_settled.1", 0, 1)
-
         tm.address = addr
-        for i in range(num_msgs):
+        for i in range(100):
             tm.body = {'number': i}
             M1.put(tm)
         M1.send()
 
-        for i in range(num_msgs):
-            M2.recv(1)
-            M2.get(rm)
-            self.assertEqual(i, rm.body['number'])
+        for i in range(100):
+            try:
+                M2.recv(1)
+                M2.get(rm)
+                self.assertEqual(i, rm.body['number'])
+            except:
+                print "M2 at", i
 
-        M1.stop()
-        M2.stop()
+            try:
+                M3.recv(1)
+                M3.get(rm)
+                self.assertEqual(i, rm.body['number'])
+            except:
+                print "M3 at", i
 
-        local_node = Node.connect(self.routers[0].addresses[0], timeout=TIMEOUT)
-        outs = local_node.query(type='org.apache.qpid.dispatch.routerStats')
+            try:
+                M4.recv(1)
+                M4.get(rm)
+                self.assertEqual(i, rm.body['number'])
+            except:
+                print "M4 at", i
 
-        # deliveriesTransit must most surely be greater than num_msgs
-        pos = outs.attribute_names.index("deliveriesTransit")
-        results = outs.results[0]
-        self.assertTrue(results[pos] > num_msgs)
+        M1.stop()
+        M2.stop()
+        M3.stop()
+        M4.stop()
 
-    def test_02a_multicast_unsettled(self):
-        addr = "amqp:/multicast.2"
+    def test_06_semantics_closest_is_local(self):
+        addr = "amqp:/closest.1"
         M1 = self.messenger()
         M2 = self.messenger()
         M3 = self.messenger()
         M4 = self.messenger()
 
+        M2.timeout = 0.1
+        M3.timeout = 0.1
+        M4.timeout = 0.1
+
         M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
         M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
         M3.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
         M4.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
 
-        M1.outgoing_window = 5
-        M2.incoming_window = 5
-        M3.incoming_window = 5
-        M4.incoming_window = 5
-
         M1.start()
         M2.start()
         M3.start()
         M4.start()
 
         M2.subscribe(addr)
+        self.routers[0].wait_address("closest.1", 0, 1)
         M3.subscribe(addr)
         M4.subscribe(addr)
-        self.routers[0].wait_address("multicast.2", 1, 1)
+        self.routers[0].wait_address("closest.1", 1, 1)
 
         tm = Message()
         rm = Message()
 
         tm.address = addr
-        for i in range(2):
+        for i in range(30):
             tm.body = {'number': i}
             M1.put(tm)
-        M1.send(0)
-
-        for i in range(2):
-            M2.recv(1)
-            trk = M2.get(rm)
-            M2.accept(trk)
-            M2.settle(trk)
-            self.assertEqual(i, rm.body['number'])
+        M1.send()
 
+        i = 0
+        rx_set = []
+        for i in range(30):
             M3.recv(1)
-            trk = M3.get(rm)
-            M3.accept(trk)
-            M3.settle(trk)
-            self.assertEqual(i, rm.body['number'])
+            M3.get(rm)
+            rx_set.append(rm.body['number'])
+
+        try:
+            M2.recv(1)
+            self.assertEqual(0, "Unexpected messages arrived on M2")
+        except AssertionError:
+            raise
+        except Exception:
+            pass
 
+        try:
             M4.recv(1)
-            trk = M4.get(rm)
-            M4.accept(trk)
-            M4.settle(trk)
-            self.assertEqual(i, rm.body['number'])
+            self.assertEqual(0, "Unexpected messages arrived on M4")
+        except AssertionError:
+            raise
+        except Exception:
+            pass
+
+        self.assertEqual(30, len(rx_set))
+        rx_set.sort()
+        for i in range(30):
+            self.assertEqual(i, rx_set[i])
 
         M1.stop()
         M2.stop()
         M3.stop()
         M4.stop()
 
-
-    def test_02c_sender_settles_first(self):
-        addr = "amqp:/closest.senderfirst.1"
+    def test_07_semantics_closest_is_remote(self):
+        addr = "amqp:/closest.2"
         M1 = self.messenger()
         M2 = self.messenger()
+        M3 = self.messenger()
+        M4 = self.messenger()
 
         M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
         M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
-
-        M1.outgoing_window = 5
-        M2.incoming_window = 5
+        M3.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+        M4.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
 
         M1.start()
         M2.start()
+        M3.start()
+        M4.start()
+
         M2.subscribe(addr)
-        self.routers[0].wait_address("closest.senderfirst.1", 0, 1)
+        M4.subscribe(addr)
+        self.routers[0].wait_address("closest.2", 0, 1)
 
         tm = Message()
         rm = Message()
 
         tm.address = addr
-        tm.body = {'number': 0}
-        ttrk = M1.put(tm)
-        M1.send(0)
+        for i in range(30):
+            tm.body = {'number': i}
+            M1.put(tm)
+        M1.send()
 
-        M1.settle(ttrk)
-        M1.flush()
-        M2.flush()
+        i = 0
+        rx_set = []
+        for i in range(15):
+            M2.recv(1)
+            M2.get(rm)
+            rx_set.append(rm.body['number'])
 
-        M2.recv(1)
-        rtrk = M2.get(rm)
-        M2.accept(rtrk)
-        M2.settle(rtrk)
-        self.assertEqual(0, rm.body['number'])
+            M4.recv(1)
+            M4.get(rm)
+            rx_set.append(rm.body['number'])
 
-        M1.flush()
-        M2.flush()
+        self.assertEqual(30, len(rx_set))
+        rx_set.sort()
+        for i in range(30):
+            self.assertEqual(i, rx_set[i])
 
         M1.stop()
         M2.stop()
+        M3.stop()
+        M4.stop()
 
-
-    def test_03_propagated_disposition(self):
-        addr = "amqp:/unsettled/2"
+    def test_08_semantics_spread(self):
+        addr = "amqp:/spread.1"
         M1 = self.messenger()
         M2 = self.messenger()
+        M3 = self.messenger()
+        M4 = self.messenger()
+
+        M2.timeout = 0.1
+        M3.timeout = 0.1
+        M4.timeout = 0.1
 
         M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
         M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
-
-        M1.outgoing_window = 5
-        M2.incoming_window = 5
+        M3.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+        M4.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
 
         M1.start()
         M2.start()
+        M3.start()
+        M4.start()
         M2.subscribe(addr)
-        self.routers[0].wait_address("unsettled/2", 0, 1)
+        M3.subscribe(addr)
+        M4.subscribe(addr)
+        self.routers[0].wait_address("spread.1", 1, 1)
 
         tm = Message()
         rm = Message()
 
         tm.address = addr
-        tm.body = {'number': 0}
+        for i in range(50):
+            tm.body = {'number': i}
+            M1.put(tm)
+        M1.send()
 
-        ##
-        ## Test ACCEPT
-        ##
-        tx_tracker = M1.put(tm)
-        M1.send(0)
-        M2.recv(1)
-        rx_tracker = M2.get(rm)
-        self.assertEqual(0, rm.body['number'])
-        self.assertEqual(PENDING, M1.status(tx_tracker))
+        i = 0
+        rx_set = []
+        ca = 0
+        cb = 0
+        cc = 0
 
-        M2.accept(rx_tracker)
-        M2.settle(rx_tracker)
+        while len(rx_set) < 50:
+            try:
+                M2.recv(1)
+                M2.get(rm)
+                rx_set.append(rm.body['number'])
+                ca += 1
+            except:
+                pass
 
-        M2.flush()
-        M1.flush()
+            try:
+                M3.recv(1)
+                M3.get(rm)
+                rx_set.append(rm.body['number'])
+                cb += 1
+            except:
+                pass
+
+            try:
+                M4.recv(1)
+                M4.get(rm)
+                rx_set.append(rm.body['number'])
+                cc += 1
+            except:
+                pass
+
+        self.assertEqual(50, len(rx_set))
+        rx_set.sort()
+        for i in range(50):
+            self.assertEqual(i, rx_set[i])
+        self.assertTrue(ca > 0)
+        self.assertTrue(cb > 0)
+        self.assertTrue(cc > 0)
+
+        M1.stop()
+        M2.stop()
+        M3.stop()
+        M4.stop()
+
+    def test_9_to_override(self):
+        addr = "amqp:/toov/1"
+        M1 = self.messenger()
+        M2 = self.messenger()
+
+        M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+        M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
+
+        M1.start()
+        M2.start()
+        M2.subscribe(addr)
+        self.routers[0].wait_address("toov/1", 0, 1)
+
+        tm = Message()
+        rm = Message()
+
+        tm.address = addr
+
+        ##
+        ## Pre-existing TO
+        ##
+        tm.annotations = {'x-opt-qd.to': 'toov/1'}
+        for i in range(10):
+            tm.body = {'number': i}
+            M1.put(tm)
+        M1.send()
+
+        for i in range(10):
+            M2.recv(1)
+            M2.get(rm)
+            self.assertEqual(i, rm.body['number'])
+            ma = rm.annotations
+            self.assertEqual(ma.__class__, dict)
+            self.assertEqual(ma['x-opt-qd.to'], 'toov/1')
+
+        M1.stop()
+        M2.stop()
+
+    def test_10_propagated_disposition(self):
+        addr = "amqp:/unsettled/2"
+        M1 = self.messenger()
+        M2 = self.messenger()
+
+        M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
+        M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
+
+        M1.outgoing_window = 5
+        M2.incoming_window = 5
+
+        M1.start()
+        M2.start()
+        M2.subscribe(addr)
+        self.routers[0].wait_address("unsettled/2", 0, 1)
+
+        tm = Message()
+        rm = Message()
+
+        tm.address = addr
+        tm.body = {'number': 0}
+
+        ##
+        ## Test ACCEPT
+        ##
+        tx_tracker = M1.put(tm)
+        M1.send(0)
+        M2.recv(1)
+        rx_tracker = M2.get(rm)
+        self.assertEqual(0, rm.body['number'])
+        self.assertEqual(PENDING, M1.status(tx_tracker))
+
+        M2.accept(rx_tracker)
+        M2.settle(rx_tracker)
+
+        M2.flush()
+        M1.flush()
 
         self.assertEqual(ACCEPTED, M1.status(tx_tracker))
 
@@ -296,31 +537,7 @@ class TwoRouterTest(TestCase):
         M1.stop()
         M2.stop()
 
-
-    def test_04_unsettled_undeliverable(self):
-        addr = self.routers[0].addresses[0]+"/unsettled_undeliverable/1"
-        M1 = self.messenger()
-
-        M1.outgoing_window = 5
-
-        M1.start()
-        M1.timeout = 1
-        tm = Message()
-        tm.address = addr
-        tm.body = {'number': 200}
-
-        exception = False
-        try:
-            M1.put(tm)
-            M1.send(0)
-            M1.flush()
-        except Exception:
-            exception = True
-
-        M1.stop()
-
-
-    def test_05_three_ack(self):
+    def test_11_three_ack(self):
         addr = "amqp:/three_ack/1"
         M1 = self.messenger()
         M2 = self.messenger()
@@ -374,958 +591,673 @@ class TwoRouterTest(TestCase):
         M1.stop()
         M2.stop()
 
+    def test_12_excess_deliveries_released(self):
+        """
+        Message-route a series of deliveries where the receiver provides credit for a subset and
+        once received, closes the link.  The remaining deliveries should be released back to the sender.
+        """
+        test = ExcessDeliveriesReleasedTest(self.routers[0].addresses[0], self.routers[1].addresses[0])
+        test.run()
+        self.assertEqual(None, test.error)
 
-    def notest_06_link_route_sender(self):
-        pass
-
-    def notest_07_link_route_receiver(self):
-        pass
-
+    def test_15_attach_on_inter_router(self):
+        test = AttachOnInterRouterTest(self.routers[0].addresses[5])
+        test.run()
+        self.assertEqual(None, test.error)
 
-    def test_08_message_annotations(self):
-        addr = "amqp:/ma/1"
+    def test_16_delivery_annotations(self):
+        addr = "amqp:/delivery_annotations.1"
         M1 = self.messenger()
         M2 = self.messenger()
 
         M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
         M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
-
         M1.start()
         M2.start()
         M2.subscribe(addr)
-        self.routers[0].wait_address("ma/1", 0, 1)
 
         tm = Message()
         rm = Message()
 
-        tm.address = addr
+        self.routers[0].wait_address("delivery_annotations.1", 0, 1)
 
-        ##
-        ## No inbound message annotations
-        ##
-        for i in range(10):
-            tm.body = {'number': i}
-            M1.put(tm)
+        tm.annotations = {'a1': 'a1', 'b1': 'b2'}
+        tm.address = addr
+        tm.instructions = {'work': 'hard', 'stay': 'humble'}
+        tm.body = {'number': 38}
+        M1.put(tm)
         M1.send()
 
-        for i in range(10):
-            M2.recv(1)
-            M2.get(rm)
-            self.assertEqual(i, rm.body['number'])
-            ma = rm.annotations
-            self.assertEqual(ma.__class__, dict)
-            self.assertEqual(ma['x-opt-qd.ingress'], '0/QDR.A')
-            self.assertEqual(ma['x-opt-qd.trace'], ['0/QDR.A', '0/QDR.B'])
+        M2.recv(1)
+        M2.get(rm)
+        self.assertEqual(38, rm.body['number'])
 
         M1.stop()
         M2.stop()
 
+    def test_17_address_wildcard(self):
+        # verify proper distribution is selected by wildcard
+        addresses = [
+            # (address, count of messages expected to be received)
+            ('a.b.c.d',   1), # closest 'a.b.c.d'
+            ('b.c.d',     2), # multi   '#.b.c.d'
+            ('f.a.b.c.d', 2), # multi   '#.b.c.d
+            ('a.c.d',     2), # multi   'a.*.d'
+            ('a/c/c/d',   1), # closest 'a/*/#.d
+            ('a/x/z/z/d', 1), # closest 'a/*/#.d
+            ('a/x/d',     1), # closest 'a.x.d'
+            ('a.x.e',     1), # balanced  ----
+            ('m.b.c.d',   2)  # multi   '*/b/c/d'
+        ]
 
-    #The stripAnnotations property is set to 'no'
-    def test_08a_test_strip_message_annotations_no(self):
-        addr = "amqp:/message_annotations_strip_no/1"
+        # two receivers per address - one for each router
+        receivers = []
+        for a in addresses:
+            for x in range(2):
+                M = self.messenger(timeout=0.1)
+                M.route("amqp:/*", self.routers[x].addresses[0]+"/$1")
+                M.start()
+                M.subscribe('amqp:/' + a[0])
+                receivers.append(M)
+            self.routers[0].wait_address(a[0], 1, 1)
+            self.routers[1].wait_address(a[0], 1, 1)
 
+        # single sender sends one message to each address
         M1 = self.messenger()
-        M2 = self.messenger()
-
-        M1.route("amqp:/*", self.routers[0].addresses[1]+"/$1")
-        M2.route("amqp:/*", self.routers[1].addresses[1]+"/$1")
-
+        M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
         M1.start()
-        M2.start()
-        M2.subscribe(addr)
-        self.routers[0].wait_address("message_annotations_strip_no/1", 0, 1)
+        for a in addresses:
+            tm = Message()
+            tm.address = 'amqp:/' + a[0]
+            tm.body = {'address': a[0]}
+            M1.put(tm)
+            M1.send()
 
-        ingress_message = Message()
-        ingress_message.address = addr
-        ingress_message.body = {'message': 'Hello World!'}
-        ingress_message_annotations = {'work': 'hard', 'stay': 'humble'}
+        # gather all received messages
+        msgs_recvd = {}
+        rm = Message()
+        for M in receivers:
+            try:
+                while True:
+                    M.recv(1)
+                    M.get(rm)
+                    index = rm.body.get('address', "ERROR")
+                    if index not in msgs_recvd:
+                        msgs_recvd[index] = 0
+                    msgs_recvd[index] += 1
+            except Exception as exc:
+                self.assertTrue("None" in str(exc))
 
-        ingress_message.annotations = ingress_message_annotations
+        # verify expected count == actual count
+        self.assertTrue("ERROR" not in msgs_recvd)
+        for a in addresses:
+            self.assertTrue(a[0] in msgs_recvd)
+            self.assertEqual(a[1], msgs_recvd[a[0]])
 
-        M1.put(ingress_message)
-        M1.send()
+        M1.stop()
+        for M in receivers:
+            M.stop()
 
-        # Receive the message
-        M2.recv(1)
-        egress_message = Message()
-        M2.get(egress_message)
+    def test_17_large_streaming_test(self):
+        test = LargeMessageStreamTest(self.routers[0].addresses[0], self.routers[1].addresses[0])
+        test.run()
+        self.assertEqual(None, test.error)
 
-        #Make sure 'Hello World!' is in the message body dict
-        self.assertEqual('Hello World!', egress_message.body['message'])
 
+class Timeout(object):
+    def __init__(self, parent):
+        self.parent = parent
 
-        egress_message_annotations = egress_message.annotations
+    def on_timer_task(self, event):
+        self.parent.timeout()
 
-        self.assertEqual(egress_message_annotations.__class__, dict)
-        self.assertEqual(egress_message_annotations['x-opt-qd.ingress'], '0/QDR.A')
-        self.assertEqual(egress_message_annotations['work'], 'hard')
-        self.assertEqual(egress_message_annotations['stay'], 'humble')
-        self.assertEqual(egress_message_annotations['x-opt-qd.trace'], ['0/QDR.A', '0/QDR.B'])
+class LargeMessageStreamTest(MessagingHandler):
+    def __init__(self, address1, address2):
+        super(LargeMessageStreamTest, self).__init__()
+        self.address1 = address1
+        self.address2 = address2
+        self.dest = "LargeMessageStreamTest"
+        self.error = None
+        self.conn1 = None
+        self.conn2 = None
+        self.count = 10
+        self.n_sent = 0
+        self.timer = None
+        self.sender = None
+        self.receiver = None
+        self.n_received = 0
+        self.body = ""
+        for i in range(10000):
+            self.body += "0123456789101112131415"
 
-        M1.stop()
-        M2.stop()
+    def check_if_done(self):
+        if self.n_received == self.count:
+            self.timer.cancel()
+            self.conn1.close()
+            self.conn2.close()
 
-    # This unit test is currently skipped because dispatch router do not pass thru custom message annotations.
-    # Once the feature is added the @unittest.skip decorator can be removed.
-    # The stripAnnotations property is set to 'no'
-    def test_08a_strip_message_annotations_custom(self):
-        addr = "amqp:/message_annotations_strip_no_custom/1"
+    def timeout(self):
+        self.error = "Timeout Expired: sent=%d, received=%d" % (self.n_sent, self.n_received)
+        self.conn1.close()
+        self.conn2.close()
 
-        M1 = self.messenger()
-        M2 = self.messenger()
-
-        M1.route("amqp:/*", self.routers[0].addresses[1]+"/$1")
-        M2.route("amqp:/*", self.routers[1].addresses[1]+"/$1")
-
-        M1.start()
-        M2.start()
-        M2.subscribe(addr)
-        self.routers[0].wait_address("message_annotations_strip_no_custom/1", 0, 1)
-
-        ingress_message = Message()
-        ingress_message.address = addr
-        ingress_message.body = {'message': 'Hello World!'}
-        ingress_message_annotations = {}
-        ingress_message_annotations['custom-annotation'] = '1/Custom_Annotation'
-
-        ingress_message.annotations = ingress_message_annotations
-
-        M1.put(ingress_message)
-        M1.send()
-
-        # Receive the message
-        M2.recv(1)
-        egress_message = Message()
-        M2.get(egress_message)
-
-        # Make sure 'Hello World!' is in the message body dict
-        self.assertEqual('Hello World!', egress_message.body['message'])
-
-        egress_message_annotations = egress_message.annotations
-
-        self.assertEqual(egress_message_annotations.__class__, dict)
-        self.assertEqual(egress_message_annotations['custom-annotation'], '1/Custom_Annotation')
-        self.assertEqual(egress_message_annotations['x-opt-qd.ingress'], '0/QDR.A')
-        self.assertEqual(egress_message_annotations['x-opt-qd.trace'], ['0/QDR.A', '0/QDR.B'])
-
-        M1.stop()
-        M2.stop()
-
-    #The stripAnnotations property is set to 'no'
-    def test_08a_test_strip_message_annotations_no_add_trace(self):
-        addr = "amqp:/strip_message_annotations_no_add_trace/1"
-
-        M1 = self.messenger()
-        M2 = self.messenger()
-
-        M1.route("amqp:/*", self.routers[0].addresses[1]+"/$1")
-        M2.route("amqp:/*", self.routers[1].addresses[1]+"/$1")
-
-        M1.start()
-        M2.start()
-        M2.subscribe(addr)
-        self.routers[0].wait_address("strip_message_annotations_no_add_trace/1", 0, 1)
-
-        ingress_message = Message()
-        ingress_message.address = addr
-        ingress_message.body = {'message': 'Hello World!'}
-
-        ##
-        ## Pre-existing ingress and trace
-        ##
-        #ingress_message_annotations = {'x-opt-qd.ingress': 'ingress-router', 'x-opt-qd.trace': ['0/QDR.1']}
-        ingress_message_annotations = {'x-opt-qd.trace': ['0/QDR.1']}
-        ingress_message.annotations = ingress_message_annotations
-
-        ingress_message.annotations = ingress_message_annotations
-
-        M1.put(ingress_message)
-        M1.send()
-
-        # Receive the message
-        M2.recv(1)
-        egress_message = Message()
-        M2.get(egress_message)
-
-        #Make sure 'Hello World!' is in the message body dict
-        self.assertEqual('Hello World!', egress_message.body['message'])
-
-
-        egress_message_annotations = egress_message.annotations
-
-        self.assertEqual(egress_message_annotations.__class__, dict)
-        self.assertEqual(egress_message_annotations['x-opt-qd.ingress'], '0/QDR.A')
-        self.assertEqual(egress_message_annotations['x-opt-qd.trace'], ['0/QDR.1', '0/QDR.A', '0/QDR.B'])
-
-        M1.stop()
-        M2.stop()
-
-    # Test to see if the dispatch router specific annotations were stripped.
-    # The stripAnnotations property is set to 'both'
-    # Send a message to the router with pre-existing ingress and trace annotations and make sure that nothing comes out.
-    def test_08a_test_strip_message_annotations_both_add_ingress_trace(self):
-        addr = "amqp:/strip_message_annotations_both_add_ingress_trace/1"
-
-        M1 = self.messenger()
-        M2 = self.messenger()
-
-        M1.route("amqp:/*", self.routers[0].addresses[2]+"/$1")
-        M2.route("amqp:/*", self.routers[1].addresses[2]+"/$1")
-
-        M1.start()
-        M2.start()
-        M2.subscribe(addr)
-        self.routers[0].wait_address("strip_message_annotations_both_add_ingress_trace/1", 0, 1)
-
-        ingress_message = Message()
-        ingress_message.address = addr
-        ingress_message.body = {'message': 'Hello World!'}
-
-        ##
-        # Pre-existing ingress and trace. Intentionally populate the trace with the 0/QDR.A which is the trace
-        # of the first router. If the inbound annotations were not stripped, the router would drop this message
-        # since it would consider this message as being looped.
-        #
-        ingress_message_annotations = {'work': 'hard',
-                                       'x-opt-qd': 'humble',
-                                       'x-opt-qd.ingress': 'ingress-router',
-                                       'x-opt-qd.trace': ['0/QDR.A']}
-        ingress_message.annotations = ingress_message_annotations
-
-        #Put and send the message
-        M1.put(ingress_message)
-        M1.send()
-
-        # Receive the message
-        M2.recv(1)
-        egress_message = Message()
-        M2.get(egress_message)
-
-        # Router specific annotations (annotations with prefix "x-opt-qd.") will be stripped. User defined annotations will not be stripped.
-        self.assertEqual(egress_message.annotations, {'work': 'hard', 'x-opt-qd': 'humble'})
-
-        M1.stop()
-        M2.stop()
-
-    # Send in pre-existing trace and ingress and annotations and make sure that there are no outgoing annotations.
-    # stripAnnotations property is set to "in"
-    def test_08a_test_strip_message_annotations_out(self):
-        addr = "amqp:/strip_message_annotations_out/1"
-
-        M1 = self.messenger()
-        M2 = self.messenger()
-
-        M1.route("amqp:/*", self.routers[0].addresses[3]+"/$1")
-        M2.route("amqp:/*", self.routers[1].addresses[3]+"/$1")
-
-        M1.start()
-        M2.start()
-        M2.subscribe(addr)
-        self.routers[0].wait_address("strip_message_annotations_out/1", 0, 1)
-
-        ingress_message = Message()
-        ingress_message.address = addr
-        ingress_message.body = {'message': 'Hello World!'}
-
-        #Put and send the message
-        M1.put(ingress_message)
-        M1.send()
-
-        # Receive the message
-        egress_message = Message()
-        M2.recv(1)
-        M2.get(egress_message)
-
-         #Make sure 'Hello World!' is in the message body dict
-        self.assertEqual('Hello World!', egress_message.body['message'])
-
-        egress_message_annotations = egress_message.annotations
-
-        self.assertEqual(egress_message.annotations, None)
-
-        M1.stop()
-        M2.stop()
-
-    # Send in pre-existing trace and ingress and annotations and make sure that there are no outgoing annotations.
-    # stripAnnotations property is set to "in"
-    def test_08a_test_strip_message_annotations_out_custom(self):
-        # This test puts three items into message_annotations.
-        # Current router code depends on the order of the items in the annotation map and
-        # this code can't coerce python and the underlying messenger to send the items in
-        # any particular order. That is, the order of the items in the code is not equal
-        # to the order of the map items on the wire. Thus the test fails.
-        pass
-        #addr = "amqp:/strip_message_annotations_out/08a"
-
-        #M1 = self.messenger()
-        #M2 = self.messenger()
-
-        #M1.route("amqp:/*", self.routers[0].addresses[3]+"/$1")
-        #M2.route("amqp:/*", self.routers[1].addresses[3]+"/$1")
-
-        #M1.start()
-        #M2.start()
-        #M2.subscribe(addr)
-        #self.routers[0].wait_address("strip_message_annotations_out/08a", 0, 1)
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.conn1 = event.container.connect(self.address1)
+        self.conn2 = event.container.connect(self.address2)
+        self.sender = event.container.create_sender(self.conn1, self.dest)
+        self.receiver = event.container.create_receiver(self.conn2, self.dest)
 
-        #ingress_message = Message()
-        #ingress_message.address = addr
-        #ingress_message.body = {'message': 'Hello World!'}
+    def on_sendable(self, event):
+        if self.n_sent < self.count:
+            msg = Message(body=self.body)
+            # send(msg) calls the stream function which streams data from sender to the router
+            event.sender.send(msg)
+            self.n_sent += 1
 
-        ## Annotations with prefix "x-opt-qd." will be skipped
-        #ingress_message_annotations = {'work': 'zarg', "x-opt-qd": "custom", "x-opt-qd.": "custom"}
-        #ingress_message.annotations = ingress_message_annotations
+    def on_message(self, event):
+        self.n_received += 1
+        self.check_if_done()
 
-        ## Put and send the message
-        #M1.put(ingress_message)
-        #M1.send()
+    def run(self):
+        Container(self).run()
 
-        ## Receive the message
-        #egress_message = Message()
-        #M2.recv(1)
-        #M2.get(egress_message)
 
-        ## Make sure 'Hello World!' is in the message body dict
-        #self.assertEqual('Hello World!', egress_message.body['message'])
+class ExcessDeliveriesReleasedTest(MessagingHandler):
+    def __init__(self, address1, address2):
+        super(ExcessDeliveriesReleasedTest, self).__init__(prefetch=0)
+        self.address1 = address1
+        self.address2 = address2
+        self.dest = "closest.EDRtest"
+        self.error = None
+        self.sender = None
+        self.receiver = None
+        self.n_sent  = 0
+        self.n_received = 0
+        self.n_accepted = 0
+        self.n_released = 0
+        self.timer = None
+        self.conn1 = None
+        self.conn2 = None
 
-        #self.assertEqual(egress_message.annotations, {'work': 'hard', "x-opt-qd": "custom"})
+    def timeout(self):
+        self.error = "Timeout Expired"
+        self.conn1.close()
+        self.conn2.close()
 
-        #M1.stop()
-        #M2.stop()
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.conn1 = event.container.connect(self.address1)
+        self.conn2 = event.container.connect(self.address2)
+        self.sender   = event.container.create_sender(self.conn1, self.dest)
+        self.receiver = event.container.create_receiver(self.conn2, self.dest)
+        self.receiver.flow(6)
 
-    #Send in pre-existing trace and ingress and annotations and make sure that they are not in the outgoing annotations.
-    #stripAnnotations property is set to "in"
-    def test_08a_test_strip_message_annotations_in(self):
-        addr = "amqp:/strip_message_annotations_in/1"
+    def on_sendable(self, event):
+        for i in range(10 - self.n_sent):
+            msg = Message(body=i)
+            event.sender.send(msg)
+            self.n_sent += 1
 
-        M1 = self.messenger()
-        M2 = self.messenger()
-        M1.route("amqp:/*", self.routers[0].addresses[4]+"/$1")
-        M2.route("amqp:/*", self.routers[1].addresses[4]+"/$1")
+    def on_accepted(self, event):
+        self.n_accepted += 1
 
-        M1.start()
-        M2.start()
-        M2.subscribe(addr)
-        self.routers[0].wait_address("strip_message_annotations_in/1", 0, 1)
+    def on_released(self, event):
+        self.n_released += 1
+        if self.n_released == 4:
+            if self.n_accepted != 6:
+                self.error = "Expected 6 accepted, got %d" % self.n_accepted
+            if self.n_received != 6:
+                self.error = "Expected 6 received, got %d" % self.n_received
+            self.conn1.close()
+            self.conn2.close()
+            self.timer.cancel()
 
-        ingress_message = Message()
-        ingress_message.address = addr
-        ingress_message.body = {'message': 'Hello World!'}
+    def on_message(self, event):
+        self.n_received += 1
+        if self.n_received == 6:
+            self.receiver.close()
 
-        ##
-        ## Pre-existing ingress and trace
-        ##
-        ingress_message_annotations = {'x-opt-qd.ingress': 'ingress-router', 'x-opt-qd.trace': ['X/QDR']}
-        ingress_message.annotations = ingress_message_annotations
+    def run(self):
+        Container(self).run()
 
-        #Put and send the message
-        M1.put(ingress_message)
-        M1.send()
 
-        # Receive the message
-        egress_message = Message()
-        M2.recv(1)
-        M2.get(egress_message)
+class AttachOnInterRouterTest(MessagingHandler):
+    """Expect an error when attaching a link to an inter-router listener"""
+    def __init__(self, address):
+        super(AttachOnInterRouterTest, self).__init__(prefetch=0)
+        self.address = address
+        self.dest = "AOIRtest"
+        self.error = None
+        self.sender = None
 
-         #Make sure 'Hello World!' is in the message body dict
-        self.assertEqual('Hello World!', egress_message.body['message'])
+    def timeout(self):
+        self.error = "Timeout Expired"
+        self.conn.close()
 
-        egress_message_annotations = egress_message.annotations
+    def on_start(self, event):
+        self.timer  = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.conn   = event.container.connect(self.address)
+        self.sender = event.container.create_sender(self.conn, self.dest)
 
-        self.assertEqual(egress_message_annotations.__class__, dict)
-        self.assertEqual(egress_message_annotations['x-opt-qd.ingress'], '0/QDR.A')
-        self.assertEqual(egress_message_annotations['x-opt-qd.trace'], ['0/QDR.A', '0/QDR.B'])
+    def on_link_remote_close(self, event):
+        self.conn.close()
+        self.timer.cancel()
 
-        M1.stop()
-        M2.stop()
+    def run(self):
+        logging.disable(logging.ERROR) # Hide expected log errors
+        try:
+            Container(self).run()
+        finally:
+            logging.disable(logging.NOTSET) # Restore to normal
 
 
-    def test_09_management(self):
-        M = self.messenger()
-        M.start()
-        M.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
-        sub = M.subscribe("amqp:/#")
-        reply = sub.address
+class DeliveriesInTransit(MessagingHandler):
+    def __init__(self, address1, address2):
+        super(DeliveriesInTransit, self).__init__()
+        self.address1 = address1
+        self.address2 = address2
+        self.dest = "pre_settled.1"
+        self.error = "All messages not received"
+        self.n_sent = 0
+        self.timer = None
+        self.conn1 = None
+        self.conn2 = None
+        self.sender = None
+        self.num_msgs = 104
+        self.sent_count = 0
+        self.received_count = 0
+        self.receiver = None
 
-        request  = Message()
-        response = Message()
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.conn1 = event.container.connect(self.address1)
+        self.sender = event.container.create_sender(self.conn1, self.dest)
+        self.conn2 = event.container.connect(self.address2)
+        self.receiver = event.container.create_receiver(self.conn2, self.dest)
 
-        request.address    = "amqp:/_local/$management"
-        request.reply_to   = reply
-        request.properties = {u'type':u'org.amqp.management', u'name':u'self', u'operation':u'GET-MGMT-NODES'}
+    def on_sendable(self, event):
+        if self.n_sent <= self.num_msgs-1:
+            msg = Message(body="Hello World")
+            self.sender.send(msg)
+            self.n_sent += 1
 
-        M.put(request)
-        M.send()
-        M.recv()
-        M.get(response)
+    def check_if_done(self):
+        if self.n_sent == self.received_count:
+            self.error = None
+            self.timer.cancel()
+            self.conn1.close()
+            self.conn2.close()
 
-        assert response.properties['statusCode'] == 200, response.properties['statusDescription']
-        self.assertIn('amqp:/_topo/0/QDR.B/$management', response.body)
+    def on_message(self, event):
+        self.received_count+=1
+        self.check_if_done()
 
-        request.address    = "amqp:/_topo/0/QDR.B/$management"
-        request.reply_to   = reply
-        request.properties = {u'type':u'org.amqp.management', u'name':u'self', u'operation':u'GET-MGMT-NODES'}
+    def run(self):
+        Container(self).run()
 
-        M.put(request)
-        M.send()
-        M.recv()
-        M.get(response)
 
-        self.assertEqual(response.properties['statusCode'], 200)
-        self.assertTrue('amqp:/_topo/0/QDR.A/$management' in response.body)
+class MessageAnnotationsTest(MessagingHandler):
+    def __init__(self, address1, address2):
+        super(MessageAnnotationsTest, self).__init__()
+        self.address1 = address1
+        self.address2 = address2
+        self.dest = "ma/1"
+        self.error = "Message annotations not found"
+        self.timer = None
+        self.conn1 = None
+        self.conn2 = None
+        self.sender = None
+        self.receiver = None
+        self.sent_count = 0
+        self.msg_not_sent = True
 
-        M.stop()
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.conn1 = event.container.connect(self.address1)
+        self.sender = event.container.create_sender(self.conn1, self.dest)
+        self.conn2 = event.container.connect(self.address2)
+        self.receiver = event.container.create_receiver(self.conn2, self.dest)
 
-    def test_10_semantics_multicast(self):
-        addr = "amqp:/multicast.1"
-        M1 = self.messenger()
-        M2 = self.messenger()
-        M3 = self.messenger()
-        M4 = self.messenger()
-
-        M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
-        M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
-        M3.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
-        M4.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
-
-        M1.start()
-        M2.start()
-        M3.start()
-        M4.start()
-
-        M2.subscribe(addr)
-        M3.subscribe(addr)
-        M4.subscribe(addr)
-        self.routers[0].wait_address("multicast.1", 1, 1)
-
-        tm = Message()
-        rm = Message()
-
-        tm.address = addr
-        for i in range(100):
-            tm.body = {'number': i}
-            M1.put(tm)
-        M1.send()
-
-        for i in range(100):
-            try:
-                M2.recv(1)
-                M2.get(rm)
-                self.assertEqual(i, rm.body['number'])
-            except:
-                print "M2 at", i
-
-            try:
-                M3.recv(1)
-                M3.get(rm)
-                self.assertEqual(i, rm.body['number'])
-            except:
-                print "M3 at", i
-
-            try:
-                M4.recv(1)
-                M4.get(rm)
-                self.assertEqual(i, rm.body['number'])
-            except:
-                print "M4 at", i
-
-        M1.stop()
-        M2.stop()
-        M3.stop()
-        M4.stop()
-
-    def test_11a_semantics_closest_is_local(self):
-        addr = "amqp:/closest.1"
-        M1 = self.messenger()
-        M2 = self.messenger()
-        M3 = self.messenger()
-        M4 = self.messenger()
-
-        M2.timeout = 0.1
-        M3.timeout = 0.1
-        M4.timeout = 0.1
-
-        M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
-        M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
-        M3.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
-        M4.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
-
-        M1.start()
-        M2.start()
-        M3.start()
-        M4.start()
-
-        M2.subscribe(addr)
-        self.routers[0].wait_address("closest.1", 0, 1)
-        M3.subscribe(addr)
-        M4.subscribe(addr)
-        self.routers[0].wait_address("closest.1", 1, 1)
-
-        tm = Message()
-        rm = Message()
-
-        tm.address = addr
-        for i in range(30):
-            tm.body = {'number': i}
-            M1.put(tm)
-        M1.send()
-
-        i = 0
-        rx_set = []
-        for i in range(30):
-            M3.recv(1)
-            M3.get(rm)
-            rx_set.append(rm.body['number'])
-
-        try:
-            M2.recv(1)
-            self.assertEqual(0, "Unexpected messages arrived on M2")
-        except AssertionError:
-            raise
-        except Exception:
-            pass
-
-        try:
-            M4.recv(1)
-            self.assertEqual(0, "Unexpected messages arrived on M4")
-        except AssertionError:
-            raise
-        except Exception:
-            pass
-
-        self.assertEqual(30, len(rx_set))
-        rx_set.sort()
-        for i in range(30):
-            self.assertEqual(i, rx_set[i])
-
-        M1.stop()
-        M2.stop()
-        M3.stop()
-        M4.stop()
-
-    def test_11b_semantics_closest_is_remote(self):
-        addr = "amqp:/closest.2"
-        M1 = self.messenger()
-        M2 = self.messenger()
-        M3 = self.messenger()
-        M4 = self.messenger()
-
-        M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
-        M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
-        M3.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
-        M4.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
-
-        M1.start()
-        M2.start()
-        M3.start()
-        M4.start()
-
-        M2.subscribe(addr)
-        M4.subscribe(addr)
-        self.routers[0].wait_address("closest.2", 0, 1)
-
-        tm = Message()
-        rm = Message()
-
-        tm.address = addr
-        for i in range(30):
-            tm.body = {'number': i}
-            M1.put(tm)
-        M1.send()
-
-        i = 0
-        rx_set = []
-        for i in range(15):
-            M2.recv(1)
-            M2.get(rm)
-            rx_set.append(rm.body['number'])
-
-            M4.recv(1)
-            M4.get(rm)
-            rx_set.append(rm.body['number'])
-
-        self.assertEqual(30, len(rx_set))
-        rx_set.sort()
-        for i in range(30):
-            self.assertEqual(i, rx_set[i])
-
-        M1.stop()
-        M2.stop()
-        M3.stop()
-        M4.stop()
-
-    def test_12_semantics_spread(self):
-        addr = "amqp:/spread.1"
-        M1 = self.messenger()
-        M2 = self.messenger()
-        M3 = self.messenger()
-        M4 = self.messenger()
-
-        M2.timeout = 0.1
-        M3.timeout = 0.1
-        M4.timeout = 0.1
-
-        M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
-        M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
-        M3.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
-        M4.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
-
-        M1.start()
-        M2.start()
-        M3.start()
-        M4.start()
-        M2.subscribe(addr)
-        M3.subscribe(addr)
-        M4.subscribe(addr)
-        self.routers[0].wait_address("spread.1", 1, 1)
-
-        tm = Message()
-        rm = Message()
-
-        tm.address = addr
-        for i in range(50):
-            tm.body = {'number': i}
-            M1.put(tm)
-        M1.send()
-
-        i = 0
-        rx_set = []
-        ca = 0
-        cb = 0
-        cc = 0
-
-        while len(rx_set) < 50:
-            try:
-                M2.recv(1)
-                M2.get(rm)
-                rx_set.append(rm.body['number'])
-                ca += 1
-            except:
-                pass
-
-            try:
-                M3.recv(1)
-                M3.get(rm)
-                rx_set.append(rm.body['number'])
-                cb += 1
-            except:
-                pass
-
-            try:
-                M4.recv(1)
-                M4.get(rm)
-                rx_set.append(rm.body['number'])
-                cc += 1
-            except:
-                pass
-
-        self.assertEqual(50, len(rx_set))
-        rx_set.sort()
-        for i in range(50):
-            self.assertEqual(i, rx_set[i])
-        self.assertTrue(ca > 0)
-        self.assertTrue(cb > 0)
-        self.assertTrue(cc > 0)
-
-        M1.stop()
-        M2.stop()
-        M3.stop()
-        M4.stop()
-
-    def test_13_to_override(self):
-        addr = "amqp:/toov/1"
-        M1 = self.messenger()
-        M2 = self.messenger()
-
-        M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
-        M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
-
-        M1.start()
-        M2.start()
-        M2.subscribe(addr)
-        self.routers[0].wait_address("toov/1", 0, 1)
-
-        tm = Message()
-        rm = Message()
-
-        tm.address = addr
-
-        ##
-        ## Pre-existing TO
-        ##
-        tm.annotations = {'x-opt-qd.to': 'toov/1'}
-        for i in range(10):
-            tm.body = {'number': i}
-            M1.put(tm)
-        M1.send()
-
-        for i in range(10):
-            M2.recv(1)
-            M2.get(rm)
-            self.assertEqual(i, rm.body['number'])
-            ma = rm.annotations
-            self.assertEqual(ma.__class__, dict)
-            self.assertEqual(ma['x-opt-qd.to'], 'toov/1')
-
-        M1.stop()
-        M2.stop()
-
-    def test_14_excess_deliveries_released(self):
-        """
-        Message-route a series of deliveries where the receiver provides credit for a subset and
-        once received, closes the link.  The remaining deliveries should be released back to the sender.
-        """
-        test = ExcessDeliveriesReleasedTest(self.routers[0].addresses[0], self.routers[1].addresses[0])
-        test.run()
-        self.assertEqual(None, test.error)
-
-    def test_15_attach_on_inter_router(self):
-        test = AttachOnInterRouterTest(self.routers[0].addresses[5])
-        test.run()
-        self.assertEqual(None, test.error)
-
-    def test_16_delivery_annotations(self):
-        addr = "amqp:/delivery_annotations.1"
-        M1 = self.messenger()
-        M2 = self.messenger()
-
-        M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
-        M2.route("amqp:/*", self.routers[1].addresses[0]+"/$1")
-        M1.start()
-        M2.start()
-        M2.subscribe(addr)
-
-        tm = Message()
-        rm = Message()
-
-        self.routers[0].wait_address("delivery_annotations.1", 0, 1)
-
-        tm.annotations = {'a1': 'a1', 'b1': 'b2'}
-        tm.address = addr
-        tm.instructions = {'work': 'hard', 'stay': 'humble'}
-        tm.body = {'number': 38}
-        M1.put(tm)
-        M1.send()
-
-        M2.recv(1)
-        M2.get(rm)
-        self.assertEqual(38, rm.body['number'])
-
-        M1.stop()
-        M2.stop()
-
-    def test_17_address_wildcard(self):
-        # verify proper distribution is selected by wildcard
-        addresses = [
-            # (address, count of messages expected to be received)
-            ('a.b.c.d',   1), # closest 'a.b.c.d'
-            ('b.c.d',     2), # multi   '#.b.c.d'
-            ('f.a.b.c.d', 2), # multi   '#.b.c.d
-            ('a.c.d',     2), # multi   'a.*.d'
-            ('a/c/c/d',   1), # closest 'a/*/#.d
-            ('a/x/z/z/d', 1), # closest 'a/*/#.d
-            ('a/x/d',     1), # closest 'a.x.d'
-            ('a.x.e',     1), # balanced  ----
-            ('m.b.c.d',   2)  # multi   '*/b/c/d'
-        ]
+    def on_sendable(self, event):
+        if self.msg_not_sent:
+            msg = Message(body={'number': 0})
+            event.sender.send(msg)
+            self.msg_not_sent = False
 
-        # two receivers per address - one for each router
-        receivers = []
-        for a in addresses:
-            for x in range(2):
-                M = self.messenger(timeout=0.1)
-                M.route("amqp:/*", self.routers[x].addresses[0]+"/$1")
-                M.start()
-                M.subscribe('amqp:/' + a[0])
-                receivers.append(M)
-            self.routers[0].wait_address(a[0], 1, 1)
-            self.routers[1].wait_address(a[0], 1, 1)
+    def on_message(self, event):
+        if 0 == event.message.body['number']:
+            ma = event.message.annotations
+            if ma['x-opt-qd.ingress'] == '0/QDR.A' and ma['x-opt-qd.trace'] == ['0/QDR.A', '0/QDR.B']:
+                self.error = None
+        self.accept(event.delivery)
+        self.timer.cancel()
+        self.conn1.close()
+        self.conn2.close()
 
-        # single sender sends one message to each address
-        M1 = self.messenger()
-        M1.route("amqp:/*", self.routers[0].addresses[0]+"/$1")
-        M1.start()
-        for a in addresses:
-            tm = Message()
-            tm.address = 'amqp:/' + a[0]
-            tm.body = {'address': a[0]}
-            M1.put(tm)
-            M1.send()
+    def run(self):
+        Container(self).run()
 
-        # gather all received messages
-        msgs_recvd = {}
-        rm = Message()
-        for M in receivers:
-            try:
-                while True:
-                    M.recv(1)
-                    M.get(rm)
-                    index = rm.body.get('address', "ERROR")
-                    if index not in msgs_recvd:
-                        msgs_recvd[index] = 0
-                    msgs_recvd[index] += 1
-            except Exception as exc:
-                self.assertTrue("None" in str(exc))
 
-        # verify expected count == actual count
-        self.assertTrue("ERROR" not in msgs_recvd)
-        for a in addresses:
-            self.assertTrue(a[0] in msgs_recvd)
-            self.assertEqual(a[1], msgs_recvd[a[0]])
+class MessageAnnotationsStripTest(MessagingHandler):
+    def __init__(self, address1, address2):
+        super(MessageAnnotationsStripTest, self).__init__()
+        self.address1 = address1
+        self.address2 = address2
+        self.dest = "message_annotations_strip_no/1"
+        self.error = "Message annotations not found"
+        self.timer = None
+        self.conn1 = None
+        self.conn2 = None
+        self.sender = None
+        self.receiver = None
+        self.sent_count = 0
+        self.msg_not_sent = True
 
-        M1.stop()
-        for M in receivers:
-            M.stop()
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.conn1 = event.container.connect(self.address1)
+        self.sender = event.container.create_sender(self.conn1, self.dest)
+        self.conn2 = event.container.connect(self.address2)
+        self.receiver = event.container.create_receiver(self.conn2, self.dest)
 
-    def test_17_large_streaming_test(self):
-        test = LargeMessageStreamTest(self.routers[0].addresses[0], self.routers[1].addresses[0])
-        test.run()
-        self.assertEqual(None, test.error)
+    def on_sendable(self, event):
+        if self.msg_not_sent:
+            msg = Message(body={'number': 0})
+            ingress_message_annotations = {'work': 'hard', 'stay': 'humble'}
+            msg.annotations = ingress_message_annotations
+            event.sender.send(msg)
+            self.msg_not_sent = False
 
+    def on_message(self, event):
+        if 0 == event.message.body['number']:
+            ma = event.message.annotations
+            if ma['x-opt-qd.ingress'] == '0/QDR.A' and ma['x-opt-qd.trace'] == ['0/QDR.A', '0/QDR.B'] \
+                    and ma['work'] == 'hard' and ma['stay'] == 'humble':
+                self.error = None
+        self.accept(event.delivery)
+        self.timer.cancel()
+        self.conn1.close()
+        self.conn2.close()
 
-class Timeout(object):
-    def __init__(self, parent):
-        self.parent = parent
+    def run(self):
+        Container(self).run()
 
-    def on_timer_task(self, event):
-        self.parent.timeout()
 
-class LargeMessageStreamTest(MessagingHandler):
+class MessageAnnotationSstripMessageAnnotationsInn(MessagingHandler):
     def __init__(self, address1, address2):
-        super(LargeMessageStreamTest, self).__init__()
+        super(MessageAnnotationSstripMessageAnnotationsInn, self).__init__()
         self.address1 = address1
         self.address2 = address2
-        self.dest = "LargeMessageStreamTest"
-        self.error = None
+        self.dest = "strip_message_annotations_in/1"
+        self.error = "Message annotations not found"
+        self.timer = None
         self.conn1 = None
         self.conn2 = None
-        self.count = 10
-        self.n_sent = 0
-        self.timer = None
         self.sender = None
         self.receiver = None
-        self.n_received = 0
-        self.body = ""
-        for i in range(10000):
-            self.body += "0123456789101112131415"
+        self.sent_count = 0
+        self.msg_not_sent = True
 
-    def check_if_done(self):
-        if self.n_received == self.count:
-            self.timer.cancel()
-            self.conn1.close()
-            self.conn2.close()
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.conn1 = event.container.connect(self.address1)
+        self.sender = event.container.create_sender(self.conn1, self.dest)
+        self.conn2 = event.container.connect(self.address2)
+        self.receiver = event.container.create_receiver(self.conn2, self.dest)
 
-    def timeout(self):
-        self.error = "Timeout Expired: sent=%d, received=%d" % (self.n_sent, self.n_received)
+    def on_sendable(self, event):
+        if self.msg_not_sent:
+            msg = Message(body={'number': 0})
+            #
+            # Pre-existing ingress and trace
+            #
+            ingress_message_annotations = {'x-opt-qd.ingress': 'ingress-router', 'x-opt-qd.trace': ['X/QDR']}
+            msg.annotations = ingress_message_annotations
+            event.sender.send(msg)
+            self.msg_not_sent = False
+
+    def on_message(self, event):
+        if 0 == event.message.body['number']:
+            if event.message.annotations['x-opt-qd.ingress'] == '0/QDR.A' \
+                    and event.message.annotations['x-opt-qd.trace'] == ['0/QDR.A', '0/QDR.B']:
+                self.error = None
+        self.timer.cancel()
         self.conn1.close()
         self.conn2.close()
 
+    def run(self):
+        Container(self).run()
+
+
+class MessageAnnotationsStripMessageAnnotationsOut(MessagingHandler):
+    def __init__(self, address1, address2):
+        super(MessageAnnotationsStripMessageAnnotationsOut, self).__init__()
+        self.address1 = address1
+        self.address2 = address2
+        self.dest = "strip_message_annotations_out/1"
+        self.error = "Message annotations not found"
+        self.timer = None
+        self.conn1 = None
+        self.conn2 = None
+        self.sender = None
+        self.receiver = None
+        self.sent_count = 0
+        self.msg_not_sent = True
+
     def on_start(self, event):
         self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
         self.conn1 = event.container.connect(self.address1)
-        self.conn2 = event.container.connect(self.address2)
         self.sender = event.container.create_sender(self.conn1, self.dest)
+        self.conn2 = event.container.connect(self.address2)
         self.receiver = event.container.create_receiver(self.conn2, self.dest)
 
     def on_sendable(self, event):
-        if self.n_sent < self.count:
-            msg = Message(body=self.body)
-            # send(msg) calls the stream function which streams data from sender to the router
+        if self.msg_not_sent:
+            msg = Message(body={'number': 0})
             event.sender.send(msg)
-            self.n_sent += 1
+            self.msg_not_sent = False
 
     def on_message(self, event):
-        self.n_received += 1
-        self.check_if_done()
+        if 0 == event.message.body['number']:
+            if event.message.annotations is None:
+                self.error = None
+        self.timer.cancel()
+        self.conn1.close()
+        self.conn2.close()
 
     def run(self):
         Container(self).run()
 
-class ExcessDeliveriesReleasedTest(MessagingHandler):
+
+class MessageAnnotationsStripBothAddIngressTrace(MessagingHandler):
     def __init__(self, address1, address2):
-        super(ExcessDeliveriesReleasedTest, self).__init__(prefetch=0)
+        super(MessageAnnotationsStripBothAddIngressTrace, self).__init__()
         self.address1 = address1
         self.address2 = address2
-        self.dest = "closest.EDRtest"
-        self.error = None
+        self.dest = "strip_message_annotations_both_add_ingress_trace/1"
+        self.error = "Message annotations not found"
+        self.timer = None
+        self.conn1 = None
+        self.conn2 = None
         self.sender = None
         self.receiver = None
-        self.n_sent     = 0
-        self.n_received = 0
-        self.n_accepted = 0
-        self.n_released = 0
+        self.sent_count = 0
+        self.msg_not_sent = True
 
-    def timeout(self):
-        self.error = "Timeout Expired"
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.conn1 = event.container.connect(self.address1)
+        self.sender = event.container.create_sender(self.conn1, self.dest)
+        self.conn2 = event.container.connect(self.address2)
+        self.receiver = event.container.create_receiver(self.conn2, self.dest)
+
+    def on_sendable(self, event):
+        if self.msg_not_sent:
+            msg = Message(body={'number': 0})
+            ingress_message_annotations = {'work': 'hard',
+                                           'x-opt-qd': 'humble',
+                                           'x-opt-qd.ingress': 'ingress-router',
+                                           'x-opt-qd.trace': ['0/QDR.A']}
+            msg.annotations = ingress_message_annotations
+            event.sender.send(msg)
+            self.msg_not_sent = False
+
+    def on_message(self, event):
+        if 0 == event.message.body['number']:
+            if event.message.annotations == {'work': 'hard', 'x-opt-qd': 'humble'}:
+                self.error = None
+        self.timer.cancel()
         self.conn1.close()
         self.conn2.close()
 
+    def run(self):
+        Container(self).run()
+
+
+class MessageAnnotationsStripAddTraceTest(MessagingHandler):
+    def __init__(self, address1, address2):
+        super(MessageAnnotationsStripAddTraceTest, self).__init__()
+        self.address1 = address1
+        self.address2 = address2
+        self.dest = "message_annotations_strip_no/1"
+        self.error = "Message annotations not found"
+        self.timer = None
+        self.conn1 = None
+        self.conn2 = None
+        self.sender = None
+        self.receiver = None
+        self.sent_count = 0
+        self.msg_not_sent = True
+
     def on_start(self, event):
         self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
         self.conn1 = event.container.connect(self.address1)
+        self.sender = event.container.create_sender(self.conn1, self.dest)
         self.conn2 = event.container.connect(self.address2)
-        self.sender   = event.container.create_sender(self.conn1, self.dest)
         self.receiver = event.container.create_receiver(self.conn2, self.dest)
-        self.receiver.flow(6)
 
     def on_sendable(self, event):
-        for i in range(10 - self.n_sent):
-            msg = Message(body=i)
+        if self.msg_not_sent:
+            msg = Message(body={'number': 0})
+            ingress_message_annotations = {'x-opt-qd.trace': ['0/QDR.1']}
+            msg.annotations = ingress_message_annotations
             event.sender.send(msg)
-            self.n_sent += 1
+            self.msg_not_sent = False
 
-    def on_accepted(self, event):
-        self.n_accepted += 1
+    def on_message(self, event):
+        if 0 == event.message.body['number']:
+            ma = event.message.annotations
+            if ma['x-opt-qd.ingress'] == '0/QDR.A' and ma['x-opt-qd.trace'] == ['0/QDR.1', '0/QDR.A', '0/QDR.B']:
+                self.error = None
+        self.accept(event.delivery)
+        self.timer.cancel()
+        self.conn1.close()
+        self.conn2.close()
 
-    def on_released(self, event):
-        self.n_released += 1
-        if self.n_released == 4:
-            if self.n_accepted != 6:
-                self.error = "Expected 6 accepted, got %d" % self.n_accepted
-            if self.n_received != 6:
-                self.error = "Expected 6 received, got %d" % self.n_received
-            self.conn1.close()
-            self.conn2.close()
-            self.timer.cancel()
+    def run(self):
+        Container(self).run()
+
+
+
+class SenderSettlesFirst(MessagingHandler):
+    def __init__(self, address1, address2):
+        super(SenderSettlesFirst, self).__init__(auto_accept=False)
+        self.address1 = address1
+        self.address2 = address2
+        self.dest = "closest.senderfirst.1"
+        self.error = "Message body received differs from the one sent"
+        self.n_sent = 0
+        self.timer = None
+        self.conn1 = None
+        self.conn2 = None
+        self.sender = None
+        self.sent_count = 0
+        self.received_count = 0
+        self.receiver = None
+        self.msg_not_sent = True
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.conn1 = event.container.connect(self.address1)
+        self.sender = event.container.create_sender(self.conn1, self.dest)
+        self.conn2 = event.container.connect(self.address2)
+        self.receiver = event.container.create_receiver(self.conn2, self.dest)
+
+    def on_sendable(self, event):
+        if self.msg_not_sent:
+            msg = Message(body={'number': 0})
+            dlv = event.sender.send(msg)
+            dlv.settle()
+            self.msg_not_sent = False
 
     def on_message(self, event):
-        self.n_received += 1
-        if self.n_received == 6:
-            self.receiver.close()
+        if 0 == event.message.body['number']:
+            self.error = None
+        self.accept(event.delivery)
+        self.timer.cancel()
+        self.conn1.close()
+        self.conn2.close()
 
     def run(self):
         Container(self).run()
 
 
-class AttachOnInterRouterTest(MessagingHandler):
-    """Expect an error when attaching a link to an inter-router listener"""
+class MulticastUnsettled(MessagingHandler):
     def __init__(self, address):
-        super(AttachOnInterRouterTest, self).__init__(prefetch=0)
+        super(MulticastUnsettled, self).__init__()
         self.address = address
-        self.dest = "AOIRtest"
+        self.dest = "multicast.2"
         self.error = None
+        self.n_sent = 0
+        self.count = 3
+        self.n_received_a = 0
+        self.n_received_b = 0
+        self.n_received_c = 0
+        self.timer = None
+        self.conn = None
         self.sender = None
-
-    def timeout(self):
-        self.error = "Timeout Expired"
-        self.conn.close()
+        self.receiver_a = None
+        self.receiver_b = None
+        self.receiver_c = None
 
     def on_start(self, event):
-        self.timer  = event.reactor.schedule(TIMEOUT, Timeout(self))
-        self.conn   = event.container.connect(self.address)
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.conn = event.container.connect(self.address)
         self.sender = event.container.create_sender(self.conn, self.dest)
+        self.receiver_a = event.container.create_receiver(self.conn, self.dest, name="A")
+        self.receiver_b = event.container.create_receiver(self.conn, self.dest, name="B")
+        self.receiver_c = event.container.create_receiver(self.conn, self.dest, name="C")
 
-    def on_link_remote_close(self, event):
+    def timeout(self):
+        self.error = "Timeout Expired: sent=%d rcvd=%d/%d/%d" % \
+                     (self.n_sent, self.n_received_a, self.n_received_b, self.n_received_c)
         self.conn.close()
-        self.timer.cancel()
+
+    def check_if_done(self):
+        if self.n_received_a + self.n_received_b + self.n_received_c == self.count:
+            self.timer.cancel()
+            self.conn.close()
+
+    def on_sendable(self, event):
+        if self.n_sent == 0:
+            msg = Message(body="Appearance-Test")
+            self.sender.send(msg)
+            self.n_sent += 1
+
+    def on_message(self, event):
+        if event.receiver == self.receiver_a:
+            self.n_received_a += 1
+        if event.receiver == self.receiver_b:
+            self.n_received_b += 1
+        if event.receiver == self.receiver_c:
+            self.n_received_c += 1
+
+    def on_accepted(self, event):
+        self.check_if_done()
 
     def run(self):
-        logging.disable(logging.ERROR) # Hide expected log errors
-        try:
-            Container(self).run()
-        finally:
-            logging.disable(logging.NOTSET) # Restore to normal
+        Container(self).run()
 
 
 if __name__ == '__main__':


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org