You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2020/09/28 12:52:48 UTC
[qpid-dispatch] branch dev-protocol-adaptors updated:
DISPATCH-1743: Added locking around connection freeing. Fixed body data
leak
This is an automated email from the ASF dual-hosted git repository.
gmurthy pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/dev-protocol-adaptors by this push:
new b571572 DISPATCH-1743: Added locking around connection freeing. Fixed body data leak
b571572 is described below
commit b571572e48020726751cb3ba112bf72c0c195430
Author: Ganesh Murthy <gm...@apache.org>
AuthorDate: Mon Sep 28 08:51:52 2020 -0400
DISPATCH-1743: Added locking around connection freeing. Fixed body data leak
---
src/adaptors/http2/http2_adaptor.c | 56 ++++++++++++++++++++++++++++++--------
1 file changed, 44 insertions(+), 12 deletions(-)
diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c
index f38c43d..5b84540 100644
--- a/src/adaptors/http2/http2_adaptor.c
+++ b/src/adaptors/http2/http2_adaptor.c
@@ -67,10 +67,12 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
static void advance_stream_status(qdr_http2_stream_data_t *stream_data)
{
- if (stream_data->status == QD_STREAM_OPEN)
+ if (stream_data->status == QD_STREAM_OPEN) {
stream_data->status = QD_STREAM_HALF_CLOSED;
- else if (stream_data->status == QD_STREAM_HALF_CLOSED)
+ }
+ else if (stream_data->status == QD_STREAM_HALF_CLOSED) {
stream_data->status = QD_STREAM_FULLY_CLOSED;
+ }
}
@@ -251,10 +253,14 @@ static void free_http2_stream_data(qdr_http2_stream_data_t *stream_data)
qdr_http2_session_data_t *session_data = stream_data->session_data;
qdr_http2_connection_t *conn = session_data->conn;
stream_data->session_data = 0;
- if (conn->qdr_conn && stream_data->in_link)
+ if (conn->qdr_conn && stream_data->in_link) {
+ qdr_link_set_context(stream_data->in_link, 0);
qdr_link_detach(stream_data->in_link, QD_CLOSED, 0);
- if (conn->qdr_conn && stream_data->out_link)
+ }
+ if (conn->qdr_conn && stream_data->out_link) {
+ qdr_link_set_context(stream_data->out_link, 0);
qdr_link_detach(stream_data->out_link, QD_CLOSED, 0);
+ }
free(stream_data->reply_to);
qd_compose_free(stream_data->app_properties);
qd_compose_free(stream_data->body);
@@ -429,7 +435,11 @@ static int snd_data_callback(nghttp2_session *session,
stream_data->curr_body_data = 0;
stream_data->curr_body_data_qd_buff_offset = 0;
qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] snd_data_callback qd_message_body_data_release", conn->conn_id, stream_data->stream_id);
- stream_data->full_payload_handled = false;
+
+ if (stream_data->next_body_data) {
+ qd_message_body_data_release(stream_data->next_body_data);
+ stream_data->next_body_data = 0;
+ }
}
if (length)
@@ -695,8 +705,8 @@ static int on_frame_recv_callback(nghttp2_session *session,
if (qd_message_receive_complete(qdr_delivery_message(stream_data->in_dlv))
&& qd_message_send_complete(qdr_delivery_message(stream_data->out_dlv))) {
- advance_stream_status(stream_data);
qdr_delivery_remote_state_updated(http_adaptor->core, stream_data->out_dlv, stream_data->out_dlv_local_disposition, true, 0, 0, false);
+ advance_stream_status(stream_data);
}
}
@@ -864,6 +874,7 @@ ssize_t read_data_callback(nghttp2_session *session,
if (stream_data->next_body_data_result == QD_MESSAGE_BODY_DATA_NO_MORE) {
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
stream_data->out_msg_body_sent = true;
+ stream_data->full_payload_handled = true;
qd_message_body_data_release(stream_data->next_body_data);
stream_data->next_body_data = 0;
stream_data->out_dlv_local_disposition = PN_ACCEPTED;
@@ -891,11 +902,19 @@ ssize_t read_data_callback(nghttp2_session *session,
bytes_to_send = remaining_payload_length;
stream_data->qd_buffers_to_send = stream_data->body_data_buff_count;
stream_data->full_payload_handled = true;
+
+ if (stream_data->next_body_data) {
+ qd_message_body_data_release(stream_data->next_body_data);
+ stream_data->next_body_data = 0;
+ }
+
// Look ahead one body data
stream_data->next_body_data_result = qd_message_next_body_data(message, &stream_data->next_body_data);
if (body_data_result == QD_MESSAGE_BODY_DATA_NO_MORE) {
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
stream_data->out_msg_body_sent = true;
+ qd_message_body_data_release(stream_data->next_body_data);
+ stream_data->next_body_data = 0;
stream_data->out_dlv_local_disposition = PN_ACCEPTED;
break;
}
@@ -929,13 +948,14 @@ ssize_t read_data_callback(nghttp2_session *session,
//
size_t pn_buffs_write_capacity = pn_raw_connection_write_buffers_capacity(conn->pn_raw_conn);
if (pn_buffs_write_capacity == 0) {
- return NGHTTP2_ERR_DEFERRED;
stream_data->out_dlv_local_disposition = 0;
qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_data_callback QD_MESSAGE_BODY_DATA_NO_MORE - pn_buffs_write_capacity=0 send is not complete", conn->conn_id, stream_data->stream_id);
+ return NGHTTP2_ERR_DEFERRED;
}
else {
stream_data->qd_buffers_to_send = 0;
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
+ stream_data->full_payload_handled = true;
stream_data->out_msg_body_sent = true;
stream_data->out_dlv_local_disposition = PN_ACCEPTED; // This will cause the delivery to be settled
qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_data_callback QD_MESSAGE_BODY_DATA_NO_MORE - stream_data->out_dlv_local_disposition = PN_ACCEPTED - send_complete=true, setting NGHTTP2_DATA_FLAG_EOF", conn->conn_id, stream_data->stream_id);
@@ -950,6 +970,7 @@ ssize_t read_data_callback(nghttp2_session *session,
//
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
qd_message_body_data_release(stream_data->curr_body_data);
+ stream_data->curr_body_data = 0;
stream_data->out_dlv_local_disposition = PN_REJECTED;
qd_log(http_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%i][S%i] read_data_callback QD_MESSAGE_BODY_DATA_INVALID", conn->conn_id, stream_data->stream_id);
break;
@@ -960,6 +981,7 @@ ssize_t read_data_callback(nghttp2_session *session,
//
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
qd_message_body_data_release(stream_data->curr_body_data);
+ stream_data->curr_body_data = 0;
stream_data->out_dlv_local_disposition = PN_REJECTED;
qd_log(http_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%i][S%i] read_data_callback QD_MESSAGE_BODY_DATA_NOT_DATA", conn->conn_id, stream_data->stream_id);
break;
@@ -973,8 +995,9 @@ ssize_t read_data_callback(nghttp2_session *session,
break;
case QD_MESSAGE_DEPTH_INCOMPLETE:
+ stream_data->out_dlv_local_disposition = 0;
qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%i] read_data_callback QD_MESSAGE_DEPTH_INCOMPLETE", conn->conn_id, stream_data->stream_id);
- break;
+ return NGHTTP2_ERR_DEFERRED;
}
qd_log(http_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%i][S%i] read_data_callback Returning zero", conn->conn_id, stream_data->stream_id);
@@ -1138,6 +1161,7 @@ static void qdr_http_second_attach(void *context, qdr_link_t *link,
static void qdr_http_activate(void *notused, qdr_connection_t *c)
{
+ sys_mutex_lock(qd_server_get_activation_lock(http_adaptor->core->qd->server));
qdr_http2_connection_t* conn = (qdr_http2_connection_t*) qdr_connection_get_context(c);
if (conn) {
if (conn->pn_raw_conn) {
@@ -1151,6 +1175,7 @@ static void qdr_http_activate(void *notused, qdr_connection_t *c)
qd_log(http_adaptor->log_source, QD_LOG_ERROR, "[C%i] Cannot activate", conn->conn_id);
}
}
+ sys_mutex_unlock(qd_server_get_activation_lock(http_adaptor->core->qd->server));
}
static int qdr_http_push(void *context, qdr_link_t *link, int limit)
@@ -1212,8 +1237,12 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data)
if (stream_data->curr_body_data_result == QD_MESSAGE_BODY_DATA_OK) {
size_t payload_length = qd_message_body_data_payload_length(stream_data->curr_body_data);
if (payload_length == 0) {
- stream_data->curr_body_data_result = qd_message_next_body_data(message, &stream_data->curr_body_data);
- if (stream_data->curr_body_data_result == QD_MESSAGE_BODY_DATA_NO_MORE) {
+ stream_data->next_body_data_result = qd_message_next_body_data(message, &stream_data->next_body_data);
+ if (stream_data->next_body_data_result == QD_MESSAGE_BODY_DATA_NO_MORE) {
+ qd_message_body_data_release(stream_data->next_body_data);
+ stream_data->next_body_data = 0;
+ qd_message_body_data_release(stream_data->curr_body_data);
+ stream_data->curr_body_data = 0;
flags = NGHTTP2_FLAG_END_STREAM;
stream_data->out_msg_has_body = false;
qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i] Message has no body, sending NGHTTP2_FLAG_END_STREAM with nghttp2_submit_headers", conn->conn_id);
@@ -1556,9 +1585,9 @@ static void restart_streams(qdr_http2_connection_t *http_conn)
stream_data = DEQ_HEAD(http_conn->session_data->streams);
while (stream_data) {
if (stream_data->status == QD_STREAM_FULLY_CLOSED) {
- qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] freeing stream", http_conn->conn_id, stream_data->stream_id);
+ qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] In restart_streams QD_STREAM_FULLY_CLOSED, not restarting stream", http_conn->conn_id, stream_data->stream_id);
qdr_http2_stream_data_t *next_stream_data = DEQ_NEXT(stream_data);
- free_http2_stream_data(stream_data);
+ DEQ_REMOVE(http_conn->session_data->streams, stream_data);
stream_data = next_stream_data;
}
else {
@@ -1582,6 +1611,7 @@ static void qdr_del_http2_connection_CT(qdr_core_t *core, qdr_action_t *action,
static void handle_disconnected(qdr_http2_connection_t* conn)
{
+ sys_mutex_lock(qd_server_get_activation_lock(http_adaptor->core->qd->server));
if (conn->pn_raw_conn) {
qd_log(http_adaptor->log_source, QD_LOG_INFO, "[C%i] Setting conn->pn_raw_conn=0", conn->conn_id);
conn->pn_raw_conn = 0;
@@ -1594,6 +1624,7 @@ static void handle_disconnected(qdr_http2_connection_t* conn)
free_http2_stream_data(stream_data);
}
qd_log(http_adaptor->log_source, QD_LOG_INFO, "[C%i] Detaching stream dispatcher on egress connection", conn->conn_id);
+ qdr_link_set_context(conn->stream_dispatcher, 0);
qdr_link_detach(conn->stream_dispatcher, QD_CLOSED, 0);
conn->stream_dispatcher = 0;
}
@@ -1607,6 +1638,7 @@ static void handle_disconnected(qdr_http2_connection_t* conn)
action->args.general.context_1 = conn;
qdr_action_enqueue(http_adaptor->core, action);
}
+ sys_mutex_unlock(qd_server_get_activation_lock(http_adaptor->core->qd->server));
}
static void egress_conn_timer_handler(void *context)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org