You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/03/19 00:06:33 UTC
[15/50] [abbrv] qpid-dispatch git commit: DISPATCH_179 - Link-routing implemented for attach, flow, transfer.
DISPATCH_179 - Link-routing implemented for attach, flow, transfer.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/bb14ecfb
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/bb14ecfb
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/bb14ecfb
Branch: refs/heads/master
Commit: bb14ecfb6d070e41ac4d4dfe0eeb779f0b38117d
Parents: 3c84c0b
Author: Ted Ross <tr...@redhat.com>
Authored: Mon Mar 7 17:21:15 2016 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Mon Mar 7 17:21:15 2016 -0500
----------------------------------------------------------------------
include/qpid/dispatch/router_core.h | 2 +-
src/router_core/connections.c | 8 ++++
src/router_core/forwarder.c | 2 +-
src/router_core/router_core_private.h | 4 ++
src/router_core/transfer.c | 71 +++++++++++++++++++-----------
src/router_node.c | 4 +-
6 files changed, 61 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb14ecfb/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 06669f5..4dcc167 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -494,7 +494,7 @@ qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_field_i
qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
qd_field_iterator_t *ingress, qd_field_iterator_t *addr,
bool settled, qd_bitmask_t *link_exclusion);
-qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg);
+qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg, bool settled);
void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb14ecfb/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 1aca79f..ab8f6e1 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -957,6 +957,14 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac
qdr_terminus_t *source = action->args.connection.source;
qdr_terminus_t *target = action->args.connection.target;
+ //
+ // Handle attach-routed links
+ //
+ if (link->connected_link) {
+ qdr_link_outbound_second_attach_CT(core, link->connected_link, source, target);
+ return;
+ }
+
if (link->link_direction == QD_INCOMING) {
//
// Handle incoming link cases
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb14ecfb/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 1c0f7ec..0b4562b 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -351,7 +351,7 @@ bool qdr_forward_link_balanced_CT(qdr_core_t *core,
qdr_terminus_t *target)
{
qdr_connection_ref_t *conn_ref = DEQ_HEAD(addr->conns);
- qdr_connection_t *conn;
+ qdr_connection_t *conn = 0;
//
// Check for locally connected containers that can handle this link attach.
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb14ecfb/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 2877677..f44b1c5 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -236,6 +236,8 @@ struct qdr_link_t {
int capacity;
int incremental_credit_CT;
int incremental_credit;
+ bool drain_mode;
+ int credit_to_core; ///< Number of the available credits incrementally given to the core
uint64_t total_deliveries;
};
@@ -552,6 +554,8 @@ void qdr_post_link_lost_CT(qdr_core_t *core, int link_maskbit);
void qdr_post_general_work_CT(qdr_core_t *core, qdr_general_work_t *work);
void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local);
+qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *peer, qdr_link_t *link, qd_message_t *msg);
+void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv);
void qdr_connection_activate_CT(qdr_core_t *core, qdr_connection_t *conn);
void qdr_connection_enqueue_work_CT(qdr_core_t *core,
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb14ecfb/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index fcc33be..9484489 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -77,31 +77,19 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
}
-qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg)
+qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg, bool settled)
{
- // TODO - Implement this. Bypass the CT?
+ qdr_action_t *action = qdr_action(qdr_link_deliver_CT, "link_deliver");
+ qdr_delivery_t *dlv = new_qdr_delivery_t();
- //
- // We might wish to run link-routed transfers and updates through the core in order to
- // track the number of outstanding deliveries and to have the ability to intervene in
- // flow control.
- //
- // Use case: Quiescing a broker. To do this, all inbound links to the broker shall be
- // idled by preventing the propagation of flow credit out of the broker. This will dry
- // the transfer of inbound deliveries, allow all existing deliveries to be settled, and
- // allow the router to know when it is safe to detach the inbound links. Outbound links
- // can also be detached after all deliveries are settled and "drained" indications are
- // received.
- //
- // Waypoint disconnect procedure:
- // 1) Block flow-credit propagation for link outbound to waypoint.
- // 2) Wait for the number of unsettled outbound deliveries to go to zero.
- // 3) Detach the outbound link.
- // 4) Wait for inbound link to be drained with zero unsettled deliveries.
- // 5) Detach inbound link.
- //
+ ZERO(dlv);
+ dlv->link = link;
+ dlv->msg = msg;
+ dlv->settled = settled;
- return 0;
+ action->args.connection.delivery = dlv;
+ qdr_action_enqueue(link->core, action);
+ return dlv;
}
@@ -127,6 +115,7 @@ void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
sys_mutex_unlock(conn->work_lock);
if (dlv) {
+ link->credit_to_core--;
core->deliver_handler(core->user_context, link, dlv, dlv->settled);
if (dlv->settled)
qdr_delivery_free(dlv);
@@ -160,6 +149,17 @@ void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
void qdr_link_flow(qdr_core_t *core, qdr_link_t *link, int credit, bool drain_mode)
{
qdr_action_t *action = qdr_action(qdr_link_flow_CT, "link_flow");
+
+ //
+ // Compute the number of credits now available that we haven't yet given
+ // incrementally to the router core. i.e. convert absolute credit to
+ // incremental credit.
+ //
+ credit -= link->credit_to_core;
+ if (credit < 0)
+ credit = 0;
+ link->credit_to_core += credit;
+
action->args.connection.link = link;
action->args.connection.credit = credit;
action->args.connection.drain = drain_mode;
@@ -278,13 +278,16 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar
return;
qdr_link_t *link = action->args.connection.link;
- int credit = action->args.connection.credit;
- //bool drain = action->args.connection.drain;
- bool activate = false;
+ int credit = action->args.connection.credit;
+ bool drain = action->args.connection.drain;
+ bool activate = false;
//
- // TODO - If this is a link-routed link, propagate the flow data downrange.
+ // If this is an attach-routed link, propagate the flow data downrange.
+ // Note that the credit value is incremental.
//
+ if (link->connected_link)
+ qdr_link_issue_credit_CT(core, link->connected_link, credit);
//
// Handle the replenishing of credit outbound
@@ -298,6 +301,11 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar
sys_mutex_unlock(link->conn->work_lock);
}
+ //
+ // Record the drain mode for the link
+ //
+ link->drain_mode = drain;
+
if (activate)
qdr_connection_activate_CT(core, link->conn);
}
@@ -315,6 +323,17 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
bool presettled = dlv->settled;
//
+ // If this is an attach-routed link, put the delivery directly onto the peer link
+ //
+ if (link->connected_link) {
+ qdr_delivery_t *peer = qdr_forward_new_delivery_CT(core, dlv, link->connected_link, dlv->msg);
+ qdr_forward_deliver_CT(core, link->connected_link, peer);
+ qd_message_free(dlv->msg);
+ dlv->msg = 0;
+ return;
+ }
+
+ //
// NOTE: The link->undelivered list does not need to be protected by the
// connection's work lock for incoming links. This protection is only
// needed for outgoing links.
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb14ecfb/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index a2b6e2d..27b54c0 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -222,7 +222,7 @@ static void router_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd
// Handle the link-routed case
//
if (qdr_link_is_routed(rlink)) {
- // TODO - Add Link-Route Forwarding here
+ qdr_link_deliver_to_routed_link(rlink, msg, pn_delivery_settled(pnd));
return;
}
@@ -417,7 +417,7 @@ static int router_link_flow_handler(void* context, qd_link_t *link)
if (!rlink)
return 0;
- qdr_link_flow(router->router_core, rlink, pn_link_credit(pnlink), pn_link_get_drain(pnlink));
+ qdr_link_flow(router->router_core, rlink, pn_link_remote_credit(pnlink), pn_link_get_drain(pnlink));
return 0;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org