You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2020/08/25 13:27:14 UTC

[qpid-dispatch] branch dev-protocol-adaptors updated: DISPATCH-1743: Freed all related objects on connection close

This is an automated email from the ASF dual-hosted git repository.

gmurthy pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/dev-protocol-adaptors by this push:
     new 167aeea  DISPATCH-1743: Freed all related objects on connection close
167aeea is described below

commit 167aeead47c4182774b459c030f32ed5c71eefb2
Author: Ganesh Murthy <gm...@apache.org>
AuthorDate: Tue Aug 25 09:26:08 2020 -0400

    DISPATCH-1743: Freed all related objects on connection close
---
 src/adaptors/http_adaptor.c | 59 ++++++++++++++++++++++++++++-----------------
 src/adaptors/http_adaptor.h | 22 +++++++----------
 2 files changed, 46 insertions(+), 35 deletions(-)

diff --git a/src/adaptors/http_adaptor.c b/src/adaptors/http_adaptor.c
index 2c203d7..2a66518 100644
--- a/src/adaptors/http_adaptor.c
+++ b/src/adaptors/http_adaptor.c
@@ -41,15 +41,8 @@ const char *CONTENT_ENCODING = "content-encoding";
 
 #define READ_BUFFERS 4
 #define WRITE_BUFFERS 4
-#define DEBUGBUILD 1
 #define ARRLEN(x) (sizeof(x) / sizeof(x[0]))
 
-#define MAKE_NV2(NAME, VALUE)                                                  \
-{                                                                              \
-    (uint8_t *)NAME, (uint8_t *)VALUE, sizeof(NAME) - 1, sizeof(VALUE) - 1,    \
-        NGHTTP2_NV_FLAG_NONE                                                   \
-}
-
 ALLOC_DEFINE(qdr_http2_session_data_t);
 ALLOC_DEFINE(qdr_http2_stream_data_t);
 
@@ -148,6 +141,7 @@ qd_composed_field_t  *qd_message_compose_amqp(qd_message_t *msg,
 
 void free_http2_stream_data(qdr_http2_stream_data_t *stream_data)
 {
+    stream_data->session_data = 0;
     if (stream_data->in_link)
         qdr_link_detach(stream_data->in_link, QD_CLOSED, 0);
     if (stream_data->out_link)
@@ -191,12 +185,23 @@ void free_qdr_http_connection(qdr_http_connection_t* http_conn)
         //qd_timer_free(http_conn->activate_timer);
     }
 
+    qdr_http2_stream_data_t *stream_data = 0;
+
     if (!http_conn->ingress) {
-        qdr_http2_stream_data_t *stream_data = qdr_link_get_context(http_conn->stream_dispatcher);
+        stream_data = qdr_link_get_context(http_conn->stream_dispatcher);
         free_http2_stream_data(stream_data);
         qdr_link_detach(http_conn->stream_dispatcher, QD_CLOSED, 0);
         http_conn->stream_dispatcher = 0;
     }
+
+    // Free all the stream data associated with this connection/session.
+    stream_data = DEQ_HEAD(http_conn->session_data->streams);
+    while (stream_data) {
+        DEQ_REMOVE_HEAD(http_conn->session_data->streams);
+        free_http2_stream_data(stream_data);
+        stream_data = DEQ_HEAD(http_conn->session_data->streams);
+    }
+
     nghttp2_session_del(http_conn->session_data->session);
     http_conn->session_data->session = 0;
     free_qdr_http2_session_data_t(http_conn->session_data);
@@ -273,7 +278,6 @@ static ssize_t send_callback(nghttp2_session *session,
     qdr_http_connection_t *conn = (qdr_http_connection_t *)user_data;
     qdr_http2_session_data_t *session_data = conn->session_data;
     qd_buffer_list_append(&(session_data->buffs), (uint8_t *)data, length);
-
     qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i] HTTP2 send_callback data length %zu, DEQ_SIZE(session_data->buffs)=%zu", conn->conn_id, length, DEQ_SIZE(session_data->buffs));
     return (ssize_t)length;
 }
@@ -287,16 +291,18 @@ static int on_begin_headers_callback(nghttp2_session *session,
 {
     qdr_http_connection_t *conn = (qdr_http_connection_t *)user_data;
     qdr_http2_session_data_t *session_data = conn->session_data;
+    qdr_http2_stream_data_t *stream_data = 0;
 
     if (frame->hd.type == NGHTTP2_HEADERS) {
         if(frame->headers.cat == NGHTTP2_HCAT_REQUEST && conn->ingress) {
             // This is a brand new request.
             int32_t stream_id = frame->hd.stream_id;
             qdr_terminus_t *target = qdr_terminus(0);
-            qdr_http2_stream_data_t *stream_data = create_http2_stream_data(session_data, stream_id);
+            stream_data = create_http2_stream_data(session_data, stream_id);
             qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i] Processing incoming HTTP2 stream with id %"PRId32"", conn->conn_id, stream_id);
+
             //
-            // For every stream create  -
+            // For every single stream in the same connection, create  -
             // 1. sending link with the configured address as the target
             //
             qdr_terminus_set_address(target, conn->config->address);
@@ -312,7 +318,11 @@ static int on_begin_headers_callback(nghttp2_session *session,
             qdr_link_set_context(stream_data->in_link, stream_data);
 
             //
-            // 2. dynamic receiver on which to receive back the response data for that stream
+            // 2. dynamic receiver on which to receive back the response data for that stream.
+            //
+
+            //
+            // TODO - Why not create something like a UUID and prefix it with the router id ?
             //
             qdr_terminus_t *dynamic_source = qdr_terminus(0);
             qdr_terminus_set_dynamic(dynamic_source);
@@ -329,12 +339,15 @@ static int on_begin_headers_callback(nghttp2_session *session,
         }
         else if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) {
             int32_t stream_id = frame->hd.stream_id;
-            qdr_http2_stream_data_t *stream_data = (qdr_http2_stream_data_t *)nghttp2_session_get_stream_user_data(session_data->session, stream_id);
+            stream_data = (qdr_http2_stream_data_t *)nghttp2_session_get_stream_user_data(session_data->session, stream_id);
             stream_data->message = qd_message();
         }
     }
 
-    //Andrew next Monday
+    if (stream_data) {
+        stream_data->app_properties = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0);
+        qd_compose_start_map(stream_data->app_properties);
+    }
 
     return 0;
 }
@@ -359,10 +372,7 @@ static int on_header_callback(nghttp2_session *session,
 
     switch (frame->hd.type) {
         case NGHTTP2_HEADERS: {
-            if (!stream_data->app_properties) {
-                stream_data->app_properties = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0);
-                qd_compose_start_map(stream_data->app_properties);
-            }
+            // Andrew next Monday
             qd_compose_insert_string_n(stream_data->app_properties, (const char *)name, namelen);
             qd_compose_insert_string_n(stream_data->app_properties, (const char *)value, valuelen);
             qd_log(http_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%i][S%"PRId32"] HTTP2 HEADER Incoming [%s=%s]", conn->conn_id, stream_data->stream_id, (char *)name, (char *)value);
@@ -1110,6 +1120,10 @@ static int handle_incoming_http(qdr_http_connection_t *conn)
     pn_raw_buffer_t raw_buffers[READ_BUFFERS];
     size_t n;
     int count = 0;
+
+    if (!conn->pn_raw_conn)
+        return 0;
+
     while ( (n = pn_raw_connection_take_read_buffers(conn->pn_raw_conn, raw_buffers, READ_BUFFERS)) ) {
         for (size_t i = 0; i < n && raw_buffers[i].bytes; ++i) {
             qd_buffer_t *buf = (qd_buffer_t*) raw_buffers[i].context;
@@ -1202,19 +1216,19 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
         break;
     }
     case PN_RAW_CONNECTION_CLOSED_READ: {
-        qdr_connection_closed(conn->qdr_conn);
+        pn_raw_connection_close(conn->pn_raw_conn);
+        conn->pn_raw_conn = 0;
         qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_CLOSED_READ", conn->conn_id);
-        free_qdr_http_connection(conn);
         break;
     }
     case PN_RAW_CONNECTION_CLOSED_WRITE: {
-        qdr_connection_closed(conn->qdr_conn);
         qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_CLOSED_WRITE", conn->conn_id);
-        free_qdr_http_connection(conn);
         break;
     }
     case PN_RAW_CONNECTION_DISCONNECTED: {
         qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_DISCONNECTED", conn->conn_id);
+        qdr_connection_closed(conn->qdr_conn);
+        free_qdr_http_connection(conn);
         break;
     }
     case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: {
@@ -1232,6 +1246,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
         break;
     }
     case PN_RAW_CONNECTION_READ: {
+        qd_log(log, QD_LOG_TRACE, "[C%i] PN_RAW_CONNECTION_READ", conn->conn_id);
         int read = handle_incoming_http(conn);
         qd_log(log, QD_LOG_TRACE, "[C%i] Read %i bytes", conn->conn_id, read);
         while (qdr_connection_process(conn->qdr_conn)) {}
diff --git a/src/adaptors/http_adaptor.h b/src/adaptors/http_adaptor.h
index 4869c39..53a05b1 100644
--- a/src/adaptors/http_adaptor.h
+++ b/src/adaptors/http_adaptor.h
@@ -36,14 +36,13 @@ typedef struct qdr_http_connection_t    qdr_http_connection_t;
 DEQ_DECLARE(qdr_http2_stream_data_t, qd_http2_stream_data_list_t);
 
 struct qdr_http2_session_data_t {
-    qd_http2_stream_data_list_t  streams;    // A session can have many streams.
+    qdr_http_connection_t       *conn;       // Connection associated with the session_data
     nghttp2_session             *session;    // A pointer to the nghttp2s' session object
+    qd_http2_stream_data_list_t  streams;    // A session can have many streams.
     qd_buffer_list_t             buffs;      // Buffers for writing
-    qdr_http_connection_t       *conn;       // Connection associated with the session_data
 };
 
 struct qdr_http2_stream_data_t {
-    int32_t                   stream_id;
     qdr_http2_session_data_t *session_data;
     char                     *reply_to;
     qdr_delivery_t           *in_dlv;
@@ -51,24 +50,21 @@ struct qdr_http2_stream_data_t {
     uint64_t                  incoming_id;
     uint64_t                  outgoing_id;
     uint64_t                  disposition;
-
     qdr_link_t               *in_link;
     qdr_link_t               *out_link;
-
     qd_message_t             *message;
     qd_composed_field_t      *app_properties;
     qd_composed_field_t      *body;
+    qd_message_body_data_t   *curr_body_data;
+    DEQ_LINKS(qdr_http2_stream_data_t);
 
-    qd_message_body_data_t        *curr_body_data;
     qd_message_body_data_result_t  curr_body_data_result;
     int                            curr_body_data_buff_offset;
     int                            body_data_buff_count;
+    int32_t                        stream_id;
 
     bool                     entire_header_arrived; // true if all the headershave arrived, just before the start of the data frame or just before the END_STREAM.
     bool                     header_sent;
-
-
-    DEQ_LINKS(qdr_http2_stream_data_t);
 };
 
 struct qdr_http_connection_t {
@@ -76,7 +72,6 @@ struct qdr_http_connection_t {
     qdr_connection_t        *qdr_conn;
     pn_raw_connection_t     *pn_raw_conn;
     pn_raw_buffer_t          read_buffers[4];
-    bool                     ingress;
     qd_timer_t              *activate_timer;
     qd_http_bridge_config_t *config;
     qd_server_t             *server;
@@ -87,12 +82,13 @@ struct qdr_http_connection_t {
     char                    *remote_address;
     qdr_link_t              *stream_dispatcher;
     uint64_t                 stream_dispatcher_id;
-    bool                     connection_established;
-    bool                     grant_initial_buffers;
     qdr_http2_stream_data_t *initial_stream;
     char                     *reply_to;
     nghttp2_data_provider    data_prd;
-};
+    bool                     connection_established;
+    bool                     grant_initial_buffers;
+    bool                     ingress;
+ };
 
 ALLOC_DECLARE(qdr_http2_session_data_t);
 ALLOC_DECLARE(qdr_http2_stream_data_t);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org