You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2020/08/24 16:20:40 UTC

[qpid-dispatch] branch dev-protocol-adaptors updated: DISPATCH-1743 - Fix for leaking deliveries. Removed unused fields in http_adaptor.h and removed unused code

This is an automated email from the ASF dual-hosted git repository.

gmurthy pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/dev-protocol-adaptors by this push:
     new fd6773c  DISPATCH-1743 - Fix for leaking deliveries. Removed unused fields in http_adaptor.h and removed unused code
fd6773c is described below

commit fd6773cb32afeda5017084c2265abfd5bc4e1fde
Author: Ganesh Murthy <gm...@apache.org>
AuthorDate: Mon Aug 24 12:18:44 2020 -0400

    DISPATCH-1743 - Fix for leaking deliveries. Removed unused fields in http_adaptor.h and removed unused code
---
 src/adaptors/http_adaptor.c | 111 ++++++++++++++++++++++++++------------------
 src/adaptors/http_adaptor.h |  17 +------
 2 files changed, 68 insertions(+), 60 deletions(-)

diff --git a/src/adaptors/http_adaptor.c b/src/adaptors/http_adaptor.c
index 8a63095..2c203d7 100644
--- a/src/adaptors/http_adaptor.c
+++ b/src/adaptors/http_adaptor.c
@@ -146,6 +146,29 @@ qd_composed_field_t  *qd_message_compose_amqp(qd_message_t *msg,
     return field;
 }
 
+void free_http2_stream_data(qdr_http2_stream_data_t *stream_data)
+{
+    // Detach the stream's links (when present) and free everything the stream owns. NULL is a no-op.
+    if (!stream_data) { return; }  // callers hand in qdr_link_get_context() results, which may be NULL
+    if (stream_data->in_link)  { qdr_link_detach(stream_data->in_link, QD_CLOSED, 0); }
+    if (stream_data->out_link) { qdr_link_detach(stream_data->out_link, QD_CLOSED, 0); }
+    free(stream_data->reply_to);
+    qd_compose_free(stream_data->app_properties);
+    qd_compose_free(stream_data->body);
+    free_qdr_http2_stream_data_t(stream_data);  // must stay last: releases stream_data itself
+}
+
+void free_http2_stream(qdr_http2_session_data_t *session_data, int32_t stream_id)
+{
+    // Unlink and free the stream data attached to stream_id. No-op for id 0 or when no user data is attached.
+    qdr_http2_stream_data_t *stream_data = stream_id ? nghttp2_session_get_stream_user_data(session_data->session, stream_id) : NULL;
+    if (!stream_data)
+        return;  // nothing registered for this stream, so there is nothing to remove or free
+    DEQ_REMOVE(session_data->streams, stream_data);
+    nghttp2_session_set_stream_user_data(session_data->session, stream_id, NULL);  // drop nghttp2's pointer before the memory goes away
+    free_http2_stream_data(stream_data);
+}
+
 static char *get_address_string(pn_raw_connection_t *pn_raw_conn)
 {
     const pn_netaddr_t *netaddr = pn_raw_connection_remote_addr(pn_raw_conn);
@@ -162,11 +185,22 @@ void free_qdr_http_connection(qdr_http_connection_t* http_conn)
 {
     if(http_conn->remote_address) {
         free(http_conn->remote_address);
+        http_conn->remote_address = 0;
     }
     if (http_conn->activate_timer) {
-        qd_timer_free(http_conn->activate_timer);
+        //qd_timer_free(http_conn->activate_timer);
+    }
+
+    if (!http_conn->ingress) {
+        qdr_http2_stream_data_t *stream_data = qdr_link_get_context(http_conn->stream_dispatcher);
+        free_http2_stream_data(stream_data);
+        qdr_link_detach(http_conn->stream_dispatcher, QD_CLOSED, 0);
+        http_conn->stream_dispatcher = 0;
     }
     nghttp2_session_del(http_conn->session_data->session);
+    http_conn->session_data->session = 0;
+    free_qdr_http2_session_data_t(http_conn->session_data);
+    http_conn->session_data = 0;
     free(http_conn);
 }
 
@@ -182,27 +216,6 @@ static qdr_http2_stream_data_t *create_http2_stream_data(qdr_http2_session_data_
     return stream_data;
 }
 
-void free_http2_stream_data(qdr_http2_session_data_t *session_data, int32_t stream_id)
-{
-    qdr_http2_stream_data_t *stream_data = DEQ_HEAD(session_data->streams);
-    while (stream_data) {
-        if (stream_data->stream_id == stream_id) {
-            DEQ_REMOVE(session_data->streams, stream_data);
-            nghttp2_session_set_stream_user_data(session_data->session, stream_id, NULL);
-
-            //
-            // TODO - Free all stream related data
-            //
-            qdr_link_detach(stream_data->in_link, QD_CLOSED, 0);
-            qdr_link_detach(stream_data->out_link, QD_CLOSED, 0);
-            free(stream_data->reply_to);
-            qd_compose_free(stream_data->app_properties);
-            free_qdr_http2_stream_data_t(stream_data);
-            break;
-        }
-    }
-}
-
 
 static int on_data_chunk_recv_callback(nghttp2_session *session,
                                                    uint8_t flags,
@@ -243,10 +256,10 @@ static int on_stream_close_callback(nghttp2_session *session,
                                     nghttp2_error_code error_code,
                                     void *user_data)
 {
-//    qdr_http_connection_t *conn = (qdr_http_connection_t *)user_data;
-//    qdr_http2_session_data_t *session_data = conn->session_data;
-//    qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 on_stream_close_callback ", conn->conn_id, stream_id);
-//    free_http2_stream_data(session_data, stream_id);
+    qdr_http_connection_t *conn = (qdr_http_connection_t *)user_data;
+    qdr_http2_session_data_t *session_data = conn->session_data;
+    qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 on_stream_close_callback, freeing stream", conn->conn_id, stream_id);
+    free_http2_stream(session_data, stream_id);
     return 0;
 }
 
@@ -382,6 +395,20 @@ static void compose_and_deliver(qdr_http2_stream_data_t *stream_data, qd_compose
     stream_data->in_dlv = qdr_link_deliver(stream_data->in_link, stream_data->message, 0, false, 0, 0, 0, 0);
     qd_log(http_adaptor->log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"][L%"PRIu64"] Routed delivery dlv:%lx", conn->conn_id, stream_data->stream_id, stream_data->in_link->identity, (long) stream_data->in_dlv);
 
+    if (stream_data->in_dlv) {
+        qdr_delivery_decref(http_adaptor->core, stream_data->in_dlv, "http_adaptor - release protection of return from deliver");
+    } else {
+        //
+        // If there is no delivery, the message is now and will always be unroutable because there is no address.
+        //
+        //qd_bitmask_free(link_exclusions);
+        qd_message_set_discard(qdr_delivery_message(stream_data->in_dlv), true);
+        if (receive_complete) {
+            qd_message_free(qdr_delivery_message(stream_data->in_dlv));
+        }
+    }
+
+
 }
 
 static bool route_delivery(qdr_http2_stream_data_t *stream_data, bool receive_complete)
@@ -398,8 +425,7 @@ static bool route_delivery(qdr_http2_stream_data_t *stream_data, bool receive_co
                                                   conn->config->address,
                                                   0,
                                                   stream_data->reply_to,
-                                                  0,
-                                                  0,
+                                                  0, 0,
                                                   stream_data->stream_id);
             compose_and_deliver(stream_data, header_and_prop, conn, receive_complete);
         }
@@ -408,11 +434,7 @@ static bool route_delivery(qdr_http2_stream_data_t *stream_data, bool receive_co
         if (stream_data->entire_header_arrived) {
             delivery_routed = true;
             header_and_prop = qd_message_compose_amqp(stream_data->message,
-                                                  stream_data->reply_to,
-                                                  0,
-                                                  0,
-                                                  0,
-                                                  0,
+                                                  stream_data->reply_to, 0, 0, 0, 0,
                                                   stream_data->stream_id);
             compose_and_deliver(stream_data, header_and_prop, conn, receive_complete);
         }
@@ -1003,6 +1025,10 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data, qdr_link_t *
 static uint64_t qdr_http_deliver(void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled)
 {
     qdr_http2_stream_data_t *stream_data =  (qdr_http2_stream_data_t*)qdr_link_get_context(link);
+
+    if (!stream_data)
+        return 0;
+
     qdr_http_connection_t *conn = stream_data->session_data->conn;
 
     if (link == stream_data->session_data->conn->stream_dispatcher) {
@@ -1044,8 +1070,6 @@ static uint64_t qdr_http_deliver(void *context, qdr_link_t *link, qdr_delivery_t
                                                      false,
                                                      0,
                                                      &(stream_data->incoming_id));
-
-        // TODO - This is wrong
         qdr_link_set_context(stream_data->in_link, stream_data);
 
         //Let's make an outbound connection to the configured connector.
@@ -1110,8 +1134,6 @@ static int handle_incoming_http(qdr_http_connection_t *conn)
         qd_buffer_free(curr_buf);
     }
 
-    // TODO - Look into reusing the same buffers over and over again.
-    // Andrew - this is where we left off last time.
     grant_read_buffers(conn);
 
     return count;
@@ -1180,32 +1202,28 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
         break;
     }
     case PN_RAW_CONNECTION_CLOSED_READ: {
+        qdr_connection_closed(conn->qdr_conn);
         qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_CLOSED_READ", conn->conn_id);
+        free_qdr_http_connection(conn);
         break;
     }
     case PN_RAW_CONNECTION_CLOSED_WRITE: {
+        qdr_connection_closed(conn->qdr_conn);
         qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_CLOSED_WRITE", conn->conn_id);
+        free_qdr_http_connection(conn);
         break;
     }
     case PN_RAW_CONNECTION_DISCONNECTED: {
-        qdr_connection_closed(conn->qdr_conn);
-        //free_qdr_http_connection(conn);
         qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_DISCONNECTED", conn->conn_id);
         break;
     }
     case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: {
         qd_log(log, QD_LOG_TRACE, "[C%i] Need write buffers", conn->conn_id);
-
-        //TODO - Refactor this out
-        //while (qdr_connection_process(conn->qdr_conn)) {}
         break;
     }
     case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
         grant_read_buffers(conn);
         qd_log(log, QD_LOG_TRACE, "[C%i] Need read buffers", conn->conn_id);
-
-        //TODO - No need to call this
-        //while (qdr_connection_process(conn->qdr_conn)) {}
         break;
     }
     case PN_RAW_CONNECTION_WAKE: {
@@ -1416,6 +1434,9 @@ qd_http_connector_t *qd_http2_configure_connector(qd_dispatch_t *qd, const qd_ht
 static void qdr_http_adaptor_final(void *adaptor_context)
 {
     qdr_http_adaptor_t *adaptor = (qdr_http_adaptor_t*) adaptor_context;
+    //adaptor->log_source and adaptor->protocol_log_source will be freed in qd_log_finalize() in dispatch.c
+    adaptor->log_source = 0;
+    adaptor->protocol_log_source = 0;
     qdr_protocol_adaptor_free(adaptor->core, adaptor->adaptor);
     free(adaptor);
     http_adaptor =  NULL;
diff --git a/src/adaptors/http_adaptor.h b/src/adaptors/http_adaptor.h
index cfbdf73..4869c39 100644
--- a/src/adaptors/http_adaptor.h
+++ b/src/adaptors/http_adaptor.h
@@ -56,9 +56,7 @@ struct qdr_http2_stream_data_t {
     qdr_link_t               *out_link;
 
     qd_message_t             *message;
-    qd_composed_field_t      *field;
-    qd_composed_field_t      *header_properties;  // This has the header and the properties.
-    qd_composed_field_t      *app_properties;     // This has the application properties.
+    qd_composed_field_t      *app_properties;
     qd_composed_field_t      *body;
 
     qd_message_body_data_t        *curr_body_data;
@@ -66,22 +64,11 @@ struct qdr_http2_stream_data_t {
     int                            curr_body_data_buff_offset;
     int                            body_data_buff_count;
 
-    bool                     entire_header_arrived; // true if all the header properties have arrived, just before the start of the data frame or just before the END_STREAM.
+    bool                     entire_header_arrived; // true if all the headers have arrived, just before the start of the data frame or just before the END_STREAM.
     bool                     header_sent;
-    bool                     has_body;
-    bool                     has_data;  // Did we ever receive a DATA frame.
 
 
     DEQ_LINKS(qdr_http2_stream_data_t);
-    //const char *uri; // The NULL-terminated URI string to retrieve.
-    /* The authority portion of the |uri|, not NULL-terminated */
-    //char *authority;
-    /* The path portion of the |uri|, including query, not NULL-terminated */
-    //char *path;
-    /* The length of the |authority| */
-    //size_t authoritylen;
-    /* The length of the |path| */
-    //size_t pathlen;
 };
 
 struct qdr_http_connection_t {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org