You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2021/09/07 13:20:31 UTC
[qpid-dispatch] branch main updated: DISPATCH-2232: Combine output
buffers and delay sending them so that the TCP frames are not fragmented.
This closes #1343.
This is an automated email from the ASF dual-hosted git repository.
gmurthy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push:
new 27cf112 DISPATCH-2232: Combine output buffers and delay sending them so that the TCP frames are not fragmented. This closes #1343.
27cf112 is described below
commit 27cf11216f397a8ac01590abd0e807bcc922b476
Author: Ganesh Murthy <gm...@apache.org>
AuthorDate: Mon Aug 16 16:59:15 2021 -0400
DISPATCH-2232: Combine output buffers and delay sending them so that the TCP frames are not fragmented. This closes #1343.
---
src/adaptors/http2/http2_adaptor.c | 189 ++++++++++++++++++++++---------------
src/adaptors/http2/http2_adaptor.h | 2 +
2 files changed, 117 insertions(+), 74 deletions(-)
diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c
index b5752fa..e70c331 100644
--- a/src/adaptors/http2/http2_adaptor.c
+++ b/src/adaptors/http2/http2_adaptor.c
@@ -146,13 +146,13 @@ qd_http2_buffer_t *qd_http2_buffer(void)
return buf;
}
-void qd_http2_buffer_list_append(qd_http2_buffer_list_t *buflist, const uint8_t *data, size_t len)
+qd_http2_buffer_t *qd_http2_buffer_list_append(qd_http2_buffer_list_t *buflist, const uint8_t *data, size_t len)
{
//
// If len is zero, there's no work to do.
//
if (len == 0)
- return;
+ return DEQ_TAIL(*buflist);
//
// If the buffer list is empty and there's some data, add one empty buffer before we begin.
@@ -177,6 +177,8 @@ void qd_http2_buffer_list_append(qd_http2_buffer_list_t *buflist, const uint8_t
DEQ_INSERT_TAIL(*buflist, tail);
}
}
+
+ return DEQ_TAIL(*buflist);
}
@@ -315,7 +317,9 @@ static size_t write_buffers(qdr_http2_connection_t *conn)
int i = 0;
int total_bytes = 0;
- while (i < num_buffs && qd_http2_buff != 0) {
+
+ while (i < num_buffs) {
+ assert (qd_http2_buff != 0);
raw_buffers[i].bytes = (char *)qd_http2_buffer_base(qd_http2_buff);
size_t buffer_size = qd_http2_buffer_size(qd_http2_buff);
raw_buffers[i].capacity = buffer_size;
@@ -326,19 +330,12 @@ static size_t write_buffers(qdr_http2_connection_t *conn)
DEQ_REMOVE_HEAD(conn->buffs);
qd_http2_buff = DEQ_HEAD(conn->buffs);
i ++;
-
}
- if (i >0) {
- size_t num_buffers_written = pn_raw_connection_write_buffers(conn->pn_raw_conn, raw_buffers, num_buffs);
- qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] Written %zu buffer(s) and %i bytes in write_buffers() using pn_raw_connection_write_buffers()", conn->conn_id, num_buffers_written, total_bytes);
- if (num_buffs != num_buffers_written) {
- //TODO - This is not good.
- }
- return num_buffers_written;
- }
-
- return 0;
+ size_t num_buffers_written = pn_raw_connection_write_buffers(conn->pn_raw_conn, raw_buffers, num_buffs);
+ qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] Written %zu buffer(s) and %i bytes in write_buffers() using pn_raw_connection_write_buffers()", conn->conn_id, num_buffers_written, total_bytes);
+ assert(num_buffs == num_buffers_written);
+ return num_buffers_written;
}
@@ -379,6 +376,7 @@ static void free_http2_stream_data(qdr_http2_stream_data_t *stream_data, bool on
// If the httpConnector was deleted, a client request has nowhere to go because of lack of receiver and hence credit.
// No delivery was created. The message that was created for such a hanging request must be freed here..
if (!stream_data->in_dlv && stream_data->message) {
+ qd_message_clear_q2_unblocked_handler(stream_data->message);
qd_message_free(stream_data->message);
}
@@ -463,6 +461,7 @@ void free_qdr_http2_connection(qdr_http2_connection_t* http_conn, bool on_shutdo
sys_atomic_destroy(&http_conn->raw_closed_read);
sys_atomic_destroy(&http_conn->raw_closed_write);
sys_atomic_destroy(&http_conn->q2_restart);
+ sys_atomic_destroy(&http_conn->delay_buffer_write);
free_qdr_http2_connection_t(http_conn);
}
@@ -584,20 +583,7 @@ static int on_data_chunk_recv_callback(nghttp2_session *session,
}
-
-/**
- * Callback function invoked when NGHTTP2_DATA_FLAG_NO_COPY is used in nghttp2_data_source_read_callback to send complete DATA frame.
- */
-static int on_stream_close_callback(nghttp2_session *session,
- int32_t stream_id,
- nghttp2_error_code error_code,
- void *user_data)
-{
- return 0;
-}
-
-
-static int snd_data_callback(nghttp2_session *session,
+static int send_data_callback(nghttp2_session *session,
nghttp2_frame *frame,
const uint8_t *framehd,
size_t length,
@@ -608,47 +594,63 @@ static int snd_data_callback(nghttp2_session *session,
qdr_http2_connection_t *conn = (qdr_http2_connection_t *)user_data;
qdr_http2_stream_data_t *stream_data = (qdr_http2_stream_data_t *)source->ptr;
- qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] snd_data_callback length=%zu", conn->conn_id, stream_data->stream_id, length);
+ qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] send_data_callback length=%zu", conn->conn_id, stream_data->stream_id, length);
int bytes_sent = 0; // This should not include the header length of 9.
bool write_buffs = false;
if (length) {
- write_buffs = true;
- qd_http2_buffer_t *http2_buff = qd_http2_buffer();
- DEQ_INSERT_TAIL(conn->buffs, http2_buff);
- // Insert the framehd of length 9 bytes into the buffer
- memcpy(qd_http2_buffer_cursor(http2_buff), framehd, HTTP2_DATA_FRAME_HEADER_LENGTH);
- qd_http2_buffer_insert(http2_buff, HTTP2_DATA_FRAME_HEADER_LENGTH);
-
- uint32_t octets_remaining = qd_iterator_remaining(stream_data->curr_stream_data_iter);
-
- size_t len = MIN(octets_remaining, length);
- int copied = qd_iterator_ncopy(stream_data->curr_stream_data_iter, qd_http2_buffer_cursor(http2_buff), len);
- assert(copied == len);
- qd_http2_buffer_insert(http2_buff, len);
- octets_remaining -= copied;
- bytes_sent += copied;
- qd_iterator_trim_view(stream_data->curr_stream_data_iter, octets_remaining);
+ qd_http2_buffer_t *tail_buff = qd_http2_buffer_list_append(&(conn->buffs), framehd, HTTP2_DATA_FRAME_HEADER_LENGTH);
+ size_t tail_buff_capacity = qd_http2_buffer_capacity(tail_buff);
+ if (tail_buff_capacity == 0) {
+ tail_buff = qd_http2_buffer();
+ DEQ_INSERT_TAIL(conn->buffs, tail_buff);
+ tail_buff_capacity = qd_http2_buffer_capacity(tail_buff);
+ }
+ size_t bytes_to_write = length;
+ while (bytes_to_write > 0) {
+ uint32_t octets_remaining = qd_iterator_remaining(stream_data->curr_stream_data_iter);
+ size_t len = MIN(tail_buff_capacity, bytes_to_write);
+ len = MIN(len, octets_remaining);
+ int copied = qd_iterator_ncopy(stream_data->curr_stream_data_iter, qd_http2_buffer_cursor(tail_buff), len);
+ assert(copied == len);
+ qd_http2_buffer_insert(tail_buff, len);
+ octets_remaining -= copied;
+ bytes_sent += copied;
+ qd_iterator_trim_view(stream_data->curr_stream_data_iter, octets_remaining);
+ bytes_to_write -= len;
+ if (bytes_to_write > 0 && qd_http2_buffer_capacity(tail_buff) == 0) {
+ tail_buff = qd_http2_buffer();
+ DEQ_INSERT_TAIL(conn->buffs, tail_buff);
+ tail_buff_capacity = qd_http2_buffer_capacity(tail_buff);
+ }
+ }
}
else if (length == 0 && stream_data->out_msg_data_flag_eof) {
- write_buffs = true;
qd_http2_buffer_t *http2_buff = qd_http2_buffer();
DEQ_INSERT_TAIL(conn->buffs, http2_buff);
- // Insert the framehd of length 9 bytes into the buffer
memcpy(qd_http2_buffer_cursor(http2_buff), framehd, HTTP2_DATA_FRAME_HEADER_LENGTH);
qd_http2_buffer_insert(http2_buff, HTTP2_DATA_FRAME_HEADER_LENGTH);
}
+ //
+ // If the message has a footer, don't flush the buffers now. Flush them after you write out the footer.
+ //
+ if (!stream_data->out_msg_has_footer) {
+ write_buffs = true;
+ }
+
if (stream_data->full_payload_handled) {
- if (!stream_data->out_msg_has_footer && stream_data->curr_stream_data) {
- qd_message_stream_data_release(stream_data->curr_stream_data);
- qd_iterator_free(stream_data->curr_stream_data_iter);
- stream_data->curr_stream_data = 0;
+ if (stream_data->curr_stream_data) {
+ if (stream_data->curr_stream_data_result == QD_MESSAGE_STREAM_DATA_FOOTER_OK) {
+ stream_data->footer_stream_data = stream_data->curr_stream_data;
+ stream_data->footer_stream_data_iter = stream_data->curr_stream_data_iter;
+ }
+ else {
+ qd_message_stream_data_release(stream_data->curr_stream_data);
+ qd_iterator_free(stream_data->curr_stream_data_iter);
+ }
stream_data->curr_stream_data_iter = 0;
- qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] snd_data_callback, full_payload_handled, no footer, qd_message_stream_data_release", conn->conn_id, stream_data->stream_id);
- }
- else {
- qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] snd_data_callback, full_payload_handled, out_msg_has_footer", conn->conn_id, stream_data->stream_id);
+ stream_data->curr_stream_data = 0;
}
stream_data->payload_handled = 0;
}
@@ -656,7 +658,7 @@ static int snd_data_callback(nghttp2_session *session,
stream_data->payload_handled += bytes_sent;
}
- qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] HTTP2 snd_data_callback finished, length=%zu, bytes_sent=%i, stream_data=%p", conn->conn_id, stream_data->stream_id, length, bytes_sent, (void *)stream_data);
+ qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] HTTP2 send_data_callback finished, length=%zu, bytes_sent=%i, stream_data=%p", conn->conn_id, stream_data->stream_id, length, bytes_sent, (void *)stream_data);
if (length) {
assert(bytes_sent == length);
@@ -678,7 +680,10 @@ static ssize_t send_callback(nghttp2_session *session,
qdr_http2_connection_t *conn = (qdr_http2_connection_t *)user_data;
qd_http2_buffer_list_append(&(conn->buffs), (uint8_t *)data, length);
qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"] HTTP2 send_callback data length %zu", conn->conn_id, length);
- write_buffers(conn);
+ if (! IS_ATOMIC_FLAG_SET(&conn->delay_buffer_write)) {
+ write_buffers(conn);
+ }
+
return (ssize_t)length;
}
@@ -1231,7 +1236,6 @@ ssize_t read_data_callback(nghttp2_session *session,
// The payload length is zero on this body data. Look ahead one body data to see if it is QD_MESSAGE_STREAM_DATA_NO_MORE
stream_data->next_stream_data_result = qd_message_next_stream_data(message, &stream_data->next_stream_data);
-
if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_NO_MORE) {
if (!stream_data->out_msg_has_footer) {
qd_message_stream_data_release(stream_data->curr_stream_data);
@@ -1295,6 +1299,7 @@ ssize_t read_data_callback(nghttp2_session *session,
qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one body data QD_MESSAGE_STREAM_DATA_NO_MORE", conn->conn_id, stream_data->stream_id);
}
else if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_FOOTER_OK) {
+ stream_data->out_msg_has_footer = true;
stream_data->out_msg_body_sent = true;
qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one body data, QD_MESSAGE_STREAM_DATA_FOOTER_OK", conn->conn_id, stream_data->stream_id);
}
@@ -1316,8 +1321,11 @@ ssize_t read_data_callback(nghttp2_session *session,
case QD_MESSAGE_STREAM_DATA_FOOTER_OK:
qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_STREAM_DATA_FOOTER_OK", conn->conn_id, stream_data->stream_id);
- stream_data->out_msg_has_footer = true;
- stream_data->next_stream_data_result = qd_message_next_stream_data(message, &stream_data->next_stream_data);
+ if (!stream_data->out_msg_has_footer) {
+ stream_data->out_msg_has_footer = true;
+ stream_data->next_stream_data_result = qd_message_next_stream_data(message, &stream_data->next_stream_data);
+ }
+
if (stream_data->next_stream_data) {
qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_STREAM_DATA_FOOTER_OK, we have a next_stream_data", conn->conn_id, stream_data->stream_id);
}
@@ -1402,7 +1410,6 @@ ssize_t read_data_callback(nghttp2_session *session,
return NGHTTP2_ERR_DEFERRED;
}
- qd_log(http2_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%"PRIu64"][S%"PRId32"] read_data_callback Returning zero", conn->conn_id, stream_data->stream_id);
return 0;
}
@@ -1421,6 +1428,7 @@ qdr_http2_connection_t *qdr_http_connection_ingress(qd_http_listener_t* listener
sys_atomic_init(&ingress_http_conn->raw_closed_read, 0);
sys_atomic_init(&ingress_http_conn->raw_closed_write, 0);
sys_atomic_init(&ingress_http_conn->q2_restart, 0);
+ sys_atomic_init(&ingress_http_conn->delay_buffer_write, 0);
DEQ_INIT(ingress_http_conn->buffs);
DEQ_INIT(ingress_http_conn->streams);
DEQ_INIT(ingress_http_conn->granted_read_buffs);
@@ -1723,6 +1731,9 @@ static void http_connector_establish(qdr_http2_connection_t *conn)
}
+/**
+ * Converts the AMQP message into an HTTP request or response
+ */
uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data)
{
qdr_http2_connection_t *conn = stream_data->conn;
@@ -1791,7 +1802,7 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data)
size_t payload_length = qd_message_stream_data_payload_length(stream_data->curr_stream_data);
if (payload_length == 0) {
- stream_data->next_stream_data_result = qd_message_next_stream_data(message, &stream_data->next_stream_data);
+ stream_data->next_stream_data_result = qd_message_next_stream_data(message, &stream_data->next_stream_data);
if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_NO_MORE) {
if (stream_data->next_stream_data) {
@@ -1812,6 +1823,12 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data)
}
}
+ // If there is a body for this message, set the delay_buffer_write flag so that the buffers are not immediately
+ // pushed out on header submission.
+ if (stream_data->out_msg_has_body) {
+ SET_ATOMIC_FLAG(&conn->delay_buffer_write);
+ }
+
stream_data->stream_id = nghttp2_submit_headers(conn->session,
flags,
stream_id,
@@ -1849,7 +1866,7 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data)
if (stream_data->out_msg_has_body) {
if (stream_data->out_msg_header_sent) {
- qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Stream was paused, resuming now", conn->conn_id, stream_data->stream_id);
+ // This is usually called if there are many AMQP data stream objects in a delivery. These data streams were created on the inbound AMQP side using the qdr_delivery_continue() function.
nghttp2_session_resume_data(conn->session, stream_data->stream_id);
nghttp2_session_send(conn->session);
qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] nghttp2_session_send - write_buffers done for resumed stream", conn->conn_id, stream_data->stream_id);
@@ -1864,10 +1881,10 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data)
qd_log(http2_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%"PRIu64"][S%"PRId32"] Error submitting data rv=%i", conn->conn_id, stream_data->stream_id, rv);
}
else {
- if (conn->session) {
- nghttp2_session_send(conn->session);
- qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] nghttp2_session_send - done", conn->conn_id, stream_data->stream_id);
- }
+ if (conn->session) {
+ nghttp2_session_send(conn->session);
+ qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] nghttp2_session_send - done", conn->conn_id, stream_data->stream_id);
+ }
}
}
}
@@ -1876,11 +1893,12 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data)
}
stream_data->out_msg_header_sent = true;
+ CLEAR_ATOMIC_FLAG(&conn->delay_buffer_write);
+
if (stream_data->out_msg_has_footer) {
qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Message has a footer", conn->conn_id, stream_data->stream_id);
bool send_footer = false;
- if (stream_data->out_msg_has_body) {
- qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] stream_data->out_msg_has_body", conn->conn_id, stream_data->stream_id);
+ if (stream_data->out_msg_has_body && !stream_data->out_msg_body_sent) {
if (stream_data->out_msg_body_sent) {
send_footer = true;
qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] send_footer = true", conn->conn_id, stream_data->stream_id);
@@ -1899,7 +1917,7 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data)
if (send_footer) {
qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Starting to send footer", conn->conn_id, stream_data->stream_id);
// Send the properties in the footer as a HEADERS frame.
- qd_iterator_t *footer_properties_iter = qd_message_stream_data_iterator(stream_data->curr_stream_data);
+ qd_iterator_t *footer_properties_iter = stream_data->footer_stream_data_iter;
qd_parsed_field_t *footer_properties_fld = qd_parse(footer_properties_iter);
uint32_t count = qd_parse_sub_count(footer_properties_fld);
@@ -1934,8 +1952,12 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data)
nghttp2_session_send(conn->session);
qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Headers(from footer) submitted", conn->conn_id, stream_data->stream_id);
+
qd_iterator_free(footer_properties_iter);
qd_parse_free(footer_properties_fld);
+ if (stream_data->footer_stream_data) {
+ qd_message_stream_data_release(stream_data->footer_stream_data);
+ }
if (stream_data->curr_stream_data) {
qd_message_stream_data_release(stream_data->curr_stream_data);
qd_iterator_free(stream_data->curr_stream_data_iter);
@@ -2061,6 +2083,11 @@ static uint64_t qdr_http_deliver(void *context, qdr_link_t *link, qdr_delivery_t
}
+/**
+ * Takes read buffers from proton raw connection and feeds the binary http2 frame data
+ * to nghttp2 via the nghttp2_session_mem_recv() function.
+ * All pertinent nghttp2 callbacks are called before the call to nghttp2_session_mem_recv() completes.
+ */
static int handle_incoming_http(qdr_http2_connection_t *conn)
{
//
@@ -2392,6 +2419,7 @@ qdr_http2_connection_t *qdr_http_connection_egress(qd_http_connector_t *connecto
DEQ_INIT(egress_http_conn->granted_read_buffs);
sys_atomic_init(&egress_http_conn->raw_closed_read, 0);
sys_atomic_init(&egress_http_conn->raw_closed_write, 0);
+ sys_atomic_init(&egress_http_conn->delay_buffer_write, 0);
sys_atomic_init(&egress_http_conn->q2_restart, 0);
sys_mutex_lock(http2_adaptor->lock);
@@ -2654,6 +2682,9 @@ qd_http_connector_t *qd_http2_configure_connector(qd_dispatch_t *qd, const qd_ht
return c;
}
+/**
+ * Called just before shutdown of the router. Frees listeners and connectors and any http2 buffers
+ */
static void qdr_http2_adaptor_final(void *adaptor_context)
{
qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "Shutting down HTTP2 Protocol adaptor");
@@ -2736,14 +2767,24 @@ static void qdr_http2_adaptor_init(qdr_core_t *core, void **adaptor_context)
//
nghttp2_session_callbacks *callbacks;
nghttp2_session_callbacks_new(&callbacks);
+
+ //
+ // These callbacks are called when we feed the incoming binary http2 data from the client or the server to nghttp2_session_mem_recv()
+ // in handle_incoming_http
+ //
nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, on_frame_recv_callback);
nghttp2_session_callbacks_set_on_begin_headers_callback(callbacks, on_begin_headers_callback);
nghttp2_session_callbacks_set_on_header_callback(callbacks, on_header_callback);
- nghttp2_session_callbacks_set_on_stream_close_callback(callbacks, on_stream_close_callback);
nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, on_data_chunk_recv_callback);
- nghttp2_session_callbacks_set_send_data_callback(callbacks, snd_data_callback);
- nghttp2_session_callbacks_set_send_callback(callbacks, send_callback);
nghttp2_session_callbacks_set_on_invalid_frame_recv_callback(callbacks, on_invalid_frame_recv_callback);
+
+
+ // These callbacks are called when you try to push out amqp data to http2
+ // More specifically, they are called from handle_outgoing_http()
+ nghttp2_session_callbacks_set_send_data_callback(callbacks, send_data_callback);
+ nghttp2_session_callbacks_set_send_callback(callbacks, send_callback);
+
+ // This is a general callback
nghttp2_session_callbacks_set_error_callback2(callbacks, on_error_callback);
adaptor->callbacks = callbacks;
@@ -2753,4 +2794,4 @@ static void qdr_http2_adaptor_init(qdr_core_t *core, void **adaptor_context)
/**
* Declare the adaptor so that it will self-register on process startup.
*/
-QDR_CORE_ADAPTOR_DECLARE("http-adaptor", qdr_http2_adaptor_init, qdr_http2_adaptor_final)
+QDR_CORE_ADAPTOR_DECLARE("http2-adaptor", qdr_http2_adaptor_init, qdr_http2_adaptor_final)
diff --git a/src/adaptors/http2/http2_adaptor.h b/src/adaptors/http2/http2_adaptor.h
index 376e6d1..819c02d 100644
--- a/src/adaptors/http2/http2_adaptor.h
+++ b/src/adaptors/http2/http2_adaptor.h
@@ -80,6 +80,7 @@ struct qdr_http2_stream_data_t {
qd_iterator_t *curr_stream_data_iter; // points to the data contained in the stream_data/raw_buffers
qd_message_stream_data_t *next_stream_data;
qd_message_stream_data_t *footer_stream_data;
+ qd_iterator_t *footer_stream_data_iter;
DEQ_LINKS(qdr_http2_stream_data_t);
qd_message_stream_data_result_t curr_stream_data_result;
@@ -146,6 +147,7 @@ struct qdr_http2_connection_t {
sys_atomic_t raw_closed_write;
bool q2_blocked; // send a connection level WINDOW_UPDATE frame to tell the client to stop sending data.
sys_atomic_t q2_restart; // signal to resume receive
+ sys_atomic_t delay_buffer_write; // if true, buffers will not be written to proton.
DEQ_LINKS(qdr_http2_connection_t);
};
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org