You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2017/11/08 22:15:34 UTC
qpid-dispatch git commit: DISPATCH-867 - Added missing linkage from
drained output buffer to restarting a stalled link. This closes #215
Repository: qpid-dispatch
Updated Branches:
refs/heads/master ede8b029a -> a512c0833
DISPATCH-867 - Added the missing linkage between a drained output buffer and the restart of a stalled link.
This closes #215
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/a512c083
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/a512c083
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/a512c083
Branch: refs/heads/master
Commit: a512c083342e497c3c461163fb0ffd946fe586d5
Parents: ede8b02
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Nov 8 17:12:03 2017 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Nov 8 17:12:03 2017 -0500
----------------------------------------------------------------------
include/qpid/dispatch/message.h | 3 +-
include/qpid/dispatch/router_core.h | 8 +++++
src/message.c | 9 ++++--
src/router_core/connections.c | 6 ++++
src/router_core/router_core_private.h | 7 +++--
src/router_core/transfer.c | 50 +++++++++++++++++-------------
src/router_node.c | 6 +++-
7 files changed, 60 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a512c083/include/qpid/dispatch/message.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index b06b4bd..78c0b95 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -227,8 +227,9 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery);
* @param link The outgoing link on which to send the message.
* @param strip_outbound_annotations [in] annotation control flag
* @param restart_rx [out] indication to wake up receive process
+ * @param q3_stalled [out] indicates that the link is stalled due to proton-buffer-full
*/
-void qd_message_send(qd_message_t *msg, qd_link_t *link, bool strip_outbound_annotations, bool *restart_rx);
+void qd_message_send(qd_message_t *msg, qd_link_t *link, bool strip_outbound_annotations, bool *restart_rx, bool *q3_stalled);
/**
* Check that the message is well-formed up to a certain depth. Any part of the message that is
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a512c083/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 9c2992c..81e0dd3 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -474,6 +474,14 @@ bool qdr_link_strip_annotations_in(const qdr_link_t *link);
bool qdr_link_strip_annotations_out(const qdr_link_t *link);
/**
+ * qdr_link_stalled_outbound
+ *
+ * Tell the link that it has been stalled outbound due to back-pressure from the
+ * transport buffers. Stalling is undone during link-flow processing.
+ */
+void qdr_link_stalled_outbound(qdr_link_t *link);
+
+/**
* qdr_link_name
*
* Retrieve the name of the link.
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a512c083/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index 4af7619..03f475b 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1398,7 +1398,8 @@ static void compose_message_annotations(qd_message_pvt_t *msg, qd_buffer_list_t
void qd_message_send(qd_message_t *in_msg,
qd_link_t *link,
bool strip_annotations,
- bool *restart_rx)
+ bool *restart_rx,
+ bool *q3_stalled)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
qd_message_content_t *content = msg->content;
@@ -1407,6 +1408,7 @@ void qd_message_send(qd_message_t *in_msg,
int fanout = qd_message_fanout(in_msg);
*restart_rx = false;
+ *q3_stalled = false;
if (msg->sent_depth < QD_DEPTH_MESSAGE_ANNOTATIONS) {
@@ -1515,8 +1517,7 @@ void qd_message_send(qd_message_t *in_msg,
pn_session_t *pns = pn_link_session(pnl);
- while (buf && pn_session_outgoing_bytes(pns) < QD_QLIMIT_Q3_UPPER) {
-
+ while (buf && pn_session_outgoing_bytes(pns) <= QD_QLIMIT_Q3_UPPER) {
if (msg->content->aborted) {
if (pn_link_current(pnl)) {
msg->send_complete = true;
@@ -1596,6 +1597,8 @@ void qd_message_send(qd_message_t *in_msg,
buf = next_buf;
}
+
+ *q3_stalled = (pn_session_outgoing_bytes(pns) > QD_QLIMIT_Q3_UPPER);
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a512c083/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 9d10fa4..5b2a8c8 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -384,6 +384,12 @@ bool qdr_link_strip_annotations_out(const qdr_link_t *link)
}
+void qdr_link_stalled_outbound(qdr_link_t *link)
+{
+ link->stalled_outbound = true;
+}
+
+
const char *qdr_link_name(const qdr_link_t *link)
{
return link->name;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a512c083/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index f5e5618..b1f49d8 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -391,14 +391,15 @@ struct qdr_link_t {
qdr_delivery_list_t unsettled; ///< Unsettled deliveries
qdr_delivery_list_t settled; ///< Settled deliveries
qdr_delivery_ref_list_t updated_deliveries; ///< References to deliveries (in the unsettled list) with updates.
- bool admin_enabled;
qdr_link_oper_status_t oper_status;
+ int capacity;
+ int credit_to_core; ///< Number of the available credits incrementally given to the core
+ bool admin_enabled;
bool strip_annotations_in;
bool strip_annotations_out;
- int capacity;
bool flow_started; ///< for incoming, true iff initial credit has been granted
bool drain_mode;
- int credit_to_core; ///< Number of the available credits incrementally given to the core
+ bool stalled_outbound; ///< Indicates that this link is stalled on outbound buffer backpressure
uint64_t total_deliveries;
uint64_t presettled_deliveries;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a512c083/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index f600545..d15b7bc 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -687,6 +687,16 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar
link->drain_mode = drain;
//
+ // If the link was stalled due to internal backpressure from the transport, put it
+ // on the links-with-work list and activate the connection to resume sending.
+ //
+ if (link->stalled_outbound) {
+ link->stalled_outbound = false;
+ qdr_add_link_ref(&link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
+ activate = true;
+ }
+
+ //
// If this is an attach-routed link, propagate the flow data downrange.
// Note that the credit value is incremental.
//
@@ -704,29 +714,27 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar
work->drain_action = QDR_LINK_WORK_DRAIN_ACTION_DRAINED;
qdr_link_enqueue_work_CT(core, clink, work);
}
+ } else {
+ //
+ // Handle the replenishing of credit outbound
+ //
+ if (link->link_direction == QD_OUTGOING && (credit > 0 || drain_was_set)) {
+ if (drain_was_set) {
+ work = new_qdr_link_work_t();
+ ZERO(work);
+ work->work_type = QDR_LINK_WORK_FLOW;
+ work->drain_action = QDR_LINK_WORK_DRAIN_ACTION_DRAINED;
+ }
- return;
- }
-
- //
- // Handle the replenishing of credit outbound
- //
- if (link->link_direction == QD_OUTGOING && (credit > 0 || drain_was_set)) {
- if (drain_was_set) {
- work = new_qdr_link_work_t();
- ZERO(work);
- work->work_type = QDR_LINK_WORK_FLOW;
- work->drain_action = QDR_LINK_WORK_DRAIN_ACTION_DRAINED;
- }
-
- sys_mutex_lock(link->conn->work_lock);
- if (work)
- DEQ_INSERT_TAIL(link->work_list, work);
- if (DEQ_SIZE(link->undelivered) > 0 || drain_was_set) {
- qdr_add_link_ref(&link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
- activate = true;
+ sys_mutex_lock(link->conn->work_lock);
+ if (work)
+ DEQ_INSERT_TAIL(link->work_list, work);
+ if (DEQ_SIZE(link->undelivered) > 0 || drain_was_set) {
+ qdr_add_link_ref(&link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
+ activate = true;
+ }
+ sys_mutex_unlock(link->conn->work_lock);
}
- sys_mutex_unlock(link->conn->work_lock);
}
//
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a512c083/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index d62365f..c447783 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -1345,10 +1345,14 @@ static void CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *d
}
bool restart_rx = false;
+ bool q3_stalled = false;
qd_message_t *msg_out = qdr_delivery_message(dlv);
- qd_message_send(msg_out, qlink, qdr_link_strip_annotations_out(link), &restart_rx);
+ qd_message_send(msg_out, qlink, qdr_link_strip_annotations_out(link), &restart_rx, &q3_stalled);
+
+ if (q3_stalled)
+ qdr_link_stalled_outbound(link);
if (restart_rx) {
qd_link_t *qdl_in = qd_message_get_receiving_link(msg_out);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org