You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2020/10/20 16:02:18 UTC
[qpid-dispatch] 03/04: DISPATCH-1654: fix for streaming message
This is an automated email from the ASF dual-hosted git repository.
chug pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit a8e548bfca44851305a8b6a59cfb21272ca668ca
Author: Gordon Sim <gs...@redhat.com>
AuthorDate: Wed Oct 7 12:09:57 2020 +0100
DISPATCH-1654: fix for streaming message
---
include/qpid/dispatch/amqp.h | 2 ++
include/qpid/dispatch/message.h | 17 +++++++++++
include/qpid/dispatch/parse.h | 2 ++
src/adaptors/tcp_adaptor.c | 3 ++
src/amqp.c | 1 +
src/message.c | 28 +++++++++++++++++-
src/message_private.h | 2 ++
src/parse.c | 12 +++++++-
src/router_node.c | 64 ++++++++++++++++++++---------------------
9 files changed, 97 insertions(+), 34 deletions(-)
diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h
index aca440c..6c1064f 100644
--- a/include/qpid/dispatch/amqp.h
+++ b/include/qpid/dispatch/amqp.h
@@ -112,6 +112,7 @@ extern const char * const QD_MA_TRACE; ///< Trace
extern const char * const QD_MA_TO; ///< To-Override
extern const char * const QD_MA_PHASE; ///< Phase for override address
extern const char * const QD_MA_CLASS; ///< Message-Class
+extern const char * const QD_MA_STREAM; ///< Indicate streaming message
#define QD_MA_PREFIX_LEN (9)
#define QD_MA_INGRESS_LEN (16)
@@ -119,6 +120,7 @@ extern const char * const QD_MA_CLASS; ///< Message-Class
#define QD_MA_TO_LEN (11)
#define QD_MA_PHASE_LEN (14)
#define QD_MA_CLASS_LEN (14)
+#define QD_MA_STREAM_LEN (15)
extern const int QD_MA_MAX_KEY_LEN; ///< strlen of longest key name
extern const int QD_MA_N_KEYS; ///< number of router annotation keys
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 9978d1c..b797eca 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -197,6 +197,23 @@ void qd_message_set_phase_annotation(qd_message_t *msg, int phase);
int qd_message_get_phase_annotation(const qd_message_t *msg);
/**
+ * Indicate whether message should be considered to be streaming.
+ *
+ * @param msg Pointer to an outgoing message.
+ * @param stream true if the message is streaming
+ *
+ */
+void qd_message_set_stream_annotation(qd_message_t *msg, bool stream);
+/**
+ * Test whether received message should be considered to be streaming.
+ *
+ * @param msg Pointer to a received message.
+ * @return true if the received message has the streaming annotation set, else false.
+ *
+ */
+int qd_message_is_streaming(qd_message_t *msg);
+
+/**
* Set the value for the QD_MA_INGRESS field in the outgoing message
* annotations for the message.
*
diff --git a/include/qpid/dispatch/parse.h b/include/qpid/dispatch/parse.h
index 7fed15d..f6e5fd7 100644
--- a/include/qpid/dispatch/parse.h
+++ b/include/qpid/dispatch/parse.h
@@ -301,6 +301,7 @@ void qd_parse_annotations(
qd_parsed_field_t **ma_phase,
qd_parsed_field_t **ma_to_override,
qd_parsed_field_t **ma_trace,
+ qd_parsed_field_t **ma_stream,
qd_iterator_pointer_t *blob_pointer,
uint32_t *blob_item_count);
@@ -312,6 +313,7 @@ typedef enum {
QD_MAE_TRACE,
QD_MAE_TO,
QD_MAE_PHASE,
+ QD_MAE_STREAM,
QD_MAE_NONE
} qd_ma_enum_t;
diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index f5d0ac7..cd22384 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -145,6 +145,8 @@ static int handle_incoming(qdr_tcp_connection_t *conn)
} else {
qd_message_t *msg = qd_message();
+ qd_message_set_stream_annotation(msg, true);
+
qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0);
qd_compose_start_list(props);
qd_compose_insert_null(props); // message-id
@@ -966,6 +968,7 @@ static void qdr_tcp_adaptor_init(qdr_core_t *core, void **adaptor_context)
adaptor->log_source = qd_log_source("TCP_ADAPTOR");
DEQ_INIT(adaptor->listeners);
DEQ_INIT(adaptor->connectors);
+ DEQ_INIT(adaptor->connections);
*adaptor_context = adaptor;
tcp_adaptor = adaptor;
diff --git a/src/amqp.c b/src/amqp.c
index 7c9d9c0..8da6db2 100644
--- a/src/amqp.c
+++ b/src/amqp.c
@@ -30,6 +30,7 @@ const char * const QD_MA_TRACE = "x-opt-qd.trace";
const char * const QD_MA_TO = "x-opt-qd.to";
const char * const QD_MA_PHASE = "x-opt-qd.phase";
const char * const QD_MA_CLASS = "x-opt-qd.class";
+const char * const QD_MA_STREAM = "x-opt-qd.stream";
const int QD_MA_MAX_KEY_LEN = 16;
const int QD_MA_N_KEYS = 5; // max number of router annotations to send/receive
const int QD_MA_FILTER_LEN = 5; // N tailing inbound entries to search for stripping
diff --git a/src/message.c b/src/message.c
index a3a8152..360fd84 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1100,6 +1100,7 @@ qd_message_t *qd_message_copy(qd_message_t *in_msg)
qd_buffer_list_clone(&copy->ma_trace, &msg->ma_trace);
qd_buffer_list_clone(&copy->ma_ingress, &msg->ma_ingress);
copy->ma_phase = msg->ma_phase;
+ copy->ma_stream = msg->ma_stream;
copy->strip_annotations_in = msg->strip_annotations_in;
copy->content = content;
@@ -1131,6 +1132,7 @@ void qd_message_message_annotations(qd_message_t *in_msg)
if (content->ma_field_iter_in == 0)
return;
+ qd_parsed_field_t *ma_pf_stream = 0;
qd_parse_annotations(
msg->strip_annotations_in,
content->ma_field_iter_in,
@@ -1138,6 +1140,7 @@ void qd_message_message_annotations(qd_message_t *in_msg)
&content->ma_pf_phase,
&content->ma_pf_to_override,
&content->ma_pf_trace,
+ &ma_pf_stream,
&content->ma_user_annotation_blob,
&content->ma_count);
@@ -1157,6 +1160,10 @@ void qd_message_message_annotations(qd_message_t *in_msg)
content->ma_int_phase = qd_parse_as_int(content->ma_pf_phase);
}
+ if (ma_pf_stream) {
+ content->ma_stream = qd_parse_as_int(ma_pf_stream);
+ }
+
return;
}
@@ -1189,6 +1196,12 @@ int qd_message_get_phase_annotation(const qd_message_t *in_msg)
return msg->ma_phase;
}
+void qd_message_set_stream_annotation(qd_message_t *in_msg, bool stream)
+{
+ qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+ msg->ma_stream = stream;
+}
+
void qd_message_set_ingress_annotation(qd_message_t *in_msg, qd_composed_field_t *ingress_field)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
@@ -1613,7 +1626,8 @@ static void compose_message_annotations_v1(qd_message_pvt_t *msg, qd_buffer_list
if (!DEQ_IS_EMPTY(msg->ma_to_override) ||
!DEQ_IS_EMPTY(msg->ma_trace) ||
!DEQ_IS_EMPTY(msg->ma_ingress) ||
- msg->ma_phase != 0) {
+ msg->ma_phase != 0 ||
+ msg->ma_stream) {
if (!map_started) {
qd_compose_start_map(out_ma);
@@ -1643,6 +1657,12 @@ static void compose_message_annotations_v1(qd_message_pvt_t *msg, qd_buffer_list
qd_compose_insert_int(field, msg->ma_phase);
field_count++;
}
+
+ if (msg->ma_stream) {
+ qd_compose_insert_symbol(field, QD_MA_STREAM);
+ qd_compose_insert_int(field, msg->ma_stream);
+ field_count++;
+ }
// pad out to N fields
for (; field_count < QD_MA_N_KEYS; field_count++) {
qd_compose_insert_symbol(field, QD_MA_PREFIX);
@@ -2126,6 +2146,7 @@ qd_message_depth_status_t qd_message_check_depth(const qd_message_t *in_msg, qd_
qd_message_depth_status_t result;
LOCK(content->lock);
+ //printf("qd_message_check_depth(%p, %i)\n", (void*) in_msg, depth);
result = qd_message_check_LH(content, depth);
UNLOCK(content->lock);
return result;
@@ -2671,6 +2692,11 @@ int qd_message_get_phase_val(qd_message_t *msg)
return ((qd_message_pvt_t*)msg)->content->ma_int_phase;
}
+int qd_message_is_streaming(qd_message_t *msg)
+{
+ return ((qd_message_pvt_t*)msg)->content->ma_stream;
+}
+
void qd_message_Q2_holdoff_disable(qd_message_t *msg)
{
diff --git a/src/message_private.h b/src/message_private.h
index 02f002c..a6ca077 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -121,6 +121,7 @@ typedef struct {
qd_parsed_field_t *ma_pf_to_override;
qd_parsed_field_t *ma_pf_trace;
int ma_int_phase;
+ bool ma_stream;
uint64_t max_message_size; // configured max; 0 if no max to enforce
uint64_t bytes_received; // bytes returned by pn_link_recv() when enforcing max_message_size
uint32_t fanout; // The number of receivers for this message, including in-process subscribers.
@@ -148,6 +149,7 @@ struct qd_message_pvt_t {
qd_buffer_list_t ma_trace; // trace list in outgoing message annotations
qd_buffer_list_t ma_ingress; // ingress field in outgoing message annotations
int ma_phase; // phase for the override address
+ bool ma_stream; // indicates whether this message is streaming
qd_message_body_data_list_t body_data_list; // TODO - move this to the content for one-time parsing (TLR)
qd_message_body_data_t *next_body_data;
unsigned char *body_cursor;
diff --git a/src/parse.c b/src/parse.c
index 25d303d..3b43c10 100644
--- a/src/parse.c
+++ b/src/parse.c
@@ -777,6 +777,7 @@ const char *qd_parse_annotations_v1(
qd_parsed_field_t **ma_phase,
qd_parsed_field_t **ma_to_override,
qd_parsed_field_t **ma_trace,
+ qd_parsed_field_t **ma_stream,
qd_iterator_pointer_t *blob_pointer,
uint32_t *blob_item_count)
{
@@ -850,6 +851,11 @@ const char *qd_parse_annotations_v1(
ma_type = QD_MAE_INGRESS;
}
break;
+ case QD_MA_STREAM_LEN:
+ if (memcmp(QD_MA_STREAM + QMPL, dp, QD_MA_STREAM_LEN - QMPL) == 0) {
+ ma_type = QD_MAE_STREAM;
+ }
+ break;
default:
// padding annotations are ignored here
break;
@@ -885,6 +891,9 @@ const char *qd_parse_annotations_v1(
case QD_MAE_PHASE:
*ma_phase = val_field;
break;
+ case QD_MAE_STREAM:
+ *ma_stream = val_field;
+ break;
case QD_MAE_NONE:
assert(false);
break;
@@ -920,6 +929,7 @@ void qd_parse_annotations(
qd_parsed_field_t **ma_phase,
qd_parsed_field_t **ma_to_override,
qd_parsed_field_t **ma_trace,
+ qd_parsed_field_t **ma_stream,
qd_iterator_pointer_t *blob_pointer,
uint32_t *blob_item_count)
{
@@ -958,7 +968,7 @@ void qd_parse_annotations(
qd_iterator_free(raw_iter);
(void) qd_parse_annotations_v1(strip_annotations_in, ma_iter_in, ma_ingress, ma_phase,
- ma_to_override, ma_trace,
+ ma_to_override, ma_trace, ma_stream,
blob_pointer, blob_item_count);
return;
diff --git a/src/router_node.c b/src/router_node.c
index 956ce77..aa4cf04 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -491,38 +491,6 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
}
//
- // Head of line blocking avoidance (DISPATCH-1545)
- //
- // Before we can forward a message we need to determine whether or not this
- // message is "streaming" - a large message that has the potential to block
- // other messages sharing the trunk link. At this point we cannot for sure
- // know the actual length of the incoming message, so we employ the
- // following heuristic to determine if the message is "streaming":
- //
- // - If the message is receive-complete it is NOT a streaming message.
- // - If it is NOT receive-complete:
- // Continue buffering incoming data until:
- // - receive has completed => NOT a streaming message
- // - not rx-complete AND Q2 threshold hit => a streaming message
- //
- // Once Q2 is hit we MUST forward the message regardless of rx-complete
- // since Q2 will block forever unless the incoming data is drained via
- // forwarding.
- //
- if (!receive_complete) {
- if (qd_message_is_Q2_blocked(msg)) {
- qd_log(router->log_source, QD_LOG_DEBUG,
- "[C%"PRIu64" L%"PRIu64"] Incoming message classified as streaming. User:%s",
- conn->connection_id,
- qd_link_link_id(link),
- conn->user_id);
- } else {
- // Continue buffering this message
- return false;
- }
- }
-
- //
// Determine if the incoming link is anonymous. If the link is addressed,
// there are some optimizations we can take advantage of.
//
@@ -607,6 +575,38 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
qd_iterator_t *ingress_iter = router_annotate_message(router, msg, &link_exclusions, &distance, &ingress_index);
//
+ // Head of line blocking avoidance (DISPATCH-1545)
+ //
+ // Before we can forward a message we need to determine whether or not this
+ // message is "streaming" - a large message that has the potential to block
+ // other messages sharing the trunk link. At this point we cannot for sure
+ // know the actual length of the incoming message, so we employ the
+ // following heuristic to determine if the message is "streaming":
+ //
+ // - If the message is receive-complete it is NOT a streaming message.
+ // - If it is NOT receive-complete:
+ // Continue buffering incoming data until:
+ // - receive has completed => NOT a streaming message
+ // - not rx-complete AND (stream annotation set OR Q2 threshold hit) => a streaming message
+ //
+ // Once Q2 is hit we MUST forward the message regardless of rx-complete
+ // since Q2 will block forever unless the incoming data is drained via
+ // forwarding.
+ //
+ if (!receive_complete) {
+ if (qd_message_is_streaming(msg) || qd_message_is_Q2_blocked(msg)) {
+ qd_log(router->log_source, QD_LOG_DEBUG,
+ "[C%"PRIu64" L%"PRIu64"] Incoming message classified as streaming. User:%s",
+ conn->connection_id,
+ qd_link_link_id(link),
+ conn->user_id);
+ } else {
+ // Continue buffering this message
+ return false;
+ }
+ }
+
+ //
// If this delivery has traveled further than the known radius of the network topology (plus 1),
// release and settle the delivery. This can happen in the case of "flood" multicast where the
// deliveries follow all available paths. This will only discard messages that will reach their
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org