You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2021/09/07 13:20:31 UTC

[qpid-dispatch] branch main updated: DISPATCH-2232: Combine output buffers and delay sending them so that the TCP frames are not fragmented. This closes #1343.

This is an automated email from the ASF dual-hosted git repository.

gmurthy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
     new 27cf112  DISPATCH-2232: Combine output buffers and delay sending them so that the TCP frames are not fragmented. This closes #1343.
27cf112 is described below

commit 27cf11216f397a8ac01590abd0e807bcc922b476
Author: Ganesh Murthy <gm...@apache.org>
AuthorDate: Mon Aug 16 16:59:15 2021 -0400

    DISPATCH-2232: Combine output buffers and delay sending them so that the TCP frames are not fragmented. This closes #1343.
---
 src/adaptors/http2/http2_adaptor.c | 189 ++++++++++++++++++++++---------------
 src/adaptors/http2/http2_adaptor.h |   2 +
 2 files changed, 117 insertions(+), 74 deletions(-)

diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c
index b5752fa..e70c331 100644
--- a/src/adaptors/http2/http2_adaptor.c
+++ b/src/adaptors/http2/http2_adaptor.c
@@ -146,13 +146,13 @@ qd_http2_buffer_t *qd_http2_buffer(void)
     return buf;
 }
 
-void qd_http2_buffer_list_append(qd_http2_buffer_list_t *buflist, const uint8_t *data, size_t len)
+qd_http2_buffer_t *qd_http2_buffer_list_append(qd_http2_buffer_list_t *buflist, const uint8_t *data, size_t len)
 {
     //
     // If len is zero, there's no work to do.
     //
     if (len == 0)
-        return;
+        return DEQ_TAIL(*buflist);
 
     //
     // If the buffer list is empty and there's some data, add one empty buffer before we begin.
@@ -177,6 +177,8 @@ void qd_http2_buffer_list_append(qd_http2_buffer_list_t *buflist, const uint8_t
             DEQ_INSERT_TAIL(*buflist, tail);
         }
     }
+
+    return DEQ_TAIL(*buflist);
 }
 
 
@@ -315,7 +317,9 @@ static size_t write_buffers(qdr_http2_connection_t *conn)
 
     int i = 0;
     int total_bytes = 0;
-    while (i < num_buffs && qd_http2_buff != 0) {
+
+    while (i < num_buffs) {
+        assert (qd_http2_buff != 0);
         raw_buffers[i].bytes = (char *)qd_http2_buffer_base(qd_http2_buff);
         size_t buffer_size = qd_http2_buffer_size(qd_http2_buff);
         raw_buffers[i].capacity = buffer_size;
@@ -326,19 +330,12 @@ static size_t write_buffers(qdr_http2_connection_t *conn)
         DEQ_REMOVE_HEAD(conn->buffs);
         qd_http2_buff = DEQ_HEAD(conn->buffs);
         i ++;
-
     }
 
-    if (i >0) {
-        size_t num_buffers_written = pn_raw_connection_write_buffers(conn->pn_raw_conn, raw_buffers, num_buffs);
-        qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] Written %zu buffer(s) and %i bytes in write_buffers() using pn_raw_connection_write_buffers()", conn->conn_id, num_buffers_written, total_bytes);
-        if (num_buffs != num_buffers_written) {
-            //TODO - This is not good.
-        }
-        return num_buffers_written;
-    }
-
-    return 0;
+    size_t num_buffers_written = pn_raw_connection_write_buffers(conn->pn_raw_conn, raw_buffers, num_buffs);
+    qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] Written %zu buffer(s) and %i bytes in write_buffers() using pn_raw_connection_write_buffers()", conn->conn_id, num_buffers_written, total_bytes);
+    assert(num_buffs == num_buffers_written);
+    return num_buffers_written;
 }
 
 
@@ -379,6 +376,7 @@ static void free_http2_stream_data(qdr_http2_stream_data_t *stream_data, bool on
     // If the httpConnector was deleted, a client request has nowhere to go because of lack of receiver and hence credit.
     // No delivery was created. The message that was created for such a hanging request must be freed here..
     if (!stream_data->in_dlv && stream_data->message) {
+        qd_message_clear_q2_unblocked_handler(stream_data->message);
         qd_message_free(stream_data->message);
     }
 
@@ -463,6 +461,7 @@ void free_qdr_http2_connection(qdr_http2_connection_t* http_conn, bool on_shutdo
     sys_atomic_destroy(&http_conn->raw_closed_read);
     sys_atomic_destroy(&http_conn->raw_closed_write);
     sys_atomic_destroy(&http_conn->q2_restart);
+    sys_atomic_destroy(&http_conn->delay_buffer_write);
 
     free_qdr_http2_connection_t(http_conn);
 }
@@ -584,20 +583,7 @@ static int on_data_chunk_recv_callback(nghttp2_session *session,
 }
 
 
-
-/**
- * Callback function invoked when NGHTTP2_DATA_FLAG_NO_COPY is used in nghttp2_data_source_read_callback to send complete DATA frame.
- */
-static int on_stream_close_callback(nghttp2_session *session,
-                                    int32_t stream_id,
-                                    nghttp2_error_code error_code,
-                                    void *user_data)
-{
-    return 0;
-}
-
-
-static int snd_data_callback(nghttp2_session *session,
+static int send_data_callback(nghttp2_session *session,
                              nghttp2_frame *frame,
                              const uint8_t *framehd,
                              size_t length,
@@ -608,47 +594,63 @@ static int snd_data_callback(nghttp2_session *session,
     qdr_http2_connection_t *conn = (qdr_http2_connection_t *)user_data;
     qdr_http2_stream_data_t *stream_data = (qdr_http2_stream_data_t *)source->ptr;
 
-    qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] snd_data_callback length=%zu", conn->conn_id, stream_data->stream_id, length);
+    qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] send_data_callback length=%zu", conn->conn_id, stream_data->stream_id, length);
 
     int bytes_sent = 0; // This should not include the header length of 9.
     bool write_buffs = false;
     if (length) {
-        write_buffs = true;
-        qd_http2_buffer_t *http2_buff = qd_http2_buffer();
-        DEQ_INSERT_TAIL(conn->buffs, http2_buff);
-        // Insert the framehd of length 9 bytes into the buffer
-        memcpy(qd_http2_buffer_cursor(http2_buff), framehd, HTTP2_DATA_FRAME_HEADER_LENGTH);
-        qd_http2_buffer_insert(http2_buff, HTTP2_DATA_FRAME_HEADER_LENGTH);
-
-        uint32_t octets_remaining = qd_iterator_remaining(stream_data->curr_stream_data_iter);
-
-        size_t len = MIN(octets_remaining, length);
-        int copied = qd_iterator_ncopy(stream_data->curr_stream_data_iter, qd_http2_buffer_cursor(http2_buff), len);
-        assert(copied == len);
-        qd_http2_buffer_insert(http2_buff, len);
-        octets_remaining -= copied;
-        bytes_sent += copied;
-        qd_iterator_trim_view(stream_data->curr_stream_data_iter, octets_remaining);
+        qd_http2_buffer_t *tail_buff = qd_http2_buffer_list_append(&(conn->buffs), framehd, HTTP2_DATA_FRAME_HEADER_LENGTH);
+        size_t tail_buff_capacity = qd_http2_buffer_capacity(tail_buff);
+        if (tail_buff_capacity == 0) {
+            tail_buff = qd_http2_buffer();
+            DEQ_INSERT_TAIL(conn->buffs, tail_buff);
+            tail_buff_capacity = qd_http2_buffer_capacity(tail_buff);
+        }
+        size_t bytes_to_write = length;
+        while (bytes_to_write > 0) {
+            uint32_t octets_remaining = qd_iterator_remaining(stream_data->curr_stream_data_iter);
+            size_t len = MIN(tail_buff_capacity, bytes_to_write);
+            len = MIN(len, octets_remaining);
+            int copied = qd_iterator_ncopy(stream_data->curr_stream_data_iter, qd_http2_buffer_cursor(tail_buff), len);
+            assert(copied == len);
+            qd_http2_buffer_insert(tail_buff, len);
+            octets_remaining -= copied;
+            bytes_sent += copied;
+            qd_iterator_trim_view(stream_data->curr_stream_data_iter, octets_remaining);
+            bytes_to_write -= len;
+            if (bytes_to_write > 0 && qd_http2_buffer_capacity(tail_buff) == 0) {
+                tail_buff = qd_http2_buffer();
+                DEQ_INSERT_TAIL(conn->buffs, tail_buff);
+                tail_buff_capacity = qd_http2_buffer_capacity(tail_buff);
+            }
+        }
     }
     else if (length == 0 && stream_data->out_msg_data_flag_eof) {
-        write_buffs = true;
         qd_http2_buffer_t *http2_buff = qd_http2_buffer();
         DEQ_INSERT_TAIL(conn->buffs, http2_buff);
-        // Insert the framehd of length 9 bytes into the buffer
         memcpy(qd_http2_buffer_cursor(http2_buff), framehd, HTTP2_DATA_FRAME_HEADER_LENGTH);
         qd_http2_buffer_insert(http2_buff, HTTP2_DATA_FRAME_HEADER_LENGTH);
     }
 
+    //
+    // If the message has a footer, don't flush the buffers now. Flush them after you write out the footer.
+    //
+    if (!stream_data->out_msg_has_footer) {
+        write_buffs = true;
+    }
+
     if (stream_data->full_payload_handled) {
-        if (!stream_data->out_msg_has_footer && stream_data->curr_stream_data) {
-            qd_message_stream_data_release(stream_data->curr_stream_data);
-            qd_iterator_free(stream_data->curr_stream_data_iter);
-            stream_data->curr_stream_data = 0;
+        if (stream_data->curr_stream_data) {
+            if (stream_data->curr_stream_data_result == QD_MESSAGE_STREAM_DATA_FOOTER_OK) {
+                stream_data->footer_stream_data = stream_data->curr_stream_data;
+                stream_data->footer_stream_data_iter = stream_data->curr_stream_data_iter;
+            }
+            else {
+                qd_message_stream_data_release(stream_data->curr_stream_data);
+                qd_iterator_free(stream_data->curr_stream_data_iter);
+            }
             stream_data->curr_stream_data_iter = 0;
-            qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] snd_data_callback, full_payload_handled, no footer, qd_message_stream_data_release", conn->conn_id, stream_data->stream_id);
-        }
-        else {
-            qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] snd_data_callback, full_payload_handled, out_msg_has_footer", conn->conn_id, stream_data->stream_id);
+            stream_data->curr_stream_data = 0;
         }
         stream_data->payload_handled = 0;
     }
@@ -656,7 +658,7 @@ static int snd_data_callback(nghttp2_session *session,
     	stream_data->payload_handled += bytes_sent;
     }
 
-    qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] HTTP2 snd_data_callback finished, length=%zu, bytes_sent=%i, stream_data=%p", conn->conn_id, stream_data->stream_id, length, bytes_sent, (void *)stream_data);
+    qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] HTTP2 send_data_callback finished, length=%zu, bytes_sent=%i, stream_data=%p", conn->conn_id, stream_data->stream_id, length, bytes_sent, (void *)stream_data);
 
     if (length) {
         assert(bytes_sent == length);
@@ -678,7 +680,10 @@ static ssize_t send_callback(nghttp2_session *session,
     qdr_http2_connection_t *conn = (qdr_http2_connection_t *)user_data;
     qd_http2_buffer_list_append(&(conn->buffs), (uint8_t *)data, length);
     qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"] HTTP2 send_callback data length %zu", conn->conn_id, length);
-    write_buffers(conn);
+    if (! IS_ATOMIC_FLAG_SET(&conn->delay_buffer_write)) {
+        write_buffers(conn);
+    }
+
     return (ssize_t)length;
 }
 
@@ -1231,7 +1236,6 @@ ssize_t read_data_callback(nghttp2_session *session,
 
                 // The payload length is zero on this body data. Look ahead one body data to see if it is QD_MESSAGE_STREAM_DATA_NO_MORE
                 stream_data->next_stream_data_result = qd_message_next_stream_data(message, &stream_data->next_stream_data);
-
                 if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_NO_MORE) {
                     if (!stream_data->out_msg_has_footer) {
                         qd_message_stream_data_release(stream_data->curr_stream_data);
@@ -1295,6 +1299,7 @@ ssize_t read_data_callback(nghttp2_session *session,
                             qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one body data QD_MESSAGE_STREAM_DATA_NO_MORE", conn->conn_id, stream_data->stream_id);
                         }
                         else if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_FOOTER_OK) {
+                            stream_data->out_msg_has_footer = true;
                             stream_data->out_msg_body_sent = true;
                             qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one body data, QD_MESSAGE_STREAM_DATA_FOOTER_OK", conn->conn_id, stream_data->stream_id);
                         }
@@ -1316,8 +1321,11 @@ ssize_t read_data_callback(nghttp2_session *session,
 
         case QD_MESSAGE_STREAM_DATA_FOOTER_OK:
         	qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_STREAM_DATA_FOOTER_OK", conn->conn_id, stream_data->stream_id);
-            stream_data->out_msg_has_footer = true;
-            stream_data->next_stream_data_result = qd_message_next_stream_data(message, &stream_data->next_stream_data);
+        	if (!stream_data->out_msg_has_footer) {
+        	    stream_data->out_msg_has_footer = true;
+        	    stream_data->next_stream_data_result = qd_message_next_stream_data(message, &stream_data->next_stream_data);
+        	}
+
             if (stream_data->next_stream_data) {
                 qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_STREAM_DATA_FOOTER_OK, we have a next_stream_data", conn->conn_id, stream_data->stream_id);
             }
@@ -1402,7 +1410,6 @@ ssize_t read_data_callback(nghttp2_session *session,
         return NGHTTP2_ERR_DEFERRED;
     }
 
-    qd_log(http2_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%"PRIu64"][S%"PRId32"] read_data_callback Returning zero", conn->conn_id, stream_data->stream_id);
     return 0;
 }
 
@@ -1421,6 +1428,7 @@ qdr_http2_connection_t *qdr_http_connection_ingress(qd_http_listener_t* listener
     sys_atomic_init(&ingress_http_conn->raw_closed_read, 0);
     sys_atomic_init(&ingress_http_conn->raw_closed_write, 0);
     sys_atomic_init(&ingress_http_conn->q2_restart, 0);
+    sys_atomic_init(&ingress_http_conn->delay_buffer_write, 0);
     DEQ_INIT(ingress_http_conn->buffs);
     DEQ_INIT(ingress_http_conn->streams);
     DEQ_INIT(ingress_http_conn->granted_read_buffs);
@@ -1723,6 +1731,9 @@ static void http_connector_establish(qdr_http2_connection_t *conn)
 }
 
 
+/**
+ * Converts the AMQP message into an HTTP request or response
+ */
 uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data)
 {
     qdr_http2_connection_t *conn = stream_data->conn;
@@ -1791,7 +1802,7 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data)
                 size_t payload_length = qd_message_stream_data_payload_length(stream_data->curr_stream_data);
 
                 if (payload_length == 0) {
-                    stream_data->next_stream_data_result = 	qd_message_next_stream_data(message, &stream_data->next_stream_data);
+                    stream_data->next_stream_data_result = qd_message_next_stream_data(message, &stream_data->next_stream_data);
 
                     if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_NO_MORE) {
                         if (stream_data->next_stream_data) {
@@ -1812,6 +1823,12 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data)
                 }
             }
 
+            // If there is a body for this message, set delay_buffer_write so that the buffers are not
+            // immediately pushed out on header submission.
+            if (stream_data->out_msg_has_body) {
+                SET_ATOMIC_FLAG(&conn->delay_buffer_write);
+            }
+
             stream_data->stream_id = nghttp2_submit_headers(conn->session,
                                                             flags,
                                                             stream_id,
@@ -1849,7 +1866,7 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data)
 
         if (stream_data->out_msg_has_body) {
             if (stream_data->out_msg_header_sent) {
-                qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Stream was paused, resuming now", conn->conn_id, stream_data->stream_id);
+                // This is usually called if there are many AMQP data stream objects in a delivery. These data streams were created on the inbound AMQP side using the qdr_delivery_continue() function.
                 nghttp2_session_resume_data(conn->session, stream_data->stream_id);
                 nghttp2_session_send(conn->session);
                 qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] nghttp2_session_send - write_buffers done for resumed stream", conn->conn_id, stream_data->stream_id);
@@ -1864,10 +1881,10 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data)
                     qd_log(http2_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%"PRIu64"][S%"PRId32"] Error submitting data rv=%i", conn->conn_id, stream_data->stream_id, rv);
                 }
                 else {
-                	if (conn->session) {
-                		nghttp2_session_send(conn->session);
-                		qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] nghttp2_session_send - done", conn->conn_id, stream_data->stream_id);
-                	}
+                    if (conn->session) {
+                        nghttp2_session_send(conn->session);
+                        qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] nghttp2_session_send - done", conn->conn_id, stream_data->stream_id);
+                    }
                 }
             }
         }
@@ -1876,11 +1893,12 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data)
         }
         stream_data->out_msg_header_sent = true;
 
+        CLEAR_ATOMIC_FLAG(&conn->delay_buffer_write);
+
         if (stream_data->out_msg_has_footer) {
             qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Message has a footer", conn->conn_id, stream_data->stream_id);
             bool send_footer = false;
-            if (stream_data->out_msg_has_body) {
-                qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] stream_data->out_msg_has_body", conn->conn_id, stream_data->stream_id);
+            if (stream_data->out_msg_has_body && !stream_data->out_msg_body_sent) {
                 if (stream_data->out_msg_body_sent) {
                     send_footer = true;
                     qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] send_footer = true", conn->conn_id, stream_data->stream_id);
@@ -1899,7 +1917,7 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data)
             if (send_footer) {
                 qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Starting to send footer", conn->conn_id, stream_data->stream_id);
                 // Send the properties in the footer as a HEADERS frame.
-                qd_iterator_t     *footer_properties_iter = qd_message_stream_data_iterator(stream_data->curr_stream_data);
+                qd_iterator_t     *footer_properties_iter = stream_data->footer_stream_data_iter;
                 qd_parsed_field_t *footer_properties_fld = qd_parse(footer_properties_iter);
 
                 uint32_t count = qd_parse_sub_count(footer_properties_fld);
@@ -1934,8 +1952,12 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data)
                 nghttp2_session_send(conn->session);
                 qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Headers(from footer) submitted", conn->conn_id, stream_data->stream_id);
 
+
                 qd_iterator_free(footer_properties_iter);
                 qd_parse_free(footer_properties_fld);
+                if (stream_data->footer_stream_data) {
+                    qd_message_stream_data_release(stream_data->footer_stream_data);
+                }
                 if (stream_data->curr_stream_data) {
                     qd_message_stream_data_release(stream_data->curr_stream_data);
                     qd_iterator_free(stream_data->curr_stream_data_iter);
@@ -2061,6 +2083,11 @@ static uint64_t qdr_http_deliver(void *context, qdr_link_t *link, qdr_delivery_t
 }
 
 
+/**
+ * Takes read buffers from proton raw connection and feeds the binary http2 frame data
+ * to nghttp2 via the nghttp2_session_mem_recv() function.
+ * All pertinent nghttp2 callbacks are called before the call to nghttp2_session_mem_recv() completes.
+ */
 static int handle_incoming_http(qdr_http2_connection_t *conn)
 {
     //
@@ -2392,6 +2419,7 @@ qdr_http2_connection_t *qdr_http_connection_egress(qd_http_connector_t *connecto
     DEQ_INIT(egress_http_conn->granted_read_buffs);
     sys_atomic_init(&egress_http_conn->raw_closed_read, 0);
     sys_atomic_init(&egress_http_conn->raw_closed_write, 0);
+    sys_atomic_init(&egress_http_conn->delay_buffer_write, 0);
     sys_atomic_init(&egress_http_conn->q2_restart, 0);
 
     sys_mutex_lock(http2_adaptor->lock);
@@ -2654,6 +2682,9 @@ qd_http_connector_t *qd_http2_configure_connector(qd_dispatch_t *qd, const qd_ht
     return c;
 }
 
+/**
+ * Called just before shutdown of the router. Frees listeners and connectors and any http2 buffers
+ */
 static void qdr_http2_adaptor_final(void *adaptor_context)
 {
     qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "Shutting down HTTP2 Protocol adaptor");
@@ -2736,14 +2767,24 @@ static void qdr_http2_adaptor_init(qdr_core_t *core, void **adaptor_context)
     //
     nghttp2_session_callbacks *callbacks;
     nghttp2_session_callbacks_new(&callbacks);
+
+    //
+    // These callbacks are called when we feed the incoming binary http2 data from the client or the server to nghttp2_session_mem_recv()
+    // in handle_incoming_http
+    //
     nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, on_frame_recv_callback);
     nghttp2_session_callbacks_set_on_begin_headers_callback(callbacks, on_begin_headers_callback);
     nghttp2_session_callbacks_set_on_header_callback(callbacks, on_header_callback);
-    nghttp2_session_callbacks_set_on_stream_close_callback(callbacks, on_stream_close_callback);
     nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, on_data_chunk_recv_callback);
-    nghttp2_session_callbacks_set_send_data_callback(callbacks, snd_data_callback);
-    nghttp2_session_callbacks_set_send_callback(callbacks, send_callback);
     nghttp2_session_callbacks_set_on_invalid_frame_recv_callback(callbacks, on_invalid_frame_recv_callback);
+
+
+    // These callbacks are called when you try to push out amqp data to http2
+    // More specifically, they are called from handle_outgoing_http()
+    nghttp2_session_callbacks_set_send_data_callback(callbacks, send_data_callback);
+    nghttp2_session_callbacks_set_send_callback(callbacks, send_callback);
+
+    // This is a general callback
     nghttp2_session_callbacks_set_error_callback2(callbacks, on_error_callback);
 
     adaptor->callbacks = callbacks;
@@ -2753,4 +2794,4 @@ static void qdr_http2_adaptor_init(qdr_core_t *core, void **adaptor_context)
 /**
  * Declare the adaptor so that it will self-register on process startup.
  */
-QDR_CORE_ADAPTOR_DECLARE("http-adaptor", qdr_http2_adaptor_init, qdr_http2_adaptor_final)
+QDR_CORE_ADAPTOR_DECLARE("http2-adaptor", qdr_http2_adaptor_init, qdr_http2_adaptor_final)
diff --git a/src/adaptors/http2/http2_adaptor.h b/src/adaptors/http2/http2_adaptor.h
index 376e6d1..819c02d 100644
--- a/src/adaptors/http2/http2_adaptor.h
+++ b/src/adaptors/http2/http2_adaptor.h
@@ -80,6 +80,7 @@ struct qdr_http2_stream_data_t {
     qd_iterator_t            *curr_stream_data_iter; // points to the data contained in the stream_data/raw_buffers
     qd_message_stream_data_t *next_stream_data;
     qd_message_stream_data_t *footer_stream_data;
+    qd_iterator_t            *footer_stream_data_iter;
     DEQ_LINKS(qdr_http2_stream_data_t);
 
     qd_message_stream_data_result_t  curr_stream_data_result;
@@ -146,6 +147,7 @@ struct qdr_http2_connection_t {
     sys_atomic_t 			  raw_closed_write;
     bool                      q2_blocked;      // send a connection level WINDOW_UPDATE frame to tell the client to stop sending data.
     sys_atomic_t              q2_restart;      // signal to resume receive
+    sys_atomic_t              delay_buffer_write;   // if true, buffers will not be written to proton.
     DEQ_LINKS(qdr_http2_connection_t);
  };
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org