You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2020/09/28 12:52:48 UTC

[qpid-dispatch] branch dev-protocol-adaptors updated: DISPATCH-1743: Added locking around connection freeing. Fixed body data leak

This is an automated email from the ASF dual-hosted git repository.

gmurthy pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/dev-protocol-adaptors by this push:
     new b571572  DISPATCH-1743: Added locking around connection freeing. Fixed body data leak
b571572 is described below

commit b571572e48020726751cb3ba112bf72c0c195430
Author: Ganesh Murthy <gm...@apache.org>
AuthorDate: Mon Sep 28 08:51:52 2020 -0400

    DISPATCH-1743: Added locking around connection freeing. Fixed body data leak
---
 src/adaptors/http2/http2_adaptor.c | 56 ++++++++++++++++++++++++++++++--------
 1 file changed, 44 insertions(+), 12 deletions(-)

diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c
index f38c43d..5b84540 100644
--- a/src/adaptors/http2/http2_adaptor.c
+++ b/src/adaptors/http2/http2_adaptor.c
@@ -67,10 +67,12 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
 
 static void advance_stream_status(qdr_http2_stream_data_t *stream_data)
 {
-    if (stream_data->status == QD_STREAM_OPEN)
+    if (stream_data->status == QD_STREAM_OPEN) {
         stream_data->status = QD_STREAM_HALF_CLOSED;
-    else if (stream_data->status == QD_STREAM_HALF_CLOSED)
+    }
+    else if (stream_data->status == QD_STREAM_HALF_CLOSED) {
         stream_data->status = QD_STREAM_FULLY_CLOSED;
+    }
 }
 
 
@@ -251,10 +253,14 @@ static void free_http2_stream_data(qdr_http2_stream_data_t *stream_data)
     qdr_http2_session_data_t *session_data = stream_data->session_data;
     qdr_http2_connection_t *conn = session_data->conn;
     stream_data->session_data = 0;
-    if (conn->qdr_conn && stream_data->in_link)
+    if (conn->qdr_conn && stream_data->in_link) {
+        qdr_link_set_context(stream_data->in_link, 0);
         qdr_link_detach(stream_data->in_link, QD_CLOSED, 0);
-    if (conn->qdr_conn && stream_data->out_link)
+    }
+    if (conn->qdr_conn && stream_data->out_link) {
+        qdr_link_set_context(stream_data->out_link, 0);
         qdr_link_detach(stream_data->out_link, QD_CLOSED, 0);
+    }
     free(stream_data->reply_to);
     qd_compose_free(stream_data->app_properties);
     qd_compose_free(stream_data->body);
@@ -429,7 +435,11 @@ static int snd_data_callback(nghttp2_session *session,
         stream_data->curr_body_data = 0;
         stream_data->curr_body_data_qd_buff_offset = 0;
         qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] snd_data_callback qd_message_body_data_release", conn->conn_id, stream_data->stream_id);
-        stream_data->full_payload_handled = false;
+
+        if (stream_data->next_body_data) {
+            qd_message_body_data_release(stream_data->next_body_data);
+            stream_data->next_body_data = 0;
+        }
     }
 
     if (length)
@@ -695,8 +705,8 @@ static int on_frame_recv_callback(nghttp2_session *session,
 
             if (qd_message_receive_complete(qdr_delivery_message(stream_data->in_dlv))
                     && qd_message_send_complete(qdr_delivery_message(stream_data->out_dlv))) {
-                advance_stream_status(stream_data);
                 qdr_delivery_remote_state_updated(http_adaptor->core, stream_data->out_dlv, stream_data->out_dlv_local_disposition, true, 0, 0, false);
+                advance_stream_status(stream_data);
             }
         }
 
@@ -864,6 +874,7 @@ ssize_t read_data_callback(nghttp2_session *session,
                 if (stream_data->next_body_data_result == QD_MESSAGE_BODY_DATA_NO_MORE) {
                     *data_flags |= NGHTTP2_DATA_FLAG_EOF;
                     stream_data->out_msg_body_sent = true;
+                    stream_data->full_payload_handled = true;
                     qd_message_body_data_release(stream_data->next_body_data);
                     stream_data->next_body_data = 0;
                     stream_data->out_dlv_local_disposition = PN_ACCEPTED;
@@ -891,11 +902,19 @@ ssize_t read_data_callback(nghttp2_session *session,
                     bytes_to_send = remaining_payload_length;
                     stream_data->qd_buffers_to_send = stream_data->body_data_buff_count;
                     stream_data->full_payload_handled = true;
+
+                    if (stream_data->next_body_data) {
+                        qd_message_body_data_release(stream_data->next_body_data);
+                        stream_data->next_body_data = 0;
+                    }
+
                     // Look ahead one body data
                     stream_data->next_body_data_result = qd_message_next_body_data(message, &stream_data->next_body_data);
                     if (body_data_result == QD_MESSAGE_BODY_DATA_NO_MORE) {
                         *data_flags |= NGHTTP2_DATA_FLAG_EOF;
                         stream_data->out_msg_body_sent = true;
+                        qd_message_body_data_release(stream_data->next_body_data);
+                        stream_data->next_body_data = 0;
                         stream_data->out_dlv_local_disposition = PN_ACCEPTED;
                         break;
                     }
@@ -929,13 +948,14 @@ ssize_t read_data_callback(nghttp2_session *session,
             //
             size_t pn_buffs_write_capacity = pn_raw_connection_write_buffers_capacity(conn->pn_raw_conn);
             if (pn_buffs_write_capacity == 0) {
-                return NGHTTP2_ERR_DEFERRED;
                 stream_data->out_dlv_local_disposition = 0;
                 qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_data_callback QD_MESSAGE_BODY_DATA_NO_MORE - pn_buffs_write_capacity=0 send is not complete", conn->conn_id, stream_data->stream_id);
+                return NGHTTP2_ERR_DEFERRED;
             }
             else {
                 stream_data->qd_buffers_to_send = 0;
                 *data_flags |= NGHTTP2_DATA_FLAG_EOF;
+                stream_data->full_payload_handled = true;
                 stream_data->out_msg_body_sent = true;
                 stream_data->out_dlv_local_disposition = PN_ACCEPTED; // This will cause the delivery to be settled
                 qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_data_callback QD_MESSAGE_BODY_DATA_NO_MORE - stream_data->out_dlv_local_disposition = PN_ACCEPTED - send_complete=true, setting NGHTTP2_DATA_FLAG_EOF", conn->conn_id, stream_data->stream_id);
@@ -950,6 +970,7 @@ ssize_t read_data_callback(nghttp2_session *session,
             //
             *data_flags |= NGHTTP2_DATA_FLAG_EOF;
             qd_message_body_data_release(stream_data->curr_body_data);
+            stream_data->curr_body_data = 0;
             stream_data->out_dlv_local_disposition = PN_REJECTED;
             qd_log(http_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%i][S%i] read_data_callback QD_MESSAGE_BODY_DATA_INVALID", conn->conn_id, stream_data->stream_id);
             break;
@@ -960,6 +981,7 @@ ssize_t read_data_callback(nghttp2_session *session,
             //
             *data_flags |= NGHTTP2_DATA_FLAG_EOF;
             qd_message_body_data_release(stream_data->curr_body_data);
+            stream_data->curr_body_data = 0;
             stream_data->out_dlv_local_disposition = PN_REJECTED;
             qd_log(http_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%i][S%i] read_data_callback QD_MESSAGE_BODY_DATA_NOT_DATA", conn->conn_id, stream_data->stream_id);
             break;
@@ -973,8 +995,9 @@ ssize_t read_data_callback(nghttp2_session *session,
         break;
 
     case QD_MESSAGE_DEPTH_INCOMPLETE:
+        stream_data->out_dlv_local_disposition = 0;
         qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_data_callback QD_MESSAGE_DEPTH_INCOMPLETE", conn->conn_id, stream_data->stream_id);
-        break;
+        return NGHTTP2_ERR_DEFERRED;
     }
 
     qd_log(http_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%i][S%i] read_data_callback Returning zero", conn->conn_id, stream_data->stream_id);
@@ -1138,6 +1161,7 @@ static void qdr_http_second_attach(void *context, qdr_link_t *link,
 
 static void qdr_http_activate(void *notused, qdr_connection_t *c)
 {
+    sys_mutex_lock(qd_server_get_activation_lock(http_adaptor->core->qd->server));
     qdr_http2_connection_t* conn = (qdr_http2_connection_t*) qdr_connection_get_context(c);
     if (conn) {
         if (conn->pn_raw_conn) {
@@ -1151,6 +1175,7 @@ static void qdr_http_activate(void *notused, qdr_connection_t *c)
             qd_log(http_adaptor->log_source, QD_LOG_ERROR, "[C%i] Cannot activate", conn->conn_id);
         }
     }
+    sys_mutex_unlock(qd_server_get_activation_lock(http_adaptor->core->qd->server));
 }
 
 static int qdr_http_push(void *context, qdr_link_t *link, int limit)
@@ -1212,8 +1237,12 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data)
             if (stream_data->curr_body_data_result == QD_MESSAGE_BODY_DATA_OK) {
                 size_t payload_length = qd_message_body_data_payload_length(stream_data->curr_body_data);
                 if (payload_length == 0) {
-                    stream_data->curr_body_data_result = qd_message_next_body_data(message, &stream_data->curr_body_data);
-                    if (stream_data->curr_body_data_result == QD_MESSAGE_BODY_DATA_NO_MORE) {
+                    stream_data->next_body_data_result = qd_message_next_body_data(message, &stream_data->next_body_data);
+                    if (stream_data->next_body_data_result == QD_MESSAGE_BODY_DATA_NO_MORE) {
+                        qd_message_body_data_release(stream_data->next_body_data);
+                        stream_data->next_body_data = 0;
+                        qd_message_body_data_release(stream_data->curr_body_data);
+                        stream_data->curr_body_data = 0;
                         flags = NGHTTP2_FLAG_END_STREAM;
                         stream_data->out_msg_has_body = false;
                         qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i] Message has no body, sending NGHTTP2_FLAG_END_STREAM with nghttp2_submit_headers", conn->conn_id);
@@ -1556,9 +1585,9 @@ static void restart_streams(qdr_http2_connection_t *http_conn)
     stream_data = DEQ_HEAD(http_conn->session_data->streams);
     while (stream_data) {
         if (stream_data->status == QD_STREAM_FULLY_CLOSED) {
-            qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] freeing stream", http_conn->conn_id, stream_data->stream_id);
+            qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] In restart_streams QD_STREAM_FULLY_CLOSED, not restarting stream", http_conn->conn_id, stream_data->stream_id);
             qdr_http2_stream_data_t *next_stream_data = DEQ_NEXT(stream_data);
-            free_http2_stream_data(stream_data);
+            DEQ_REMOVE(http_conn->session_data->streams, stream_data);
             stream_data = next_stream_data;
         }
         else {
@@ -1582,6 +1611,7 @@ static void qdr_del_http2_connection_CT(qdr_core_t *core, qdr_action_t *action,
 
 static void handle_disconnected(qdr_http2_connection_t* conn)
 {
+    sys_mutex_lock(qd_server_get_activation_lock(http_adaptor->core->qd->server));
     if (conn->pn_raw_conn) {
         qd_log(http_adaptor->log_source, QD_LOG_INFO, "[C%i] Setting conn->pn_raw_conn=0", conn->conn_id);
         conn->pn_raw_conn = 0;
@@ -1594,6 +1624,7 @@ static void handle_disconnected(qdr_http2_connection_t* conn)
                 free_http2_stream_data(stream_data);
             }
             qd_log(http_adaptor->log_source, QD_LOG_INFO, "[C%i] Detaching stream dispatcher on egress connection", conn->conn_id);
+            qdr_link_set_context(conn->stream_dispatcher, 0);
             qdr_link_detach(conn->stream_dispatcher, QD_CLOSED, 0);
             conn->stream_dispatcher = 0;
         }
@@ -1607,6 +1638,7 @@ static void handle_disconnected(qdr_http2_connection_t* conn)
         action->args.general.context_1 = conn;
         qdr_action_enqueue(http_adaptor->core, action);
     }
+    sys_mutex_unlock(qd_server_get_activation_lock(http_adaptor->core->qd->server));
 }
 
 static void egress_conn_timer_handler(void *context)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org