You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2020/09/15 19:06:34 UTC
[qpid-dispatch] branch dev-protocol-adaptors updated:
DISPATCH-1743: Copy the data only once in the snd_data_callback. Do not
copy the data in the read_callback. Introduced qd_http2_buffers
This is an automated email from the ASF dual-hosted git repository.
gmurthy pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/dev-protocol-adaptors by this push:
new f01d63a DISPATCH-1743: Copy the data only once in the snd_data_callback. Do not copy the data in the read_callback. Introduced qd_http2_buffers
f01d63a is described below
commit f01d63a1f995b2e2c908ca086f70de22917f53e8
Author: Ganesh Murthy <gm...@apache.org>
AuthorDate: Tue Sep 15 12:27:43 2020 -0400
DISPATCH-1743: Copy the data only once in the snd_data_callback. Do not copy the data in the read_callback. Introduced qd_http2_buffers
---
src/adaptors/http2/http2_adaptor.c | 302 ++++++++++++++++++++++---------------
src/adaptors/http2/http2_adaptor.h | 76 +++++++++-
2 files changed, 251 insertions(+), 127 deletions(-)
diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c
index f1ac33a..b071604 100644
--- a/src/adaptors/http2/http2_adaptor.c
+++ b/src/adaptors/http2/http2_adaptor.c
@@ -48,6 +48,7 @@ const char *CONTENT_ENCODING = "content-encoding";
ALLOC_DEFINE(qdr_http2_session_data_t);
ALLOC_DEFINE(qdr_http2_stream_data_t);
ALLOC_DEFINE(qdr_http2_connection_t);
+ALLOC_DEFINE(qd_http2_buffer_t);
typedef struct qdr_http_adaptor_t {
qdr_core_t *core;
@@ -64,6 +65,49 @@ static qdr_http_adaptor_t *http_adaptor;
static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void *context);
+
+qd_http2_buffer_t *qd_http2_buffer(void)
+{
+ qd_http2_buffer_t *buf = new_qd_http2_buffer_t();
+ ZERO(buf);
+ DEQ_ITEM_INIT(buf);
+ buf->size = 0;
+ return buf;
+}
+
+void qd_http2_buffer_list_append(qd_http2_buffer_list_t *buflist, const uint8_t *data, size_t len)
+{
+ //
+ // If len is zero, there's no work to do.
+ //
+ if (len == 0)
+ return;
+
+ //
+ // If the buffer list is empty and there's some data, add one empty buffer before we begin.
+ //
+ if (DEQ_SIZE(*buflist) == 0) {
+ qd_http2_buffer_t *buf = qd_http2_buffer();
+ DEQ_INSERT_TAIL(*buflist, buf);
+ }
+
+ qd_http2_buffer_t *tail = DEQ_TAIL(*buflist);
+
+ while (len > 0) {
+ size_t to_copy = MIN(len, qd_http2_buffer_capacity(tail));
+ if (to_copy > 0) {
+ memcpy(qd_http2_buffer_cursor(tail), data, to_copy);
+ qd_http2_buffer_insert(tail, to_copy);
+ data += to_copy;
+ len -= to_copy;
+ }
+ if (len > 0) {
+ tail = qd_http2_buffer();
+ DEQ_INSERT_TAIL(*buflist, tail);
+ }
+ }
+}
+
/**
* HTTP :path is mapped to the AMQP 'to' field.
*/
@@ -149,8 +193,8 @@ static size_t write_buffers(qdr_http2_connection_t *conn)
qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i] write_buffers pn_raw_connection_write_buffers_capacity=%zu", conn->conn_id, pn_buffs_to_write);
- size_t qd_buffs_to_write = DEQ_SIZE(session_data->buffs);
- size_t num_buffs = qd_buffs_to_write > pn_buffs_to_write ? pn_buffs_to_write : qd_buffs_to_write;
+ size_t qd_raw_buffs_to_write = DEQ_SIZE(session_data->buffs);
+ size_t num_buffs = qd_raw_buffs_to_write > pn_buffs_to_write ? pn_buffs_to_write : qd_raw_buffs_to_write;
if (num_buffs == 0) {
//
@@ -161,20 +205,20 @@ static size_t write_buffers(qdr_http2_connection_t *conn)
}
pn_raw_buffer_t raw_buffers[num_buffs];
- qd_buffer_t *qd_buff = DEQ_HEAD(session_data->buffs);
+ qd_http2_buffer_t *qd_http2_buff = DEQ_HEAD(session_data->buffs);
int i = 0;
int total_bytes = 0;
- while (i < num_buffs && qd_buff != 0) {
- raw_buffers[i].bytes = (char *)qd_buffer_base(qd_buff);
- size_t buffer_size = qd_buffer_size(qd_buff);
+ while (i < num_buffs && qd_http2_buff != 0) {
+ raw_buffers[i].bytes = (char *)qd_http2_buffer_base(qd_http2_buff);
+ size_t buffer_size = qd_http2_buffer_size(qd_http2_buff);
raw_buffers[i].capacity = buffer_size;
raw_buffers[i].size = buffer_size;
total_bytes += buffer_size;
raw_buffers[i].offset = 0;
- raw_buffers[i].context = (uintptr_t) qd_buff;
+ raw_buffers[i].context = (uintptr_t) qd_http2_buff;
DEQ_REMOVE_HEAD(session_data->buffs);
- qd_buff = DEQ_HEAD(session_data->buffs);
+ qd_http2_buff = DEQ_HEAD(session_data->buffs);
i ++;
}
@@ -312,6 +356,11 @@ static int on_data_chunk_recv_callback(nghttp2_session *session,
return 0;
}
+
+
+/**
+ * Callback function invoked when NGHTTP2_DATA_FLAG_NO_COPY is used in nghttp2_data_source_read_callback to send complete DATA frame.
+ */
static int on_stream_close_callback(nghttp2_session *session,
int32_t stream_id,
nghttp2_error_code error_code,
@@ -324,8 +373,62 @@ static int on_stream_close_callback(nghttp2_session *session,
return 0;
}
-/* nghttp2_send_callback. The data pointer passed into this function contains encoded HTTP data. Here we transmit the |data|, |length| bytes,
- to the network. */
+
+
+static int snd_data_callback(nghttp2_session *session,
+ nghttp2_frame *frame,
+ const uint8_t *framehd,
+ size_t length,
+ nghttp2_data_source *source,
+ void *user_data) {
+ // The frame is a DATA frame to send. The framehd is the serialized frame header (9 bytes).
+ // The length is the length of application data to send (this does not include padding)
+ qdr_http2_connection_t *conn = (qdr_http2_connection_t *)user_data;
+ qdr_http2_session_data_t *session_data = conn->session_data;
+ qdr_http2_stream_data_t *stream_data = (qdr_http2_stream_data_t *)source->ptr;
+
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 snd_data_callback length=%zu", conn->conn_id, stream_data->stream_id, length);
+
+ qd_http2_buffer_t *http2_buff = qd_http2_buffer();
+ DEQ_INSERT_TAIL(session_data->buffs, http2_buff);
+ // Insert the framehd of length 9 bytes into the buffer
+ memcpy(qd_http2_buffer_cursor(http2_buff), framehd, HTTP2_DATA_FRAME_HEADER_LENGTH);
+ qd_http2_buffer_insert(http2_buff, HTTP2_DATA_FRAME_HEADER_LENGTH);
+
+ int bytes_sent = 0; // This should not include the header length of 9.
+ if (length) {
+ pn_raw_buffer_t pn_raw_buffs[stream_data->qd_buffers_to_send];
+ qd_message_body_data_buffers(stream_data->curr_body_data, pn_raw_buffs, 0, stream_data->qd_buffers_to_send);
+
+ int idx = 0;
+ while (idx < stream_data->qd_buffers_to_send) {
+ if (pn_raw_buffs[idx].size > 0) {
+ memcpy(qd_http2_buffer_cursor(http2_buff), pn_raw_buffs[idx].bytes, pn_raw_buffs[idx].size);
+ qd_http2_buffer_insert(http2_buff, pn_raw_buffs[idx].size);
+ bytes_sent += pn_raw_buffs[idx].size;
+ stream_data->curr_body_data_qd_buff_offset += 1;
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] snd_data_callback memcpy pn_raw_buffs[%i].size=%zu", conn->conn_id, stream_data->stream_id, idx, pn_raw_buffs[idx].size);
+ }
+ idx += 1;
+ }
+ }
+ if (stream_data->full_payload_handled) {
+ qd_message_body_data_release(stream_data->curr_body_data);
+ stream_data->curr_body_data = 0;
+ stream_data->curr_body_data_qd_buff_offset = 0;
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] snd_data_callback qd_message_body_data_release", conn->conn_id, stream_data->stream_id);
+ stream_data->full_payload_handled = false;
+ }
+
+ if (length)
+ assert(bytes_sent == length);
+
+ write_buffers(conn);
+
+ return 0;
+
+}
+
static ssize_t send_callback(nghttp2_session *session,
const uint8_t *data,
size_t length,
@@ -333,7 +436,7 @@ static ssize_t send_callback(nghttp2_session *session,
void *user_data) {
qdr_http2_connection_t *conn = (qdr_http2_connection_t *)user_data;
qdr_http2_session_data_t *session_data = conn->session_data;
- qd_buffer_list_append(&(session_data->buffs), (uint8_t *)data, length);
+ qd_http2_buffer_list_append(&(session_data->buffs), (uint8_t *)data, length);
qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i] HTTP2 send_callback data length %zu", conn->conn_id, length);
write_buffers(conn);
return (ssize_t)length;
@@ -481,7 +584,6 @@ static bool route_delivery(qdr_http2_stream_data_t *stream_data, bool receive_co
bool delivery_routed = false;
if (conn->ingress) {
- // TODO - Remove the redunant check stream_data->entire_header_arrived && !stream_data->in_dlv
if (stream_data->reply_to && stream_data->entire_header_arrived && !stream_data->in_dlv) {
header_and_props = qd_message_compose_amqp(stream_data->message,
conn->config->address, // const char *to
@@ -632,13 +734,15 @@ ssize_t read_callback(nghttp2_session *session,
void *user_data)
{
qdr_http2_connection_t *conn = (qdr_http2_connection_t *)user_data;
- qdr_http2_session_data_t *session_data = conn->session_data;
- qdr_http2_stream_data_t *stream_data = nghttp2_session_get_stream_user_data(session_data->session, stream_id);
+ qdr_http2_stream_data_t *stream_data = (qdr_http2_stream_data_t *)source->ptr;
qd_message_t *message = qdr_delivery_message(stream_data->out_dlv);
qd_message_depth_status_t status = qd_message_check_depth(message, QD_DEPTH_BODY);
qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback with length=%zu", conn->conn_id, stream_data->stream_id, length);
+ // This flag tells nghttp2 that the data is not being copied into its buffer (uint8_t *buf).
+ *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
+
switch (status) {
case QD_MESSAGE_DEPTH_OK: {
@@ -650,17 +754,12 @@ ssize_t read_callback(nghttp2_session *session,
qd_message_body_data_t *body_data = 0;
qd_message_body_data_result_t body_data_result;
- //
- // Process as many body-data segments as are available.
- //
- int buff_offset = 0;
body_data = stream_data->curr_body_data;
if (body_data) {
//
// If we saved the body_data, use the buff_offset.
//
body_data_result = stream_data->curr_body_data_result;
- buff_offset = stream_data->curr_body_data_buff_offset;
qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback Use existing body_data", conn->conn_id, stream_data->stream_id);
}
else {
@@ -670,7 +769,7 @@ ssize_t read_callback(nghttp2_session *session,
}
stream_data->curr_body_data = body_data;
stream_data->curr_body_data_result = body_data_result;
- stream_data->curr_body_data_buff_offset = 0;
+ stream_data->curr_body_data_qd_buff_offset = 0;
qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback qd_message_next_body_data", conn->conn_id, stream_data->stream_id);
}
@@ -679,100 +778,56 @@ ssize_t read_callback(nghttp2_session *session,
//
// We have a new valid body-data segment. Handle it
//
- stream_data->body_data_buff_count = qd_message_body_data_buffer_count(body_data);
-
- //TODO - Dont do this here.
- size_t pn_buffs_to_write = pn_raw_connection_write_buffers_capacity(conn->pn_raw_conn);
+ size_t pn_buffs_write_capacity = pn_raw_connection_write_buffers_capacity(conn->pn_raw_conn);
- qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback pn_raw_connection_write_buffers_capacity=%zu", conn->conn_id, stream_data->stream_id, pn_buffs_to_write);
+ // total length of the payload (across all qd_buffers in the current body data)
+ size_t payload_length = qd_message_body_data_payload_length(body_data);
- if (stream_data->body_data_buff_count == 0 || pn_buffs_to_write == 0) {
- // We cannot send anything, we need to come back here.
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback pn_raw_connection_write_buffers_capacity=%zu", conn->conn_id, stream_data->stream_id, pn_buffs_write_capacity);
- if (stream_data->body_data_buff_count == 0) {
- qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] Exiting read_callback QD_MESSAGE_BODY_DATA_OK, body_data_buff_count=0, temporarily pausing stream", conn->conn_id, stream_data->stream_id);
+ if (pn_buffs_write_capacity == 0 || payload_length == 0) {
+ if (pn_buffs_write_capacity == 0)
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] Exiting read_callback, pn_buffs_write_capacity=0, pausing stream", conn->conn_id, stream_data->stream_id);
+ if (payload_length == 0) {
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] Exiting read_callback, payload_length=0, pausing stream", conn->conn_id, stream_data->stream_id);
qd_message_body_data_release(stream_data->curr_body_data);
stream_data->curr_body_data = 0;
}
- if (pn_buffs_to_write == 0)
- qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] Exiting read_callback, pn_buffs_to_write=0, pausing stream", conn->conn_id, stream_data->stream_id);
//
- // We don't have any buffers to send but we may or may not get more buffers.
- // Temporarily pause this stream
+ // Temporarily pause this stream.
+ // We will unpause this stream later when more data arrives or when proton returns after writing buffers.
//
-
stream_data->disposition = 0;
return NGHTTP2_ERR_DEFERRED;
}
- qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback QD_MESSAGE_BODY_DATA_OK, body_data_buff_count=%i", conn->conn_id, stream_data->stream_id, stream_data->body_data_buff_count);
+ stream_data->body_data_buff_count = qd_message_body_data_buffer_count(body_data);
- int bytes_read = 0;
- //
- // We are looking to write only a maximum of one pn_raw_buffer_t per call of read_callback.
- //
- if (stream_data->raw_buffer.offset != 0) {
- int remaining = stream_data->raw_buffer.size - stream_data->raw_buffer.offset;
- if (length < remaining) {
- // Account for the case where the current buffer might not be fully consumed.
- qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback, (length < remaining) memcpy size=%zu", conn->conn_id, stream_data->stream_id, length);
- memcpy(buf, stream_data->raw_buffer.bytes + stream_data->raw_buffer.offset, length);
- stream_data->raw_buffer.offset += length;
- bytes_read = length;
- }
- else {
- // Current buffer fully consumed.
- qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback, (length >= remaining) remaining=%zu", conn->conn_id, stream_data->stream_id, remaining);
- memcpy(buf, stream_data->raw_buffer.bytes + stream_data->raw_buffer.offset, remaining);
- bytes_read = remaining;
-
- stream_data->raw_buffer.context = 0;
- stream_data->raw_buffer.bytes = 0;
- stream_data->raw_buffer.capacity = 0;
- stream_data->raw_buffer.size = 0;
- stream_data->raw_buffer.offset = 0;
-
- stream_data->curr_body_data_buff_offset += 1;
- stream_data->body_data_buff_count -= 1;
- }
- }
- else {
- // TODO - Make multiple calls to qd_message_body_data_buffers and gather all the data depending on length.
- qd_message_body_data_buffers(body_data, &stream_data->raw_buffer, buff_offset, 1);
- int remaining = stream_data->raw_buffer.size - stream_data->raw_buffer.offset;
-
- if (length < remaining) {
- qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback, qd_message_body_data_buffers (length < remaining) memcpy size=%zu", conn->conn_id, stream_data->stream_id, length);
- memcpy(buf, stream_data->raw_buffer.bytes, length);
- stream_data->raw_buffer.offset += length;
- bytes_read = length;
+ assert (payload_length >= 0);
+ size_t bytes_to_send = 0;
+ if (payload_length) {
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback QD_MESSAGE_BODY_DATA_OK, body_data_buff_count=%i", conn->conn_id, stream_data->stream_id, stream_data->body_data_buff_count);
+
+ // Add the HTTP2 DATA frame header to the payload length to get the complete size of the payload.
+ size_t remaining_payload_length = payload_length - (stream_data->curr_body_data_qd_buff_offset * BUFFER_SIZE);
+
+ if (QD_HTTP2_BUFFER_SIZE >= remaining_payload_length) {
+ bytes_to_send = remaining_payload_length;
+ stream_data->qd_buffers_to_send = stream_data->body_data_buff_count;
+ stream_data->full_payload_handled = true;
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback QD_HTTP2_BUFFER_SIZE >= remaining_payload_length bytes_to_send=%zu, stream_data->qd_buffers_to_send=%zu", conn->conn_id, stream_data->stream_id, bytes_to_send, stream_data->qd_buffers_to_send);
}
else {
- stream_data->curr_body_data_buff_offset += 1;
- qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback, qd_message_body_data_buffers (length >= remaining) size of raw_buffer=%zu", conn->conn_id, stream_data->stream_id, stream_data->raw_buffer.size);
- memcpy(buf, stream_data->raw_buffer.bytes, stream_data->raw_buffer.size);
- bytes_read = stream_data->raw_buffer.size;
- stream_data->body_data_buff_count -= 1;
+ // This means that there is more that 16k worth of payload in one body data.
+ // We want to send only 16k data per read_callback
+ bytes_to_send = QD_HTTP2_BUFFER_SIZE;
+ stream_data->full_payload_handled = false;
+ stream_data->qd_buffers_to_send = NUM_QD_BUFFERS_IN_ONE_HTTP2_BUFFER;
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback QD_HTTP2_BUFFER_SIZE >= remaining_payload_length ELSE bytes_to_send=%zu, stream_data->qd_buffers_to_send=%zu", conn->conn_id, stream_data->stream_id, bytes_to_send, stream_data->qd_buffers_to_send);
}
}
-
- if (!stream_data->body_data_buff_count) {
- qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] Releasing qd_message_body_data", conn->conn_id, stream_data->stream_id);
- qd_message_body_data_release(stream_data->curr_body_data);
- stream_data->curr_body_data = 0;
-
- stream_data->raw_buffer.context = 0;
- stream_data->raw_buffer.bytes = 0;
- stream_data->raw_buffer.capacity = 0;
- stream_data->raw_buffer.size = 0;
- stream_data->raw_buffer.offset = 0;
-
- stream_data->curr_body_data = 0;
- }
-
-
- return bytes_read;
+ return bytes_to_send;
}
case QD_MESSAGE_BODY_DATA_INCOMPLETE:
@@ -780,21 +835,21 @@ ssize_t read_callback(nghttp2_session *session,
// A new segment has not completely arrived yet. Check again later.
//
qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback QD_MESSAGE_BODY_DATA_INCOMPLETE", conn->conn_id, stream_data->stream_id);
- return 0;
+ break;
case QD_MESSAGE_BODY_DATA_NO_MORE: {
//
// We have already handled the last body-data segment for this delivery.
// Complete the "sending" of this delivery and replenish credit.
//
- size_t pn_buffs_to_write = pn_raw_connection_write_buffers_capacity(conn->pn_raw_conn);
- if (pn_buffs_to_write == 0) {
+ size_t pn_buffs_write_capacity = pn_raw_connection_write_buffers_capacity(conn->pn_raw_conn);
+ if (pn_buffs_write_capacity == 0) {
return NGHTTP2_ERR_DEFERRED;
stream_data->disposition = 0;
- qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback QD_MESSAGE_BODY_DATA_NO_MORE - pn_buffs_to_write=0 send is not complete", conn->conn_id, stream_data->stream_id);
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback QD_MESSAGE_BODY_DATA_NO_MORE - pn_buffs_write_capacity=0 send is not complete", conn->conn_id, stream_data->stream_id);
}
else {
- qd_message_body_data_release(stream_data->curr_body_data);
+ stream_data->qd_buffers_to_send = 0;
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
qd_message_set_send_complete(message);
// TODO - Dont do the disposition here.
@@ -834,6 +889,7 @@ ssize_t read_callback(nghttp2_session *session,
break;
case QD_MESSAGE_DEPTH_INCOMPLETE:
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback QD_MESSAGE_DEPTH_INCOMPLETE", conn->conn_id, stream_data->stream_id);
break;
}
@@ -870,16 +926,14 @@ qdr_http2_connection_t *qdr_http_connection_ingress(qd_http_lsnr_t* listener)
static void grant_read_buffers(qdr_http2_connection_t *conn)
{
pn_raw_buffer_t raw_buffers[READ_BUFFERS];
- // Give proactor more read buffers for the pn_raw_conn
- // TODO - Look into using bigger buffers here.
if (!pn_raw_connection_is_read_closed(conn->pn_raw_conn)) {
size_t desired = pn_raw_connection_read_buffers_capacity(conn->pn_raw_conn);
while (desired) {
size_t i;
for (i = 0; i < desired && i < READ_BUFFERS; ++i) {
- qd_buffer_t *buf = qd_buffer();
- raw_buffers[i].bytes = (char*) qd_buffer_base(buf);
- raw_buffers[i].capacity = qd_buffer_capacity(buf);
+ qd_http2_buffer_t *buf = qd_http2_buffer();
+ raw_buffers[i].bytes = (char*) qd_http2_buffer_base(buf);
+ raw_buffers[i].capacity = qd_http2_buffer_capacity(buf);
raw_buffers[i].size = 0;
raw_buffers[i].offset = 0;
raw_buffers[i].context = (uintptr_t) buf;
@@ -965,7 +1019,7 @@ static void qdr_http_second_attach(void *context, qdr_link_t *link,
else {
qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Reply-to available but delivery not routed", stream_data->session_data->conn->conn_id);
}
- grant_read_buffers(stream_data->session_data->conn);
+ //grant_read_buffers(stream_data->session_data->conn);
}
qdr_link_flow(http_adaptor->core, link, 10, false);
}
@@ -981,6 +1035,7 @@ static void qdr_http_activate(void *notused, qdr_connection_t *c)
qd_log(http_adaptor->log_source, QD_LOG_INFO, "[C%i] Activation triggered, calling pn_raw_connection_wake()", conn->conn_id);
pn_raw_connection_wake(conn->pn_raw_conn);
}
+ //TODO is conn->timer_scheduled required ?
else if (conn->activate_timer && !conn->timer_scheduled) {
conn->timer_scheduled = true;
// On egress, the raw connection is only created once the
@@ -1085,6 +1140,7 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data)
}
else {
qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] Processing message body", conn->conn_id, stream_data->stream_id);
+ conn->data_prd.source.ptr = stream_data;
int rv = nghttp2_submit_data(session_data->session, NGHTTP2_FLAG_END_STREAM, stream_data->stream_id, &conn->data_prd);
if (rv != 0) {
qd_log(http_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%i][S%i] Error submitting data rv=%i", conn->conn_id, stream_data->stream_id, rv);
@@ -1177,7 +1233,7 @@ void qd_http2_delete_connector(qd_dispatch_t *qd, qd_http_connector_t *connector
static int handle_incoming_http(qdr_http2_connection_t *conn)
{
- qd_buffer_list_t buffers;
+ qd_http2_buffer_list_t buffers;
DEQ_INIT(buffers);
pn_raw_buffer_t raw_buffers[READ_BUFFERS];
size_t n;
@@ -1189,11 +1245,11 @@ static int handle_incoming_http(qdr_http2_connection_t *conn)
while ( (n = pn_raw_connection_take_read_buffers(conn->pn_raw_conn, raw_buffers, READ_BUFFERS)) ) {
for (size_t i = 0; i < n && raw_buffers[i].bytes; ++i) {
- qd_buffer_t *buf = (qd_buffer_t*) raw_buffers[i].context;
+ qd_http2_buffer_t *buf = (qd_http2_buffer_t*) raw_buffers[i].context;
uint32_t raw_buff_size = raw_buffers[i].size;
- qd_buffer_insert(buf, raw_buff_size);
- qd_log(http_adaptor->log_source, QD_LOG_DEBUG, "[C%i] - handle_incoming_http - Inserting qd_buffer of size %"PRIu32" ", conn->conn_id, raw_buff_size);
- count += raw_buffers[i].size;
+ qd_http2_buffer_insert(buf, raw_buff_size);
+ qd_log(http_adaptor->log_source, QD_LOG_DEBUG, "[C%i] - handle_incoming_http - Inserting qd_http2_buffer of size %"PRIu32" ", conn->conn_id, raw_buff_size);
+ count += raw_buff_size;
DEQ_INSERT_TAIL(buffers, buf);
}
}
@@ -1201,14 +1257,14 @@ static int handle_incoming_http(qdr_http2_connection_t *conn)
//
// Read each buffer in the buffer chain and call nghttp2_session_mem_recv with buffer content
//
- qd_buffer_t *buf = DEQ_HEAD(buffers);
- qd_buffer_t *curr_buf = 0;
+ qd_http2_buffer_t *buf = DEQ_HEAD(buffers);
+ qd_http2_buffer_t *curr_buf = 0;
//TODO - Look into reusing the buffers between here and grant_read_buffers
int rv = 0;
while (buf) {
// TODO - Check the return value of nghttp2_session_mem_recv.
- rv = nghttp2_session_mem_recv(conn->session_data->session, qd_buffer_base(buf), qd_buffer_size(buf));
+ rv = nghttp2_session_mem_recv(conn->session_data->session, qd_http2_buffer_base(buf), qd_http2_buffer_size(buf));
if (rv < 0) {
if (rv == NGHTTP2_ERR_FLOODED || rv == NGHTTP2_ERR_BAD_CLIENT_MAGIC) {
// Flooding was detected in this HTTP/2 session, and it must be closed. This is most likely caused by misbehavior of peer.
@@ -1226,7 +1282,7 @@ static int handle_incoming_http(qdr_http2_connection_t *conn)
curr_buf = buf;
DEQ_REMOVE_HEAD(buffers);
buf = DEQ_HEAD(buffers);
- qd_buffer_free(curr_buf);
+ free_qd_http2_buffer_t(curr_buf);
}
if (rv > 0)
@@ -1321,7 +1377,7 @@ static void handle_disconnected(qdr_http2_connection_t* conn)
if (conn->pn_raw_conn) {
pn_raw_connection_set_context(conn->pn_raw_conn, 0);
- conn->pn_raw_conn = 0;
+ //conn->pn_raw_conn = 0;
}
qdr_action_t *action = qdr_action(qdr_del_http2_connection_CT, "delete_http2_connection");
@@ -1349,7 +1405,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
}
case PN_RAW_CONNECTION_CLOSED_READ: {
pn_raw_connection_close(conn->pn_raw_conn);
- conn->pn_raw_conn = 0;
+ //conn->pn_raw_conn = 0;
qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_CLOSED_READ", conn->conn_id);
break;
}
@@ -1393,10 +1449,11 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
while ( (n = pn_raw_connection_take_written_buffers(conn->pn_raw_conn, buffs, WRITE_BUFFERS)) ) {
for (size_t i = 0; i < n; ++i) {
written += buffs[i].size;
- qd_buffer_t *qd_buff = (qd_buffer_t *) buffs[i].context;
- assert(qd_buff);
- if (qd_buff != NULL)
- qd_buffer_free(qd_buff);
+ qd_http2_buffer_t *qd_http2_buff = (qd_http2_buffer_t *) buffs[i].context;
+ assert(qd_http2_buff);
+ if (qd_http2_buff != NULL) {
+ free_qd_http2_buffer_t(qd_http2_buff);
+ }
}
}
qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_WRITTEN Wrote %i bytes, DEQ_SIZE(session_data->buffs) = %zu", conn->conn_id, written, DEQ_SIZE(conn->session_data->buffs));
@@ -1646,6 +1703,7 @@ static void qdr_http_adaptor_init(qdr_core_t *core, void **adaptor_context)
nghttp2_session_callbacks_set_on_header_callback(callbacks, on_header_callback);
nghttp2_session_callbacks_set_on_stream_close_callback(callbacks, on_stream_close_callback);
nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, on_data_chunk_recv_callback);
+ nghttp2_session_callbacks_set_send_data_callback(callbacks, snd_data_callback);
nghttp2_session_callbacks_set_send_callback(callbacks, send_callback);
adaptor->callbacks = callbacks;
diff --git a/src/adaptors/http2/http2_adaptor.h b/src/adaptors/http2/http2_adaptor.h
index dc19f90..9a51dc9 100644
--- a/src/adaptors/http2/http2_adaptor.h
+++ b/src/adaptors/http2/http2_adaptor.h
@@ -29,6 +29,11 @@
#include "server_private.h"
#include "adaptors/http_common.h"
+size_t QD_HTTP2_BUFFER_SIZE = 16384;
+size_t NUM_QD_BUFFERS_IN_ONE_HTTP2_BUFFER = 32;
+size_t MAX_BUFFERS = 16;
+size_t HTTP2_DATA_FRAME_HEADER_LENGTH = 9;
+
// We already have a qd_http_listener_t defined in http-libwebsockets.c
// We will call this as qd_http_lsnr_t in order to avoid a clash.
@@ -36,14 +41,18 @@
// and get rid of http-libwebsockets.c and rename this as qd_http_listener_t
typedef struct qdr_http2_session_data_t qdr_http2_session_data_t;
typedef struct qdr_http2_stream_data_t qdr_http2_stream_data_t;
-typedef struct qdr_http2_connection_t qdr_http2_connection_t;
+typedef struct qdr_http2_connection_t qdr_http2_connection_t;
+typedef struct qd_http2_buffer_t qd_http2_buffer_t;
+
DEQ_DECLARE(qdr_http2_stream_data_t, qd_http2_stream_data_list_t);
+DEQ_DECLARE(qd_http2_buffer_t, qd_http2_buffer_list_t);
struct qdr_http2_session_data_t {
qdr_http2_connection_t *conn; // Connection associated with the session_data
nghttp2_session *session; // A pointer to the nghttp2s' session object
qd_http2_stream_data_list_t streams; // A session can have many streams.
- qd_buffer_list_t buffs; // Buffers for writing
+ qd_http2_buffer_list_t buffs; // Buffers for writing
+ bool max_buffs_in_pool;
};
struct qdr_http2_stream_data_t {
@@ -63,14 +72,16 @@ struct qdr_http2_stream_data_t {
DEQ_LINKS(qdr_http2_stream_data_t);
qd_message_body_data_result_t curr_body_data_result;
- int curr_body_data_buff_offset;
+ int curr_body_data_qd_buff_offset;
int body_data_buff_count;
int32_t stream_id;
+ size_t qd_buffers_to_send;
bool entire_header_arrived;
bool header_sent;
bool steam_closed;
- pn_raw_buffer_t raw_buffer;
+
+ bool full_payload_handled;
};
struct qdr_http2_connection_t {
@@ -88,7 +99,6 @@ struct qdr_http2_connection_t {
char *remote_address;
qdr_link_t *stream_dispatcher;
uint64_t stream_dispatcher_id;
- qdr_http2_stream_data_t *initial_stream;
char *reply_to;
nghttp2_data_provider data_prd;
bool connection_established;
@@ -97,8 +107,64 @@ struct qdr_http2_connection_t {
bool timer_scheduled;
};
+struct qd_http2_buffer_t {
+ unsigned int size; ///< Size of data content
+ unsigned char content[16393]; // 16k max content + 9 bytes for the HTTP2 header, 16384 + 9 = 16393
+ DEQ_LINKS(qd_http2_buffer_t);
+};
+
+
+static inline unsigned char *qd_http2_buffer_base(const qd_http2_buffer_t *buf)
+{
+ return (unsigned char*) &buf->content[0];
+}
+
+/**
+ * Return a pointer to the first unused byte in the buffer.
+ * @param buf A pointer to an allocated buffer
+ * @return A pointer to the first free octet in the buffer, the insert point for new data.
+ */
+static inline unsigned char *qd_http2_buffer_cursor(const qd_http2_buffer_t *buf)
+{
+ return ( (unsigned char*) &(buf->content[0]) ) + buf->size;
+}
+
+/**
+ * Return remaining capacity at end of buffer.
+ * @param buf A pointer to an allocated buffer
+ * @return The number of octets in the buffer's free space, how many octets may be inserted.
+ */
+static inline size_t qd_http2_buffer_capacity(const qd_http2_buffer_t *buf)
+{
+ return QD_HTTP2_BUFFER_SIZE - buf->size;
+}
+
+/**
+ * Return the size of the buffers data content.
+ * @param buf A pointer to an allocated buffer
+ * @return The number of octets of data in the buffer
+ */
+static inline size_t qd_http2_buffer_size(const qd_http2_buffer_t *buf)
+{
+ return buf->size;
+}
+
+/**
+ * Notify the buffer that octets have been inserted at the buffer's cursor. This will advance the
+ * cursor by len octets.
+ *
+ * @param buf A pointer to an allocated buffer
+ * @param len The number of octets that have been appended to the buffer
+ */
+static inline void qd_http2_buffer_insert(qd_http2_buffer_t *buf, size_t len)
+{
+ buf->size += len;
+ assert(buf->size <= QD_HTTP2_BUFFER_SIZE);
+}
+
ALLOC_DECLARE(qdr_http2_session_data_t);
ALLOC_DECLARE(qdr_http2_stream_data_t);
ALLOC_DECLARE(qdr_http2_connection_t);
+ALLOC_DECLARE(qd_http2_buffer_t);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org