You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2020/08/21 21:03:07 UTC
[qpid-dispatch] branch dev-protocol-adaptors updated: DISPATCH-1743 - Use new body data API to convert AMQP to HTTP and vice versa"
This is an automated email from the ASF dual-hosted git repository.
gmurthy pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/dev-protocol-adaptors by this push:
new ad024e4 DISPATCH-1743 - Use new body data API to convert AMQP to HTTP and vice versa"
ad024e4 is described below
commit ad024e47f1d466f17269d47d87b1dd56caf72670
Author: Ganesh Murthy <gm...@apache.org>
AuthorDate: Tue Aug 18 23:03:12 2020 -0400
DISPATCH-1743 - Use new body data API to convert AMQP to HTTP and vice versa"
---
src/adaptors/http_adaptor.c | 474 +++++++++++++++++++++++++++++++-------------
src/adaptors/http_adaptor.h | 11 +-
2 files changed, 347 insertions(+), 138 deletions(-)
diff --git a/src/adaptors/http_adaptor.c b/src/adaptors/http_adaptor.c
index 0b70459..8a63095 100644
--- a/src/adaptors/http_adaptor.c
+++ b/src/adaptors/http_adaptor.c
@@ -218,12 +218,23 @@ static int on_data_chunk_recv_callback(nghttp2_session *session,
DEQ_INIT(buffers);
qd_buffer_list_append(&buffers, (uint8_t *)data, len);
- qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
- qd_compose_insert_buffers(body, &buffers);
+ if (stream_data->in_dlv) {
+ qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
+ qd_compose_insert_binary_buffers(body, &buffers);
+ qd_message_extend(stream_data->message, body);
+ }
+ else {
+ if (!stream_data->body) {
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 DATA on_data_chunk_recv_callback creating stream_data->body", conn->conn_id, stream_id);
+ stream_data->body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
+ }
+ qd_compose_insert_binary_buffers(stream_data->body, &buffers);
+ }
- qd_message_extend(stream_data->message, body);
nghttp2_session_consume(session, stream_id, len);
- qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 on_data_chunk_recv_callback data length %zu", conn->conn_id, stream_id, len);
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 DATA on_data_chunk_recv_callback data length %zu", conn->conn_id, stream_id, len);
+
+ //Returning zero means success.
return 0;
}
@@ -232,10 +243,10 @@ static int on_stream_close_callback(nghttp2_session *session,
nghttp2_error_code error_code,
void *user_data)
{
- qdr_http_connection_t *conn = (qdr_http_connection_t *)user_data;
- qdr_http2_session_data_t *session_data = conn->session_data;
- qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 on_stream_close_callback ", conn->conn_id, stream_id);
- free_http2_stream_data(session_data, stream_id);
+// qdr_http_connection_t *conn = (qdr_http_connection_t *)user_data;
+// qdr_http2_session_data_t *session_data = conn->session_data;
+// qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 on_stream_close_callback ", conn->conn_id, stream_id);
+// free_http2_stream_data(session_data, stream_id);
return 0;
}
@@ -249,7 +260,8 @@ static ssize_t send_callback(nghttp2_session *session,
qdr_http_connection_t *conn = (qdr_http_connection_t *)user_data;
qdr_http2_session_data_t *session_data = conn->session_data;
qd_buffer_list_append(&(session_data->buffs), (uint8_t *)data, length);
- qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i] HTTP2 send_callback data length %zu", conn->conn_id, length);
+
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i] HTTP2 send_callback data length %zu, DEQ_SIZE(session_data->buffs)=%zu", conn->conn_id, length, DEQ_SIZE(session_data->buffs));
return (ssize_t)length;
}
@@ -309,6 +321,8 @@ static int on_begin_headers_callback(nghttp2_session *session,
}
}
+ //Andrew next Monday
+
return 0;
}
@@ -338,7 +352,7 @@ static int on_header_callback(nghttp2_session *session,
}
qd_compose_insert_string_n(stream_data->app_properties, (const char *)name, namelen);
qd_compose_insert_string_n(stream_data->app_properties, (const char *)value, valuelen);
- qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 HEADER [%s=%s]", conn->conn_id, stream_id, (char *)name, (char *)value);
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 HEADER Incoming [%s=%s]", conn->conn_id, stream_data->stream_id, (char *)name, (char *)value);
}
break;
default:
@@ -348,56 +362,79 @@ static int on_header_callback(nghttp2_session *session,
}
-static void link_deliver(qdr_http2_stream_data_t *stream_data, bool receive_complete)
+static void compose_and_deliver(qdr_http2_stream_data_t *stream_data, qd_composed_field_t *header_and_prop, qdr_http_connection_t *conn, bool receive_complete)
{
- qd_composed_field_t *header_prop = 0;
+ if (receive_complete) {
+ if (!stream_data->body) {
+ fflush(stdout);
+ stream_data->body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
+ qd_compose_insert_binary(stream_data->body, 0, 0);
+ qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] Inserting empty body data in compose_and_deliver", conn->conn_id, stream_data->stream_id);
+ }
+ }
+ if (stream_data->body) {
+ qd_message_compose_4(stream_data->message, header_and_prop, stream_data->app_properties, stream_data->body, receive_complete);
+ }
+ else {
+ qd_message_compose_3(stream_data->message, header_and_prop, stream_data->app_properties, receive_complete);
+ }
+ qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"][L%"PRIu64"] Initiating qdr_link_deliver", conn->conn_id, stream_data->stream_id, stream_data->in_link->identity);
+ stream_data->in_dlv = qdr_link_deliver(stream_data->in_link, stream_data->message, 0, false, 0, 0, 0, 0);
+ qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"][L%"PRIu64"] Routed delivery dlv:%lx", conn->conn_id, stream_data->stream_id, stream_data->in_link->identity, (long) stream_data->in_dlv);
+
+}
+
+static bool route_delivery(qdr_http2_stream_data_t *stream_data, bool receive_complete)
+{
+ qd_composed_field_t *header_and_prop = 0;
qdr_http_connection_t *conn = stream_data->session_data->conn;
+ bool delivery_routed = false;
+
if (conn->ingress) {
- if (stream_data->reply_to && !stream_data->in_dlv) {
- header_prop = qd_message_compose_amqp(stream_data->message,
+ if (stream_data->reply_to && stream_data->entire_header_arrived && !stream_data->in_dlv) {
+ delivery_routed = true;
+ header_and_prop = qd_message_compose_amqp(stream_data->message,
conn->config->address,
0,
stream_data->reply_to,
0,
0,
stream_data->stream_id);
- qd_compose_end_map(stream_data->app_properties);
- stream_data->app_properties = qd_compose(QD_PERFORMATIVE_BODY_DATA, stream_data->app_properties);
- if (receive_complete) {
- qd_compose_insert_binary(stream_data->app_properties, 0, 0);
- }
- qd_message_compose_3(stream_data->message, header_prop, stream_data->app_properties, receive_complete);
- qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"][L%"PRIu64"] Initiating qdr_link_deliver", conn->conn_id, stream_data->stream_id, stream_data->in_link->identity);
- stream_data->in_dlv = qdr_link_deliver(stream_data->in_link, stream_data->message, 0, false, 0, 0, 0, 0);
- qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"][L%"PRIu64"] Routed delivery dlv:%lx", conn->conn_id, stream_data->stream_id, stream_data->in_link->identity, (long) stream_data->in_dlv);
-
+ compose_and_deliver(stream_data, header_and_prop, conn, receive_complete);
}
}
else {
- header_prop = qd_message_compose_amqp(stream_data->message,
- stream_data->reply_to,
- 0,
- 0,
- 0,
- 0,
- stream_data->stream_id);
- qd_compose_end_map(stream_data->app_properties);
- stream_data->app_properties = qd_compose(QD_PERFORMATIVE_BODY_DATA, stream_data->app_properties);
- if (receive_complete) {
- qd_compose_insert_binary(stream_data->app_properties, 0, 0);
+ if (stream_data->entire_header_arrived) {
+ delivery_routed = true;
+ header_and_prop = qd_message_compose_amqp(stream_data->message,
+ stream_data->reply_to,
+ 0,
+ 0,
+ 0,
+ 0,
+ stream_data->stream_id);
+ compose_and_deliver(stream_data, header_and_prop, conn, receive_complete);
}
- qd_message_compose_3(stream_data->message, header_prop, stream_data->app_properties, receive_complete);
- stream_data->in_dlv = qdr_link_deliver(stream_data->in_link, stream_data->message, 0, false, 0, 0, 0, 0);
}
+
+ return delivery_routed;
}
static void write_buffers(qdr_http_connection_t *conn)
{
qdr_http2_session_data_t *session_data = conn->session_data;
- size_t buffs_to_write = pn_raw_connection_write_buffers_capacity(conn->pn_raw_conn);
- size_t qd_buff_size = DEQ_SIZE(session_data->buffs);
- size_t num_buffs = qd_buff_size > buffs_to_write ? buffs_to_write : qd_buff_size;
+ size_t pn_buffs_to_write = pn_raw_connection_write_buffers_capacity(conn->pn_raw_conn);
+ size_t num_buffs = DEQ_SIZE(session_data->buffs) > pn_buffs_to_write ? pn_buffs_to_write : DEQ_SIZE(session_data->buffs);
+
+ //
+ // No buffers to write, no need to proceed.
+ //
+ if (num_buffs == 0) {
+ qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Zero buffers written in write_buffers - pn_raw_connection_write_buffers_capacity = %zu, DEQ_SIZE(session_data->buffs) = %zu - returning", conn->conn_id, pn_buffs_to_write, DEQ_SIZE(session_data->buffs));
+ return;
+ }
+
pn_raw_buffer_t raw_buffers[num_buffs];
qd_buffer_t *qd_buff = DEQ_HEAD(session_data->buffs);
@@ -414,16 +451,16 @@ static void write_buffers(qdr_http_connection_t *conn)
DEQ_REMOVE_HEAD(session_data->buffs);
qd_buff = DEQ_HEAD(session_data->buffs);
i ++;
+
}
- qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Writing %i bytes in write_buffers", conn->conn_id, total_bytes);
+
if (i >0) {
size_t num_buffers_written = pn_raw_connection_write_buffers(session_data->conn->pn_raw_conn, raw_buffers, num_buffs);
+ qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Written %i buffer(s) and %i bytes in pn_raw_connection_write_buffers", conn->conn_id, num_buffers_written, total_bytes);
if (num_buffs != num_buffers_written) {
assert(false);
}
}
-
-
}
//static void send_window_update_frame(qdr_http2_session_data_t *session_data, int32_t stream_id)
@@ -448,7 +485,9 @@ static void send_settings_frame(qdr_http_connection_t *conn)
int rv = nghttp2_submit_settings(session_data->session, NGHTTP2_FLAG_NONE, iv, ARRLEN(iv));
if (rv != 0) {
qd_log(http_adaptor->log_source, QD_LOG_INFO, "[C%i] Fatal error sending settings frame, rv=%i", conn->conn_id, rv);
+ return;
}
+ qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Initial SETTINGS frame sent", conn->conn_id);
nghttp2_session_send(session_data->session);
write_buffers(session_data->conn);
}
@@ -465,38 +504,62 @@ static int on_frame_recv_callback(nghttp2_session *session,
switch (frame->hd.type) {
case NGHTTP2_SETTINGS: {
- qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 SETTINGS frame received", conn->conn_id, stream_id);
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 SETTINGS frame received", conn->conn_id, stream_id);
}
break;
case NGHTTP2_WINDOW_UPDATE:
- qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 WINDOW_UPDATE frame received", conn->conn_id, stream_id);
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 WINDOW_UPDATE frame received", conn->conn_id, stream_id);
break;
case NGHTTP2_DATA: {
- qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 DATA frame received", conn->conn_id, stream_id);
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] NGHTTP2_DATA frame received", conn->conn_id, stream_id);
if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
- qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 END_STREAM flag received", conn->conn_id, stream_id);
- MSG_CONTENT(stream_data->message)->receive_complete = true;
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] NGHTTP2_DATA NGHTTP2_FLAG_END_STREAM flag received, receive_complete = true", conn->conn_id, stream_id);
+ qd_message_set_receive_complete(stream_data->message);
+ }
+
+ if (stream_data->in_dlv) {
+ if (!stream_data->body) {
+ stream_data->body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
+ qd_compose_insert_binary(stream_data->body, 0, 0);
+ qd_message_extend(stream_data->message, stream_data->body);
+ }
}
+
if (stream_data->in_dlv) {
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] NGHTTP2_DATA frame received, qdr_delivery_continue(dlv=%lx)", conn->conn_id, stream_id, (long) stream_data->in_dlv);
qdr_delivery_continue(http_adaptor->core, stream_data->in_dlv, false);
}
}
break;
case NGHTTP2_HEADERS:{
- qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 HEADERS frame received", conn->conn_id, stream_id);
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 HEADERS frame received", conn->conn_id, stream_id);
if (frame->hd.flags & NGHTTP2_FLAG_END_HEADERS) {
/* All the headers have been received. Send out the AMQP message */
- qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 END_HEADERS flag received", conn->conn_id, stream_id);
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 NGHTTP2_FLAG_END_HEADERS flag received, all headers have arrived", conn->conn_id, stream_id);
stream_data->entire_header_arrived = true;
- // All header fields have been received
- // End the map.
+ //
+ // All header fields have been received. End the application properties map.
+ //
+ qd_compose_end_map(stream_data->app_properties);
+
bool receive_complete = false;
if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 NGHTTP2_FLAG_END_HEADERS and NGHTTP2_FLAG_END_STREAM flag received, receive_complete=true", conn->conn_id, stream_id);
+ qd_message_set_receive_complete(stream_data->message);
receive_complete = true;
}
- MSG_CONTENT(stream_data->message)->receive_complete = receive_complete;
- link_deliver(stream_data, receive_complete);
+ //
+ // All headers have arrived, send out the delivery with just the headers,
+ // if/when the body arrives later, we will call the qdr_delivery_continue()
+ //
+ qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] All headers arrived, trying to route delivery", conn->conn_id);
+ if (route_delivery(stream_data, receive_complete)) {
+ qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] All headers arrived, delivery routed successfully", conn->conn_id);
+ }
+ else {
+ qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] All headers arrived, delivery not routed", conn->conn_id);
+ }
}
}
break;
@@ -518,6 +581,7 @@ qdr_http_connection_t *qdr_http_connection_ingress(qd_http_lsnr_t* listener)
ingress_http_conn->session_data = new_qdr_http2_session_data_t();
ZERO(ingress_http_conn->session_data);
+ DEQ_INIT(ingress_http_conn->session_data->buffs);
DEQ_INIT(ingress_http_conn->session_data->streams);
ingress_http_conn->session_data->conn = ingress_http_conn;
@@ -621,7 +685,13 @@ static void qdr_http_second_attach(void *context, qdr_link_t *link,
if (qdr_link_direction(link) == QD_OUTGOING && source->dynamic) {
if (stream_data->session_data->conn->ingress) {
qdr_copy_reply_to(stream_data, qdr_terminus_get_address(source));
- link_deliver(stream_data, MSG_CONTENT(stream_data->message)->receive_complete);
+ qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Reply-to is available now, trying to route delivery", stream_data->session_data->conn->conn_id);
+ if (route_delivery(stream_data, qd_message_receive_complete(stream_data->message))) {
+ qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Reply-to available now, delivery routed successfully", stream_data->session_data->conn->conn_id);
+ }
+ else {
+ qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Reply-to available now, delivery not routed", stream_data->session_data->conn->conn_id);
+ }
grant_read_buffers(stream_data->session_data->conn);
}
qdr_link_flow(http_adaptor->core, link, 10, false);
@@ -671,10 +741,168 @@ ssize_t read_callback(nghttp2_session *session,
nghttp2_data_source *source,
void *user_data)
{
- qd_buffer_t *qd_buff = source->ptr;
- buf = qd_buffer_base(qd_buff);
- size_t ret_val = qd_buffer_size(qd_buff);
- return ret_val;
+ qdr_link_t *link = source->ptr;
+ qdr_http_connection_t *conn = (qdr_http_connection_t *)user_data;
+ qdr_http2_session_data_t *session_data = conn->session_data;
+ qdr_http2_stream_data_t *stream_data = nghttp2_session_get_stream_user_data(session_data->session, stream_id);
+
+ qd_message_depth_status_t status = qd_message_check_depth(stream_data->message, QD_DEPTH_BODY);
+
+ write_buffers(session_data->conn);
+
+ switch (status) {
+ case QD_MESSAGE_DEPTH_OK: {
+ //
+ // At least one complete body performative has arrived. It is now safe to switch
+ // over to the per-message extraction of body-data segments.
+ //
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback QD_MESSAGE_DEPTH_OK", conn->conn_id, stream_data->stream_id);
+ qd_message_body_data_t *body_data = 0;
+ qd_message_body_data_result_t body_data_result;
+
+ //
+ // Process as many body-data segments as are available.
+ //
+ int buff_offset = 0;
+ body_data = stream_data->curr_body_data;
+ if (body_data) {
+ //
+ // If we saved the body_data, use the buff_offset.
+ //
+ body_data_result = stream_data->curr_body_data_result;
+ buff_offset = stream_data->curr_body_data_buff_offset;
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback Use existing body_data", conn->conn_id, stream_data->stream_id);
+ }
+ else {
+ body_data_result = qd_message_next_body_data(stream_data->message, &body_data);
+ if (stream_data->curr_body_data) {
+ qd_message_body_data_release(stream_data->curr_body_data);
+ }
+ stream_data->curr_body_data = body_data;
+ stream_data->curr_body_data_result = body_data_result;
+ stream_data->curr_body_data_buff_offset = 0;
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback qd_message_next_body_data", conn->conn_id, stream_data->stream_id);
+ }
+
+ switch (body_data_result) {
+ case QD_MESSAGE_BODY_DATA_OK: {
+ //
+ // We have a new valid body-data segment. Handle it
+ //
+ stream_data->body_data_buff_count = qd_message_body_data_buffer_count(body_data);
+
+ size_t pn_buffs_to_write = pn_raw_connection_write_buffers_capacity(conn->pn_raw_conn);
+
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback pn_raw_connection_write_buffers_capacity=%zu", conn->conn_id, stream_data->stream_id, pn_buffs_to_write);
+
+ if (stream_data->body_data_buff_count == 0 || pn_buffs_to_write==0) {
+ // We cannot send anything, we need to come back here.
+
+ //TODO - This will not pass code review. Need to investigate.
+ link->credit_to_core = 0;
+ qdr_link_flow(http_adaptor->core, link, 1, false);
+ if (stream_data->body_data_buff_count == 0) {
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] Exiting read_callback QD_MESSAGE_BODY_DATA_OK, body_data_buff_count=0, temporarily pausing stream", conn->conn_id, stream_data->stream_id);
+ qd_message_body_data_release(stream_data->curr_body_data);
+ stream_data->curr_body_data = 0;
+ }
+ if (pn_buffs_to_write == 0)
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] Exiting read_callback, pn_buffs_to_write=0, pausing stream", conn->conn_id, stream_data->stream_id);
+
+ //
+ // We don't have any buffers to send but we may or may not get more buffers.
+ // Temporarily pause this stream
+ //
+
+ stream_data->disposition = 0;
+ return NGHTTP2_ERR_DEFERRED;
+ }
+
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback QD_MESSAGE_BODY_DATA_OK, body_data_buff_count=%i", conn->conn_id, stream_data->stream_id, stream_data->body_data_buff_count);
+
+ //
+ // We are looking to write only one pn_raw_buffer_t per iteration.
+ //
+ pn_raw_buffer_t raw_buffers[1];
+ qd_message_body_data_buffers(body_data, raw_buffers, buff_offset, 1);
+ stream_data->curr_body_data_buff_offset += 1;
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback, size of raw_buffer=%zu", conn->conn_id, stream_data->stream_id, raw_buffers[0].size);
+ memcpy(buf, raw_buffers[0].bytes, raw_buffers[0].size);
+ stream_data->body_data_buff_count -= 1;
+ if (!stream_data->body_data_buff_count) {
+ qd_message_body_data_release(stream_data->curr_body_data);
+ stream_data->curr_body_data = 0;
+ }
+ return raw_buffers[0].size;
+ }
+
+ case QD_MESSAGE_BODY_DATA_INCOMPLETE:
+ //
+ // A new segment has not completely arrived yet. Check again later.
+ //
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback QD_MESSAGE_BODY_DATA_INCOMPLETE", conn->conn_id, stream_data->stream_id);
+ return 0;
+
+ case QD_MESSAGE_BODY_DATA_NO_MORE: {
+ //
+ // We have already handled the last body-data segment for this delivery.
+ // Complete the "sending" of this delivery and replenish credit.
+ //
+ size_t pn_buffs_to_write = pn_raw_connection_write_buffers_capacity(conn->pn_raw_conn);
+ if (pn_buffs_to_write == 0) {
+ return NGHTTP2_ERR_DEFERRED;
+ stream_data->disposition = 0;
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback QD_MESSAGE_BODY_DATA_NO_MORE - pn_buffs_to_write=0 send is not complete", conn->conn_id, stream_data->stream_id);
+ }
+ else {
+ qd_message_body_data_release(stream_data->curr_body_data);
+ *data_flags |= NGHTTP2_DATA_FLAG_EOF;
+ qd_message_set_send_complete(stream_data->message);
+ stream_data->disposition = PN_ACCEPTED; // This will cause the delivery to be settled
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback QD_MESSAGE_BODY_DATA_NO_MORE - send is complete, setting NGHTTP2_DATA_FLAG_EOF", conn->conn_id, stream_data->stream_id);
+ }
+
+ qdr_link_flow(http_adaptor->core, link, 1, false);
+ break;
+ }
+
+ case QD_MESSAGE_BODY_DATA_INVALID:
+ //
+ // The body-data is corrupt in some way. Stop handling the delivery and reject it.
+ //
+ *data_flags |= NGHTTP2_DATA_FLAG_EOF;
+ qd_message_body_data_release(stream_data->curr_body_data);
+ qdr_link_flow(http_adaptor->core, link, 1, false);
+ stream_data->disposition = PN_REJECTED;
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%i][S%i] read_callback QD_MESSAGE_BODY_DATA_INVALID", conn->conn_id, stream_data->stream_id);
+ break;
+
+ case QD_MESSAGE_BODY_DATA_NOT_DATA:
+ //
+ // Valid data was seen, but it is not a body-data performative. Reject the delivery.
+ //
+ *data_flags |= NGHTTP2_DATA_FLAG_EOF;
+ qd_message_body_data_release(stream_data->curr_body_data);
+ qdr_link_flow(http_adaptor->core, link, 1, false);
+ stream_data->disposition = PN_REJECTED;
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%i][S%i] read_callback QD_MESSAGE_BODY_DATA_NOT_DATA", conn->conn_id, stream_data->stream_id);
+ break;
+ }
+ break;
+ }
+
+ case QD_MESSAGE_DEPTH_INVALID:
+ qdr_link_flow(http_adaptor->core, link, 1, false);
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%i][S%i] read_callback QD_MESSAGE_DEPTH_INVALID", conn->conn_id, stream_data->stream_id);
+ stream_data->disposition = PN_REJECTED;
+ break;
+
+ case QD_MESSAGE_DEPTH_INCOMPLETE:
+ break;
+ }
+
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%i][S%i] read_callback Returning zero", conn->conn_id, stream_data->stream_id);
+ return 0;
}
uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data, qdr_link_t *link)
@@ -682,12 +910,12 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data, qdr_link_t *
qdr_http2_session_data_t *session_data = stream_data->session_data;
qdr_http_connection_t *conn = session_data->conn;
qd_message_t *message = stream_data->message;
-
if (stream_data->out_dlv) {
- if (!stream_data->header_sent) {
- stream_data->header_sent = true;
+ if (qd_message_send_complete(stream_data->message))
+ return 0;
+ if (!stream_data->header_sent) {
// The HTTP Path is in the AMQP to field.
//qd_iterator_t *to = qd_message_field_iterator(message, QD_FIELD_TO);
//char *path = (char *)qd_iterator_copy(to);
@@ -718,20 +946,8 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data, qdr_link_t *
hdrs[idx].namelen = qd_iterator_length(key_raw);
hdrs[idx].valuelen = qd_iterator_length(val_raw);
hdrs[idx].flags = NGHTTP2_NV_FLAG_NONE;
- qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%i] HTTP2 Outgoing HEADER [%s=%s]", conn->conn_id, stream_id, (char *)hdrs[idx].name, (char *)hdrs[idx].value);
}
- /*
- * case NGHTTP2_HEADERS
- * case NGHTTP2_PRIORITY
- * case NGHTTP2_RST_STREAM
- * case NGHTTP2_SETTINGS
- * case NGHTTP2_PUSH_PROMISE
- * case NGHTTP2_PING
- * case NGHTTP2_CONTINUATION
- * case NGHTTP2_GOAWAY
- * case NGHTTP2_WINDOW_UPDATE
- */
// This does not really submit the request. We need to read the bytes
//nghttp2_session_set_next_stream_id(session_data->session, stream_data->stream_id);
stream_data->stream_id = nghttp2_submit_headers(session_data->session,
@@ -739,60 +955,47 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data, qdr_link_t *
stream_id, NULL, hdrs,
count,
stream_data);
+ if (stream_id != -1) {
+ stream_data->stream_id = stream_id;
+ }
+
+ for (uint32_t idx = 0; idx < count; idx++) {
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] HTTP2 HEADER Outgoing [%s=%s]", conn->conn_id, stream_data->stream_id, (char *)hdrs[idx].name, (char *)hdrs[idx].value);
+ }
nghttp2_session_send(session_data->session);
write_buffers(session_data->conn);
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] Headers submitted", conn->conn_id, stream_data->stream_id);
+ }
+ else {
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] Headers already submitted, Proceeding with the body", conn->conn_id, stream_data->stream_id);
+ }
+ if (stream_data->header_sent) {
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] Stream was paused, resuming now", conn->conn_id, stream_data->stream_id);
+ nghttp2_session_resume_data(session_data->session, stream_data->stream_id);
+ nghttp2_session_send(session_data->session);
+ write_buffers(session_data->conn);
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] nghttp2_session_send - stream resumed, write_buffers done", conn->conn_id, stream_data->stream_id);
}
+ else {
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] Processing message body", conn->conn_id, stream_data->stream_id);
+ conn->data_prd.read_callback = read_callback;
+ conn->data_prd.source.ptr = link;
+ int rv = nghttp2_submit_data(session_data->session, NGHTTP2_FLAG_END_STREAM, stream_data->stream_id, &conn->data_prd);
+ if (rv != 0) {
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%i][S%i] Error submitting data rv=%i", conn->conn_id, stream_data->stream_id, rv);
+ }
+ else {
+ nghttp2_session_send(session_data->session);
+ write_buffers(session_data->conn);
+ qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] nghttp2_session_send - write_buffers done", conn->conn_id, stream_data->stream_id);
+ }
-// qd_message_body_data_t *out_body_data = 0;
-// qd_message_body_data_result_t body_data_result = qd_message_next_body_data(message, &out_body_data);
-// bool has_out_body_data = false;
-//
-// char *body = 0;
-// has_out_body_data = true;
-// switch (body_data_result) {
-// case QD_MESSAGE_BODY_DATA_OK: {
-// qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%i] HTTP2 Outgoing QD_MESSAGE_BODY_DATA_OK", conn->conn_id, stream_data->stream_id);
-// qd_iterator_t *body_iter = qd_message_body_data_iterator(out_body_data);
-// body = (char*) qd_iterator_copy(body_iter);
-// if (body) {
-// conn->data_prd.source.ptr = body;
-// printf("handle_outgoing_http: message body-data received: %s\n", body);
-// nghttp2_submit_data(session_data->session, 0, stream_data->stream_id, &conn->data_prd);
-// }
-// free(body);
-// qd_iterator_free(body_iter);
-// break;
-// }
-//
-// case QD_MESSAGE_BODY_DATA_INCOMPLETE:
-// qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%i] HTTP2 Outgoing QD_MESSAGE_BODY_DATA_INCOMPLETE", conn->conn_id, stream_data->stream_id);
-// break;
-//
-// case QD_MESSAGE_BODY_DATA_NO_MORE: {
-// qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%i] HTTP2 Outgoing QD_MESSAGE_BODY_DATA_NO_MORE", conn->conn_id, stream_data->stream_id);
-// qd_message_set_send_complete(message);
-// qdr_link_flow(http_adaptor->core, link, 1, false);
-// nghttp2_submit_data(session_data->session, NGHTTP2_FLAG_END_STREAM, stream_data->stream_id, &conn->data_prd);
-// return PN_ACCEPTED; // This will cause the delivery to be settled
-// }
-//
-// case QD_MESSAGE_BODY_DATA_INVALID:
-// qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%i] HTTP2 Outgoing QD_MESSAGE_BODY_DATA_INVALID", conn->conn_id, stream_data->stream_id);
-// qdr_link_flow(http_adaptor->core, link, 1, false);
-// return PN_REJECTED;
-//
-// case QD_MESSAGE_BODY_DATA_NOT_DATA:
-// qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%i] HTTP2 Outgoing QD_MESSAGE_BODY_DATA_NOT_DATA", conn->conn_id, stream_data->stream_id);
-// qdr_link_flow(http_adaptor->core, link, 1, false);
-// return PN_REJECTED;
-//
-// } // switch
-//
-// if (has_out_body_data)
-// nghttp2_session_send(session_data->session);
+ }
+ stream_data->header_sent = true;
+ return stream_data->disposition;
}
return 0;
}
@@ -812,8 +1015,6 @@ static uint64_t qdr_http_deliver(void *context, qdr_link_t *link, qdr_delivery_t
stream_data->message = qdr_delivery_message(delivery);
stream_data->out_dlv = delivery;
- conn->initial_stream = stream_data;
-
qdr_terminus_t *source = qdr_terminus(0);
qdr_terminus_set_address(source, conn->config->address);
stream_data->out_link = qdr_link_first_attach(conn->qdr_conn,
@@ -825,15 +1026,12 @@ static uint64_t qdr_http_deliver(void *context, qdr_link_t *link, qdr_delivery_t
true,
delivery,
&(stream_data->outgoing_id));
- conn->initial_stream->out_link = stream_data->out_link;
qdr_link_set_context(stream_data->out_link, stream_data);
qd_iterator_t *fld_iter = qd_message_field_iterator(msg, QD_FIELD_REPLY_TO);
char *reply_to = (char *)qd_iterator_copy(fld_iter);
stream_data->reply_to = malloc(qd_iterator_length(fld_iter) + 1);
strcpy(stream_data->reply_to, reply_to);
- printf ("qdr_http_deliver if stream_dispatcher reply_to is %s\n", (char *)reply_to);
-
// Sender link
qdr_terminus_t *target = qdr_terminus(0);
qdr_terminus_set_address(target, reply_to);
@@ -849,7 +1047,6 @@ static uint64_t qdr_http_deliver(void *context, qdr_link_t *link, qdr_delivery_t
// TODO - This is wrong
qdr_link_set_context(stream_data->in_link, stream_data);
- printf ("qdr_http_deliver stream_data->session_data->conn->stream_dispatcher %p\n", (void *)stream_data->in_link);
//Let's make an outbound connection to the configured connector.
qdr_http_connection_t *conn = stream_data->session_data->conn;
@@ -867,6 +1064,7 @@ static uint64_t qdr_http_deliver(void *context, qdr_link_t *link, qdr_delivery_t
}
return handle_outgoing_http(stream_data, link);
}
+ qdr_link_flow(http_adaptor->core, link, 1, false);
}
return 0;
}
@@ -893,7 +1091,7 @@ static int handle_incoming_http(qdr_http_connection_t *conn)
qd_buffer_t *buf = (qd_buffer_t*) raw_buffers[i].context;
uint32_t raw_buff_size = raw_buffers[i].size;
qd_buffer_insert(buf, raw_buff_size);
- qd_log(http_adaptor->log_source, QD_LOG_DEBUG, "[C%i] Inserting qd_buffer of size %"PRIu32" ", conn->conn_id, raw_buff_size);
+ qd_log(http_adaptor->log_source, QD_LOG_DEBUG, "[C%i] - handle_incoming_http - Inserting qd_buffer of size %"PRIu32" ", conn->conn_id, raw_buff_size);
count += raw_buffers[i].size;
DEQ_INSERT_TAIL(buffers, buf);
}
@@ -977,27 +1175,26 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
} else {
qd_log(log, QD_LOG_INFO, "[C%i] Connected", conn->conn_id);
conn->connection_established = true;
- handle_outgoing_http(conn->initial_stream, conn->initial_stream->out_link);
qdr_connection_process(conn->qdr_conn);
}
break;
}
case PN_RAW_CONNECTION_CLOSED_READ: {
- qd_log(log, QD_LOG_INFO, "[C%i] Closed for reading", conn->conn_id);
+ qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_CLOSED_READ", conn->conn_id);
break;
}
case PN_RAW_CONNECTION_CLOSED_WRITE: {
- qd_log(log, QD_LOG_INFO, "[C%i] Closed for writing", conn->conn_id);
+ qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_CLOSED_WRITE", conn->conn_id);
break;
}
case PN_RAW_CONNECTION_DISCONNECTED: {
qdr_connection_closed(conn->qdr_conn);
//free_qdr_http_connection(conn);
- qd_log(log, QD_LOG_INFO, "[C%i] Disconnected", conn->conn_id);
+ qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_DISCONNECTED", conn->conn_id);
break;
}
case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: {
- qd_log(log, QD_LOG_INFO, "[C%i] Need write buffers", conn->conn_id);
+ qd_log(log, QD_LOG_TRACE, "[C%i] Need write buffers", conn->conn_id);
//TODO - Refactor this out
//while (qdr_connection_process(conn->qdr_conn)) {}
@@ -1005,20 +1202,20 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
}
case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
grant_read_buffers(conn);
- qd_log(log, QD_LOG_INFO, "[C%i] Need read buffers", conn->conn_id);
+ qd_log(log, QD_LOG_TRACE, "[C%i] Need read buffers", conn->conn_id);
//TODO - No need to call this
//while (qdr_connection_process(conn->qdr_conn)) {}
break;
}
case PN_RAW_CONNECTION_WAKE: {
- qd_log(log, QD_LOG_INFO, "[C%i] Wake-up", conn->conn_id);
+ qd_log(log, QD_LOG_TRACE, "[C%i] Wake-up", conn->conn_id);
while (qdr_connection_process(conn->qdr_conn)) {}
break;
}
case PN_RAW_CONNECTION_READ: {
int read = handle_incoming_http(conn);
- qd_log(log, QD_LOG_INFO, "[C%i] Read %i bytes", conn->conn_id, read);
+ qd_log(log, QD_LOG_TRACE, "[C%i] Read %i bytes", conn->conn_id, read);
while (qdr_connection_process(conn->qdr_conn)) {}
break;
}
@@ -1028,13 +1225,14 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
size_t written = 0;
while ( (n = pn_raw_connection_take_written_buffers(conn->pn_raw_conn, buffs, WRITE_BUFFERS)) ) {
for (size_t i = 0; i < n; ++i) {
+ written += buffs[i].size;
qd_buffer_t *qd_buff = (qd_buffer_t *) buffs[i].context;
assert(qd_buff);
if (qd_buff)
qd_buffer_free(qd_buff);
}
}
- qd_log(log, QD_LOG_INFO, "[C%i] Wrote %i bytes", conn->conn_id, written);
+ qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_WRITTEN Wrote %i bytes", conn->conn_id, written);
while (qdr_connection_process(conn->qdr_conn)) {}
break;
}
@@ -1063,6 +1261,7 @@ static void handle_listener_event(pn_event_t *e, qd_server_t *qd_server, void *c
break;
case PN_LISTENER_CLOSE:
+ qd_log(log, QD_LOG_INFO, "Closing HTTP connection on %s", host_port);
break;
default:
@@ -1131,6 +1330,7 @@ qdr_http_connection_t *qdr_http_connection_egress(qd_http_connector_t *connector
egress_conn->session_data = new_qdr_http2_session_data_t();
ZERO(egress_conn->session_data);
+ DEQ_INIT(egress_conn->session_data->buffs);
DEQ_INIT(egress_conn->session_data->streams);
egress_conn->session_data->conn = egress_conn;
diff --git a/src/adaptors/http_adaptor.h b/src/adaptors/http_adaptor.h
index 2f0134a..cfbdf73 100644
--- a/src/adaptors/http_adaptor.h
+++ b/src/adaptors/http_adaptor.h
@@ -50,6 +50,7 @@ struct qdr_http2_stream_data_t {
qdr_delivery_t *out_dlv;
uint64_t incoming_id;
uint64_t outgoing_id;
+ uint64_t disposition;
qdr_link_t *in_link;
qdr_link_t *out_link;
@@ -58,8 +59,16 @@ struct qdr_http2_stream_data_t {
qd_composed_field_t *field;
qd_composed_field_t *header_properties; // This has the header and the properties.
qd_composed_field_t *app_properties; // This has the application properties.
- bool entire_header_arrived; // true if all the header properties has arrived, just before the start of the data frame or just before the end stream.
+ qd_composed_field_t *body;
+
+ qd_message_body_data_t *curr_body_data;
+ qd_message_body_data_result_t curr_body_data_result;
+ int curr_body_data_buff_offset;
+ int body_data_buff_count;
+
+ bool entire_header_arrived; // true if all the header properties have arrived, just before the start of the data frame or just before the END_STREAM.
bool header_sent;
+ bool has_body;
bool has_data; // Did we ever receive a DATA frame.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org