You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2013/10/08 22:33:27 UTC
svn commit: r1530415 - in /qpid/trunk/qpid/extras/dispatch:
src/router_node.c tests/system_tests_one_router.py
Author: tross
Date: Tue Oct 8 20:33:27 2013
New Revision: 1530415
URL: http://svn.apache.org/r1530415
Log:
QPID-5218 - Fixed crash caused by fanned-out non-presettled messages.
Modified:
qpid/trunk/qpid/extras/dispatch/src/router_node.c
qpid/trunk/qpid/extras/dispatch/tests/system_tests_one_router.py
Modified: qpid/trunk/qpid/extras/dispatch/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_node.c?rev=1530415&r1=1530414&r2=1530415&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_node.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_node.c Tue Oct 8 20:33:27 2013
@@ -547,7 +547,7 @@ static void router_rx_handler(void* cont
DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
fanout++;
- if (fanout == 1)
+ if (fanout == 1 && !dx_delivery_settled(delivery))
re->delivery = delivery;
addr->deliveries_transit++;
@@ -569,8 +569,7 @@ static void router_rx_handler(void* cont
dx_delivery_free(delivery, PN_ACCEPTED);
} else if (fanout == 0) {
dx_delivery_free(delivery, PN_RELEASED);
- } else if (fanout > 1)
- dx_delivery_free(delivery, PN_ACCEPTED);
+ }
}
} else {
//
Modified: qpid/trunk/qpid/extras/dispatch/tests/system_tests_one_router.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/tests/system_tests_one_router.py?rev=1530415&r1=1530414&r2=1530415&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/tests/system_tests_one_router.py (original)
+++ qpid/trunk/qpid/extras/dispatch/tests/system_tests_one_router.py Tue Oct 8 20:33:27 2013
@@ -138,6 +138,65 @@ class RouterTest(unittest.TestCase):
M4.stop()
+ def test_2a_multicast_unsettled(self):
+ addr = "amqp://0.0.0.0:20000/pre_settled/multicast/1"
+ M1 = Messenger()
+ M2 = Messenger()
+ M3 = Messenger()
+ M4 = Messenger()
+
+ M1.timeout = 1.0
+ M2.timeout = 1.0
+ M3.timeout = 1.0
+ M4.timeout = 1.0
+
+ M1.outgoing_window = 5
+ M2.incoming_window = 5
+ M3.incoming_window = 5
+ M4.incoming_window = 5
+
+ M1.start()
+ M2.start()
+ M3.start()
+ M4.start()
+ self.subscribe(M2, addr)
+ self.subscribe(M3, addr)
+ self.subscribe(M4, addr)
+
+ tm = Message()
+ rm = Message()
+
+ tm.address = addr
+ for i in range(2):
+ tm.body = {'number': i}
+ M1.put(tm)
+ M1.send(0)
+
+ for i in range(2):
+ M2.recv(1)
+ trk = M2.get(rm)
+ M2.accept(trk)
+ M2.settle(trk)
+ self.assertEqual(i, rm.body['number'])
+
+ M3.recv(1)
+ trk = M3.get(rm)
+ M3.accept(trk)
+ M3.settle(trk)
+ self.assertEqual(i, rm.body['number'])
+
+ M4.recv(1)
+ trk = M4.get(rm)
+ M4.accept(trk)
+ M4.settle(trk)
+ self.assertEqual(i, rm.body['number'])
+
+ M1.stop()
+ M2.stop()
+ M3.stop()
+ M4.stop()
+
+
def test_3_propagated_disposition(self):
addr = "amqp://0.0.0.0:20000/unsettled/1"
M1 = Messenger()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org