You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2020/08/21 21:03:07 UTC

[qpid-dispatch] branch dev-protocol-adaptors updated: DISPATCH-1743 - Use new body data API to convert AMQP to HTTP and vice versa"

This is an automated email from the ASF dual-hosted git repository.

gmurthy pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/dev-protocol-adaptors by this push:
     new ad024e4  DISPATCH-1743 - Use new body data API to convert AMQP to HTTP and vice versa"
ad024e4 is described below

commit ad024e47f1d466f17269d47d87b1dd56caf72670
Author: Ganesh Murthy <gm...@apache.org>
AuthorDate: Tue Aug 18 23:03:12 2020 -0400

    DISPATCH-1743 - Use new body data API to convert AMQP to HTTP and vice versa"
---
 src/adaptors/http_adaptor.c | 474 +++++++++++++++++++++++++++++++-------------
 src/adaptors/http_adaptor.h |  11 +-
 2 files changed, 347 insertions(+), 138 deletions(-)

diff --git a/src/adaptors/http_adaptor.c b/src/adaptors/http_adaptor.c
index 0b70459..8a63095 100644
--- a/src/adaptors/http_adaptor.c
+++ b/src/adaptors/http_adaptor.c
@@ -218,12 +218,23 @@ static int on_data_chunk_recv_callback(nghttp2_session *session,
     DEQ_INIT(buffers);
     qd_buffer_list_append(&buffers, (uint8_t *)data, len);
 
-    qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
-    qd_compose_insert_buffers(body, &buffers);
+    if (stream_data->in_dlv) {
+        qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
+        qd_compose_insert_binary_buffers(body, &buffers);
+        qd_message_extend(stream_data->message, body);
+    }
+    else {
+        if (!stream_data->body) {
+            qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 DATA on_data_chunk_recv_callback creating stream_data->body", conn->conn_id, stream_id);
+            stream_data->body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
+        }
+        qd_compose_insert_binary_buffers(stream_data->body, &buffers);
+    }
 
-    qd_message_extend(stream_data->message, body);
     nghttp2_session_consume(session, stream_id, len);
-    qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 on_data_chunk_recv_callback data length %zu", conn->conn_id, stream_id, len);
+    qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 DATA on_data_chunk_recv_callback data length %zu", conn->conn_id, stream_id, len);
+
+    //Returning zero means success.
     return 0;
 }
 
@@ -232,10 +243,10 @@ static int on_stream_close_callback(nghttp2_session *session,
                                     nghttp2_error_code error_code,
                                     void *user_data)
 {
-    qdr_http_connection_t *conn = (qdr_http_connection_t *)user_data;
-    qdr_http2_session_data_t *session_data = conn->session_data;
-    qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 on_stream_close_callback ", conn->conn_id, stream_id);
-    free_http2_stream_data(session_data, stream_id);
+//    qdr_http_connection_t *conn = (qdr_http_connection_t *)user_data;
+//    qdr_http2_session_data_t *session_data = conn->session_data;
+//    qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 on_stream_close_callback ", conn->conn_id, stream_id);
+//    free_http2_stream_data(session_data, stream_id);
     return 0;
 }
 
@@ -249,7 +260,8 @@ static ssize_t send_callback(nghttp2_session *session,
     qdr_http_connection_t *conn = (qdr_http_connection_t *)user_data;
     qdr_http2_session_data_t *session_data = conn->session_data;
     qd_buffer_list_append(&(session_data->buffs), (uint8_t *)data, length);
-    qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i] HTTP2 send_callback data length %zu", conn->conn_id, length);
+
+    qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i] HTTP2 send_callback data length %zu, DEQ_SIZE(session_data->buffs)=%zu", conn->conn_id, length, DEQ_SIZE(session_data->buffs));
     return (ssize_t)length;
 }
 
@@ -309,6 +321,8 @@ static int on_begin_headers_callback(nghttp2_session *session,
         }
     }
 
+    //Andrew next Monday
+
     return 0;
 }
 
@@ -338,7 +352,7 @@ static int on_header_callback(nghttp2_session *session,
             }
             qd_compose_insert_string_n(stream_data->app_properties, (const char *)name, namelen);
             qd_compose_insert_string_n(stream_data->app_properties, (const char *)value, valuelen);
-            qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 HEADER [%s=%s]", conn->conn_id, stream_id, (char *)name, (char *)value);
+            qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 HEADER Incoming [%s=%s]", conn->conn_id, stream_data->stream_id, (char *)name, (char *)value);
         }
         break;
         default:
@@ -348,56 +362,79 @@ static int on_header_callback(nghttp2_session *session,
 }
 
 
-static void link_deliver(qdr_http2_stream_data_t *stream_data, bool receive_complete)
+static void compose_and_deliver(qdr_http2_stream_data_t *stream_data, qd_composed_field_t  *header_and_prop, qdr_http_connection_t *conn, bool receive_complete)
 {
-    qd_composed_field_t  *header_prop = 0;
+    if (receive_complete) {
+        if (!stream_data->body) {
+            fflush(stdout);
+            stream_data->body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
+            qd_compose_insert_binary(stream_data->body, 0, 0);
+            qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] Inserting empty body data in compose_and_deliver", conn->conn_id, stream_data->stream_id);
+        }
+    }
+    if (stream_data->body) {
+        qd_message_compose_4(stream_data->message, header_and_prop, stream_data->app_properties, stream_data->body, receive_complete);
+    }
+    else {
+        qd_message_compose_3(stream_data->message, header_and_prop, stream_data->app_properties, receive_complete);
+    }
+    qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"][L%"PRIu64"] Initiating qdr_link_deliver", conn->conn_id, stream_data->stream_id, stream_data->in_link->identity);
+    stream_data->in_dlv = qdr_link_deliver(stream_data->in_link, stream_data->message, 0, false, 0, 0, 0, 0);
+    qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"][L%"PRIu64"] Routed delivery dlv:%lx", conn->conn_id, stream_data->stream_id, stream_data->in_link->identity, (long) stream_data->in_dlv);
+
+}
+
+static bool route_delivery(qdr_http2_stream_data_t *stream_data, bool receive_complete)
+{
+    qd_composed_field_t  *header_and_prop = 0;
     qdr_http_connection_t *conn  = stream_data->session_data->conn;
 
+    bool delivery_routed = false;
+
     if (conn->ingress) {
-        if (stream_data->reply_to && !stream_data->in_dlv) {
-            header_prop = qd_message_compose_amqp(stream_data->message,
+        if (stream_data->reply_to && stream_data->entire_header_arrived && !stream_data->in_dlv) {
+            delivery_routed = true;
+            header_and_prop = qd_message_compose_amqp(stream_data->message,
                                                   conn->config->address,
                                                   0,
                                                   stream_data->reply_to,
                                                   0,
                                                   0,
                                                   stream_data->stream_id);
-            qd_compose_end_map(stream_data->app_properties);
-            stream_data->app_properties = qd_compose(QD_PERFORMATIVE_BODY_DATA, stream_data->app_properties);
-            if (receive_complete) {
-                qd_compose_insert_binary(stream_data->app_properties, 0, 0);
-            }
-            qd_message_compose_3(stream_data->message, header_prop, stream_data->app_properties, receive_complete);
-            qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"][L%"PRIu64"] Initiating qdr_link_deliver", conn->conn_id, stream_data->stream_id, stream_data->in_link->identity);
-            stream_data->in_dlv = qdr_link_deliver(stream_data->in_link, stream_data->message, 0, false, 0, 0, 0, 0);
-            qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"][L%"PRIu64"] Routed delivery dlv:%lx", conn->conn_id, stream_data->stream_id, stream_data->in_link->identity, (long) stream_data->in_dlv);
-
+            compose_and_deliver(stream_data, header_and_prop, conn, receive_complete);
         }
     }
     else {
-        header_prop = qd_message_compose_amqp(stream_data->message,
-                                              stream_data->reply_to,
-                                              0,
-                                              0,
-                                              0,
-                                              0,
-                                              stream_data->stream_id);
-        qd_compose_end_map(stream_data->app_properties);
-        stream_data->app_properties = qd_compose(QD_PERFORMATIVE_BODY_DATA, stream_data->app_properties);
-        if (receive_complete) {
-            qd_compose_insert_binary(stream_data->app_properties, 0, 0);
+        if (stream_data->entire_header_arrived) {
+            delivery_routed = true;
+            header_and_prop = qd_message_compose_amqp(stream_data->message,
+                                                  stream_data->reply_to,
+                                                  0,
+                                                  0,
+                                                  0,
+                                                  0,
+                                                  stream_data->stream_id);
+            compose_and_deliver(stream_data, header_and_prop, conn, receive_complete);
         }
-        qd_message_compose_3(stream_data->message, header_prop, stream_data->app_properties, receive_complete);
-        stream_data->in_dlv = qdr_link_deliver(stream_data->in_link, stream_data->message, 0, false, 0, 0, 0, 0);
     }
+
+    return delivery_routed;
 }
 
 static void write_buffers(qdr_http_connection_t *conn)
 {
     qdr_http2_session_data_t *session_data = conn->session_data;
-    size_t buffs_to_write = pn_raw_connection_write_buffers_capacity(conn->pn_raw_conn);
-    size_t qd_buff_size = DEQ_SIZE(session_data->buffs);
-    size_t num_buffs = qd_buff_size > buffs_to_write ? buffs_to_write : qd_buff_size;
+    size_t pn_buffs_to_write = pn_raw_connection_write_buffers_capacity(conn->pn_raw_conn);
+    size_t num_buffs = DEQ_SIZE(session_data->buffs) > pn_buffs_to_write ? pn_buffs_to_write : DEQ_SIZE(session_data->buffs);
+
+    //
+    // No buffers to write, no need to proceed.
+    //
+    if (num_buffs == 0) {
+        qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Zero buffers written in write_buffers - pn_raw_connection_write_buffers_capacity = %zu, DEQ_SIZE(session_data->buffs) = %zu - returning", conn->conn_id, pn_buffs_to_write, DEQ_SIZE(session_data->buffs));
+        return;
+    }
+
     pn_raw_buffer_t raw_buffers[num_buffs];
     qd_buffer_t *qd_buff = DEQ_HEAD(session_data->buffs);
 
@@ -414,16 +451,16 @@ static void write_buffers(qdr_http_connection_t *conn)
         DEQ_REMOVE_HEAD(session_data->buffs);
         qd_buff = DEQ_HEAD(session_data->buffs);
         i ++;
+
     }
-    qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Writing %i bytes in write_buffers", conn->conn_id, total_bytes);
+
     if (i >0) {
         size_t num_buffers_written = pn_raw_connection_write_buffers(session_data->conn->pn_raw_conn, raw_buffers, num_buffs);
+        qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Written %i buffer(s) and %i bytes in pn_raw_connection_write_buffers", conn->conn_id, num_buffers_written, total_bytes);
         if (num_buffs != num_buffers_written) {
             assert(false);
         }
     }
-
-
 }
 
 //static void send_window_update_frame(qdr_http2_session_data_t *session_data, int32_t stream_id)
@@ -448,7 +485,9 @@ static void send_settings_frame(qdr_http_connection_t *conn)
     int rv = nghttp2_submit_settings(session_data->session, NGHTTP2_FLAG_NONE, iv, ARRLEN(iv));
     if (rv != 0) {
         qd_log(http_adaptor->log_source, QD_LOG_INFO, "[C%i] Fatal error sending settings frame, rv=%i", conn->conn_id, rv);
+        return;
     }
+    qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Initial SETTINGS frame sent", conn->conn_id);
     nghttp2_session_send(session_data->session);
     write_buffers(session_data->conn);
 }
@@ -465,38 +504,62 @@ static int on_frame_recv_callback(nghttp2_session *session,
 
     switch (frame->hd.type) {
     case NGHTTP2_SETTINGS: {
-        qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 SETTINGS frame received", conn->conn_id, stream_id);
+        qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 SETTINGS frame received", conn->conn_id, stream_id);
     }
     break;
     case NGHTTP2_WINDOW_UPDATE:
-        qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 WINDOW_UPDATE frame received", conn->conn_id, stream_id);
+        qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 WINDOW_UPDATE frame received", conn->conn_id, stream_id);
     break;
     case NGHTTP2_DATA: {
-        qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 DATA frame received", conn->conn_id, stream_id);
+        qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] NGHTTP2_DATA frame received", conn->conn_id, stream_id);
         if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
-            qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 END_STREAM flag received", conn->conn_id, stream_id);
-            MSG_CONTENT(stream_data->message)->receive_complete = true;
+            qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] NGHTTP2_DATA NGHTTP2_FLAG_END_STREAM flag received, receive_complete = true", conn->conn_id, stream_id);
+            qd_message_set_receive_complete(stream_data->message);
+        }
+
+        if (stream_data->in_dlv) {
+            if (!stream_data->body) {
+                stream_data->body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
+                qd_compose_insert_binary(stream_data->body, 0, 0);
+                qd_message_extend(stream_data->message, stream_data->body);
+            }
         }
+
         if (stream_data->in_dlv) {
+            qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] NGHTTP2_DATA frame received, qdr_delivery_continue(dlv=%lx)", conn->conn_id, stream_id, (long) stream_data->in_dlv);
             qdr_delivery_continue(http_adaptor->core, stream_data->in_dlv, false);
         }
     }
     break;
     case NGHTTP2_HEADERS:{
-        qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 HEADERS frame received", conn->conn_id, stream_id);
+        qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 HEADERS frame received", conn->conn_id, stream_id);
         if (frame->hd.flags & NGHTTP2_FLAG_END_HEADERS) {
             /* All the headers have been received. Send out the AMQP message */
-            qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 END_HEADERS flag received", conn->conn_id, stream_id);
+            qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 NGHTTP2_FLAG_END_HEADERS flag received, all headers have arrived", conn->conn_id, stream_id);
             stream_data->entire_header_arrived = true;
-            // All header fields have been received
-            // End the map.
+            //
+            // All header fields have been received. End the application properties map.
+            //
+            qd_compose_end_map(stream_data->app_properties);
+
             bool receive_complete = false;
             if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
+                qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 NGHTTP2_FLAG_END_HEADERS and NGHTTP2_FLAG_END_STREAM flag received, receive_complete=true", conn->conn_id, stream_id);
+                qd_message_set_receive_complete(stream_data->message);
                 receive_complete = true;
             }
 
-            MSG_CONTENT(stream_data->message)->receive_complete = receive_complete;
-            link_deliver(stream_data, receive_complete);
+            //
+            // All headers have arrived, send out the delivery with just the headers,
+            // if/when the body arrives later, we will call the qdr_delivery_continue()
+            //
+            qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] All headers arrived, trying to route delivery", conn->conn_id);
+            if (route_delivery(stream_data, receive_complete)) {
+                qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] All headers arrived, delivery routed successfully", conn->conn_id);
+            }
+            else {
+                qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] All headers arrived, delivery not routed", conn->conn_id);
+            }
         }
     }
     break;
@@ -518,6 +581,7 @@ qdr_http_connection_t *qdr_http_connection_ingress(qd_http_lsnr_t* listener)
 
     ingress_http_conn->session_data = new_qdr_http2_session_data_t();
     ZERO(ingress_http_conn->session_data);
+    DEQ_INIT(ingress_http_conn->session_data->buffs);
     DEQ_INIT(ingress_http_conn->session_data->streams);
     ingress_http_conn->session_data->conn = ingress_http_conn;
 
@@ -621,7 +685,13 @@ static void qdr_http_second_attach(void *context, qdr_link_t *link,
         if (qdr_link_direction(link) == QD_OUTGOING && source->dynamic) {
             if (stream_data->session_data->conn->ingress) {
                 qdr_copy_reply_to(stream_data, qdr_terminus_get_address(source));
-                link_deliver(stream_data, MSG_CONTENT(stream_data->message)->receive_complete);
+                qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Reply-to is available now, trying to route delivery", stream_data->session_data->conn->conn_id);
+                if (route_delivery(stream_data, qd_message_receive_complete(stream_data->message))) {
+                    qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Reply-to available now, delivery routed successfully", stream_data->session_data->conn->conn_id);
+                }
+                else {
+                    qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i] Reply-to available now, delivery not routed", stream_data->session_data->conn->conn_id);
+                }
                 grant_read_buffers(stream_data->session_data->conn);
             }
             qdr_link_flow(http_adaptor->core, link, 10, false);
@@ -671,10 +741,168 @@ ssize_t read_callback(nghttp2_session *session,
                                   nghttp2_data_source *source,
                                   void *user_data)
 {
-    qd_buffer_t *qd_buff = source->ptr;
-    buf = qd_buffer_base(qd_buff);
-    size_t ret_val = qd_buffer_size(qd_buff);
-    return ret_val;
+    qdr_link_t *link = source->ptr;
+    qdr_http_connection_t *conn = (qdr_http_connection_t *)user_data;
+    qdr_http2_session_data_t *session_data = conn->session_data;
+    qdr_http2_stream_data_t *stream_data = nghttp2_session_get_stream_user_data(session_data->session, stream_id);
+
+    qd_message_depth_status_t status = qd_message_check_depth(stream_data->message, QD_DEPTH_BODY);
+
+    write_buffers(session_data->conn);
+
+    switch (status) {
+    case QD_MESSAGE_DEPTH_OK: {
+        //
+        // At least one complete body performative has arrived.  It is now safe to switch
+        // over to the per-message extraction of body-data segments.
+        //
+        qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback QD_MESSAGE_DEPTH_OK", conn->conn_id, stream_data->stream_id);
+        qd_message_body_data_t        *body_data = 0;
+        qd_message_body_data_result_t  body_data_result;
+
+        //
+        // Process as many body-data segments as are available.
+        //
+        int buff_offset = 0;
+        body_data = stream_data->curr_body_data;
+        if (body_data) {
+            //
+            // If we saved the body_data, use the buff_offset.
+            //
+            body_data_result = stream_data->curr_body_data_result;
+            buff_offset = stream_data->curr_body_data_buff_offset;
+            qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback Use existing body_data", conn->conn_id, stream_data->stream_id);
+        }
+        else {
+            body_data_result = qd_message_next_body_data(stream_data->message, &body_data);
+            if (stream_data->curr_body_data) {
+                qd_message_body_data_release(stream_data->curr_body_data);
+            }
+            stream_data->curr_body_data = body_data;
+            stream_data->curr_body_data_result = body_data_result;
+            stream_data->curr_body_data_buff_offset = 0;
+            qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback qd_message_next_body_data", conn->conn_id, stream_data->stream_id);
+        }
+
+        switch (body_data_result) {
+        case QD_MESSAGE_BODY_DATA_OK: {
+            //
+            // We have a new valid body-data segment.  Handle it
+            //
+            stream_data->body_data_buff_count = qd_message_body_data_buffer_count(body_data);
+
+            size_t pn_buffs_to_write = pn_raw_connection_write_buffers_capacity(conn->pn_raw_conn);
+
+            qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback pn_raw_connection_write_buffers_capacity=%zu", conn->conn_id, stream_data->stream_id, pn_buffs_to_write);
+
+            if (stream_data->body_data_buff_count == 0 || pn_buffs_to_write==0) {
+                // We cannot send anything, we need to come back here.
+
+                //TODO - This will not pass code review. Need to investigate.
+                link->credit_to_core = 0;
+                qdr_link_flow(http_adaptor->core, link, 1, false);
+                if (stream_data->body_data_buff_count == 0) {
+                    qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] Exiting read_callback QD_MESSAGE_BODY_DATA_OK, body_data_buff_count=0, temporarily pausing stream", conn->conn_id, stream_data->stream_id);
+                    qd_message_body_data_release(stream_data->curr_body_data);
+                    stream_data->curr_body_data = 0;
+                }
+                if (pn_buffs_to_write == 0)
+                    qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] Exiting read_callback, pn_buffs_to_write=0, pausing stream", conn->conn_id, stream_data->stream_id);
+
+                //
+                // We don't have any buffers to send but we may or may not get more buffers.
+                // Temporarily pause this stream
+                //
+
+                stream_data->disposition = 0;
+                return NGHTTP2_ERR_DEFERRED;
+            }
+
+            qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback QD_MESSAGE_BODY_DATA_OK, body_data_buff_count=%i", conn->conn_id, stream_data->stream_id, stream_data->body_data_buff_count);
+
+            //
+            // We are looking to write only one pn_raw_buffer_t per iteration.
+            //
+            pn_raw_buffer_t raw_buffers[1];
+            qd_message_body_data_buffers(body_data, raw_buffers, buff_offset, 1);
+            stream_data->curr_body_data_buff_offset += 1;
+            qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback, size of raw_buffer=%zu", conn->conn_id, stream_data->stream_id, raw_buffers[0].size);
+            memcpy(buf, raw_buffers[0].bytes, raw_buffers[0].size);
+            stream_data->body_data_buff_count -= 1;
+            if (!stream_data->body_data_buff_count) {
+                qd_message_body_data_release(stream_data->curr_body_data);
+                stream_data->curr_body_data = 0;
+            }
+            return raw_buffers[0].size;
+        }
+
+        case QD_MESSAGE_BODY_DATA_INCOMPLETE:
+            //
+            // A new segment has not completely arrived yet.  Check again later.
+            //
+            qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback QD_MESSAGE_BODY_DATA_INCOMPLETE", conn->conn_id, stream_data->stream_id);
+            return 0;
+
+        case QD_MESSAGE_BODY_DATA_NO_MORE: {
+            //
+            // We have already handled the last body-data segment for this delivery.
+            // Complete the "sending" of this delivery and replenish credit.
+            //
+            size_t pn_buffs_to_write = pn_raw_connection_write_buffers_capacity(conn->pn_raw_conn);
+            if (pn_buffs_to_write == 0) {
+                return NGHTTP2_ERR_DEFERRED;
+                stream_data->disposition = 0;
+                qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback QD_MESSAGE_BODY_DATA_NO_MORE - pn_buffs_to_write=0 send is not complete", conn->conn_id, stream_data->stream_id);
+            }
+            else {
+                qd_message_body_data_release(stream_data->curr_body_data);
+                *data_flags |= NGHTTP2_DATA_FLAG_EOF;
+                qd_message_set_send_complete(stream_data->message);
+                stream_data->disposition = PN_ACCEPTED; // This will cause the delivery to be settled
+                qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_callback QD_MESSAGE_BODY_DATA_NO_MORE - send is complete, setting NGHTTP2_DATA_FLAG_EOF", conn->conn_id, stream_data->stream_id);
+            }
+
+            qdr_link_flow(http_adaptor->core, link, 1, false);
+            break;
+        }
+
+        case QD_MESSAGE_BODY_DATA_INVALID:
+            //
+            // The body-data is corrupt in some way.  Stop handling the delivery and reject it.
+            //
+            *data_flags |= NGHTTP2_DATA_FLAG_EOF;
+            qd_message_body_data_release(stream_data->curr_body_data);
+            qdr_link_flow(http_adaptor->core, link, 1, false);
+            stream_data->disposition = PN_REJECTED;
+            qd_log(http_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%i][S%i] read_callback QD_MESSAGE_BODY_DATA_INVALID", conn->conn_id, stream_data->stream_id);
+            break;
+
+        case QD_MESSAGE_BODY_DATA_NOT_DATA:
+            //
+            // Valid data was seen, but it is not a body-data performative.  Reject the delivery.
+            //
+            *data_flags |= NGHTTP2_DATA_FLAG_EOF;
+            qd_message_body_data_release(stream_data->curr_body_data);
+            qdr_link_flow(http_adaptor->core, link, 1, false);
+            stream_data->disposition = PN_REJECTED;
+            qd_log(http_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%i][S%i] read_callback QD_MESSAGE_BODY_DATA_NOT_DATA", conn->conn_id, stream_data->stream_id);
+            break;
+        }
+        break;
+    }
+
+    case QD_MESSAGE_DEPTH_INVALID:
+        qdr_link_flow(http_adaptor->core, link, 1, false);
+        qd_log(http_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%i][S%i] read_callback QD_MESSAGE_DEPTH_INVALID", conn->conn_id, stream_data->stream_id);
+        stream_data->disposition = PN_REJECTED;
+        break;
+
+    case QD_MESSAGE_DEPTH_INCOMPLETE:
+        break;
+    }
+
+    qd_log(http_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%i][S%i] read_callback Returning zero", conn->conn_id, stream_data->stream_id);
+    return 0;
 }
 
 uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data, qdr_link_t *link)
@@ -682,12 +910,12 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data, qdr_link_t *
     qdr_http2_session_data_t *session_data = stream_data->session_data;
     qdr_http_connection_t *conn = session_data->conn;
     qd_message_t *message = stream_data->message;
-
     if (stream_data->out_dlv) {
 
-        if (!stream_data->header_sent) {
-            stream_data->header_sent = true;
+        if (qd_message_send_complete(stream_data->message))
+            return 0;
 
+        if (!stream_data->header_sent) {
             // The HTTP Path is in the AMQP to field.
             //qd_iterator_t *to = qd_message_field_iterator(message, QD_FIELD_TO);
             //char *path = (char *)qd_iterator_copy(to);
@@ -718,20 +946,8 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data, qdr_link_t *
                 hdrs[idx].namelen = qd_iterator_length(key_raw);
                 hdrs[idx].valuelen = qd_iterator_length(val_raw);
                 hdrs[idx].flags = NGHTTP2_NV_FLAG_NONE;
-                qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%i] HTTP2 Outgoing HEADER [%s=%s]", conn->conn_id, stream_id, (char *)hdrs[idx].name, (char *)hdrs[idx].value);
             }
 
-            /*
-             * case NGHTTP2_HEADERS
-             * case NGHTTP2_PRIORITY
-             * case NGHTTP2_RST_STREAM
-             * case NGHTTP2_SETTINGS
-             * case NGHTTP2_PUSH_PROMISE
-             * case NGHTTP2_PING
-             * case NGHTTP2_CONTINUATION
-             * case NGHTTP2_GOAWAY
-             * case NGHTTP2_WINDOW_UPDATE
-             */
             // This does not really submit the request. We need to read the bytes
             //nghttp2_session_set_next_stream_id(session_data->session, stream_data->stream_id);
             stream_data->stream_id = nghttp2_submit_headers(session_data->session,
@@ -739,60 +955,47 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data, qdr_link_t *
                                                             stream_id, NULL, hdrs,
                                                             count,
                                                             stream_data);
+            if (stream_id != -1) {
+                stream_data->stream_id = stream_id;
+            }
+
+            for (uint32_t idx = 0; idx < count; idx++) {
+                qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] HTTP2 HEADER Outgoing [%s=%s]", conn->conn_id, stream_data->stream_id, (char *)hdrs[idx].name, (char *)hdrs[idx].value);
+            }
 
             nghttp2_session_send(session_data->session);
             write_buffers(session_data->conn);
+            qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] Headers submitted", conn->conn_id, stream_data->stream_id);
+        }
+        else {
+            qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] Headers already submitted, Proceeding with the body", conn->conn_id, stream_data->stream_id);
+        }
 
+        if (stream_data->header_sent) {
+            qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] Stream was paused, resuming now", conn->conn_id, stream_data->stream_id);
+            nghttp2_session_resume_data(session_data->session, stream_data->stream_id);
+            nghttp2_session_send(session_data->session);
+            write_buffers(session_data->conn);
+            qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] nghttp2_session_send - stream resumed, write_buffers done", conn->conn_id, stream_data->stream_id);
         }
+        else {
+            qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] Processing message body", conn->conn_id, stream_data->stream_id);
+            conn->data_prd.read_callback = read_callback;
+            conn->data_prd.source.ptr = link;
+            int rv = nghttp2_submit_data(session_data->session, NGHTTP2_FLAG_END_STREAM, stream_data->stream_id, &conn->data_prd);
+            if (rv != 0) {
+                qd_log(http_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%i][S%i] Error submitting data rv=%i", conn->conn_id, stream_data->stream_id, rv);
+            }
+            else {
+                nghttp2_session_send(session_data->session);
+                write_buffers(session_data->conn);
+                qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] nghttp2_session_send - write_buffers done", conn->conn_id, stream_data->stream_id);
+            }
 
-//        qd_message_body_data_t *out_body_data = 0;
-//        qd_message_body_data_result_t body_data_result = qd_message_next_body_data(message, &out_body_data);
-//        bool has_out_body_data = false;
-//
-//        char *body = 0;
-//        has_out_body_data = true;
-//        switch (body_data_result) {
-//        case QD_MESSAGE_BODY_DATA_OK: {
-//            qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%i] HTTP2 Outgoing QD_MESSAGE_BODY_DATA_OK", conn->conn_id, stream_data->stream_id);
-//            qd_iterator_t *body_iter = qd_message_body_data_iterator(out_body_data);
-//            body = (char*) qd_iterator_copy(body_iter);
-//            if (body) {
-//                conn->data_prd.source.ptr = body;
-//                printf("handle_outgoing_http: message body-data received: %s\n", body);
-//                nghttp2_submit_data(session_data->session, 0, stream_data->stream_id, &conn->data_prd);
-//            }
-//            free(body);
-//            qd_iterator_free(body_iter);
-//            break;
-//        }
-//
-//        case QD_MESSAGE_BODY_DATA_INCOMPLETE:
-//            qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%i] HTTP2 Outgoing QD_MESSAGE_BODY_DATA_INCOMPLETE", conn->conn_id, stream_data->stream_id);
-//            break;
-//
-//        case QD_MESSAGE_BODY_DATA_NO_MORE: {
-//            qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%i] HTTP2 Outgoing QD_MESSAGE_BODY_DATA_NO_MORE", conn->conn_id, stream_data->stream_id);
-//            qd_message_set_send_complete(message);
-//            qdr_link_flow(http_adaptor->core, link, 1, false);
-//            nghttp2_submit_data(session_data->session, NGHTTP2_FLAG_END_STREAM, stream_data->stream_id, &conn->data_prd);
-//            return PN_ACCEPTED; // This will cause the delivery to be settled
-//        }
-//
-//        case QD_MESSAGE_BODY_DATA_INVALID:
-//            qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%i] HTTP2 Outgoing QD_MESSAGE_BODY_DATA_INVALID", conn->conn_id, stream_data->stream_id);
-//            qdr_link_flow(http_adaptor->core, link, 1, false);
-//            return PN_REJECTED;
-//
-//        case QD_MESSAGE_BODY_DATA_NOT_DATA:
-//            qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%i] HTTP2 Outgoing QD_MESSAGE_BODY_DATA_NOT_DATA", conn->conn_id, stream_data->stream_id);
-//            qdr_link_flow(http_adaptor->core, link, 1, false);
-//            return PN_REJECTED;
-//
-//        } // switch
-//
-//        if (has_out_body_data)
-//            nghttp2_session_send(session_data->session);
+        }
+        stream_data->header_sent = true;
 
+        return stream_data->disposition;
     }
     return 0;
 }
@@ -812,8 +1015,6 @@ static uint64_t qdr_http_deliver(void *context, qdr_link_t *link, qdr_delivery_t
 
         stream_data->message = qdr_delivery_message(delivery);
         stream_data->out_dlv = delivery;
-        conn->initial_stream = stream_data;
-
         qdr_terminus_t *source = qdr_terminus(0);
         qdr_terminus_set_address(source, conn->config->address);
         stream_data->out_link = qdr_link_first_attach(conn->qdr_conn,
@@ -825,15 +1026,12 @@ static uint64_t qdr_http_deliver(void *context, qdr_link_t *link, qdr_delivery_t
                                                      true,
                                                      delivery,
                                                      &(stream_data->outgoing_id));
-        conn->initial_stream->out_link = stream_data->out_link;
         qdr_link_set_context(stream_data->out_link, stream_data);
         qd_iterator_t *fld_iter = qd_message_field_iterator(msg, QD_FIELD_REPLY_TO);
         char *reply_to = (char *)qd_iterator_copy(fld_iter);
         stream_data->reply_to = malloc(qd_iterator_length(fld_iter) + 1);
         strcpy(stream_data->reply_to, reply_to);
 
-        printf ("qdr_http_deliver if stream_dispatcher reply_to is %s\n", (char *)reply_to);
-
         // Sender link
         qdr_terminus_t *target = qdr_terminus(0);
         qdr_terminus_set_address(target, reply_to);
@@ -849,7 +1047,6 @@ static uint64_t qdr_http_deliver(void *context, qdr_link_t *link, qdr_delivery_t
 
         // TODO - This is wrong
         qdr_link_set_context(stream_data->in_link, stream_data);
-        printf ("qdr_http_deliver stream_data->session_data->conn->stream_dispatcher %p\n", (void *)stream_data->in_link);
 
         //Let's make an outbound connection to the configured connector.
         qdr_http_connection_t *conn = stream_data->session_data->conn;
@@ -867,6 +1064,7 @@ static uint64_t qdr_http_deliver(void *context, qdr_link_t *link, qdr_delivery_t
             }
             return handle_outgoing_http(stream_data, link);
         }
+        qdr_link_flow(http_adaptor->core, link, 1, false);
     }
     return 0;
 }
@@ -893,7 +1091,7 @@ static int handle_incoming_http(qdr_http_connection_t *conn)
             qd_buffer_t *buf = (qd_buffer_t*) raw_buffers[i].context;
             uint32_t raw_buff_size = raw_buffers[i].size;
             qd_buffer_insert(buf, raw_buff_size);
-            qd_log(http_adaptor->log_source, QD_LOG_DEBUG, "[C%i] Inserting qd_buffer of size %"PRIu32" ", conn->conn_id, raw_buff_size);
+            qd_log(http_adaptor->log_source, QD_LOG_DEBUG, "[C%i] - handle_incoming_http - Inserting qd_buffer of size %"PRIu32" ", conn->conn_id, raw_buff_size);
             count += raw_buffers[i].size;
             DEQ_INSERT_TAIL(buffers, buf);
         }
@@ -977,27 +1175,26 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
         } else {
             qd_log(log, QD_LOG_INFO, "[C%i] Connected", conn->conn_id);
             conn->connection_established = true;
-            handle_outgoing_http(conn->initial_stream, conn->initial_stream->out_link);
             qdr_connection_process(conn->qdr_conn);
         }
         break;
     }
     case PN_RAW_CONNECTION_CLOSED_READ: {
-        qd_log(log, QD_LOG_INFO, "[C%i] Closed for reading", conn->conn_id);
+        qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_CLOSED_READ", conn->conn_id);
         break;
     }
     case PN_RAW_CONNECTION_CLOSED_WRITE: {
-        qd_log(log, QD_LOG_INFO, "[C%i] Closed for writing", conn->conn_id);
+        qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_CLOSED_WRITE", conn->conn_id);
         break;
     }
     case PN_RAW_CONNECTION_DISCONNECTED: {
         qdr_connection_closed(conn->qdr_conn);
         //free_qdr_http_connection(conn);
-        qd_log(log, QD_LOG_INFO, "[C%i] Disconnected", conn->conn_id);
+        qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_DISCONNECTED", conn->conn_id);
         break;
     }
     case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: {
-        qd_log(log, QD_LOG_INFO, "[C%i] Need write buffers", conn->conn_id);
+        qd_log(log, QD_LOG_TRACE, "[C%i] Need write buffers", conn->conn_id);
 
         //TODO - Refactor this out
         //while (qdr_connection_process(conn->qdr_conn)) {}
@@ -1005,20 +1202,20 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
     }
     case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
         grant_read_buffers(conn);
-        qd_log(log, QD_LOG_INFO, "[C%i] Need read buffers", conn->conn_id);
+        qd_log(log, QD_LOG_TRACE, "[C%i] Need read buffers", conn->conn_id);
 
         //TODO - No need to call this
         //while (qdr_connection_process(conn->qdr_conn)) {}
         break;
     }
     case PN_RAW_CONNECTION_WAKE: {
-        qd_log(log, QD_LOG_INFO, "[C%i] Wake-up", conn->conn_id);
+        qd_log(log, QD_LOG_TRACE, "[C%i] Wake-up", conn->conn_id);
         while (qdr_connection_process(conn->qdr_conn)) {}
         break;
     }
     case PN_RAW_CONNECTION_READ: {
         int read = handle_incoming_http(conn);
-        qd_log(log, QD_LOG_INFO, "[C%i] Read %i bytes", conn->conn_id, read);
+        qd_log(log, QD_LOG_TRACE, "[C%i] Read %i bytes", conn->conn_id, read);
         while (qdr_connection_process(conn->qdr_conn)) {}
         break;
     }
@@ -1028,13 +1225,14 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
         size_t written = 0;
         while ( (n = pn_raw_connection_take_written_buffers(conn->pn_raw_conn, buffs, WRITE_BUFFERS)) ) {
             for (size_t i = 0; i < n; ++i) {
+                written += buffs[i].size;
                 qd_buffer_t *qd_buff = (qd_buffer_t *) buffs[i].context;
                 assert(qd_buff);
                 if (qd_buff)
                     qd_buffer_free(qd_buff);
             }
         }
-        qd_log(log, QD_LOG_INFO, "[C%i] Wrote %i bytes", conn->conn_id, written);
+        qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_WRITTEN Wrote %i bytes", conn->conn_id, written);
         while (qdr_connection_process(conn->qdr_conn)) {}
         break;
     }
@@ -1063,6 +1261,7 @@ static void handle_listener_event(pn_event_t *e, qd_server_t *qd_server, void *c
         break;
 
         case PN_LISTENER_CLOSE:
+            qd_log(log, QD_LOG_INFO, "Closing HTTP connection on %s", host_port);
             break;
 
         default:
@@ -1131,6 +1330,7 @@ qdr_http_connection_t *qdr_http_connection_egress(qd_http_connector_t *connector
 
     egress_conn->session_data = new_qdr_http2_session_data_t();
     ZERO(egress_conn->session_data);
+    DEQ_INIT(egress_conn->session_data->buffs);
     DEQ_INIT(egress_conn->session_data->streams);
     egress_conn->session_data->conn = egress_conn;
 
diff --git a/src/adaptors/http_adaptor.h b/src/adaptors/http_adaptor.h
index 2f0134a..cfbdf73 100644
--- a/src/adaptors/http_adaptor.h
+++ b/src/adaptors/http_adaptor.h
@@ -50,6 +50,7 @@ struct qdr_http2_stream_data_t {
     qdr_delivery_t           *out_dlv;
     uint64_t                  incoming_id;
     uint64_t                  outgoing_id;
+    uint64_t                  disposition;
 
     qdr_link_t               *in_link;
     qdr_link_t               *out_link;
@@ -58,8 +59,16 @@ struct qdr_http2_stream_data_t {
     qd_composed_field_t      *field;
     qd_composed_field_t      *header_properties;  // This has the header and the properties.
     qd_composed_field_t      *app_properties;     // This has the application properties.
-    bool                     entire_header_arrived; // true if all the header properties has arrived, just before the start of the data frame or just before the end stream.
+    qd_composed_field_t      *body;
+
+    qd_message_body_data_t        *curr_body_data;
+    qd_message_body_data_result_t  curr_body_data_result;
+    int                            curr_body_data_buff_offset;
+    int                            body_data_buff_count;
+
+    bool                     entire_header_arrived; // true if all the header properties have arrived, just before the start of the data frame or just before the END_STREAM.
     bool                     header_sent;
+    bool                     has_body;
     bool                     has_data;  // Did we ever receive a DATA frame.
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org