You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2017/11/08 22:15:34 UTC

qpid-dispatch git commit: DISPATCH-867 - Added missing linkage from drained output buffer to restarting a stalled link. This closes #215

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master ede8b029a -> a512c0833


DISPATCH-867 - Added missing linkage from drained output buffer to restarting a stalled link.
This closes #215


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/a512c083
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/a512c083
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/a512c083

Branch: refs/heads/master
Commit: a512c083342e497c3c461163fb0ffd946fe586d5
Parents: ede8b02
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Nov 8 17:12:03 2017 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Nov 8 17:12:03 2017 -0500

----------------------------------------------------------------------
 include/qpid/dispatch/message.h       |  3 +-
 include/qpid/dispatch/router_core.h   |  8 +++++
 src/message.c                         |  9 ++++--
 src/router_core/connections.c         |  6 ++++
 src/router_core/router_core_private.h |  7 +++--
 src/router_core/transfer.c            | 50 +++++++++++++++++-------------
 src/router_node.c                     |  6 +++-
 7 files changed, 60 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a512c083/include/qpid/dispatch/message.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index b06b4bd..78c0b95 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -227,8 +227,9 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery);
  * @param link The outgoing link on which to send the message.
  * @param strip_outbound_annotations [in] annotation control flag
  * @param restart_rx [out] indication to wake up receive process
+ * @param q3_stalled [out] indicates that the link is stalled due to proton-buffer-full
  */
-void qd_message_send(qd_message_t *msg, qd_link_t *link, bool strip_outbound_annotations, bool *restart_rx);
+void qd_message_send(qd_message_t *msg, qd_link_t *link, bool strip_outbound_annotations, bool *restart_rx, bool *q3_stalled);
 
 /**
  * Check that the message is well-formed up to a certain depth.  Any part of the message that is

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a512c083/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 9c2992c..81e0dd3 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -474,6 +474,14 @@ bool qdr_link_strip_annotations_in(const qdr_link_t *link);
 bool qdr_link_strip_annotations_out(const qdr_link_t *link);
 
 /**
+ * qdr_link_stalled_outbound
+ *
+ * Tell the link that it has been stalled outbound due to back-pressure from the
+ * transport buffers.  Stalling is undone during link-flow processing.
+ */
+void qdr_link_stalled_outbound(qdr_link_t *link);
+
+/**
  * qdr_link_name
  *
  * Retrieve the name of the link.

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a512c083/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index 4af7619..03f475b 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1398,7 +1398,8 @@ static void compose_message_annotations(qd_message_pvt_t *msg, qd_buffer_list_t
 void qd_message_send(qd_message_t *in_msg,
                      qd_link_t    *link,
                      bool          strip_annotations,
-                     bool         *restart_rx)
+                     bool         *restart_rx,
+                     bool         *q3_stalled)
 {
     qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
     qd_message_content_t *content = msg->content;
@@ -1407,6 +1408,7 @@ void qd_message_send(qd_message_t *in_msg,
 
     int                  fanout   = qd_message_fanout(in_msg);
     *restart_rx                   = false;
+    *q3_stalled                   = false;
 
     if (msg->sent_depth < QD_DEPTH_MESSAGE_ANNOTATIONS) {
 
@@ -1515,8 +1517,7 @@ void qd_message_send(qd_message_t *in_msg,
 
     pn_session_t     *pns  = pn_link_session(pnl);
 
-    while (buf && pn_session_outgoing_bytes(pns) < QD_QLIMIT_Q3_UPPER) {
-
+    while (buf && pn_session_outgoing_bytes(pns) <= QD_QLIMIT_Q3_UPPER) {
         if (msg->content->aborted) {
             if (pn_link_current(pnl)) {
                 msg->send_complete = true;
@@ -1596,6 +1597,8 @@ void qd_message_send(qd_message_t *in_msg,
 
         buf = next_buf;
     }
+
+    *q3_stalled = (pn_session_outgoing_bytes(pns) > QD_QLIMIT_Q3_UPPER);
 }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a512c083/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 9d10fa4..5b2a8c8 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -384,6 +384,12 @@ bool qdr_link_strip_annotations_out(const qdr_link_t *link)
 }
 
 
+void qdr_link_stalled_outbound(qdr_link_t *link)
+{
+    link->stalled_outbound = true;
+}
+
+
 const char *qdr_link_name(const qdr_link_t *link)
 {
     return link->name;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a512c083/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index f5e5618..b1f49d8 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -391,14 +391,15 @@ struct qdr_link_t {
     qdr_delivery_list_t      unsettled;          ///< Unsettled deliveries
     qdr_delivery_list_t      settled;            ///< Settled deliveries
     qdr_delivery_ref_list_t  updated_deliveries; ///< References to deliveries (in the unsettled list) with updates.
-    bool                     admin_enabled;
     qdr_link_oper_status_t   oper_status;
+    int                      capacity;
+    int                      credit_to_core; ///< Number of the available credits incrementally given to the core
+    bool                     admin_enabled;
     bool                     strip_annotations_in;
     bool                     strip_annotations_out;
-    int                      capacity;
     bool                     flow_started;   ///< for incoming, true iff initial credit has been granted
     bool                     drain_mode;
-    int                      credit_to_core; ///< Number of the available credits incrementally given to the core
+    bool                     stalled_outbound;  ///< Indicates that this link is stalled on outbound buffer backpressure
 
     uint64_t total_deliveries;
     uint64_t presettled_deliveries;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a512c083/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index f600545..d15b7bc 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -687,6 +687,16 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar
     link->drain_mode = drain;
 
     //
+    // If the link was stalled due to internal backpressure from the transport, put it
+    // on the links-with-work list and activate the connection to resume sending.
+    //
+    if (link->stalled_outbound) {
+        link->stalled_outbound = false;
+        qdr_add_link_ref(&link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
+        activate = true;
+    }
+
+    //
     // If this is an attach-routed link, propagate the flow data downrange.
     // Note that the credit value is incremental.
     //
@@ -704,29 +714,27 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar
                 work->drain_action = QDR_LINK_WORK_DRAIN_ACTION_DRAINED;
             qdr_link_enqueue_work_CT(core, clink, work);
         }
+    } else {
+        //
+        // Handle the replenishing of credit outbound
+        //
+        if (link->link_direction == QD_OUTGOING && (credit > 0 || drain_was_set)) {
+            if (drain_was_set) {
+                work = new_qdr_link_work_t();
+                ZERO(work);
+                work->work_type    = QDR_LINK_WORK_FLOW;
+                work->drain_action = QDR_LINK_WORK_DRAIN_ACTION_DRAINED;
+            }
 
-        return;
-    }
-
-    //
-    // Handle the replenishing of credit outbound
-    //
-    if (link->link_direction == QD_OUTGOING && (credit > 0 || drain_was_set)) {
-        if (drain_was_set) {
-            work = new_qdr_link_work_t();
-            ZERO(work);
-            work->work_type    = QDR_LINK_WORK_FLOW;
-            work->drain_action = QDR_LINK_WORK_DRAIN_ACTION_DRAINED;
-        }
-
-        sys_mutex_lock(link->conn->work_lock);
-        if (work)
-            DEQ_INSERT_TAIL(link->work_list, work);
-        if (DEQ_SIZE(link->undelivered) > 0 || drain_was_set) {
-            qdr_add_link_ref(&link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
-            activate = true;
+            sys_mutex_lock(link->conn->work_lock);
+            if (work)
+                DEQ_INSERT_TAIL(link->work_list, work);
+            if (DEQ_SIZE(link->undelivered) > 0 || drain_was_set) {
+                qdr_add_link_ref(&link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
+                activate = true;
+            }
+            sys_mutex_unlock(link->conn->work_lock);
         }
-        sys_mutex_unlock(link->conn->work_lock);
     }
 
     //

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a512c083/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index d62365f..c447783 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -1345,10 +1345,14 @@ static void CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *d
     }
 
     bool restart_rx = false;
+    bool q3_stalled = false;
 
     qd_message_t *msg_out = qdr_delivery_message(dlv);
 
-    qd_message_send(msg_out, qlink, qdr_link_strip_annotations_out(link), &restart_rx);
+    qd_message_send(msg_out, qlink, qdr_link_strip_annotations_out(link), &restart_rx, &q3_stalled);
+
+    if (q3_stalled)
+        qdr_link_stalled_outbound(link);
 
     if (restart_rx) {
         qd_link_t *qdl_in = qd_message_get_receiving_link(msg_out);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org