You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2020/10/20 16:02:19 UTC

[qpid-dispatch] 04/04: DISPATCH-1806: Rearrange TCP adaptor outbound body data handling

This is an automated email from the ASF dual-hosted git repository.

chug pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 4fe73d8b299f343f8a57cce80932a12c16f0385d
Author: Chuck Rolke <ch...@apache.org>
AuthorDate: Wed Oct 14 11:34:33 2020 -0400

    DISPATCH-1806: Rearrange TCP adaptor outbound body data handling
---
 src/adaptors/tcp_adaptor.c | 130 +++++++++++++++++++++++++++++++++++----------
 1 file changed, 101 insertions(+), 29 deletions(-)

diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index cd22384..397f8bd 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -63,6 +63,14 @@ struct qdr_tcp_connection_t {
     uint64_t              last_in_time;
     uint64_t              last_out_time;
 
+    qd_message_body_data_t *outgoing_body_data;   // current segment
+    size_t                  outgoing_body_bytes;  // bytes received from current segment
+    int                     outgoing_body_offset; // buffer offset into current segment
+
+    pn_raw_buffer_t         outgoing_buffs[WRITE_BUFFERS];
+    int                     outgoing_buff_count;  // number of buffers with data
+    int                     outgoing_buff_idx;    // first buffer with data
+
     DEQ_LINKS(qdr_tcp_connection_t);
 };
 
@@ -220,16 +228,18 @@ static void handle_disconnected(qdr_tcp_connection_t* conn)
 static int read_message_body(qdr_tcp_connection_t *conn, qd_message_t *msg, pn_raw_buffer_t *buffers, int count)
 {
     int used = 0;
-    qd_message_body_data_t *body_data;
-    while (used < count) {
-        qd_message_body_data_result_t body_data_result = qd_message_next_body_data(msg, &body_data);
+
+    // Advance to next body_data vbin segment if necessary.
+    // Return early if no data to process or error
+    if (conn->outgoing_body_data == 0) {
+        qd_message_body_data_result_t body_data_result = qd_message_next_body_data(msg, &conn->outgoing_body_data);
         if (body_data_result == QD_MESSAGE_BODY_DATA_OK) {
-            used += qd_message_body_data_buffers(body_data, buffers + used, used, count - used);
-            if (used > 0) {
-                buffers[used-1].context = (uintptr_t) body_data;
-            }
+            // a new body_data segment has been found
+            conn->outgoing_body_bytes  = 0;
+            conn->outgoing_body_offset = 0;
+            // continue to process this segment
         } else if (body_data_result == QD_MESSAGE_BODY_DATA_INCOMPLETE) {
-            return used;
+            return 0;
         } else {
             switch (body_data_result) {
             case QD_MESSAGE_BODY_DATA_NO_MORE:
@@ -245,36 +255,98 @@ static int read_message_body(qdr_tcp_connection_t *conn, qd_message_t *msg, pn_r
             return -1;
         }
     }
+
+    // A valid body_data is in place.
+    // Try to get a buffer set from it.
+    used = qd_message_body_data_buffers(conn->outgoing_body_data, buffers, conn->outgoing_body_offset, count);
+    if (used > 0) {
+        // Accumulate the lengths of the returned buffers.
+        for (int i=0; i<used; i++) {
+            conn->outgoing_body_bytes += buffers[i].size;
+        }
+
+        // Buffers returned should never exceed the body_data payload length
+        assert(conn->outgoing_body_bytes <= conn->outgoing_body_data->payload.length);
+
+        if (conn->outgoing_body_bytes == conn->outgoing_body_data->payload.length) {
+            // This buffer set consumes the remainder of the body_data segment.
+            // Attach the body_data struct to the last buffer so that the struct
+            // can be freed after the buffer has been transmitted by raw connection out.
+            buffers[used-1].context = (uintptr_t) conn->outgoing_body_data;
+
+            // Erase the body_data struct from the connection so that
+            // a new one gets created on the next pass.
+            conn->outgoing_body_data = 0;
+        } else {
+            // Returned buffer set did not consume the entire body_data segment.
+            // Leave existing body_data struct in place for use on next pass.
+            // Add the number of returned buffers to the offset for the next pass.
+            conn->outgoing_body_offset += used;
+        }
+    } else {
+        // No buffers returned.
+        // This sender has caught up with all data available on the input stream.
+    }
     return used;
 }
 
+static bool write_outgoing_buffs(qdr_tcp_connection_t *conn)
+{
+    // Hand the pending outgoing_buffs (starting at outgoing_buff_idx) to the raw connection.
+    // Return true if all of them were taken; any remainder stays queued on conn for a later call.
+    bool result;
+
+    if (conn->outgoing_buff_count == 0) {
+        result = true;
+    } else {
+        size_t used = pn_raw_connection_write_buffers(conn->socket,
+                                                      &conn->outgoing_buffs[conn->outgoing_buff_idx],
+                                                      conn->outgoing_buff_count);
+        result = used == (size_t) conn->outgoing_buff_count;
+        // Tally bytes before advancing outgoing_buff_idx: it must still point at the buffers just taken.
+        int bytes_written = 0;
+        for (size_t i = 0; i < used; i++) {
+            if (conn->outgoing_buffs[conn->outgoing_buff_idx + i].bytes) {
+                bytes_written += conn->outgoing_buffs[conn->outgoing_buff_idx + i].size;
+            } else {
+                qd_log(tcp_adaptor->log_source, QD_LOG_ERROR,
+                       "[C%"PRIu64"] empty buffer can't be written (%zu of %zu)", conn->conn_id, i+1, used);
+            }
+        }
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+               "[C%"PRIu64"] Writing %i bytes", conn->conn_id, bytes_written);
+        conn->outgoing_buff_count -= used;
+        conn->outgoing_buff_idx   += used;
+    }
+    return result;
+}
+
 static void handle_outgoing(qdr_tcp_connection_t *conn)
 {
     if (conn->outstream) {
         qd_message_t *msg = qdr_delivery_message(conn->outstream);
-        pn_raw_buffer_t buffs[WRITE_BUFFERS];
-        for (int i = 0; i < WRITE_BUFFERS; i++) {
-            buffs[i].context = 0;
-            buffs[i].bytes = 0;
-            buffs[i].capacity = 0;
-            buffs[i].size = 0;
-            buffs[i].offset = 0;
+        bool read_more_body = true;
+
+        if (conn->outgoing_buff_count > 0) {
+            // flush outgoing buffs that hold body data waiting to go out
+            read_more_body = write_outgoing_buffs(conn);
         }
-        int n = read_message_body(conn, msg, buffs, WRITE_BUFFERS);
-        if (n > 0) {
-            size_t used = pn_raw_connection_write_buffers(conn->socket, buffs, n);
-            int bytes_written = 0;
-            for (size_t i = 0; i < used; i++) {
-                if (buffs[i].bytes) {
-                    bytes_written += buffs[i].size;
-                } else {
-                    qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "[C%"PRIu64"] empty buffer can't be written (%zu of %zu)", conn->conn_id, i+1, used);
-                }
+        while (read_more_body) {
+            ZERO(conn->outgoing_buffs);
+            conn->outgoing_buff_idx   = 0;
+            conn->outgoing_buff_count = read_message_body(conn, msg, conn->outgoing_buffs, WRITE_BUFFERS);
+
+            if (conn->outgoing_buff_count == 0) {
+                // The incoming stream has no new data to send
+                break;
+            } else if (conn->outgoing_buff_count > 0) {
+                // Send the data just returned
+                read_more_body = write_outgoing_buffs(conn);
+            }
+            if (qd_message_receive_complete(msg) || qd_message_send_complete(msg)) {
+                pn_raw_connection_close(conn->socket);
+                break;
             }
-            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Writing %i bytes", conn->conn_id, bytes_written);
-        }
-        if (qd_message_receive_complete(msg) || qd_message_send_complete(msg)) {
-            pn_raw_connection_close(conn->socket);
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org