You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2020/10/20 16:02:15 UTC

[qpid-dispatch] branch dev-protocol-adaptors-2 updated (eccba23 -> 4fe73d8)

This is an automated email from the ASF dual-hosted git repository.

chug pushed a change to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git.


    from eccba23  DISPATCH-1807: TCP adaptor test echo server
     new 9dc69d5  DISPATCH-1654: need to set to field on message
     new c56a1fb  DISPATCH-1654: allow content to be seen when available
     new a8e548b  DISPATCH-1654: fix for streaming message
     new 4fe73d8  DISPATCH-1806: Rearrange TCP adaptor outbound body data handling

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 include/qpid/dispatch/amqp.h    |   2 +
 include/qpid/dispatch/message.h |  17 +++++
 include/qpid/dispatch/parse.h   |   2 +
 src/adaptors/tcp_adaptor.c      | 145 +++++++++++++++++++++++++++++++---------
 src/amqp.c                      |   1 +
 src/message.c                   |  35 +++++++++-
 src/message_private.h           |   2 +
 src/parse.c                     |  12 +++-
 src/router_node.c               |  64 +++++++++---------
 9 files changed, 214 insertions(+), 66 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-dispatch] 01/04: DISPATCH-1654: need to set to field on message

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chug pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 9dc69d55b003cd097efa8b30edaed5b3f789a10d
Author: Gordon Sim <gs...@redhat.com>
AuthorDate: Mon Oct 5 20:14:42 2020 +0100

    DISPATCH-1654: need to set to field on message
---
 src/adaptors/tcp_adaptor.c | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index 0e06059..f5d0ac7 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -149,9 +149,15 @@ static int handle_incoming(qdr_tcp_connection_t *conn)
         qd_compose_start_list(props);
         qd_compose_insert_null(props);                      // message-id
         qd_compose_insert_null(props);                      // user-id
-        qd_compose_insert_null(props);                      // to
-        qd_compose_insert_string(props, conn->global_id);   // subject
-        qd_compose_insert_string(props, conn->reply_to);    // reply-to
+        if (conn->ingress) {
+            qd_compose_insert_string(props, conn->config.address); // to
+            qd_compose_insert_string(props, conn->global_id);   // subject
+            qd_compose_insert_string(props, conn->reply_to);    // reply-to
+        } else {
+            qd_compose_insert_string(props, conn->reply_to); // to
+            qd_compose_insert_string(props, conn->global_id);   // subject
+            qd_compose_insert_null(props);    // reply-to
+        }
         //qd_compose_insert_null(props);                      // correlation-id
         //qd_compose_insert_null(props);                      // content-type
         //qd_compose_insert_null(props);                      // content-encoding


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-dispatch] 03/04: DISPATCH-1654: fix for streaming message

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chug pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit a8e548bfca44851305a8b6a59cfb21272ca668ca
Author: Gordon Sim <gs...@redhat.com>
AuthorDate: Wed Oct 7 12:09:57 2020 +0100

    DISPATCH-1654: fix for streaming message
---
 include/qpid/dispatch/amqp.h    |  2 ++
 include/qpid/dispatch/message.h | 17 +++++++++++
 include/qpid/dispatch/parse.h   |  2 ++
 src/adaptors/tcp_adaptor.c      |  3 ++
 src/amqp.c                      |  1 +
 src/message.c                   | 28 +++++++++++++++++-
 src/message_private.h           |  2 ++
 src/parse.c                     | 12 +++++++-
 src/router_node.c               | 64 ++++++++++++++++++++---------------------
 9 files changed, 97 insertions(+), 34 deletions(-)

diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h
index aca440c..6c1064f 100644
--- a/include/qpid/dispatch/amqp.h
+++ b/include/qpid/dispatch/amqp.h
@@ -112,6 +112,7 @@ extern const char * const QD_MA_TRACE;    ///< Trace
 extern const char * const QD_MA_TO;       ///< To-Override
 extern const char * const QD_MA_PHASE;    ///< Phase for override address
 extern const char * const QD_MA_CLASS;    ///< Message-Class
+extern const char * const QD_MA_STREAM;   ///< Indicate streaming message
 
 #define QD_MA_PREFIX_LEN  (9)
 #define QD_MA_INGRESS_LEN (16)
@@ -119,6 +120,7 @@ extern const char * const QD_MA_CLASS;    ///< Message-Class
 #define QD_MA_TO_LEN      (11)
 #define QD_MA_PHASE_LEN   (14)
 #define QD_MA_CLASS_LEN   (14)
+#define QD_MA_STREAM_LEN  (15)
 
 extern const int          QD_MA_MAX_KEY_LEN;  ///< strlen of longest key name
 extern const int          QD_MA_N_KEYS;       ///< number of router annotation keys
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 9978d1c..b797eca 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -197,6 +197,23 @@ void qd_message_set_phase_annotation(qd_message_t *msg, int phase);
 int  qd_message_get_phase_annotation(const qd_message_t *msg);
 
 /**
+ * Indicate whether message should be considered to be streaming.
+ *
+ * @param msg Pointer to an outgoing message.
+ * @param stream true if the message is streaming
+ *
+ */
+void qd_message_set_stream_annotation(qd_message_t *msg, bool stream);
+/**
+ * Test whether received message should be considered to be streaming.
+ *
+ * @param msg Pointer to an outgoing message.
+ * @return true if the received message has the streaming annotation set, else false.
+ *
+ */
+int qd_message_is_streaming(qd_message_t *msg);
+
+/**
  * Set the value for the QD_MA_INGRESS field in the outgoing message
  * annotations for the message.
  *
diff --git a/include/qpid/dispatch/parse.h b/include/qpid/dispatch/parse.h
index 7fed15d..f6e5fd7 100644
--- a/include/qpid/dispatch/parse.h
+++ b/include/qpid/dispatch/parse.h
@@ -301,6 +301,7 @@ void qd_parse_annotations(
     qd_parsed_field_t    **ma_phase,
     qd_parsed_field_t    **ma_to_override,
     qd_parsed_field_t    **ma_trace,
+    qd_parsed_field_t    **ma_stream,
     qd_iterator_pointer_t *blob_pointer,
     uint32_t              *blob_item_count);
 
@@ -312,6 +313,7 @@ typedef enum {
     QD_MAE_TRACE,
     QD_MAE_TO,
     QD_MAE_PHASE,
+    QD_MAE_STREAM,
     QD_MAE_NONE
 } qd_ma_enum_t;
 
diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index f5d0ac7..cd22384 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -145,6 +145,8 @@ static int handle_incoming(qdr_tcp_connection_t *conn)
     } else {
         qd_message_t *msg = qd_message();
 
+        qd_message_set_stream_annotation(msg, true);
+
         qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0);
         qd_compose_start_list(props);
         qd_compose_insert_null(props);                      // message-id
@@ -966,6 +968,7 @@ static void qdr_tcp_adaptor_init(qdr_core_t *core, void **adaptor_context)
     adaptor->log_source  = qd_log_source("TCP_ADAPTOR");
     DEQ_INIT(adaptor->listeners);
     DEQ_INIT(adaptor->connectors);
+    DEQ_INIT(adaptor->connections);
     *adaptor_context = adaptor;
 
     tcp_adaptor = adaptor;
diff --git a/src/amqp.c b/src/amqp.c
index 7c9d9c0..8da6db2 100644
--- a/src/amqp.c
+++ b/src/amqp.c
@@ -30,6 +30,7 @@ const char * const QD_MA_TRACE   = "x-opt-qd.trace";
 const char * const QD_MA_TO      = "x-opt-qd.to";
 const char * const QD_MA_PHASE   = "x-opt-qd.phase";
 const char * const QD_MA_CLASS   = "x-opt-qd.class";
+const char * const QD_MA_STREAM  = "x-opt-qd.stream";
 const int          QD_MA_MAX_KEY_LEN = 16;
 const int          QD_MA_N_KEYS      = 5;  // max number of router annotations to send/receive
 const int          QD_MA_FILTER_LEN  = 5;  // N tailing inbound entries to search for stripping
diff --git a/src/message.c b/src/message.c
index a3a8152..360fd84 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1100,6 +1100,7 @@ qd_message_t *qd_message_copy(qd_message_t *in_msg)
     qd_buffer_list_clone(&copy->ma_trace, &msg->ma_trace);
     qd_buffer_list_clone(&copy->ma_ingress, &msg->ma_ingress);
     copy->ma_phase = msg->ma_phase;
+    copy->ma_stream = msg->ma_stream;
     copy->strip_annotations_in  = msg->strip_annotations_in;
 
     copy->content = content;
@@ -1131,6 +1132,7 @@ void qd_message_message_annotations(qd_message_t *in_msg)
     if (content->ma_field_iter_in == 0)
         return;
 
+    qd_parsed_field_t *ma_pf_stream = 0;
     qd_parse_annotations(
         msg->strip_annotations_in,
         content->ma_field_iter_in,
@@ -1138,6 +1140,7 @@ void qd_message_message_annotations(qd_message_t *in_msg)
         &content->ma_pf_phase,
         &content->ma_pf_to_override,
         &content->ma_pf_trace,
+        &ma_pf_stream,
         &content->ma_user_annotation_blob,
         &content->ma_count);
 
@@ -1157,6 +1160,10 @@ void qd_message_message_annotations(qd_message_t *in_msg)
         content->ma_int_phase = qd_parse_as_int(content->ma_pf_phase);
     }
 
+    if (ma_pf_stream) {
+        content->ma_stream = qd_parse_as_int(ma_pf_stream);
+    }
+
     return;
 }
 
@@ -1189,6 +1196,12 @@ int qd_message_get_phase_annotation(const qd_message_t *in_msg)
     return msg->ma_phase;
 }
 
+void qd_message_set_stream_annotation(qd_message_t *in_msg, bool stream)
+{
+    qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+    msg->ma_stream = stream;
+}
+
 void qd_message_set_ingress_annotation(qd_message_t *in_msg, qd_composed_field_t *ingress_field)
 {
     qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
@@ -1613,7 +1626,8 @@ static void compose_message_annotations_v1(qd_message_pvt_t *msg, qd_buffer_list
     if (!DEQ_IS_EMPTY(msg->ma_to_override) ||
         !DEQ_IS_EMPTY(msg->ma_trace) ||
         !DEQ_IS_EMPTY(msg->ma_ingress) ||
-        msg->ma_phase != 0) {
+        msg->ma_phase != 0 ||
+        msg->ma_stream) {
 
         if (!map_started) {
             qd_compose_start_map(out_ma);
@@ -1643,6 +1657,12 @@ static void compose_message_annotations_v1(qd_message_pvt_t *msg, qd_buffer_list
             qd_compose_insert_int(field, msg->ma_phase);
             field_count++;
         }
+
+        if (msg->ma_stream) {
+            qd_compose_insert_symbol(field, QD_MA_STREAM);
+            qd_compose_insert_int(field, msg->ma_stream);
+            field_count++;
+        }
         // pad out to N fields
         for  (; field_count < QD_MA_N_KEYS; field_count++) {
             qd_compose_insert_symbol(field, QD_MA_PREFIX);
@@ -2126,6 +2146,7 @@ qd_message_depth_status_t qd_message_check_depth(const qd_message_t *in_msg, qd_
     qd_message_depth_status_t    result;
 
     LOCK(content->lock);
+    //printf("qd_message_check_depth(%p, %i)\n", (void*) in_msg, depth);
     result = qd_message_check_LH(content, depth);
     UNLOCK(content->lock);
     return result;
@@ -2671,6 +2692,11 @@ int qd_message_get_phase_val(qd_message_t *msg)
     return ((qd_message_pvt_t*)msg)->content->ma_int_phase;
 }
 
+int qd_message_is_streaming(qd_message_t *msg)
+{
+    return ((qd_message_pvt_t*)msg)->content->ma_stream;
+}
+
 
 void qd_message_Q2_holdoff_disable(qd_message_t *msg)
 {
diff --git a/src/message_private.h b/src/message_private.h
index 02f002c..a6ca077 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -121,6 +121,7 @@ typedef struct {
     qd_parsed_field_t   *ma_pf_to_override;
     qd_parsed_field_t   *ma_pf_trace;
     int                  ma_int_phase;
+    bool                 ma_stream;
     uint64_t             max_message_size;               // configured max; 0 if no max to enforce
     uint64_t             bytes_received;                 // bytes returned by pn_link_recv() when enforcing max_message_size
     uint32_t             fanout;                         // The number of receivers for this message, including in-process subscribers.
@@ -148,6 +149,7 @@ struct qd_message_pvt_t {
     qd_buffer_list_t             ma_trace;        // trace list in outgoing message annotations
     qd_buffer_list_t             ma_ingress;      // ingress field in outgoing message annotations
     int                          ma_phase;        // phase for the override address
+    bool                         ma_stream;       // indicates whether this message is streaming
     qd_message_body_data_list_t  body_data_list;  // TODO - move this to the content for one-time parsing (TLR)
     qd_message_body_data_t      *next_body_data;
     unsigned char               *body_cursor;
diff --git a/src/parse.c b/src/parse.c
index 25d303d..3b43c10 100644
--- a/src/parse.c
+++ b/src/parse.c
@@ -777,6 +777,7 @@ const char *qd_parse_annotations_v1(
     qd_parsed_field_t    **ma_phase,
     qd_parsed_field_t    **ma_to_override,
     qd_parsed_field_t    **ma_trace,
+    qd_parsed_field_t    **ma_stream,
     qd_iterator_pointer_t *blob_pointer,
     uint32_t              *blob_item_count)
 {
@@ -850,6 +851,11 @@ const char *qd_parse_annotations_v1(
                         ma_type = QD_MAE_INGRESS;
                     }
                     break;
+                case QD_MA_STREAM_LEN:
+                    if (memcmp(QD_MA_STREAM + QMPL, dp, QD_MA_STREAM_LEN - QMPL) == 0) {
+                        ma_type = QD_MAE_STREAM;
+                    }
+                    break;
                 default:
                     // padding annotations are ignored here
                     break;
@@ -885,6 +891,9 @@ const char *qd_parse_annotations_v1(
                     case QD_MAE_PHASE:
                         *ma_phase = val_field;
                         break;
+                    case QD_MAE_STREAM:
+                        *ma_stream = val_field;
+                        break;
                     case QD_MAE_NONE:
                         assert(false);
                         break;
@@ -920,6 +929,7 @@ void qd_parse_annotations(
     qd_parsed_field_t    **ma_phase,
     qd_parsed_field_t    **ma_to_override,
     qd_parsed_field_t    **ma_trace,
+    qd_parsed_field_t    **ma_stream,
     qd_iterator_pointer_t *blob_pointer,
     uint32_t              *blob_item_count)
 {
@@ -958,7 +968,7 @@ void qd_parse_annotations(
     qd_iterator_free(raw_iter);
 
     (void) qd_parse_annotations_v1(strip_annotations_in, ma_iter_in, ma_ingress, ma_phase,
-                                    ma_to_override, ma_trace,
+                                    ma_to_override, ma_trace, ma_stream,
                                     blob_pointer, blob_item_count);
 
     return;
diff --git a/src/router_node.c b/src/router_node.c
index 956ce77..aa4cf04 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -491,38 +491,6 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
     }
 
     //
-    // Head of line blocking avoidance (DISPATCH-1545)
-    //
-    // Before we can forward a message we need to determine whether or not this
-    // message is "streaming" - a large message that has the potential to block
-    // other messages sharing the trunk link.  At this point we cannot for sure
-    // know the actual length of the incoming message, so we employ the
-    // following heuristic to determine if the message is "streaming":
-    //
-    // - If the message is receive-complete it is NOT a streaming message.
-    // - If it is NOT receive-complete:
-    //   Continue buffering incoming data until:
-    //   - receive has completed => NOT a streaming message
-    //   - not rx-complete AND Q2 threshold hit => a streaming message
-    //
-    // Once Q2 is hit we MUST forward the message regardless of rx-complete
-    // since Q2 will block forever unless the incoming data is drained via
-    // forwarding.
-    //
-    if (!receive_complete) {
-        if (qd_message_is_Q2_blocked(msg)) {
-            qd_log(router->log_source, QD_LOG_DEBUG,
-                   "[C%"PRIu64" L%"PRIu64"] Incoming message classified as streaming. User:%s",
-                   conn->connection_id,
-                   qd_link_link_id(link),
-                   conn->user_id);
-        } else {
-            // Continue buffering this message
-            return false;
-        }
-    }
-
-    //
     // Determine if the incoming link is anonymous.  If the link is addressed,
     // there are some optimizations we can take advantage of.
     //
@@ -607,6 +575,38 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
     qd_iterator_t *ingress_iter = router_annotate_message(router, msg, &link_exclusions, &distance, &ingress_index);
 
     //
+    // Head of line blocking avoidance (DISPATCH-1545)
+    //
+    // Before we can forward a message we need to determine whether or not this
+    // message is "streaming" - a large message that has the potential to block
+    // other messages sharing the trunk link.  At this point we cannot for sure
+    // know the actual length of the incoming message, so we employ the
+    // following heuristic to determine if the message is "streaming":
+    //
+    // - If the message is receive-complete it is NOT a streaming message.
+    // - If it is NOT receive-complete:
+    //   Continue buffering incoming data until:
+    //   - receive has completed => NOT a streaming message
+    //   - not rx-complete AND Q2 threshold hit => a streaming message
+    //
+    // Once Q2 is hit we MUST forward the message regardless of rx-complete
+    // since Q2 will block forever unless the incoming data is drained via
+    // forwarding.
+    //
+    if (!receive_complete) {
+        if (qd_message_is_streaming(msg) || qd_message_is_Q2_blocked(msg)) {
+            qd_log(router->log_source, QD_LOG_DEBUG,
+                   "[C%"PRIu64" L%"PRIu64"] Incoming message classified as streaming. User:%s",
+                   conn->connection_id,
+                   qd_link_link_id(link),
+                   conn->user_id);
+        } else {
+            // Continue buffering this message
+            return false;
+        }
+    }
+
+    //
     // If this delivery has traveled further than the known radius of the network topology (plus 1),
     // release and settle the delivery.  This can happen in the case of "flood" multicast where the
     // deliveries follow all available paths.  This will only discard messages that will reach their


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-dispatch] 02/04: DISPATCH-1654: allow content to be seen when available

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chug pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit c56a1fba5165589c11a7779580a251c45e20828a
Author: Gordon Sim <gs...@redhat.com>
AuthorDate: Wed Oct 7 12:09:02 2020 +0100

    DISPATCH-1654: allow content to be seen when available
---
 src/message.c | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/src/message.c b/src/message.c
index 23381db..a3a8152 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1554,6 +1554,13 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
             // the entire message.  We'll be back later to finish it up.
             // Return the message so that the caller can start sending out whatever we have received so far
             //
+            // push what we do have for testing/processing
+            LOCK(content->lock);
+            qd_buffer_set_fanout(content->pending, content->fanout);
+            DEQ_INSERT_TAIL(content->buffers, content->pending);
+            content->pending = 0;
+            UNLOCK(content->lock);
+            content->pending = qd_buffer();
             break;
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-dispatch] 04/04: DISPATCH-1806: Rearrange TCP adaptor outbound body data handling

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chug pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 4fe73d8b299f343f8a57cce80932a12c16f0385d
Author: Chuck Rolke <ch...@apache.org>
AuthorDate: Wed Oct 14 11:34:33 2020 -0400

    DISPATCH-1806: Rearrange TCP adaptor outbound body data handling
---
 src/adaptors/tcp_adaptor.c | 130 +++++++++++++++++++++++++++++++++++----------
 1 file changed, 101 insertions(+), 29 deletions(-)

diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index cd22384..397f8bd 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -63,6 +63,14 @@ struct qdr_tcp_connection_t {
     uint64_t              last_in_time;
     uint64_t              last_out_time;
 
+    qd_message_body_data_t *outgoing_body_data;   // current segment
+    size_t                  outgoing_body_bytes;  // bytes received from current segment
+    int                     outgoing_body_offset; // buffer offset into current segment
+
+    pn_raw_buffer_t         outgoing_buffs[WRITE_BUFFERS];
+    int                     outgoing_buff_count;  // number of buffers with data
+    int                     outgoing_buff_idx;    // first buffer with data
+
     DEQ_LINKS(qdr_tcp_connection_t);
 };
 
@@ -220,16 +228,18 @@ static void handle_disconnected(qdr_tcp_connection_t* conn)
 static int read_message_body(qdr_tcp_connection_t *conn, qd_message_t *msg, pn_raw_buffer_t *buffers, int count)
 {
     int used = 0;
-    qd_message_body_data_t *body_data;
-    while (used < count) {
-        qd_message_body_data_result_t body_data_result = qd_message_next_body_data(msg, &body_data);
+
+    // Advance to next body_data vbin segment if necessary.
+    // Return early if no data to process or error
+    if (conn->outgoing_body_data == 0) {
+        qd_message_body_data_result_t body_data_result = qd_message_next_body_data(msg, &conn->outgoing_body_data);
         if (body_data_result == QD_MESSAGE_BODY_DATA_OK) {
-            used += qd_message_body_data_buffers(body_data, buffers + used, used, count - used);
-            if (used > 0) {
-                buffers[used-1].context = (uintptr_t) body_data;
-            }
+            // a new body_data segment has been found
+            conn->outgoing_body_bytes  = 0;
+            conn->outgoing_body_offset = 0;
+            // continue to process this segment
         } else if (body_data_result == QD_MESSAGE_BODY_DATA_INCOMPLETE) {
-            return used;
+            return 0;
         } else {
             switch (body_data_result) {
             case QD_MESSAGE_BODY_DATA_NO_MORE:
@@ -245,36 +255,98 @@ static int read_message_body(qdr_tcp_connection_t *conn, qd_message_t *msg, pn_r
             return -1;
         }
     }
+
+    // A valid body_data is in place.
+    // Try to get a buffer set from it.
+    used = qd_message_body_data_buffers(conn->outgoing_body_data, buffers, conn->outgoing_body_offset, count);
+    if (used > 0) {
+        // Accumulate the lengths of the returned buffers.
+        for (int i=0; i<used; i++) {
+            conn->outgoing_body_bytes += buffers[i].size;
+        }
+
+        // Buffers returned should never exceed the body_data payload length
+        assert(conn->outgoing_body_bytes <= conn->outgoing_body_data->payload.length);
+
+        if (conn->outgoing_body_bytes == conn->outgoing_body_data->payload.length) {
+            // This buffer set consumes the remainder of the body_data segment.
+            // Attach the body_data struct to the last buffer so that the struct
+            // can be freed after the buffer has been transmitted by raw connection out.
+            buffers[used-1].context = (uintptr_t) conn->outgoing_body_data;
+
+            // Erase the body_data struct from the connection so that
+            // a new one gets created on the next pass.
+            conn->outgoing_body_data = 0;
+        } else {
+            // Returned buffer set did not consume the entire body_data segment.
+            // Leave existing body_data struct in place for use on next pass.
+            // Add the number of returned buffers to the offset for the next pass.
+            conn->outgoing_body_offset += used;
+        }
+    } else {
+        // No buffers returned.
+        // This sender has caught up with all data available on the input stream.
+    }
     return used;
 }
 
+static bool write_outgoing_buffs(qdr_tcp_connection_t *conn)
+{
+    // Send the outgoing buffs to pn_raw_conn.
+    // Return true if all the buffers went out.
+    bool result;
+
+    if (conn->outgoing_buff_count == 0) {
+        result = true;
+    } else {
+        size_t used = pn_raw_connection_write_buffers(conn->socket,
+                                                      &conn->outgoing_buffs[conn->outgoing_buff_idx],
+                                                      conn->outgoing_buff_count);
+        result = used == conn->outgoing_buff_count;
+        conn->outgoing_buff_count -= used;
+        conn->outgoing_buff_idx   += used;
+
+        int bytes_written = 0;
+        for (size_t i = 0; i < used; i++) {
+            if (conn->outgoing_buffs[conn->outgoing_buff_idx + i].bytes) {
+                bytes_written += conn->outgoing_buffs[conn->outgoing_buff_idx + i].size;
+            } else {
+                qd_log(tcp_adaptor->log_source, QD_LOG_ERROR,
+                       "[C%"PRIu64"] empty buffer can't be written (%"PRIu64" of %"PRIu64")", conn->conn_id, i+1, used);
+            }
+        }
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+               "[C%"PRIu64"] Writing %i bytes", conn->conn_id, bytes_written);
+    }
+    return result;
+}
+
 static void handle_outgoing(qdr_tcp_connection_t *conn)
 {
     if (conn->outstream) {
         qd_message_t *msg = qdr_delivery_message(conn->outstream);
-        pn_raw_buffer_t buffs[WRITE_BUFFERS];
-        for (int i = 0; i < WRITE_BUFFERS; i++) {
-            buffs[i].context = 0;
-            buffs[i].bytes = 0;
-            buffs[i].capacity = 0;
-            buffs[i].size = 0;
-            buffs[i].offset = 0;
+        bool read_more_body = true;
+
+        if (conn->outgoing_buff_count > 0) {
+            // flush outgoing buffs that hold body data waiting to go out
+            read_more_body = write_outgoing_buffs(conn);
         }
-        int n = read_message_body(conn, msg, buffs, WRITE_BUFFERS);
-        if (n > 0) {
-            size_t used = pn_raw_connection_write_buffers(conn->socket, buffs, n);
-            int bytes_written = 0;
-            for (size_t i = 0; i < used; i++) {
-                if (buffs[i].bytes) {
-                    bytes_written += buffs[i].size;
-                } else {
-                    qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "[C%"PRIu64"] empty buffer can't be written (%zu of %zu)", conn->conn_id, i+1, used);
-                }
+        while (read_more_body) {
+            ZERO(conn->outgoing_buffs);
+            conn->outgoing_buff_idx   = 0;
+            conn->outgoing_buff_count = read_message_body(conn, msg, conn->outgoing_buffs, WRITE_BUFFERS);
+
+            if (conn->outgoing_buff_count == 0) {
+                // The incoming stream has no new data to send
+                break;
+            } else if (conn->outgoing_buff_count > 0) {
+                // Send the data just returned
+                read_more_body = write_outgoing_buffs(conn);
+            }
+            if (qd_message_receive_complete(msg) || qd_message_send_complete(msg)) {
+                pn_raw_connection_close(conn->socket);
+                break;
             }
-            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Writing %i bytes", conn->conn_id, bytes_written);
-        }
-        if (qd_message_receive_complete(msg) || qd_message_send_complete(msg)) {
-            pn_raw_connection_close(conn->socket);
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org