You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/06/14 12:57:24 UTC

[3/8] qpid-dispatch git commit: DISPATCH-341 - Drain now propagates across link routes and behaves correctly for router-terminated links.

DISPATCH-341 - Drain now propagates across link routes and behaves correctly for router-terminated links.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/7a8aa51d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/7a8aa51d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/7a8aa51d

Branch: refs/heads/0.6.x
Commit: 7a8aa51de4bf6da2a1a49f1d37774975869f8d54
Parents: 789b73e
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Jun 3 15:38:26 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Mon Jun 13 17:22:09 2016 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/router_core.h   |  2 ++
 src/router_core/connections.c         | 24 ++++++++-----
 src/router_core/router_core_private.h |  4 ++-
 src/router_core/transfer.c            | 56 +++++++++++++++++++++---------
 src/router_node.c                     | 13 +++++++
 tests/system_tests_drain.py           |  8 ++---
 tests/system_tests_drain_support.py   |  8 ++---
 tests/system_tests_link_routes.py     | 16 +++++++--
 8 files changed, 94 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7a8aa51d/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 459e1a3..8eaa7ef 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -527,6 +527,7 @@ typedef void (*qdr_link_detach_t)        (void *context, qdr_link_t *link, qdr_e
 typedef void (*qdr_link_flow_t)          (void *context, qdr_link_t *link, int credit);
 typedef void (*qdr_link_offer_t)         (void *context, qdr_link_t *link, int delivery_count);
 typedef void (*qdr_link_drained_t)       (void *context, qdr_link_t *link);
+typedef void (*qdr_link_drain_t)         (void *context, qdr_link_t *link, bool mode);
 typedef void (*qdr_link_push_t)          (void *context, qdr_link_t *link);
 typedef void (*qdr_link_deliver_t)       (void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled);
 typedef void (*qdr_delivery_update_t)    (void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled);
@@ -540,6 +541,7 @@ void qdr_connection_handlers(qdr_core_t                *core,
                              qdr_link_flow_t            flow,
                              qdr_link_offer_t           offer,
                              qdr_link_drained_t         drained,
+                             qdr_link_drain_t           drain,
                              qdr_link_push_t            push,
                              qdr_link_deliver_t         deliver,
                              qdr_delivery_update_t      delivery_update);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7a8aa51d/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index a9612ea..e0543ec 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -188,8 +188,14 @@ int qdr_connection_process(qdr_connection_t *conn)
         sys_mutex_unlock(conn->work_lock);
 
         if (link) {
-            core->flow_handler(core->user_context, link, link->incremental_credit);
-            link->incremental_credit = 0;
+            if (link->incremental_credit > 0) {
+                core->flow_handler(core->user_context, link, link->incremental_credit);
+                link->incremental_credit = 0;
+            }
+            if (link->drain_mode_changed) {
+                core->drain_handler(core->user_context, link, link->drain_mode);
+                link->drain_mode_changed = false;
+            }
             event_count++;
         }
     } while (link);
@@ -331,6 +337,7 @@ void qdr_connection_handlers(qdr_core_t                *core,
                              qdr_link_flow_t            flow,
                              qdr_link_offer_t           offer,
                              qdr_link_drained_t         drained,
+                             qdr_link_drain_t           drain,
                              qdr_link_push_t            push,
                              qdr_link_deliver_t         deliver,
                              qdr_delivery_update_t      delivery_update)
@@ -343,6 +350,7 @@ void qdr_connection_handlers(qdr_core_t                *core,
     core->flow_handler            = flow;
     core->offer_handler           = offer;
     core->drained_handler         = drained;
+    core->drain_handler           = drain;
     core->push_handler            = push;
     core->deliver_handler         = deliver;
     core->delivery_update_handler = delivery_update;
@@ -991,7 +999,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
             if (qdr_terminus_is_anonymous(target)) {
                 link->owning_addr = 0;
                 qdr_link_outbound_second_attach_CT(core, link, source, target);
-                qdr_link_issue_credit_CT(core, link, link->capacity);
+                qdr_link_issue_credit_CT(core, link, link->capacity, false);
 
             } else {
                 //
@@ -1032,7 +1040,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
                     // Issue the initial credit only if there are destinations for the address.
                     //
                     if (DEQ_SIZE(addr->subscriptions) || DEQ_SIZE(addr->rlinks) || qd_bitmask_cardinality(addr->rnodes))
-                        qdr_link_issue_credit_CT(core, link, link->capacity);
+                        qdr_link_issue_credit_CT(core, link, link->capacity, false);
                 }
             }
             break;
@@ -1040,12 +1048,12 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
 
         case QD_LINK_CONTROL:
             qdr_link_outbound_second_attach_CT(core, link, source, target);
-            qdr_link_issue_credit_CT(core, link, link->capacity);
+            qdr_link_issue_credit_CT(core, link, link->capacity, false);
             break;
 
         case QD_LINK_ROUTER:
             qdr_link_outbound_second_attach_CT(core, link, source, target);
-            qdr_link_issue_credit_CT(core, link, link->capacity);
+            qdr_link_issue_credit_CT(core, link, link->capacity, false);
             break;
         }
     } else {
@@ -1147,12 +1155,12 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac
             //
             qdr_address_t *addr = link->owning_addr;
             if (!addr || (DEQ_SIZE(addr->subscriptions) || DEQ_SIZE(addr->rlinks) || qd_bitmask_cardinality(addr->rnodes)))
-                qdr_link_issue_credit_CT(core, link, link->capacity);
+                qdr_link_issue_credit_CT(core, link, link->capacity, false);
             break;
 
         case QD_LINK_CONTROL:
         case QD_LINK_ROUTER:
-            qdr_link_issue_credit_CT(core, link, link->capacity);
+            qdr_link_issue_credit_CT(core, link, link->capacity, false);
             break;
         }
     } else {

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7a8aa51d/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 58d73a2..0fd5546 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -267,6 +267,7 @@ struct qdr_link_t {
     int                      incremental_credit;
     bool                     flow_started;   ///< for incoming, true iff initial credit has been granted
     bool                     drain_mode;
+    bool                     drain_mode_changed;
     int                      credit_to_core; ///< Number of the available credits incrementally given to the core
     uint64_t                 total_deliveries;
 };
@@ -545,6 +546,7 @@ struct qdr_core_t {
     qdr_link_flow_t            flow_handler;
     qdr_link_offer_t           offer_handler;
     qdr_link_drained_t         drained_handler;
+    qdr_link_drain_t           drain_handler;
     qdr_link_push_t            push_handler;
     qdr_link_deliver_t         deliver_handler;
     qdr_delivery_update_t      delivery_update_handler;
@@ -589,7 +591,7 @@ void  qdr_agent_setup_CT(qdr_core_t *core);
 void  qdr_forwarder_setup_CT(qdr_core_t *core);
 qdr_action_t *qdr_action(qdr_action_handler_t action_handler, const char *label);
 void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action);
-void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit);
+void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bool drain);
 void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr);
 void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv);
 void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7a8aa51d/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 4ac5e1a..b587baf 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -353,7 +353,7 @@ bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv)
     //
     if (moved && link->link_direction == QD_INCOMING &&
         link->link_type != QD_LINK_ROUTER && !link->connected_link)
-        qdr_link_issue_credit_CT(core, link, 1);
+        qdr_link_issue_credit_CT(core, link, 1, false);
 
     return moved;
 }
@@ -364,24 +364,39 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar
     if (discard)
         return;
 
-    qdr_link_t *link = action->args.connection.link;
-    int  credit      = action->args.connection.credit;
-    bool drain       = action->args.connection.drain;
-    bool activate    = false;
+    qdr_link_t *link   = action->args.connection.link;
+    int  credit        = action->args.connection.credit;
+    bool drain         = action->args.connection.drain;
+    bool activate      = false;
+    bool drain_was_set = !link->drain_mode && drain;
+
+    link->drain_mode = drain;
 
     //
     // If this is an attach-routed link, propagate the flow data downrange.
     // Note that the credit value is incremental.
     //
-    if (link->connected_link)
-        qdr_link_issue_credit_CT(core, link->connected_link, credit);
+    if (link->connected_link) {
+        qdr_link_t *clink = link->connected_link;
+
+        if (clink->link_direction == QD_INCOMING)
+            qdr_link_issue_credit_CT(core, link->connected_link, credit, drain);
+        else {
+            sys_mutex_lock(clink->conn->work_lock);
+            qdr_add_link_ref(&clink->conn->links_with_deliveries, clink, QDR_LINK_LIST_CLASS_DELIVERY);
+            sys_mutex_unlock(clink->conn->work_lock);
+            qdr_connection_activate_CT(core, clink->conn);
+        }
+
+        return;
+    }
 
     //
     // Handle the replenishing of credit outbound
     //
-    if (link->link_direction == QD_OUTGOING && credit > 0) {
+    if (link->link_direction == QD_OUTGOING && (credit > 0 || drain_was_set)) {
         sys_mutex_lock(link->conn->work_lock);
-        if (DEQ_SIZE(link->undelivered) > 0) {
+        if (DEQ_SIZE(link->undelivered) > 0 || drain_was_set) {
             qdr_add_link_ref(&link->conn->links_with_deliveries, link, QDR_LINK_LIST_CLASS_DELIVERY);
             activate = true;
         }
@@ -389,10 +404,8 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar
     }
 
     //
-    // Record the drain mode for the link
+    // Activate the connection if we have deliveries to send or drain mode was set.
     //
-    link->drain_mode = drain;
-
     if (activate)
         qdr_connection_activate_CT(core, link->conn);
 }
@@ -428,7 +441,7 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_
             qdr_delivery_release_CT(core, dlv);
             qdr_delivery_decref(dlv);
             if (link->link_type == QD_LINK_ROUTER)
-                qdr_link_issue_credit_CT(core, link, 1);
+                qdr_link_issue_credit_CT(core, link, 1, false);
         }
     } else if (fanout > 0) {
         if (dlv->settled) {
@@ -436,7 +449,7 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_
             // The delivery is settled.  Keep it off the unsettled list and issue
             // replacement credit for it now.
             //
-            qdr_link_issue_credit_CT(core, link, 1);
+            qdr_link_issue_credit_CT(core, link, 1, false);
 
             //
             // If the delivery has no more references, free it now.
@@ -457,7 +470,7 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_
             // are many addresses sharing the link.
             //
             if (link->link_type == QD_LINK_ROUTER)
-                qdr_link_issue_credit_CT(core, link, 1);
+                qdr_link_issue_credit_CT(core, link, 1, false);
         }
     }
 
@@ -629,8 +642,14 @@ static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool
  * Check the link's accumulated credit.  If the credit given to the connection thread
  * has been issued to Proton, provide the next batch of credit to the connection thread.
  */
-void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit)
+void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bool drain)
 {
+    bool drain_changed = link->drain_mode |= drain;
+    bool activate      = drain_changed;
+
+    link->drain_mode = drain;
+    link->drain_mode_changed = drain_changed;
+
     link->incremental_credit_CT += credit;
     link->flow_started = true;
 
@@ -640,7 +659,10 @@ void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit)
         //
         link->incremental_credit    = link->incremental_credit_CT;
         link->incremental_credit_CT = 0;
+        activate = true;
+    }
 
+    if (activate) {
         //
         // Put this link on the connection's has-credit list.
         //
@@ -682,7 +704,7 @@ void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr)
             // Issue credit to stalled links
             //
             if (!link->flow_started)
-                qdr_link_issue_credit_CT(core, link, link->capacity);
+                qdr_link_issue_credit_CT(core, link, link->capacity, false);
 
             //
             // Drain undelivered deliveries via the forwarder

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7a8aa51d/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 10794ac..b8e8f0e 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -789,6 +789,18 @@ static void CORE_link_drained(void *context, qdr_link_t *link)
 }
 
 
+static void CORE_link_drain(void *context, qdr_link_t *link, bool mode)
+{
+    qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link);
+    pn_link_t *plink = qd_link_pn(qlink);
+
+    if (plink) {
+        if (pn_link_is_receiver(plink))
+            pn_link_set_drain(plink, mode);
+    }
+}
+
+
 static void CORE_link_push(void *context, qdr_link_t *link)
 {
     qd_router_t *router      = (qd_router_t*) context;
@@ -875,6 +887,7 @@ void qd_router_setup_late(qd_dispatch_t *qd)
                             CORE_link_flow,
                             CORE_link_offer,
                             CORE_link_drained,
+                            CORE_link_drain,
                             CORE_link_push,
                             CORE_link_deliver,
                             CORE_delivery_update);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7a8aa51d/tests/system_tests_drain.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_drain.py b/tests/system_tests_drain.py
index 9747995..b379d27 100644
--- a/tests/system_tests_drain.py
+++ b/tests/system_tests_drain.py
@@ -42,22 +42,22 @@ class DrainSupportTest(TestCase):
         cls.router.wait_ready()
         cls.address = cls.router.addresses[0]
 
-    def test_drain_support_all_messages(self):
+    def test_drain_support_1_all_messages(self):
         drain_support = DrainMessagesHandler(self.address)
         drain_support.run()
         self.assertEqual(drain_support.error, None)
 
-    def test_drain_support_one_message(self):
+    def test_drain_support_2_one_message(self):
         drain_support = DrainOneMessageHandler(self.address)
         drain_support.run()
         self.assertEqual(drain_support.error, None)
 
-    def test_drain_support_no_messages(self):
+    def test_drain_support_3_no_messages(self):
         drain_support = DrainNoMessagesHandler(self.address)
         drain_support.run()
         self.assertEqual(drain_support.error, None)
 
-    def test_drain_support_no_more_messages(self):
+    def test_drain_support_4_no_more_messages(self):
         drain_support = DrainNoMoreMessagesHandler(self.address)
         drain_support.run()
         self.assertEqual(drain_support.error, None)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7a8aa51d/tests/system_tests_drain_support.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_drain_support.py b/tests/system_tests_drain_support.py
index f11b8b8..aa3a23a 100644
--- a/tests/system_tests_drain_support.py
+++ b/tests/system_tests_drain_support.py
@@ -142,8 +142,8 @@ class DrainNoMessagesHandler(MessagingHandler):
     def on_sendable(self, event):
         self.receiver.drain(1)
 
-    def on_drained(self, event):
-        if sender.credit == 0:
+    def on_link_flow(self, event):
+        if self.receiver.credit == 0:
             self.error = None
             self.timer.cancel()
             self.conn.close()
@@ -189,8 +189,8 @@ class DrainNoMoreMessagesHandler(MessagingHandler):
     def on_settled(self, event):
         self.receiver.drain(1)
 
-    def on_drained(self, event):
-        if sender.credit == 0:
+    def on_link_flow(self, event):
+        if self.receiver.credit == 0:
             self.error = None
             self.timer.cancel()
             self.conn.close()

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7a8aa51d/tests/system_tests_link_routes.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index 37317f9..420cc96 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -28,7 +28,7 @@ from proton.handlers import MessagingHandler
 from proton.reactor import AtMostOnce, Container
 from proton.utils import BlockingConnection, LinkDetached
 
-from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler
+from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler, DrainNoMessagesHandler, DrainNoMoreMessagesHandler
 
 from qpid_dispatch.management.client import Node
 
@@ -448,12 +448,22 @@ class LinkRoutePatternTest(TestCase):
     def test_www_drain_support_all_messages(self):
         drain_support = DrainMessagesHandler(self.routers[2].addresses[1])
         drain_support.run()
-        self.assertTrue(drain_support.drain_successful)
+        self.assertEqual(None, drain_support.error)
 
     def test_www_drain_support_one_message(self):
         drain_support = DrainOneMessageHandler(self.routers[2].addresses[1])
         drain_support.run()
-        self.assertTrue(drain_support.drain_successful)
+        self.assertEqual(None, drain_support.error)
+
+    def test_www_drain_support_no_messages(self):
+        drain_support = DrainNoMessagesHandler(self.routers[2].addresses[1])
+        drain_support.run()
+        self.assertEqual(None, drain_support.error)
+
+    def test_www_drain_support_no_more_messages(self):
+        drain_support = DrainNoMoreMessagesHandler(self.routers[2].addresses[1])
+        drain_support.run()
+        self.assertEqual(None, drain_support.error)
 
 
 class DeliveryTagsTest(MessagingHandler):


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org