You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2020/10/20 16:02:18 UTC

[qpid-dispatch] 03/04: DISPATCH-1654: fix for streaming message

This is an automated email from the ASF dual-hosted git repository.

chug pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit a8e548bfca44851305a8b6a59cfb21272ca668ca
Author: Gordon Sim <gs...@redhat.com>
AuthorDate: Wed Oct 7 12:09:57 2020 +0100

    DISPATCH-1654: fix for streaming message
---
 include/qpid/dispatch/amqp.h    |  2 ++
 include/qpid/dispatch/message.h | 17 +++++++++++
 include/qpid/dispatch/parse.h   |  2 ++
 src/adaptors/tcp_adaptor.c      |  3 ++
 src/amqp.c                      |  1 +
 src/message.c                   | 28 +++++++++++++++++-
 src/message_private.h           |  2 ++
 src/parse.c                     | 12 +++++++-
 src/router_node.c               | 64 ++++++++++++++++++++---------------------
 9 files changed, 97 insertions(+), 34 deletions(-)

diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h
index aca440c..6c1064f 100644
--- a/include/qpid/dispatch/amqp.h
+++ b/include/qpid/dispatch/amqp.h
@@ -112,6 +112,7 @@ extern const char * const QD_MA_TRACE;    ///< Trace
 extern const char * const QD_MA_TO;       ///< To-Override
 extern const char * const QD_MA_PHASE;    ///< Phase for override address
 extern const char * const QD_MA_CLASS;    ///< Message-Class
+extern const char * const QD_MA_STREAM;   ///< Indicate streaming message
 
 #define QD_MA_PREFIX_LEN  (9)
 #define QD_MA_INGRESS_LEN (16)
@@ -119,6 +120,7 @@ extern const char * const QD_MA_CLASS;    ///< Message-Class
 #define QD_MA_TO_LEN      (11)
 #define QD_MA_PHASE_LEN   (14)
 #define QD_MA_CLASS_LEN   (14)
+#define QD_MA_STREAM_LEN  (15)
 
 extern const int          QD_MA_MAX_KEY_LEN;  ///< strlen of longest key name
 extern const int          QD_MA_N_KEYS;       ///< number of router annotation keys
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 9978d1c..b797eca 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -197,6 +197,23 @@ void qd_message_set_phase_annotation(qd_message_t *msg, int phase);
 int  qd_message_get_phase_annotation(const qd_message_t *msg);
 
 /**
+ * Indicate whether message should be considered to be streaming.
+ *
+ * @param msg Pointer to an outgoing message.
+ * @param stream true if the message is streaming
+ *
+ */
+void qd_message_set_stream_annotation(qd_message_t *msg, bool stream);
+/**
+ * Test whether received message should be considered to be streaming.
+ *
+ * @param msg Pointer to a received message.
+ * @return true if the received message has the streaming annotation set, else false.
+ *
+ */
+int qd_message_is_streaming(qd_message_t *msg);
+
+/**
  * Set the value for the QD_MA_INGRESS field in the outgoing message
  * annotations for the message.
  *
diff --git a/include/qpid/dispatch/parse.h b/include/qpid/dispatch/parse.h
index 7fed15d..f6e5fd7 100644
--- a/include/qpid/dispatch/parse.h
+++ b/include/qpid/dispatch/parse.h
@@ -301,6 +301,7 @@ void qd_parse_annotations(
     qd_parsed_field_t    **ma_phase,
     qd_parsed_field_t    **ma_to_override,
     qd_parsed_field_t    **ma_trace,
+    qd_parsed_field_t    **ma_stream,
     qd_iterator_pointer_t *blob_pointer,
     uint32_t              *blob_item_count);
 
@@ -312,6 +313,7 @@ typedef enum {
     QD_MAE_TRACE,
     QD_MAE_TO,
     QD_MAE_PHASE,
+    QD_MAE_STREAM,
     QD_MAE_NONE
 } qd_ma_enum_t;
 
diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index f5d0ac7..cd22384 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -145,6 +145,8 @@ static int handle_incoming(qdr_tcp_connection_t *conn)
     } else {
         qd_message_t *msg = qd_message();
 
+        qd_message_set_stream_annotation(msg, true);
+
         qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0);
         qd_compose_start_list(props);
         qd_compose_insert_null(props);                      // message-id
@@ -966,6 +968,7 @@ static void qdr_tcp_adaptor_init(qdr_core_t *core, void **adaptor_context)
     adaptor->log_source  = qd_log_source("TCP_ADAPTOR");
     DEQ_INIT(adaptor->listeners);
     DEQ_INIT(adaptor->connectors);
+    DEQ_INIT(adaptor->connections);
     *adaptor_context = adaptor;
 
     tcp_adaptor = adaptor;
diff --git a/src/amqp.c b/src/amqp.c
index 7c9d9c0..8da6db2 100644
--- a/src/amqp.c
+++ b/src/amqp.c
@@ -30,6 +30,7 @@ const char * const QD_MA_TRACE   = "x-opt-qd.trace";
 const char * const QD_MA_TO      = "x-opt-qd.to";
 const char * const QD_MA_PHASE   = "x-opt-qd.phase";
 const char * const QD_MA_CLASS   = "x-opt-qd.class";
+const char * const QD_MA_STREAM  = "x-opt-qd.stream";
 const int          QD_MA_MAX_KEY_LEN = 16;
 const int          QD_MA_N_KEYS      = 5;  // max number of router annotations to send/receive
 const int          QD_MA_FILTER_LEN  = 5;  // N tailing inbound entries to search for stripping
diff --git a/src/message.c b/src/message.c
index a3a8152..360fd84 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1100,6 +1100,7 @@ qd_message_t *qd_message_copy(qd_message_t *in_msg)
     qd_buffer_list_clone(&copy->ma_trace, &msg->ma_trace);
     qd_buffer_list_clone(&copy->ma_ingress, &msg->ma_ingress);
     copy->ma_phase = msg->ma_phase;
+    copy->ma_stream = msg->ma_stream;
     copy->strip_annotations_in  = msg->strip_annotations_in;
 
     copy->content = content;
@@ -1131,6 +1132,7 @@ void qd_message_message_annotations(qd_message_t *in_msg)
     if (content->ma_field_iter_in == 0)
         return;
 
+    qd_parsed_field_t *ma_pf_stream = 0;
     qd_parse_annotations(
         msg->strip_annotations_in,
         content->ma_field_iter_in,
@@ -1138,6 +1140,7 @@ void qd_message_message_annotations(qd_message_t *in_msg)
         &content->ma_pf_phase,
         &content->ma_pf_to_override,
         &content->ma_pf_trace,
+        &ma_pf_stream,
         &content->ma_user_annotation_blob,
         &content->ma_count);
 
@@ -1157,6 +1160,10 @@ void qd_message_message_annotations(qd_message_t *in_msg)
         content->ma_int_phase = qd_parse_as_int(content->ma_pf_phase);
     }
 
+    if (ma_pf_stream) {
+        content->ma_stream = qd_parse_as_int(ma_pf_stream);
+    }
+
     return;
 }
 
@@ -1189,6 +1196,12 @@ int qd_message_get_phase_annotation(const qd_message_t *in_msg)
     return msg->ma_phase;
 }
 
+void qd_message_set_stream_annotation(qd_message_t *in_msg, bool stream)
+{
+    qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+    msg->ma_stream = stream;
+}
+
 void qd_message_set_ingress_annotation(qd_message_t *in_msg, qd_composed_field_t *ingress_field)
 {
     qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
@@ -1613,7 +1626,8 @@ static void compose_message_annotations_v1(qd_message_pvt_t *msg, qd_buffer_list
     if (!DEQ_IS_EMPTY(msg->ma_to_override) ||
         !DEQ_IS_EMPTY(msg->ma_trace) ||
         !DEQ_IS_EMPTY(msg->ma_ingress) ||
-        msg->ma_phase != 0) {
+        msg->ma_phase != 0 ||
+        msg->ma_stream) {
 
         if (!map_started) {
             qd_compose_start_map(out_ma);
@@ -1643,6 +1657,12 @@ static void compose_message_annotations_v1(qd_message_pvt_t *msg, qd_buffer_list
             qd_compose_insert_int(field, msg->ma_phase);
             field_count++;
         }
+
+        if (msg->ma_stream) {
+            qd_compose_insert_symbol(field, QD_MA_STREAM);
+            qd_compose_insert_int(field, msg->ma_stream);
+            field_count++;
+        }
         // pad out to N fields
         for  (; field_count < QD_MA_N_KEYS; field_count++) {
             qd_compose_insert_symbol(field, QD_MA_PREFIX);
@@ -2126,6 +2146,7 @@ qd_message_depth_status_t qd_message_check_depth(const qd_message_t *in_msg, qd_
     qd_message_depth_status_t    result;
 
     LOCK(content->lock);
+    //printf("qd_message_check_depth(%p, %i)\n", (void*) in_msg, depth);
     result = qd_message_check_LH(content, depth);
     UNLOCK(content->lock);
     return result;
@@ -2671,6 +2692,11 @@ int qd_message_get_phase_val(qd_message_t *msg)
     return ((qd_message_pvt_t*)msg)->content->ma_int_phase;
 }
 
+int qd_message_is_streaming(qd_message_t *msg)
+{
+    return ((qd_message_pvt_t*)msg)->content->ma_stream;
+}
+
 
 void qd_message_Q2_holdoff_disable(qd_message_t *msg)
 {
diff --git a/src/message_private.h b/src/message_private.h
index 02f002c..a6ca077 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -121,6 +121,7 @@ typedef struct {
     qd_parsed_field_t   *ma_pf_to_override;
     qd_parsed_field_t   *ma_pf_trace;
     int                  ma_int_phase;
+    bool                 ma_stream;
     uint64_t             max_message_size;               // configured max; 0 if no max to enforce
     uint64_t             bytes_received;                 // bytes returned by pn_link_recv() when enforcing max_message_size
     uint32_t             fanout;                         // The number of receivers for this message, including in-process subscribers.
@@ -148,6 +149,7 @@ struct qd_message_pvt_t {
     qd_buffer_list_t             ma_trace;        // trace list in outgoing message annotations
     qd_buffer_list_t             ma_ingress;      // ingress field in outgoing message annotations
     int                          ma_phase;        // phase for the override address
+    bool                         ma_stream;       // indicates whether this message is streaming
     qd_message_body_data_list_t  body_data_list;  // TODO - move this to the content for one-time parsing (TLR)
     qd_message_body_data_t      *next_body_data;
     unsigned char               *body_cursor;
diff --git a/src/parse.c b/src/parse.c
index 25d303d..3b43c10 100644
--- a/src/parse.c
+++ b/src/parse.c
@@ -777,6 +777,7 @@ const char *qd_parse_annotations_v1(
     qd_parsed_field_t    **ma_phase,
     qd_parsed_field_t    **ma_to_override,
     qd_parsed_field_t    **ma_trace,
+    qd_parsed_field_t    **ma_stream,
     qd_iterator_pointer_t *blob_pointer,
     uint32_t              *blob_item_count)
 {
@@ -850,6 +851,11 @@ const char *qd_parse_annotations_v1(
                         ma_type = QD_MAE_INGRESS;
                     }
                     break;
+                case QD_MA_STREAM_LEN:
+                    if (memcmp(QD_MA_STREAM + QMPL, dp, QD_MA_STREAM_LEN - QMPL) == 0) {
+                        ma_type = QD_MAE_STREAM;
+                    }
+                    break;
                 default:
                     // padding annotations are ignored here
                     break;
@@ -885,6 +891,9 @@ const char *qd_parse_annotations_v1(
                     case QD_MAE_PHASE:
                         *ma_phase = val_field;
                         break;
+                    case QD_MAE_STREAM:
+                        *ma_stream = val_field;
+                        break;
                     case QD_MAE_NONE:
                         assert(false);
                         break;
@@ -920,6 +929,7 @@ void qd_parse_annotations(
     qd_parsed_field_t    **ma_phase,
     qd_parsed_field_t    **ma_to_override,
     qd_parsed_field_t    **ma_trace,
+    qd_parsed_field_t    **ma_stream,
     qd_iterator_pointer_t *blob_pointer,
     uint32_t              *blob_item_count)
 {
@@ -958,7 +968,7 @@ void qd_parse_annotations(
     qd_iterator_free(raw_iter);
 
     (void) qd_parse_annotations_v1(strip_annotations_in, ma_iter_in, ma_ingress, ma_phase,
-                                    ma_to_override, ma_trace,
+                                    ma_to_override, ma_trace, ma_stream,
                                     blob_pointer, blob_item_count);
 
     return;
diff --git a/src/router_node.c b/src/router_node.c
index 956ce77..aa4cf04 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -491,38 +491,6 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
     }
 
     //
-    // Head of line blocking avoidance (DISPATCH-1545)
-    //
-    // Before we can forward a message we need to determine whether or not this
-    // message is "streaming" - a large message that has the potential to block
-    // other messages sharing the trunk link.  At this point we cannot for sure
-    // know the actual length of the incoming message, so we employ the
-    // following heuristic to determine if the message is "streaming":
-    //
-    // - If the message is receive-complete it is NOT a streaming message.
-    // - If it is NOT receive-complete:
-    //   Continue buffering incoming data until:
-    //   - receive has completed => NOT a streaming message
-    //   - not rx-complete AND Q2 threshold hit => a streaming message
-    //
-    // Once Q2 is hit we MUST forward the message regardless of rx-complete
-    // since Q2 will block forever unless the incoming data is drained via
-    // forwarding.
-    //
-    if (!receive_complete) {
-        if (qd_message_is_Q2_blocked(msg)) {
-            qd_log(router->log_source, QD_LOG_DEBUG,
-                   "[C%"PRIu64" L%"PRIu64"] Incoming message classified as streaming. User:%s",
-                   conn->connection_id,
-                   qd_link_link_id(link),
-                   conn->user_id);
-        } else {
-            // Continue buffering this message
-            return false;
-        }
-    }
-
-    //
     // Determine if the incoming link is anonymous.  If the link is addressed,
     // there are some optimizations we can take advantage of.
     //
@@ -607,6 +575,38 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
     qd_iterator_t *ingress_iter = router_annotate_message(router, msg, &link_exclusions, &distance, &ingress_index);
 
     //
+    // Head of line blocking avoidance (DISPATCH-1545)
+    //
+    // Before we can forward a message we need to determine whether or not this
+    // message is "streaming" - a large message that has the potential to block
+    // other messages sharing the trunk link.  At this point we cannot know for
+    // sure the actual length of the incoming message, so we employ the
+    // following heuristic to determine if the message is "streaming":
+    //
+    // - If the message is receive-complete it is NOT a streaming message.
+    // - If it is NOT receive-complete:
+    //   Continue buffering incoming data until:
+    //   - receive has completed => NOT a streaming message
+    //   - not rx-complete AND (stream annotation set OR Q2 threshold hit) => a streaming message
+    //
+    // Once Q2 is hit we MUST forward the message regardless of rx-complete
+    // since Q2 will block forever unless the incoming data is drained via
+    // forwarding.
+    //
+    if (!receive_complete) {
+        if (qd_message_is_streaming(msg) || qd_message_is_Q2_blocked(msg)) {
+            qd_log(router->log_source, QD_LOG_DEBUG,
+                   "[C%"PRIu64" L%"PRIu64"] Incoming message classified as streaming. User:%s",
+                   conn->connection_id,
+                   qd_link_link_id(link),
+                   conn->user_id);
+        } else {
+            // Continue buffering this message
+            return false;
+        }
+    }
+
+    //
     // If this delivery has traveled further than the known radius of the network topology (plus 1),
     // release and settle the delivery.  This can happen in the case of "flood" multicast where the
     // deliveries follow all available paths.  This will only discard messages that will reach their


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org