You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2013/12/06 23:36:15 UTC

svn commit: r1548737 - in /qpid/dispatch/trunk: include/qpid/dispatch/container.h python/qpid_dispatch_internal/router/engine.py src/container.c src/router_node.c tests/system_tests_one_router.py tools/qdstat.in tools/qdtest.in

Author: tross
Date: Fri Dec  6 22:36:14 2013
New Revision: 1548737

URL: http://svn.apache.org/r1548737
Log:
QPID-5397
  - Redesigned the fix to the mutex problem
  - Added a test to detect the settlement-collision problem
  - Increased the verbosity of the system tests in qdtest
  - Some minor cleanup in the python router and qdstat

Modified:
    qpid/dispatch/trunk/include/qpid/dispatch/container.h
    qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py
    qpid/dispatch/trunk/src/container.c
    qpid/dispatch/trunk/src/router_node.c
    qpid/dispatch/trunk/tests/system_tests_one_router.py
    qpid/dispatch/trunk/tools/qdstat.in
    qpid/dispatch/trunk/tools/qdtest.in

Modified: qpid/dispatch/trunk/include/qpid/dispatch/container.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/container.h?rev=1548737&r1=1548736&r2=1548737&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/container.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/container.h Fri Dec  6 22:36:14 2013
@@ -184,8 +184,11 @@ bool qd_link_drain_changed(qd_link_t *li
  *            thrown.
  */
 qd_delivery_t *qd_delivery(qd_link_t *link, pn_delivery_tag_t tag);
-void qd_delivery_free(qd_delivery_t *delivery, uint64_t final_disposition);
-void qd_delivery_link_peers(qd_delivery_t *left, qd_delivery_t *right);
+void qd_delivery_free_LH(qd_delivery_t *delivery, uint64_t final_disposition);
+void qd_delivery_link_peers_LH(qd_delivery_t *left, qd_delivery_t *right);
+void qd_delivery_unlink_LH(qd_delivery_t *delivery);
+void qd_delivery_fifo_enter_LH(qd_delivery_t *delivery);
+bool qd_delivery_fifo_exit_LH(qd_delivery_t *delivery);
 qd_delivery_t *qd_delivery_peer(qd_delivery_t *delivery);
 void qd_delivery_set_context(qd_delivery_t *delivery, void *context);
 void *qd_delivery_context(qd_delivery_t *delivery);

Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py?rev=1548737&r1=1548736&r2=1548737&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py Fri Dec  6 22:36:14 2013
@@ -228,10 +228,7 @@ class RouterEngine:
         """
         app_props = {'opcode' : msg.get_opcode() }
         self.io_adapter.send(dest, app_props, msg.to_dict())
-        if "qdhello" in dest:
-            self.log(LOG_TRACE, "SENT: %r dest=%s" % (msg, dest))
-        else:
-            self.log(LOG_DEBUG, "SENT: %r dest=%s" % (msg, dest))
+        self.log(LOG_TRACE, "SENT: %r dest=%s" % (msg, dest))
 
 
     def node_updated(self, addr, reachable, neighbor):

Modified: qpid/dispatch/trunk/src/container.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/container.c?rev=1548737&r1=1548736&r2=1548737&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/container.c (original)
+++ qpid/dispatch/trunk/src/container.c Fri Dec  6 22:36:14 2013
@@ -73,6 +73,8 @@ struct qd_delivery_t {
     void          *context;
     uint64_t       disposition;
     qd_link_t     *link;
+    int            in_fifo;
+    bool           pending_delete;
 };
 
 ALLOC_DECLARE(qd_delivery_t);
@@ -89,8 +91,6 @@ static int QD_CONTAINER_CLASS_CONTAINER 
 static int QD_CONTAINER_CLASS_NODE_TYPE = 2;
 static int QD_CONTAINER_CLASS_NODE      = 3;
 
-static sys_mutex_t *delivery_lock = 0;
-
 typedef struct container_class_t {
     qd_container_t *container;
     int             class_id;
@@ -218,11 +218,13 @@ static void do_receive(pn_delivery_t *pn
         if (node) {
             if (!delivery) {
                 delivery = new_qd_delivery_t();
-                delivery->pn_delivery = pnd;
-                delivery->peer        = 0;
-                delivery->context     = 0;
-                delivery->disposition = 0;
-                delivery->link        = link;
+                delivery->pn_delivery    = pnd;
+                delivery->peer           = 0;
+                delivery->context        = 0;
+                delivery->disposition    = 0;
+                delivery->link           = link;
+                delivery->in_fifo        = 0;
+                delivery->pending_delete = false;
                 pn_delivery_set_context(pnd, delivery);
             }
 
@@ -485,9 +487,6 @@ qd_container_t *qd_container(qd_dispatch
     qd_log(module, LOG_TRACE, "Container Initializing");
     qd_server_set_conn_handler(qd, handler, container);
 
-    if (!delivery_lock)
-        delivery_lock = sys_mutex();
-
     return container;
 }
 
@@ -808,39 +807,70 @@ qd_delivery_t *qd_delivery(qd_link_t *li
         return 0;
 
     qd_delivery_t *delivery = new_qd_delivery_t();
-    delivery->pn_delivery = pnd;
-    delivery->peer        = 0;
-    delivery->context     = 0;
-    delivery->disposition = 0;
-    delivery->link        = link;
+    delivery->pn_delivery    = pnd;
+    delivery->peer           = 0;
+    delivery->context        = 0;
+    delivery->disposition    = 0;
+    delivery->link           = link;
+    delivery->in_fifo        = 0;
+    delivery->pending_delete = false;
     pn_delivery_set_context(pnd, delivery);
 
     return delivery;
 }
 
 
-void qd_delivery_free(qd_delivery_t *delivery, uint64_t final_disposition)
+void qd_delivery_free_LH(qd_delivery_t *delivery, uint64_t final_disposition)
 {
     if (delivery->pn_delivery) {
         if (final_disposition > 0)
             pn_delivery_update(delivery->pn_delivery, final_disposition);
         pn_delivery_set_context(delivery->pn_delivery, 0);
         pn_delivery_settle(delivery->pn_delivery);
+        delivery->pn_delivery = 0;
+    }
+
+    assert(!delivery->peer);
+
+    if (delivery->in_fifo)
+        delivery->pending_delete = true;
+    else {
+        free_qd_delivery_t(delivery);
     }
-    sys_mutex_lock(delivery_lock);
-    if (delivery->peer)
-        delivery->peer->peer = 0;
-    sys_mutex_unlock(delivery_lock);
-    free_qd_delivery_t(delivery);
 }
 
 
-void qd_delivery_link_peers(qd_delivery_t *right, qd_delivery_t *left)
+void qd_delivery_link_peers_LH(qd_delivery_t *right, qd_delivery_t *left)
 {
-    sys_mutex_lock(delivery_lock);
     right->peer = left;
     left->peer  = right;
-    sys_mutex_unlock(delivery_lock);
+}
+
+
+void qd_delivery_unlink_LH(qd_delivery_t *delivery)
+{
+    if (delivery->peer) {
+        delivery->peer->peer = 0;
+        delivery->peer       = 0;
+    }
+}
+
+
+void qd_delivery_fifo_enter_LH(qd_delivery_t *delivery)
+{
+    delivery->in_fifo++;
+}
+
+
+bool qd_delivery_fifo_exit_LH(qd_delivery_t *delivery)
+{
+    delivery->in_fifo--;
+    if (delivery->in_fifo == 0 && delivery->pending_delete) {
+        free_qd_delivery_t(delivery);
+        return false;
+    }
+
+    return true;
 }
 
 
@@ -871,6 +901,7 @@ pn_delivery_t *qd_delivery_pn(qd_deliver
 void qd_delivery_settle(qd_delivery_t *delivery)
 {
     if (delivery->pn_delivery) {
+        pn_delivery_set_context(delivery->pn_delivery, 0);
         pn_delivery_settle(delivery->pn_delivery);
         delivery->pn_delivery = 0;
     }

Modified: qpid/dispatch/trunk/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_node.c?rev=1548737&r1=1548736&r2=1548737&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_node.c (original)
+++ qpid/dispatch/trunk/src/router_node.c Fri Dec  6 22:36:14 2013
@@ -323,10 +323,13 @@ static int router_writable_link_handler(
         // with the outgoing delivery.  Otherwise, the message arrived pre-settled
         // and should be sent presettled.
         //
-        if (re->delivery)
-            qd_delivery_link_peers(re->delivery, delivery);
-        else
-            qd_delivery_free(delivery, 0);  // settle and free
+        sys_mutex_lock(router->lock);
+        if (re->delivery) {
+            if (qd_delivery_fifo_exit_LH(re->delivery))
+                qd_delivery_link_peers_LH(re->delivery, delivery);
+        } else
+            qd_delivery_free_LH(delivery, 0);  // settle and free
+        sys_mutex_unlock(router->lock);
 
         pn_link_advance(pn_link);
         event_count++;
@@ -348,10 +351,17 @@ static int router_writable_link_handler(
                 pn_delivery_update(qd_delivery_pn(re->delivery), re->disposition);
                 event_count++;
             }
-            if (re->settle) {
-                qd_delivery_free(re->delivery, 0);
+
+            sys_mutex_lock(router->lock);
+
+            bool ok = qd_delivery_fifo_exit_LH(re->delivery);
+            if (ok && re->settle) {
+                qd_delivery_unlink_LH(re->delivery);
+                qd_delivery_free_LH(re->delivery, 0);
                 event_count++;
             }
+
+            sys_mutex_unlock(router->lock);
         }
 
         free_qd_routed_event_t(re);
@@ -491,9 +501,11 @@ static void router_rx_handler(void* cont
         // event.  If it's not settled, link it into the event for later handling.
         //
         if (qd_delivery_settled(delivery))
-            qd_delivery_free(delivery, 0);
-        else
+            qd_delivery_free_LH(delivery, 0);
+        else {
             re->delivery = delivery;
+            qd_delivery_fifo_enter_LH(delivery);
+        }
 
         sys_mutex_unlock(router->lock);
         qd_link_activate(clink->link);
@@ -583,8 +595,10 @@ static void router_rx_handler(void* cont
                         DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re);
 
                         fanout++;
-                        if (fanout == 1 && !qd_delivery_settled(delivery))
+                        if (fanout == 1 && !qd_delivery_settled(delivery)) {
                             re->delivery = delivery;
+                            qd_delivery_fifo_enter_LH(delivery);
+                        }
 
                         addr->deliveries_egress++;
                         qd_link_activate(dest_link_ref->link->link);
@@ -656,9 +670,11 @@ static void router_rx_handler(void* cont
                                     DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
 
                                     fanout++;
-                                    if (fanout == 1 && !qd_delivery_settled(delivery))
+                                    if (fanout == 1 && !qd_delivery_settled(delivery)) {
                                         re->delivery = delivery;
-                                
+                                        qd_delivery_fifo_enter_LH(delivery);
+                                    }
+                            
                                     addr->deliveries_transit++;
                                     qd_link_activate(dest_link->link);
                                 }
@@ -675,18 +691,18 @@ static void router_rx_handler(void* cont
             // number of copies of the received message that were forwarded.
             //
             if (handler) {
-                qd_delivery_free(delivery, PN_ACCEPTED);
+                qd_delivery_free_LH(delivery, PN_ACCEPTED);
             } else if (fanout == 0) {
-                qd_delivery_free(delivery, PN_RELEASED);
+                qd_delivery_free_LH(delivery, PN_RELEASED);
             } else if (qd_delivery_settled(delivery)) {
-                qd_delivery_free(delivery, 0);
+                qd_delivery_free_LH(delivery, 0);
             }
         }
     } else {
         //
         // Message is invalid.  Reject the message.
         //
-        qd_delivery_free(delivery, PN_REJECTED);
+        qd_delivery_free_LH(delivery, PN_REJECTED);
     }
 
     sys_mutex_unlock(router->lock);
@@ -711,8 +727,9 @@ static void router_disp_handler(void* co
     bool           changed = qd_delivery_disp_changed(delivery);
     uint64_t       disp    = qd_delivery_disp(delivery);
     bool           settled = qd_delivery_settled(delivery);
-    qd_delivery_t *peer    = qd_delivery_peer(delivery);
 
+    sys_mutex_lock(router->lock);
+    qd_delivery_t *peer = qd_delivery_peer(delivery);
     if (peer) {
         //
         // The case where this delivery has a peer.
@@ -727,19 +744,19 @@ static void router_disp_handler(void* co
             re->settle      = settled;
             re->disposition = changed ? disp : 0;
 
-            sys_mutex_lock(router->lock);
+            qd_delivery_fifo_enter_LH(peer);
             DEQ_INSERT_TAIL(prl->event_fifo, re);
-            sys_mutex_unlock(router->lock);
+            if (settled) {
+                qd_delivery_unlink_LH(delivery);
+                qd_delivery_free_LH(delivery, 0);
+            }
 
             qd_link_activate(peer_link);
         }
-    }
+    } else if (settled)
+        qd_delivery_free_LH(delivery, 0);
 
-    //
-    // In all cases, if this delivery is settled, free it.
-    //
-    if (settled)
-        qd_delivery_free(delivery, 0);
+    sys_mutex_unlock(router->lock);
 }
 
 

Modified: qpid/dispatch/trunk/tests/system_tests_one_router.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_tests_one_router.py?rev=1548737&r1=1548736&r2=1548737&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_tests_one_router.py (original)
+++ qpid/dispatch/trunk/tests/system_tests_one_router.py Fri Dec  6 22:36:14 2013
@@ -251,6 +251,46 @@ class RouterTest(unittest.TestCase):
     M2.stop()
 
 
+  def test_2c_sender_settles_first(self):
+    addr = "amqp://0.0.0.0:20000/settled/senderfirst/1"
+    M1 = Messenger()
+    M2 = Messenger()
+
+    M1.timeout = 1.0
+    M2.timeout = 1.0
+
+    M1.outgoing_window = 5
+    M2.incoming_window = 5
+
+    M1.start()
+    M2.start()
+    self.subscribe(M2, addr)
+
+    tm = Message()
+    rm = Message()
+
+    tm.address = addr
+    tm.body = {'number': 0}
+    ttrk = M1.put(tm)
+    M1.send(0)
+
+    M1.settle(ttrk)
+    self.flush(M1)
+    self.flush(M2)
+
+    M2.recv(1)
+    rtrk = M2.get(rm)
+    M2.accept(rtrk)
+    M2.settle(rtrk)
+    self.assertEqual(0, rm.body['number'])
+
+    self.flush(M1)
+    self.flush(M2)
+
+    M1.stop()
+    M2.stop()
+
+
   def test_3_propagated_disposition(self):
     addr = "amqp://0.0.0.0:20000/unsettled/1"
     M1 = Messenger()
@@ -507,6 +547,7 @@ class RouterTest(unittest.TestCase):
     reply = "amqp:/temp.reply"
 
     M = Messenger()
+    M.timeout = 2.0
     M.start()
     M.route("amqp:/*", "amqp://0.0.0.0:20000/$1")
     self.subscribe(M, reply)

Modified: qpid/dispatch/trunk/tools/qdstat.in
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tools/qdstat.in?rev=1548737&r1=1548736&r2=1548737&view=diff
==============================================================================
--- qpid/dispatch/trunk/tools/qdstat.in (original)
+++ qpid/dispatch/trunk/tools/qdstat.in Fri Dec  6 22:36:14 2013
@@ -209,7 +209,7 @@ class BusManager:
             row = []
             row.append(link['link-type'])
             row.append(link['link-dir'])
-            if link['link-type'] == "router":
+            if link['link-type'] == "inter-router":
                 row.append(link['index'])
             else:
                 row.append('-')

Modified: qpid/dispatch/trunk/tools/qdtest.in
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tools/qdtest.in?rev=1548737&r1=1548736&r2=1548737&view=diff
==============================================================================
--- qpid/dispatch/trunk/tools/qdtest.in (original)
+++ qpid/dispatch/trunk/tools/qdtest.in Fri Dec  6 22:36:14 2013
@@ -26,4 +26,5 @@ fi
 
 echo "Running system_tests_one_router.py"
 
-python $QPID_DISPATCH_HOME/tests/system_tests_one_router.py
+python $QPID_DISPATCH_HOME/tests/system_tests_one_router.py -v
+



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org