You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2020/10/28 20:47:10 UTC

[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1742 - Added function for asynchronous completion of sent-messages. Improved API docs. DISPATCH-1742 - Updated reference adaptor to make multiple extend calls. DISPATCH-1804 - Changed body_data API to stream_data to encompass both body_data and footer performatives.

This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push:
     new cef0f5a  DISPATCH-1742 - Added function for asynchronous completion of sent-messages. Improved API docs. DISPATCH-1742 - Updated reference adaptor to make multiple extend calls. DISPATCH-1804 - Changed body_data API to stream_data to encompass both body_data and footer performatives.
cef0f5a is described below

commit cef0f5a5ffae788b2692551775df9e4daba1305b
Author: Ted Ross <tr...@apache.org>
AuthorDate: Wed Oct 28 16:44:19 2020 -0400

    DISPATCH-1742 - Added function for asynchronous completion of sent-messages. Improved API docs.
    DISPATCH-1742 - Updated reference adaptor to make multiple extend calls.
    DISPATCH-1804 - Changed body_data API to stream_data to encompass both body_data and footer performatives.
---
 include/qpid/dispatch/http1_codec.h      |  12 +--
 include/qpid/dispatch/message.h          |  80 +++++++-------
 include/qpid/dispatch/protocol_adaptor.h |  37 +++++++
 src/adaptors/http1/http1_adaptor.c       |  20 ++--
 src/adaptors/http1/http1_client.c        |  32 +++---
 src/adaptors/http1/http1_codec.c         |  16 +--
 src/adaptors/http1/http1_private.h       |   8 +-
 src/adaptors/http1/http1_server.c        |  28 ++---
 src/adaptors/http2/http2_adaptor.c       | 130 +++++++++++------------
 src/adaptors/http2/http2_adaptor.h       |  12 +--
 src/adaptors/reference_adaptor.c         |  94 +++++++++++------
 src/adaptors/tcp_adaptor.c               |  50 +++++----
 src/message.c                            | 173 +++++++++++++++++--------------
 src/message_private.h                    |  42 ++++----
 src/router_core/transfer.c               |  42 +++++++-
 tests/message_test.c                     | 103 +++++++++---------
 16 files changed, 498 insertions(+), 381 deletions(-)

diff --git a/include/qpid/dispatch/http1_codec.h b/include/qpid/dispatch/http1_codec.h
index 6967cc0..132f06c 100644
--- a/include/qpid/dispatch/http1_codec.h
+++ b/include/qpid/dispatch/http1_codec.h
@@ -87,12 +87,12 @@ typedef struct h1_codec_config_t {
     //
     void (*tx_buffers)(h1_codec_request_state_t *hrs, qd_buffer_list_t *data, unsigned int len);
 
-    // tx_body_data()
-    // Called with body_data containing encoded HTTP message data. Only
+    // tx_stream_data()
+    // Called with stream_data containing encoded HTTP message data. Only
     // called if the outgoing HTTP message has a body. The caller assumes
-    // ownership of the body_data and must release it when done.
+    // ownership of the stream_data and must release it when done.
     //
-    void (*tx_body_data)(h1_codec_request_state_t *hrs, qd_message_body_data_t *body_data);
+    void (*tx_stream_data)(h1_codec_request_state_t *hrs, qd_message_stream_data_t *stream_data);
 
     //
     // RX message callbacks
@@ -227,9 +227,9 @@ int h1_codec_tx_response(h1_codec_request_state_t *hrs, int status_code, const c
 //
 int h1_codec_tx_add_header(h1_codec_request_state_t *hrs, const char *key, const char *value);
 
-// Stream outgoing body data.  Ownership of body_data is passed to the caller.
+// Stream outgoing body data.  Ownership of stream_data is passed to the caller.
 //
-int h1_codec_tx_body(h1_codec_request_state_t *hrs, qd_message_body_data_t *body_data);
+int h1_codec_tx_body(h1_codec_request_state_t *hrs, qd_message_stream_data_t *stream_data);
 
 // outgoing message construction complete.  The request_complete() callback MAY
 // occur during this call.
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index b797eca..41ec73b 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -59,8 +59,8 @@
 
 // Callback for status change (confirmed persistent, loaded-in-memory, etc.)
 
-typedef struct qd_message_t           qd_message_t;
-typedef struct qd_message_body_data_t qd_message_body_data_t;
+typedef struct qd_message_t             qd_message_t;
+typedef struct qd_message_stream_data_t qd_message_stream_data_t;
 
 /** Amount of message to be parsed.  */
 typedef enum {
@@ -310,93 +310,93 @@ int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field);
 
 
 /**
- * qd_message_body_data_iterator
+ * qd_message_stream_data_iterator
  *
  * Return an iterator that references the content (not the performative headers)
  * of the entire body-data section.
  *
  * The returned iterator must eventually be freed by the caller.
  *
- * @param body_data Pointer to a body_data object produced by qd_message_next_body_data
- * @return Pointer to an iterator referencing the body_data content
+ * @param stream_data Pointer to a stream_data object produced by qd_message_next_stream_data
+ * @return Pointer to an iterator referencing the stream_data content
  */
-qd_iterator_t *qd_message_body_data_iterator(const qd_message_body_data_t *body_data);
+qd_iterator_t *qd_message_stream_data_iterator(const qd_message_stream_data_t *stream_data);
 
 
 /**
- * qd_message_body_data_buffer_count
+ * qd_message_stream_data_buffer_count
  *
  * Return the number of buffers that are needed to hold this body-data's content.
  *
- * @param body_data Pointer to a body_data object produced by qd_message_next_body_data
- * @return Number of pn_raw_buffers needed to contain the entire content of this body_data.
+ * @param stream_data Pointer to a stream_data object produced by qd_message_next_stream_data
+ * @return Number of pn_raw_buffers needed to contain the entire content of this stream_data.
  */
-int qd_message_body_data_buffer_count(const qd_message_body_data_t *body_data);
+int qd_message_stream_data_buffer_count(const qd_message_stream_data_t *stream_data);
 
 
 /**
- * qd_message_body_data_buffers
+ * qd_message_stream_data_buffers
  *
- * Populate an array of pn_raw_buffer_t objects with references to the body_data's content.
+ * Populate an array of pn_raw_buffer_t objects with references to the stream_data's content.
  *
- * @param body_data Pointer to a body_data object produced by qd_message_next_body_data
+ * @param stream_data Pointer to a stream_data object produced by qd_message_next_stream_data
  * @param buffers Pointer to an array of pn_raw_buffer_t objects
- * @param offset The offset (in the body_data's buffer set) from which copying should begin
+ * @param offset The offset (in the stream_data's buffer set) from which copying should begin
  * @param count The number of pn_raw_buffer_t objects in the buffers array
  * @return The number of pn_raw_buffer_t objects that were overwritten
  */
-int qd_message_body_data_buffers(qd_message_body_data_t *body_data, pn_raw_buffer_t *buffers, int offset, int count);
+int qd_message_stream_data_buffers(qd_message_stream_data_t *stream_data, pn_raw_buffer_t *buffers, int offset, int count);
 
 /**
- * qd_message_body_data_payload_length
+ * qd_message_stream_data_payload_length
  *
- * Given a body_data object, return the length of the payload.
- * This will equal the sum of the length of all qd_buffer_t objects contained in payload portion of the body_data object
+ * Given a stream_data object, return the length of the payload.
+ * This will equal the sum of the length of all qd_buffer_t objects contained in payload portion of the stream_data object
  *
- * @param body_data Pointer to a body_data object produced by qd_message_next_body_data
+ * @param stream_data Pointer to a stream_data object produced by qd_message_next_stream_data
  * @return The length of the payload of the passed in body data object.
  */
-size_t qd_message_body_data_payload_length(const qd_message_body_data_t *body_data);
+size_t qd_message_stream_data_payload_length(const qd_message_stream_data_t *stream_data);
 
 
 /**
- * qd_message_body_data_release
+ * qd_message_stream_data_release
  *
  * Release buffers that were associated with a body-data section.  It is not required that body-data
  * objects be released in the same order in which they were offered.
  *
- * Once this function is called, the caller must drop its reference to the body_data object
+ * Once this function is called, the caller must drop its reference to the stream_data object
  * and not use it again.
  *
- * @param body_data Pointer to a body data object returned by qd_message_next_body_data
+ * @param stream_data Pointer to a stream_data object returned by qd_message_next_stream_data
  */
-void qd_message_body_data_release(qd_message_body_data_t *body_data);
+void qd_message_stream_data_release(qd_message_stream_data_t *stream_data);
 
 
 typedef enum {
-    QD_MESSAGE_BODY_DATA_OK,           // A valid body data object have been returned
-    QD_MESSAGE_BODY_DATA_INCOMPLETE,   // The next body data is incomplete, try again later
-    QD_MESSAGE_BODY_DATA_NO_MORE,      // There are no more body data objects in this stream
-    QD_MESSAGE_BODY_DATA_INVALID,      // The next body data is invalid, the stream is corrupted
-    QD_MESSAGE_BODY_DATA_NOT_DATA      // The body of the message is not a DATA segment
-} qd_message_body_data_result_t;
+    QD_MESSAGE_STREAM_DATA_BODY_OK,      // A valid body data object has been returned
+    QD_MESSAGE_STREAM_DATA_FOOTER_OK,    // A valid footer has been returned
+    QD_MESSAGE_STREAM_DATA_INCOMPLETE,   // The next body data is incomplete, try again later
+    QD_MESSAGE_STREAM_DATA_NO_MORE,      // There are no more body data objects in this stream
+    QD_MESSAGE_STREAM_DATA_INVALID       // The next body data is invalid, the stream is corrupted
+} qd_message_stream_data_result_t;
 
 
 /**
- * qd_message_next_body_data
+ * qd_message_next_stream_data
  *
  * Get the next body-data section from this streaming message return the result and
- * possibly the valid, completed body_data object.
+ * possibly the valid, completed stream_data object.
  *
  * @param msg Pointer to a message
- * @param body_data Output pointer to a body_data object (or 0 if not OK)
- * @return The body_data_result describing the result of this operation
+ * @param stream_data Output pointer to a stream_data object (or 0 if not OK)
+ * @return The stream_data_result describing the result of this operation
  */
-qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *msg, qd_message_body_data_t **body_data);
+qd_message_stream_data_result_t qd_message_next_stream_data(qd_message_t *msg, qd_message_stream_data_t **stream_data);
 
 
 /**
- * qd_message_body_data_append
+ * qd_message_stream_data_append
  *
  * Append the buffers in data as a sequence of one or more BODY_DATA sections
  * to the given message.  The buffers in data are moved into the message
@@ -406,7 +406,7 @@ qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *msg, qd_me
  * @param data List of buffers containing body data.
  * @return The number of buffers stored in the message's content
  */
-int qd_message_body_data_append(qd_message_t *msg, qd_buffer_list_t *data);
+int qd_message_stream_data_append(qd_message_t *msg, qd_buffer_list_t *data);
 
 
 /** Put string representation of a message suitable for logging in buffer.
@@ -424,7 +424,7 @@ qd_log_source_t* qd_message_log_source();
  * @param msg A pointer to the message
  * @return the parsed field
  */
-qd_parsed_field_t *qd_message_get_ingress    (qd_message_t *msg);
+qd_parsed_field_t *qd_message_get_ingress(qd_message_t *msg);
 
 /**
  * Accessor for message field phase
@@ -432,7 +432,7 @@ qd_parsed_field_t *qd_message_get_ingress    (qd_message_t *msg);
  * @param msg A pointer to the message
  * @return the parsed field
  */
-qd_parsed_field_t *qd_message_get_phase      (qd_message_t *msg);
+qd_parsed_field_t *qd_message_get_phase(qd_message_t *msg);
 
 /**
  * Accessor for message field to_override
@@ -448,7 +448,7 @@ qd_parsed_field_t *qd_message_get_to_override(qd_message_t *msg);
  * @param msg A pointer to the message
  * @return the parsed field
  */
-qd_parsed_field_t *qd_message_get_trace      (qd_message_t *msg);
+qd_parsed_field_t *qd_message_get_trace(qd_message_t *msg);
 
 /**
  * Accessor for message field phase
diff --git a/include/qpid/dispatch/protocol_adaptor.h b/include/qpid/dispatch/protocol_adaptor.h
index 29c7a8b..909b860 100644
--- a/include/qpid/dispatch/protocol_adaptor.h
+++ b/include/qpid/dispatch/protocol_adaptor.h
@@ -857,8 +857,45 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *
                                                 const uint8_t *tag, int tag_length,
                                                 uint64_t remote_disposition,
                                                 pn_data_t *remote_extension_state);
+
+/**
+ * qdr_link_process_deliveries
+ *
+ * This function is called by the protocol adaptor in the context of the link_push
+ * callback.  It provides the core module access to the IO thread so the core can
+ * deliver outgoing messages to the adaptor.
+ *
+ * @param core Pointer to the router core object
+ * @param link Pointer to the link being processed
+ * @param credit The maximum number of deliveries to be processed on this link
+ * @return The number of deliveries that were completed during the processing
+ */
 int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit);
 
+
+/**
+ * qdr_link_complete_sent_message
+ *
+ * If an outgoing message is completed outside of the context of the link_deliver callback,
+ * this function must be called to inform the router core that the delivery on the head of
+ * the link's undelivered list can be moved out of that list.  Ensure that the send-complete
+ * status of the message has been set before calling this function.  This function will check
+ * the send-complete status of the head delivery on the link's undelivered list.  If it is
+ * true, that delivery will be removed from the undelivered list.
+ *
+ * DO NOT call this function from within the link_deliver callback.  Use it only if you must
+ * asynchronously complete the sending of the current message.
+ *
+ * This will typically occur when a message delivered to the protocol adaptor cannot be sent
+ * on the wire due to back-pressure.  In this case, the removal of the back pressure is the
+ * stimulus for completing the send of the message.
+ *
+ * @param core Pointer to the router core object
+ * @param link Pointer to the link on which the head delivery has been completed
+ */
+void qdr_link_complete_sent_message(qdr_core_t *core, qdr_link_t *link);
+
+
 void qdr_link_flow(qdr_core_t *core, qdr_link_t *link, int credit, bool drain_mode);
 
 /**
diff --git a/src/adaptors/http1/http1_adaptor.c b/src/adaptors/http1/http1_adaptor.c
index 2207478..eb8349a 100644
--- a/src/adaptors/http1/http1_adaptor.c
+++ b/src/adaptors/http1/http1_adaptor.c
@@ -138,8 +138,8 @@ void qdr_http1_out_data_fifo_cleanup(qdr_http1_out_data_fifo_t *out_data)
         qdr_http1_out_data_t *od = DEQ_HEAD(out_data->fifo);
         while (od) {
             DEQ_REMOVE_HEAD(out_data->fifo);
-            if (od->body_data)
-                qd_message_body_data_release(od->body_data);
+            if (od->stream_data)
+                qd_message_stream_data_release(od->stream_data);
             else
                 qd_buffer_list_free_buffers(&od->raw_buffers);
             free_qdr_http1_out_data_t(od);
@@ -265,9 +265,9 @@ uint64_t qdr_http1_write_out_data(qdr_http1_connection_t *hconn, qdr_http1_out_d
             size_t limit = MIN(RAW_BUFFER_BATCH, od_len);
             int written = 0;
 
-            if (od->body_data) {  // buffers stored in qd_message_t
+            if (od->stream_data) {  // buffers stored in qd_message_t
 
-                written = qd_message_body_data_buffers(od->body_data, buffers, od->next_buffer, limit);
+                written = qd_message_stream_data_buffers(od->stream_data, buffers, od->next_buffer, limit);
                 for (int i = 0; i < written; ++i) {
                     // enforce this: we expect the context can be used by the adaptor!
                     assert(buffers[i].context == 0);
@@ -352,14 +352,14 @@ void qdr_http1_enqueue_buffer_list(qdr_http1_out_data_fifo_t *fifo, qd_buffer_li
 // The HTTP encoder has a message body data to be written to the raw connection.
 // Queue it to the outgoing data fifo.
 //
-void qdr_http1_enqueue_body_data(qdr_http1_out_data_fifo_t *fifo, qd_message_body_data_t *body_data)
+void qdr_http1_enqueue_stream_data(qdr_http1_out_data_fifo_t *fifo, qd_message_stream_data_t *stream_data)
 {
-    int count = qd_message_body_data_buffer_count(body_data);
+    int count = qd_message_stream_data_buffer_count(stream_data);
     if (count) {
         qdr_http1_out_data_t *od = new_qdr_http1_out_data_t();
         ZERO(od);
         od->owning_fifo = fifo;
-        od->body_data = body_data;
+        od->stream_data = stream_data;
         od->buffer_count = count;
 
         DEQ_INSERT_TAIL(fifo->fifo, od);
@@ -367,7 +367,7 @@ void qdr_http1_enqueue_body_data(qdr_http1_out_data_fifo_t *fifo, qd_message_bod
             fifo->write_ptr = od;
     } else {
         // empty body-data
-        qd_message_body_data_release(body_data);
+        qd_message_stream_data_release(stream_data);
     }
 }
 
@@ -401,8 +401,8 @@ void qdr_http1_free_written_buffers(qdr_http1_connection_t *hconn)
                 // all buffers returned
                 qdr_http1_out_data_fifo_t *fifo = od->owning_fifo;
                 DEQ_REMOVE(fifo->fifo, od);
-                if (od->body_data)
-                    qd_message_body_data_release(od->body_data);
+                if (od->stream_data)
+                    qd_message_stream_data_release(od->stream_data);
                 else
                     qd_buffer_list_free_buffers(&od->raw_buffers);
                 free_qdr_http1_out_data_t(od);
diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c
index 708d2a9..0a82599 100644
--- a/src/adaptors/http1/http1_client.c
+++ b/src/adaptors/http1/http1_client.c
@@ -86,7 +86,7 @@ ALLOC_DEFINE(_client_request_t);
 
 
 static void _client_tx_buffers_cb(h1_codec_request_state_t *lib_hrs, qd_buffer_list_t *blist, unsigned int len);
-static void _client_tx_body_data_cb(h1_codec_request_state_t *lib_hrs, qd_message_body_data_t *body_data);
+static void _client_tx_stream_data_cb(h1_codec_request_state_t *lib_hrs, qd_message_stream_data_t *stream_data);
 static int _client_rx_request_cb(h1_codec_request_state_t *lib_rs,
                                  const char *method,
                                  const char *target,
@@ -133,7 +133,7 @@ static qdr_http1_connection_t *_create_client_connection(qd_http_listener_t *li)
     h1_codec_config_t config = {0};
     config.type             = HTTP1_CONN_CLIENT;
     config.tx_buffers       = _client_tx_buffers_cb;
-    config.tx_body_data     = _client_tx_body_data_cb;
+    config.tx_stream_data   = _client_tx_stream_data_cb;
     config.rx_request       = _client_rx_request_cb;
     config.rx_response      = _client_rx_response_cb;
     config.rx_header        = _client_rx_header_cb;
@@ -554,9 +554,9 @@ static void _client_tx_buffers_cb(h1_codec_request_state_t *hrs, qd_buffer_list_
 }
 
 
-// Encoder callback: send body_data buffers (response msg) to client endpoint
+// Encoder callback: send stream_data buffers (response msg) to client endpoint
 //
-static void _client_tx_body_data_cb(h1_codec_request_state_t *hrs, qd_message_body_data_t *body_data)
+static void _client_tx_stream_data_cb(h1_codec_request_state_t *hrs, qd_message_stream_data_t *stream_data)
 {
     _client_request_t       *hreq = (_client_request_t*) h1_codec_request_state_get_context(hrs);
     qdr_http1_connection_t *hconn = hreq->base.hconn;
@@ -565,7 +565,7 @@ static void _client_tx_body_data_cb(h1_codec_request_state_t *hrs, qd_message_bo
         // client connection has been lost
         qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
                "[C%"PRIu64"] Discarding outgoing data - client connection closed", hconn->conn_id);
-        qd_message_body_data_release(body_data);
+        qd_message_stream_data_release(stream_data);
         return;
     }
 
@@ -578,7 +578,7 @@ static void _client_tx_body_data_cb(h1_codec_request_state_t *hrs, qd_message_bo
 
     _client_response_msg_t *rmsg = DEQ_TAIL(hreq->responses);
     assert(rmsg);
-    qdr_http1_enqueue_body_data(&rmsg->out_data, body_data);
+    qdr_http1_enqueue_stream_data(&rmsg->out_data, stream_data);
 
     // if this happens to be the current outgoing response try writing to the
     // raw connection
@@ -781,7 +781,7 @@ static int _client_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *b
            "[C%"PRIu64"][L%"PRIu64"] HTTP request body received len=%zu.",
            hconn->conn_id, hconn->in_link_id, len);
 
-    qd_message_body_data_append(msg, body);
+    qd_message_stream_data_append(msg, body);
 
     //
     // Notify the router that more data is ready to be pushed out on the delivery
@@ -1119,18 +1119,18 @@ static uint64_t _encode_response_message(_client_request_t *hreq,
         }
     }
 
-    qd_message_body_data_t *body_data = 0;
+    qd_message_stream_data_t *stream_data = 0;
 
     while (true) {
-        switch (qd_message_next_body_data(msg, &body_data)) {
+        switch (qd_message_next_stream_data(msg, &stream_data)) {
 
-        case QD_MESSAGE_BODY_DATA_OK:
+        case QD_MESSAGE_STREAM_DATA_BODY_OK:
 
             qd_log(hconn->adaptor->log, QD_LOG_TRACE,
                    "[C%"PRIu64"][L%"PRIu64"] Encoding response body data",
                    hconn->conn_id, hconn->out_link_id);
 
-            if (h1_codec_tx_body(hreq->base.lib_rs, body_data)) {
+            if (h1_codec_tx_body(hreq->base.lib_rs, stream_data)) {
                 qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
                        "[C%"PRIu64"][L%"PRIu64"] body data encode failed",
                        hconn->conn_id, hconn->out_link_id);
@@ -1138,18 +1138,20 @@ static uint64_t _encode_response_message(_client_request_t *hreq,
             }
             break;
 
-        case QD_MESSAGE_BODY_DATA_NO_MORE:
+        case QD_MESSAGE_STREAM_DATA_FOOTER_OK:
+            break;
+
+        case QD_MESSAGE_STREAM_DATA_NO_MORE:
             // indicate this message is complete
             qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
                    "[C%"PRIu64"][L%"PRIu64"] response message encoding completed",
                    hconn->conn_id, hconn->out_link_id);
             return PN_ACCEPTED;
 
-        case QD_MESSAGE_BODY_DATA_INCOMPLETE:
+        case QD_MESSAGE_STREAM_DATA_INCOMPLETE:
             return 0;  // wait for more
 
-        case QD_MESSAGE_BODY_DATA_INVALID:
-        case QD_MESSAGE_BODY_DATA_NOT_DATA:
+        case QD_MESSAGE_STREAM_DATA_INVALID:
             qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
                    "[C%"PRIu64"][L%"PRIu64"] Rejecting corrupted body data.",
                    hconn->conn_id, hconn->out_link_id);
diff --git a/src/adaptors/http1/http1_codec.c b/src/adaptors/http1/http1_codec.c
index 93bc73a..5cb16b3 100644
--- a/src/adaptors/http1/http1_codec.c
+++ b/src/adaptors/http1/http1_codec.c
@@ -966,7 +966,7 @@ static bool parse_header(h1_codec_connection_t *conn, struct decoder_t *decoder)
 
 // Pass message body data up to the application.
 //
-static inline int consume_body_data(h1_codec_connection_t *conn, bool flush)
+static inline int consume_stream_data(h1_codec_connection_t *conn, bool flush)
 {
     struct decoder_t       *decoder = &conn->decoder;
     qd_iterator_pointer_t *body_ptr = &decoder->body_ptr;
@@ -1092,7 +1092,7 @@ static bool parse_body_chunked_data(h1_codec_connection_t *conn, struct decoder_
     decoder->chunk_length -= skipped;
     body_ptr->remaining += skipped;
 
-    consume_body_data(conn, false);
+    consume_stream_data(conn, false);
 
     if (decoder->chunk_length == 0) {
         // end of chunk
@@ -1117,7 +1117,7 @@ static bool parse_body_chunked_trailer(h1_codec_connection_t *conn, struct decod
         body_ptr->remaining += line.remaining;
         if (is_empty_line(&line)) {
             // end of message
-            consume_body_data(conn, true);
+            consume_stream_data(conn, true);
             decoder->state = HTTP1_MSG_STATE_DONE;
         }
 
@@ -1164,7 +1164,7 @@ static bool parse_body_content(h1_codec_connection_t *conn, struct decoder_t *de
     body_ptr->remaining += skipped;
     bool eom = decoder->content_length == 0;
 
-    consume_body_data(conn, eom);
+    consume_stream_data(conn, eom);
     if (eom)
         decoder->state = HTTP1_MSG_STATE_DONE;
 
@@ -1188,7 +1188,7 @@ static bool parse_body(h1_codec_connection_t *conn, struct decoder_t *decoder)
         decoder->read_ptr.remaining = 0;
         decoder->read_ptr.buffer = DEQ_TAIL(decoder->incoming);
         decoder->read_ptr.cursor = qd_buffer_cursor(decoder->read_ptr.buffer);
-        consume_body_data(conn, true);
+        consume_stream_data(conn, true);
         decoder->body_ptr = decoder->read_ptr = NULL_I_PTR;
         DEQ_INIT(decoder->incoming);
     }
@@ -1489,7 +1489,7 @@ static inline void _flush_headers(h1_codec_request_state_t *hrs, struct encoder_
 
 
 // just forward the body chain along
-int h1_codec_tx_body(h1_codec_request_state_t *hrs, qd_message_body_data_t *body_data)
+int h1_codec_tx_body(h1_codec_request_state_t *hrs, qd_message_stream_data_t *stream_data)
 {
     h1_codec_connection_t *conn = h1_codec_request_state_get_connection(hrs);
     struct encoder_t *encoder = &conn->encoder;
@@ -1498,8 +1498,8 @@ int h1_codec_tx_body(h1_codec_request_state_t *hrs, qd_message_body_data_t *body
         _flush_headers(hrs, encoder);
 
     // skip the outgoing queue and send directly
-    hrs->out_octets += qd_message_body_data_payload_length(body_data);
-    conn->config.tx_body_data(hrs, body_data);
+    hrs->out_octets += qd_message_stream_data_payload_length(stream_data);
+    conn->config.tx_stream_data(hrs, stream_data);
 
     return 0;
 }
diff --git a/src/adaptors/http1/http1_private.h b/src/adaptors/http1/http1_private.h
index d0d2306..593803d 100644
--- a/src/adaptors/http1/http1_private.h
+++ b/src/adaptors/http1/http1_private.h
@@ -60,10 +60,10 @@ extern qdr_http1_adaptor_t *qdr_http1_adaptor;
 // Data to be written out the raw connection.
 //
 // This adaptor has to cope with two different data sources: the HTTP1 encoder
-// and the qd_message_body_data_t list.  The HTTP1 encoder produces a simple
+// and the qd_message_stream_data_t list.  The HTTP1 encoder produces a simple
 // qd_buffer_list_t for outgoing header data whose ownership is given to the
 // adaptor: the adaptor is free to deque/free these buffers as needed.  The
-// qd_message_body_data_t buffers are shared with the owning message and the
+// qd_message_stream_data_t buffers are shared with the owning message and the
 // buffer list must not be modified by the adaptor.  The qdr_http1_out_data_t
 // is used to manage both types of data sources.
 //
@@ -76,7 +76,7 @@ struct qdr_http1_out_data_t {
     // or a message body data (not both!)
 
     qd_buffer_list_t raw_buffers;
-    qd_message_body_data_t *body_data;
+    qd_message_stream_data_t *stream_data;
 
     int buffer_count;  // # total buffers
     int next_buffer;   // offset to next buffer to send
@@ -207,7 +207,7 @@ ALLOC_DECLARE(qdr_http1_connection_t);
 
 void qdr_http1_free_written_buffers(qdr_http1_connection_t *hconn);
 void qdr_http1_enqueue_buffer_list(qdr_http1_out_data_fifo_t *fifo, qd_buffer_list_t *blist);
-void qdr_http1_enqueue_body_data(qdr_http1_out_data_fifo_t *fifo, qd_message_body_data_t *body_data);
+void qdr_http1_enqueue_stream_data(qdr_http1_out_data_fifo_t *fifo, qd_message_stream_data_t *stream_data);
 uint64_t qdr_http1_write_out_data(qdr_http1_connection_t *hconn, qdr_http1_out_data_fifo_t *fifo);
 void qdr_http1_out_data_fifo_cleanup(qdr_http1_out_data_fifo_t *out_data);
 // return the number of buffers currently held by the proactor for writing
diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c
index 21e3e38..98af72a 100644
--- a/src/adaptors/http1/http1_server.c
+++ b/src/adaptors/http1/http1_server.c
@@ -95,7 +95,7 @@ ALLOC_DEFINE(_server_request_t);
 #define MAX_RECONNECT    5  // 5 * 500 = 2.5 sec
 
 static void _server_tx_buffers_cb(h1_codec_request_state_t *lib_hrs, qd_buffer_list_t *blist, unsigned int len);
-static void _server_tx_body_data_cb(h1_codec_request_state_t *lib_hrs, qd_message_body_data_t *body_data);
+static void _server_tx_stream_data_cb(h1_codec_request_state_t *lib_hrs, qd_message_stream_data_t *stream_data);
 static int  _server_rx_request_cb(h1_codec_request_state_t *hrs,
                                   const char *method,
                                   const char *target,
@@ -321,7 +321,7 @@ static void _setup_server_links(qdr_http1_connection_t *hconn)
         h1_codec_config_t config = {0};
         config.type             = HTTP1_CONN_SERVER;
         config.tx_buffers       = _server_tx_buffers_cb;
-        config.tx_body_data     = _server_tx_body_data_cb;
+        config.tx_stream_data   = _server_tx_stream_data_cb;
         config.rx_request       = _server_rx_request_cb;
         config.rx_response      = _server_rx_response_cb;
         config.rx_header        = _server_rx_header_cb;
@@ -684,7 +684,7 @@ static void _server_tx_buffers_cb(h1_codec_request_state_t *hrs, qd_buffer_list_
 
 // Encoder has body data to send to the server
 //
-static void _server_tx_body_data_cb(h1_codec_request_state_t *hrs, qd_message_body_data_t *body_data)
+static void _server_tx_stream_data_cb(h1_codec_request_state_t *hrs, qd_message_stream_data_t *stream_data)
 {
     _server_request_t       *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs);
     qdr_http1_connection_t *hconn = hreq->base.hconn;
@@ -692,7 +692,7 @@ static void _server_tx_body_data_cb(h1_codec_request_state_t *hrs, qd_message_bo
     qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
            "[C%"PRIu64"][L%"PRIu64"] Sending body data to server",
            hconn->conn_id, hconn->out_link_id);
-    qdr_http1_enqueue_body_data(&hreq->out_data, body_data);
+    qdr_http1_enqueue_stream_data(&hreq->out_data, stream_data);
     if (hreq == (_server_request_t*) DEQ_HEAD(hconn->requests) && hconn->raw_conn) {
         _write_pending_request(hreq);
     }
@@ -883,7 +883,7 @@ static int _server_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *b
            "[C%"PRIu64"][L%"PRIu64"] HTTP response body received len=%zu.",
            hconn->conn_id, hconn->in_link_id, len);
 
-    qd_message_body_data_append(msg, body);
+    qd_message_stream_data_append(msg, body);
 
     //
     // Notify the router that more data is ready to be pushed out on the delivery
@@ -1246,17 +1246,17 @@ static uint64_t _encode_request_message(_server_request_t *hreq)
         hreq->headers_encoded = true;
     }
 
-    qd_message_body_data_t *body_data = 0;
+    qd_message_stream_data_t *stream_data = 0;
 
     while (true) {
-        switch (qd_message_next_body_data(msg, &body_data)) {
-        case QD_MESSAGE_BODY_DATA_OK: {
+        switch (qd_message_next_stream_data(msg, &stream_data)) {
+        case QD_MESSAGE_STREAM_DATA_BODY_OK: {
 
             qd_log(hconn->adaptor->log, QD_LOG_TRACE,
                    "[C%"PRIu64"][L%"PRIu64"] Encoding request body data",
                    hconn->conn_id, hconn->out_link_id);
 
-            if (h1_codec_tx_body(hreq->base.lib_rs, body_data)) {
+            if (h1_codec_tx_body(hreq->base.lib_rs, stream_data)) {
                 qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
                        "[C%"PRIu64"][L%"PRIu64"] body data encode failed",
                        hconn->conn_id, hconn->out_link_id);
@@ -1265,21 +1265,23 @@ static uint64_t _encode_request_message(_server_request_t *hreq)
             break;
         }
 
-        case QD_MESSAGE_BODY_DATA_NO_MORE:
+        case QD_MESSAGE_STREAM_DATA_FOOTER_OK:
+            break;
+
+        case QD_MESSAGE_STREAM_DATA_NO_MORE:
             // indicate this message is complete
             qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
                    "[C%"PRIu64"][L%"PRIu64"] request message encoding completed",
                    hconn->conn_id, hconn->out_link_id);
             return PN_ACCEPTED;
 
-        case QD_MESSAGE_BODY_DATA_INCOMPLETE:
+        case QD_MESSAGE_STREAM_DATA_INCOMPLETE:
             qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
                    "[C%"PRIu64"][L%"PRIu64"] body data need more",
                    hconn->conn_id, hconn->out_link_id);
             return 0;  // wait for more
 
-        case QD_MESSAGE_BODY_DATA_INVALID:
-        case QD_MESSAGE_BODY_DATA_NOT_DATA:
+        case QD_MESSAGE_STREAM_DATA_INVALID:
             qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
                    "[C%"PRIu64"][L%"PRIu64"] Rejecting corrupted body data.",
                    hconn->conn_id, hconn->out_link_id);
diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c
index 8618b81..996e897 100644
--- a/src/adaptors/http2/http2_adaptor.c
+++ b/src/adaptors/http2/http2_adaptor.c
@@ -405,7 +405,7 @@ static int on_data_chunk_recv_callback(nghttp2_session *session,
     qd_buffer_list_append(&buffers, (uint8_t *)data, len);
 
     if (stream_data->in_dlv) {
-        qd_message_body_data_append(stream_data->message, &buffers);
+        qd_message_stream_data_append(stream_data->message, &buffers);
     }
     else {
         if (!stream_data->body) {
@@ -463,7 +463,7 @@ static int snd_data_callback(nghttp2_session *session,
     int bytes_sent = 0; // This should not include the header length of 9.
     if (length) {
         pn_raw_buffer_t pn_raw_buffs[stream_data->qd_buffers_to_send];
-        qd_message_body_data_buffers(stream_data->curr_body_data, pn_raw_buffs, 0, stream_data->qd_buffers_to_send);
+        qd_message_stream_data_buffers(stream_data->curr_stream_data, pn_raw_buffs, 0, stream_data->qd_buffers_to_send);
 
         int idx = 0;
         while (idx < stream_data->qd_buffers_to_send) {
@@ -481,16 +481,16 @@ static int snd_data_callback(nghttp2_session *session,
 //                    bytes_sent += bytes_remaining;
 //                    qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] snd_data_callback memcpy bytes_remaining=%i", conn->conn_id, stream_data->stream_id, bytes_remaining);
 //                }
-                stream_data->curr_body_data_qd_buff_offset += 1;
+                stream_data->curr_stream_data_qd_buff_offset += 1;
             }
             idx += 1;
         }
     }
     if (stream_data->full_payload_handled) {
-        qd_message_body_data_release(stream_data->curr_body_data);
-        stream_data->curr_body_data = 0;
-        stream_data->curr_body_data_qd_buff_offset = 0;
-        qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] snd_data_callback, full_payload_handled, qd_message_body_data_release", conn->conn_id, stream_data->stream_id);
+        qd_message_stream_data_release(stream_data->curr_stream_data);
+        stream_data->curr_stream_data = 0;
+        stream_data->curr_stream_data_qd_buff_offset = 0;
+        qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] snd_data_callback, full_payload_handled, qd_message_stream_data_release", conn->conn_id, stream_data->stream_id);
     }
 
     qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] HTTP2 snd_data_callback finished, length=%zu, bytes_sent=%i, stream_data=%p", conn->conn_id, stream_data->stream_id, length, bytes_sent, (void *)stream_data);
@@ -935,21 +935,21 @@ ssize_t read_data_callback(nghttp2_session *session,
         //
         qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_DEPTH_OK", conn->conn_id, stream_data->stream_id);
 
-        if (stream_data->next_body_data) {
-            stream_data->curr_body_data = stream_data->next_body_data;
-            stream_data->curr_body_data_result = stream_data->next_body_data_result;
-            stream_data->next_body_data = 0;
-            qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback Use next_body_data", conn->conn_id, stream_data->stream_id);
+        if (stream_data->next_stream_data) {
+            stream_data->curr_stream_data = stream_data->next_stream_data;
+            stream_data->curr_stream_data_result = stream_data->next_stream_data_result;
+            stream_data->next_stream_data = 0;
+            qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback Use next_stream_data", conn->conn_id, stream_data->stream_id);
         }
 
-        if (!stream_data->curr_body_data) {
-            stream_data->curr_body_data_result = qd_message_next_body_data(message, &stream_data->curr_body_data);
-            qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback No body data, get qd_message_next_body_data", conn->conn_id, stream_data->stream_id);
+        if (!stream_data->curr_stream_data) {
+            stream_data->curr_stream_data_result = qd_message_next_stream_data(message, &stream_data->curr_stream_data);
+            qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback No body data, get qd_message_next_stream_data", conn->conn_id, stream_data->stream_id);
 
         }
 
-        switch (stream_data->curr_body_data_result) {
-        case QD_MESSAGE_BODY_DATA_OK: {
+        switch (stream_data->curr_stream_data_result) {
+        case QD_MESSAGE_STREAM_DATA_BODY_OK: {
             //
             // We have a new valid body-data segment.  Handle it
             //
@@ -966,29 +966,29 @@ ssize_t read_data_callback(nghttp2_session *session,
             }
 
             // total length of the payload (across all qd_buffers in the current body data)
-            size_t payload_length = qd_message_body_data_payload_length(stream_data->curr_body_data);
+            size_t payload_length = qd_message_stream_data_payload_length(stream_data->curr_stream_data);
 
             if (payload_length == 0) {
                 //
-                // Current body data has payload length zero. Release the curr_body_data
+                // Current body data has payload length zero. Release the curr_stream_data
                 //
                 qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, payload_length=0", conn->conn_id, stream_data->stream_id);
-                qd_message_body_data_release(stream_data->curr_body_data);
-                stream_data->curr_body_data = 0;
+                qd_message_stream_data_release(stream_data->curr_stream_data);
+                stream_data->curr_stream_data = 0;
 
-                // The payload length is zero on this body data. Look ahead one body data to see if it is QD_MESSAGE_BODY_DATA_NO_MORE
-                stream_data->next_body_data_result = qd_message_next_body_data(message, &stream_data->next_body_data);
-                if (stream_data->next_body_data_result == QD_MESSAGE_BODY_DATA_NO_MORE) {
+                // The payload length is zero on this body data. Look ahead one body data to see if it is QD_MESSAGE_STREAM_DATA_NO_MORE
+                stream_data->next_stream_data_result = qd_message_next_stream_data(message, &stream_data->next_stream_data);
+                if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_NO_MORE) {
                     *data_flags |= NGHTTP2_DATA_FLAG_EOF;
                     stream_data->out_msg_body_sent = true;
                     stream_data->full_payload_handled = true;
-                    qd_message_body_data_release(stream_data->next_body_data);
-                    stream_data->next_body_data = 0;
+                    qd_message_stream_data_release(stream_data->next_stream_data);
+                    stream_data->next_stream_data = 0;
                     stream_data->out_dlv_local_disposition = PN_ACCEPTED;
                     if ((*data_flags & NGHTTP2_DATA_FLAG_EOF) && conn->ingress) {
                         _http_record_request(conn, stream_data);
                     }
-                    qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, payload_length=0 and next_body_data=QD_MESSAGE_BODY_DATA_NO_MORE", conn->conn_id, stream_data->stream_id);
+                    qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, payload_length=0 and next_stream_data=QD_MESSAGE_STREAM_DATA_NO_MORE", conn->conn_id, stream_data->stream_id);
                     return 0;
                 }
 
@@ -1002,35 +1002,35 @@ ssize_t read_data_callback(nghttp2_session *session,
                 return NGHTTP2_ERR_DEFERRED;
             }
 
-            stream_data->body_data_buff_count = qd_message_body_data_buffer_count(stream_data->curr_body_data);
-            qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, stream_data->body_data_buff_count=%i, payload_length=%zu\n", conn->conn_id, stream_data->stream_id, stream_data->body_data_buff_count, payload_length);
+            stream_data->stream_data_buff_count = qd_message_stream_data_buffer_count(stream_data->curr_stream_data);
+            qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, stream_data->stream_data_buff_count=%i, payload_length=%zu\n", conn->conn_id, stream_data->stream_id, stream_data->stream_data_buff_count, payload_length);
 
             size_t bytes_to_send = 0;
             if (payload_length) {
-                size_t remaining_payload_length = payload_length - (stream_data->curr_body_data_qd_buff_offset * BUFFER_SIZE);
+                size_t remaining_payload_length = payload_length - (stream_data->curr_stream_data_qd_buff_offset * BUFFER_SIZE);
 
                 if (remaining_payload_length <= QD_HTTP2_BUFFER_SIZE) {
                     bytes_to_send = remaining_payload_length;
-                    stream_data->qd_buffers_to_send = stream_data->body_data_buff_count;
+                    stream_data->qd_buffers_to_send = stream_data->stream_data_buff_count;
                     stream_data->full_payload_handled = true;
                     qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback remaining_payload_length (%zu) <= QD_HTTP2_BUFFER_SIZE(16384), bytes_to_send=%zu, stream_data->qd_buffers_to_send=%zu", conn->conn_id, stream_data->stream_id, remaining_payload_length, bytes_to_send, stream_data->qd_buffers_to_send);
 
                     // Look ahead one body data
-                    stream_data->next_body_data_result = qd_message_next_body_data(message, &stream_data->next_body_data);
-                    if (stream_data->next_body_data_result == QD_MESSAGE_BODY_DATA_NO_MORE) {
+                    stream_data->next_stream_data_result = qd_message_next_stream_data(message, &stream_data->next_stream_data);
+                    if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_NO_MORE) {
                         *data_flags |= NGHTTP2_DATA_FLAG_EOF;
                         stream_data->out_msg_body_sent = true;
-                        qd_message_body_data_release(stream_data->next_body_data);
-                        stream_data->next_body_data = 0;
+                        qd_message_stream_data_release(stream_data->next_stream_data);
+                        stream_data->next_stream_data = 0;
                         stream_data->out_dlv_local_disposition = PN_ACCEPTED;
-                        qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one body data QD_MESSAGE_BODY_DATA_NO_MORE", conn->conn_id, stream_data->stream_id);
+                        qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one body data QD_MESSAGE_STREAM_DATA_NO_MORE", conn->conn_id, stream_data->stream_id);
                     }
-                    else if (stream_data->next_body_data_result == QD_MESSAGE_BODY_DATA_INCOMPLETE) {
-                        qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one body data QD_MESSAGE_BODY_DATA_INCOMPLETE", conn->conn_id, stream_data->stream_id);
+                    else if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_INCOMPLETE) {
+                        qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one body data QD_MESSAGE_STREAM_DATA_INCOMPLETE", conn->conn_id, stream_data->stream_id);
 
                     }
-                    else if (stream_data->next_body_data_result == QD_MESSAGE_BODY_DATA_OK) {
-                        qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one body data QD_MESSAGE_BODY_DATA_OK", conn->conn_id, stream_data->stream_id);
+                    else if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_BODY_OK) {
+                        qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one body data QD_MESSAGE_STREAM_DATA_OK", conn->conn_id, stream_data->stream_id);
 
                     }
                 }
@@ -1050,15 +1050,18 @@ ssize_t read_data_callback(nghttp2_session *session,
             return bytes_to_send;
         }
 
-        case QD_MESSAGE_BODY_DATA_INCOMPLETE:
+        case QD_MESSAGE_STREAM_DATA_FOOTER_OK:
+            break;
+
+        case QD_MESSAGE_STREAM_DATA_INCOMPLETE:
             //
             // A new segment has not completely arrived yet.  Check again later.
             //
-            qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_BODY_DATA_INCOMPLETE", conn->conn_id, stream_data->stream_id);
+            qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_STREAM_DATA_INCOMPLETE", conn->conn_id, stream_data->stream_id);
             stream_data->out_dlv_local_disposition = 0;
             return NGHTTP2_ERR_DEFERRED;
 
-        case QD_MESSAGE_BODY_DATA_NO_MORE: {
+        case QD_MESSAGE_STREAM_DATA_NO_MORE: {
             //
             // We have already handled the last body-data segment for this delivery.
             // Complete the "sending" of this delivery and replenish credit.
@@ -1066,7 +1069,7 @@ ssize_t read_data_callback(nghttp2_session *session,
             size_t pn_buffs_write_capacity = pn_raw_connection_write_buffers_capacity(conn->pn_raw_conn);
             if (pn_buffs_write_capacity == 0) {
                 stream_data->out_dlv_local_disposition = 0;
-                qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_BODY_DATA_NO_MORE - pn_buffs_write_capacity=0 send is not complete", conn->conn_id, stream_data->stream_id);
+                qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_STREAM_DATA_NO_MORE - pn_buffs_write_capacity=0 send is not complete", conn->conn_id, stream_data->stream_id);
                 return NGHTTP2_ERR_DEFERRED;
             }
             else {
@@ -1075,32 +1078,21 @@ ssize_t read_data_callback(nghttp2_session *session,
                 stream_data->full_payload_handled = true;
                 stream_data->out_msg_body_sent = true;
                 stream_data->out_dlv_local_disposition = PN_ACCEPTED;
-                qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_BODY_DATA_NO_MORE - stream_data->out_dlv_local_disposition = PN_ACCEPTED - send_complete=true, setting NGHTTP2_DATA_FLAG_EOF", conn->conn_id, stream_data->stream_id);
+                qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_STREAM_DATA_NO_MORE - stream_data->out_dlv_local_disposition = PN_ACCEPTED - send_complete=true, setting NGHTTP2_DATA_FLAG_EOF", conn->conn_id, stream_data->stream_id);
             }
 
             break;
         }
 
-        case QD_MESSAGE_BODY_DATA_INVALID:
+        case QD_MESSAGE_STREAM_DATA_INVALID:
             //
             // The body-data is corrupt in some way.  Stop handling the delivery and reject it.
             //
             *data_flags |= NGHTTP2_DATA_FLAG_EOF;
-            qd_message_body_data_release(stream_data->curr_body_data);
-            stream_data->curr_body_data = 0;
-            stream_data->out_dlv_local_disposition = PN_REJECTED;
-            qd_log(http2_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_BODY_DATA_INVALID", conn->conn_id, stream_data->stream_id);
-            break;
-
-        case QD_MESSAGE_BODY_DATA_NOT_DATA:
-            //
-            // Valid data was seen, but it is not a body-data performative.  Reject the delivery.
-            //
-            *data_flags |= NGHTTP2_DATA_FLAG_EOF;
-            qd_message_body_data_release(stream_data->curr_body_data);
-            stream_data->curr_body_data = 0;
+            qd_message_stream_data_release(stream_data->curr_stream_data);
+            stream_data->curr_stream_data = 0;
             stream_data->out_dlv_local_disposition = PN_REJECTED;
-            qd_log(http2_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_BODY_DATA_NOT_DATA", conn->conn_id, stream_data->stream_id);
+            qd_log(http2_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_STREAM_DATA_INVALID", conn->conn_id, stream_data->stream_id);
             break;
         }
         break;
@@ -1418,16 +1410,16 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data)
             create_settings_frame(conn);
 
             uint8_t flags = 0;
-            stream_data->curr_body_data_result = qd_message_next_body_data(message, &stream_data->curr_body_data);
-            if (stream_data->curr_body_data_result == QD_MESSAGE_BODY_DATA_OK) {
-                size_t payload_length = qd_message_body_data_payload_length(stream_data->curr_body_data);
+            stream_data->curr_stream_data_result = qd_message_next_stream_data(message, &stream_data->curr_stream_data);
+            if (stream_data->curr_stream_data_result == QD_MESSAGE_STREAM_DATA_BODY_OK) {
+                size_t payload_length = qd_message_stream_data_payload_length(stream_data->curr_stream_data);
                 if (payload_length == 0) {
-                    stream_data->next_body_data_result = qd_message_next_body_data(message, &stream_data->next_body_data);
-                    if (stream_data->next_body_data_result == QD_MESSAGE_BODY_DATA_NO_MORE) {
-                        qd_message_body_data_release(stream_data->next_body_data);
-                        stream_data->next_body_data = 0;
-                        qd_message_body_data_release(stream_data->curr_body_data);
-                        stream_data->curr_body_data = 0;
+                    stream_data->next_stream_data_result = qd_message_next_stream_data(message, &stream_data->next_stream_data);
+                    if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_NO_MORE) {
+                        qd_message_stream_data_release(stream_data->next_stream_data);
+                        stream_data->next_stream_data = 0;
+                        qd_message_stream_data_release(stream_data->curr_stream_data);
+                        stream_data->curr_stream_data = 0;
                         flags = NGHTTP2_FLAG_END_STREAM;
                         stream_data->out_msg_has_body = false;
                         qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"] Message has no body, sending NGHTTP2_FLAG_END_STREAM with nghttp2_submit_headers", conn->conn_id);
diff --git a/src/adaptors/http2/http2_adaptor.h b/src/adaptors/http2/http2_adaptor.h
index 979f34d..047c440 100644
--- a/src/adaptors/http2/http2_adaptor.h
+++ b/src/adaptors/http2/http2_adaptor.h
@@ -80,14 +80,14 @@ struct qdr_http2_stream_data_t {
     qd_composed_field_t      *app_properties;
     qd_composed_field_t      *footer_properties;
     qd_composed_field_t      *body;
-    qd_message_body_data_t   *curr_body_data;
-    qd_message_body_data_t   *next_body_data;
+    qd_message_stream_data_t *curr_stream_data;
+    qd_message_stream_data_t *next_stream_data;
     DEQ_LINKS(qdr_http2_stream_data_t);
 
-    qd_message_body_data_result_t  curr_body_data_result;
-    qd_message_body_data_result_t  next_body_data_result;
-    int                            curr_body_data_qd_buff_offset;
-    int                            body_data_buff_count;
+    qd_message_stream_data_result_t  curr_stream_data_result;
+    qd_message_stream_data_result_t  next_stream_data_result;
+    int                            curr_stream_data_qd_buff_offset;
+    int                            stream_data_buff_count;
     int                            in_link_credit;   // provided by router
     int32_t                        stream_id;
     size_t                         qd_buffers_to_send;
diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c
index 7b9af30..84b14ba 100644
--- a/src/adaptors/reference_adaptor.c
+++ b/src/adaptors/reference_adaptor.c
@@ -248,39 +248,67 @@ static uint64_t qdr_ref_deliver(void *context, qdr_link_t *link, qdr_delivery_t
         // over to the per-message extraction of body-data segments.
         //
         printf("qdr_ref_deliver: depth ok\n");
-        qd_message_body_data_t        *body_data;
-        qd_message_body_data_result_t  body_data_result;
+        qd_message_stream_data_t        *stream_data;
+        qd_message_stream_data_result_t  stream_data_result;
 
         //
         // Process as many body-data segments as are available.
         //
         while (true) {
-            body_data_result = qd_message_next_body_data(msg, &body_data);
+            stream_data_result = qd_message_next_stream_data(msg, &stream_data);
 
-            switch (body_data_result) {
-            case QD_MESSAGE_BODY_DATA_OK: {
+            switch (stream_data_result) {
+            case QD_MESSAGE_STREAM_DATA_BODY_OK: {
                 //
                 // We have a new valid body-data segment.  Handle it
                 //
-                printf("qdr_ref_deliver: body_data_buffer_count: %d\n", qd_message_body_data_buffer_count(body_data));
+                printf("qdr_ref_deliver: stream_data_buffer_count: %d\n", qd_message_stream_data_buffer_count(stream_data));
 
-                qd_iterator_t *body_iter = qd_message_body_data_iterator(body_data);
+                qd_iterator_t *body_iter = qd_message_stream_data_iterator(stream_data);
                 char *body = (char*) qd_iterator_copy(body_iter);
                 printf("qdr_ref_deliver: message body-data received: %s\n", body);
                 free(body);
                 qd_iterator_free(body_iter);
-                qd_message_body_data_release(body_data);
+                qd_message_stream_data_release(stream_data);
+                break;
+            }
+
+            case QD_MESSAGE_STREAM_DATA_FOOTER_OK: {
+                printf("qdr_ref_deliver: Received message footer\n");
+                qd_iterator_t     *footer_iter = qd_message_stream_data_iterator(stream_data);
+                qd_parsed_field_t *footer      = qd_parse(footer_iter);
+
+                if (qd_parse_ok(footer)) {
+                    uint8_t tag = qd_parse_tag(footer);
+                    if (tag == QD_AMQP_MAP8 || tag == QD_AMQP_MAP32) {
+                        uint32_t item_count = qd_parse_sub_count(footer);
+                        for (uint32_t i = 0; i < item_count; i++) {
+                            qd_iterator_t *key_iter   = qd_parse_raw(qd_parse_sub_key(footer, i));
+                            qd_iterator_t *value_iter = qd_parse_raw(qd_parse_sub_value(footer, i));
+                            char *key   = (char*) qd_iterator_copy(key_iter);
+                            char *value = (char*) qd_iterator_copy(value_iter);
+                            printf("qdr_ref_deliver: %s: %s\n", key, value);
+                            free(key);
+                            free(value);
+                        }
+                    } else
+                        printf("qdr_ref_deliver: Unexpected tag in footer: %02x\n", tag);
+                } else
+                    printf("qdr_ref_deliver: Footer parse error: %s\n", qd_parse_error(footer));
+
+                qd_parse_free(footer);
+                qd_iterator_free(footer_iter);
                 break;
             }
             
-            case QD_MESSAGE_BODY_DATA_INCOMPLETE:
+            case QD_MESSAGE_STREAM_DATA_INCOMPLETE:
                 //
                 // A new segment has not completely arrived yet.  Check again later.
                 //
                 printf("qdr_ref_deliver: body-data incomplete\n");
                 return 0;
 
-            case QD_MESSAGE_BODY_DATA_NO_MORE:
+            case QD_MESSAGE_STREAM_DATA_NO_MORE:
                 //
                 // We have already handled the last body-data segment for this delivery.
                 // Complete the "sending" of this delivery and replenish credit.
@@ -293,21 +321,13 @@ static uint64_t qdr_ref_deliver(void *context, qdr_link_t *link, qdr_delivery_t
                 qdr_link_flow(adaptor->core, link, 1, false);
                 return PN_ACCEPTED; // This will cause the delivery to be settled
             
-            case QD_MESSAGE_BODY_DATA_INVALID:
+            case QD_MESSAGE_STREAM_DATA_INVALID:
                 //
                 // The body-data is corrupt in some way.  Stop handling the delivery and reject it.
                 //
                 printf("qdr_ref_deliver: body-data invalid\n");
                 qdr_link_flow(adaptor->core, link, 1, false);
                 return PN_REJECTED;
-
-            case QD_MESSAGE_BODY_DATA_NOT_DATA:
-                //
-                // Valid data was seen, but it is not a body-data performative.  Reject the delivery.
-                //
-                printf("qdr_ref_deliver: body not data\n");
-                qdr_link_flow(adaptor->core, link, 1, false);
-                return PN_REJECTED;
             }
         }
 
@@ -456,16 +476,24 @@ static void on_stream(void *context)
         //
         // Accumulated buffer list
         //
-        qd_buffer_list_t buffer_list;
-        DEQ_INIT(buffer_list);
-        qd_buffer_list_append(&buffer_list, (const uint8_t*) content, content_length);
-        qd_buffer_list_append(&buffer_list, (const uint8_t*) content, content_length);
-
-        //
-        // append this data to the streaming message as one or more DATA
-        // performatives
-        //
-        depth = qd_message_body_data_append(adaptor->streaming_message, &buffer_list);
+        for (int sections = 0; sections < 3; sections++) {
+            qd_buffer_list_t buffer_list;
+            DEQ_INIT(buffer_list);
+            qd_buffer_list_append(&buffer_list, (const uint8_t*) content, content_length);
+            qd_buffer_list_append(&buffer_list, (const uint8_t*) content, content_length);
+
+            //
+            // Compose a DATA performative for this section of the stream
+            //
+            qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
+            qd_compose_insert_binary_buffers(field, &buffer_list);
+
+            //
+            // Extend the streaming message and free the composed field
+            //
+            depth = qd_message_extend(adaptor->streaming_message, field);
+            qd_compose_free(field);
+        }
 
         //
         // Notify the router that more data is ready to be pushed out on the delivery
@@ -478,6 +506,14 @@ static void on_stream(void *context)
         adaptor->stream_count++;
         printf("on_stream: sent streamed frame %d, depth=%d\n", adaptor->stream_count, depth);
     } else {
+        qd_composed_field_t *footer = qd_compose(QD_PERFORMATIVE_FOOTER, 0);
+        qd_compose_start_map(footer);
+        qd_compose_insert_symbol(footer, "trailer");
+        qd_compose_insert_string(footer, "value");
+        qd_compose_end_map(footer);
+        depth = qd_message_extend(adaptor->streaming_message, footer);
+        qd_compose_free(footer);
+
         qd_message_set_receive_complete(adaptor->streaming_message);
         adaptor->streaming_message = 0;
         adaptor->stream_count      = 0;
diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index 397f8bd..d711c7a 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -63,7 +63,7 @@ struct qdr_tcp_connection_t {
     uint64_t              last_in_time;
     uint64_t              last_out_time;
 
-    qd_message_body_data_t *outgoing_body_data;   // current segment
+    qd_message_stream_data_t *outgoing_stream_data;   // current segment
     size_t                  outgoing_body_bytes;  // bytes received from current segment
     int                     outgoing_body_offset; // buffer offset into current segment
 
@@ -147,7 +147,7 @@ static int handle_incoming(qdr_tcp_connection_t *conn)
     grant_read_buffers(conn);
 
     if (conn->instream) {
-        qd_message_body_data_append(qdr_delivery_message(conn->instream), &buffers);
+        qd_message_stream_data_append(qdr_delivery_message(conn->instream), &buffers);
         qdr_delivery_continue(tcp_adaptor->core, conn->instream, false);
         qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Continuing message with %i bytes", conn->conn_id, conn->incoming_id, count);
     } else {
@@ -229,25 +229,23 @@ static int read_message_body(qdr_tcp_connection_t *conn, qd_message_t *msg, pn_r
 {
     int used = 0;
 
-    // Advance to next body_data vbin segment if necessary.
+    // Advance to next stream_data vbin segment if necessary.
     // Return early if no data to process or error
-    if (conn->outgoing_body_data == 0) {
-        qd_message_body_data_result_t body_data_result = qd_message_next_body_data(msg, &conn->outgoing_body_data);
-        if (body_data_result == QD_MESSAGE_BODY_DATA_OK) {
-            // a new body_data segment has been found
+    if (conn->outgoing_stream_data == 0) {
+        qd_message_stream_data_result_t stream_data_result = qd_message_next_stream_data(msg, &conn->outgoing_stream_data);
+        if (stream_data_result == QD_MESSAGE_STREAM_DATA_BODY_OK) {
+            // a new stream_data segment has been found
             conn->outgoing_body_bytes  = 0;
             conn->outgoing_body_offset = 0;
             // continue to process this segment
-        } else if (body_data_result == QD_MESSAGE_BODY_DATA_INCOMPLETE) {
+        } else if (stream_data_result == QD_MESSAGE_STREAM_DATA_INCOMPLETE) {
             return 0;
         } else {
-            switch (body_data_result) {
-            case QD_MESSAGE_BODY_DATA_NO_MORE:
+            switch (stream_data_result) {
+            case QD_MESSAGE_STREAM_DATA_NO_MORE:
                 qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] EOS", conn->conn_id); break;
-            case QD_MESSAGE_BODY_DATA_INVALID:
+            case QD_MESSAGE_STREAM_DATA_INVALID:
                 qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "[C%"PRIu64"] Invalid body data for streaming message", conn->conn_id); break;
-            case QD_MESSAGE_BODY_DATA_NOT_DATA:
-                qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "[C%"PRIu64"] Invalid body; expected data section", conn->conn_id); break;
             default:
                 break;
             }
@@ -256,30 +254,30 @@ static int read_message_body(qdr_tcp_connection_t *conn, qd_message_t *msg, pn_r
         }
     }
 
-    // A valid body_data is in place.
+    // A valid stream_data is in place.
     // Try to get a buffer set from it.
-    used = qd_message_body_data_buffers(conn->outgoing_body_data, buffers, conn->outgoing_body_offset, count);
+    used = qd_message_stream_data_buffers(conn->outgoing_stream_data, buffers, conn->outgoing_body_offset, count);
     if (used > 0) {
         // Accumulate the lengths of the returned buffers.
         for (int i=0; i<used; i++) {
             conn->outgoing_body_bytes += buffers[i].size;
         }
 
-        // Buffers returned should never exceed the body_data payload length
-        assert(conn->outgoing_body_bytes <= conn->outgoing_body_data->payload.length);
+        // Buffers returned should never exceed the stream_data payload length
+        assert(conn->outgoing_body_bytes <= conn->outgoing_stream_data->payload.length);
 
-        if (conn->outgoing_body_bytes == conn->outgoing_body_data->payload.length) {
-            // This buffer set consumes the remainder of the body_data segment.
-            // Attach the body_data struct to the last buffer so that the struct
+        if (conn->outgoing_body_bytes == conn->outgoing_stream_data->payload.length) {
+            // This buffer set consumes the remainder of the stream_data segment.
+            // Attach the stream_data struct to the last buffer so that the struct
             // can be freed after the buffer has been transmitted by raw connection out.
-            buffers[used-1].context = (uintptr_t) conn->outgoing_body_data;
+            buffers[used-1].context = (uintptr_t) conn->outgoing_stream_data;
 
-            // Erase the body_data struct from the connection so that
+            // Erase the stream_data struct from the connection so that
             // a new one gets created on the next pass.
-            conn->outgoing_body_data = 0;
+            conn->outgoing_stream_data = 0;
         } else {
-            // Returned buffer set did not consume the entire body_data segment.
-            // Leave existing body_data struct in place for use on next pass.
+            // Returned buffer set did not consume the entire stream_data segment.
+            // Leave existing stream_data struct in place for use on next pass.
             // Add the number of returned buffers to the offset for the next pass.
             conn->outgoing_body_offset += used;
         }
@@ -517,7 +515,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
             for (size_t i = 0; i < n; ++i) {
                 written += buffs[i].size;
                 if (buffs[i].context) {
-                    qd_message_body_data_release((qd_message_body_data_t*) buffs[i].context);
+                    qd_message_stream_data_release((qd_message_stream_data_t*) buffs[i].context);
                 }
             }
         }
diff --git a/src/message.c b/src/message.c
index 360fd84..80d569f 100644
--- a/src/message.c
+++ b/src/message.c
@@ -86,7 +86,7 @@ PN_HANDLE(PN_DELIVERY_CTX)
 
 ALLOC_DEFINE_CONFIG(qd_message_t, sizeof(qd_message_pvt_t), 0, 0);
 ALLOC_DEFINE(qd_message_content_t);
-ALLOC_DEFINE(qd_message_body_data_t);
+ALLOC_DEFINE(qd_message_stream_data_t);
 
 typedef void (*buffer_process_t) (void *context, const unsigned char *base, int length);
 
@@ -1901,7 +1901,7 @@ void qd_message_send(qd_message_t *in_msg,
                         // by freeing a buffer there now may be room to restart a
                         // stalled message receiver
                         if (content->q2_input_holdoff) {
-                            if (qd_message_Q2_holdoff_should_unblock((qd_message_t *)msg)) {
+                            if (qd_message_Q2_holdoff_should_unblock((qd_message_t*) msg)) {
                                 // wake up receive side
                                 // Note: clearing holdoff here is easy compared to
                                 // clearing it in the deferred callback. Tracing
@@ -2368,79 +2368,83 @@ static void find_last_buffer(qd_field_location_t *location, unsigned char **curs
 }
 
 
-void trim_body_data_headers(qd_message_body_data_t *body_data)
+void trim_stream_data_headers(qd_message_stream_data_t *stream_data, bool remove_vbin_header)
 {
-    const qd_field_location_t *location = &body_data->section;
+    const qd_field_location_t *location = &stream_data->section;
     qd_buffer_t               *buffer   = location->buffer;
     unsigned char             *cursor   = qd_buffer_base(buffer) + location->offset;
 
     bool good = advance(&cursor, &buffer, location->hdr_length);
     assert(good);
     if (good) {
-        unsigned char tag = 0;
-        size_t        vbin_hdr_len = 1;
-        // coverity[check_return]
-        next_octet(&cursor, &buffer, &tag);
-        if (tag == QD_AMQP_VBIN8) {
-            advance(&cursor, &buffer, 1);
-            vbin_hdr_len += 1;
-        } else if (tag == QD_AMQP_VBIN32) {
-            advance(&cursor, &buffer, 4);
-            vbin_hdr_len += 4;
+        size_t        vbin_hdr_len = 0;
+        unsigned char tag          = 0;
+
+        if (remove_vbin_header) {
+            vbin_hdr_len = 1;
+            // coverity[check_return]
+            next_octet(&cursor, &buffer, &tag);
+            if (tag == QD_AMQP_VBIN8) {
+                advance(&cursor, &buffer, 1);
+                vbin_hdr_len += 1;
+            } else if (tag == QD_AMQP_VBIN32) {
+                advance(&cursor, &buffer, 4);
+                vbin_hdr_len += 4;
+            }
         }
 
         // coverity[check_return]
         can_advance(&cursor, &buffer); // bump cursor to the next buffer if necessary
 
-        body_data->payload.buffer     = buffer;
-        body_data->payload.offset     = cursor - qd_buffer_base(buffer);
-        body_data->payload.length     = location->length - vbin_hdr_len;
-        body_data->payload.hdr_length = 0;
-        body_data->payload.parsed     = true;
-        body_data->payload.tag        = tag;
+        stream_data->payload.buffer     = buffer;
+        stream_data->payload.offset     = cursor - qd_buffer_base(buffer);
+        stream_data->payload.length     = location->length - vbin_hdr_len;
+        stream_data->payload.hdr_length = 0;
+        stream_data->payload.parsed     = true;
+        stream_data->payload.tag        = tag;
     }
 }
 
 
 /**
- * qd_message_body_data_iterator
+ * qd_message_stream_data_iterator
  *
- * Given a body_data object, return an iterator that refers to the content of that body data.  This iterator
+ * Given a stream_data object, return an iterator that refers to the content of that body data.  This iterator
  * shall not refer to the 3-byte performative header or the header for the vbin{8,32} field.
  *
  * The iterator must be freed eventually by the caller.
  */
-qd_iterator_t *qd_message_body_data_iterator(const qd_message_body_data_t *body_data)
+qd_iterator_t *qd_message_stream_data_iterator(const qd_message_stream_data_t *stream_data)
 {
-    const qd_field_location_t *location = &body_data->payload;
+    const qd_field_location_t *location = &stream_data->payload;
 
     return qd_iterator_buffer(location->buffer, location->offset, location->length, ITER_VIEW_ALL);
 }
 
 /**
- * qd_message_body_data_payload_length
+ * qd_message_stream_data_payload_length
  *
- * Given a body_data object, return the length of the payload.
+ * Given a stream_data object, return the length of the payload.
  */
-size_t qd_message_body_data_payload_length(const qd_message_body_data_t *body_data)
+size_t qd_message_stream_data_payload_length(const qd_message_stream_data_t *stream_data)
 {
-    return body_data->payload.length;
+    return stream_data->payload.length;
 }
 
 
 /**
- * qd_message_body_data_buffer_count
+ * qd_message_stream_data_buffer_count
  *
- * Return the number of buffers contained in payload portion of the body_data object.
+ * Return the number of buffers contained in payload portion of the stream_data object.
  */
-int qd_message_body_data_buffer_count(const qd_message_body_data_t *body_data)
+int qd_message_stream_data_buffer_count(const qd_message_stream_data_t *stream_data)
 {
-    if (body_data->payload.length == 0)
+    if (stream_data->payload.length == 0)
         return 0;
 
     int count = 1;
-    qd_buffer_t *buffer = body_data->payload.buffer;
-    while (!!buffer && buffer != body_data->last_buffer) {
+    qd_buffer_t *buffer = stream_data->payload.buffer;
+    while (!!buffer && buffer != stream_data->last_buffer) {
         buffer = DEQ_NEXT(buffer);
         count++;
     }
@@ -2450,23 +2454,23 @@ int qd_message_body_data_buffer_count(const qd_message_body_data_t *body_data)
 
 
 /**
- * qd_message_body_data_buffers
+ * qd_message_stream_data_buffers
  *
- * Populate the provided array of pn_raw_buffers with the addresses and lengths of the buffers in the body_data
+ * Populate the provided array of pn_raw_buffers with the addresses and lengths of the buffers in the stream_data
  * object.  Don't fill more than count raw_buffers with data.  Start at offset from the zero-th buffer in the
- * body_data.
+ * stream_data.
  */
-int qd_message_body_data_buffers(qd_message_body_data_t *body_data, pn_raw_buffer_t *buffers, int offset, int count)
+int qd_message_stream_data_buffers(qd_message_stream_data_t *stream_data, pn_raw_buffer_t *buffers, int offset, int count)
 {
-    qd_buffer_t *buffer       = body_data->payload.buffer;
-    size_t       data_offset  = body_data->payload.offset;
-    size_t       payload_len  = body_data->payload.length;
+    qd_buffer_t *buffer       = stream_data->payload.buffer;
+    size_t       data_offset  = stream_data->payload.offset;
+    size_t       payload_len  = stream_data->payload.length;
 
     //
     // Skip the buffer offset
     //
     if (offset > 0) {
-        assert(offset < qd_message_body_data_buffer_count(body_data));
+        assert(offset < qd_message_stream_data_buffer_count(stream_data));
         while (offset > 0 && payload_len > 0) {
             payload_len -= qd_buffer_size(buffer) - data_offset;
             offset--;
@@ -2498,21 +2502,21 @@ int qd_message_body_data_buffers(qd_message_body_data_t *body_data, pn_raw_buffe
 
 
 /**
- * qd_message_body_data_release
+ * qd_message_stream_data_release
  *
- * Decrement the fanout ref-counts for all of the buffers referred to in the body_data.  If any have reached zero,
+ * Decrement the fanout ref-counts for all of the buffers referred to in the stream_data.  If any have reached zero,
  * remove them from the buffer list and free them.  Never dec-ref the last buffer in the content's buffer list.
  */
-void qd_message_body_data_release(qd_message_body_data_t *body_data)
+void qd_message_stream_data_release(qd_message_stream_data_t *stream_data)
 {
-    free_qd_message_body_data_t(body_data);
+    free_qd_message_stream_data_t(stream_data);
 }
 
 
-qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *in_msg, qd_message_body_data_t **out_body_data)
+qd_message_stream_data_result_t qd_message_next_stream_data(qd_message_t *in_msg, qd_message_stream_data_t **out_stream_data)
 {
-    qd_message_pvt_t       *msg       = (qd_message_pvt_t*) in_msg;
-    qd_message_body_data_t *body_data = 0;
+    qd_message_pvt_t         *msg         = (qd_message_pvt_t*) in_msg;
+    qd_message_stream_data_t *stream_data = 0;
 
     if (!msg->body_cursor) {
         //
@@ -2520,58 +2524,67 @@ qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *in_msg, qd
         //
         qd_message_depth_status_t status = qd_message_check_depth(in_msg, QD_DEPTH_BODY);
         if (status == QD_MESSAGE_DEPTH_OK) {
-            body_data = new_qd_message_body_data_t();
-            ZERO(body_data);
-            body_data->owning_message = msg;
-            body_data->section        = msg->content->section_body;
-
-            find_last_buffer(&body_data->section, &msg->body_cursor, &msg->body_buffer);
-            body_data->last_buffer = msg->body_buffer;
-            trim_body_data_headers(body_data);
-
-            assert(DEQ_SIZE(msg->body_data_list) == 0);
-            DEQ_INSERT_TAIL(msg->body_data_list, body_data);
-            *out_body_data = body_data;
-            return QD_MESSAGE_BODY_DATA_OK;
+            stream_data = new_qd_message_stream_data_t();
+            ZERO(stream_data);
+            stream_data->owning_message = msg;
+            stream_data->section        = msg->content->section_body;
+
+            find_last_buffer(&stream_data->section, &msg->body_cursor, &msg->body_buffer);
+            stream_data->last_buffer = msg->body_buffer;
+            trim_stream_data_headers(stream_data, true);
+
+            assert(DEQ_SIZE(msg->stream_data_list) == 0);
+            DEQ_INSERT_TAIL(msg->stream_data_list, stream_data);
+            *out_stream_data = stream_data;
+            return QD_MESSAGE_STREAM_DATA_BODY_OK;
         } else if (status == QD_MESSAGE_DEPTH_INCOMPLETE)
-            return QD_MESSAGE_BODY_DATA_INCOMPLETE;
+            return QD_MESSAGE_STREAM_DATA_INCOMPLETE;
         else if (status == QD_MESSAGE_DEPTH_INVALID)
-            return QD_MESSAGE_BODY_DATA_INVALID;
+            return QD_MESSAGE_STREAM_DATA_INVALID;
     }
 
     qd_section_status_t section_status;
     qd_field_location_t location;
     ZERO(&location);
 
+    bool is_footer = false;
+
     section_status = message_section_check(&msg->body_buffer, &msg->body_cursor,
                                            BODY_DATA_SHORT, 3, TAGS_BINARY,
                                            &location, true);
 
+    if (section_status == QD_SECTION_INVALID || section_status == QD_SECTION_NO_MATCH) {
+        is_footer      = true;
+        section_status = message_section_check(&msg->body_buffer, &msg->body_cursor,
+                                               FOOTER_SHORT, 3, TAGS_MAP,
+                                               &location, true);
+    }
+
     switch (section_status) {
     case QD_SECTION_INVALID:
     case QD_SECTION_NO_MATCH:
-        return QD_MESSAGE_BODY_DATA_INVALID;
+        return QD_MESSAGE_STREAM_DATA_INVALID;
 
     case QD_SECTION_MATCH:
-        body_data = new_qd_message_body_data_t();
-        ZERO(body_data);
-        body_data->owning_message = msg;
-        body_data->section        = location;
-        find_last_buffer(&body_data->section, &msg->body_cursor, &msg->body_buffer);
-        body_data->last_buffer = msg->body_buffer;
-        trim_body_data_headers(body_data);
-        DEQ_INSERT_TAIL(msg->body_data_list, body_data);
-        *out_body_data = body_data;
-        return QD_MESSAGE_BODY_DATA_OK;
+        stream_data = new_qd_message_stream_data_t();
+        ZERO(stream_data);
+        stream_data->owning_message = msg;
+        stream_data->section        = location;
+        find_last_buffer(&stream_data->section, &msg->body_cursor, &msg->body_buffer);
+        stream_data->last_buffer = msg->body_buffer;
+        trim_stream_data_headers(stream_data, !is_footer);
+        DEQ_INSERT_TAIL(msg->stream_data_list, stream_data);
+        *out_stream_data = stream_data;
+        return is_footer ? QD_MESSAGE_STREAM_DATA_FOOTER_OK : QD_MESSAGE_STREAM_DATA_BODY_OK;
         
     case QD_SECTION_NEED_MORE:
         if (msg->content->receive_complete)
-            return QD_MESSAGE_BODY_DATA_NO_MORE;
+            return QD_MESSAGE_STREAM_DATA_NO_MORE;
         else
-            return QD_MESSAGE_BODY_DATA_INCOMPLETE;
+            return QD_MESSAGE_STREAM_DATA_INCOMPLETE;
     }
     
-    return QD_MESSAGE_BODY_DATA_NO_MORE;
+    return QD_MESSAGE_STREAM_DATA_NO_MORE;
 }
 
 
@@ -2663,7 +2676,7 @@ void qd_message_release_body(qd_message_t *msg, pn_raw_buffer_t *buffers, int bu
 }
 
 
-qd_parsed_field_t *qd_message_get_ingress    (qd_message_t *msg)
+qd_parsed_field_t *qd_message_get_ingress(qd_message_t *msg)
 {
     return ((qd_message_pvt_t*)msg)->content->ma_pf_ingress;
 }
@@ -2755,7 +2768,7 @@ bool qd_message_oversize(const qd_message_t *msg)
 }
 
 
-int qd_message_body_data_append(qd_message_t *message, qd_buffer_list_t *data)
+int qd_message_stream_data_append(qd_message_t *message, qd_buffer_list_t *data)
 {
     unsigned int        length = DEQ_SIZE(*data);
     qd_composed_field_t *field = 0;
diff --git a/src/message_private.h b/src/message_private.h
index a6ca077..3b83de0 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -61,16 +61,16 @@ typedef struct {
 } qd_field_location_t;
 
 
-struct qd_message_body_data_t {
-    DEQ_LINKS(qd_message_body_data_t);    // Linkage to form a DEQ
+struct qd_message_stream_data_t {
+    DEQ_LINKS(qd_message_stream_data_t);    // Linkage to form a DEQ
     qd_message_pvt_t    *owning_message;  // Pointer to the owning message
     qd_field_location_t  section;         // Section descriptor for the field
     qd_field_location_t  payload;         // Descriptor for the payload of the body data
     qd_buffer_t         *last_buffer;     // Pointer to the last buffer in the field
 };
 
-ALLOC_DECLARE(qd_message_body_data_t);
-DEQ_DECLARE(qd_message_body_data_t, qd_message_body_data_list_t);
+ALLOC_DECLARE(qd_message_stream_data_t);
+DEQ_DECLARE(qd_message_stream_data_t, qd_message_stream_data_list_t);
 
 // TODO - consider using pointers to qd_field_location_t below to save memory
 // TODO - provide a way to allocate a message without a lock for the link-routing case.
@@ -141,23 +141,23 @@ typedef struct {
 } qd_message_content_t;
 
 struct qd_message_pvt_t {
-    qd_iterator_pointer_t        cursor;          // A pointer to the current location of the outgoing byte stream.
-    qd_message_depth_t           message_depth;   // What is the depth of the message that has been received so far
-    qd_message_depth_t           sent_depth;      // How much of the message has been sent?  QD_DEPTH_NONE means nothing has been sent so far, QD_DEPTH_HEADER means the header has already been sent, dont send it again and so on.
-    qd_message_content_t        *content;         // The actual content of the message. The content is never copied
-    qd_buffer_list_t             ma_to_override;  // to field in outgoing message annotations.
-    qd_buffer_list_t             ma_trace;        // trace list in outgoing message annotations
-    qd_buffer_list_t             ma_ingress;      // ingress field in outgoing message annotations
-    int                          ma_phase;        // phase for the override address
-    bool                         ma_stream;       // indicates whether this message is streaming
-    qd_message_body_data_list_t  body_data_list;  // TODO - move this to the content for one-time parsing (TLR)
-    qd_message_body_data_t      *next_body_data;
-    unsigned char               *body_cursor;
-    qd_buffer_t                 *body_buffer;
-    bool                         strip_annotations_in;
-    bool                         send_complete;   // Has the message been completely received and completely sent?
-    bool                         tag_sent;        // Tags are sent
-    bool                         is_fanout;       // If msg is an outgoing fanout
+    qd_iterator_pointer_t          cursor;          // A pointer to the current location of the outgoing byte stream.
+    qd_message_depth_t             message_depth;   // What is the depth of the message that has been received so far
+    qd_message_depth_t             sent_depth;      // How much of the message has been sent?  QD_DEPTH_NONE means nothing has been sent so far, QD_DEPTH_HEADER means the header has already been sent, don't send it again, and so on.
+    qd_message_content_t          *content;         // The actual content of the message. The content is never copied
+    qd_buffer_list_t               ma_to_override;  // to field in outgoing message annotations.
+    qd_buffer_list_t               ma_trace;        // trace list in outgoing message annotations
+    qd_buffer_list_t               ma_ingress;      // ingress field in outgoing message annotations
+    int                            ma_phase;        // phase for the override address
+    bool                           ma_stream;       // indicates whether this message is streaming
+    qd_message_stream_data_list_t  stream_data_list;  // TODO - move this to the content for one-time parsing (TLR)
+    qd_message_stream_data_t      *next_stream_data;
+    unsigned char                 *body_cursor;
+    qd_buffer_t                   *body_buffer;
+    bool                           strip_annotations_in;
+    bool                           send_complete;   // Has the message been completely received and completely sent?
+    bool                           tag_sent;        // Tags are sent
+    bool                           is_fanout;       // If msg is an outgoing fanout
 };
 
 ALLOC_DECLARE(qd_message_t);
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 6ca2b2b..8c361a2 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -171,11 +171,13 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
                         break;
                     }
                 } while (settled != dlv->settled && !to_new_link);  // oops missed the settlement
+
                 send_complete = qdr_delivery_send_complete(dlv);
                 if (send_complete || to_new_link) {
                     //
-                    // The entire message has been sent. It is now the appropriate time to have the delivery removed
-                    // from the head of the undelivered list and move it to the unsettled list if it is not settled.
+                    // The entire message has been sent or the message has been moved from this link.
+                    // It is now the appropriate time to remove the delivery from the head of the
+                    // undelivered list and move it to the unsettled list if it is not settled.
                     //
                     num_deliveries_completed++;
 
@@ -252,6 +254,42 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
 }
 
 
+void qdr_link_complete_sent_message(qdr_core_t *core, qdr_link_t *link)
+{
+    if (!link || !link->conn)
+        return;
+
+    qdr_connection_t *conn     = link->conn;
+    bool              activate = false;
+
+    sys_mutex_lock(conn->work_lock);
+    qdr_delivery_t *dlv = DEQ_HEAD(link->undelivered);
+    if (!!dlv && qdr_delivery_send_complete(dlv)) {
+        DEQ_REMOVE_HEAD(link->undelivered);
+        if (!dlv->settled && !qdr_delivery_oversize(dlv) && !qdr_delivery_is_aborted(dlv)) {
+            DEQ_INSERT_TAIL(link->unsettled, dlv);
+            dlv->where = QDR_DELIVERY_IN_UNSETTLED;
+            qd_log(core->log, QD_LOG_DEBUG, "Delivery transfer:  dlv:%lx qdr_link_complete_sent_message: undelivered-list -> unsettled-list", (long) dlv);
+        } else {
+            dlv->where = QDR_DELIVERY_NOWHERE;
+            qdr_delivery_decref(core, dlv, "qdr_link_complete_sent_message - removed from undelivered");
+        }
+
+        //
+        // If there's another delivery on the undelivered list, get the outbound process moving again.
+        //
+        if (DEQ_SIZE(link->undelivered) > 0) {
+            qdr_add_link_ref(&conn->links_with_work[link->priority], link, QDR_LINK_LIST_CLASS_WORK);
+            activate = true;
+        }
+    }
+    sys_mutex_unlock(conn->work_lock);
+
+    if (activate)
+        conn->protocol_adaptor->activate_handler(conn->protocol_adaptor->user_context, conn);
+}
+
+
 void qdr_link_flow(qdr_core_t *core, qdr_link_t *link, int credit, bool drain_mode)
 {
     qdr_action_t *action = qdr_action(qdr_link_flow_CT, "link_flow");
diff --git a/tests/message_test.c b/tests/message_test.c
index 87271df..1d3a5d3 100644
--- a/tests/message_test.c
+++ b/tests/message_test.c
@@ -706,10 +706,10 @@ exit:
 }
 
 //
-// Testing protocol adapter 'body_data' interfaces
+// Testing protocol adapter 'stream_data' interfaces
 //
 
-static void body_data_generate_message(qd_message_t *msg, char *s_chunk_size, char *s_n_chunks)
+static void stream_data_generate_message(qd_message_t *msg, char *s_chunk_size, char *s_n_chunks)
 {
     // Fill a message with n_chunks of vbin chunk_size body data.
 
@@ -747,20 +747,20 @@ static void body_data_generate_message(qd_message_t *msg, char *s_chunk_size, ch
     }
 }
 
-static void free_body_data_list(qd_message_t *msg_in)
+static void free_stream_data_list(qd_message_t *msg_in)
 {
     // DISPATCH-1800 - this should not be required here
     qd_message_pvt_t *msg = (qd_message_pvt_t *)msg_in;
-    qd_message_body_data_t *bd = DEQ_HEAD(msg->body_data_list);
+    qd_message_stream_data_t *bd = DEQ_HEAD(msg->stream_data_list);
     while (bd) {
-        qd_message_body_data_t *next = DEQ_NEXT(bd);
-        free_qd_message_body_data_t(bd);
+        qd_message_stream_data_t *next = DEQ_NEXT(bd);
+        free_qd_message_stream_data_t(bd);
         bd = next;
     }
 
 }
 
-static char *check_body_data(char *s_chunk_size, char *s_n_chunks, bool flatten)
+static char *check_stream_data(char *s_chunk_size, char *s_n_chunks, bool flatten)
 {
     // Fill a message with n chunks of vbin chunk_size body data.
     // Then test by retrieving n chunks from a message copy and verifing.
@@ -786,7 +786,7 @@ static char *check_body_data(char *s_chunk_size, char *s_n_chunks, bool flatten)
     qd_message_pvt_t     *msg_pvt = (qd_message_pvt_t *)msg;
 
     // Set the original message content
-    body_data_generate_message(msg, s_chunk_size, s_n_chunks);
+    stream_data_generate_message(msg, s_chunk_size, s_n_chunks);
 
     // flatten if required
     if (flatten) {
@@ -810,22 +810,22 @@ static char *check_body_data(char *s_chunk_size, char *s_n_chunks, bool flatten)
     // Define the number of raw buffers to be extracted on each loop
 #define N_PN_RAW_BUFFS (2)
 
-    qd_message_body_data_t *body_data;
+    qd_message_stream_data_t *stream_data;
 
     for (int j=0; j<n_chunks; j++) {
         received = 0; // this chunk received size in bytes.
 
-        // Set up the next_body_data snapshot
-        qd_message_body_data_result_t body_data_result = qd_message_next_body_data(copy, &body_data);
+        // Set up the next_stream_data snapshot
+        qd_message_stream_data_result_t stream_data_result = qd_message_next_stream_data(copy, &stream_data);
 
-        if (body_data_result == QD_MESSAGE_BODY_DATA_OK) {
-            // check body_data payload length
-            if (body_data->payload.length != chunk_size) {
-                printf("********** check_body_data: BUFFER_SIZE=%zu, pn-buf-array-size:%d, "
+        if (stream_data_result == QD_MESSAGE_STREAM_DATA_BODY_OK) {
+            // check stream_data payload length
+            if (stream_data->payload.length != chunk_size) {
+                printf("********** check_stream_data: BUFFER_SIZE=%zu, pn-buf-array-size:%d, "
                     "chunk_size:%s, n_chunks:%s, payload length error : %zu \n",
-                    BUFFER_SIZE, N_PN_RAW_BUFFS, s_chunk_size, s_n_chunks, body_data->payload.length);
+                    BUFFER_SIZE, N_PN_RAW_BUFFS, s_chunk_size, s_n_chunks, stream_data->payload.length);
                 fflush(stdout);
-                result = "qd_message_next_body_data returned wrong payload length.";
+                result = "qd_message_next_stream_data returned wrong payload length.";
                 break;
             }
 
@@ -837,14 +837,14 @@ static char *check_body_data(char *s_chunk_size, char *s_n_chunks, bool flatten)
             pn_raw_buffer_t buffs[N_PN_RAW_BUFFS];
 
             // used_buffers - Number of qd_buffers in content buffer chain consumed so far.
-            //                This number must increase as dictated by qd_message_body_data_buffers()
-            //                when vbin segments are consumed from the current body_data chunk.
+            //                This number must increase as dictated by qd_message_stream_data_buffers()
+            //                when vbin segments are consumed from the current stream_data chunk.
             //                A single vbin segment may consume 0, 1, or many qd_buffers.
             size_t used_buffers = 0;
 
             while (received < chunk_size) {
                 ZERO(buffs);
-                size_t n_used = qd_message_body_data_buffers(body_data, buffs, used_buffers, N_PN_RAW_BUFFS);
+                size_t n_used = qd_message_stream_data_buffers(stream_data, buffs, used_buffers, N_PN_RAW_BUFFS);
                 if (n_used > 0) {
                     for (size_t ii=0; ii<n_used; ii++) {
                         char e_char = (char)(j + 1);   // expected char in payload
@@ -852,7 +852,7 @@ static char *check_body_data(char *s_chunk_size, char *s_n_chunks, bool flatten)
                         for (uint32_t idx=0; idx < buffs[ii].size; idx++) {
                             char actual = buffs[ii].bytes[buffs[ii].offset + idx];
                             if (e_char != actual) {
-                                printf("********** check_body_data: BUFFER_SIZE=%zu, pn-buf-array-size:%d, "
+                                printf("********** check_stream_data: BUFFER_SIZE=%zu, pn-buf-array-size:%d, "
                                     "chunk_size:%s, n_chunks:%s, verify error at index %d, expected:%d, actual:%d \n",
                                     BUFFER_SIZE, N_PN_RAW_BUFFS, s_chunk_size, s_n_chunks, received + idx, e_char,
                                     actual);
@@ -865,7 +865,7 @@ static char *check_body_data(char *s_chunk_size, char *s_n_chunks, bool flatten)
                     used_buffers += n_used;
                     if (!!result) break;
                 } else {
-                    printf("********** check_body_data: BUFFER_SIZE=%zu, pn-buf-array-size:%d, "
+                    printf("********** check_stream_data: BUFFER_SIZE=%zu, pn-buf-array-size:%d, "
                         "chunk_size:%s, n_chunks:%s, received %d bytes (not enough) \n",
                         BUFFER_SIZE, N_PN_RAW_BUFFS, s_chunk_size, s_n_chunks, received);
                     fflush(stdout);
@@ -873,7 +873,7 @@ static char *check_body_data(char *s_chunk_size, char *s_n_chunks, bool flatten)
                     break;
                 }
                 if (received > chunk_size) {
-                    printf("********** check_body_data: BUFFER_SIZE=%zu, pn-buf-array-size:%d, "
+                    printf("********** check_stream_data: BUFFER_SIZE=%zu, pn-buf-array-size:%d, "
                         "chunk_size:%s, n_chunks:%s, received %d bytes (too many) \n",
                         BUFFER_SIZE, N_PN_RAW_BUFFS, s_chunk_size, s_n_chunks, received);
                     result = "Received too much data";
@@ -882,32 +882,30 @@ static char *check_body_data(char *s_chunk_size, char *s_n_chunks, bool flatten)
             }
             // successful check
 
-        } else if (body_data_result == QD_MESSAGE_BODY_DATA_INCOMPLETE) {
+        } else if (stream_data_result == QD_MESSAGE_STREAM_DATA_INCOMPLETE) {
             result = "DATA_INCOMPLETE"; break;
         } else {
-            switch (body_data_result) {
-            case QD_MESSAGE_BODY_DATA_NO_MORE:
+            switch (stream_data_result) {
+            case QD_MESSAGE_STREAM_DATA_NO_MORE:
                 result = "EOS"; break;
-            case QD_MESSAGE_BODY_DATA_INVALID:
+            case QD_MESSAGE_STREAM_DATA_INVALID:
                 result = "Invalid body data for streaming message"; break;
-            case QD_MESSAGE_BODY_DATA_NOT_DATA:
-                result = "Invalid body; expected data section"; break;
             default:
                 result = "result: default"; break;
             }
         }
     }
 
-    free_body_data_list(msg);
+    free_stream_data_list(msg);
     qd_message_free(msg);
     if (!!copy) {
-        free_body_data_list(copy);
+        free_stream_data_list(copy);
         qd_message_free(copy);
     }
     return result;
 }
 
-static char *test_check_body_data(void * context)
+static char *test_check_stream_data(void * context)
 {
     char *result = 0;
 
@@ -919,16 +917,16 @@ static char *test_check_body_data(void * context)
 
     for (int i=0; i<N_CHUNK_SIZES; i++) {
         for (int j=0; j<N_N_CHUNKS; j++) {
-            result = check_body_data(chunk_sizes[i], n_chunks[j], false);
+            result = check_stream_data(chunk_sizes[i], n_chunks[j], false);
             if (!!result) {
-                printf("test_check_body_data: chunk_size:%s, n_chunks:%s, flatten:%s, result:%s   \n",
+                printf("test_check_stream_data: chunk_size:%s, n_chunks:%s, flatten:%s, result:%s   \n",
                        chunk_sizes[i], n_chunks[j], "false", result);
                 fflush(stdout);
                 return result;
             }
-            result = check_body_data(chunk_sizes[i], n_chunks[j], true);
+            result = check_stream_data(chunk_sizes[i], n_chunks[j], true);
             if (!!result) {
-                printf("test_check_body_data: chunk_size:%s, n_chunks:%s, flatten:%s, result:%s   \n",
+                printf("test_check_stream_data: chunk_size:%s, n_chunks:%s, flatten:%s, result:%s   \n",
                        chunk_sizes[i], n_chunks[j], "true", result);
                 fflush(stdout);
                 return result;
@@ -939,10 +937,10 @@ static char *test_check_body_data(void * context)
 }
 
 
-// Verify that qd_message_body_data_append() will break up a long binary data
+// Verify that qd_message_stream_data_append() will break up a long binary data
 // field in order to avoid triggering Q2
 //
-static char *test_check_body_data_append(void * context)
+static char *test_check_stream_data_append(void * context)
 {
     char *result = 0;
     qd_message_t *msg = 0;
@@ -977,7 +975,7 @@ static char *test_check_body_data_append(void * context)
 
     qd_message_compose_2(msg, field, false);
     qd_compose_free(field);
-    int depth = qd_message_body_data_append(msg, &bin_data);
+    int depth = qd_message_stream_data_append(msg, &bin_data);
     if (depth <= buffer_count) {
         // expected to add extra buffer(s) for meta-data
         result = "append length is incorrect";
@@ -995,29 +993,30 @@ static char *test_check_body_data_append(void * context)
 
     int bd_count = 0;
     int total_buffers = 0;
-    qd_message_body_data_t *body_data = 0;
+    qd_message_stream_data_t *stream_data = 0;
     bool done = false;
     while (!done) {
-        switch (qd_message_next_body_data(msg, &body_data)) {
-        case QD_MESSAGE_BODY_DATA_INCOMPLETE:
-        case QD_MESSAGE_BODY_DATA_INVALID:
-        case QD_MESSAGE_BODY_DATA_NOT_DATA:
+        switch (qd_message_next_stream_data(msg, &stream_data)) {
+        case QD_MESSAGE_STREAM_DATA_INCOMPLETE:
+        case QD_MESSAGE_STREAM_DATA_INVALID:
             result = "Next body data failed to get next body data";
             goto exit;
-        case QD_MESSAGE_BODY_DATA_NO_MORE:
+        case QD_MESSAGE_STREAM_DATA_NO_MORE:
             done = true;
             break;
-        case QD_MESSAGE_BODY_DATA_OK:
+        case QD_MESSAGE_STREAM_DATA_BODY_OK:
             bd_count += 1;
-            // qd_message_body_data_append() breaks the buffer list up into
+            // qd_message_stream_data_append() breaks the buffer list up into
             // smaller lists that are no bigger than QD_QLIMIT_Q2_LOWER buffers
             // long
-            total_buffers += qd_message_body_data_buffer_count(body_data);
-            if (qd_message_body_data_buffer_count(body_data) > QD_QLIMIT_Q2_LOWER) {
+            total_buffers += qd_message_stream_data_buffer_count(stream_data);
+            if (qd_message_stream_data_buffer_count(stream_data) > QD_QLIMIT_Q2_LOWER) {
                 result = "Body data list length too long!";
                 goto exit;
             }
-            qd_message_body_data_release(body_data);
+            qd_message_stream_data_release(stream_data);
+            break;
+        case QD_MESSAGE_STREAM_DATA_FOOTER_OK:
             break;
         }
     }
@@ -1051,8 +1050,8 @@ int message_tests(void)
     TEST_CASE(test_q2_input_holdoff_sensing, 0);
     TEST_CASE(test_incomplete_annotations, 0);
     TEST_CASE(test_check_weird_messages, 0);
-    TEST_CASE(test_check_body_data, 0);
-    TEST_CASE(test_check_body_data_append, 0);
+    TEST_CASE(test_check_stream_data, 0);
+    TEST_CASE(test_check_stream_data_append, 0);
 
     return result;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org