You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2020/08/24 16:20:40 UTC
[qpid-dispatch] branch dev-protocol-adaptors updated: DISPATCH-1743
- Fix for leaking deliveries. Removed unused fields in http_adaptor.h and
removed unused code
This is an automated email from the ASF dual-hosted git repository.
gmurthy pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/dev-protocol-adaptors by this push:
new fd6773c DISPATCH-1743 - Fix for leaking deliveries. Removed unused fields in http_adaptor.h and removed unused code
fd6773c is described below
commit fd6773cb32afeda5017084c2265abfd5bc4e1fde
Author: Ganesh Murthy <gm...@apache.org>
AuthorDate: Mon Aug 24 12:18:44 2020 -0400
DISPATCH-1743 - Fix for leaking deliveries. Removed unused fields in http_adaptor.h and removed unused code
---
src/adaptors/http_adaptor.c | 111 ++++++++++++++++++++++++++------------------
src/adaptors/http_adaptor.h | 17 +------
2 files changed, 68 insertions(+), 60 deletions(-)
diff --git a/src/adaptors/http_adaptor.c b/src/adaptors/http_adaptor.c
index 8a63095..2c203d7 100644
--- a/src/adaptors/http_adaptor.c
+++ b/src/adaptors/http_adaptor.c
@@ -146,6 +146,29 @@ qd_composed_field_t *qd_message_compose_amqp(qd_message_t *msg,
return field;
}
+void free_http2_stream_data(qdr_http2_stream_data_t *stream_data)
+{
+ if (stream_data->in_link)
+ qdr_link_detach(stream_data->in_link, QD_CLOSED, 0);
+ if (stream_data->out_link)
+ qdr_link_detach(stream_data->out_link, QD_CLOSED, 0);
+ free(stream_data->reply_to);
+ qd_compose_free(stream_data->app_properties);
+ qd_compose_free(stream_data->body);
+ free_qdr_http2_stream_data_t(stream_data);
+}
+
+void free_http2_stream(qdr_http2_session_data_t *session_data, int32_t stream_id)
+{
+ if (!stream_id)
+ return;
+
+ qdr_http2_stream_data_t *stream_data = nghttp2_session_get_stream_user_data(session_data->session, stream_id);
+ DEQ_REMOVE(session_data->streams, stream_data);
+ free_http2_stream_data(stream_data);
+ nghttp2_session_set_stream_user_data(session_data->session, stream_id, NULL);
+}
+
static char *get_address_string(pn_raw_connection_t *pn_raw_conn)
{
const pn_netaddr_t *netaddr = pn_raw_connection_remote_addr(pn_raw_conn);
@@ -162,11 +185,22 @@ void free_qdr_http_connection(qdr_http_connection_t* http_conn)
{
if(http_conn->remote_address) {
free(http_conn->remote_address);
+ http_conn->remote_address = 0;
}
if (http_conn->activate_timer) {
- qd_timer_free(http_conn->activate_timer);
+ //qd_timer_free(http_conn->activate_timer);
+ }
+
+ if (!http_conn->ingress) {
+ qdr_http2_stream_data_t *stream_data = qdr_link_get_context(http_conn->stream_dispatcher);
+ free_http2_stream_data(stream_data);
+ qdr_link_detach(http_conn->stream_dispatcher, QD_CLOSED, 0);
+ http_conn->stream_dispatcher = 0;
}
nghttp2_session_del(http_conn->session_data->session);
+ http_conn->session_data->session = 0;
+ free_qdr_http2_session_data_t(http_conn->session_data);
+ http_conn->session_data = 0;
free(http_conn);
}
@@ -182,27 +216,6 @@ static qdr_http2_stream_data_t *create_http2_stream_data(qdr_http2_session_data_
return stream_data;
}
-void free_http2_stream_data(qdr_http2_session_data_t *session_data, int32_t stream_id)
-{
- qdr_http2_stream_data_t *stream_data = DEQ_HEAD(session_data->streams);
- while (stream_data) {
- if (stream_data->stream_id == stream_id) {
- DEQ_REMOVE(session_data->streams, stream_data);
- nghttp2_session_set_stream_user_data(session_data->session, stream_id, NULL);
-
- //
- // TODO - Free all stream related data
- //
- qdr_link_detach(stream_data->in_link, QD_CLOSED, 0);
- qdr_link_detach(stream_data->out_link, QD_CLOSED, 0);
- free(stream_data->reply_to);
- qd_compose_free(stream_data->app_properties);
- free_qdr_http2_stream_data_t(stream_data);
- break;
- }
- }
-}
-
static int on_data_chunk_recv_callback(nghttp2_session *session,
uint8_t flags,
@@ -243,10 +256,10 @@ static int on_stream_close_callback(nghttp2_session *session,
nghttp2_error_code error_code,
void *user_data)
{
-// qdr_http_connection_t *conn = (qdr_http_connection_t *)user_data;
-// qdr_http2_session_data_t *session_data = conn->session_data;
-// qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 on_stream_close_callback ", conn->conn_id, stream_id);
-// free_http2_stream_data(session_data, stream_id);
+ qdr_http_connection_t *conn = (qdr_http_connection_t *)user_data;
+ qdr_http2_session_data_t *session_data = conn->session_data;
+ qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 on_stream_close_callback, freeing stream", conn->conn_id, stream_id);
+ free_http2_stream(session_data, stream_id);
return 0;
}
@@ -382,6 +395,20 @@ static void compose_and_deliver(qdr_http2_stream_data_t *stream_data, qd_compose
stream_data->in_dlv = qdr_link_deliver(stream_data->in_link, stream_data->message, 0, false, 0, 0, 0, 0);
qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"][L%"PRIu64"] Routed delivery dlv:%lx", conn->conn_id, stream_data->stream_id, stream_data->in_link->identity, (long) stream_data->in_dlv);
+ if (stream_data->in_dlv) {
+ qdr_delivery_decref(http_adaptor->core, stream_data->in_dlv, "http_adaptor - release protection of return from deliver");
+ } else {
+ //
+ // If there is no delivery, the message is now and will always be unroutable because there is no address.
+ //
+ //qd_bitmask_free(link_exclusions);
+ qd_message_set_discard(qdr_delivery_message(stream_data->in_dlv), true);
+ if (receive_complete) {
+ qd_message_free(qdr_delivery_message(stream_data->in_dlv));
+ }
+ }
+
+
}
static bool route_delivery(qdr_http2_stream_data_t *stream_data, bool receive_complete)
@@ -398,8 +425,7 @@ static bool route_delivery(qdr_http2_stream_data_t *stream_data, bool receive_co
conn->config->address,
0,
stream_data->reply_to,
- 0,
- 0,
+ 0, 0,
stream_data->stream_id);
compose_and_deliver(stream_data, header_and_prop, conn, receive_complete);
}
@@ -408,11 +434,7 @@ static bool route_delivery(qdr_http2_stream_data_t *stream_data, bool receive_co
if (stream_data->entire_header_arrived) {
delivery_routed = true;
header_and_prop = qd_message_compose_amqp(stream_data->message,
- stream_data->reply_to,
- 0,
- 0,
- 0,
- 0,
+ stream_data->reply_to, 0, 0, 0, 0,
stream_data->stream_id);
compose_and_deliver(stream_data, header_and_prop, conn, receive_complete);
}
@@ -1003,6 +1025,10 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data, qdr_link_t *
static uint64_t qdr_http_deliver(void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled)
{
qdr_http2_stream_data_t *stream_data = (qdr_http2_stream_data_t*)qdr_link_get_context(link);
+
+ if (!stream_data)
+ return 0;
+
qdr_http_connection_t *conn = stream_data->session_data->conn;
if (link == stream_data->session_data->conn->stream_dispatcher) {
@@ -1044,8 +1070,6 @@ static uint64_t qdr_http_deliver(void *context, qdr_link_t *link, qdr_delivery_t
false,
0,
&(stream_data->incoming_id));
-
- // TODO - This is wrong
qdr_link_set_context(stream_data->in_link, stream_data);
//Let's make an outbound connection to the configured connector.
@@ -1110,8 +1134,6 @@ static int handle_incoming_http(qdr_http_connection_t *conn)
qd_buffer_free(curr_buf);
}
- // TODO - Look into reusing the same buffers over and over again.
- // Andrew - this is where we left off last time.
grant_read_buffers(conn);
return count;
@@ -1180,32 +1202,28 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
break;
}
case PN_RAW_CONNECTION_CLOSED_READ: {
+ qdr_connection_closed(conn->qdr_conn);
qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_CLOSED_READ", conn->conn_id);
+ free_qdr_http_connection(conn);
break;
}
case PN_RAW_CONNECTION_CLOSED_WRITE: {
+ qdr_connection_closed(conn->qdr_conn);
qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_CLOSED_WRITE", conn->conn_id);
+ free_qdr_http_connection(conn);
break;
}
case PN_RAW_CONNECTION_DISCONNECTED: {
- qdr_connection_closed(conn->qdr_conn);
- //free_qdr_http_connection(conn);
qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_DISCONNECTED", conn->conn_id);
break;
}
case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: {
qd_log(log, QD_LOG_TRACE, "[C%i] Need write buffers", conn->conn_id);
-
- //TODO - Refactor this out
- //while (qdr_connection_process(conn->qdr_conn)) {}
break;
}
case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
grant_read_buffers(conn);
qd_log(log, QD_LOG_TRACE, "[C%i] Need read buffers", conn->conn_id);
-
- //TODO - No need to call this
- //while (qdr_connection_process(conn->qdr_conn)) {}
break;
}
case PN_RAW_CONNECTION_WAKE: {
@@ -1416,6 +1434,9 @@ qd_http_connector_t *qd_http2_configure_connector(qd_dispatch_t *qd, const qd_ht
static void qdr_http_adaptor_final(void *adaptor_context)
{
qdr_http_adaptor_t *adaptor = (qdr_http_adaptor_t*) adaptor_context;
+ //adaptor->log_source and adaptor->protocol_log_source will be freed in qd_log_finalize() in dispatch.c
+ adaptor->log_source = 0;
+ adaptor->protocol_log_source = 0;
qdr_protocol_adaptor_free(adaptor->core, adaptor->adaptor);
free(adaptor);
http_adaptor = NULL;
diff --git a/src/adaptors/http_adaptor.h b/src/adaptors/http_adaptor.h
index cfbdf73..4869c39 100644
--- a/src/adaptors/http_adaptor.h
+++ b/src/adaptors/http_adaptor.h
@@ -56,9 +56,7 @@ struct qdr_http2_stream_data_t {
qdr_link_t *out_link;
qd_message_t *message;
- qd_composed_field_t *field;
- qd_composed_field_t *header_properties; // This has the header and the properties.
- qd_composed_field_t *app_properties; // This has the application properties.
+ qd_composed_field_t *app_properties;
qd_composed_field_t *body;
qd_message_body_data_t *curr_body_data;
@@ -66,22 +64,11 @@ struct qdr_http2_stream_data_t {
int curr_body_data_buff_offset;
int body_data_buff_count;
- bool entire_header_arrived; // true if all the header properties have arrived, just before the start of the data frame or just before the END_STREAM.
+ bool entire_header_arrived; // true if all the headershave arrived, just before the start of the data frame or just before the END_STREAM.
bool header_sent;
- bool has_body;
- bool has_data; // Did we ever receive a DATA frame.
DEQ_LINKS(qdr_http2_stream_data_t);
- //const char *uri; // The NULL-terminated URI string to retrieve.
- /* The authority portion of the |uri|, not NULL-terminated */
- //char *authority;
- /* The path portion of the |uri|, including query, not NULL-terminated */
- //char *path;
- /* The length of the |authority| */
- //size_t authoritylen;
- /* The length of the |path| */
- //size_t pathlen;
};
struct qdr_http_connection_t {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org