You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/03/29 01:22:54 UTC

qpid-dispatch git commit: DISPATCH-179 - system_tests_one_router now passes - Fixed handling of delivery lists in links.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 4ace33e70 -> 19abbfc2c


DISPATCH-179 - system_tests_one_router now passes - Fixed handling of delivery lists in links.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/19abbfc2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/19abbfc2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/19abbfc2

Branch: refs/heads/master
Commit: 19abbfc2c1bdba3ff2e84ab9c4b61f9a76bae794
Parents: 4ace33e
Author: Ted Ross <tr...@redhat.com>
Authored: Mon Mar 28 19:22:10 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Mon Mar 28 19:22:10 2016 -0400

----------------------------------------------------------------------
 src/router_core/connections.c         |   8 +--
 src/router_core/forwarder.c           |  11 +--
 src/router_core/router_core_private.h |   6 ++
 src/router_core/transfer.c            | 107 ++++++++++++++++-------------
 tests/system_tests_one_router.py      |  29 ++++----
 5 files changed, 92 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/19abbfc2/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index ee9ec04..597d858 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -442,7 +442,7 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
 
     //
     // Free the undelivered deliveries.  If this is an incoming link, the
-    // undelivereds can simply be desetroyed.  If it's an outgoing link, the
+    // undelivereds can simply be destroyed.  If it's an outgoing link, the
     // undelivereds' peer deliveries need to be released.
     //
     qdr_delivery_t *dlv = DEQ_HEAD(undelivered);
@@ -466,10 +466,8 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
         DEQ_REMOVE_HEAD(unsettled);
         peer = dlv->peer;
         qdr_delivery_free(dlv);
-        if (peer) {
+        if (peer)
             peer->peer = 0;
-            qdr_delivery_remove_unsettled_CT(core, peer);
-        }
         dlv = DEQ_HEAD(unsettled);
     }
 
@@ -479,7 +477,7 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
     qdr_del_link_ref(&conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION);
     sys_mutex_lock(conn->work_lock);
     qdr_del_link_ref(&conn->links_with_deliveries, link, QDR_LINK_LIST_CLASS_DELIVERY);
-    qdr_del_link_ref(&conn->links_with_credit    , link, QDR_LINK_LIST_CLASS_FLOW);
+    qdr_del_link_ref(&conn->links_with_credit,     link, QDR_LINK_LIST_CLASS_FLOW);
     sys_mutex_unlock(conn->work_lock);
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/19abbfc2/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 0b305d2..8e41995 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -71,23 +71,23 @@ static bool qdr_forward_attach_null_CT(qdr_core_t     *core,
 }
 
 
-qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *peer, qdr_link_t *link, qd_message_t *msg)
+qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, qdr_link_t *link, qd_message_t *msg)
 {
     qdr_delivery_t *dlv = new_qdr_delivery_t();
 
     ZERO(dlv);
     dlv->link    = link;
     dlv->msg     = qd_message_copy(msg);
-    dlv->settled = !peer || peer->settled;
+    dlv->settled = !in_dlv || in_dlv->settled;
     dlv->tag     = core->next_tag++;
 
     //
     // Create peer linkage only if the delivery is not settled
     //
     if (!dlv->settled) {
-        if (peer && peer->peer == 0) {
-            dlv->peer = peer;
-            peer->peer = dlv;  // TODO - make this a back-list for multicast tracking
+        if (in_dlv && in_dlv->peer == 0) {
+            dlv->peer = in_dlv;
+            in_dlv->peer = dlv;  // TODO - make this a back-list for multicast tracking
         }
     }
 
@@ -99,6 +99,7 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *
 {
     sys_mutex_lock(link->conn->work_lock);
     DEQ_INSERT_TAIL(link->undelivered, dlv);
+    dlv->where = QDR_DELIVERY_IN_UNDELIVERED;
 
     //
     // If the link isn't already on the links_with_deliveries list, put it there.

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/19abbfc2/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index dbf3f58..29de0f6 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -187,6 +187,11 @@ struct qdr_router_ref_t {
 ALLOC_DECLARE(qdr_router_ref_t);
 DEQ_DECLARE(qdr_router_ref_t, qdr_router_ref_list_t);
 
+typedef enum {
+    QDR_DELIVERY_NOWHERE = 0,
+    QDR_DELIVERY_IN_UNDELIVERED,
+    QDR_DELIVERY_IN_UNSETTLED
+} qdr_delivery_where_t;
 
 struct qdr_delivery_t {
     DEQ_LINKS(qdr_delivery_t);
@@ -198,6 +203,7 @@ struct qdr_delivery_t {
     qd_field_iterator_t *origin;
     uint64_t             disposition;
     bool                 settled;
+    qdr_delivery_where_t where;
     uint64_t             tag;
     qd_bitmask_t        *link_exclusion;
 };

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/19abbfc2/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index e348b3e..0a19444 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -105,8 +105,11 @@ void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
         dlv = DEQ_HEAD(link->undelivered);
         if (dlv) {
             DEQ_REMOVE_HEAD(link->undelivered);
-            if (!dlv->settled)
+            if (!dlv->settled) {
                 DEQ_INSERT_TAIL(link->unsettled, dlv);
+                dlv->where = QDR_DELIVERY_IN_UNSETTLED;
+            } else
+                dlv->where = QDR_DELIVERY_NOWHERE;
             credit--;
             link->total_deliveries++;
             offer = DEQ_SIZE(link->undelivered);
@@ -203,20 +206,6 @@ void qdr_delivery_free(qdr_delivery_t *delivery)
 }
 
 
-void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery)
-{
-}
-
-
-void qdr_delivery_remove_unsettled_CT(qdr_core_t *core, qdr_delivery_t *delivery)
-{
-    //
-    // Remove a delivery from its unsettled list.  Side effects include issuing
-    // replacement credit and visiting the link-quiescence algorithm
-    //
-}
-
-
 void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disposition, bool settled)
 {
     qdr_action_t *action = qdr_action(qdr_update_delivery_CT, "update_delivery");
@@ -257,6 +246,48 @@ qd_message_t *qdr_delivery_message(const qdr_delivery_t *delivery)
 // In-Thread Functions
 //==================================================================================
 
+void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery)
+{
+}
+
+
+void qdr_delivery_remove_unsettled_CT(qdr_core_t *core, qdr_delivery_t *dlv)
+{
+    //
+    // Remove a delivery from its unsettled list.  Side effects include issuing
+    // replacement credit and visiting the link-quiescence algorithm
+    //
+    qdr_link_t       *link = dlv->link;
+    qdr_connection_t *conn = link ? link->conn : 0;
+    bool              issue_credit = false;
+
+    if (!link || !conn)
+        return;
+
+    //
+    // The lock needs to be acquired only for outgoing links
+    //
+    if (link->link_direction == QD_OUTGOING)
+        sys_mutex_lock(conn->work_lock);
+
+    if (dlv->where == QDR_DELIVERY_IN_UNSETTLED) {
+        DEQ_REMOVE(link->unsettled, dlv);
+        dlv->where = QDR_DELIVERY_NOWHERE;
+        issue_credit = true;
+    }
+
+    if (link->link_direction == QD_OUTGOING)
+        sys_mutex_unlock(conn->work_lock);
+
+    //
+    // If this is an incoming link and it is not link-routed, issue
+    // one replacement credit on the link.
+    //
+    if (issue_credit && link->link_direction == QD_INCOMING && !link->connected_link)
+        qdr_link_issue_credit_CT(core, link, 1);
+}
+
+
 static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
     if (discard)
@@ -316,12 +347,14 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_
             // a valid destination).
             //
             DEQ_INSERT_TAIL(link->undelivered, dlv);
+            dlv->where = QDR_DELIVERY_IN_UNDELIVERED;
         } else {
             //
-            // TODO - Release the delivery
+            // Release the delivery
             //
+            qdr_delivery_release_CT(core, dlv);
         }
-    } else if (fanout == 1) {
+    } else if (fanout > 0) {
         qd_bitmask_free(dlv->link_exclusion);
         dlv->link_exclusion = 0;
         if (dlv->settled) {
@@ -338,19 +371,9 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_
                 assert(!dlv->peer);
                 qdr_delivery_free(dlv);
             }
-        } else
+        } else {
             DEQ_INSERT_TAIL(link->unsettled, dlv);
-    } else {
-        //
-        // The fanout is greater than one.  Do something!  TODO
-        //
-        qd_bitmask_free(dlv->link_exclusion);
-        dlv->link_exclusion = 0;
-
-        if (presettled) {
-            qdr_link_issue_credit_CT(core, link, 1);
-            assert(!dlv->peer);
-            qdr_delivery_free(dlv);
+            dlv->where = QDR_DELIVERY_IN_UNSETTLED;
         }
     }
 
@@ -388,8 +411,10 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
         if (!addr && dlv->to_addr)
             qd_hash_retrieve(core->addr_hash, dlv->to_addr, (void**) &addr);
         qdr_link_forward_CT(core, link, dlv, addr);
-    } else
+    } else {
         DEQ_INSERT_TAIL(link->undelivered, dlv);
+        dlv->where = QDR_DELIVERY_IN_UNDELIVERED;
+    }
 }
 
 
@@ -427,8 +452,6 @@ static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool
     uint64_t        disp    = action->args.delivery.disposition;
     bool            settled = action->args.delivery.settled;
 
-    bool link_routed = dlv && dlv->link && dlv->link->connected_link;
-
     //
     // Logic:
     //
@@ -455,22 +478,12 @@ static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool
             push = true;
             peer->peer = 0;
             dlv->peer  = 0;
-            if (peer->link) {
-                sys_mutex_lock(peer->link->conn->work_lock);
-                DEQ_REMOVE(peer->link->unsettled, peer);
-                sys_mutex_unlock(peer->link->conn->work_lock);
-                if (peer->link->link_direction == QD_INCOMING && !link_routed)
-                    qdr_link_issue_credit_CT(core, peer->link, 1);
-            }
+            if (peer->link)
+                qdr_delivery_remove_unsettled_CT(core, peer);
         }
 
-        if (dlv->link) {
-            sys_mutex_lock(dlv->link->conn->work_lock);
-            DEQ_REMOVE(dlv->link->unsettled, dlv);
-            sys_mutex_unlock(dlv->link->conn->work_lock);
-            if (dlv->link->link_direction == QD_INCOMING && !link_routed)
-                qdr_link_issue_credit_CT(core, dlv->link, 1);
-        }
+        if (dlv->link)
+            qdr_delivery_remove_unsettled_CT(core, dlv);
 
         qdr_delivery_free(dlv);
     }
@@ -567,7 +580,7 @@ void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr)
 
 void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv)
 {
-    if (!dlv || !dlv->link)
+    if (!dlv || !dlv->link || dlv->where == QDR_DELIVERY_IN_UNDELIVERED)
         return;
 
     qdr_link_t *link = dlv->link;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/19abbfc2/tests/system_tests_one_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index 607545e..cfc3646 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -53,10 +53,9 @@ class RouterTest(TestCase):
             ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'out'}),
             ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'in'}),
             
-            ('fixedAddress', {'prefix': '/closest/', 'fanout': 'single', 'bias': 'closest'}),
-            ('fixedAddress', {'prefix': '/spread/', 'fanout': 'single', 'bias': 'spread'}),
-            ('fixedAddress', {'prefix': '/multicast/', 'fanout': 'multiple'}),
-            ('fixedAddress', {'prefix': '/', 'fanout': 'multiple'})
+            ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+            ('address', {'prefix': 'spread', 'distribution': 'balanced'}),
+            ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
         ])
         cls.router = cls.tester.qdrouterd(name, config)
         cls.router.wait_ready()
@@ -89,7 +88,7 @@ class RouterTest(TestCase):
         M2.stop()
 
     def test_02a_multicast_unsettled(self):
-        addr = self.address+"/pre_settled/multicast/1"
+        addr = self.address+"/multicast.unsettled.1"
         M1 = self.messenger()
         M2 = self.messenger()
         M3 = self.messenger()
@@ -280,14 +279,20 @@ class RouterTest(TestCase):
         M1.outgoing_window = 5
 
         M1.start()
+        M1.timeout = 1
         tm = Message()
         tm.address = addr
         tm.body = {'number': 200}
 
-        tx_tracker = M1.put(tm)
-        M1.send(0)
-        M1.flush()
-        self.assertEqual(MODIFIED, M1.status(tx_tracker))
+        exception = False
+        try:
+            M1.put(tm)
+            M1.send(0)
+            M1.flush()
+        except Exception:
+            exception = True
+
+        self.assertEqual(exception, True)
 
         M1.stop()
 
@@ -833,7 +838,7 @@ class RouterTest(TestCase):
 
 
     def test_10_semantics_multicast(self):
-        addr = self.address+"/multicast/1"
+        addr = self.address+"/multicast.10"
         M1 = self.messenger()
         M2 = self.messenger()
         M3 = self.messenger()
@@ -877,7 +882,7 @@ class RouterTest(TestCase):
         M4.stop()
 
     def test_11_semantics_closest(self):
-        addr = self.address+"/closest/1"
+        addr = self.address+"/closest.1"
         M1 = self.messenger()
         M2 = self.messenger()
         M3 = self.messenger()
@@ -928,7 +933,7 @@ class RouterTest(TestCase):
         M4.stop()
 
     def test_12_semantics_spread(self):
-        addr = self.address+"/spread/1"
+        addr = self.address+"/spread.1"
         M1 = self.messenger()
         M2 = self.messenger()
         M3 = self.messenger()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org