You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2013/12/06 23:36:15 UTC
svn commit: r1548737 - in /qpid/dispatch/trunk:
include/qpid/dispatch/container.h
python/qpid_dispatch_internal/router/engine.py src/container.c
src/router_node.c tests/system_tests_one_router.py tools/qdstat.in
tools/qdtest.in
Author: tross
Date: Fri Dec 6 22:36:14 2013
New Revision: 1548737
URL: http://svn.apache.org/r1548737
Log:
QPID-5397
- Redesigned the fix to the mutex problem
- Added a test to detect the settlement-collision problem
- Increased the verbosity of the system tests in qdtest
- Some minor cleanup in the python router and qdstat
Modified:
qpid/dispatch/trunk/include/qpid/dispatch/container.h
qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py
qpid/dispatch/trunk/src/container.c
qpid/dispatch/trunk/src/router_node.c
qpid/dispatch/trunk/tests/system_tests_one_router.py
qpid/dispatch/trunk/tools/qdstat.in
qpid/dispatch/trunk/tools/qdtest.in
Modified: qpid/dispatch/trunk/include/qpid/dispatch/container.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/container.h?rev=1548737&r1=1548736&r2=1548737&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/container.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/container.h Fri Dec 6 22:36:14 2013
@@ -184,8 +184,11 @@ bool qd_link_drain_changed(qd_link_t *li
* thrown.
*/
qd_delivery_t *qd_delivery(qd_link_t *link, pn_delivery_tag_t tag);
-void qd_delivery_free(qd_delivery_t *delivery, uint64_t final_disposition);
-void qd_delivery_link_peers(qd_delivery_t *left, qd_delivery_t *right);
+void qd_delivery_free_LH(qd_delivery_t *delivery, uint64_t final_disposition);
+void qd_delivery_link_peers_LH(qd_delivery_t *left, qd_delivery_t *right);
+void qd_delivery_unlink_LH(qd_delivery_t *delivery);
+void qd_delivery_fifo_enter_LH(qd_delivery_t *delivery);
+bool qd_delivery_fifo_exit_LH(qd_delivery_t *delivery);
qd_delivery_t *qd_delivery_peer(qd_delivery_t *delivery);
void qd_delivery_set_context(qd_delivery_t *delivery, void *context);
void *qd_delivery_context(qd_delivery_t *delivery);
Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py?rev=1548737&r1=1548736&r2=1548737&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py Fri Dec 6 22:36:14 2013
@@ -228,10 +228,7 @@ class RouterEngine:
"""
app_props = {'opcode' : msg.get_opcode() }
self.io_adapter.send(dest, app_props, msg.to_dict())
- if "qdhello" in dest:
- self.log(LOG_TRACE, "SENT: %r dest=%s" % (msg, dest))
- else:
- self.log(LOG_DEBUG, "SENT: %r dest=%s" % (msg, dest))
+ self.log(LOG_TRACE, "SENT: %r dest=%s" % (msg, dest))
def node_updated(self, addr, reachable, neighbor):
Modified: qpid/dispatch/trunk/src/container.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/container.c?rev=1548737&r1=1548736&r2=1548737&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/container.c (original)
+++ qpid/dispatch/trunk/src/container.c Fri Dec 6 22:36:14 2013
@@ -73,6 +73,8 @@ struct qd_delivery_t {
void *context;
uint64_t disposition;
qd_link_t *link;
+ int in_fifo;
+ bool pending_delete;
};
ALLOC_DECLARE(qd_delivery_t);
@@ -89,8 +91,6 @@ static int QD_CONTAINER_CLASS_CONTAINER
static int QD_CONTAINER_CLASS_NODE_TYPE = 2;
static int QD_CONTAINER_CLASS_NODE = 3;
-static sys_mutex_t *delivery_lock = 0;
-
typedef struct container_class_t {
qd_container_t *container;
int class_id;
@@ -218,11 +218,13 @@ static void do_receive(pn_delivery_t *pn
if (node) {
if (!delivery) {
delivery = new_qd_delivery_t();
- delivery->pn_delivery = pnd;
- delivery->peer = 0;
- delivery->context = 0;
- delivery->disposition = 0;
- delivery->link = link;
+ delivery->pn_delivery = pnd;
+ delivery->peer = 0;
+ delivery->context = 0;
+ delivery->disposition = 0;
+ delivery->link = link;
+ delivery->in_fifo = 0;
+ delivery->pending_delete = false;
pn_delivery_set_context(pnd, delivery);
}
@@ -485,9 +487,6 @@ qd_container_t *qd_container(qd_dispatch
qd_log(module, LOG_TRACE, "Container Initializing");
qd_server_set_conn_handler(qd, handler, container);
- if (!delivery_lock)
- delivery_lock = sys_mutex();
-
return container;
}
@@ -808,39 +807,70 @@ qd_delivery_t *qd_delivery(qd_link_t *li
return 0;
qd_delivery_t *delivery = new_qd_delivery_t();
- delivery->pn_delivery = pnd;
- delivery->peer = 0;
- delivery->context = 0;
- delivery->disposition = 0;
- delivery->link = link;
+ delivery->pn_delivery = pnd;
+ delivery->peer = 0;
+ delivery->context = 0;
+ delivery->disposition = 0;
+ delivery->link = link;
+ delivery->in_fifo = 0;
+ delivery->pending_delete = false;
pn_delivery_set_context(pnd, delivery);
return delivery;
}
-void qd_delivery_free(qd_delivery_t *delivery, uint64_t final_disposition)
+void qd_delivery_free_LH(qd_delivery_t *delivery, uint64_t final_disposition)
{
if (delivery->pn_delivery) {
if (final_disposition > 0)
pn_delivery_update(delivery->pn_delivery, final_disposition);
pn_delivery_set_context(delivery->pn_delivery, 0);
pn_delivery_settle(delivery->pn_delivery);
+ delivery->pn_delivery = 0;
+ }
+
+ assert(!delivery->peer);
+
+ if (delivery->in_fifo)
+ delivery->pending_delete = true;
+ else {
+ free_qd_delivery_t(delivery);
}
- sys_mutex_lock(delivery_lock);
- if (delivery->peer)
- delivery->peer->peer = 0;
- sys_mutex_unlock(delivery_lock);
- free_qd_delivery_t(delivery);
}
-void qd_delivery_link_peers(qd_delivery_t *right, qd_delivery_t *left)
+void qd_delivery_link_peers_LH(qd_delivery_t *right, qd_delivery_t *left)
{
- sys_mutex_lock(delivery_lock);
right->peer = left;
left->peer = right;
- sys_mutex_unlock(delivery_lock);
+}
+
+
+void qd_delivery_unlink_LH(qd_delivery_t *delivery)
+{
+ if (delivery->peer) {
+ delivery->peer->peer = 0;
+ delivery->peer = 0;
+ }
+}
+
+
+void qd_delivery_fifo_enter_LH(qd_delivery_t *delivery)
+{
+ delivery->in_fifo++;
+}
+
+
+bool qd_delivery_fifo_exit_LH(qd_delivery_t *delivery)
+{
+ delivery->in_fifo--;
+ if (delivery->in_fifo == 0 && delivery->pending_delete) {
+ free_qd_delivery_t(delivery);
+ return false;
+ }
+
+ return true;
}
@@ -871,6 +901,7 @@ pn_delivery_t *qd_delivery_pn(qd_deliver
void qd_delivery_settle(qd_delivery_t *delivery)
{
if (delivery->pn_delivery) {
+ pn_delivery_set_context(delivery->pn_delivery, 0);
pn_delivery_settle(delivery->pn_delivery);
delivery->pn_delivery = 0;
}
Modified: qpid/dispatch/trunk/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_node.c?rev=1548737&r1=1548736&r2=1548737&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_node.c (original)
+++ qpid/dispatch/trunk/src/router_node.c Fri Dec 6 22:36:14 2013
@@ -323,10 +323,13 @@ static int router_writable_link_handler(
// with the outgoing delivery. Otherwise, the message arrived pre-settled
// and should be sent presettled.
//
- if (re->delivery)
- qd_delivery_link_peers(re->delivery, delivery);
- else
- qd_delivery_free(delivery, 0); // settle and free
+ sys_mutex_lock(router->lock);
+ if (re->delivery) {
+ if (qd_delivery_fifo_exit_LH(re->delivery))
+ qd_delivery_link_peers_LH(re->delivery, delivery);
+ } else
+ qd_delivery_free_LH(delivery, 0); // settle and free
+ sys_mutex_unlock(router->lock);
pn_link_advance(pn_link);
event_count++;
@@ -348,10 +351,17 @@ static int router_writable_link_handler(
pn_delivery_update(qd_delivery_pn(re->delivery), re->disposition);
event_count++;
}
- if (re->settle) {
- qd_delivery_free(re->delivery, 0);
+
+ sys_mutex_lock(router->lock);
+
+ bool ok = qd_delivery_fifo_exit_LH(re->delivery);
+ if (ok && re->settle) {
+ qd_delivery_unlink_LH(re->delivery);
+ qd_delivery_free_LH(re->delivery, 0);
event_count++;
}
+
+ sys_mutex_unlock(router->lock);
}
free_qd_routed_event_t(re);
@@ -491,9 +501,11 @@ static void router_rx_handler(void* cont
// event. If it's not settled, link it into the event for later handling.
//
if (qd_delivery_settled(delivery))
- qd_delivery_free(delivery, 0);
- else
+ qd_delivery_free_LH(delivery, 0);
+ else {
re->delivery = delivery;
+ qd_delivery_fifo_enter_LH(delivery);
+ }
sys_mutex_unlock(router->lock);
qd_link_activate(clink->link);
@@ -583,8 +595,10 @@ static void router_rx_handler(void* cont
DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re);
fanout++;
- if (fanout == 1 && !qd_delivery_settled(delivery))
+ if (fanout == 1 && !qd_delivery_settled(delivery)) {
re->delivery = delivery;
+ qd_delivery_fifo_enter_LH(delivery);
+ }
addr->deliveries_egress++;
qd_link_activate(dest_link_ref->link->link);
@@ -656,9 +670,11 @@ static void router_rx_handler(void* cont
DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
fanout++;
- if (fanout == 1 && !qd_delivery_settled(delivery))
+ if (fanout == 1 && !qd_delivery_settled(delivery)) {
re->delivery = delivery;
-
+ qd_delivery_fifo_enter_LH(delivery);
+ }
+
addr->deliveries_transit++;
qd_link_activate(dest_link->link);
}
@@ -675,18 +691,18 @@ static void router_rx_handler(void* cont
// number of copies of the received message that were forwarded.
//
if (handler) {
- qd_delivery_free(delivery, PN_ACCEPTED);
+ qd_delivery_free_LH(delivery, PN_ACCEPTED);
} else if (fanout == 0) {
- qd_delivery_free(delivery, PN_RELEASED);
+ qd_delivery_free_LH(delivery, PN_RELEASED);
} else if (qd_delivery_settled(delivery)) {
- qd_delivery_free(delivery, 0);
+ qd_delivery_free_LH(delivery, 0);
}
}
} else {
//
// Message is invalid. Reject the message.
//
- qd_delivery_free(delivery, PN_REJECTED);
+ qd_delivery_free_LH(delivery, PN_REJECTED);
}
sys_mutex_unlock(router->lock);
@@ -711,8 +727,9 @@ static void router_disp_handler(void* co
bool changed = qd_delivery_disp_changed(delivery);
uint64_t disp = qd_delivery_disp(delivery);
bool settled = qd_delivery_settled(delivery);
- qd_delivery_t *peer = qd_delivery_peer(delivery);
+ sys_mutex_lock(router->lock);
+ qd_delivery_t *peer = qd_delivery_peer(delivery);
if (peer) {
//
// The case where this delivery has a peer.
@@ -727,19 +744,19 @@ static void router_disp_handler(void* co
re->settle = settled;
re->disposition = changed ? disp : 0;
- sys_mutex_lock(router->lock);
+ qd_delivery_fifo_enter_LH(peer);
DEQ_INSERT_TAIL(prl->event_fifo, re);
- sys_mutex_unlock(router->lock);
+ if (settled) {
+ qd_delivery_unlink_LH(delivery);
+ qd_delivery_free_LH(delivery, 0);
+ }
qd_link_activate(peer_link);
}
- }
+ } else if (settled)
+ qd_delivery_free_LH(delivery, 0);
- //
- // In all cases, if this delivery is settled, free it.
- //
- if (settled)
- qd_delivery_free(delivery, 0);
+ sys_mutex_unlock(router->lock);
}
Modified: qpid/dispatch/trunk/tests/system_tests_one_router.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_tests_one_router.py?rev=1548737&r1=1548736&r2=1548737&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_tests_one_router.py (original)
+++ qpid/dispatch/trunk/tests/system_tests_one_router.py Fri Dec 6 22:36:14 2013
@@ -251,6 +251,46 @@ class RouterTest(unittest.TestCase):
M2.stop()
+ def test_2c_sender_settles_first(self):
+ addr = "amqp://0.0.0.0:20000/settled/senderfirst/1"
+ M1 = Messenger()
+ M2 = Messenger()
+
+ M1.timeout = 1.0
+ M2.timeout = 1.0
+
+ M1.outgoing_window = 5
+ M2.incoming_window = 5
+
+ M1.start()
+ M2.start()
+ self.subscribe(M2, addr)
+
+ tm = Message()
+ rm = Message()
+
+ tm.address = addr
+ tm.body = {'number': 0}
+ ttrk = M1.put(tm)
+ M1.send(0)
+
+ M1.settle(ttrk)
+ self.flush(M1)
+ self.flush(M2)
+
+ M2.recv(1)
+ rtrk = M2.get(rm)
+ M2.accept(rtrk)
+ M2.settle(rtrk)
+ self.assertEqual(0, rm.body['number'])
+
+ self.flush(M1)
+ self.flush(M2)
+
+ M1.stop()
+ M2.stop()
+
+
def test_3_propagated_disposition(self):
addr = "amqp://0.0.0.0:20000/unsettled/1"
M1 = Messenger()
@@ -507,6 +547,7 @@ class RouterTest(unittest.TestCase):
reply = "amqp:/temp.reply"
M = Messenger()
+ M.timeout = 2.0
M.start()
M.route("amqp:/*", "amqp://0.0.0.0:20000/$1")
self.subscribe(M, reply)
Modified: qpid/dispatch/trunk/tools/qdstat.in
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tools/qdstat.in?rev=1548737&r1=1548736&r2=1548737&view=diff
==============================================================================
--- qpid/dispatch/trunk/tools/qdstat.in (original)
+++ qpid/dispatch/trunk/tools/qdstat.in Fri Dec 6 22:36:14 2013
@@ -209,7 +209,7 @@ class BusManager:
row = []
row.append(link['link-type'])
row.append(link['link-dir'])
- if link['link-type'] == "router":
+ if link['link-type'] == "inter-router":
row.append(link['index'])
else:
row.append('-')
Modified: qpid/dispatch/trunk/tools/qdtest.in
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tools/qdtest.in?rev=1548737&r1=1548736&r2=1548737&view=diff
==============================================================================
--- qpid/dispatch/trunk/tools/qdtest.in (original)
+++ qpid/dispatch/trunk/tools/qdtest.in Fri Dec 6 22:36:14 2013
@@ -26,4 +26,5 @@ fi
echo "Running system_tests_one_router.py"
-python $QPID_DISPATCH_HOME/tests/system_tests_one_router.py
+python $QPID_DISPATCH_HOME/tests/system_tests_one_router.py -v
+
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org