You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2017/09/15 18:48:57 UTC

[4/7] qpid-dispatch git commit: DISPATCH-807: Define limits, state, and accessors for Q2 input holdoff

DISPATCH-807: Define limits, state, and accessors for Q2 input holdoff


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/ece8bfea
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/ece8bfea
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/ece8bfea

Branch: refs/heads/DISPATCH-807-1
Commit: ece8bfea22fbe4333ed11e86741156913ed95b35
Parents: 02ed034
Author: Chuck Rolke <cr...@redhat.com>
Authored: Thu Sep 14 09:53:55 2017 -0400
Committer: Chuck Rolke <cr...@redhat.com>
Committed: Fri Sep 15 14:47:26 2017 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/message.h | 41 ++++++++++++++++++++++++++++++++++++
 src/message.c                   | 24 +++++++++++++++++++++
 src/message_private.h           |  1 +
 tests/message_test.c            | 33 +++++++++++++++++++++++++++++
 4 files changed, 99 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ece8bfea/include/qpid/dispatch/message.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 111c6a0..e0df420 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -36,6 +36,13 @@
  * @{
  */
 
+// DISPATCH-807 Queue depth limits
+// upper and lower limits for bang bang hysteresis control
+//
+// Q2 defines the number of buffers allowed in a message's buffer chain
+#define QD_QLIMIT_Q2_UPPER 128
+#define QD_QLIMIT_Q2_LOWER 120
+
 // Callback for status change (confirmed persistent, loaded-in-memory, etc.)
 
 typedef struct qd_message_t qd_message_t;
@@ -351,6 +358,40 @@ size_t qd_message_fanout(qd_message_t *msg);
  */
 void qd_message_add_fanout(qd_message_t *msg);
 
+/**
+ * Setter for message Q2 input_holdoff state
+ *
+ * @param msg A pointer to the message
+ */
+void qd_message_set_Q2_input_holdoff(qd_message_t *msg, bool holdoff);
+
+/**
+ * Accessor for message Q2 input_holdoff state
+ *
+ * @param msg A pointer to the message
+ * @return true if input is being held off
+ */
+bool qd_message_get_Q2_input_holdoff(qd_message_t *msg);
+
+/**
+ * Test if attempt to retreive message data through qd_message_recv should block
+ * due to Q2 input holdoff limit being exceeded. This message has enough
+ * buffers in the internal buffer chain and any calls to to qd_message_receive
+ * will not result in a call to pn_link_receive to retrieve more data.
+ *
+ * @param msg A pointer to the message
+ */
+bool qd_message_Q2_holdoff_should_block(qd_message_t *msg);
+
+/**
+ * Test if a message that is blocked by Q2 input holdoff has enough room
+ * to begin receiving again. This message has transmitted and disposed of
+ * enough buffers to begin receiving more data from the underlying proton link.
+ *
+ * @param msg A pointer to the message
+ */
+bool qd_message_Q2_holdoff_should_unblock(qd_message_t *msg);
+
 ///@}
 
 #endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ece8bfea/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index 6074ebb..12b09ce 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1865,3 +1865,27 @@ int qd_message_get_phase_val(qd_message_t *msg)
 {
     return ((qd_message_pvt_t*)msg)->content->ma_int_phase;
 }
+
+
+void qd_message_set_Q2_input_holdoff(qd_message_t *msg, bool holdoff)
+{
+    ((qd_message_pvt_t*)msg)->q2_input_holdoff = holdoff;
+}
+
+
+bool qd_message_get_Q2_input_holdoff(qd_message_t *msg)
+{
+    return ((qd_message_pvt_t*)msg)->q2_input_holdoff;
+}
+
+
+bool qd_message_Q2_holdoff_should_block(qd_message_t *msg)
+{
+    return DEQ_SIZE(((qd_message_pvt_t*)msg)->content->buffers) >= QD_QLIMIT_Q2_UPPER;
+}
+
+
+bool qd_message_Q2_holdoff_should_unblock(qd_message_t *msg)
+{
+    return DEQ_SIZE(((qd_message_pvt_t*)msg)->content->buffers) < QD_QLIMIT_Q2_LOWER;
+}

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ece8bfea/src/message_private.h
----------------------------------------------------------------------
diff --git a/src/message_private.h b/src/message_private.h
index eab4ae0..aa01831 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -127,6 +127,7 @@ typedef struct {
     qd_buffer_list_t      ma_ingress;      // ingress field in outgoing message annotations
     int                   ma_phase;        // phase for the override address
     bool                  strip_annotations_in;
+    bool                  q2_input_holdoff;// hold off calling pn_link_recv 
 } qd_message_pvt_t;
 
 ALLOC_DECLARE(qd_message_t);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ece8bfea/tests/message_test.c
----------------------------------------------------------------------
diff --git a/tests/message_test.c b/tests/message_test.c
index 116e304..0ef1d8b 100644
--- a/tests/message_test.c
+++ b/tests/message_test.c
@@ -61,6 +61,17 @@ static void set_content(qd_message_content_t *content, size_t len)
 }
 
 
+static void set_content_bufs(qd_message_content_t *content, int nbufs)
+{
+    for (; nbufs > 0; nbufs--) {
+        qd_buffer_t *buf = qd_buffer();
+        size_t segment   = qd_buffer_capacity(buf);
+        qd_buffer_insert(buf, segment);
+        DEQ_INSERT_TAIL(content->buffers, buf);
+    }
+}
+
+
 static char* test_send_to_messenger(void *context)
 {
     qd_message_t         *msg     = qd_message();
@@ -314,6 +325,27 @@ static char* test_send_message_annotations(void *context)
 }
 
 
+static char* test_q2_input_holdoff_sensing(void *context)
+{
+    if (QD_QLIMIT_Q2_LOWER >= QD_QLIMIT_Q2_UPPER)
+        return "QD_LIMIT_Q2 lower limit is bigger than upper limit";
+
+    for (int nbufs=1; nbufs<QD_QLIMIT_Q2_UPPER + 1; nbufs++) {
+        qd_message_t         *msg     = qd_message();
+        qd_message_content_t *content = MSG_CONTENT(msg);
+
+        set_content_bufs(content, nbufs);
+        if (qd_message_Q2_holdoff_should_block(msg) != (nbufs >= QD_QLIMIT_Q2_UPPER))
+            return "qd_message_holdoff_would_block was miscalculated";
+        if (qd_message_Q2_holdoff_should_unblock(msg) != (nbufs < QD_QLIMIT_Q2_LOWER))
+            return "qd_message_holdoff_would_unblock was miscalculated";
+
+        qd_message_free(msg);
+    }
+    return 0;
+}
+
+
 int message_tests(void)
 {
     int result = 0;
@@ -324,6 +356,7 @@ int message_tests(void)
     TEST_CASE(test_message_properties, 0);
     TEST_CASE(test_check_multiple, 0);
     TEST_CASE(test_send_message_annotations, 0);
+    TEST_CASE(test_q2_input_holdoff_sensing, 0);
 
     return result;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org