You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2020/10/28 20:47:10 UTC
[qpid-dispatch] branch dev-protocol-adaptors-2 updated:
DISPATCH-1742 - Added function for asynchronous completion of
sent-messages. Improved API docs. DISPATCH-1742 - Updated reference adaptor
to make multiple extend calls. DISPATCH-1804 - Changed body_data API to
stream_data to encompass both body_data and footer performatives.
This is an automated email from the ASF dual-hosted git repository.
tross pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push:
new cef0f5a DISPATCH-1742 - Added function for asynchronous completion of sent-messages. Improved API docs. DISPATCH-1742 - Updated reference adaptor to make multiple extend calls. DISPATCH-1804 - Changed body_data API to stream_data to encompass both body_data and footer performatives.
cef0f5a is described below
commit cef0f5a5ffae788b2692551775df9e4daba1305b
Author: Ted Ross <tr...@apache.org>
AuthorDate: Wed Oct 28 16:44:19 2020 -0400
DISPATCH-1742 - Added function for asynchronous completion of sent-messages. Improved API docs.
DISPATCH-1742 - Updated reference adaptor to make multiple extend calls.
DISPATCH-1804 - Changed body_data API to stream_data to encompass both body_data and footer performatives.
---
include/qpid/dispatch/http1_codec.h | 12 +--
include/qpid/dispatch/message.h | 80 +++++++-------
include/qpid/dispatch/protocol_adaptor.h | 37 +++++++
src/adaptors/http1/http1_adaptor.c | 20 ++--
src/adaptors/http1/http1_client.c | 32 +++---
src/adaptors/http1/http1_codec.c | 16 +--
src/adaptors/http1/http1_private.h | 8 +-
src/adaptors/http1/http1_server.c | 28 ++---
src/adaptors/http2/http2_adaptor.c | 130 +++++++++++------------
src/adaptors/http2/http2_adaptor.h | 12 +--
src/adaptors/reference_adaptor.c | 94 +++++++++++------
src/adaptors/tcp_adaptor.c | 50 +++++----
src/message.c | 173 +++++++++++++++++--------------
src/message_private.h | 42 ++++----
src/router_core/transfer.c | 42 +++++++-
tests/message_test.c | 103 +++++++++---------
16 files changed, 498 insertions(+), 381 deletions(-)
diff --git a/include/qpid/dispatch/http1_codec.h b/include/qpid/dispatch/http1_codec.h
index 6967cc0..132f06c 100644
--- a/include/qpid/dispatch/http1_codec.h
+++ b/include/qpid/dispatch/http1_codec.h
@@ -87,12 +87,12 @@ typedef struct h1_codec_config_t {
//
void (*tx_buffers)(h1_codec_request_state_t *hrs, qd_buffer_list_t *data, unsigned int len);
- // tx_body_data()
- // Called with body_data containing encoded HTTP message data. Only
+ // tx_stream_data()
+ // Called with stream_data containing encoded HTTP message data. Only
// called if the outgoing HTTP message has a body. The caller assumes
- // ownership of the body_data and must release it when done.
+ // ownership of the stream_data and must release it when done.
//
- void (*tx_body_data)(h1_codec_request_state_t *hrs, qd_message_body_data_t *body_data);
+ void (*tx_stream_data)(h1_codec_request_state_t *hrs, qd_message_stream_data_t *stream_data);
//
// RX message callbacks
@@ -227,9 +227,9 @@ int h1_codec_tx_response(h1_codec_request_state_t *hrs, int status_code, const c
//
int h1_codec_tx_add_header(h1_codec_request_state_t *hrs, const char *key, const char *value);
-// Stream outgoing body data. Ownership of body_data is passed to the caller.
+// Stream outgoing body data. Ownership of stream_data is passed to the caller.
//
-int h1_codec_tx_body(h1_codec_request_state_t *hrs, qd_message_body_data_t *body_data);
+int h1_codec_tx_body(h1_codec_request_state_t *hrs, qd_message_stream_data_t *stream_data);
// outgoing message construction complete. The request_complete() callback MAY
// occur during this call.
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index b797eca..41ec73b 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -59,8 +59,8 @@
// Callback for status change (confirmed persistent, loaded-in-memory, etc.)
-typedef struct qd_message_t qd_message_t;
-typedef struct qd_message_body_data_t qd_message_body_data_t;
+typedef struct qd_message_t qd_message_t;
+typedef struct qd_message_stream_data_t qd_message_stream_data_t;
/** Amount of message to be parsed. */
typedef enum {
@@ -310,93 +310,93 @@ int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field);
/**
- * qd_message_body_data_iterator
+ * qd_message_stream_data_iterator
*
* Return an iterator that references the content (not the performative headers)
* of the entire body-data section.
*
* The returned iterator must eventually be freed by the caller.
*
- * @param body_data Pointer to a body_data object produced by qd_message_next_body_data
- * @return Pointer to an iterator referencing the body_data content
+ * @param stream_data Pointer to a stream_data object produced by qd_message_next_stream_data
+ * @return Pointer to an iterator referencing the stream_data content
*/
-qd_iterator_t *qd_message_body_data_iterator(const qd_message_body_data_t *body_data);
+qd_iterator_t *qd_message_stream_data_iterator(const qd_message_stream_data_t *stream_data);
/**
- * qd_message_body_data_buffer_count
+ * qd_message_stream_data_buffer_count
*
* Return the number of buffers that are needed to hold this body-data's content.
*
- * @param body_data Pointer to a body_data object produced by qd_message_next_body_data
- * @return Number of pn_raw_buffers needed to contain the entire content of this body_data.
+ * @param stream_data Pointer to a stream_data object produced by qd_message_next_stream_data
+ * @return Number of pn_raw_buffers needed to contain the entire content of this stream_data.
*/
-int qd_message_body_data_buffer_count(const qd_message_body_data_t *body_data);
+int qd_message_stream_data_buffer_count(const qd_message_stream_data_t *stream_data);
/**
- * qd_message_body_data_buffers
+ * qd_message_stream_data_buffers
*
- * Populate an array of pn_raw_buffer_t objects with references to the body_data's content.
+ * Populate an array of pn_raw_buffer_t objects with references to the stream_data's content.
*
- * @param body_data Pointer to a body_data object produced by qd_message_next_body_data
+ * @param stream_data Pointer to a stream_data object produced by qd_message_next_stream_data
* @param buffers Pointer to an array of pn_raw_buffer_t objects
- * @param offset The offset (in the body_data's buffer set) from which copying should begin
+ * @param offset The offset (in the stream_data's buffer set) from which copying should begin
* @param count The number of pn_raw_buffer_t objects in the buffers array
* @return The number of pn_raw_buffer_t objects that were overwritten
*/
-int qd_message_body_data_buffers(qd_message_body_data_t *body_data, pn_raw_buffer_t *buffers, int offset, int count);
+int qd_message_stream_data_buffers(qd_message_stream_data_t *stream_data, pn_raw_buffer_t *buffers, int offset, int count);
/**
- * qd_message_body_data_payload_length
+ * qd_message_stream_data_payload_length
*
- * Given a body_data object, return the length of the payload.
- * This will equal the sum of the length of all qd_buffer_t objects contained in payload portion of the body_data object
+ * Given a stream_data object, return the length of the payload.
+ * This will equal the sum of the length of all qd_buffer_t objects contained in payload portion of the stream_data object
*
- * @param body_data Pointer to a body_data object produced by qd_message_next_body_data
+ * @param stream_data Pointer to a stream_data object produced by qd_message_next_stream_data
* @return The length of the payload of the passed in body data object.
*/
-size_t qd_message_body_data_payload_length(const qd_message_body_data_t *body_data);
+size_t qd_message_stream_data_payload_length(const qd_message_stream_data_t *stream_data);
/**
- * qd_message_body_data_release
+ * qd_message_stream_data_release
*
* Release buffers that were associated with a body-data section. It is not required that body-data
* objects be released in the same order in which they were offered.
*
- * Once this function is called, the caller must drop its reference to the body_data object
+ * Once this function is called, the caller must drop its reference to the stream_data object
* and not use it again.
*
- * @param body_data Pointer to a body data object returned by qd_message_next_body_data
+ * @param stream_data Pointer to a body data object returned by qd_message_next_stream_data
*/
-void qd_message_body_data_release(qd_message_body_data_t *body_data);
+void qd_message_stream_data_release(qd_message_stream_data_t *stream_data);
typedef enum {
- QD_MESSAGE_BODY_DATA_OK, // A valid body data object have been returned
- QD_MESSAGE_BODY_DATA_INCOMPLETE, // The next body data is incomplete, try again later
- QD_MESSAGE_BODY_DATA_NO_MORE, // There are no more body data objects in this stream
- QD_MESSAGE_BODY_DATA_INVALID, // The next body data is invalid, the stream is corrupted
- QD_MESSAGE_BODY_DATA_NOT_DATA // The body of the message is not a DATA segment
-} qd_message_body_data_result_t;
+ QD_MESSAGE_STREAM_DATA_BODY_OK, // A valid body data object has been returned
+ QD_MESSAGE_STREAM_DATA_FOOTER_OK, // A valid footer has been returned
+ QD_MESSAGE_STREAM_DATA_INCOMPLETE, // The next body data is incomplete, try again later
+ QD_MESSAGE_STREAM_DATA_NO_MORE, // There are no more body data objects in this stream
+ QD_MESSAGE_STREAM_DATA_INVALID // The next body data is invalid, the stream is corrupted
+} qd_message_stream_data_result_t;
/**
- * qd_message_next_body_data
+ * qd_message_next_stream_data
*
* Get the next body-data section from this streaming message return the result and
- * possibly the valid, completed body_data object.
+ * possibly the valid, completed stream_data object.
*
* @param msg Pointer to a message
- * @param body_data Output pointer to a body_data object (or 0 if not OK)
- * @return The body_data_result describing the result of this operation
+ * @param stream_data Output pointer to a stream_data object (or 0 if not OK)
+ * @return The stream_data_result describing the result of this operation
*/
-qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *msg, qd_message_body_data_t **body_data);
+qd_message_stream_data_result_t qd_message_next_stream_data(qd_message_t *msg, qd_message_stream_data_t **stream_data);
/**
- * qd_message_body_data_append
+ * qd_message_stream_data_append
*
* Append the buffers in data as a sequence of one or more BODY_DATA sections
* to the given message. The buffers in data are moved into the message
@@ -406,7 +406,7 @@ qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *msg, qd_me
* @param data List of buffers containing body data.
* @return The number of buffers stored in the message's content
*/
-int qd_message_body_data_append(qd_message_t *msg, qd_buffer_list_t *data);
+int qd_message_stream_data_append(qd_message_t *msg, qd_buffer_list_t *data);
/** Put string representation of a message suitable for logging in buffer.
@@ -424,7 +424,7 @@ qd_log_source_t* qd_message_log_source();
* @param msg A pointer to the message
* @return the parsed field
*/
-qd_parsed_field_t *qd_message_get_ingress (qd_message_t *msg);
+qd_parsed_field_t *qd_message_get_ingress(qd_message_t *msg);
/**
* Accessor for message field phase
@@ -432,7 +432,7 @@ qd_parsed_field_t *qd_message_get_ingress (qd_message_t *msg);
* @param msg A pointer to the message
* @return the parsed field
*/
-qd_parsed_field_t *qd_message_get_phase (qd_message_t *msg);
+qd_parsed_field_t *qd_message_get_phase(qd_message_t *msg);
/**
* Accessor for message field to_override
@@ -448,7 +448,7 @@ qd_parsed_field_t *qd_message_get_to_override(qd_message_t *msg);
* @param msg A pointer to the message
* @return the parsed field
*/
-qd_parsed_field_t *qd_message_get_trace (qd_message_t *msg);
+qd_parsed_field_t *qd_message_get_trace(qd_message_t *msg);
/**
* Accessor for message field phase
diff --git a/include/qpid/dispatch/protocol_adaptor.h b/include/qpid/dispatch/protocol_adaptor.h
index 29c7a8b..909b860 100644
--- a/include/qpid/dispatch/protocol_adaptor.h
+++ b/include/qpid/dispatch/protocol_adaptor.h
@@ -857,8 +857,45 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *
const uint8_t *tag, int tag_length,
uint64_t remote_disposition,
pn_data_t *remote_extension_state);
+
+/**
+ * qdr_link_process_deliveries
+ *
+ * This function is called by the protocol adaptor in the context of the link_push
+ * callback. It provides the core module access to the IO thread so the core can
+ * deliver outgoing messages to the adaptor.
+ *
+ * @param core Pointer to the router core object
+ * @param link Pointer to the link being processed
+ * @param credit The maximum number of deliveries to be processed on this link
+ * @return The number of deliveries that were completed during the processing
+ */
int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit);
+
+/**
+ * qdr_link_complete_sent_message
+ *
+ * If an outgoing message is completed outside of the context of the link_deliver callback,
+ * this function must be called to inform the router core that the delivery on the head of
+ * the link's undelivered list can be moved out of that list. Ensure that the send-complete
+ * status of the message has been set before calling this function. This function will check
+ * the send-complete status of the head delivery on the link's undelivered list. If it is
+ * true, that delivery will be removed from the undelivered list.
+ *
+ * DO NOT call this function from within the link_deliver callback. Use it only if you must
+ * asynchronously complete the sending of the current message.
+ *
+ * This will typically occur when a message delivered to the protcol adaptor cannot be sent
+ * on the wire due to back-pressure. In this case, the removal of the back pressure is the
+ * stimulus for completing the send of the message.
+ *
+ * @param core Pointer to the router core object
+ * @param link Pointer to the link on which the head delivery has been completed
+ */
+void qdr_link_complete_sent_message(qdr_core_t *core, qdr_link_t *link);
+
+
void qdr_link_flow(qdr_core_t *core, qdr_link_t *link, int credit, bool drain_mode);
/**
diff --git a/src/adaptors/http1/http1_adaptor.c b/src/adaptors/http1/http1_adaptor.c
index 2207478..eb8349a 100644
--- a/src/adaptors/http1/http1_adaptor.c
+++ b/src/adaptors/http1/http1_adaptor.c
@@ -138,8 +138,8 @@ void qdr_http1_out_data_fifo_cleanup(qdr_http1_out_data_fifo_t *out_data)
qdr_http1_out_data_t *od = DEQ_HEAD(out_data->fifo);
while (od) {
DEQ_REMOVE_HEAD(out_data->fifo);
- if (od->body_data)
- qd_message_body_data_release(od->body_data);
+ if (od->stream_data)
+ qd_message_stream_data_release(od->stream_data);
else
qd_buffer_list_free_buffers(&od->raw_buffers);
free_qdr_http1_out_data_t(od);
@@ -265,9 +265,9 @@ uint64_t qdr_http1_write_out_data(qdr_http1_connection_t *hconn, qdr_http1_out_d
size_t limit = MIN(RAW_BUFFER_BATCH, od_len);
int written = 0;
- if (od->body_data) { // buffers stored in qd_message_t
+ if (od->stream_data) { // buffers stored in qd_message_t
- written = qd_message_body_data_buffers(od->body_data, buffers, od->next_buffer, limit);
+ written = qd_message_stream_data_buffers(od->stream_data, buffers, od->next_buffer, limit);
for (int i = 0; i < written; ++i) {
// enforce this: we expect the context can be used by the adaptor!
assert(buffers[i].context == 0);
@@ -352,14 +352,14 @@ void qdr_http1_enqueue_buffer_list(qdr_http1_out_data_fifo_t *fifo, qd_buffer_li
// The HTTP encoder has a message body data to be written to the raw connection.
// Queue it to the outgoing data fifo.
//
-void qdr_http1_enqueue_body_data(qdr_http1_out_data_fifo_t *fifo, qd_message_body_data_t *body_data)
+void qdr_http1_enqueue_stream_data(qdr_http1_out_data_fifo_t *fifo, qd_message_stream_data_t *stream_data)
{
- int count = qd_message_body_data_buffer_count(body_data);
+ int count = qd_message_stream_data_buffer_count(stream_data);
if (count) {
qdr_http1_out_data_t *od = new_qdr_http1_out_data_t();
ZERO(od);
od->owning_fifo = fifo;
- od->body_data = body_data;
+ od->stream_data = stream_data;
od->buffer_count = count;
DEQ_INSERT_TAIL(fifo->fifo, od);
@@ -367,7 +367,7 @@ void qdr_http1_enqueue_body_data(qdr_http1_out_data_fifo_t *fifo, qd_message_bod
fifo->write_ptr = od;
} else {
// empty body-data
- qd_message_body_data_release(body_data);
+ qd_message_stream_data_release(stream_data);
}
}
@@ -401,8 +401,8 @@ void qdr_http1_free_written_buffers(qdr_http1_connection_t *hconn)
// all buffers returned
qdr_http1_out_data_fifo_t *fifo = od->owning_fifo;
DEQ_REMOVE(fifo->fifo, od);
- if (od->body_data)
- qd_message_body_data_release(od->body_data);
+ if (od->stream_data)
+ qd_message_stream_data_release(od->stream_data);
else
qd_buffer_list_free_buffers(&od->raw_buffers);
free_qdr_http1_out_data_t(od);
diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c
index 708d2a9..0a82599 100644
--- a/src/adaptors/http1/http1_client.c
+++ b/src/adaptors/http1/http1_client.c
@@ -86,7 +86,7 @@ ALLOC_DEFINE(_client_request_t);
static void _client_tx_buffers_cb(h1_codec_request_state_t *lib_hrs, qd_buffer_list_t *blist, unsigned int len);
-static void _client_tx_body_data_cb(h1_codec_request_state_t *lib_hrs, qd_message_body_data_t *body_data);
+static void _client_tx_stream_data_cb(h1_codec_request_state_t *lib_hrs, qd_message_stream_data_t *stream_data);
static int _client_rx_request_cb(h1_codec_request_state_t *lib_rs,
const char *method,
const char *target,
@@ -133,7 +133,7 @@ static qdr_http1_connection_t *_create_client_connection(qd_http_listener_t *li)
h1_codec_config_t config = {0};
config.type = HTTP1_CONN_CLIENT;
config.tx_buffers = _client_tx_buffers_cb;
- config.tx_body_data = _client_tx_body_data_cb;
+ config.tx_stream_data = _client_tx_stream_data_cb;
config.rx_request = _client_rx_request_cb;
config.rx_response = _client_rx_response_cb;
config.rx_header = _client_rx_header_cb;
@@ -554,9 +554,9 @@ static void _client_tx_buffers_cb(h1_codec_request_state_t *hrs, qd_buffer_list_
}
-// Encoder callback: send body_data buffers (response msg) to client endpoint
+// Encoder callback: send stream_data buffers (response msg) to client endpoint
//
-static void _client_tx_body_data_cb(h1_codec_request_state_t *hrs, qd_message_body_data_t *body_data)
+static void _client_tx_stream_data_cb(h1_codec_request_state_t *hrs, qd_message_stream_data_t *stream_data)
{
_client_request_t *hreq = (_client_request_t*) h1_codec_request_state_get_context(hrs);
qdr_http1_connection_t *hconn = hreq->base.hconn;
@@ -565,7 +565,7 @@ static void _client_tx_body_data_cb(h1_codec_request_state_t *hrs, qd_message_bo
// client connection has been lost
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"] Discarding outgoing data - client connection closed", hconn->conn_id);
- qd_message_body_data_release(body_data);
+ qd_message_stream_data_release(stream_data);
return;
}
@@ -578,7 +578,7 @@ static void _client_tx_body_data_cb(h1_codec_request_state_t *hrs, qd_message_bo
_client_response_msg_t *rmsg = DEQ_TAIL(hreq->responses);
assert(rmsg);
- qdr_http1_enqueue_body_data(&rmsg->out_data, body_data);
+ qdr_http1_enqueue_stream_data(&rmsg->out_data, stream_data);
// if this happens to be the current outgoing response try writing to the
// raw connection
@@ -781,7 +781,7 @@ static int _client_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *b
"[C%"PRIu64"][L%"PRIu64"] HTTP request body received len=%zu.",
hconn->conn_id, hconn->in_link_id, len);
- qd_message_body_data_append(msg, body);
+ qd_message_stream_data_append(msg, body);
//
// Notify the router that more data is ready to be pushed out on the delivery
@@ -1119,18 +1119,18 @@ static uint64_t _encode_response_message(_client_request_t *hreq,
}
}
- qd_message_body_data_t *body_data = 0;
+ qd_message_stream_data_t *stream_data = 0;
while (true) {
- switch (qd_message_next_body_data(msg, &body_data)) {
+ switch (qd_message_next_stream_data(msg, &stream_data)) {
- case QD_MESSAGE_BODY_DATA_OK:
+ case QD_MESSAGE_STREAM_DATA_BODY_OK:
qd_log(hconn->adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] Encoding response body data",
hconn->conn_id, hconn->out_link_id);
- if (h1_codec_tx_body(hreq->base.lib_rs, body_data)) {
+ if (h1_codec_tx_body(hreq->base.lib_rs, stream_data)) {
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"][L%"PRIu64"] body data encode failed",
hconn->conn_id, hconn->out_link_id);
@@ -1138,18 +1138,20 @@ static uint64_t _encode_response_message(_client_request_t *hreq,
}
break;
- case QD_MESSAGE_BODY_DATA_NO_MORE:
+ case QD_MESSAGE_STREAM_DATA_FOOTER_OK:
+ break;
+
+ case QD_MESSAGE_STREAM_DATA_NO_MORE:
// indicate this message is complete
qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
"[C%"PRIu64"][L%"PRIu64"] response message encoding completed",
hconn->conn_id, hconn->out_link_id);
return PN_ACCEPTED;
- case QD_MESSAGE_BODY_DATA_INCOMPLETE:
+ case QD_MESSAGE_STREAM_DATA_INCOMPLETE:
return 0; // wait for more
- case QD_MESSAGE_BODY_DATA_INVALID:
- case QD_MESSAGE_BODY_DATA_NOT_DATA:
+ case QD_MESSAGE_STREAM_DATA_INVALID:
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"][L%"PRIu64"] Rejecting corrupted body data.",
hconn->conn_id, hconn->out_link_id);
diff --git a/src/adaptors/http1/http1_codec.c b/src/adaptors/http1/http1_codec.c
index 93bc73a..5cb16b3 100644
--- a/src/adaptors/http1/http1_codec.c
+++ b/src/adaptors/http1/http1_codec.c
@@ -966,7 +966,7 @@ static bool parse_header(h1_codec_connection_t *conn, struct decoder_t *decoder)
// Pass message body data up to the application.
//
-static inline int consume_body_data(h1_codec_connection_t *conn, bool flush)
+static inline int consume_stream_data(h1_codec_connection_t *conn, bool flush)
{
struct decoder_t *decoder = &conn->decoder;
qd_iterator_pointer_t *body_ptr = &decoder->body_ptr;
@@ -1092,7 +1092,7 @@ static bool parse_body_chunked_data(h1_codec_connection_t *conn, struct decoder_
decoder->chunk_length -= skipped;
body_ptr->remaining += skipped;
- consume_body_data(conn, false);
+ consume_stream_data(conn, false);
if (decoder->chunk_length == 0) {
// end of chunk
@@ -1117,7 +1117,7 @@ static bool parse_body_chunked_trailer(h1_codec_connection_t *conn, struct decod
body_ptr->remaining += line.remaining;
if (is_empty_line(&line)) {
// end of message
- consume_body_data(conn, true);
+ consume_stream_data(conn, true);
decoder->state = HTTP1_MSG_STATE_DONE;
}
@@ -1164,7 +1164,7 @@ static bool parse_body_content(h1_codec_connection_t *conn, struct decoder_t *de
body_ptr->remaining += skipped;
bool eom = decoder->content_length == 0;
- consume_body_data(conn, eom);
+ consume_stream_data(conn, eom);
if (eom)
decoder->state = HTTP1_MSG_STATE_DONE;
@@ -1188,7 +1188,7 @@ static bool parse_body(h1_codec_connection_t *conn, struct decoder_t *decoder)
decoder->read_ptr.remaining = 0;
decoder->read_ptr.buffer = DEQ_TAIL(decoder->incoming);
decoder->read_ptr.cursor = qd_buffer_cursor(decoder->read_ptr.buffer);
- consume_body_data(conn, true);
+ consume_stream_data(conn, true);
decoder->body_ptr = decoder->read_ptr = NULL_I_PTR;
DEQ_INIT(decoder->incoming);
}
@@ -1489,7 +1489,7 @@ static inline void _flush_headers(h1_codec_request_state_t *hrs, struct encoder_
// just forward the body chain along
-int h1_codec_tx_body(h1_codec_request_state_t *hrs, qd_message_body_data_t *body_data)
+int h1_codec_tx_body(h1_codec_request_state_t *hrs, qd_message_stream_data_t *stream_data)
{
h1_codec_connection_t *conn = h1_codec_request_state_get_connection(hrs);
struct encoder_t *encoder = &conn->encoder;
@@ -1498,8 +1498,8 @@ int h1_codec_tx_body(h1_codec_request_state_t *hrs, qd_message_body_data_t *body
_flush_headers(hrs, encoder);
// skip the outgoing queue and send directly
- hrs->out_octets += qd_message_body_data_payload_length(body_data);
- conn->config.tx_body_data(hrs, body_data);
+ hrs->out_octets += qd_message_stream_data_payload_length(stream_data);
+ conn->config.tx_stream_data(hrs, stream_data);
return 0;
}
diff --git a/src/adaptors/http1/http1_private.h b/src/adaptors/http1/http1_private.h
index d0d2306..593803d 100644
--- a/src/adaptors/http1/http1_private.h
+++ b/src/adaptors/http1/http1_private.h
@@ -60,10 +60,10 @@ extern qdr_http1_adaptor_t *qdr_http1_adaptor;
// Data to be written out the raw connection.
//
// This adaptor has to cope with two different data sources: the HTTP1 encoder
-// and the qd_message_body_data_t list. The HTTP1 encoder produces a simple
+// and the qd_message_stream_data_t list. The HTTP1 encoder produces a simple
// qd_buffer_list_t for outgoing header data whose ownership is given to the
// adaptor: the adaptor is free to deque/free these buffers as needed. The
-// qd_message_body_data_t buffers are shared with the owning message and the
+// qd_message_stream_data_t buffers are shared with the owning message and the
// buffer list must not be modified by the adaptor. The qdr_http1_out_data_t
// is used to manage both types of data sources.
//
@@ -76,7 +76,7 @@ struct qdr_http1_out_data_t {
// or a message body data (not both!)
qd_buffer_list_t raw_buffers;
- qd_message_body_data_t *body_data;
+ qd_message_stream_data_t *stream_data;
int buffer_count; // # total buffers
int next_buffer; // offset to next buffer to send
@@ -207,7 +207,7 @@ ALLOC_DECLARE(qdr_http1_connection_t);
void qdr_http1_free_written_buffers(qdr_http1_connection_t *hconn);
void qdr_http1_enqueue_buffer_list(qdr_http1_out_data_fifo_t *fifo, qd_buffer_list_t *blist);
-void qdr_http1_enqueue_body_data(qdr_http1_out_data_fifo_t *fifo, qd_message_body_data_t *body_data);
+void qdr_http1_enqueue_stream_data(qdr_http1_out_data_fifo_t *fifo, qd_message_stream_data_t *stream_data);
uint64_t qdr_http1_write_out_data(qdr_http1_connection_t *hconn, qdr_http1_out_data_fifo_t *fifo);
void qdr_http1_out_data_fifo_cleanup(qdr_http1_out_data_fifo_t *out_data);
// return the number of buffers currently held by the proactor for writing
diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c
index 21e3e38..98af72a 100644
--- a/src/adaptors/http1/http1_server.c
+++ b/src/adaptors/http1/http1_server.c
@@ -95,7 +95,7 @@ ALLOC_DEFINE(_server_request_t);
#define MAX_RECONNECT 5 // 5 * 500 = 2.5 sec
static void _server_tx_buffers_cb(h1_codec_request_state_t *lib_hrs, qd_buffer_list_t *blist, unsigned int len);
-static void _server_tx_body_data_cb(h1_codec_request_state_t *lib_hrs, qd_message_body_data_t *body_data);
+static void _server_tx_stream_data_cb(h1_codec_request_state_t *lib_hrs, qd_message_stream_data_t *stream_data);
static int _server_rx_request_cb(h1_codec_request_state_t *hrs,
const char *method,
const char *target,
@@ -321,7 +321,7 @@ static void _setup_server_links(qdr_http1_connection_t *hconn)
h1_codec_config_t config = {0};
config.type = HTTP1_CONN_SERVER;
config.tx_buffers = _server_tx_buffers_cb;
- config.tx_body_data = _server_tx_body_data_cb;
+ config.tx_stream_data = _server_tx_stream_data_cb;
config.rx_request = _server_rx_request_cb;
config.rx_response = _server_rx_response_cb;
config.rx_header = _server_rx_header_cb;
@@ -684,7 +684,7 @@ static void _server_tx_buffers_cb(h1_codec_request_state_t *hrs, qd_buffer_list_
// Encoder has body data to send to the server
//
-static void _server_tx_body_data_cb(h1_codec_request_state_t *hrs, qd_message_body_data_t *body_data)
+static void _server_tx_stream_data_cb(h1_codec_request_state_t *hrs, qd_message_stream_data_t *stream_data)
{
_server_request_t *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
qdr_http1_connection_t *hconn = hreq->base.hconn;
@@ -692,7 +692,7 @@ static void _server_tx_body_data_cb(h1_codec_request_state_t *hrs, qd_message_bo
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] Sending body data to server",
hconn->conn_id, hconn->out_link_id);
- qdr_http1_enqueue_body_data(&hreq->out_data, body_data);
+ qdr_http1_enqueue_stream_data(&hreq->out_data, stream_data);
if (hreq == (_server_request_t*) DEQ_HEAD(hconn->requests) && hconn->raw_conn) {
_write_pending_request(hreq);
}
@@ -883,7 +883,7 @@ static int _server_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *b
"[C%"PRIu64"][L%"PRIu64"] HTTP response body received len=%zu.",
hconn->conn_id, hconn->in_link_id, len);
- qd_message_body_data_append(msg, body);
+ qd_message_stream_data_append(msg, body);
//
// Notify the router that more data is ready to be pushed out on the delivery
@@ -1246,17 +1246,17 @@ static uint64_t _encode_request_message(_server_request_t *hreq)
hreq->headers_encoded = true;
}
- qd_message_body_data_t *body_data = 0;
+ qd_message_stream_data_t *stream_data = 0;
while (true) {
- switch (qd_message_next_body_data(msg, &body_data)) {
- case QD_MESSAGE_BODY_DATA_OK: {
+ switch (qd_message_next_stream_data(msg, &stream_data)) {
+ case QD_MESSAGE_STREAM_DATA_BODY_OK: {
qd_log(hconn->adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] Encoding request body data",
hconn->conn_id, hconn->out_link_id);
- if (h1_codec_tx_body(hreq->base.lib_rs, body_data)) {
+ if (h1_codec_tx_body(hreq->base.lib_rs, stream_data)) {
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"][L%"PRIu64"] body data encode failed",
hconn->conn_id, hconn->out_link_id);
@@ -1265,21 +1265,23 @@ static uint64_t _encode_request_message(_server_request_t *hreq)
break;
}
- case QD_MESSAGE_BODY_DATA_NO_MORE:
+ case QD_MESSAGE_STREAM_DATA_FOOTER_OK:
+ break;
+
+ case QD_MESSAGE_STREAM_DATA_NO_MORE:
// indicate this message is complete
qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
"[C%"PRIu64"][L%"PRIu64"] request message encoding completed",
hconn->conn_id, hconn->out_link_id);
return PN_ACCEPTED;
- case QD_MESSAGE_BODY_DATA_INCOMPLETE:
+ case QD_MESSAGE_STREAM_DATA_INCOMPLETE:
qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
"[C%"PRIu64"][L%"PRIu64"] body data need more",
hconn->conn_id, hconn->out_link_id);
return 0; // wait for more
- case QD_MESSAGE_BODY_DATA_INVALID:
- case QD_MESSAGE_BODY_DATA_NOT_DATA:
+ case QD_MESSAGE_STREAM_DATA_INVALID:
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
"[C%"PRIu64"][L%"PRIu64"] Rejecting corrupted body data.",
hconn->conn_id, hconn->out_link_id);
diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c
index 8618b81..996e897 100644
--- a/src/adaptors/http2/http2_adaptor.c
+++ b/src/adaptors/http2/http2_adaptor.c
@@ -405,7 +405,7 @@ static int on_data_chunk_recv_callback(nghttp2_session *session,
qd_buffer_list_append(&buffers, (uint8_t *)data, len);
if (stream_data->in_dlv) {
- qd_message_body_data_append(stream_data->message, &buffers);
+ qd_message_stream_data_append(stream_data->message, &buffers);
}
else {
if (!stream_data->body) {
@@ -463,7 +463,7 @@ static int snd_data_callback(nghttp2_session *session,
int bytes_sent = 0; // This should not include the header length of 9.
if (length) {
pn_raw_buffer_t pn_raw_buffs[stream_data->qd_buffers_to_send];
- qd_message_body_data_buffers(stream_data->curr_body_data, pn_raw_buffs, 0, stream_data->qd_buffers_to_send);
+ qd_message_stream_data_buffers(stream_data->curr_stream_data, pn_raw_buffs, 0, stream_data->qd_buffers_to_send);
int idx = 0;
while (idx < stream_data->qd_buffers_to_send) {
@@ -481,16 +481,16 @@ static int snd_data_callback(nghttp2_session *session,
// bytes_sent += bytes_remaining;
// qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] snd_data_callback memcpy bytes_remaining=%i", conn->conn_id, stream_data->stream_id, bytes_remaining);
// }
- stream_data->curr_body_data_qd_buff_offset += 1;
+ stream_data->curr_stream_data_qd_buff_offset += 1;
}
idx += 1;
}
}
if (stream_data->full_payload_handled) {
- qd_message_body_data_release(stream_data->curr_body_data);
- stream_data->curr_body_data = 0;
- stream_data->curr_body_data_qd_buff_offset = 0;
- qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] snd_data_callback, full_payload_handled, qd_message_body_data_release", conn->conn_id, stream_data->stream_id);
+ qd_message_stream_data_release(stream_data->curr_stream_data);
+ stream_data->curr_stream_data = 0;
+ stream_data->curr_stream_data_qd_buff_offset = 0;
+ qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] snd_data_callback, full_payload_handled, qd_message_stream_data_release", conn->conn_id, stream_data->stream_id);
}
qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] HTTP2 snd_data_callback finished, length=%zu, bytes_sent=%i, stream_data=%p", conn->conn_id, stream_data->stream_id, length, bytes_sent, (void *)stream_data);
@@ -935,21 +935,21 @@ ssize_t read_data_callback(nghttp2_session *session,
//
qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_DEPTH_OK", conn->conn_id, stream_data->stream_id);
- if (stream_data->next_body_data) {
- stream_data->curr_body_data = stream_data->next_body_data;
- stream_data->curr_body_data_result = stream_data->next_body_data_result;
- stream_data->next_body_data = 0;
- qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback Use next_body_data", conn->conn_id, stream_data->stream_id);
+ if (stream_data->next_stream_data) {
+ stream_data->curr_stream_data = stream_data->next_stream_data;
+ stream_data->curr_stream_data_result = stream_data->next_stream_data_result;
+ stream_data->next_stream_data = 0;
+ qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback Use next_stream_data", conn->conn_id, stream_data->stream_id);
}
- if (!stream_data->curr_body_data) {
- stream_data->curr_body_data_result = qd_message_next_body_data(message, &stream_data->curr_body_data);
- qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback No body data, get qd_message_next_body_data", conn->conn_id, stream_data->stream_id);
+ if (!stream_data->curr_stream_data) {
+ stream_data->curr_stream_data_result = qd_message_next_stream_data(message, &stream_data->curr_stream_data);
+ qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback No body data, get qd_message_next_stream_data", conn->conn_id, stream_data->stream_id);
}
- switch (stream_data->curr_body_data_result) {
- case QD_MESSAGE_BODY_DATA_OK: {
+ switch (stream_data->curr_stream_data_result) {
+ case QD_MESSAGE_STREAM_DATA_BODY_OK: {
//
// We have a new valid body-data segment. Handle it
//
@@ -966,29 +966,29 @@ ssize_t read_data_callback(nghttp2_session *session,
}
// total length of the payload (across all qd_buffers in the current body data)
- size_t payload_length = qd_message_body_data_payload_length(stream_data->curr_body_data);
+ size_t payload_length = qd_message_stream_data_payload_length(stream_data->curr_stream_data);
if (payload_length == 0) {
//
- // Current body data has payload length zero. Release the curr_body_data
+ // Current body data has payload length zero. Release the curr_stream_data
//
qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, payload_length=0", conn->conn_id, stream_data->stream_id);
- qd_message_body_data_release(stream_data->curr_body_data);
- stream_data->curr_body_data = 0;
+ qd_message_stream_data_release(stream_data->curr_stream_data);
+ stream_data->curr_stream_data = 0;
- // The payload length is zero on this body data. Look ahead one body data to see if it is QD_MESSAGE_BODY_DATA_NO_MORE
- stream_data->next_body_data_result = qd_message_next_body_data(message, &stream_data->next_body_data);
- if (stream_data->next_body_data_result == QD_MESSAGE_BODY_DATA_NO_MORE) {
+ // The payload length is zero on this body data. Look ahead one body data to see if it is QD_MESSAGE_STREAM_DATA_NO_MORE
+ stream_data->next_stream_data_result = qd_message_next_stream_data(message, &stream_data->next_stream_data);
+ if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_NO_MORE) {
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
stream_data->out_msg_body_sent = true;
stream_data->full_payload_handled = true;
- qd_message_body_data_release(stream_data->next_body_data);
- stream_data->next_body_data = 0;
+ qd_message_stream_data_release(stream_data->next_stream_data);
+ stream_data->next_stream_data = 0;
stream_data->out_dlv_local_disposition = PN_ACCEPTED;
if ((*data_flags & NGHTTP2_DATA_FLAG_EOF) && conn->ingress) {
_http_record_request(conn, stream_data);
}
- qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, payload_length=0 and next_body_data=QD_MESSAGE_BODY_DATA_NO_MORE", conn->conn_id, stream_data->stream_id);
+ qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, payload_length=0 and next_stream_data=QD_MESSAGE_STREAM_DATA_NO_MORE", conn->conn_id, stream_data->stream_id);
return 0;
}
@@ -1002,35 +1002,35 @@ ssize_t read_data_callback(nghttp2_session *session,
return NGHTTP2_ERR_DEFERRED;
}
- stream_data->body_data_buff_count = qd_message_body_data_buffer_count(stream_data->curr_body_data);
- qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, stream_data->body_data_buff_count=%i, payload_length=%zu\n", conn->conn_id, stream_data->stream_id, stream_data->body_data_buff_count, payload_length);
+ stream_data->stream_data_buff_count = qd_message_stream_data_buffer_count(stream_data->curr_stream_data);
+ qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, stream_data->stream_data_buff_count=%i, payload_length=%zu\n", conn->conn_id, stream_data->stream_id, stream_data->stream_data_buff_count, payload_length);
size_t bytes_to_send = 0;
if (payload_length) {
- size_t remaining_payload_length = payload_length - (stream_data->curr_body_data_qd_buff_offset * BUFFER_SIZE);
+ size_t remaining_payload_length = payload_length - (stream_data->curr_stream_data_qd_buff_offset * BUFFER_SIZE);
if (remaining_payload_length <= QD_HTTP2_BUFFER_SIZE) {
bytes_to_send = remaining_payload_length;
- stream_data->qd_buffers_to_send = stream_data->body_data_buff_count;
+ stream_data->qd_buffers_to_send = stream_data->stream_data_buff_count;
stream_data->full_payload_handled = true;
qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback remaining_payload_length (%zu) <= QD_HTTP2_BUFFER_SIZE(16384), bytes_to_send=%zu, stream_data->qd_buffers_to_send=%zu", conn->conn_id, stream_data->stream_id, remaining_payload_length, bytes_to_send, stream_data->qd_buffers_to_send);
// Look ahead one body data
- stream_data->next_body_data_result = qd_message_next_body_data(message, &stream_data->next_body_data);
- if (stream_data->next_body_data_result == QD_MESSAGE_BODY_DATA_NO_MORE) {
+ stream_data->next_stream_data_result = qd_message_next_stream_data(message, &stream_data->next_stream_data);
+ if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_NO_MORE) {
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
stream_data->out_msg_body_sent = true;
- qd_message_body_data_release(stream_data->next_body_data);
- stream_data->next_body_data = 0;
+ qd_message_stream_data_release(stream_data->next_stream_data);
+ stream_data->next_stream_data = 0;
stream_data->out_dlv_local_disposition = PN_ACCEPTED;
- qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one body data QD_MESSAGE_BODY_DATA_NO_MORE", conn->conn_id, stream_data->stream_id);
+ qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one body data QD_MESSAGE_STREAM_DATA_NO_MORE", conn->conn_id, stream_data->stream_id);
}
- else if (stream_data->next_body_data_result == QD_MESSAGE_BODY_DATA_INCOMPLETE) {
- qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one body data QD_MESSAGE_BODY_DATA_INCOMPLETE", conn->conn_id, stream_data->stream_id);
+ else if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_INCOMPLETE) {
+ qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one body data QD_MESSAGE_STREAM_DATA_INCOMPLETE", conn->conn_id, stream_data->stream_id);
}
- else if (stream_data->next_body_data_result == QD_MESSAGE_BODY_DATA_OK) {
- qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one body data QD_MESSAGE_BODY_DATA_OK", conn->conn_id, stream_data->stream_id);
+ else if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_BODY_OK) {
+ qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one body data QD_MESSAGE_STREAM_DATA_OK", conn->conn_id, stream_data->stream_id);
}
}
@@ -1050,15 +1050,18 @@ ssize_t read_data_callback(nghttp2_session *session,
return bytes_to_send;
}
- case QD_MESSAGE_BODY_DATA_INCOMPLETE:
+ case QD_MESSAGE_STREAM_DATA_FOOTER_OK:
+ break;
+
+ case QD_MESSAGE_STREAM_DATA_INCOMPLETE:
//
// A new segment has not completely arrived yet. Check again later.
//
- qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_BODY_DATA_INCOMPLETE", conn->conn_id, stream_data->stream_id);
+ qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_STREAM_DATA_INCOMPLETE", conn->conn_id, stream_data->stream_id);
stream_data->out_dlv_local_disposition = 0;
return NGHTTP2_ERR_DEFERRED;
- case QD_MESSAGE_BODY_DATA_NO_MORE: {
+ case QD_MESSAGE_STREAM_DATA_NO_MORE: {
//
// We have already handled the last body-data segment for this delivery.
// Complete the "sending" of this delivery and replenish credit.
@@ -1066,7 +1069,7 @@ ssize_t read_data_callback(nghttp2_session *session,
size_t pn_buffs_write_capacity = pn_raw_connection_write_buffers_capacity(conn->pn_raw_conn);
if (pn_buffs_write_capacity == 0) {
stream_data->out_dlv_local_disposition = 0;
- qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_BODY_DATA_NO_MORE - pn_buffs_write_capacity=0 send is not complete", conn->conn_id, stream_data->stream_id);
+ qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_STREAM_DATA_NO_MORE - pn_buffs_write_capacity=0 send is not complete", conn->conn_id, stream_data->stream_id);
return NGHTTP2_ERR_DEFERRED;
}
else {
@@ -1075,32 +1078,21 @@ ssize_t read_data_callback(nghttp2_session *session,
stream_data->full_payload_handled = true;
stream_data->out_msg_body_sent = true;
stream_data->out_dlv_local_disposition = PN_ACCEPTED;
- qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_BODY_DATA_NO_MORE - stream_data->out_dlv_local_disposition = PN_ACCEPTED - send_complete=true, setting NGHTTP2_DATA_FLAG_EOF", conn->conn_id, stream_data->stream_id);
+ qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_STREAM_DATA_NO_MORE - stream_data->out_dlv_local_disposition = PN_ACCEPTED - send_complete=true, setting NGHTTP2_DATA_FLAG_EOF", conn->conn_id, stream_data->stream_id);
}
break;
}
- case QD_MESSAGE_BODY_DATA_INVALID:
+ case QD_MESSAGE_STREAM_DATA_INVALID:
//
// The body-data is corrupt in some way. Stop handling the delivery and reject it.
//
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
- qd_message_body_data_release(stream_data->curr_body_data);
- stream_data->curr_body_data = 0;
- stream_data->out_dlv_local_disposition = PN_REJECTED;
- qd_log(http2_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_BODY_DATA_INVALID", conn->conn_id, stream_data->stream_id);
- break;
-
- case QD_MESSAGE_BODY_DATA_NOT_DATA:
- //
- // Valid data was seen, but it is not a body-data performative. Reject the delivery.
- //
- *data_flags |= NGHTTP2_DATA_FLAG_EOF;
- qd_message_body_data_release(stream_data->curr_body_data);
- stream_data->curr_body_data = 0;
+ qd_message_stream_data_release(stream_data->curr_stream_data);
+ stream_data->curr_stream_data = 0;
stream_data->out_dlv_local_disposition = PN_REJECTED;
- qd_log(http2_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_BODY_DATA_NOT_DATA", conn->conn_id, stream_data->stream_id);
+ qd_log(http2_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_STREAM_DATA_INVALID", conn->conn_id, stream_data->stream_id);
break;
}
break;
@@ -1418,16 +1410,16 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data)
create_settings_frame(conn);
uint8_t flags = 0;
- stream_data->curr_body_data_result = qd_message_next_body_data(message, &stream_data->curr_body_data);
- if (stream_data->curr_body_data_result == QD_MESSAGE_BODY_DATA_OK) {
- size_t payload_length = qd_message_body_data_payload_length(stream_data->curr_body_data);
+ stream_data->curr_stream_data_result = qd_message_next_stream_data(message, &stream_data->curr_stream_data);
+ if (stream_data->curr_stream_data_result == QD_MESSAGE_STREAM_DATA_BODY_OK) {
+ size_t payload_length = qd_message_stream_data_payload_length(stream_data->curr_stream_data);
if (payload_length == 0) {
- stream_data->next_body_data_result = qd_message_next_body_data(message, &stream_data->next_body_data);
- if (stream_data->next_body_data_result == QD_MESSAGE_BODY_DATA_NO_MORE) {
- qd_message_body_data_release(stream_data->next_body_data);
- stream_data->next_body_data = 0;
- qd_message_body_data_release(stream_data->curr_body_data);
- stream_data->curr_body_data = 0;
+ stream_data->next_stream_data_result = qd_message_next_stream_data(message, &stream_data->next_stream_data);
+ if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_NO_MORE) {
+ qd_message_stream_data_release(stream_data->next_stream_data);
+ stream_data->next_stream_data = 0;
+ qd_message_stream_data_release(stream_data->curr_stream_data);
+ stream_data->curr_stream_data = 0;
flags = NGHTTP2_FLAG_END_STREAM;
stream_data->out_msg_has_body = false;
qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"] Message has no body, sending NGHTTP2_FLAG_END_STREAM with nghttp2_submit_headers", conn->conn_id);
diff --git a/src/adaptors/http2/http2_adaptor.h b/src/adaptors/http2/http2_adaptor.h
index 979f34d..047c440 100644
--- a/src/adaptors/http2/http2_adaptor.h
+++ b/src/adaptors/http2/http2_adaptor.h
@@ -80,14 +80,14 @@ struct qdr_http2_stream_data_t {
qd_composed_field_t *app_properties;
qd_composed_field_t *footer_properties;
qd_composed_field_t *body;
- qd_message_body_data_t *curr_body_data;
- qd_message_body_data_t *next_body_data;
+ qd_message_stream_data_t *curr_stream_data;
+ qd_message_stream_data_t *next_stream_data;
DEQ_LINKS(qdr_http2_stream_data_t);
- qd_message_body_data_result_t curr_body_data_result;
- qd_message_body_data_result_t next_body_data_result;
- int curr_body_data_qd_buff_offset;
- int body_data_buff_count;
+ qd_message_stream_data_result_t curr_stream_data_result;
+ qd_message_stream_data_result_t next_stream_data_result;
+ int curr_stream_data_qd_buff_offset;
+ int stream_data_buff_count;
int in_link_credit; // provided by router
int32_t stream_id;
size_t qd_buffers_to_send;
diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c
index 7b9af30..84b14ba 100644
--- a/src/adaptors/reference_adaptor.c
+++ b/src/adaptors/reference_adaptor.c
@@ -248,39 +248,67 @@ static uint64_t qdr_ref_deliver(void *context, qdr_link_t *link, qdr_delivery_t
// over to the per-message extraction of body-data segments.
//
printf("qdr_ref_deliver: depth ok\n");
- qd_message_body_data_t *body_data;
- qd_message_body_data_result_t body_data_result;
+ qd_message_stream_data_t *stream_data;
+ qd_message_stream_data_result_t stream_data_result;
//
// Process as many body-data segments as are available.
//
while (true) {
- body_data_result = qd_message_next_body_data(msg, &body_data);
+ stream_data_result = qd_message_next_stream_data(msg, &stream_data);
- switch (body_data_result) {
- case QD_MESSAGE_BODY_DATA_OK: {
+ switch (stream_data_result) {
+ case QD_MESSAGE_STREAM_DATA_BODY_OK: {
//
// We have a new valid body-data segment. Handle it
//
- printf("qdr_ref_deliver: body_data_buffer_count: %d\n", qd_message_body_data_buffer_count(body_data));
+ printf("qdr_ref_deliver: stream_data_buffer_count: %d\n", qd_message_stream_data_buffer_count(stream_data));
- qd_iterator_t *body_iter = qd_message_body_data_iterator(body_data);
+ qd_iterator_t *body_iter = qd_message_stream_data_iterator(stream_data);
char *body = (char*) qd_iterator_copy(body_iter);
printf("qdr_ref_deliver: message body-data received: %s\n", body);
free(body);
qd_iterator_free(body_iter);
- qd_message_body_data_release(body_data);
+ qd_message_stream_data_release(stream_data);
+ break;
+ }
+
+ case QD_MESSAGE_STREAM_DATA_FOOTER_OK: {
+ printf("qdr_ref_deliver: Received message footer\n");
+ qd_iterator_t *footer_iter = qd_message_stream_data_iterator(stream_data);
+ qd_parsed_field_t *footer = qd_parse(footer_iter);
+
+ if (qd_parse_ok(footer)) {
+ uint8_t tag = qd_parse_tag(footer);
+ if (tag == QD_AMQP_MAP8 || tag == QD_AMQP_MAP32) {
+ uint32_t item_count = qd_parse_sub_count(footer);
+ for (uint32_t i = 0; i < item_count; i++) {
+ qd_iterator_t *key_iter = qd_parse_raw(qd_parse_sub_key(footer, i));
+ qd_iterator_t *value_iter = qd_parse_raw(qd_parse_sub_value(footer, i));
+ char *key = (char*) qd_iterator_copy(key_iter);
+ char *value = (char*) qd_iterator_copy(value_iter);
+ printf("qdr_ref_deliver: %s: %s\n", key, value);
+ free(key);
+ free(value);
+ }
+ } else
+ printf("qdr_ref_deliver: Unexpected tag in footer: %02x\n", tag);
+ } else
+ printf("qdr_ref_deliver: Footer parse error: %s\n", qd_parse_error(footer));
+
+ qd_parse_free(footer);
+ qd_iterator_free(footer_iter);
break;
}
- case QD_MESSAGE_BODY_DATA_INCOMPLETE:
+ case QD_MESSAGE_STREAM_DATA_INCOMPLETE:
//
// A new segment has not completely arrived yet. Check again later.
//
printf("qdr_ref_deliver: body-data incomplete\n");
return 0;
- case QD_MESSAGE_BODY_DATA_NO_MORE:
+ case QD_MESSAGE_STREAM_DATA_NO_MORE:
//
// We have already handled the last body-data segment for this delivery.
// Complete the "sending" of this delivery and replenish credit.
@@ -293,21 +321,13 @@ static uint64_t qdr_ref_deliver(void *context, qdr_link_t *link, qdr_delivery_t
qdr_link_flow(adaptor->core, link, 1, false);
return PN_ACCEPTED; // This will cause the delivery to be settled
- case QD_MESSAGE_BODY_DATA_INVALID:
+ case QD_MESSAGE_STREAM_DATA_INVALID:
//
// The body-data is corrupt in some way. Stop handling the delivery and reject it.
//
printf("qdr_ref_deliver: body-data invalid\n");
qdr_link_flow(adaptor->core, link, 1, false);
return PN_REJECTED;
-
- case QD_MESSAGE_BODY_DATA_NOT_DATA:
- //
- // Valid data was seen, but it is not a body-data performative. Reject the delivery.
- //
- printf("qdr_ref_deliver: body not data\n");
- qdr_link_flow(adaptor->core, link, 1, false);
- return PN_REJECTED;
}
}
@@ -456,16 +476,24 @@ static void on_stream(void *context)
//
// Accumulated buffer list
//
- qd_buffer_list_t buffer_list;
- DEQ_INIT(buffer_list);
- qd_buffer_list_append(&buffer_list, (const uint8_t*) content, content_length);
- qd_buffer_list_append(&buffer_list, (const uint8_t*) content, content_length);
-
- //
- // append this data to the streaming message as one or more DATA
- // performatives
- //
- depth = qd_message_body_data_append(adaptor->streaming_message, &buffer_list);
+ for (int sections = 0; sections < 3; sections++) {
+ qd_buffer_list_t buffer_list;
+ DEQ_INIT(buffer_list);
+ qd_buffer_list_append(&buffer_list, (const uint8_t*) content, content_length);
+ qd_buffer_list_append(&buffer_list, (const uint8_t*) content, content_length);
+
+ //
+ // Compose a DATA performative for this section of the stream
+ //
+ qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
+ qd_compose_insert_binary_buffers(field, &buffer_list);
+
+ //
+ // Extend the streaming message and free the composed field
+ //
+ depth = qd_message_extend(adaptor->streaming_message, field);
+ qd_compose_free(field);
+ }
//
// Notify the router that more data is ready to be pushed out on the delivery
@@ -478,6 +506,14 @@ static void on_stream(void *context)
adaptor->stream_count++;
printf("on_stream: sent streamed frame %d, depth=%d\n", adaptor->stream_count, depth);
} else {
+ qd_composed_field_t *footer = qd_compose(QD_PERFORMATIVE_FOOTER, 0);
+ qd_compose_start_map(footer);
+ qd_compose_insert_symbol(footer, "trailer");
+ qd_compose_insert_string(footer, "value");
+ qd_compose_end_map(footer);
+ depth = qd_message_extend(adaptor->streaming_message, footer);
+ qd_compose_free(footer);
+
qd_message_set_receive_complete(adaptor->streaming_message);
adaptor->streaming_message = 0;
adaptor->stream_count = 0;
diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index 397f8bd..d711c7a 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -63,7 +63,7 @@ struct qdr_tcp_connection_t {
uint64_t last_in_time;
uint64_t last_out_time;
- qd_message_body_data_t *outgoing_body_data; // current segment
+ qd_message_stream_data_t *outgoing_stream_data; // current segment
size_t outgoing_body_bytes; // bytes received from current segment
int outgoing_body_offset; // buffer offset into current segment
@@ -147,7 +147,7 @@ static int handle_incoming(qdr_tcp_connection_t *conn)
grant_read_buffers(conn);
if (conn->instream) {
- qd_message_body_data_append(qdr_delivery_message(conn->instream), &buffers);
+ qd_message_stream_data_append(qdr_delivery_message(conn->instream), &buffers);
qdr_delivery_continue(tcp_adaptor->core, conn->instream, false);
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Continuing message with %i bytes", conn->conn_id, conn->incoming_id, count);
} else {
@@ -229,25 +229,23 @@ static int read_message_body(qdr_tcp_connection_t *conn, qd_message_t *msg, pn_r
{
int used = 0;
- // Advance to next body_data vbin segment if necessary.
+ // Advance to next stream_data vbin segment if necessary.
// Return early if no data to process or error
- if (conn->outgoing_body_data == 0) {
- qd_message_body_data_result_t body_data_result = qd_message_next_body_data(msg, &conn->outgoing_body_data);
- if (body_data_result == QD_MESSAGE_BODY_DATA_OK) {
- // a new body_data segment has been found
+ if (conn->outgoing_stream_data == 0) {
+ qd_message_stream_data_result_t stream_data_result = qd_message_next_stream_data(msg, &conn->outgoing_stream_data);
+ if (stream_data_result == QD_MESSAGE_STREAM_DATA_BODY_OK) {
+ // a new stream_data segment has been found
conn->outgoing_body_bytes = 0;
conn->outgoing_body_offset = 0;
// continue to process this segment
- } else if (body_data_result == QD_MESSAGE_BODY_DATA_INCOMPLETE) {
+ } else if (stream_data_result == QD_MESSAGE_STREAM_DATA_INCOMPLETE) {
return 0;
} else {
- switch (body_data_result) {
- case QD_MESSAGE_BODY_DATA_NO_MORE:
+ switch (stream_data_result) {
+ case QD_MESSAGE_STREAM_DATA_NO_MORE:
qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] EOS", conn->conn_id); break;
- case QD_MESSAGE_BODY_DATA_INVALID:
+ case QD_MESSAGE_STREAM_DATA_INVALID:
qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "[C%"PRIu64"] Invalid body data for streaming message", conn->conn_id); break;
- case QD_MESSAGE_BODY_DATA_NOT_DATA:
- qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "[C%"PRIu64"] Invalid body; expected data section", conn->conn_id); break;
default:
break;
}
@@ -256,30 +254,30 @@ static int read_message_body(qdr_tcp_connection_t *conn, qd_message_t *msg, pn_r
}
}
- // A valid body_data is in place.
+ // A valid stream_data is in place.
// Try to get a buffer set from it.
- used = qd_message_body_data_buffers(conn->outgoing_body_data, buffers, conn->outgoing_body_offset, count);
+ used = qd_message_stream_data_buffers(conn->outgoing_stream_data, buffers, conn->outgoing_body_offset, count);
if (used > 0) {
// Accumulate the lengths of the returned buffers.
for (int i=0; i<used; i++) {
conn->outgoing_body_bytes += buffers[i].size;
}
- // Buffers returned should never exceed the body_data payload length
- assert(conn->outgoing_body_bytes <= conn->outgoing_body_data->payload.length);
+ // Buffers returned should never exceed the stream_data payload length
+ assert(conn->outgoing_body_bytes <= conn->outgoing_stream_data->payload.length);
- if (conn->outgoing_body_bytes == conn->outgoing_body_data->payload.length) {
- // This buffer set consumes the remainder of the body_data segment.
- // Attach the body_data struct to the last buffer so that the struct
+ if (conn->outgoing_body_bytes == conn->outgoing_stream_data->payload.length) {
+ // This buffer set consumes the remainder of the stream_data segment.
+ // Attach the stream_data struct to the last buffer so that the struct
// can be freed after the buffer has been transmitted by raw connection out.
- buffers[used-1].context = (uintptr_t) conn->outgoing_body_data;
+ buffers[used-1].context = (uintptr_t) conn->outgoing_stream_data;
- // Erase the body_data struct from the connection so that
+ // Erase the stream_data struct from the connection so that
// a new one gets created on the next pass.
- conn->outgoing_body_data = 0;
+ conn->outgoing_stream_data = 0;
} else {
- // Returned buffer set did not consume the entire body_data segment.
- // Leave existing body_data struct in place for use on next pass.
+ // Returned buffer set did not consume the entire stream_data segment.
+ // Leave existing stream_data struct in place for use on next pass.
// Add the number of returned buffers to the offset for the next pass.
conn->outgoing_body_offset += used;
}
@@ -517,7 +515,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
for (size_t i = 0; i < n; ++i) {
written += buffs[i].size;
if (buffs[i].context) {
- qd_message_body_data_release((qd_message_body_data_t*) buffs[i].context);
+ qd_message_stream_data_release((qd_message_stream_data_t*) buffs[i].context);
}
}
}
diff --git a/src/message.c b/src/message.c
index 360fd84..80d569f 100644
--- a/src/message.c
+++ b/src/message.c
@@ -86,7 +86,7 @@ PN_HANDLE(PN_DELIVERY_CTX)
ALLOC_DEFINE_CONFIG(qd_message_t, sizeof(qd_message_pvt_t), 0, 0);
ALLOC_DEFINE(qd_message_content_t);
-ALLOC_DEFINE(qd_message_body_data_t);
+ALLOC_DEFINE(qd_message_stream_data_t);
typedef void (*buffer_process_t) (void *context, const unsigned char *base, int length);
@@ -1901,7 +1901,7 @@ void qd_message_send(qd_message_t *in_msg,
// by freeing a buffer there now may be room to restart a
// stalled message receiver
if (content->q2_input_holdoff) {
- if (qd_message_Q2_holdoff_should_unblock((qd_message_t *)msg)) {
+ if (qd_message_Q2_holdoff_should_unblock((qd_message_t*) msg)) {
// wake up receive side
// Note: clearing holdoff here is easy compared to
// clearing it in the deferred callback. Tracing
@@ -2368,79 +2368,83 @@ static void find_last_buffer(qd_field_location_t *location, unsigned char **curs
}
-void trim_body_data_headers(qd_message_body_data_t *body_data)
+void trim_stream_data_headers(qd_message_stream_data_t *stream_data, bool remove_vbin_header)
{
- const qd_field_location_t *location = &body_data->section;
+ const qd_field_location_t *location = &stream_data->section;
qd_buffer_t *buffer = location->buffer;
unsigned char *cursor = qd_buffer_base(buffer) + location->offset;
bool good = advance(&cursor, &buffer, location->hdr_length);
assert(good);
if (good) {
- unsigned char tag = 0;
- size_t vbin_hdr_len = 1;
- // coverity[check_return]
- next_octet(&cursor, &buffer, &tag);
- if (tag == QD_AMQP_VBIN8) {
- advance(&cursor, &buffer, 1);
- vbin_hdr_len += 1;
- } else if (tag == QD_AMQP_VBIN32) {
- advance(&cursor, &buffer, 4);
- vbin_hdr_len += 4;
+ size_t vbin_hdr_len = 0;
+ unsigned char tag = 0;
+
+ if (remove_vbin_header) {
+ vbin_hdr_len = 1;
+ // coverity[check_return]
+ next_octet(&cursor, &buffer, &tag);
+ if (tag == QD_AMQP_VBIN8) {
+ advance(&cursor, &buffer, 1);
+ vbin_hdr_len += 1;
+ } else if (tag == QD_AMQP_VBIN32) {
+ advance(&cursor, &buffer, 4);
+ vbin_hdr_len += 4;
+ }
}
// coverity[check_return]
can_advance(&cursor, &buffer); // bump cursor to the next buffer if necessary
- body_data->payload.buffer = buffer;
- body_data->payload.offset = cursor - qd_buffer_base(buffer);
- body_data->payload.length = location->length - vbin_hdr_len;
- body_data->payload.hdr_length = 0;
- body_data->payload.parsed = true;
- body_data->payload.tag = tag;
+ stream_data->payload.buffer = buffer;
+ stream_data->payload.offset = cursor - qd_buffer_base(buffer);
+ stream_data->payload.length = location->length - vbin_hdr_len;
+ stream_data->payload.hdr_length = 0;
+ stream_data->payload.parsed = true;
+ stream_data->payload.tag = tag;
}
}
/**
- * qd_message_body_data_iterator
+ * qd_message_stream_data_iterator
*
- * Given a body_data object, return an iterator that refers to the content of that body data. This iterator
+ * Given a stream_data object, return an iterator that refers to the content of that body data. This iterator
* shall not refer to the 3-byte performative header or the header for the vbin{8,32} field.
*
* The iterator must be freed eventually by the caller.
*/
-qd_iterator_t *qd_message_body_data_iterator(const qd_message_body_data_t *body_data)
+qd_iterator_t *qd_message_stream_data_iterator(const qd_message_stream_data_t *stream_data)
{
- const qd_field_location_t *location = &body_data->payload;
+ const qd_field_location_t *location = &stream_data->payload;
return qd_iterator_buffer(location->buffer, location->offset, location->length, ITER_VIEW_ALL);
}
/**
- * qd_message_body_data_payload_length
+ * qd_message_stream_data_payload_length
*
- * Given a body_data object, return the length of the payload.
+ * Given a stream_data object, return the length of the payload.
*/
-size_t qd_message_body_data_payload_length(const qd_message_body_data_t *body_data)
+size_t qd_message_stream_data_payload_length(const qd_message_stream_data_t *stream_data)
{
- return body_data->payload.length;
+ return stream_data->payload.length;
}
/**
- * qd_message_body_data_buffer_count
+ * qd_message_stream_data_buffer_count
*
- * Return the number of buffers contained in payload portion of the body_data object.
+ * Return the number of buffers contained in payload portion of the stream_data object.
*/
-int qd_message_body_data_buffer_count(const qd_message_body_data_t *body_data)
+int qd_message_stream_data_buffer_count(const qd_message_stream_data_t *stream_data)
{
- if (body_data->payload.length == 0)
+ if (stream_data->payload.length == 0)
return 0;
int count = 1;
- qd_buffer_t *buffer = body_data->payload.buffer;
- while (!!buffer && buffer != body_data->last_buffer) {
+ qd_buffer_t *buffer = stream_data->payload.buffer;
+ while (!!buffer && buffer != stream_data->last_buffer) {
buffer = DEQ_NEXT(buffer);
count++;
}
@@ -2450,23 +2454,23 @@ int qd_message_body_data_buffer_count(const qd_message_body_data_t *body_data)
/**
- * qd_message_body_data_buffers
+ * qd_message_stream_data_buffers
*
- * Populate the provided array of pn_raw_buffers with the addresses and lengths of the buffers in the body_data
+ * Populate the provided array of pn_raw_buffers with the addresses and lengths of the buffers in the stream_data
* object. Don't fill more than count raw_buffers with data. Start at offset from the zero-th buffer in the
- * body_data.
+ * stream_data.
*/
-int qd_message_body_data_buffers(qd_message_body_data_t *body_data, pn_raw_buffer_t *buffers, int offset, int count)
+int qd_message_stream_data_buffers(qd_message_stream_data_t *stream_data, pn_raw_buffer_t *buffers, int offset, int count)
{
- qd_buffer_t *buffer = body_data->payload.buffer;
- size_t data_offset = body_data->payload.offset;
- size_t payload_len = body_data->payload.length;
+ qd_buffer_t *buffer = stream_data->payload.buffer;
+ size_t data_offset = stream_data->payload.offset;
+ size_t payload_len = stream_data->payload.length;
//
// Skip the buffer offset
//
if (offset > 0) {
- assert(offset < qd_message_body_data_buffer_count(body_data));
+ assert(offset < qd_message_stream_data_buffer_count(stream_data));
while (offset > 0 && payload_len > 0) {
payload_len -= qd_buffer_size(buffer) - data_offset;
offset--;
@@ -2498,21 +2502,21 @@ int qd_message_body_data_buffers(qd_message_body_data_t *body_data, pn_raw_buffe
/**
- * qd_message_body_data_release
+ * qd_message_stream_data_release
*
- * Decrement the fanout ref-counts for all of the buffers referred to in the body_data. If any have reached zero,
+ * Decrement the fanout ref-counts for all of the buffers referred to in the stream_data. If any have reached zero,
* remove them from the buffer list and free them. Never dec-ref the last buffer in the content's buffer list.
*/
-void qd_message_body_data_release(qd_message_body_data_t *body_data)
+void qd_message_stream_data_release(qd_message_stream_data_t *stream_data)
{
- free_qd_message_body_data_t(body_data);
+ free_qd_message_stream_data_t(stream_data);
}
-qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *in_msg, qd_message_body_data_t **out_body_data)
+qd_message_stream_data_result_t qd_message_next_stream_data(qd_message_t *in_msg, qd_message_stream_data_t **out_stream_data)
{
- qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
- qd_message_body_data_t *body_data = 0;
+ qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+ qd_message_stream_data_t *stream_data = 0;
if (!msg->body_cursor) {
//
@@ -2520,58 +2524,67 @@ qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *in_msg, qd
//
qd_message_depth_status_t status = qd_message_check_depth(in_msg, QD_DEPTH_BODY);
if (status == QD_MESSAGE_DEPTH_OK) {
- body_data = new_qd_message_body_data_t();
- ZERO(body_data);
- body_data->owning_message = msg;
- body_data->section = msg->content->section_body;
-
- find_last_buffer(&body_data->section, &msg->body_cursor, &msg->body_buffer);
- body_data->last_buffer = msg->body_buffer;
- trim_body_data_headers(body_data);
-
- assert(DEQ_SIZE(msg->body_data_list) == 0);
- DEQ_INSERT_TAIL(msg->body_data_list, body_data);
- *out_body_data = body_data;
- return QD_MESSAGE_BODY_DATA_OK;
+ stream_data = new_qd_message_stream_data_t();
+ ZERO(stream_data);
+ stream_data->owning_message = msg;
+ stream_data->section = msg->content->section_body;
+
+ find_last_buffer(&stream_data->section, &msg->body_cursor, &msg->body_buffer);
+ stream_data->last_buffer = msg->body_buffer;
+ trim_stream_data_headers(stream_data, true);
+
+ assert(DEQ_SIZE(msg->stream_data_list) == 0);
+ DEQ_INSERT_TAIL(msg->stream_data_list, stream_data);
+ *out_stream_data = stream_data;
+ return QD_MESSAGE_STREAM_DATA_BODY_OK;
} else if (status == QD_MESSAGE_DEPTH_INCOMPLETE)
- return QD_MESSAGE_BODY_DATA_INCOMPLETE;
+ return QD_MESSAGE_STREAM_DATA_INCOMPLETE;
else if (status == QD_MESSAGE_DEPTH_INVALID)
- return QD_MESSAGE_BODY_DATA_INVALID;
+ return QD_MESSAGE_STREAM_DATA_INVALID;
}
qd_section_status_t section_status;
qd_field_location_t location;
ZERO(&location);
+ bool is_footer = false;
+
section_status = message_section_check(&msg->body_buffer, &msg->body_cursor,
BODY_DATA_SHORT, 3, TAGS_BINARY,
&location, true);
+ if (section_status == QD_SECTION_INVALID || section_status == QD_SECTION_NO_MATCH) {
+ is_footer = true;
+ section_status = message_section_check(&msg->body_buffer, &msg->body_cursor,
+ FOOTER_SHORT, 3, TAGS_MAP,
+ &location, true);
+ }
+
switch (section_status) {
case QD_SECTION_INVALID:
case QD_SECTION_NO_MATCH:
- return QD_MESSAGE_BODY_DATA_INVALID;
+ return QD_MESSAGE_STREAM_DATA_INVALID;
case QD_SECTION_MATCH:
- body_data = new_qd_message_body_data_t();
- ZERO(body_data);
- body_data->owning_message = msg;
- body_data->section = location;
- find_last_buffer(&body_data->section, &msg->body_cursor, &msg->body_buffer);
- body_data->last_buffer = msg->body_buffer;
- trim_body_data_headers(body_data);
- DEQ_INSERT_TAIL(msg->body_data_list, body_data);
- *out_body_data = body_data;
- return QD_MESSAGE_BODY_DATA_OK;
+ stream_data = new_qd_message_stream_data_t();
+ ZERO(stream_data);
+ stream_data->owning_message = msg;
+ stream_data->section = location;
+ find_last_buffer(&stream_data->section, &msg->body_cursor, &msg->body_buffer);
+ stream_data->last_buffer = msg->body_buffer;
+ trim_stream_data_headers(stream_data, !is_footer);
+ DEQ_INSERT_TAIL(msg->stream_data_list, stream_data);
+ *out_stream_data = stream_data;
+ return is_footer ? QD_MESSAGE_STREAM_DATA_FOOTER_OK : QD_MESSAGE_STREAM_DATA_BODY_OK;
case QD_SECTION_NEED_MORE:
if (msg->content->receive_complete)
- return QD_MESSAGE_BODY_DATA_NO_MORE;
+ return QD_MESSAGE_STREAM_DATA_NO_MORE;
else
- return QD_MESSAGE_BODY_DATA_INCOMPLETE;
+ return QD_MESSAGE_STREAM_DATA_INCOMPLETE;
}
- return QD_MESSAGE_BODY_DATA_NO_MORE;
+ return QD_MESSAGE_STREAM_DATA_NO_MORE;
}
@@ -2663,7 +2676,7 @@ void qd_message_release_body(qd_message_t *msg, pn_raw_buffer_t *buffers, int bu
}
-qd_parsed_field_t *qd_message_get_ingress (qd_message_t *msg)
+qd_parsed_field_t *qd_message_get_ingress(qd_message_t *msg)
{
return ((qd_message_pvt_t*)msg)->content->ma_pf_ingress;
}
@@ -2755,7 +2768,7 @@ bool qd_message_oversize(const qd_message_t *msg)
}
-int qd_message_body_data_append(qd_message_t *message, qd_buffer_list_t *data)
+int qd_message_stream_data_append(qd_message_t *message, qd_buffer_list_t *data)
{
unsigned int length = DEQ_SIZE(*data);
qd_composed_field_t *field = 0;
diff --git a/src/message_private.h b/src/message_private.h
index a6ca077..3b83de0 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -61,16 +61,16 @@ typedef struct {
} qd_field_location_t;
-struct qd_message_body_data_t {
- DEQ_LINKS(qd_message_body_data_t); // Linkage to form a DEQ
+struct qd_message_stream_data_t {
+ DEQ_LINKS(qd_message_stream_data_t); // Linkage to form a DEQ
qd_message_pvt_t *owning_message; // Pointer to the owning message
qd_field_location_t section; // Section descriptor for the field
qd_field_location_t payload; // Descriptor for the payload of the body data
qd_buffer_t *last_buffer; // Pointer to the last buffer in the field
};
-ALLOC_DECLARE(qd_message_body_data_t);
-DEQ_DECLARE(qd_message_body_data_t, qd_message_body_data_list_t);
+ALLOC_DECLARE(qd_message_stream_data_t);
+DEQ_DECLARE(qd_message_stream_data_t, qd_message_stream_data_list_t);
// TODO - consider using pointers to qd_field_location_t below to save memory
// TODO - provide a way to allocate a message without a lock for the link-routing case.
@@ -141,23 +141,23 @@ typedef struct {
} qd_message_content_t;
struct qd_message_pvt_t {
- qd_iterator_pointer_t cursor; // A pointer to the current location of the outgoing byte stream.
- qd_message_depth_t message_depth; // What is the depth of the message that has been received so far
- qd_message_depth_t sent_depth; // How much of the message has been sent? QD_DEPTH_NONE means nothing has been sent so far, QD_DEPTH_HEADER means the header has already been sent, dont send it again and so on.
- qd_message_content_t *content; // The actual content of the message. The content is never copied
- qd_buffer_list_t ma_to_override; // to field in outgoing message annotations.
- qd_buffer_list_t ma_trace; // trace list in outgoing message annotations
- qd_buffer_list_t ma_ingress; // ingress field in outgoing message annotations
- int ma_phase; // phase for the override address
- bool ma_stream; // indicates whether this message is streaming
- qd_message_body_data_list_t body_data_list; // TODO - move this to the content for one-time parsing (TLR)
- qd_message_body_data_t *next_body_data;
- unsigned char *body_cursor;
- qd_buffer_t *body_buffer;
- bool strip_annotations_in;
- bool send_complete; // Has the message been completely received and completely sent?
- bool tag_sent; // Tags are sent
- bool is_fanout; // If msg is an outgoing fanout
+ qd_iterator_pointer_t cursor; // A pointer to the current location of the outgoing byte stream.
+ qd_message_depth_t message_depth; // What is the depth of the message that has been received so far
+ qd_message_depth_t sent_depth; // How much of the message has been sent? QD_DEPTH_NONE means nothing has been sent so far, QD_DEPTH_HEADER means the header has already been sent, dont send it again and so on.
+ qd_message_content_t *content; // The actual content of the message. The content is never copied
+ qd_buffer_list_t ma_to_override; // to field in outgoing message annotations.
+ qd_buffer_list_t ma_trace; // trace list in outgoing message annotations
+ qd_buffer_list_t ma_ingress; // ingress field in outgoing message annotations
+ int ma_phase; // phase for the override address
+ bool ma_stream; // indicates whether this message is streaming
+ qd_message_stream_data_list_t stream_data_list; // TODO - move this to the content for one-time parsing (TLR)
+ qd_message_stream_data_t *next_stream_data;
+ unsigned char *body_cursor;
+ qd_buffer_t *body_buffer;
+ bool strip_annotations_in;
+ bool send_complete; // Has the message been completely received and completely sent?
+ bool tag_sent; // Tags are sent
+ bool is_fanout; // If msg is an outgoing fanout
};
ALLOC_DECLARE(qd_message_t);
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 6ca2b2b..8c361a2 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -171,11 +171,13 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
break;
}
} while (settled != dlv->settled && !to_new_link); // oops missed the settlement
+
send_complete = qdr_delivery_send_complete(dlv);
if (send_complete || to_new_link) {
//
- // The entire message has been sent. It is now the appropriate time to have the delivery removed
- // from the head of the undelivered list and move it to the unsettled list if it is not settled.
+ // The entire message has been sent or the message has been moved from this link.
+ // It is now the appropriate time to remove the delivery from the head of the
+ // undelivered list to the unsettled list if it is not settled.
//
num_deliveries_completed++;
@@ -252,6 +254,42 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
}
+void qdr_link_complete_sent_message(qdr_core_t *core, qdr_link_t *link)
+{
+ if (!link || !link->conn)
+ return;
+
+ qdr_connection_t *conn = link->conn;
+ bool activate = false;
+
+ sys_mutex_lock(conn->work_lock);
+ qdr_delivery_t *dlv = DEQ_HEAD(link->undelivered);
+ if (!!dlv && qdr_delivery_send_complete(dlv)) {
+ DEQ_REMOVE_HEAD(link->undelivered);
+ if (!dlv->settled && !qdr_delivery_oversize(dlv) && !qdr_delivery_is_aborted(dlv)) {
+ DEQ_INSERT_TAIL(link->unsettled, dlv);
+ dlv->where = QDR_DELIVERY_IN_UNSETTLED;
+ qd_log(core->log, QD_LOG_DEBUG, "Delivery transfer: dlv:%lx qdr_link_complete_sent_message: undelivered-list -> unsettled-list", (long) dlv);
+ } else {
+ dlv->where = QDR_DELIVERY_NOWHERE;
+ qdr_delivery_decref(core, dlv, "qdr_link_complete_sent_message - removed from undelivered");
+ }
+
+ //
+ // If there's another delivery on the undelivered list, get the outbound process moving again.
+ //
+ if (DEQ_SIZE(link->undelivered) > 0) {
+ qdr_add_link_ref(&conn->links_with_work[link->priority], link, QDR_LINK_LIST_CLASS_WORK);
+ activate = true;
+ }
+ }
+ sys_mutex_unlock(conn->work_lock);
+
+ if (activate)
+ conn->protocol_adaptor->activate_handler(conn->protocol_adaptor->user_context, conn);
+}
+
+
void qdr_link_flow(qdr_core_t *core, qdr_link_t *link, int credit, bool drain_mode)
{
qdr_action_t *action = qdr_action(qdr_link_flow_CT, "link_flow");
diff --git a/tests/message_test.c b/tests/message_test.c
index 87271df..1d3a5d3 100644
--- a/tests/message_test.c
+++ b/tests/message_test.c
@@ -706,10 +706,10 @@ exit:
}
//
-// Testing protocol adapter 'body_data' interfaces
+// Testing protocol adapter 'stream_data' interfaces
//
-static void body_data_generate_message(qd_message_t *msg, char *s_chunk_size, char *s_n_chunks)
+static void stream_data_generate_message(qd_message_t *msg, char *s_chunk_size, char *s_n_chunks)
{
// Fill a message with n_chunks of vbin chunk_size body data.
@@ -747,20 +747,20 @@ static void body_data_generate_message(qd_message_t *msg, char *s_chunk_size, ch
}
}
-static void free_body_data_list(qd_message_t *msg_in)
+static void free_stream_data_list(qd_message_t *msg_in)
{
// DISPATCH-1800 - this should not be required here
qd_message_pvt_t *msg = (qd_message_pvt_t *)msg_in;
- qd_message_body_data_t *bd = DEQ_HEAD(msg->body_data_list);
+ qd_message_stream_data_t *bd = DEQ_HEAD(msg->stream_data_list);
while (bd) {
- qd_message_body_data_t *next = DEQ_NEXT(bd);
- free_qd_message_body_data_t(bd);
+ qd_message_stream_data_t *next = DEQ_NEXT(bd);
+ free_qd_message_stream_data_t(bd);
bd = next;
}
}
-static char *check_body_data(char *s_chunk_size, char *s_n_chunks, bool flatten)
+static char *check_stream_data(char *s_chunk_size, char *s_n_chunks, bool flatten)
{
// Fill a message with n chunks of vbin chunk_size body data.
// Then test by retrieving n chunks from a message copy and verifing.
@@ -786,7 +786,7 @@ static char *check_body_data(char *s_chunk_size, char *s_n_chunks, bool flatten)
qd_message_pvt_t *msg_pvt = (qd_message_pvt_t *)msg;
// Set the original message content
- body_data_generate_message(msg, s_chunk_size, s_n_chunks);
+ stream_data_generate_message(msg, s_chunk_size, s_n_chunks);
// flatten if required
if (flatten) {
@@ -810,22 +810,22 @@ static char *check_body_data(char *s_chunk_size, char *s_n_chunks, bool flatten)
// Define the number of raw buffers to be extracted on each loop
#define N_PN_RAW_BUFFS (2)
- qd_message_body_data_t *body_data;
+ qd_message_stream_data_t *stream_data;
for (int j=0; j<n_chunks; j++) {
received = 0; // this chunk received size in bytes.
- // Set up the next_body_data snapshot
- qd_message_body_data_result_t body_data_result = qd_message_next_body_data(copy, &body_data);
+ // Set up the next_stream_data snapshot
+ qd_message_stream_data_result_t stream_data_result = qd_message_next_stream_data(copy, &stream_data);
- if (body_data_result == QD_MESSAGE_BODY_DATA_OK) {
- // check body_data payload length
- if (body_data->payload.length != chunk_size) {
- printf("********** check_body_data: BUFFER_SIZE=%zu, pn-buf-array-size:%d, "
+ if (stream_data_result == QD_MESSAGE_STREAM_DATA_BODY_OK) {
+ // check stream_data payload length
+ if (stream_data->payload.length != chunk_size) {
+ printf("********** check_stream_data: BUFFER_SIZE=%zu, pn-buf-array-size:%d, "
"chunk_size:%s, n_chunks:%s, payload length error : %zu \n",
- BUFFER_SIZE, N_PN_RAW_BUFFS, s_chunk_size, s_n_chunks, body_data->payload.length);
+ BUFFER_SIZE, N_PN_RAW_BUFFS, s_chunk_size, s_n_chunks, stream_data->payload.length);
fflush(stdout);
- result = "qd_message_next_body_data returned wrong payload length.";
+ result = "qd_message_next_stream_data returned wrong payload length.";
break;
}
@@ -837,14 +837,14 @@ static char *check_body_data(char *s_chunk_size, char *s_n_chunks, bool flatten)
pn_raw_buffer_t buffs[N_PN_RAW_BUFFS];
// used_buffers - Number of qd_buffers in content buffer chain consumed so far.
- // This number must increase as dictated by qd_message_body_data_buffers()
- // when vbin segments are consumed from the current body_data chunk.
+ // This number must increase as dictated by qd_message_stream_data_buffers()
+ // when vbin segments are consumed from the current stream_data chunk.
// A single vbin segment may consume 0, 1, or many qd_buffers.
size_t used_buffers = 0;
while (received < chunk_size) {
ZERO(buffs);
- size_t n_used = qd_message_body_data_buffers(body_data, buffs, used_buffers, N_PN_RAW_BUFFS);
+ size_t n_used = qd_message_stream_data_buffers(stream_data, buffs, used_buffers, N_PN_RAW_BUFFS);
if (n_used > 0) {
for (size_t ii=0; ii<n_used; ii++) {
char e_char = (char)(j + 1); // expected char in payload
@@ -852,7 +852,7 @@ static char *check_body_data(char *s_chunk_size, char *s_n_chunks, bool flatten)
for (uint32_t idx=0; idx < buffs[ii].size; idx++) {
char actual = buffs[ii].bytes[buffs[ii].offset + idx];
if (e_char != actual) {
- printf("********** check_body_data: BUFFER_SIZE=%zu, pn-buf-array-size:%d, "
+ printf("********** check_stream_data: BUFFER_SIZE=%zu, pn-buf-array-size:%d, "
"chunk_size:%s, n_chunks:%s, verify error at index %d, expected:%d, actual:%d \n",
BUFFER_SIZE, N_PN_RAW_BUFFS, s_chunk_size, s_n_chunks, received + idx, e_char,
actual);
@@ -865,7 +865,7 @@ static char *check_body_data(char *s_chunk_size, char *s_n_chunks, bool flatten)
used_buffers += n_used;
if (!!result) break;
} else {
- printf("********** check_body_data: BUFFER_SIZE=%zu, pn-buf-array-size:%d, "
+ printf("********** check_stream_data: BUFFER_SIZE=%zu, pn-buf-array-size:%d, "
"chunk_size:%s, n_chunks:%s, received %d bytes (not enough) \n",
BUFFER_SIZE, N_PN_RAW_BUFFS, s_chunk_size, s_n_chunks, received);
fflush(stdout);
@@ -873,7 +873,7 @@ static char *check_body_data(char *s_chunk_size, char *s_n_chunks, bool flatten)
break;
}
if (received > chunk_size) {
- printf("********** check_body_data: BUFFER_SIZE=%zu, pn-buf-array-size:%d, "
+ printf("********** check_stream_data: BUFFER_SIZE=%zu, pn-buf-array-size:%d, "
"chunk_size:%s, n_chunks:%s, received %d bytes (too many) \n",
BUFFER_SIZE, N_PN_RAW_BUFFS, s_chunk_size, s_n_chunks, received);
result = "Received too much data";
@@ -882,32 +882,30 @@ static char *check_body_data(char *s_chunk_size, char *s_n_chunks, bool flatten)
}
// successful check
- } else if (body_data_result == QD_MESSAGE_BODY_DATA_INCOMPLETE) {
+ } else if (stream_data_result == QD_MESSAGE_STREAM_DATA_INCOMPLETE) {
result = "DATA_INCOMPLETE"; break;
} else {
- switch (body_data_result) {
- case QD_MESSAGE_BODY_DATA_NO_MORE:
+ switch (stream_data_result) {
+ case QD_MESSAGE_STREAM_DATA_NO_MORE:
result = "EOS"; break;
- case QD_MESSAGE_BODY_DATA_INVALID:
+ case QD_MESSAGE_STREAM_DATA_INVALID:
result = "Invalid body data for streaming message"; break;
- case QD_MESSAGE_BODY_DATA_NOT_DATA:
- result = "Invalid body; expected data section"; break;
default:
result = "result: default"; break;
}
}
}
- free_body_data_list(msg);
+ free_stream_data_list(msg);
qd_message_free(msg);
if (!!copy) {
- free_body_data_list(copy);
+ free_stream_data_list(copy);
qd_message_free(copy);
}
return result;
}
-static char *test_check_body_data(void * context)
+static char *test_check_stream_data(void * context)
{
char *result = 0;
@@ -919,16 +917,16 @@ static char *test_check_body_data(void * context)
for (int i=0; i<N_CHUNK_SIZES; i++) {
for (int j=0; j<N_N_CHUNKS; j++) {
- result = check_body_data(chunk_sizes[i], n_chunks[j], false);
+ result = check_stream_data(chunk_sizes[i], n_chunks[j], false);
if (!!result) {
- printf("test_check_body_data: chunk_size:%s, n_chunks:%s, flatten:%s, result:%s \n",
+ printf("test_check_stream_data: chunk_size:%s, n_chunks:%s, flatten:%s, result:%s \n",
chunk_sizes[i], n_chunks[j], "false", result);
fflush(stdout);
return result;
}
- result = check_body_data(chunk_sizes[i], n_chunks[j], true);
+ result = check_stream_data(chunk_sizes[i], n_chunks[j], true);
if (!!result) {
- printf("test_check_body_data: chunk_size:%s, n_chunks:%s, flatten:%s, result:%s \n",
+ printf("test_check_stream_data: chunk_size:%s, n_chunks:%s, flatten:%s, result:%s \n",
chunk_sizes[i], n_chunks[j], "true", result);
fflush(stdout);
return result;
@@ -939,10 +937,10 @@ static char *test_check_body_data(void * context)
}
-// Verify that qd_message_body_data_append() will break up a long binary data
+// Verify that qd_message_stream_data_append() will break up a long binary data
// field in order to avoid triggering Q2
//
-static char *test_check_body_data_append(void * context)
+static char *test_check_stream_data_append(void * context)
{
char *result = 0;
qd_message_t *msg = 0;
@@ -977,7 +975,7 @@ static char *test_check_body_data_append(void * context)
qd_message_compose_2(msg, field, false);
qd_compose_free(field);
- int depth = qd_message_body_data_append(msg, &bin_data);
+ int depth = qd_message_stream_data_append(msg, &bin_data);
if (depth <= buffer_count) {
// expected to add extra buffer(s) for meta-data
result = "append length is incorrect";
@@ -995,29 +993,30 @@ static char *test_check_body_data_append(void * context)
int bd_count = 0;
int total_buffers = 0;
- qd_message_body_data_t *body_data = 0;
+ qd_message_stream_data_t *stream_data = 0;
bool done = false;
while (!done) {
- switch (qd_message_next_body_data(msg, &body_data)) {
- case QD_MESSAGE_BODY_DATA_INCOMPLETE:
- case QD_MESSAGE_BODY_DATA_INVALID:
- case QD_MESSAGE_BODY_DATA_NOT_DATA:
+ switch (qd_message_next_stream_data(msg, &stream_data)) {
+ case QD_MESSAGE_STREAM_DATA_INCOMPLETE:
+ case QD_MESSAGE_STREAM_DATA_INVALID:
result = "Next body data failed to get next body data";
goto exit;
- case QD_MESSAGE_BODY_DATA_NO_MORE:
+ case QD_MESSAGE_STREAM_DATA_NO_MORE:
done = true;
break;
- case QD_MESSAGE_BODY_DATA_OK:
+ case QD_MESSAGE_STREAM_DATA_BODY_OK:
bd_count += 1;
- // qd_message_body_data_append() breaks the buffer list up into
+ // qd_message_stream_data_append() breaks the buffer list up into
// smaller lists that are no bigger than QD_QLIMIT_Q2_LOWER buffers
// long
- total_buffers += qd_message_body_data_buffer_count(body_data);
- if (qd_message_body_data_buffer_count(body_data) > QD_QLIMIT_Q2_LOWER) {
+ total_buffers += qd_message_stream_data_buffer_count(stream_data);
+ if (qd_message_stream_data_buffer_count(stream_data) > QD_QLIMIT_Q2_LOWER) {
result = "Body data list length too long!";
goto exit;
}
- qd_message_body_data_release(body_data);
+ qd_message_stream_data_release(stream_data);
+ break;
+ case QD_MESSAGE_STREAM_DATA_FOOTER_OK:
break;
}
}
@@ -1051,8 +1050,8 @@ int message_tests(void)
TEST_CASE(test_q2_input_holdoff_sensing, 0);
TEST_CASE(test_incomplete_annotations, 0);
TEST_CASE(test_check_weird_messages, 0);
- TEST_CASE(test_check_body_data, 0);
- TEST_CASE(test_check_body_data_append, 0);
+ TEST_CASE(test_check_stream_data, 0);
+ TEST_CASE(test_check_stream_data_append, 0);
return result;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org