You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2020/08/25 13:27:14 UTC
[qpid-dispatch] branch dev-protocol-adaptors updated:
DISPATCH-1743: Freed all related objects on connection close
This is an automated email from the ASF dual-hosted git repository.
gmurthy pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/dev-protocol-adaptors by this push:
new 167aeea DISPATCH-1743: Freed all related objects on connection close
167aeea is described below
commit 167aeead47c4182774b459c030f32ed5c71eefb2
Author: Ganesh Murthy <gm...@apache.org>
AuthorDate: Tue Aug 25 09:26:08 2020 -0400
DISPATCH-1743: Freed all related objects on connection close
---
src/adaptors/http_adaptor.c | 59 ++++++++++++++++++++++++++++-----------------
src/adaptors/http_adaptor.h | 22 +++++++----------
2 files changed, 46 insertions(+), 35 deletions(-)
diff --git a/src/adaptors/http_adaptor.c b/src/adaptors/http_adaptor.c
index 2c203d7..2a66518 100644
--- a/src/adaptors/http_adaptor.c
+++ b/src/adaptors/http_adaptor.c
@@ -41,15 +41,8 @@ const char *CONTENT_ENCODING = "content-encoding";
#define READ_BUFFERS 4
#define WRITE_BUFFERS 4
-#define DEBUGBUILD 1
#define ARRLEN(x) (sizeof(x) / sizeof(x[0]))
-#define MAKE_NV2(NAME, VALUE) \
-{ \
- (uint8_t *)NAME, (uint8_t *)VALUE, sizeof(NAME) - 1, sizeof(VALUE) - 1, \
- NGHTTP2_NV_FLAG_NONE \
-}
-
ALLOC_DEFINE(qdr_http2_session_data_t);
ALLOC_DEFINE(qdr_http2_stream_data_t);
@@ -148,6 +141,7 @@ qd_composed_field_t *qd_message_compose_amqp(qd_message_t *msg,
void free_http2_stream_data(qdr_http2_stream_data_t *stream_data)
{
+ stream_data->session_data = 0;
if (stream_data->in_link)
qdr_link_detach(stream_data->in_link, QD_CLOSED, 0);
if (stream_data->out_link)
@@ -191,12 +185,23 @@ void free_qdr_http_connection(qdr_http_connection_t* http_conn)
//qd_timer_free(http_conn->activate_timer);
}
+ qdr_http2_stream_data_t *stream_data = 0;
+
if (!http_conn->ingress) {
- qdr_http2_stream_data_t *stream_data = qdr_link_get_context(http_conn->stream_dispatcher);
+ stream_data = qdr_link_get_context(http_conn->stream_dispatcher);
free_http2_stream_data(stream_data);
qdr_link_detach(http_conn->stream_dispatcher, QD_CLOSED, 0);
http_conn->stream_dispatcher = 0;
}
+
+ // Free all the stream data associated with this connection/session.
+ stream_data = DEQ_HEAD(http_conn->session_data->streams);
+ while (stream_data) {
+ DEQ_REMOVE_HEAD(http_conn->session_data->streams);
+ free_http2_stream_data(stream_data);
+ stream_data = DEQ_HEAD(http_conn->session_data->streams);
+ }
+
nghttp2_session_del(http_conn->session_data->session);
http_conn->session_data->session = 0;
free_qdr_http2_session_data_t(http_conn->session_data);
@@ -273,7 +278,6 @@ static ssize_t send_callback(nghttp2_session *session,
qdr_http_connection_t *conn = (qdr_http_connection_t *)user_data;
qdr_http2_session_data_t *session_data = conn->session_data;
qd_buffer_list_append(&(session_data->buffs), (uint8_t *)data, length);
-
qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i] HTTP2 send_callback data length %zu, DEQ_SIZE(session_data->buffs)=%zu", conn->conn_id, length, DEQ_SIZE(session_data->buffs));
return (ssize_t)length;
}
@@ -287,16 +291,18 @@ static int on_begin_headers_callback(nghttp2_session *session,
{
qdr_http_connection_t *conn = (qdr_http_connection_t *)user_data;
qdr_http2_session_data_t *session_data = conn->session_data;
+ qdr_http2_stream_data_t *stream_data = 0;
if (frame->hd.type == NGHTTP2_HEADERS) {
if(frame->headers.cat == NGHTTP2_HCAT_REQUEST && conn->ingress) {
// This is a brand new request.
int32_t stream_id = frame->hd.stream_id;
qdr_terminus_t *target = qdr_terminus(0);
- qdr_http2_stream_data_t *stream_data = create_http2_stream_data(session_data, stream_id);
+ stream_data = create_http2_stream_data(session_data, stream_id);
qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i] Processing incoming HTTP2 stream with id %"PRId32"", conn->conn_id, stream_id);
+
//
- // For every stream create -
+ // For every single stream in the same connection, create -
// 1. sending link with the configured address as the target
//
qdr_terminus_set_address(target, conn->config->address);
@@ -312,7 +318,11 @@ static int on_begin_headers_callback(nghttp2_session *session,
qdr_link_set_context(stream_data->in_link, stream_data);
//
- // 2. dynamic receiver on which to receive back the response data for that stream
+ // 2. dynamic receiver on which to receive back the response data for that stream.
+ //
+
+ //
+ // TODO - Why not create something like a UUID and prefix it with the router id ?
//
qdr_terminus_t *dynamic_source = qdr_terminus(0);
qdr_terminus_set_dynamic(dynamic_source);
@@ -329,12 +339,15 @@ static int on_begin_headers_callback(nghttp2_session *session,
}
else if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) {
int32_t stream_id = frame->hd.stream_id;
- qdr_http2_stream_data_t *stream_data = (qdr_http2_stream_data_t *)nghttp2_session_get_stream_user_data(session_data->session, stream_id);
+ stream_data = (qdr_http2_stream_data_t *)nghttp2_session_get_stream_user_data(session_data->session, stream_id);
stream_data->message = qd_message();
}
}
- //Andrew next Monday
+ if (stream_data) {
+ stream_data->app_properties = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0);
+ qd_compose_start_map(stream_data->app_properties);
+ }
return 0;
}
@@ -359,10 +372,7 @@ static int on_header_callback(nghttp2_session *session,
switch (frame->hd.type) {
case NGHTTP2_HEADERS: {
- if (!stream_data->app_properties) {
- stream_data->app_properties = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0);
- qd_compose_start_map(stream_data->app_properties);
- }
+ // Andrew next Monday
qd_compose_insert_string_n(stream_data->app_properties, (const char *)name, namelen);
qd_compose_insert_string_n(stream_data->app_properties, (const char *)value, valuelen);
qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 HEADER Incoming [%s=%s]", conn->conn_id, stream_data->stream_id, (char *)name, (char *)value);
@@ -1110,6 +1120,10 @@ static int handle_incoming_http(qdr_http_connection_t *conn)
pn_raw_buffer_t raw_buffers[READ_BUFFERS];
size_t n;
int count = 0;
+
+ if (!conn->pn_raw_conn)
+ return 0;
+
while ( (n = pn_raw_connection_take_read_buffers(conn->pn_raw_conn, raw_buffers, READ_BUFFERS)) ) {
for (size_t i = 0; i < n && raw_buffers[i].bytes; ++i) {
qd_buffer_t *buf = (qd_buffer_t*) raw_buffers[i].context;
@@ -1202,19 +1216,19 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
break;
}
case PN_RAW_CONNECTION_CLOSED_READ: {
- qdr_connection_closed(conn->qdr_conn);
+ pn_raw_connection_close(conn->pn_raw_conn);
+ conn->pn_raw_conn = 0;
qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_CLOSED_READ", conn->conn_id);
- free_qdr_http_connection(conn);
break;
}
case PN_RAW_CONNECTION_CLOSED_WRITE: {
- qdr_connection_closed(conn->qdr_conn);
qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_CLOSED_WRITE", conn->conn_id);
- free_qdr_http_connection(conn);
break;
}
case PN_RAW_CONNECTION_DISCONNECTED: {
qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_DISCONNECTED", conn->conn_id);
+ qdr_connection_closed(conn->qdr_conn);
+ free_qdr_http_connection(conn);
break;
}
case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: {
@@ -1232,6 +1246,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
break;
}
case PN_RAW_CONNECTION_READ: {
+ qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_READ", conn->conn_id);
int read = handle_incoming_http(conn);
qd_log(log, QD_LOG_TRACE, "[C%i] Read %i bytes", conn->conn_id, read);
while (qdr_connection_process(conn->qdr_conn)) {}
diff --git a/src/adaptors/http_adaptor.h b/src/adaptors/http_adaptor.h
index 4869c39..53a05b1 100644
--- a/src/adaptors/http_adaptor.h
+++ b/src/adaptors/http_adaptor.h
@@ -36,14 +36,13 @@ typedef struct qdr_http_connection_t qdr_http_connection_t;
DEQ_DECLARE(qdr_http2_stream_data_t, qd_http2_stream_data_list_t);
struct qdr_http2_session_data_t {
- qd_http2_stream_data_list_t streams; // A session can have many streams.
+ qdr_http_connection_t *conn; // Connection associated with the session_data
nghttp2_session *session; // A pointer to the nghttp2s' session object
+ qd_http2_stream_data_list_t streams; // A session can have many streams.
qd_buffer_list_t buffs; // Buffers for writing
- qdr_http_connection_t *conn; // Connection associated with the session_data
};
struct qdr_http2_stream_data_t {
- int32_t stream_id;
qdr_http2_session_data_t *session_data;
char *reply_to;
qdr_delivery_t *in_dlv;
@@ -51,24 +50,21 @@ struct qdr_http2_stream_data_t {
uint64_t incoming_id;
uint64_t outgoing_id;
uint64_t disposition;
-
qdr_link_t *in_link;
qdr_link_t *out_link;
-
qd_message_t *message;
qd_composed_field_t *app_properties;
qd_composed_field_t *body;
+ qd_message_body_data_t *curr_body_data;
+ DEQ_LINKS(qdr_http2_stream_data_t);
- qd_message_body_data_t *curr_body_data;
qd_message_body_data_result_t curr_body_data_result;
int curr_body_data_buff_offset;
int body_data_buff_count;
+ int32_t stream_id;
bool entire_header_arrived; // true if all the headershave arrived, just before the start of the data frame or just before the END_STREAM.
bool header_sent;
-
-
- DEQ_LINKS(qdr_http2_stream_data_t);
};
struct qdr_http_connection_t {
@@ -76,7 +72,6 @@ struct qdr_http_connection_t {
qdr_connection_t *qdr_conn;
pn_raw_connection_t *pn_raw_conn;
pn_raw_buffer_t read_buffers[4];
- bool ingress;
qd_timer_t *activate_timer;
qd_http_bridge_config_t *config;
qd_server_t *server;
@@ -87,12 +82,13 @@ struct qdr_http_connection_t {
char *remote_address;
qdr_link_t *stream_dispatcher;
uint64_t stream_dispatcher_id;
- bool connection_established;
- bool grant_initial_buffers;
qdr_http2_stream_data_t *initial_stream;
char *reply_to;
nghttp2_data_provider data_prd;
-};
+ bool connection_established;
+ bool grant_initial_buffers;
+ bool ingress;
+ };
ALLOC_DECLARE(qdr_http2_session_data_t);
ALLOC_DECLARE(qdr_http2_stream_data_t);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org