You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2020/09/15 19:06:34 UTC

[qpid-dispatch] branch dev-protocol-adaptors updated: DISPATCH-1743: Copy the data only once in the snd_data_callback. Do not copy the data in the read_callback. Introduced qd_http2_buffers

This is an automated email from the ASF dual-hosted git repository.

gmurthy pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/dev-protocol-adaptors by this push:
     new f01d63a  DISPATCH-1743: Copy the data only once in the snd_data_callback. Do not copy the data in the read_callback. Introduced qd_http2_buffers
f01d63a is described below

commit f01d63a1f995b2e2c908ca086f70de22917f53e8
Author: Ganesh Murthy <gm...@apache.org>
AuthorDate: Tue Sep 15 12:27:43 2020 -0400

    DISPATCH-1743: Copy the data only once in the snd_data_callback. Do not copy the data in the read_callback. Introduced qd_http2_buffers
---
 src/adaptors/http2/http2_adaptor.c | 302 ++++++++++++++++++++++---------------
 src/adaptors/http2/http2_adaptor.h |  76 +++++++++-
 2 files changed, 251 insertions(+), 127 deletions(-)

diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c
index f1ac33a..b071604 100644
--- a/src/adaptors/http2/http2_adaptor.c
+++ b/src/adaptors/http2/http2_adaptor.c
@@ -48,6 +48,7 @@ const char *CONTENT_ENCODING = "content-encoding";
 ALLOC_DEFINE(qdr_http2_session_data_t);
 ALLOC_DEFINE(qdr_http2_stream_data_t);
 ALLOC_DEFINE(qdr_http2_connection_t);
+ALLOC_DEFINE(qd_http2_buffer_t);
 
 typedef struct qdr_http_adaptor_t {
     qdr_core_t              *core;
@@ -64,6 +65,49 @@ static qdr_http_adaptor_t *http_adaptor;
 
 static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void *context);
 
+
+qd_http2_buffer_t *qd_http2_buffer(void)
+{
+    qd_http2_buffer_t *buf = new_qd_http2_buffer_t();
+    ZERO(buf);
+    DEQ_ITEM_INIT(buf);
+    buf->size   = 0;
+    return buf;
+}
+
+void qd_http2_buffer_list_append(qd_http2_buffer_list_t *buflist, const uint8_t *data, size_t len)
+{
+    //
+    // If len is zero, there's no work to do.
+    //
+    if (len == 0)
+        return;
+
+    //
+    // If the buffer list is empty and there's some data, add one empty buffer before we begin.
+    //
+    if (DEQ_SIZE(*buflist) == 0) {
+        qd_http2_buffer_t *buf = qd_http2_buffer();
+        DEQ_INSERT_TAIL(*buflist, buf);
+    }
+
+    qd_http2_buffer_t *tail = DEQ_TAIL(*buflist);
+
+    while (len > 0) {
+        size_t to_copy = MIN(len, qd_http2_buffer_capacity(tail));
+        if (to_copy > 0) {
+            memcpy(qd_http2_buffer_cursor(tail), data, to_copy);
+            qd_http2_buffer_insert(tail, to_copy);
+            data += to_copy;
+            len  -= to_copy;
+        }
+        if (len > 0) {
+            tail = qd_http2_buffer();
+            DEQ_INSERT_TAIL(*buflist, tail);
+        }
+    }
+}
+
 /**
  * HTTP :path is mapped to the AMQP 'to' field.
  */
@@ -149,8 +193,8 @@ static size_t write_buffers(qdr_http2_connection_t *conn)
 
     qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i] write_buffers pn_raw_connection_write_buffers_capacity=%zu", conn->conn_id,  pn_buffs_to_write);
 
-    size_t qd_buffs_to_write = DEQ_SIZE(session_data->buffs);
-    size_t num_buffs = qd_buffs_to_write > pn_buffs_to_write ? pn_buffs_to_write : qd_buffs_to_write;
+    size_t qd_raw_buffs_to_write = DEQ_SIZE(session_data->buffs);
+    size_t num_buffs = qd_raw_buffs_to_write > pn_buffs_to_write ? pn_buffs_to_write : qd_raw_buffs_to_write;
 
     if (num_buffs == 0) {
         //
@@ -161,20 +205,20 @@ static size_t write_buffers(qdr_http2_connection_t *conn)
     }
 
     pn_raw_buffer_t raw_buffers[num_buffs];
-    qd_buffer_t *qd_buff = DEQ_HEAD(session_data->buffs);
+    qd_http2_buffer_t *qd_http2_buff = DEQ_HEAD(session_data->buffs);
 
     int i = 0;
     int total_bytes = 0;
-    while (i < num_buffs && qd_buff != 0) {
-        raw_buffers[i].bytes = (char *)qd_buffer_base(qd_buff);
-        size_t buffer_size = qd_buffer_size(qd_buff);
+    while (i < num_buffs && qd_http2_buff != 0) {
+        raw_buffers[i].bytes = (char *)qd_http2_buffer_base(qd_http2_buff);
+        size_t buffer_size = qd_http2_buffer_size(qd_http2_buff);
         raw_buffers[i].capacity = buffer_size;
         raw_buffers[i].size = buffer_size;
         total_bytes += buffer_size;
         raw_buffers[i].offset = 0;
-        raw_buffers[i].context = (uintptr_t) qd_buff;
+        raw_buffers[i].context = (uintptr_t) qd_http2_buff;
         DEQ_REMOVE_HEAD(session_data->buffs);
-        qd_buff = DEQ_HEAD(session_data->buffs);
+        qd_http2_buff = DEQ_HEAD(session_data->buffs);
         i ++;
 
     }
@@ -312,6 +356,11 @@ static int on_data_chunk_recv_callback(nghttp2_session *session,
     return 0;
 }
 
+
+
+/**
+ * Note: snd_data_callback (defined further below) is the callback nghttp2 invokes to send a complete DATA frame when NGHTTP2_DATA_FLAG_NO_COPY is set in the nghttp2_data_source_read_callback.
+ */
 static int on_stream_close_callback(nghttp2_session *session,
                                     int32_t stream_id,
                                     nghttp2_error_code error_code,
@@ -324,8 +373,62 @@ static int on_stream_close_callback(nghttp2_session *session,
     return 0;
 }
 
-/* nghttp2_send_callback. The data pointer passed into this function contains encoded HTTP data. Here we transmit the |data|, |length| bytes,
-   to the network. */
+
+
+static int snd_data_callback(nghttp2_session *session,
+                                  nghttp2_frame *frame,
+                                  const uint8_t *framehd,
+                                  size_t length,
+                                  nghttp2_data_source *source,
+                                  void *user_data) {
+    // The frame is a DATA frame to send. The framehd is the serialized frame header (9 bytes).
+    // The length is the length of application data to send (this does not include padding)
+    qdr_http2_connection_t *conn = (qdr_http2_connection_t *)user_data;
+    qdr_http2_session_data_t *session_data = conn->session_data;
+    qdr_http2_stream_data_t *stream_data = (qdr_http2_stream_data_t *)source->ptr;
+
+    qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 snd_data_callback length=%zu", conn->conn_id, stream_data->stream_id, length);
+
+    qd_http2_buffer_t *http2_buff = qd_http2_buffer();
+    DEQ_INSERT_TAIL(session_data->buffs, http2_buff);
+    // Insert the framehd of length 9 bytes into the buffer
+    memcpy(qd_http2_buffer_cursor(http2_buff), framehd, HTTP2_DATA_FRAME_HEADER_LENGTH);
+    qd_http2_buffer_insert(http2_buff, HTTP2_DATA_FRAME_HEADER_LENGTH);
+
+    int bytes_sent = 0; // This should not include the header length of 9.
+    if (length) {
+        pn_raw_buffer_t pn_raw_buffs[stream_data->qd_buffers_to_send];
+        qd_message_body_data_buffers(stream_data->curr_body_data, pn_raw_buffs, 0, stream_data->qd_buffers_to_send);
+
+        int idx = 0;
+        while (idx < stream_data->qd_buffers_to_send) {
+            if (pn_raw_buffs[idx].size > 0) {
+                memcpy(qd_http2_buffer_cursor(http2_buff), pn_raw_buffs[idx].bytes, pn_raw_buffs[idx].size);
+                qd_http2_buffer_insert(http2_buff, pn_raw_buffs[idx].size);
+                bytes_sent += pn_raw_buffs[idx].size;
+                stream_data->curr_body_data_qd_buff_offset += 1;
+                qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] snd_data_callback memcpy pn_raw_buffs[%i].size=%zu", conn->conn_id, stream_data->stream_id, idx, pn_raw_buffs[idx].size);
+            }
+            idx += 1;
+        }
+    }
+    if (stream_data->full_payload_handled) {
+        qd_message_body_data_release(stream_data->curr_body_data);
+        stream_data->curr_body_data = 0;
+        stream_data->curr_body_data_qd_buff_offset = 0;
+        qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] snd_data_callback qd_message_body_data_release", conn->conn_id, stream_data->stream_id);
+        stream_data->full_payload_handled = false;
+    }
+
+    if (length)
+        assert(bytes_sent == length);
+
+    write_buffers(conn);
+
+    return 0;
+
+}
+
 static ssize_t send_callback(nghttp2_session *session,
                              const uint8_t *data,
                              size_t length,
@@ -333,7 +436,7 @@ static ssize_t send_callback(nghttp2_session *session,
                              void *user_data) {
     qdr_http2_connection_t *conn = (qdr_http2_connection_t *)user_data;
     qdr_http2_session_data_t *session_data = conn->session_data;
-    qd_buffer_list_append(&(session_data->buffs), (uint8_t *)data, length);
+    qd_http2_buffer_list_append(&(session_data->buffs), (uint8_t *)data, length);
     qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i] HTTP2 send_callback data length %zu", conn->conn_id, length);
     write_buffers(conn);
     return (ssize_t)length;
@@ -481,7 +584,6 @@ static bool route_delivery(qdr_http2_stream_data_t *stream_data, bool receive_co
     bool delivery_routed = false;
 
     if (conn->ingress) {
-        // TODO - Remove the redunant check stream_data->entire_header_arrived && !stream_data->in_dlv
         if (stream_data->reply_to && stream_data->entire_header_arrived && !stream_data->in_dlv) {
             header_and_props = qd_message_compose_amqp(stream_data->message,
                                                   conn->config->address,  // const char *to
@@ -632,13 +734,15 @@ ssize_t read_callback(nghttp2_session *session,
                       void *user_data)
 {
     qdr_http2_connection_t *conn = (qdr_http2_connection_t *)user_data;
-    qdr_http2_session_data_t *session_data = conn->session_data;
-    qdr_http2_stream_data_t *stream_data = nghttp2_session_get_stream_user_data(session_data->session, stream_id);
+    qdr_http2_stream_data_t *stream_data = (qdr_http2_stream_data_t *)source->ptr;
     qd_message_t *message = qdr_delivery_message(stream_data->out_dlv);
     qd_message_depth_status_t status = qd_message_check_depth(message, QD_DEPTH_BODY);
 
     qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback with length=%zu", conn->conn_id, stream_data->stream_id, length);
 
+    // This flag tells nghttp2 that the data is not being copied into its buffer (uint8_t *buf).
+    *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
+
 
     switch (status) {
     case QD_MESSAGE_DEPTH_OK: {
@@ -650,17 +754,12 @@ ssize_t read_callback(nghttp2_session *session,
         qd_message_body_data_t        *body_data = 0;
         qd_message_body_data_result_t  body_data_result;
 
-        //
-        // Process as many body-data segments as are available.
-        //
-        int buff_offset = 0;
         body_data = stream_data->curr_body_data;
         if (body_data) {
             //
             // If we saved the body_data, use the buff_offset.
             //
             body_data_result = stream_data->curr_body_data_result;
-            buff_offset = stream_data->curr_body_data_buff_offset;
             qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback Use existing body_data", conn->conn_id, stream_data->stream_id);
         }
         else {
@@ -670,7 +769,7 @@ ssize_t read_callback(nghttp2_session *session,
             }
             stream_data->curr_body_data = body_data;
             stream_data->curr_body_data_result = body_data_result;
-            stream_data->curr_body_data_buff_offset = 0;
+            stream_data->curr_body_data_qd_buff_offset = 0;
             qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback qd_message_next_body_data", conn->conn_id, stream_data->stream_id);
         }
 
@@ -679,100 +778,56 @@ ssize_t read_callback(nghttp2_session *session,
             //
             // We have a new valid body-data segment.  Handle it
             //
-            stream_data->body_data_buff_count = qd_message_body_data_buffer_count(body_data);
-
-            //TODO - Dont do this here.
-            size_t pn_buffs_to_write = pn_raw_connection_write_buffers_capacity(conn->pn_raw_conn);
+            size_t pn_buffs_write_capacity = pn_raw_connection_write_buffers_capacity(conn->pn_raw_conn);
 
-            qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback pn_raw_connection_write_buffers_capacity=%zu", conn->conn_id, stream_data->stream_id, pn_buffs_to_write);
+            // total length of the payload (across all qd_buffers in the current body data)
+            size_t payload_length = qd_message_body_data_payload_length(body_data);
 
-            if (stream_data->body_data_buff_count == 0 || pn_buffs_to_write == 0) {
-                // We cannot send anything, we need to come back here.
+            qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback pn_raw_connection_write_buffers_capacity=%zu", conn->conn_id, stream_data->stream_id, pn_buffs_write_capacity);
 
-                if (stream_data->body_data_buff_count == 0) {
-                    qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] Exiting read_callback QD_MESSAGE_BODY_DATA_OK, body_data_buff_count=0, temporarily pausing stream", conn->conn_id, stream_data->stream_id);
+            if (pn_buffs_write_capacity == 0 || payload_length == 0) {
+                if (pn_buffs_write_capacity == 0)
+                    qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] Exiting read_callback, pn_buffs_write_capacity=0, pausing stream", conn->conn_id, stream_data->stream_id);
+                if (payload_length == 0) {
+                    qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] Exiting read_callback, payload_length=0, pausing stream", conn->conn_id, stream_data->stream_id);
                     qd_message_body_data_release(stream_data->curr_body_data);
                     stream_data->curr_body_data = 0;
                 }
-                if (pn_buffs_to_write == 0)
-                    qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] Exiting read_callback, pn_buffs_to_write=0, pausing stream", conn->conn_id, stream_data->stream_id);
 
                 //
-                // We don't have any buffers to send but we may or may not get more buffers.
-                // Temporarily pause this stream
+                // Temporarily pause this stream.
+                // We will unpause this stream later when more data arrives or when proton returns after writing buffers.
                 //
-
                 stream_data->disposition = 0;
                 return NGHTTP2_ERR_DEFERRED;
             }
 
-            qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback QD_MESSAGE_BODY_DATA_OK, body_data_buff_count=%i", conn->conn_id, stream_data->stream_id, stream_data->body_data_buff_count);
+            stream_data->body_data_buff_count = qd_message_body_data_buffer_count(body_data);
 
-            int bytes_read = 0;
-            //
-            // We are looking to write only a maximum of one pn_raw_buffer_t per call of read_callback.
-            //
-            if (stream_data->raw_buffer.offset != 0) {
-                int remaining = stream_data->raw_buffer.size - stream_data->raw_buffer.offset;
-                if (length < remaining) {
-                    // Account for the case where the current buffer might not be fully consumed.
-                    qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback, (length < remaining) memcpy size=%zu", conn->conn_id, stream_data->stream_id, length);
-                    memcpy(buf, stream_data->raw_buffer.bytes + stream_data->raw_buffer.offset, length);
-                    stream_data->raw_buffer.offset += length;
-                    bytes_read = length;
-                }
-                else {
-                    // Current buffer fully consumed.
-                    qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback, (length >= remaining) remaining=%zu", conn->conn_id, stream_data->stream_id, remaining);
-                    memcpy(buf, stream_data->raw_buffer.bytes + stream_data->raw_buffer.offset, remaining);
-                    bytes_read = remaining;
-
-                    stream_data->raw_buffer.context  = 0;
-                    stream_data->raw_buffer.bytes    = 0;
-                    stream_data->raw_buffer.capacity = 0;
-                    stream_data->raw_buffer.size     = 0;
-                    stream_data->raw_buffer.offset   = 0;
-
-                    stream_data->curr_body_data_buff_offset += 1;
-                    stream_data->body_data_buff_count -= 1;
-                }
-            }
-            else {
-                // TODO - Make multiple calls to qd_message_body_data_buffers and gather all the data depending on length.
-                qd_message_body_data_buffers(body_data, &stream_data->raw_buffer, buff_offset, 1);
-                int remaining = stream_data->raw_buffer.size - stream_data->raw_buffer.offset;
-
-                if (length < remaining) {
-                    qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback, qd_message_body_data_buffers (length < remaining) memcpy size=%zu", conn->conn_id, stream_data->stream_id, length);
-                    memcpy(buf, stream_data->raw_buffer.bytes, length);
-                    stream_data->raw_buffer.offset += length;
-                    bytes_read = length;
+            assert (payload_length >= 0);
+            size_t bytes_to_send = 0;
+            if (payload_length) {
+                qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback QD_MESSAGE_BODY_DATA_OK, body_data_buff_count=%i", conn->conn_id, stream_data->stream_id, stream_data->body_data_buff_count);
+
+                // Subtract what earlier callbacks already consumed (buffer offset * BUFFER_SIZE) to get the payload still left to send.
+                size_t remaining_payload_length = payload_length - (stream_data->curr_body_data_qd_buff_offset * BUFFER_SIZE);
+
+                if (QD_HTTP2_BUFFER_SIZE >= remaining_payload_length) {
+                    bytes_to_send = remaining_payload_length;
+                    stream_data->qd_buffers_to_send = stream_data->body_data_buff_count;
+                    stream_data->full_payload_handled = true;
+                    qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback QD_HTTP2_BUFFER_SIZE >= remaining_payload_length bytes_to_send=%zu, stream_data->qd_buffers_to_send=%zu", conn->conn_id, stream_data->stream_id, bytes_to_send, stream_data->qd_buffers_to_send);
                 }
                 else {
-                    stream_data->curr_body_data_buff_offset += 1;
-                    qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback, qd_message_body_data_buffers (length >= remaining) size of raw_buffer=%zu", conn->conn_id, stream_data->stream_id, stream_data->raw_buffer.size);
-                    memcpy(buf, stream_data->raw_buffer.bytes, stream_data->raw_buffer.size);
-                    bytes_read = stream_data->raw_buffer.size;
-                    stream_data->body_data_buff_count -= 1;
+                    // This means that there is more than 16k worth of payload in one body data.
+                    // We want to send only 16k data per read_callback
+                    bytes_to_send = QD_HTTP2_BUFFER_SIZE;
+                    stream_data->full_payload_handled = false;
+                    stream_data->qd_buffers_to_send = NUM_QD_BUFFERS_IN_ONE_HTTP2_BUFFER;
+                    qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback QD_HTTP2_BUFFER_SIZE >= remaining_payload_length ELSE bytes_to_send=%zu, stream_data->qd_buffers_to_send=%zu", conn->conn_id, stream_data->stream_id, bytes_to_send, stream_data->qd_buffers_to_send);
                 }
             }
-
-            if (!stream_data->body_data_buff_count) {
-                qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] Releasing qd_message_body_data", conn->conn_id, stream_data->stream_id);
-                qd_message_body_data_release(stream_data->curr_body_data);
-                stream_data->curr_body_data = 0;
-
-                stream_data->raw_buffer.context  = 0;
-                stream_data->raw_buffer.bytes    = 0;
-                stream_data->raw_buffer.capacity = 0;
-                stream_data->raw_buffer.size     = 0;
-                stream_data->raw_buffer.offset   = 0;
-
-                stream_data->curr_body_data = 0;
-            }
-
-
-            return bytes_read;
+            return bytes_to_send;
         }
 
         case QD_MESSAGE_BODY_DATA_INCOMPLETE:
@@ -780,21 +835,21 @@ ssize_t read_callback(nghttp2_session *session,
             // A new segment has not completely arrived yet.  Check again later.
             //
             qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback QD_MESSAGE_BODY_DATA_INCOMPLETE", conn->conn_id, stream_data->stream_id);
-            return 0;
+            break;
 
         case QD_MESSAGE_BODY_DATA_NO_MORE: {
             //
             // We have already handled the last body-data segment for this delivery.
             // Complete the "sending" of this delivery and replenish credit.
             //
-            size_t pn_buffs_to_write = pn_raw_connection_write_buffers_capacity(conn->pn_raw_conn);
-            if (pn_buffs_to_write == 0) {
+            size_t pn_buffs_write_capacity = pn_raw_connection_write_buffers_capacity(conn->pn_raw_conn);
+            if (pn_buffs_write_capacity == 0) {
                 return NGHTTP2_ERR_DEFERRED;
                 stream_data->disposition = 0;
-                qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback QD_MESSAGE_BODY_DATA_NO_MORE - pn_buffs_to_write=0 send is not complete", conn->conn_id, stream_data->stream_id);
+                qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback QD_MESSAGE_BODY_DATA_NO_MORE - pn_buffs_write_capacity=0 send is not complete", conn->conn_id, stream_data->stream_id);
             }
             else {
-                qd_message_body_data_release(stream_data->curr_body_data);
+                stream_data->qd_buffers_to_send = 0;
                 *data_flags |= NGHTTP2_DATA_FLAG_EOF;
                 qd_message_set_send_complete(message);
                 // TODO - Dont do the disposition here.
@@ -834,6 +889,7 @@ ssize_t read_callback(nghttp2_session *session,
         break;
 
     case QD_MESSAGE_DEPTH_INCOMPLETE:
+        qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback QD_MESSAGE_DEPTH_INCOMPLETE", conn->conn_id, stream_data->stream_id);
         break;
     }
 
@@ -870,16 +926,14 @@ qdr_http2_connection_t *qdr_http_connection_ingress(qd_http_lsnr_t* listener)
 static void grant_read_buffers(qdr_http2_connection_t *conn)
 {
     pn_raw_buffer_t raw_buffers[READ_BUFFERS];
-    // Give proactor more read buffers for the pn_raw_conn
-    // TODO - Look into using bigger buffers here.
     if (!pn_raw_connection_is_read_closed(conn->pn_raw_conn)) {
         size_t desired = pn_raw_connection_read_buffers_capacity(conn->pn_raw_conn);
         while (desired) {
             size_t i;
             for (i = 0; i < desired && i < READ_BUFFERS; ++i) {
-                qd_buffer_t *buf = qd_buffer();
-                raw_buffers[i].bytes = (char*) qd_buffer_base(buf);
-                raw_buffers[i].capacity = qd_buffer_capacity(buf);
+                qd_http2_buffer_t *buf = qd_http2_buffer();
+                raw_buffers[i].bytes = (char*) qd_http2_buffer_base(buf);
+                raw_buffers[i].capacity = qd_http2_buffer_capacity(buf);
                 raw_buffers[i].size = 0;
                 raw_buffers[i].offset = 0;
                 raw_buffers[i].context = (uintptr_t) buf;
@@ -965,7 +1019,7 @@ static void qdr_http_second_attach(void *context, qdr_link_t *link,
                 else {
                     qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Reply-to available but delivery not routed", stream_data->session_data->conn->conn_id);
                 }
-                grant_read_buffers(stream_data->session_data->conn);
+                //grant_read_buffers(stream_data->session_data->conn);
             }
             qdr_link_flow(http_adaptor->core, link, 10, false);
         }
@@ -981,6 +1035,7 @@ static void qdr_http_activate(void *notused, qdr_connection_t *c)
             qd_log(http_adaptor->log_source, QD_LOG_INFO, "[C%i] Activation triggered, calling pn_raw_connection_wake()", conn->conn_id);
             pn_raw_connection_wake(conn->pn_raw_conn);
         }
+        //TODO is conn->timer_scheduled required ?
         else if (conn->activate_timer && !conn->timer_scheduled) {
             conn->timer_scheduled = true;
             // On egress, the raw connection is only created once the
@@ -1085,6 +1140,7 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data)
         }
         else {
             qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] Processing message body", conn->conn_id, stream_data->stream_id);
+            conn->data_prd.source.ptr = stream_data;
             int rv = nghttp2_submit_data(session_data->session, NGHTTP2_FLAG_END_STREAM, stream_data->stream_id, &conn->data_prd);
             if (rv != 0) {
                 qd_log(http_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%i][S%i] Error submitting data rv=%i", conn->conn_id, stream_data->stream_id, rv);
@@ -1177,7 +1233,7 @@ void qd_http2_delete_connector(qd_dispatch_t *qd, qd_http_connector_t *connector
 
 static int handle_incoming_http(qdr_http2_connection_t *conn)
 {
-    qd_buffer_list_t buffers;
+    qd_http2_buffer_list_t buffers;
     DEQ_INIT(buffers);
     pn_raw_buffer_t raw_buffers[READ_BUFFERS];
     size_t n;
@@ -1189,11 +1245,11 @@ static int handle_incoming_http(qdr_http2_connection_t *conn)
 
     while ( (n = pn_raw_connection_take_read_buffers(conn->pn_raw_conn, raw_buffers, READ_BUFFERS)) ) {
         for (size_t i = 0; i < n && raw_buffers[i].bytes; ++i) {
-            qd_buffer_t *buf = (qd_buffer_t*) raw_buffers[i].context;
+            qd_http2_buffer_t *buf = (qd_http2_buffer_t*) raw_buffers[i].context;
             uint32_t raw_buff_size = raw_buffers[i].size;
-            qd_buffer_insert(buf, raw_buff_size);
-            qd_log(http_adaptor->log_source, QD_LOG_DEBUG, "[C%i] - handle_incoming_http - Inserting qd_buffer of size %"PRIu32" ", conn->conn_id, raw_buff_size);
-            count += raw_buffers[i].size;
+            qd_http2_buffer_insert(buf, raw_buff_size);
+            qd_log(http_adaptor->log_source, QD_LOG_DEBUG, "[C%i] - handle_incoming_http - Inserting qd_http2_buffer of size %"PRIu32" ", conn->conn_id, raw_buff_size);
+            count += raw_buff_size;
             DEQ_INSERT_TAIL(buffers, buf);
         }
     }
@@ -1201,14 +1257,14 @@ static int handle_incoming_http(qdr_http2_connection_t *conn)
     //
     // Read each buffer in the buffer chain and call nghttp2_session_mem_recv with buffer content
     //
-    qd_buffer_t *buf = DEQ_HEAD(buffers);
-    qd_buffer_t *curr_buf = 0;
+    qd_http2_buffer_t *buf = DEQ_HEAD(buffers);
+    qd_http2_buffer_t *curr_buf = 0;
 
     //TODO - Look into reusing the buffers between here and grant_read_buffers
     int rv = 0;
     while (buf) {
         // TODO - Check the return value of nghttp2_session_mem_recv.
-        rv = nghttp2_session_mem_recv(conn->session_data->session, qd_buffer_base(buf), qd_buffer_size(buf));
+        rv = nghttp2_session_mem_recv(conn->session_data->session, qd_http2_buffer_base(buf), qd_http2_buffer_size(buf));
         if (rv < 0) {
             if (rv == NGHTTP2_ERR_FLOODED || rv == NGHTTP2_ERR_BAD_CLIENT_MAGIC) {
                 // Flooding was detected in this HTTP/2 session, and it must be closed. This is most likely caused by misbehavior of peer.
@@ -1226,7 +1282,7 @@ static int handle_incoming_http(qdr_http2_connection_t *conn)
         curr_buf = buf;
         DEQ_REMOVE_HEAD(buffers);
         buf = DEQ_HEAD(buffers);
-        qd_buffer_free(curr_buf);
+        free_qd_http2_buffer_t(curr_buf);
     }
 
     if (rv > 0)
@@ -1321,7 +1377,7 @@ static void handle_disconnected(qdr_http2_connection_t* conn)
 
     if (conn->pn_raw_conn) {
         pn_raw_connection_set_context(conn->pn_raw_conn, 0);
-        conn->pn_raw_conn = 0;
+        //conn->pn_raw_conn = 0;
     }
 
     qdr_action_t *action = qdr_action(qdr_del_http2_connection_CT, "delete_http2_connection");
@@ -1349,7 +1405,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
     }
     case PN_RAW_CONNECTION_CLOSED_READ: {
         pn_raw_connection_close(conn->pn_raw_conn);
-        conn->pn_raw_conn = 0;
+        //conn->pn_raw_conn = 0;
         qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_CLOSED_READ", conn->conn_id);
         break;
     }
@@ -1393,10 +1449,11 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
         while ( (n = pn_raw_connection_take_written_buffers(conn->pn_raw_conn, buffs, WRITE_BUFFERS)) ) {
             for (size_t i = 0; i < n; ++i) {
                 written += buffs[i].size;
-                qd_buffer_t *qd_buff = (qd_buffer_t *) buffs[i].context;
-                assert(qd_buff);
-                if (qd_buff != NULL)
-                    qd_buffer_free(qd_buff);
+                qd_http2_buffer_t *qd_http2_buff = (qd_http2_buffer_t *) buffs[i].context;
+                assert(qd_http2_buff);
+                if (qd_http2_buff != NULL) {
+                    free_qd_http2_buffer_t(qd_http2_buff);
+                }
             }
         }
         qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_WRITTEN Wrote %i bytes, DEQ_SIZE(session_data->buffs) = %zu", conn->conn_id, written, DEQ_SIZE(conn->session_data->buffs));
@@ -1646,6 +1703,7 @@ static void qdr_http_adaptor_init(qdr_core_t *core, void **adaptor_context)
     nghttp2_session_callbacks_set_on_header_callback(callbacks, on_header_callback);
     nghttp2_session_callbacks_set_on_stream_close_callback(callbacks, on_stream_close_callback);
     nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, on_data_chunk_recv_callback);
+    nghttp2_session_callbacks_set_send_data_callback(callbacks, snd_data_callback);
     nghttp2_session_callbacks_set_send_callback(callbacks, send_callback);
 
     adaptor->callbacks = callbacks;
diff --git a/src/adaptors/http2/http2_adaptor.h b/src/adaptors/http2/http2_adaptor.h
index dc19f90..9a51dc9 100644
--- a/src/adaptors/http2/http2_adaptor.h
+++ b/src/adaptors/http2/http2_adaptor.h
@@ -29,6 +29,11 @@
 #include "server_private.h"
 #include "adaptors/http_common.h"
 
+size_t QD_HTTP2_BUFFER_SIZE = 16384;
+size_t NUM_QD_BUFFERS_IN_ONE_HTTP2_BUFFER = 32;
+size_t MAX_BUFFERS = 16;
+size_t HTTP2_DATA_FRAME_HEADER_LENGTH = 9;
+
 
 // We already have a qd_http_listener_t defined in http-libwebsockets.c
 // We will call this as qd_http_lsnr_t in order to avoid a clash.
@@ -36,14 +41,18 @@
 // and get rid of http-libwebsockets.c and rename this as qd_http_listener_t
 typedef struct qdr_http2_session_data_t qdr_http2_session_data_t;
 typedef struct qdr_http2_stream_data_t  qdr_http2_stream_data_t;
-typedef struct qdr_http2_connection_t    qdr_http2_connection_t;
+typedef struct qdr_http2_connection_t   qdr_http2_connection_t;
+typedef struct qd_http2_buffer_t          qd_http2_buffer_t;   // forward declaration; the struct body is defined further down in this header
+
 DEQ_DECLARE(qdr_http2_stream_data_t, qd_http2_stream_data_list_t);
+DEQ_DECLARE(qd_http2_buffer_t,         qd_http2_buffer_list_t);   // list type used for qdr_http2_session_data_t.buffs
 
 struct qdr_http2_session_data_t {
     qdr_http2_connection_t       *conn;       // Connection associated with the session_data
     nghttp2_session             *session;    // A pointer to the nghttp2s' session object
     qd_http2_stream_data_list_t  streams;    // A session can have many streams.
-    qd_buffer_list_t             buffs;      // Buffers for writing
+    qd_http2_buffer_list_t         buffs;      // Buffers for writing
+    bool                         max_buffs_in_pool; // NOTE(review): presumably set once buffs has reached MAX_BUFFERS entries -- confirm in http2_adaptor.c
 };
 
 struct qdr_http2_stream_data_t {
@@ -63,14 +72,16 @@ struct qdr_http2_stream_data_t {
     DEQ_LINKS(qdr_http2_stream_data_t);
 
     qd_message_body_data_result_t  curr_body_data_result;
-    int                            curr_body_data_buff_offset;
+    int                            curr_body_data_qd_buff_offset;
     int                            body_data_buff_count;
     int32_t                        stream_id;
+    size_t                         qd_buffers_to_send;
 
     bool                     entire_header_arrived;
     bool                     header_sent;
     bool                     steam_closed;
-    pn_raw_buffer_t          raw_buffer;
+
+    bool                     full_payload_handled;
 };
 
 struct qdr_http2_connection_t {
@@ -88,7 +99,6 @@ struct qdr_http2_connection_t {
     char                    *remote_address;
     qdr_link_t              *stream_dispatcher;
     uint64_t                 stream_dispatcher_id;
-    qdr_http2_stream_data_t *initial_stream;
     char                     *reply_to;
     nghttp2_data_provider    data_prd;
     bool                     connection_established;
@@ -97,8 +107,64 @@ struct qdr_http2_connection_t {
     bool                     timer_scheduled;
  };
 
+struct qd_http2_buffer_t {          // Fixed-size byte buffer with room for one 9-byte HTTP2 header plus 16384 octets of content
+    unsigned int  size;     ///< Number of octets currently stored in content; also the offset of the insert cursor
+    unsigned char content[16393];   // 16k max content + 9 bytes for the HTTP2 header, 16384 + 9 = 16393
+    DEQ_LINKS(qd_http2_buffer_t);   // linkage so buffers can be chained in a qd_http2_buffer_list_t
+};
+
+
+static inline unsigned char *qd_http2_buffer_base(const qd_http2_buffer_t *buf)
+{   // Address of the first octet of the buffer's content array.
+    return (unsigned char *) buf->content;
+}
+
+/**
+ * Locate the buffer's insert point: the octet immediately after the data already stored.
+ * @param buf A pointer to an allocated buffer
+ * @return The address of content[size], where the next octets should be written.
+ */
+static inline unsigned char *qd_http2_buffer_cursor(const qd_http2_buffer_t *buf)
+{
+    return (unsigned char *) &buf->content[buf->size];
+}
+
+/**
+ * Return remaining capacity of the whole content array (room for the 9-byte HTTP2 header included).
+ * @param buf A pointer to an allocated buffer
+ * @return The number of octets that may still be inserted; 0 when full (the result never wraps around).
+ */
+static inline size_t qd_http2_buffer_capacity(const qd_http2_buffer_t *buf)
+{
+    return buf->size < sizeof(buf->content) ? sizeof(buf->content) - buf->size : 0;
+}
+
+/**
+ * Return the size of the buffer's data content.
+ * @param buf A pointer to an allocated buffer
+ * @return The number of octets of data in the buffer
+ */
+static inline size_t qd_http2_buffer_size(const qd_http2_buffer_t *buf)
+{
+    return buf->size;
+}
+
+/**
+ * Notify the buffer that octets have been inserted at the buffer's cursor.  This will advance the
+ * cursor by len octets.  The resulting size must fit the content array (HTTP2 header + 16k content).
+ *
+ * @param buf A pointer to an allocated buffer
+ * @param len The number of octets that have been appended to the buffer
+ */
+static inline void qd_http2_buffer_insert(qd_http2_buffer_t *buf, size_t len)
+{
+    buf->size += len;
+    assert(buf->size <= sizeof(buf->content));
+}
+
 ALLOC_DECLARE(qdr_http2_session_data_t);
 ALLOC_DECLARE(qdr_http2_stream_data_t);
 ALLOC_DECLARE(qdr_http2_connection_t);
+ALLOC_DECLARE(qd_http2_buffer_t);   // paired with the ALLOC_DEFINE(qd_http2_buffer_t) this commit adds to http2_adaptor.c
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org