You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@qpid.apache.org by gm...@apache.org on 2020/10/08 15:07:57 UTC

[qpid-dispatch] branch dev-protocol-adaptors updated: DISPATCH-1743: Fixed issue with two router HTTP2 requests

This is an automated email from the ASF dual-hosted git repository.

gmurthy pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/dev-protocol-adaptors by this push:
     new 840ee5c  DISPATCH-1743: Fixed issue with two router HTTP2 requests
840ee5c is described below

commit 840ee5cfb7eb35953870d68922d9fff85d4f9890
Author: Ganesh Murthy <gm...@apache.org>
AuthorDate: Thu Oct 8 11:06:45 2020 -0400

    DISPATCH-1743: Fixed issue with two router HTTP2 requests
---
 src/adaptors/http2/http2_adaptor.c | 73 ++++++++++++++++++++++++--------------
 1 file changed, 46 insertions(+), 27 deletions(-)

diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c
index b57dd31..b6e4a5c 100644
--- a/src/adaptors/http2/http2_adaptor.c
+++ b/src/adaptors/http2/http2_adaptor.c
@@ -275,6 +275,9 @@ static void free_http2_stream_data(qdr_http2_stream_data_t *stream_data, bool on
         DEQ_REMOVE(session_data->streams, stream_data);
         nghttp2_session_set_stream_user_data(session_data->session, stream_data->stream_id, NULL);
     }
+    if (stream_data->session_data && stream_data->session_data->conn)
+        qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] Freeing stream", stream_data->session_data->conn->conn_id, stream_data->stream_id);
+
     free_qdr_http2_stream_data_t(stream_data);
 }
 
@@ -428,8 +431,6 @@ static int snd_data_callback(nghttp2_session *session,
     qdr_http2_session_data_t *session_data = conn->session_data;
     qdr_http2_stream_data_t *stream_data = (qdr_http2_stream_data_t *)source->ptr;
 
-    qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 snd_data_callback length=%zu", conn->conn_id, stream_data->stream_id, length);
-
     qd_http2_buffer_t *http2_buff = qd_http2_buffer();
     DEQ_INSERT_TAIL(session_data->buffs, http2_buff);
     // Insert the framehd of length 9 bytes into the buffer
@@ -444,11 +445,20 @@ static int snd_data_callback(nghttp2_session *session,
         int idx = 0;
         while (idx < stream_data->qd_buffers_to_send) {
             if (pn_raw_buffs[idx].size > 0) {
-                memcpy(qd_http2_buffer_cursor(http2_buff), pn_raw_buffs[idx].bytes, pn_raw_buffs[idx].size);
-                qd_http2_buffer_insert(http2_buff, pn_raw_buffs[idx].size);
-                bytes_sent += pn_raw_buffs[idx].size;
+                int bytes_remaining = length - bytes_sent;
+                if (bytes_remaining > pn_raw_buffs[idx].size) {
+                    memcpy(qd_http2_buffer_cursor(http2_buff), pn_raw_buffs[idx].bytes, pn_raw_buffs[idx].size);
+                    qd_http2_buffer_insert(http2_buff, pn_raw_buffs[idx].size);
+                    bytes_sent += pn_raw_buffs[idx].size;
+                    qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] snd_data_callback memcpy pn_raw_buffs[%i].size=%zu", conn->conn_id, stream_data->stream_id, idx, pn_raw_buffs[idx].size);
+                }
+                else {
+                    memcpy(qd_http2_buffer_cursor(http2_buff), pn_raw_buffs[idx].bytes, bytes_remaining);
+                    qd_http2_buffer_insert(http2_buff, bytes_remaining);
+                    bytes_sent += bytes_remaining;
+                    qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] snd_data_callback memcpy bytes_remaining=%i", conn->conn_id, stream_data->stream_id, bytes_remaining);
+                }
                 stream_data->curr_body_data_qd_buff_offset += 1;
-                qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] snd_data_callback memcpy pn_raw_buffs[%i].size=%zu", conn->conn_id, stream_data->stream_id, idx, pn_raw_buffs[idx].size);
             }
             idx += 1;
         }
@@ -457,19 +467,17 @@ static int snd_data_callback(nghttp2_session *session,
         qd_message_body_data_release(stream_data->curr_body_data);
         stream_data->curr_body_data = 0;
         stream_data->curr_body_data_qd_buff_offset = 0;
-        qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] snd_data_callback qd_message_body_data_release", conn->conn_id, stream_data->stream_id);
-
-//        if (stream_data->next_body_data) {
-//            qd_message_body_data_release(stream_data->next_body_data);
-//            stream_data->next_body_data = 0;
-//        }
+        qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] snd_data_callback, full_payload_handled, qd_message_body_data_release", conn->conn_id, stream_data->stream_id);
     }
 
+    qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 snd_data_callback finished, length=%zu, bytes_sent=%i, stream_data=%p", conn->conn_id, stream_data->stream_id, length, bytes_sent, (void *)stream_data);
+
     if (length)
         assert(bytes_sent == length);
 
     write_buffers(conn);
 
+
     return 0;
 
 }
@@ -749,7 +757,9 @@ static int on_frame_recv_callback(nghttp2_session *session,
             qdr_delivery_remote_state_updated(http2_adaptor->core, stream_data->out_dlv, stream_data->out_dlv_local_disposition, true, 0, 0, false);
             qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] In on_frame_recv_callback NGHTTP2_DATA QD_STREAM_FULLY_CLOSED, qdr_delivery_remote_state_updated(stream_data->out_dlv)", conn->conn_id, stream_data->stream_id);
             stream_data->disp_updated = true;
+            qdr_delivery_set_context(stream_data->out_dlv, 0);
             stream_data->out_dlv = 0;
+            free_http2_stream_data(stream_data, false);
         }
     }
     break;
@@ -919,8 +929,8 @@ ssize_t read_data_callback(nghttp2_session *session,
             }
 
             stream_data->body_data_buff_count = qd_message_body_data_buffer_count(stream_data->curr_body_data);
+            qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_data_callback, stream_data->body_data_buff_count=%i, payload_length=%zu\n", conn->conn_id, stream_data->stream_id, stream_data->body_data_buff_count, payload_length);
 
-            assert (payload_length >= 0);
             size_t bytes_to_send = 0;
             if (payload_length) {
                 size_t remaining_payload_length = payload_length - (stream_data->curr_body_data_qd_buff_offset * BUFFER_SIZE);
@@ -929,7 +939,7 @@ ssize_t read_data_callback(nghttp2_session *session,
                     bytes_to_send = remaining_payload_length;
                     stream_data->qd_buffers_to_send = stream_data->body_data_buff_count;
                     stream_data->full_payload_handled = true;
-                    qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_data_callback remaining_payload_length <= QD_HTTP2_BUFFER_SIZE bytes_to_send=%zu, stream_data->qd_buffers_to_send=%zu", conn->conn_id, stream_data->stream_id, bytes_to_send, stream_data->qd_buffers_to_send);
+                    qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_data_callback remaining_payload_length (%zu) <= QD_HTTP2_BUFFER_SIZE(16384), bytes_to_send=%zu, stream_data->qd_buffers_to_send=%zu", conn->conn_id, stream_data->stream_id, remaining_payload_length, bytes_to_send, stream_data->qd_buffers_to_send);
 
                     // Look ahead one body data
                     stream_data->next_body_data_result = qd_message_next_body_data(message, &stream_data->next_body_data);
@@ -959,7 +969,7 @@ ssize_t read_data_callback(nghttp2_session *session,
                     qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_data_callback remaining_payload_length <= QD_HTTP2_BUFFER_SIZE ELSE bytes_to_send=%zu, stream_data->qd_buffers_to_send=%zu", conn->conn_id, stream_data->stream_id, bytes_to_send, stream_data->qd_buffers_to_send);
                 }
             }
-            qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_data_callback bytes_to_send=%zu", conn->conn_id, stream_data->stream_id, bytes_to_send);
+            qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_data_callback returning bytes_to_send=%zu", conn->conn_id, stream_data->stream_id, bytes_to_send);
             return bytes_to_send;
         }
 
@@ -987,7 +997,7 @@ ssize_t read_data_callback(nghttp2_session *session,
                 *data_flags |= NGHTTP2_DATA_FLAG_EOF;
                 stream_data->full_payload_handled = true;
                 stream_data->out_msg_body_sent = true;
-                stream_data->out_dlv_local_disposition = PN_ACCEPTED; // This will cause the delivery to be settled
+                stream_data->out_dlv_local_disposition = PN_ACCEPTED;
                 qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_data_callback QD_MESSAGE_BODY_DATA_NO_MORE - stream_data->out_dlv_local_disposition = PN_ACCEPTED - send_complete=true, setting NGHTTP2_DATA_FLAG_EOF", conn->conn_id, stream_data->stream_id);
             }
 
@@ -1083,7 +1093,7 @@ static void grant_read_buffers(qdr_http2_connection_t *conn)
                     raw_buffers[i].context = (uintptr_t) buf;
                 }
                 desired -= i;
-                qd_log(http2_adaptor->log_source, QD_LOG_INFO, "[C%i] Calling pn_raw_connection_give_read_buffers in grant_read_buffers", conn->conn_id);
+                qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%i] Calling pn_raw_connection_give_read_buffers in grant_read_buffers", conn->conn_id);
                 pn_raw_connection_give_read_buffers(conn->pn_raw_conn, raw_buffers, i);
             }
         }
@@ -1123,8 +1133,10 @@ static int qdr_http_get_credit(void *context, qdr_link_t *link)
 
 static void qdr_http_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled)
 {
+    qdr_http2_stream_data_t* stream_data = qdr_delivery_get_context(dlv);
+    if (!stream_data)
+        return;
     if (settled) {
-        qdr_http2_stream_data_t* stream_data = qdr_delivery_get_context(dlv);
         nghttp2_nv hdrs[1];
         if (disp == PN_RELEASED || disp == PN_MODIFIED || disp == PN_REJECTED) {
             if (disp == PN_RELEASED || disp == PN_MODIFIED) {
@@ -1146,8 +1158,8 @@ static void qdr_http_delivery_update(void *context, qdr_delivery_t *dlv, uint64_
             }
             nghttp2_submit_goaway(stream_data->session_data->session, 0, stream_data->stream_id, NGHTTP2_CONNECT_ERROR, (uint8_t *)"Service Unavailable", 19);
             nghttp2_session_send(stream_data->session_data->session);
+            free_http2_stream_data(stream_data, false);
         }
-        free_http2_stream_data(stream_data, false);
     }
 }
 
@@ -1485,8 +1497,13 @@ static uint64_t qdr_http_deliver(void *context, qdr_link_t *link, qdr_delivery_t
             stream_data->out_dlv = delivery;
         }
         qd_log(http2_adaptor->log_source, QD_LOG_DEBUG, "[C%i] - qdr_http_deliver - call handle_outgoing_http", conn->conn_id);
-        return handle_outgoing_http(stream_data);
-}
+        uint64_t disp = handle_outgoing_http(stream_data);
+        if (stream_data->status == QD_STREAM_FULLY_CLOSED && disp == PN_ACCEPTED) {
+            qdr_delivery_set_context(stream_data->out_dlv, 0);
+            free_http2_stream_data(stream_data, false);
+        }
+        return disp;
+    }
     return 0;
 }
 
@@ -1551,10 +1568,10 @@ static int handle_incoming_http(qdr_http2_connection_t *conn)
                     // If the client magic is bad, we need to close the connection.
                 }
                 else if (rv == NGHTTP2_ERR_CALLBACK_FAILURE) {
-                    printf ("NGHTTP2_ERR_CALLBACK_FAILURE\n");
+                    qd_log(http2_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%i] HTTP NGHTTP2_ERR_CALLBACK_FAILURE", conn->conn_id);
                 }
                 else if (rv == NGHTTP2_ERR_BAD_CLIENT_MAGIC) {
-                    qd_log(http2_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%i] HTTP Protocol error, NGHTTP2_ERR_BAD_CLIENT_MAGIC, closing connection", conn->conn_id);
+                    qd_log(http2_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%i] HTTP2 Protocol error, NGHTTP2_ERR_BAD_CLIENT_MAGIC, closing connection", conn->conn_id);
                     nghttp2_submit_goaway(conn->session_data->session, 0, 0, NGHTTP2_PROTOCOL_ERROR, (uint8_t *)"Bad Client Magic", 16);
                     nghttp2_session_send(conn->session_data->session);
                     pn_raw_connection_close(conn->pn_raw_conn);
@@ -1622,14 +1639,16 @@ qdr_http2_connection_t *qdr_http_connection_ingress_accept(qdr_http2_connection_
 
 static void restart_streams(qdr_http2_connection_t *http_conn)
 {
-    // TODO - See if nghttp2 can provide you with which stream to process.
     qdr_http2_stream_data_t *stream_data = DEQ_HEAD(http_conn->session_data->streams);
-    if (!stream_data)
+    if (!stream_data) {
+        qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%i] In restart_streams, no stream_data, returning", http_conn->conn_id);
         return;
+    }
 
     DEQ_REMOVE_HEAD(http_conn->session_data->streams);
     DEQ_INSERT_TAIL(http_conn->session_data->streams, stream_data);
     stream_data = DEQ_HEAD(http_conn->session_data->streams);
+    qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] In restart_streams swapped head and tail streams", http_conn->conn_id, stream_data->stream_id);
     while (stream_data) {
         if (stream_data->status == QD_STREAM_FULLY_CLOSED) {
             qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] In restart_streams QD_STREAM_FULLY_CLOSED, not restarting stream", http_conn->conn_id, stream_data->stream_id);
@@ -1680,7 +1699,7 @@ static void handle_disconnected(qdr_http2_connection_t* conn)
     else {
         if (conn->stream_dispatcher) {
             qdr_http2_stream_data_t *stream_data = qdr_link_get_context(conn->stream_dispatcher);
-            qd_log(http2_adaptor->log_source, QD_LOG_INFO, "[C%i] Detaching stream dispatcher link on egress connection, freed associated stream data", conn->conn_id);
+            qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%i] Detaching stream dispatcher link on egress connection, freed associated stream data", conn->conn_id);
             qdr_link_detach(conn->stream_dispatcher, QD_CLOSED, 0);
             qdr_link_set_context(conn->stream_dispatcher, 0);
             conn->stream_dispatcher = 0;
@@ -2013,7 +2032,7 @@ static void qdr_http2_adaptor_final(void *adaptor_context)
     while (http_conn) {
         if (http_conn->stream_dispatcher_stream_data) {
             free_qdr_http2_stream_data_t(http_conn->stream_dispatcher_stream_data);
-            http_conn->stream_dispatcher = 0;
+            http_conn->stream_dispatcher_stream_data = 0;
         }
         free_qdr_http2_connection(http_conn, true);
         http_conn = DEQ_HEAD(adaptor->connections);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org