You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2021/03/17 14:50:37 UTC

[qpid-dispatch] branch master updated: DISPATCH-1979: Added code to capture the footer of the GRPC message in case it arrives before credit is available to send the message. Also code to create new nghttp2 session when a new connection is opened. This closes #1057.

This is an automated email from the ASF dual-hosted git repository.

gmurthy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fa6135  DISPATCH-1979: Added code to capture the footer of the GRPC message in case it arrives before credit is available to send the message. Also code to create new nghttp2 session when a new connection is opened. This closes #1057.
3fa6135 is described below

commit 3fa6135dfba7926b966ad3c1baa567002e22a9a6
Author: Ganesh Murthy <gm...@apache.org>
AuthorDate: Sun Feb 14 19:58:41 2021 -0500

    DISPATCH-1979: Added code to capture the footer of the GRPC message in case it arrives before credit is available to send the message. Also code to create new nghttp2 session when a new connection is opened. This closes #1057.
---
 include/qpid/dispatch/message.h    |   1 +
 src/adaptors/http2/http2_adaptor.c | 173 +++++++++++++++++++++++++++++--------
 src/adaptors/http2/http2_adaptor.h |   1 -
 src/message.c                      |  17 ++++
 4 files changed, 155 insertions(+), 37 deletions(-)

diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 1558dde..a580701 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -303,6 +303,7 @@ void qd_message_compose_1(qd_message_t *msg, const char *to, qd_buffer_list_t *b
 void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *content, bool receive_complete);
 void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2, bool receive_complete);
 void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2, qd_composed_field_t *content3, bool receive_complete);
+void qd_message_compose_5(qd_message_t *msg, qd_composed_field_t *field1, qd_composed_field_t *field2, qd_composed_field_t *field3, qd_composed_field_t *field4, bool receive_complete);
 
 /**
  * qd_message_extend
diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c
index db7261c..5735b5c 100644
--- a/src/adaptors/http2/http2_adaptor.c
+++ b/src/adaptors/http2/http2_adaptor.c
@@ -69,6 +69,18 @@ static qdr_http2_adaptor_t *http2_adaptor;
 
 static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void *context);
 static void _http_record_request(qdr_http2_connection_t *conn, qdr_http2_stream_data_t *stream_data);
+static void free_http2_stream_data(qdr_http2_stream_data_t *stream_data, bool on_shutdown);
+
+static void free_all_connection_streams(qdr_http2_connection_t *http_conn, bool on_shutdown)  // on_shutdown is handed unchanged to free_http2_stream_data
+{
+    // Free every stream on http_conn->session_data->streams, always working on the current head of the list.
+    qdr_http2_stream_data_t *stream_data = DEQ_HEAD(http_conn->session_data->streams);
+    while (stream_data) {
+        qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Freeing stream in free_qdr_http2_connection", stream_data->session_data->conn->conn_id, stream_data->stream_id);  // NOTE: the text names free_qdr_http2_connection even when the caller is clean_session_data
+        free_http2_stream_data(stream_data, on_shutdown);  // presumably also unlinks stream_data from the streams list, otherwise this loop would not advance -- confirm
+        stream_data = DEQ_HEAD(http_conn->session_data->streams);  // re-read the head instead of following a link out of the stream that was just freed
+    }
+}
 
 static void set_stream_data_delivery_flags(qdr_http2_stream_data_t * stream_data, qdr_delivery_t *dlv) {
     if (dlv == stream_data->in_dlv) {
@@ -345,12 +357,7 @@ static char *get_address_string(pn_raw_connection_t *pn_raw_conn)
 void free_qdr_http2_connection(qdr_http2_connection_t* http_conn, bool on_shutdown)
 {
     // Free all the stream data associated with this connection/session.
-    qdr_http2_stream_data_t *stream_data = DEQ_HEAD(http_conn->session_data->streams);
-    while (stream_data) {
-        qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Freeing stream in free_qdr_http2_connection", stream_data->session_data->conn->conn_id, stream_data->stream_id);
-        free_http2_stream_data(stream_data, on_shutdown);
-        stream_data = DEQ_HEAD(http_conn->session_data->streams);
-    }
+    free_all_connection_streams(http_conn, on_shutdown);
 
     if(http_conn->remote_address) {
         free(http_conn->remote_address);
@@ -368,9 +375,11 @@ void free_qdr_http2_connection(qdr_http2_connection_t* http_conn, bool on_shutdo
 
     http_conn->context.context = 0;
 
-    nghttp2_session_del(http_conn->session_data->session);
+    if (http_conn->session_data->session)
+        nghttp2_session_del(http_conn->session_data->session);
 
     free_qdr_http2_session_data_t(http_conn->session_data);
+    http_conn->session_data = 0;
     sys_mutex_lock(http2_adaptor->lock);
     DEQ_REMOVE(http2_adaptor->connections, http_conn);
     sys_mutex_unlock(http2_adaptor->lock);
@@ -528,14 +537,13 @@ static int snd_data_callback(nghttp2_session *session,
     qdr_http2_session_data_t *session_data = conn->session_data;
     qdr_http2_stream_data_t *stream_data = (qdr_http2_stream_data_t *)source->ptr;
 
-    qd_http2_buffer_t *http2_buff = qd_http2_buffer();
-    DEQ_INSERT_TAIL(session_data->buffs, http2_buff);
-    // Insert the framehd of length 9 bytes into the buffer
-    memcpy(qd_http2_buffer_cursor(http2_buff), framehd, HTTP2_DATA_FRAME_HEADER_LENGTH);
-    qd_http2_buffer_insert(http2_buff, HTTP2_DATA_FRAME_HEADER_LENGTH);
-
     int bytes_sent = 0; // This should not include the header length of 9.
     if (length) {
+        qd_http2_buffer_t *http2_buff = qd_http2_buffer();
+        DEQ_INSERT_TAIL(session_data->buffs, http2_buff);
+        // Insert the framehd of length 9 bytes into the buffer
+        memcpy(qd_http2_buffer_cursor(http2_buff), framehd, HTTP2_DATA_FRAME_HEADER_LENGTH);
+        qd_http2_buffer_insert(http2_buff, HTTP2_DATA_FRAME_HEADER_LENGTH);
         pn_raw_buffer_t pn_raw_buffs[stream_data->qd_buffers_to_send];
         qd_message_stream_data_buffers(stream_data->curr_stream_data, pn_raw_buffs, 0, stream_data->qd_buffers_to_send);
 
@@ -574,11 +582,10 @@ static int snd_data_callback(nghttp2_session *session,
 
     qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] HTTP2 snd_data_callback finished, length=%zu, bytes_sent=%i, stream_data=%p", conn->conn_id, stream_data->stream_id, length, bytes_sent, (void *)stream_data);
 
-    if (length)
+    if (length) {
         assert(bytes_sent == length);
-
-    write_buffers(conn);
-
+        write_buffers(conn);
+    }
 
     return 0;
 
@@ -745,16 +752,37 @@ static bool compose_and_deliver(qdr_http2_connection_t *conn, qdr_http2_stream_d
                 qd_compose_insert_binary(stream_data->body, 0, 0);
                 qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Inserting empty body data in compose_and_deliver", conn->conn_id, stream_data->stream_id);
             }
-            qd_message_compose_4(stream_data->message, header_and_props, stream_data->app_properties, stream_data->body, receive_complete);
+
+            if (stream_data->footer_properties) {
+                qd_message_compose_5(stream_data->message, header_and_props, stream_data->app_properties, stream_data->body, stream_data->footer_properties, receive_complete);
+            }
+            else {
+                qd_message_compose_4(stream_data->message, header_and_props, stream_data->app_properties, stream_data->body, receive_complete);
+            }
         }
         else {
             if (stream_data->body) {
                 qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"][L%"PRIu64"] receive_complete = false and has stream_data->body in compose_and_deliver", conn->conn_id, stream_data->stream_id, stream_data->in_link->identity);
-                qd_message_compose_4(stream_data->message, header_and_props, stream_data->app_properties, stream_data->body, receive_complete);
+                if (stream_data->footer_properties) {
+                    qd_message_compose_5(stream_data->message, header_and_props, stream_data->app_properties, stream_data->body, stream_data->footer_properties, receive_complete);
+                }
+                else {
+                    qd_message_compose_4(stream_data->message, header_and_props, stream_data->app_properties, stream_data->body, receive_complete);
+                }
                 stream_data->body_data_added = true;
             }
             else {
-                qd_message_compose_3(stream_data->message, header_and_props, stream_data->app_properties, receive_complete);
+
+                if (stream_data->footer_properties) {
+                    //
+                    // The footer has already arrived but there was no body. Insert an empty body
+                    //
+                    stream_data->body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
+                    qd_message_compose_5(stream_data->message, header_and_props, stream_data->app_properties, stream_data->body, stream_data->footer_properties, receive_complete);
+                }
+                else {
+                    qd_message_compose_3(stream_data->message, header_and_props, stream_data->app_properties, receive_complete);
+                }
             }
         }
 
@@ -990,8 +1018,15 @@ static int on_frame_recv_callback(nghttp2_session *session,
             }
 
             if (stream_data->entire_footer_arrived) {
-                qdr_delivery_continue(http2_adaptor->core, stream_data->in_dlv, false);
-                qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Entire footer arrived, qdr_delivery_continue "DLV_FMT, conn->conn_id, stream_id, DLV_ARGS(stream_data->in_dlv));
+                if (stream_data->in_dlv) {
+                    qdr_delivery_continue(http2_adaptor->core, stream_data->in_dlv, false);
+                    qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Entire footer arrived, qdr_delivery_continue "DLV_FMT, conn->conn_id, stream_id, DLV_ARGS(stream_data->in_dlv));
+                }
+                else {
+                    if (route_delivery(stream_data, receive_complete)) {
+                        qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Entire footer arrived, delivery routed successfully (on_frame_recv_callback)", conn->conn_id, stream_id);
+                    }
+                }
             }
             else {
                 //
@@ -1351,6 +1386,25 @@ static int qdr_http_get_credit(void *context, qdr_link_t *link)
 }
 
 
+ssize_t error_read_callback(nghttp2_session *session,  // read callback installed in the nghttp2_data_provider that carries the plain-text error body; session is unused
+                      int32_t stream_id,  // unused
+                      uint8_t *buf,  // destination the message bytes are copied into
+                      size_t length,  // NOTE(review): never consulted below; presumably the capacity of buf -- confirm against the nghttp2 documentation
+                      uint32_t *data_flags,  // NGHTTP2_DATA_FLAG_EOF is OR-ed into this
+                      nghttp2_data_source *source,  // source->ptr is read as a NUL-terminated message and may be NULL
+                      void *user_data)  // unused
+{
+    size_t len = 0;
+    char *error_msg = (char *) source->ptr;
+    if (error_msg) {
+        len  = strlen(error_msg);
+        if (len > 0)
+            memcpy(buf, error_msg, len);  // copies the text without its terminating NUL; NOTE(review): the copy is not bounded by length
+    }
+    *data_flags |= NGHTTP2_DATA_FLAG_EOF;  // flagged on every call, including when there is no message
+    return len;  // number of bytes copied, 0 when source->ptr is NULL or the message is empty
+}
+
 static void qdr_http_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled)
 {
     qdr_http2_stream_data_t* stream_data = qdr_delivery_get_context(dlv);
@@ -1377,31 +1431,49 @@ static void qdr_http_delivery_update(void *context, qdr_delivery_t *dlv, uint64_
     }
 
     if (settled) {
-        nghttp2_nv hdrs[1];
+        nghttp2_nv hdrs[2];
         if (conn->ingress && (disp == PN_RELEASED || disp == PN_MODIFIED || disp == PN_REJECTED)) {
             if (disp == PN_RELEASED || disp == PN_MODIFIED) {
+                uint8_t * error_msg = (uint8_t *)"Service Unavailable";
                 hdrs[0].name = (uint8_t *)":status";
                 hdrs[0].value = (uint8_t *)"503";
                 hdrs[0].namelen = 7;
                 hdrs[0].valuelen = 3;
                 hdrs[0].flags = NGHTTP2_NV_FLAG_NONE;
-                nghttp2_submit_response(stream_data->session_data->session, stream_data->stream_id, hdrs, 1, 0);
+
+                hdrs[1].name = (uint8_t *)":content-type";
+                hdrs[1].value = (uint8_t *)"text/plain";
+                hdrs[1].namelen = 13;
+                hdrs[1].valuelen = 10;
+                hdrs[1].flags = NGHTTP2_NV_FLAG_NONE;
+
+                nghttp2_data_provider data_prd;
+                data_prd.read_callback = error_read_callback;
+                data_prd.source.ptr = error_msg;
+
+                nghttp2_submit_response(stream_data->session_data->session, stream_data->stream_id, hdrs, 2, &data_prd);
+                nghttp2_submit_goaway(stream_data->session_data->session, 0, stream_data->stream_id, NGHTTP2_CONNECT_ERROR, error_msg, 19);
             }
             else if (disp == PN_REJECTED) {
+                uint8_t * error_msg = (uint8_t *)"Resource Unavailable";
                 hdrs[0].name = (uint8_t *)":status";
                 hdrs[0].value = (uint8_t *)"400";
                 hdrs[0].namelen = 7;
                 hdrs[0].valuelen = 3;
                 hdrs[0].flags = NGHTTP2_NV_FLAG_NONE;
-                nghttp2_submit_response(stream_data->session_data->session, stream_data->stream_id, hdrs, 1, 0);
-            }
 
-            //
-            // Send a GOAWAY frame on the client/ingress connection.
-            // The GOAWAY frame is used to initiate shutdown of a connection or to signal serious error conditions.
-            //
-            nghttp2_submit_goaway(stream_data->session_data->session, 0, stream_data->stream_id, NGHTTP2_CONNECT_ERROR, (uint8_t *)"Service Unavailable", 19);
+                hdrs[1].name = (uint8_t *)":content-type";
+                hdrs[1].value = (uint8_t *)"text/plain";
+                hdrs[1].namelen = 13;
+                hdrs[1].valuelen = 10;
+                hdrs[1].flags = NGHTTP2_NV_FLAG_NONE;
+
+                nghttp2_data_provider data_prd;
+                data_prd.read_callback = error_read_callback;
+                data_prd.source.ptr = error_msg;
 
+                nghttp2_submit_response(stream_data->session_data->session, stream_data->stream_id, hdrs, 2, &data_prd);
+            }
         }
 
         if (!conn->ingress && (disp == PN_RELEASED || disp == PN_MODIFIED || disp == PN_REJECTED)) {
@@ -1908,7 +1980,6 @@ static int handle_incoming_http(qdr_http2_connection_t *conn)
             qd_http2_buffer_insert(buf, raw_buff_size);
             count += raw_buff_size;
             DEQ_INSERT_TAIL(buffers, buf);
-            //qd_log(http2_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] handle_incoming_http - Inserting qd_http2_buffer of size %"PRIu32" ", conn->conn_id, raw_buff_size);
         }
     }
 
@@ -2072,10 +2143,37 @@ static void close_connections(qdr_http2_connection_t* conn)
     qdr_action_enqueue(http2_adaptor->core, action);
 }
 
+static void clean_session_data(qdr_http2_connection_t* conn)  // called from handle_disconnected: frees the streams, the nghttp2 session and the queued write buffers
+{
+    free_all_connection_streams(conn, false);  // false is the on_shutdown argument
+
+    //
+    // Delete the nghttp2 session and null the pointer. The PN_RAW_CONNECTION_CONNECTED handling for egress
+    // connections creates a fresh one with nghttp2_session_client_new when it finds the pointer null.
+    //
+    nghttp2_session_del(conn->session_data->session);
+    conn->session_data->session = 0;
+
+    //
+    // Free every buffer still queued on session_data->buffs; the session they were written for has just been deleted.
+    //
+    qd_http2_buffer_t *buf = DEQ_HEAD(conn->session_data->buffs);
+    qd_http2_buffer_t *curr_buf = 0;
+    while (buf) {
+        curr_buf = buf;
+        DEQ_REMOVE_HEAD(conn->session_data->buffs);
+        buf = DEQ_HEAD(conn->session_data->buffs);  // pick up the next head before curr_buf is released
+        free_qd_http2_buffer_t(curr_buf);
+    }
+}
+
 
 static void handle_disconnected(qdr_http2_connection_t* conn)
 {
     sys_mutex_lock(qd_server_get_activation_lock(http2_adaptor->core->qd->server));
+
+    clean_session_data(conn);
+
     if (conn->pn_raw_conn) {
         qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] Setting conn->pn_raw_conn=0", conn->conn_id);
         conn->pn_raw_conn = 0;
@@ -2128,11 +2226,11 @@ static void egress_conn_timer_handler(void *context)
 {
     qdr_http2_connection_t* conn = (qdr_http2_connection_t*) context;
 
-    qd_log(http2_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] Running egress_conn_timer_handler", conn->conn_id);
-
-    if (conn->connection_established)
+    if (conn->pn_raw_conn || conn->connection_established)
         return;
 
+    qd_log(http2_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] Running egress_conn_timer_handler", conn->conn_id);
+
     if (!conn->ingress) {
         qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] - Egress_conn_timer_handler - Trying to establishing outbound connection", conn->conn_id);
         http_connector_establish(conn);
@@ -2249,8 +2347,11 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
             send_settings_frame(conn);
             qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Accepted Ingress ((PN_RAW_CONNECTION_CONNECTED)) from %s", conn->conn_id, conn->remote_address);
         } else {
-            if (!conn->session_data->session)
+            if (!conn->session_data->session) {
                 nghttp2_session_client_new(&conn->session_data->session, (nghttp2_session_callbacks *)http2_adaptor->callbacks, (void *)conn);
+                send_settings_frame(conn);
+                conn->client_magic_sent = true;
+            }
             qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Connected Egress (PN_RAW_CONNECTION_CONNECTED)", conn->conn_id);
             conn->connection_established = true;
             create_stream_dispatcher_link(conn);
diff --git a/src/adaptors/http2/http2_adaptor.h b/src/adaptors/http2/http2_adaptor.h
index decbfc1..b493136 100644
--- a/src/adaptors/http2/http2_adaptor.h
+++ b/src/adaptors/http2/http2_adaptor.h
@@ -64,7 +64,6 @@ struct qdr_http2_session_data_t {
     nghttp2_session             *session;    // A pointer to the nghttp2s' session object
     qd_http2_stream_data_list_t  streams;    // A session can have many streams.
     qd_http2_buffer_list_t       buffs;      // Buffers for writing
-    bool                         max_buffs_in_pool;
 };
 
 struct qdr_http2_stream_data_t {
diff --git a/src/message.c b/src/message.c
index 3508a99..6161275 100644
--- a/src/message.c
+++ b/src/message.c
@@ -2350,6 +2350,23 @@ void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *field1, qd_com
     DEQ_APPEND(content->buffers, (*field3_buffers));
 }
 
+void qd_message_compose_5(qd_message_t *msg, qd_composed_field_t *field1, qd_composed_field_t *field2, qd_composed_field_t *field3, qd_composed_field_t *field4, bool receive_complete)  // builds msg's content buffer list from four composed fields, in argument order
+{
+    qd_message_content_t *content        = MSG_CONTENT(msg);
+    content->receive_complete            = receive_complete;  // stored as given
+    qd_buffer_list_t     *field1_buffers = qd_compose_buffers(field1);
+    qd_buffer_list_t     *field2_buffers = qd_compose_buffers(field2);
+    qd_buffer_list_t     *field3_buffers = qd_compose_buffers(field3);
+    qd_buffer_list_t     *field4_buffers = qd_compose_buffers(field4);
+
+    content->buffers = *field1_buffers;  // struct copy: whatever content->buffers held before is overwritten -- assumes it was empty on entry, confirm with callers
+    DEQ_INIT(*field1_buffers);  // reset field1's list now that content->buffers holds its head and tail
+    DEQ_APPEND(content->buffers, (*field2_buffers));  // fields 2 to 4 are appended behind field1's buffers; presumably DEQ_APPEND leaves the source list empty -- confirm
+    DEQ_APPEND(content->buffers, (*field3_buffers));
+    DEQ_APPEND(content->buffers, (*field4_buffers));
+
+}
+
 
 int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field, bool *q2_blocked)
 {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org