You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2016/12/05 20:16:27 UTC
qpid-dispatch git commit: DISPATCH-583 - Fixed lifecycle management
of proton links and sessions. These are now freed after the collector is done
with all the events
Repository: qpid-dispatch
Updated Branches:
refs/heads/master ba4d5c29d -> d918b7a2c
DISPATCH-583 - Fixed lifecycle management of proton links and sessions. These are now freed after the collector is done with all the events
(cherry picked from commit 5737eb45df2ba94de7dfc7817792e1afe18df73b)
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/d918b7a2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/d918b7a2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/d918b7a2
Branch: refs/heads/master
Commit: d918b7a2c64b394bd6ea91c9e3cc5eddb0c2319a
Parents: ba4d5c2
Author: Ganesh Murthy <gm...@redhat.com>
Authored: Fri Dec 2 16:39:22 2016 -0500
Committer: Ganesh Murthy <gm...@redhat.com>
Committed: Mon Dec 5 15:12:55 2016 -0500
----------------------------------------------------------------------
include/qpid/dispatch/server.h | 15 ++++++-
src/container.c | 86 ++++++++++++++++++++++++++++++++++---
src/server.c | 14 ++++--
src/server_private.h | 12 ++++++
4 files changed, 118 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d918b7a2/include/qpid/dispatch/server.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h
index aebe36e..8544f81 100644
--- a/include/qpid/dispatch/server.h
+++ b/include/qpid/dispatch/server.h
@@ -477,6 +477,15 @@ typedef int (*qd_pn_event_handler_cb_t)(void *handler_context, void* conn_contex
/**
+ * Post event process handler
+ * Invoke only after all proton events have been popped from the collector.
+ *
+ * @param conn The connection for which all proton events have been popped.
+ */
+typedef void (*qd_pn_event_complete_cb_t)(void *handler_context, qd_connection_t *conn);
+
+
+/**
* Set the connection event handler callback.
*
* Set the connection handler callback for the server. This callback is
@@ -487,7 +496,11 @@ typedef int (*qd_pn_event_handler_cb_t)(void *handler_context, void* conn_contex
* @param pn_event_handler The handler for proton events.
* @param handler_context Context data to associate with the handler.
*/
-void qd_server_set_conn_handler(qd_dispatch_t *qd, qd_conn_handler_cb_t conn_handler, qd_pn_event_handler_cb_t pn_event_handler, void *handler_context);
+void qd_server_set_conn_handler(qd_dispatch_t *qd,
+ qd_conn_handler_cb_t conn_handler,
+ qd_pn_event_handler_cb_t pn_event_handler,
+ qd_pn_event_complete_cb_t pn_event_complete_handler,
+ void *handler_context);
/**
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d918b7a2/src/container.c
----------------------------------------------------------------------
diff --git a/src/container.c b/src/container.c
index 5103a66..b48b17f 100644
--- a/src/container.c
+++ b/src/container.c
@@ -83,6 +83,8 @@ struct qd_container_t {
qdc_node_type_list_t node_type_list;
};
+ALLOC_DEFINE(qd_pn_free_link_session_t);
+
static void setup_outgoing_link(qd_container_t *container, pn_link_t *pn_link)
{
qd_node_t *node = container->default_node;
@@ -312,6 +314,80 @@ static int writable_handler(qd_container_t *container, pn_connection_t *conn, qd
return event_count;
}
+/**
+ * Returns true if the free_link already exists in free_link_list, false otherwise
+ */
+static bool link_exists(qd_pn_free_link_session_list_t **free_list, pn_link_t *free_link)
+{
+ qd_pn_free_link_session_t *free_item = DEQ_HEAD(**free_list);
+ while(free_item) {
+ if (free_item->pn_link == free_link)
+ return true;
+ free_item = DEQ_NEXT(free_item);
+ }
+ return false;
+}
+
+/**
+ * Returns true if the free_session already exists in free_session_list, false otherwise
+ */
+static bool session_exists(qd_pn_free_link_session_list_t **free_list, pn_session_t *free_session)
+{
+ qd_pn_free_link_session_t *free_item = DEQ_HEAD(**free_list);
+ while(free_item) {
+ if (free_item->pn_session == free_session)
+ return true;
+ free_item = DEQ_NEXT(free_item);
+ }
+ return false;
+}
+
+static void add_session_to_free_list(qd_pn_free_link_session_list_t *free_link_session_list, pn_session_t *ssn)
+{
+ if (!session_exists(&free_link_session_list, ssn)) {
+ qd_pn_free_link_session_t *to_free = new_qd_pn_free_link_session_t();
+ DEQ_ITEM_INIT(to_free);
+ to_free->pn_session = ssn;
+ to_free->pn_link = 0;
+ DEQ_INSERT_TAIL(*free_link_session_list, to_free);
+ }
+}
+
+
+static void add_link_to_free_list(qd_pn_free_link_session_list_t *free_link_session_list, pn_link_t *pn_link)
+{
+ if (!link_exists(&free_link_session_list, pn_link)) {
+ qd_pn_free_link_session_t *to_free = new_qd_pn_free_link_session_t();
+ DEQ_ITEM_INIT(to_free);
+ to_free->pn_link = pn_link;
+ to_free->pn_session = 0;
+ DEQ_INSERT_TAIL(*free_link_session_list, to_free);
+ }
+
+}
+
+void pn_event_complete_handler(void *handler_context, qd_connection_t *qd_conn)
+{
+ qd_pn_free_link_session_t *to_free_link = DEQ_HEAD(qd_conn->free_link_session_list);
+ qd_pn_free_link_session_t *to_free_session = DEQ_HEAD(qd_conn->free_link_session_list);
+ while(to_free_link) {
+ if (to_free_link->pn_link) {
+ pn_link_free(to_free_link->pn_link);
+ to_free_link->pn_link = 0;
+ }
+ to_free_link = DEQ_NEXT(to_free_link);
+ }
+
+ while(to_free_session) {
+ if (to_free_session->pn_session) {
+ pn_session_free(to_free_session->pn_session);
+ to_free_session->pn_session = 0;
+ }
+ DEQ_REMOVE_HEAD(qd_conn->free_link_session_list);
+ free_qd_pn_free_link_session_t(to_free_session);
+ to_free_session = DEQ_HEAD(qd_conn->free_link_session_list);
+ }
+}
int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *event, qd_connection_t *qd_conn)
{
@@ -360,7 +436,7 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
case PN_SESSION_LOCAL_CLOSE :
ssn = pn_event_session(event);
if (pn_session_state(ssn) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
- pn_session_free(ssn);
+ add_session_to_free_list(&qd_conn->free_link_session_list,ssn);
}
break;
case PN_SESSION_REMOTE_CLOSE :
@@ -400,7 +476,7 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
pn_session_close(ssn);
}
else if (pn_session_state(ssn) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
- pn_session_free(ssn);
+ add_session_to_free_list(&qd_conn->free_link_session_list,ssn);
}
}
break;
@@ -464,7 +540,7 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
if (pn_link_state(pn_link) & PN_LOCAL_CLOSED) {
if (qd_link->close_sess_with_link && sess)
pn_session_close(sess);
- pn_link_free(pn_link);
+ add_link_to_free_list(&qd_conn->free_link_session_list, pn_link);
}
}
}
@@ -474,7 +550,7 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
case PN_LINK_LOCAL_CLOSE:
pn_link = pn_event_link(event);
if (pn_link_state(pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
- pn_link_free(pn_link);
+ add_link_to_free_list(&qd_conn->free_link_session_list, pn_link);
}
break;
@@ -548,7 +624,7 @@ qd_container_t *qd_container(qd_dispatch_t *qd)
DEQ_INIT(container->nodes);
DEQ_INIT(container->node_type_list);
- qd_server_set_conn_handler(qd, handler, pn_event_handler, container);
+ qd_server_set_conn_handler(qd, handler, pn_event_handler, pn_event_complete_handler, container);
qd_log(container->log_source, QD_LOG_TRACE, "Container Initialized");
return container;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d918b7a2/src/server.c
----------------------------------------------------------------------
diff --git a/src/server.c b/src/server.c
index 557c0b4..40b5699 100644
--- a/src/server.c
+++ b/src/server.c
@@ -366,6 +366,7 @@ qd_connection_t *qd_connection_allocate()
DEQ_ITEM_INIT(ctx);
DEQ_INIT(ctx->deferred_calls);
ctx->deferred_call_lock = sys_mutex();
+ DEQ_INIT(ctx->free_link_session_list);
return ctx;
}
@@ -852,6 +853,11 @@ static int process_connector(qd_server_t *qd_server, qdpn_connector_t *cxtr)
event = ctx->event_stall ? 0 : pn_collector_peek(collector);
}
+
+ //
+ // Free up any links and sessions that need to be freed since all the events have been popped from the collector.
+ //
+ qd_server->pn_event_complete_handler(qd_server->conn_handler_context, qd_conn);
events += qd_server->conn_handler(qd_server->conn_handler_context, ctx->context, QD_CONN_EVENT_WRITABLE, qd_conn);
}
} while (events > 0);
@@ -1415,11 +1421,13 @@ void qd_server_free(qd_server_t *qd_server)
void qd_server_set_conn_handler(qd_dispatch_t *qd,
qd_conn_handler_cb_t handler,
qd_pn_event_handler_cb_t pn_event_handler,
+ qd_pn_event_complete_cb_t pn_event_complete_handler,
void *handler_context)
{
- qd->server->conn_handler = handler;
- qd->server->pn_event_handler = pn_event_handler;
- qd->server->conn_handler_context = handler_context;
+ qd->server->conn_handler = handler;
+ qd->server->pn_event_handler = pn_event_handler;
+ qd->server->pn_event_complete_handler = pn_event_complete_handler;
+ qd->server->conn_handler_context = handler_context;
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d918b7a2/src/server_private.h
----------------------------------------------------------------------
diff --git a/src/server_private.h b/src/server_private.h
index eb23fa1..8c4d89e 100644
--- a/src/server_private.h
+++ b/src/server_private.h
@@ -85,6 +85,14 @@ typedef struct qd_deferred_call_t {
DEQ_DECLARE(qd_deferred_call_t, qd_deferred_call_list_t);
+typedef struct qd_pn_free_link_session_t {
+ DEQ_LINKS(struct qd_pn_free_link_session_t);
+ pn_session_t *pn_session;
+ pn_link_t *pn_link;
+} qd_pn_free_link_session_t;
+
+DEQ_DECLARE(qd_pn_free_link_session_t, qd_pn_free_link_session_list_t);
+
/**
* Connection objects wrap Proton connection objects.
*/
@@ -118,6 +126,7 @@ struct qd_connection_t {
bool event_stall;
bool policy_counted;
char *role; //The specified role of the connection, e.g. "normal", "inter-router", "route-container" etc.
+ qd_pn_free_link_session_list_t free_link_session_list;
};
DEQ_DECLARE(qd_connection_t, qd_connection_list_t);
@@ -160,6 +169,7 @@ struct qd_server_t {
qd_thread_start_cb_t start_handler;
qd_conn_handler_cb_t conn_handler;
qd_pn_event_handler_cb_t pn_event_handler;
+ qd_pn_event_complete_cb_t pn_event_complete_handler;
qd_user_fd_handler_cb_t ufd_handler;
void *start_context;
void *conn_handler_context;
@@ -191,5 +201,7 @@ ALLOC_DECLARE(qd_deferred_call_t);
ALLOC_DECLARE(qd_connector_t);
ALLOC_DECLARE(qd_connection_t);
ALLOC_DECLARE(qd_user_fd_t);
+ALLOC_DECLARE(qd_pn_free_link_session_t);
+
#endif
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org