You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/11/09 23:11:31 UTC
[2/3] qpid-dispatch git commit: DISPATCH-193 - Updated the Container
API with regard to connection lifecycle.
DISPATCH-193 - Updated the Container API with regard to connection lifecycle.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/511d31bf
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/511d31bf
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/511d31bf
Branch: refs/heads/tross-DISPATCH-179-1
Commit: 511d31bf6445e591322d4b72a2c7204b90afbced
Parents: 8642a7b
Author: Ted Ross <tr...@redhat.com>
Authored: Mon Nov 9 16:45:44 2015 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Mon Nov 9 16:46:54 2015 -0500
----------------------------------------------------------------------
include/qpid/dispatch/container.h | 11 +++--
include/qpid/dispatch/server.h | 9 ++++
src/container.c | 88 +++++++++++++++++++++++-----------
src/router_node.c | 14 ++++--
src/server.c | 6 +++
5 files changed, 92 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/511d31bf/include/qpid/dispatch/container.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/container.h b/include/qpid/dispatch/container.h
index f8faf9d..c88f796 100644
--- a/include/qpid/dispatch/container.h
+++ b/include/qpid/dispatch/container.h
@@ -127,11 +127,14 @@ typedef struct {
/** Invoked when an instance of the node type is destroyed. */
qd_container_node_handler_t node_destroyed_handler;
- /** Invoked when an incoming connection (via listener) is established. */
- qd_container_conn_handler_t inbound_conn_open_handler;
+ /** Invoked when an incoming connection (via listener) is opened. */
+ qd_container_conn_handler_t inbound_conn_opened_handler;
- /** Invoked when an outgoing connection (via connector) is established. */
- qd_container_conn_handler_t outbound_conn_open_handler;
+ /** Invoked when an outgoing connection (via connector) is opened. */
+ qd_container_conn_handler_t outbound_conn_opened_handler;
+
+ /** Invoked when a connection is closed. */
+ qd_container_conn_handler_t conn_closed_handler;
} qd_node_type_t;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/511d31bf/include/qpid/dispatch/server.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h
index 3eca75c..2fb237d 100644
--- a/include/qpid/dispatch/server.h
+++ b/include/qpid/dispatch/server.h
@@ -488,6 +488,15 @@ pn_connection_t *qd_connection_pn(qd_connection_t *conn);
/**
+ * Get the direction of establishment for this connection.
+ *
+ * @param conn Connection object supplied in QD_CONN_EVENT_{LISTENER,CONNETOR}_OPEN
+ * @return true if connection came through a listener, false if through a connector.
+ */
+bool qd_connection_inbound(qd_connection_t *conn);
+
+
+/**
* Get the event collector for a connection.
*
* @param conn Connection object supplied in QD_CONN_EVENT_{LISTENER,CONNETOR}_OPEN
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/511d31bf/src/container.c
----------------------------------------------------------------------
diff --git a/src/container.c b/src/container.c
index 2be163d..ace94e0 100644
--- a/src/container.c
+++ b/src/container.c
@@ -252,7 +252,62 @@ static void do_updated(pn_delivery_t *pnd)
}
-static int close_handler(void* unused, pn_connection_t *conn, qd_connection_t* qd_conn)
+static void notify_opened(qd_container_t *container, qd_connection_t *conn, void *context)
+{
+ const qd_node_type_t *nt;
+
+ //
+ // Note the locking structure in this function. Generally this would be unsafe, but since
+ // this particular list is only ever appended to and never has items inserted or deleted,
+ // this usage is safe in this case.
+ //
+ sys_mutex_lock(container->lock);
+ qdc_node_type_t *nt_item = DEQ_HEAD(container->node_type_list);
+ sys_mutex_unlock(container->lock);
+
+ while (nt_item) {
+ nt = nt_item->ntype;
+ if (qd_connection_inbound(conn)) {
+ if (nt->inbound_conn_opened_handler)
+ nt->inbound_conn_opened_handler(nt->type_context, conn, context);
+ } else {
+ if (nt->outbound_conn_opened_handler)
+ nt->outbound_conn_opened_handler(nt->type_context, conn, context);
+ }
+
+ sys_mutex_lock(container->lock);
+ nt_item = DEQ_NEXT(nt_item);
+ sys_mutex_unlock(container->lock);
+ }
+}
+
+
+static void notify_closed(qd_container_t *container, qd_connection_t *conn, void *context)
+{
+ const qd_node_type_t *nt;
+
+ //
+ // Note the locking structure in this function. Generally this would be unsafe, but since
+ // this particular list is only ever appended to and never has items inserted or deleted,
+ // this usage is safe in this case.
+ //
+ sys_mutex_lock(container->lock);
+ qdc_node_type_t *nt_item = DEQ_HEAD(container->node_type_list);
+ sys_mutex_unlock(container->lock);
+
+ while (nt_item) {
+ nt = nt_item->ntype;
+ if (nt->conn_closed_handler)
+ nt->conn_closed_handler(nt->type_context, conn, context);
+
+ sys_mutex_lock(container->lock);
+ nt_item = DEQ_NEXT(nt_item);
+ sys_mutex_unlock(container->lock);
+ }
+}
+
+
+static int close_handler(qd_container_t *container, void* conn_context, pn_connection_t *conn, qd_connection_t* qd_conn)
{
qd_connection_manager_connection_closed(qd_conn);
@@ -280,8 +335,9 @@ static int close_handler(void* unused, pn_connection_t *conn, qd_connection_t* q
ssn = pn_session_next(ssn, 0);
}
- // teardown the connection
+ // close the connection
pn_connection_close(conn);
+ notify_closed(container, qd_conn, conn_context);
return 0;
}
@@ -323,6 +379,7 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
if (pn_connection_state(conn) & PN_LOCAL_UNINIT)
pn_connection_open(conn);
qd_connection_manager_connection_opened(qd_conn);
+ notify_opened(container, qd_conn, conn_context);
break;
case PN_CONNECTION_REMOTE_CLOSE :
@@ -454,31 +511,6 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
static void open_handler(qd_container_t *container, qd_connection_t *conn, qd_direction_t dir, void *context)
{
- const qd_node_type_t *nt;
-
- //
- // Note the locking structure in this function. Generally this would be unsafe, but since
- // this particular list is only ever appended to and never has items inserted or deleted,
- // this usage is safe in this case.
- //
- sys_mutex_lock(container->lock);
- qdc_node_type_t *nt_item = DEQ_HEAD(container->node_type_list);
- sys_mutex_unlock(container->lock);
-
- while (nt_item) {
- nt = nt_item->ntype;
- if (dir == QD_INCOMING) {
- if (nt->inbound_conn_open_handler)
- nt->inbound_conn_open_handler(nt->type_context, conn, context);
- } else {
- if (nt->outbound_conn_open_handler)
- nt->outbound_conn_open_handler(nt->type_context, conn, context);
- }
-
- sys_mutex_lock(container->lock);
- nt_item = DEQ_NEXT(nt_item);
- sys_mutex_unlock(container->lock);
- }
}
@@ -490,7 +522,7 @@ static int handler(void *handler_context, void *conn_context, qd_conn_event_t ev
switch (event) {
case QD_CONN_EVENT_LISTENER_OPEN: open_handler(container, qd_conn, QD_INCOMING, conn_context); break;
case QD_CONN_EVENT_CONNECTOR_OPEN: open_handler(container, qd_conn, QD_OUTGOING, conn_context); break;
- case QD_CONN_EVENT_CLOSE: return close_handler(conn_context, conn, qd_conn);
+ case QD_CONN_EVENT_CLOSE: return close_handler(container, conn_context, conn, qd_conn);
case QD_CONN_EVENT_WRITABLE: return writable_handler(conn_context, conn, qd_conn);
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/511d31bf/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 8057f2b..639a9bf 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -1663,12 +1663,12 @@ static int router_link_detach_handler(void* context, qd_link_t *link, qd_detach_
}
-static void router_inbound_open_handler(void *type_context, qd_connection_t *conn, void *context)
+static void router_inbound_opened_handler(void *type_context, qd_connection_t *conn, void *context)
{
}
-static void router_outbound_open_handler(void *type_context, qd_connection_t *conn, void *context)
+static void router_outbound_opened_handler(void *type_context, qd_connection_t *conn, void *context)
{
qd_router_t *router = (qd_router_t*) type_context;
@@ -1753,6 +1753,11 @@ static void router_outbound_open_handler(void *type_context, qd_connection_t *co
}
+static void router_closed_handler(void *type_context, qd_connection_t *conn, void *context)
+{
+}
+
+
static void qd_router_timer_handler(void *context)
{
qd_router_t *router = (qd_router_t*) context;
@@ -1776,8 +1781,9 @@ static qd_node_type_t router_node = {"router", 0, 0,
router_link_flow_handler,
0, // node_created_handler
0, // node_destroyed_handler
- router_inbound_open_handler,
- router_outbound_open_handler };
+ router_inbound_opened_handler,
+ router_outbound_opened_handler,
+ router_closed_handler};
static int type_registered = 0;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/511d31bf/src/server.c
----------------------------------------------------------------------
diff --git a/src/server.c b/src/server.c
index 96703bc..b7303b2 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1222,6 +1222,12 @@ pn_connection_t *qd_connection_pn(qd_connection_t *conn)
}
+bool qd_connection_inbound(qd_connection_t *conn)
+{
+ return conn->listener != 0;
+}
+
+
pn_collector_t *qd_connection_collector(qd_connection_t *conn)
{
return conn->collector;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org