You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2020/10/20 16:02:19 UTC
[qpid-dispatch] 04/04: DISPATCH-1806: Rearrange TCP adaptor
outbound body data handling
This is an automated email from the ASF dual-hosted git repository.
chug pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 4fe73d8b299f343f8a57cce80932a12c16f0385d
Author: Chuck Rolke <ch...@apache.org>
AuthorDate: Wed Oct 14 11:34:33 2020 -0400
DISPATCH-1806: Rearrange TCP adaptor outbound body data handling
---
src/adaptors/tcp_adaptor.c | 130 +++++++++++++++++++++++++++++++++++----------
1 file changed, 101 insertions(+), 29 deletions(-)
diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index cd22384..397f8bd 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -63,6 +63,14 @@ struct qdr_tcp_connection_t {
uint64_t last_in_time;
uint64_t last_out_time;
+ qd_message_body_data_t *outgoing_body_data; // current segment
+ size_t outgoing_body_bytes; // bytes received from current segment
+ int outgoing_body_offset; // buffer offset into current segment
+
+ pn_raw_buffer_t outgoing_buffs[WRITE_BUFFERS];
+ int outgoing_buff_count; // number of buffers with data
+ int outgoing_buff_idx; // first buffer with data
+
DEQ_LINKS(qdr_tcp_connection_t);
};
@@ -220,16 +228,18 @@ static void handle_disconnected(qdr_tcp_connection_t* conn)
static int read_message_body(qdr_tcp_connection_t *conn, qd_message_t *msg, pn_raw_buffer_t *buffers, int count)
{
int used = 0;
- qd_message_body_data_t *body_data;
- while (used < count) {
- qd_message_body_data_result_t body_data_result = qd_message_next_body_data(msg, &body_data);
+
+ // Advance to next body_data vbin segment if necessary.
+ // Return early if no data to process or error
+ if (conn->outgoing_body_data == 0) {
+ qd_message_body_data_result_t body_data_result = qd_message_next_body_data(msg, &conn->outgoing_body_data);
if (body_data_result == QD_MESSAGE_BODY_DATA_OK) {
- used += qd_message_body_data_buffers(body_data, buffers + used, used, count - used);
- if (used > 0) {
- buffers[used-1].context = (uintptr_t) body_data;
- }
+ // a new body_data segment has been found
+ conn->outgoing_body_bytes = 0;
+ conn->outgoing_body_offset = 0;
+ // continue to process this segment
} else if (body_data_result == QD_MESSAGE_BODY_DATA_INCOMPLETE) {
- return used;
+ return 0;
} else {
switch (body_data_result) {
case QD_MESSAGE_BODY_DATA_NO_MORE:
@@ -245,36 +255,98 @@ static int read_message_body(qdr_tcp_connection_t *conn, qd_message_t *msg, pn_r
return -1;
}
}
+
+ // A valid body_data is in place.
+ // Try to get a buffer set from it.
+ used = qd_message_body_data_buffers(conn->outgoing_body_data, buffers, conn->outgoing_body_offset, count);
+ if (used > 0) {
+ // Accumulate the lengths of the returned buffers.
+ for (int i=0; i<used; i++) {
+ conn->outgoing_body_bytes += buffers[i].size;
+ }
+
+ // Buffers returned should never exceed the body_data payload length
+ assert(conn->outgoing_body_bytes <= conn->outgoing_body_data->payload.length);
+
+ if (conn->outgoing_body_bytes == conn->outgoing_body_data->payload.length) {
+ // This buffer set consumes the remainder of the body_data segment.
+ // Attach the body_data struct to the last buffer so that the struct
+ // can be freed after the buffer has been transmitted by raw connection out.
+ buffers[used-1].context = (uintptr_t) conn->outgoing_body_data;
+
+ // Erase the body_data struct from the connection so that
+ // a new one gets created on the next pass.
+ conn->outgoing_body_data = 0;
+ } else {
+ // Returned buffer set did not consume the entire body_data segment.
+ // Leave existing body_data struct in place for use on next pass.
+ // Add the number of returned buffers to the offset for the next pass.
+ conn->outgoing_body_offset += used;
+ }
+ } else {
+ // No buffers returned.
+ // This sender has caught up with all data available on the input stream.
+ }
return used;
}
+// Hand the connection's pending outgoing buffers to the raw connection.
+//
+// The pending buffers are the outgoing_buff_count entries of
+// conn->outgoing_buffs starting at outgoing_buff_idx. Buffers accepted by
+// pn_raw_connection_write_buffers() are removed from that window.
+//
+// Returns true if every pending buffer was accepted (or none were pending),
+// false if some buffers remain and must be written on a later pass.
+static bool write_outgoing_buffs(qdr_tcp_connection_t *conn)
+{
+    if (conn->outgoing_buff_count == 0) {
+        return true;
+    }
+
+    // Remember where the written buffers start. The byte accounting below
+    // must look at the buffers that were just written; indexing from
+    // outgoing_buff_idx after it has been advanced would inspect the
+    // buffers that follow them and could run off the end of outgoing_buffs.
+    const int    first   = conn->outgoing_buff_idx;
+    const size_t pending = (size_t) conn->outgoing_buff_count;
+
+    size_t used = pn_raw_connection_write_buffers(conn->socket,
+                                                  &conn->outgoing_buffs[first],
+                                                  pending);
+
+    int bytes_written = 0;
+    for (size_t i = 0; i < used; i++) {
+        const pn_raw_buffer_t *buf = &conn->outgoing_buffs[first + i];
+        if (buf->bytes) {
+            bytes_written += buf->size;
+        } else {
+            // i and used are size_t, so they are formatted with %zu
+            qd_log(tcp_adaptor->log_source, QD_LOG_ERROR,
+                   "[C%"PRIu64"] empty buffer can't be written (%zu of %zu)", conn->conn_id, i+1, used);
+        }
+    }
+    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+           "[C%"PRIu64"] Writing %i bytes", conn->conn_id, bytes_written);
+
+    // Shrink the pending window by the number of buffers accepted.
+    conn->outgoing_buff_count -= (int) used;
+    conn->outgoing_buff_idx   += (int) used;
+
+    return used == pending;
+}
+
static void handle_outgoing(qdr_tcp_connection_t *conn)
{
if (conn->outstream) {
qd_message_t *msg = qdr_delivery_message(conn->outstream);
- pn_raw_buffer_t buffs[WRITE_BUFFERS];
- for (int i = 0; i < WRITE_BUFFERS; i++) {
- buffs[i].context = 0;
- buffs[i].bytes = 0;
- buffs[i].capacity = 0;
- buffs[i].size = 0;
- buffs[i].offset = 0;
+ bool read_more_body = true;
+
+ if (conn->outgoing_buff_count > 0) {
+ // flush outgoing buffs that hold body data waiting to go out
+ read_more_body = write_outgoing_buffs(conn);
}
- int n = read_message_body(conn, msg, buffs, WRITE_BUFFERS);
- if (n > 0) {
- size_t used = pn_raw_connection_write_buffers(conn->socket, buffs, n);
- int bytes_written = 0;
- for (size_t i = 0; i < used; i++) {
- if (buffs[i].bytes) {
- bytes_written += buffs[i].size;
- } else {
- qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "[C%"PRIu64"] empty buffer can't be written (%zu of %zu)", conn->conn_id, i+1, used);
- }
+ while (read_more_body) {
+ ZERO(conn->outgoing_buffs);
+ conn->outgoing_buff_idx = 0;
+ conn->outgoing_buff_count = read_message_body(conn, msg, conn->outgoing_buffs, WRITE_BUFFERS);
+
+ if (conn->outgoing_buff_count == 0) {
+ // The incoming stream has no new data to send
+ break;
+ } else if (conn->outgoing_buff_count > 0) {
+ // Send the data just returned
+ read_more_body = write_outgoing_buffs(conn);
+ }
+ if (qd_message_receive_complete(msg) || qd_message_send_complete(msg)) {
+ pn_raw_connection_close(conn->socket);
+ break;
}
- qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Writing %i bytes", conn->conn_id, bytes_written);
- }
- if (qd_message_receive_complete(msg) || qd_message_send_complete(msg)) {
- pn_raw_connection_close(conn->socket);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org