You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/04/04 15:17:07 UTC

qpid-dispatch git commit: DISPATCH-179 - Updated multicast forwarder to settle deliveries for which there is a non-zero fanout.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master ba920ba14 -> 44f38e4c6


DISPATCH-179 - Updated multicast forwarder to settle deliveries for which there
is a non-zero fanout.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/44f38e4c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/44f38e4c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/44f38e4c

Branch: refs/heads/master
Commit: 44f38e4c62afa385556b5b655d65306948a0c065
Parents: ba920ba
Author: Ted Ross <tr...@redhat.com>
Authored: Mon Apr 4 09:15:53 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Mon Apr 4 09:15:53 2016 -0400

----------------------------------------------------------------------
 src/router_core/forwarder.c      | 28 +++++++++++++++
 tests/system_tests_one_router.py | 67 +++++++++++++++++++++++++++++++++--
 2 files changed, 93 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/44f38e4c/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index eddb079..1a03f93 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -142,6 +142,17 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
     bool          bypass_valid_origins = addr->forwarder->bypass_valid_origins;
     int           fanout               = 0;
     qd_bitmask_t *link_exclusion       = !!in_delivery ? in_delivery->link_exclusion : 0;
+    bool          presettled           = !!in_delivery ? in_delivery->settled : true;
+
+    //
+    // If the delivery is not presettled, set the settled flag for forwarding so all
+    // outgoing deliveries will be presettled.
+    //
+    // NOTE:  This is the only multicast mode currently supported.  Others will likely be
+    //        implemented in the future.
+    //
+    if (!presettled)
+        in_delivery->settled = true;
 
     //
     // Forward to local subscribers
@@ -246,6 +257,23 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
         }
     }
 
+    if (in_delivery && !presettled) {
+        if (fanout == 0)
+            //
+            // The delivery was not presettled and it was not forwarded to any
+            // destinations, return it to its original unsettled state.
+            //
+            in_delivery->settled = false;
+        else {
+            //
+            // The delivery was not presettled and it was forwarded to at least
+            // one destination.  Accept and settle the delivery.
+            //
+            in_delivery->disposition = PN_ACCEPTED;
+            qdr_delivery_push_CT(core, in_delivery);
+        }
+    }
+
     return fanout;
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/44f38e4c/tests/system_tests_one_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index 94b32dd..5dee9eb 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -1043,7 +1043,7 @@ class RouterTest(TestCase):
         M1.stop()
         M2.stop()
 
-    def test_send_settle_mode_settled(self):
+    def test_14_send_settle_mode_settled(self):
         """
         The receiver sets a snd-settle-mode of settle thus indicating that it wants to receive settled messages from
         the sender. This tests make sure that the delivery that comes to the receiver comes as already settled.
@@ -1053,7 +1053,7 @@ class RouterTest(TestCase):
         self.assertTrue(send_settle_mode_test.message_received)
         self.assertTrue(send_settle_mode_test.delivery_already_settled)
 
-    def test_14_excess_deliveries_released(self):
+    def test_15_excess_deliveries_released(self):
         """
         Message-route a series of deliveries where the receiver provides credit for a subset and
         once received, closes the link.  The remaining deliveries should be released back to the sender.
@@ -1062,6 +1062,19 @@ class RouterTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
+    def test_16_multicast_unsettled(self):
+        test = MulticastUnsettledTest(self.address)
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+class Timeout(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.timeout()
+
 
 HELLO_WORLD = "Hello World!"
 
@@ -1144,5 +1157,55 @@ class ExcessDeliveriesReleasedTest(MessagingHandler):
     def run(self):
         Container(self).run()
 
+
+class MulticastUnsettledTest(MessagingHandler):
+    def __init__(self, address):
+        super(MulticastUnsettledTest, self).__init__(prefetch=0)
+        self.address = address
+        self.dest = "multicast.MUtest"
+        self.error = None
+        self.count      = 10
+        self.n_sent     = 0
+        self.n_received = 0
+        self.n_accepted = 0
+
+    def check_if_done(self):
+        if self.n_received == self.count * 2 and self.n_accepted == self.count:
+            self.timer.cancel()
+            self.conn.close()
+
+    def timeout(self):
+        self.error = "Timeout Expired: sent=%d, received=%d, accepted=%d" % (self.n_sent, self.n_received, self.n_accepted)
+        self.conn.close()
+
+    def on_start(self, event):
+        self.timer     = event.reactor.schedule(5, Timeout(self))
+        self.conn      = event.container.connect(self.address)
+        self.sender    = event.container.create_sender(self.conn, self.dest)
+        self.receiver1 = event.container.create_receiver(self.conn, self.dest, name="A")
+        self.receiver2 = event.container.create_receiver(self.conn, self.dest, name="B")
+        self.receiver1.flow(self.count)
+        self.receiver2.flow(self.count)
+
+    def on_sendable(self, event):
+        for i in range(self.count - self.n_sent):
+            msg = Message(body=i)
+            event.sender.send(msg)
+            self.n_sent += 1
+
+    def on_accepted(self, event):
+        self.n_accepted += 1
+        self.check_if_done()
+
+    def on_message(self, event):
+        if not event.delivery.settled:
+            self.error = "Received unsettled delivery"
+        self.n_received += 1
+        self.check_if_done()
+
+    def run(self):
+        Container(self).run()
+
+
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org