You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2016/12/05 20:16:27 UTC

qpid-dispatch git commit: DISPATCH-583 - Fixed lifecycle management of proton links and sessions. These are now freed after the collector is done with all the events

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master ba4d5c29d -> d918b7a2c


DISPATCH-583 - Fixed lifecycle management of proton links and sessions. These are now freed after the collector is done with all the events

(cherry picked from commit 5737eb45df2ba94de7dfc7817792e1afe18df73b)


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/d918b7a2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/d918b7a2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/d918b7a2

Branch: refs/heads/master
Commit: d918b7a2c64b394bd6ea91c9e3cc5eddb0c2319a
Parents: ba4d5c2
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Fri Dec 2 16:39:22 2016 -0500
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Mon Dec 5 15:12:55 2016 -0500

----------------------------------------------------------------------
 include/qpid/dispatch/server.h | 15 ++++++-
 src/container.c                | 86 ++++++++++++++++++++++++++++++++++---
 src/server.c                   | 14 ++++--
 src/server_private.h           | 12 ++++++
 4 files changed, 118 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d918b7a2/include/qpid/dispatch/server.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h
index aebe36e..8544f81 100644
--- a/include/qpid/dispatch/server.h
+++ b/include/qpid/dispatch/server.h
@@ -477,6 +477,15 @@ typedef int (*qd_pn_event_handler_cb_t)(void *handler_context, void* conn_contex
 
 
 /**
+ * Post-event-processing handler.
+ * Invoked only after all proton events have been popped from the collector.
+ * @param handler_context The handler context registered with qd_server_set_conn_handler.
+ * @param conn The connection for which all proton events have been popped.
+ */
+typedef void (*qd_pn_event_complete_cb_t)(void *handler_context, qd_connection_t *conn);
+
+
+/**
  * Set the connection event handler callback.
  *
  * Set the connection handler callback for the server.  This callback is
@@ -487,7 +496,11 @@ typedef int (*qd_pn_event_handler_cb_t)(void *handler_context, void* conn_contex
  * @param pn_event_handler The handler for proton events.
  * @param handler_context Context data to associate with the handler.
  */
-void qd_server_set_conn_handler(qd_dispatch_t *qd, qd_conn_handler_cb_t conn_handler, qd_pn_event_handler_cb_t pn_event_handler, void *handler_context);
+void qd_server_set_conn_handler(qd_dispatch_t *qd,
+                                qd_conn_handler_cb_t conn_handler,
+                                qd_pn_event_handler_cb_t pn_event_handler,
+                                qd_pn_event_complete_cb_t pn_event_complete_handler,
+                                void *handler_context);
 
 
 /**

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d918b7a2/src/container.c
----------------------------------------------------------------------
diff --git a/src/container.c b/src/container.c
index 5103a66..b48b17f 100644
--- a/src/container.c
+++ b/src/container.c
@@ -83,6 +83,8 @@ struct qd_container_t {
     qdc_node_type_list_t  node_type_list;
 };
 
+ALLOC_DEFINE(qd_pn_free_link_session_t);
+
 static void setup_outgoing_link(qd_container_t *container, pn_link_t *pn_link)
 {
     qd_node_t *node = container->default_node;
@@ -312,6 +314,80 @@ static int writable_handler(qd_container_t *container, pn_connection_t *conn, qd
     return event_count;
 }
 
+/**
+ * Returns true if free_link is already recorded in the list reached through free_list, false otherwise.
+ */
+static bool link_exists(qd_pn_free_link_session_list_t  **free_list, pn_link_t *free_link)
+{
+    qd_pn_free_link_session_t *free_item = DEQ_HEAD(**free_list); // free_list is a pointer to the list pointer, hence the double dereference
+    while(free_item) {
+        if (free_item->pn_link == free_link) // compare by link pointer; entries queued for a session have pn_link == 0
+            return true;
+        free_item = DEQ_NEXT(free_item);
+    }
+    return false;
+}
+
+/**
+ * Returns true if free_session is already recorded in the list reached through free_list, false otherwise.
+ */
+static bool session_exists(qd_pn_free_link_session_list_t  **free_list, pn_session_t *free_session)
+{
+    qd_pn_free_link_session_t *free_item = DEQ_HEAD(**free_list); // free_list is a pointer to the list pointer, hence the double dereference
+    while(free_item) {
+        if (free_item->pn_session == free_session) // compare by session pointer; entries queued for a link have pn_session == 0
+            return true;
+        free_item = DEQ_NEXT(free_item);
+    }
+    return false;
+}
+
+static void add_session_to_free_list(qd_pn_free_link_session_list_t  *free_link_session_list, pn_session_t *ssn) // queues ssn on the list; pn_event_complete_handler later calls pn_session_free() on it
+{
+    if (!session_exists(&free_link_session_list, ssn)) { // skip sessions that are already queued so each is recorded once
+        qd_pn_free_link_session_t *to_free = new_qd_pn_free_link_session_t();
+        DEQ_ITEM_INIT(to_free);
+        to_free->pn_session = ssn;
+        to_free->pn_link = 0; // this entry carries a session only
+        DEQ_INSERT_TAIL(*free_link_session_list, to_free);
+    }
+}
+
+
+static void add_link_to_free_list(qd_pn_free_link_session_list_t  *free_link_session_list, pn_link_t *pn_link) // queues pn_link on the list; pn_event_complete_handler later calls pn_link_free() on it
+{
+    if (!link_exists(&free_link_session_list, pn_link)) { // skip links that are already queued so each is recorded once
+        qd_pn_free_link_session_t *to_free = new_qd_pn_free_link_session_t();
+        DEQ_ITEM_INIT(to_free);
+        to_free->pn_link = pn_link;
+        to_free->pn_session = 0; // this entry carries a link only
+        DEQ_INSERT_TAIL(*free_link_session_list, to_free);
+    }
+
+}
+
+void pn_event_complete_handler(void *handler_context, qd_connection_t *qd_conn) // frees every link and session queued on qd_conn, then empties the queue; handler_context is not used here
+{
+    qd_pn_free_link_session_t *to_free_link = DEQ_HEAD(qd_conn->free_link_session_list);
+    qd_pn_free_link_session_t *to_free_session = DEQ_HEAD(qd_conn->free_link_session_list);
+    while(to_free_link) { // pass 1: free all queued links; the entries themselves stay on the list
+        if (to_free_link->pn_link) {
+            pn_link_free(to_free_link->pn_link);
+            to_free_link->pn_link = 0;
+        }
+        to_free_link = DEQ_NEXT(to_free_link);
+    }
+
+    while(to_free_session) { // pass 2: free queued sessions (only after every link is freed) and release each entry
+        if (to_free_session->pn_session) {
+            pn_session_free(to_free_session->pn_session);
+            to_free_session->pn_session = 0;
+        }
+        DEQ_REMOVE_HEAD(qd_conn->free_link_session_list); // to_free_session is always the current head of the list
+        free_qd_pn_free_link_session_t(to_free_session);
+        to_free_session = DEQ_HEAD(qd_conn->free_link_session_list);
+    }
+}
 
 int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *event, qd_connection_t *qd_conn)
 {
@@ -360,7 +436,7 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
     case PN_SESSION_LOCAL_CLOSE :
         ssn = pn_event_session(event);
         if (pn_session_state(ssn) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
-            pn_session_free(ssn);
+            add_session_to_free_list(&qd_conn->free_link_session_list,ssn);
         }
         break;
     case PN_SESSION_REMOTE_CLOSE :
@@ -400,7 +476,7 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
                 pn_session_close(ssn);
             }
             else if (pn_session_state(ssn) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
-                pn_session_free(ssn);
+                add_session_to_free_list(&qd_conn->free_link_session_list,ssn);
             }
         }
         break;
@@ -464,7 +540,7 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
                 if (pn_link_state(pn_link) & PN_LOCAL_CLOSED) {
                     if (qd_link->close_sess_with_link && sess)
                         pn_session_close(sess);
-                    pn_link_free(pn_link);
+                    add_link_to_free_list(&qd_conn->free_link_session_list, pn_link);
                 }
             }
         }
@@ -474,7 +550,7 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
     case PN_LINK_LOCAL_CLOSE:
         pn_link = pn_event_link(event);
         if (pn_link_state(pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
-            pn_link_free(pn_link);
+            add_link_to_free_list(&qd_conn->free_link_session_list, pn_link);
         }
         break;
 
@@ -548,7 +624,7 @@ qd_container_t *qd_container(qd_dispatch_t *qd)
     DEQ_INIT(container->nodes);
     DEQ_INIT(container->node_type_list);
 
-    qd_server_set_conn_handler(qd, handler, pn_event_handler, container);
+    qd_server_set_conn_handler(qd, handler, pn_event_handler, pn_event_complete_handler, container);
 
     qd_log(container->log_source, QD_LOG_TRACE, "Container Initialized");
     return container;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d918b7a2/src/server.c
----------------------------------------------------------------------
diff --git a/src/server.c b/src/server.c
index 557c0b4..40b5699 100644
--- a/src/server.c
+++ b/src/server.c
@@ -366,6 +366,7 @@ qd_connection_t *qd_connection_allocate()
     DEQ_ITEM_INIT(ctx);
     DEQ_INIT(ctx->deferred_calls);
     ctx->deferred_call_lock = sys_mutex();
+    DEQ_INIT(ctx->free_link_session_list);
     return ctx;
 }
 
@@ -852,6 +853,11 @@ static int process_connector(qd_server_t *qd_server, qdpn_connector_t *cxtr)
 
                 event = ctx->event_stall ? 0 : pn_collector_peek(collector);
             }
+
+            //
+            // Free up any links and sessions that need to be freed since all the events have been popped from the collector.
+            //
+            qd_server->pn_event_complete_handler(qd_server->conn_handler_context, qd_conn);
             events += qd_server->conn_handler(qd_server->conn_handler_context, ctx->context, QD_CONN_EVENT_WRITABLE, qd_conn);
         }
     } while (events > 0);
@@ -1415,11 +1421,13 @@ void qd_server_free(qd_server_t *qd_server)
 void qd_server_set_conn_handler(qd_dispatch_t            *qd,
                                 qd_conn_handler_cb_t      handler,
                                 qd_pn_event_handler_cb_t  pn_event_handler,
+                                qd_pn_event_complete_cb_t pn_event_complete_handler,
                                 void                     *handler_context)
 {
-    qd->server->conn_handler         = handler;
-    qd->server->pn_event_handler     = pn_event_handler;
-    qd->server->conn_handler_context = handler_context;
+    qd->server->conn_handler              = handler;
+    qd->server->pn_event_handler          = pn_event_handler;
+    qd->server->pn_event_complete_handler = pn_event_complete_handler;
+    qd->server->conn_handler_context      = handler_context;
 }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d918b7a2/src/server_private.h
----------------------------------------------------------------------
diff --git a/src/server_private.h b/src/server_private.h
index eb23fa1..8c4d89e 100644
--- a/src/server_private.h
+++ b/src/server_private.h
@@ -85,6 +85,14 @@ typedef struct qd_deferred_call_t {
 
 DEQ_DECLARE(qd_deferred_call_t, qd_deferred_call_list_t);
 
+typedef struct qd_pn_free_link_session_t { // one queued item: a proton link or session waiting to be freed
+    DEQ_LINKS(struct qd_pn_free_link_session_t);
+    pn_session_t *pn_session; // session to hand to pn_session_free(); 0 when this entry holds a link
+    pn_link_t    *pn_link; // link to hand to pn_link_free(); 0 when this entry holds a session
+} qd_pn_free_link_session_t;
+
+DEQ_DECLARE(qd_pn_free_link_session_t, qd_pn_free_link_session_list_t);
+
 /**
  * Connection objects wrap Proton connection objects.
  */
@@ -118,6 +126,7 @@ struct qd_connection_t {
     bool                      event_stall;
     bool                      policy_counted;
     char                     *role;  //The specified role of the connection, e.g. "normal", "inter-router", "route-container" etc.
+    qd_pn_free_link_session_list_t  free_link_session_list;
 };
 
 DEQ_DECLARE(qd_connection_t, qd_connection_list_t);
@@ -160,6 +169,7 @@ struct qd_server_t {
     qd_thread_start_cb_t      start_handler;
     qd_conn_handler_cb_t      conn_handler;
     qd_pn_event_handler_cb_t  pn_event_handler;
+    qd_pn_event_complete_cb_t pn_event_complete_handler;
     qd_user_fd_handler_cb_t   ufd_handler;
     void                     *start_context;
     void                     *conn_handler_context;
@@ -191,5 +201,7 @@ ALLOC_DECLARE(qd_deferred_call_t);
 ALLOC_DECLARE(qd_connector_t);
 ALLOC_DECLARE(qd_connection_t);
 ALLOC_DECLARE(qd_user_fd_t);
+ALLOC_DECLARE(qd_pn_free_link_session_t);
+
 
 #endif


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org