You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/06/14 12:57:24 UTC
[3/8] qpid-dispatch git commit: DISPATCH-341 - Drain now propagates
across link routes and behaves correctly for router-terminated links.
DISPATCH-341 - Drain now propagates across link routes and behaves correctly for router-terminated links.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/7a8aa51d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/7a8aa51d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/7a8aa51d
Branch: refs/heads/0.6.x
Commit: 7a8aa51de4bf6da2a1a49f1d37774975869f8d54
Parents: 789b73e
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Jun 3 15:38:26 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Mon Jun 13 17:22:09 2016 -0400
----------------------------------------------------------------------
include/qpid/dispatch/router_core.h | 2 ++
src/router_core/connections.c | 24 ++++++++-----
src/router_core/router_core_private.h | 4 ++-
src/router_core/transfer.c | 56 +++++++++++++++++++++---------
src/router_node.c | 13 +++++++
tests/system_tests_drain.py | 8 ++---
tests/system_tests_drain_support.py | 8 ++---
tests/system_tests_link_routes.py | 16 +++++++--
8 files changed, 94 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7a8aa51d/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 459e1a3..8eaa7ef 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -527,6 +527,7 @@ typedef void (*qdr_link_detach_t) (void *context, qdr_link_t *link, qdr_e
typedef void (*qdr_link_flow_t) (void *context, qdr_link_t *link, int credit);
typedef void (*qdr_link_offer_t) (void *context, qdr_link_t *link, int delivery_count);
typedef void (*qdr_link_drained_t) (void *context, qdr_link_t *link);
+typedef void (*qdr_link_drain_t) (void *context, qdr_link_t *link, bool mode);
typedef void (*qdr_link_push_t) (void *context, qdr_link_t *link);
typedef void (*qdr_link_deliver_t) (void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled);
typedef void (*qdr_delivery_update_t) (void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled);
@@ -540,6 +541,7 @@ void qdr_connection_handlers(qdr_core_t *core,
qdr_link_flow_t flow,
qdr_link_offer_t offer,
qdr_link_drained_t drained,
+ qdr_link_drain_t drain,
qdr_link_push_t push,
qdr_link_deliver_t deliver,
qdr_delivery_update_t delivery_update);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7a8aa51d/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index a9612ea..e0543ec 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -188,8 +188,14 @@ int qdr_connection_process(qdr_connection_t *conn)
sys_mutex_unlock(conn->work_lock);
if (link) {
- core->flow_handler(core->user_context, link, link->incremental_credit);
- link->incremental_credit = 0;
+ if (link->incremental_credit > 0) {
+ core->flow_handler(core->user_context, link, link->incremental_credit);
+ link->incremental_credit = 0;
+ }
+ if (link->drain_mode_changed) {
+ core->drain_handler(core->user_context, link, link->drain_mode);
+ link->drain_mode_changed = false;
+ }
event_count++;
}
} while (link);
@@ -331,6 +337,7 @@ void qdr_connection_handlers(qdr_core_t *core,
qdr_link_flow_t flow,
qdr_link_offer_t offer,
qdr_link_drained_t drained,
+ qdr_link_drain_t drain,
qdr_link_push_t push,
qdr_link_deliver_t deliver,
qdr_delivery_update_t delivery_update)
@@ -343,6 +350,7 @@ void qdr_connection_handlers(qdr_core_t *core,
core->flow_handler = flow;
core->offer_handler = offer;
core->drained_handler = drained;
+ core->drain_handler = drain;
core->push_handler = push;
core->deliver_handler = deliver;
core->delivery_update_handler = delivery_update;
@@ -991,7 +999,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
if (qdr_terminus_is_anonymous(target)) {
link->owning_addr = 0;
qdr_link_outbound_second_attach_CT(core, link, source, target);
- qdr_link_issue_credit_CT(core, link, link->capacity);
+ qdr_link_issue_credit_CT(core, link, link->capacity, false);
} else {
//
@@ -1032,7 +1040,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
// Issue the initial credit only if there are destinations for the address.
//
if (DEQ_SIZE(addr->subscriptions) || DEQ_SIZE(addr->rlinks) || qd_bitmask_cardinality(addr->rnodes))
- qdr_link_issue_credit_CT(core, link, link->capacity);
+ qdr_link_issue_credit_CT(core, link, link->capacity, false);
}
}
break;
@@ -1040,12 +1048,12 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
case QD_LINK_CONTROL:
qdr_link_outbound_second_attach_CT(core, link, source, target);
- qdr_link_issue_credit_CT(core, link, link->capacity);
+ qdr_link_issue_credit_CT(core, link, link->capacity, false);
break;
case QD_LINK_ROUTER:
qdr_link_outbound_second_attach_CT(core, link, source, target);
- qdr_link_issue_credit_CT(core, link, link->capacity);
+ qdr_link_issue_credit_CT(core, link, link->capacity, false);
break;
}
} else {
@@ -1147,12 +1155,12 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac
//
qdr_address_t *addr = link->owning_addr;
if (!addr || (DEQ_SIZE(addr->subscriptions) || DEQ_SIZE(addr->rlinks) || qd_bitmask_cardinality(addr->rnodes)))
- qdr_link_issue_credit_CT(core, link, link->capacity);
+ qdr_link_issue_credit_CT(core, link, link->capacity, false);
break;
case QD_LINK_CONTROL:
case QD_LINK_ROUTER:
- qdr_link_issue_credit_CT(core, link, link->capacity);
+ qdr_link_issue_credit_CT(core, link, link->capacity, false);
break;
}
} else {
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7a8aa51d/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 58d73a2..0fd5546 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -267,6 +267,7 @@ struct qdr_link_t {
int incremental_credit;
bool flow_started; ///< for incoming, true iff initial credit has been granted
bool drain_mode;
+ bool drain_mode_changed;
int credit_to_core; ///< Number of the available credits incrementally given to the core
uint64_t total_deliveries;
};
@@ -545,6 +546,7 @@ struct qdr_core_t {
qdr_link_flow_t flow_handler;
qdr_link_offer_t offer_handler;
qdr_link_drained_t drained_handler;
+ qdr_link_drain_t drain_handler;
qdr_link_push_t push_handler;
qdr_link_deliver_t deliver_handler;
qdr_delivery_update_t delivery_update_handler;
@@ -589,7 +591,7 @@ void qdr_agent_setup_CT(qdr_core_t *core);
void qdr_forwarder_setup_CT(qdr_core_t *core);
qdr_action_t *qdr_action(qdr_action_handler_t action_handler, const char *label);
void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action);
-void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit);
+void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bool drain);
void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr);
void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv);
void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7a8aa51d/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 4ac5e1a..b587baf 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -353,7 +353,7 @@ bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv)
//
if (moved && link->link_direction == QD_INCOMING &&
link->link_type != QD_LINK_ROUTER && !link->connected_link)
- qdr_link_issue_credit_CT(core, link, 1);
+ qdr_link_issue_credit_CT(core, link, 1, false);
return moved;
}
@@ -364,24 +364,39 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar
if (discard)
return;
- qdr_link_t *link = action->args.connection.link;
- int credit = action->args.connection.credit;
- bool drain = action->args.connection.drain;
- bool activate = false;
+ qdr_link_t *link = action->args.connection.link;
+ int credit = action->args.connection.credit;
+ bool drain = action->args.connection.drain;
+ bool activate = false;
+ bool drain_was_set = !link->drain_mode && drain;
+
+ link->drain_mode = drain;
//
// If this is an attach-routed link, propagate the flow data downrange.
// Note that the credit value is incremental.
//
- if (link->connected_link)
- qdr_link_issue_credit_CT(core, link->connected_link, credit);
+ if (link->connected_link) {
+ qdr_link_t *clink = link->connected_link;
+
+ if (clink->link_direction == QD_INCOMING)
+ qdr_link_issue_credit_CT(core, link->connected_link, credit, drain);
+ else {
+ sys_mutex_lock(clink->conn->work_lock);
+ qdr_add_link_ref(&clink->conn->links_with_deliveries, clink, QDR_LINK_LIST_CLASS_DELIVERY);
+ sys_mutex_unlock(clink->conn->work_lock);
+ qdr_connection_activate_CT(core, clink->conn);
+ }
+
+ return;
+ }
//
// Handle the replenishing of credit outbound
//
- if (link->link_direction == QD_OUTGOING && credit > 0) {
+ if (link->link_direction == QD_OUTGOING && (credit > 0 || drain_was_set)) {
sys_mutex_lock(link->conn->work_lock);
- if (DEQ_SIZE(link->undelivered) > 0) {
+ if (DEQ_SIZE(link->undelivered) > 0 || drain_was_set) {
qdr_add_link_ref(&link->conn->links_with_deliveries, link, QDR_LINK_LIST_CLASS_DELIVERY);
activate = true;
}
@@ -389,10 +404,8 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar
}
//
- // Record the drain mode for the link
+ // Activate the connection if we have deliveries to send or drain mode was set.
//
- link->drain_mode = drain;
-
if (activate)
qdr_connection_activate_CT(core, link->conn);
}
@@ -428,7 +441,7 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_
qdr_delivery_release_CT(core, dlv);
qdr_delivery_decref(dlv);
if (link->link_type == QD_LINK_ROUTER)
- qdr_link_issue_credit_CT(core, link, 1);
+ qdr_link_issue_credit_CT(core, link, 1, false);
}
} else if (fanout > 0) {
if (dlv->settled) {
@@ -436,7 +449,7 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_
// The delivery is settled. Keep it off the unsettled list and issue
// replacement credit for it now.
//
- qdr_link_issue_credit_CT(core, link, 1);
+ qdr_link_issue_credit_CT(core, link, 1, false);
//
// If the delivery has no more references, free it now.
@@ -457,7 +470,7 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_
// are many addresses sharing the link.
//
if (link->link_type == QD_LINK_ROUTER)
- qdr_link_issue_credit_CT(core, link, 1);
+ qdr_link_issue_credit_CT(core, link, 1, false);
}
}
@@ -629,8 +642,14 @@ static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool
* Check the link's accumulated credit. If the credit given to the connection thread
* has been issued to Proton, provide the next batch of credit to the connection thread.
*/
-void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit)
+void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bool drain)
{
+ bool drain_changed = link->drain_mode |= drain;
+ bool activate = drain_changed;
+
+ link->drain_mode = drain;
+ link->drain_mode_changed = drain_changed;
+
link->incremental_credit_CT += credit;
link->flow_started = true;
@@ -640,7 +659,10 @@ void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit)
//
link->incremental_credit = link->incremental_credit_CT;
link->incremental_credit_CT = 0;
+ activate = true;
+ }
+ if (activate) {
//
// Put this link on the connection's has-credit list.
//
@@ -682,7 +704,7 @@ void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr)
// Issue credit to stalled links
//
if (!link->flow_started)
- qdr_link_issue_credit_CT(core, link, link->capacity);
+ qdr_link_issue_credit_CT(core, link, link->capacity, false);
//
// Drain undelivered deliveries via the forwarder
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7a8aa51d/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 10794ac..b8e8f0e 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -789,6 +789,18 @@ static void CORE_link_drained(void *context, qdr_link_t *link)
}
+static void CORE_link_drain(void *context, qdr_link_t *link, bool mode)
+{
+ qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link);
+ pn_link_t *plink = qd_link_pn(qlink);
+
+ if (plink) {
+ if (pn_link_is_receiver(plink))
+ pn_link_set_drain(plink, mode);
+ }
+}
+
+
static void CORE_link_push(void *context, qdr_link_t *link)
{
qd_router_t *router = (qd_router_t*) context;
@@ -875,6 +887,7 @@ void qd_router_setup_late(qd_dispatch_t *qd)
CORE_link_flow,
CORE_link_offer,
CORE_link_drained,
+ CORE_link_drain,
CORE_link_push,
CORE_link_deliver,
CORE_delivery_update);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7a8aa51d/tests/system_tests_drain.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_drain.py b/tests/system_tests_drain.py
index 9747995..b379d27 100644
--- a/tests/system_tests_drain.py
+++ b/tests/system_tests_drain.py
@@ -42,22 +42,22 @@ class DrainSupportTest(TestCase):
cls.router.wait_ready()
cls.address = cls.router.addresses[0]
- def test_drain_support_all_messages(self):
+ def test_drain_support_1_all_messages(self):
drain_support = DrainMessagesHandler(self.address)
drain_support.run()
self.assertEqual(drain_support.error, None)
- def test_drain_support_one_message(self):
+ def test_drain_support_2_one_message(self):
drain_support = DrainOneMessageHandler(self.address)
drain_support.run()
self.assertEqual(drain_support.error, None)
- def test_drain_support_no_messages(self):
+ def test_drain_support_3_no_messages(self):
drain_support = DrainNoMessagesHandler(self.address)
drain_support.run()
self.assertEqual(drain_support.error, None)
- def test_drain_support_no_more_messages(self):
+ def test_drain_support_4_no_more_messages(self):
drain_support = DrainNoMoreMessagesHandler(self.address)
drain_support.run()
self.assertEqual(drain_support.error, None)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7a8aa51d/tests/system_tests_drain_support.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_drain_support.py b/tests/system_tests_drain_support.py
index f11b8b8..aa3a23a 100644
--- a/tests/system_tests_drain_support.py
+++ b/tests/system_tests_drain_support.py
@@ -142,8 +142,8 @@ class DrainNoMessagesHandler(MessagingHandler):
def on_sendable(self, event):
self.receiver.drain(1)
- def on_drained(self, event):
- if sender.credit == 0:
+ def on_link_flow(self, event):
+ if self.receiver.credit == 0:
self.error = None
self.timer.cancel()
self.conn.close()
@@ -189,8 +189,8 @@ class DrainNoMoreMessagesHandler(MessagingHandler):
def on_settled(self, event):
self.receiver.drain(1)
- def on_drained(self, event):
- if sender.credit == 0:
+ def on_link_flow(self, event):
+ if self.receiver.credit == 0:
self.error = None
self.timer.cancel()
self.conn.close()
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7a8aa51d/tests/system_tests_link_routes.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index 37317f9..420cc96 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -28,7 +28,7 @@ from proton.handlers import MessagingHandler
from proton.reactor import AtMostOnce, Container
from proton.utils import BlockingConnection, LinkDetached
-from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler
+from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler, DrainNoMessagesHandler, DrainNoMoreMessagesHandler
from qpid_dispatch.management.client import Node
@@ -448,12 +448,22 @@ class LinkRoutePatternTest(TestCase):
def test_www_drain_support_all_messages(self):
drain_support = DrainMessagesHandler(self.routers[2].addresses[1])
drain_support.run()
- self.assertTrue(drain_support.drain_successful)
+ self.assertEqual(None, drain_support.error)
def test_www_drain_support_one_message(self):
drain_support = DrainOneMessageHandler(self.routers[2].addresses[1])
drain_support.run()
- self.assertTrue(drain_support.drain_successful)
+ self.assertEqual(None, drain_support.error)
+
+ def test_www_drain_support_no_messages(self):
+ drain_support = DrainNoMessagesHandler(self.routers[2].addresses[1])
+ drain_support.run()
+ self.assertEqual(None, drain_support.error)
+
+ def test_www_drain_support_no_more_messages(self):
+ drain_support = DrainNoMoreMessagesHandler(self.routers[2].addresses[1])
+ drain_support.run()
+ self.assertEqual(None, drain_support.error)
class DeliveryTagsTest(MessagingHandler):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org