You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/06/08 17:53:47 UTC
svn commit: r1684210 - in /qpid/dispatch/trunk:
include/qpid/dispatch/server.h src/container.c src/server.c
src/server_private.h
Author: tross
Date: Mon Jun 8 15:53:46 2015
New Revision: 1684210
URL: http://svn.apache.org/r1684210
Log:
DISPATCH-142 - Move proton event handling from container down to server.
Modified:
qpid/dispatch/trunk/include/qpid/dispatch/server.h
qpid/dispatch/trunk/src/container.c
qpid/dispatch/trunk/src/server.c
qpid/dispatch/trunk/src/server_private.h
Modified: qpid/dispatch/trunk/include/qpid/dispatch/server.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/server.h?rev=1684210&r1=1684209&r2=1684210&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/server.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/server.h Mon Jun 8 15:53:46 2015
@@ -21,6 +21,7 @@
#include <qpid/dispatch/dispatch.h>
#include <proton/engine.h>
+#include <proton/event.h>
/**@file
* Control server threads, signals and connections.
@@ -224,8 +225,8 @@ typedef enum {
/// The connection was closed at the transport level (not cleanly).
QD_CONN_EVENT_CLOSE,
- /// The connection requires processing.
- QD_CONN_EVENT_PROCESS
+ /// The connection is writable.
+ QD_CONN_EVENT_WRITABLE
} qd_conn_event_t;
@@ -367,6 +368,19 @@ typedef struct qd_server_config_t {
*/
typedef int (*qd_conn_handler_cb_t)(void *handler_context, void* conn_context, qd_conn_event_t event, qd_connection_t *conn);
+/**
+ * Proton Event Handler
+ *
+ * This callback is invoked once for each proton event collected on a
+ * connection. Its return value is added to the server's event count.
+ *
+ * @param handler_context The handler context supplied in qd_server_set_conn_handler.
+ * @param conn_context The connection context supplied in qd_server_{connect,listen}.
+ * @param event The proton event being raised.
+ * @param conn The connection associated with this proton event.
+ */
+typedef int (*qd_pn_event_handler_cb_t)(void *handler_context, void* conn_context, pn_event_t *event, qd_connection_t *conn);
+
/**
* Set the connection event handler callback.
@@ -376,9 +390,10 @@ typedef int (*qd_conn_handler_cb_t)(void
*
* @param qd The dispatch handle returned by qd_dispatch.
* @param conn_handler The handler for processing connection-related events.
+ * @param pn_event_handler The handler for proton events.
* @param handler_context Context data to associate with the handler.
*/
-void qd_server_set_conn_handler(qd_dispatch_t *qd, qd_conn_handler_cb_t conn_handler, void *handler_context);
+void qd_server_set_conn_handler(qd_dispatch_t *qd, qd_conn_handler_cb_t conn_handler, qd_pn_event_handler_cb_t pn_event_handler, void *handler_context);
/**
Modified: qpid/dispatch/trunk/src/container.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/container.c?rev=1684210&r1=1684209&r2=1684210&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/container.c (original)
+++ qpid/dispatch/trunk/src/container.c Mon Jun 8 15:53:46 2015
@@ -269,128 +269,10 @@ static int close_handler(void* unused, p
}
-static int process_handler(qd_container_t *container, void* unused, qd_connection_t *qd_conn)
+static int writable_handler(void* unused, pn_connection_t *conn, qd_connection_t* qd_conn)
{
- pn_session_t *ssn;
- pn_link_t *pn_link;
- qd_link_t *qd_link;
- pn_delivery_t *delivery;
- pn_collector_t *collector = qd_connection_collector(qd_conn);
- pn_connection_t *conn = qd_connection_pn(qd_conn);
- pn_event_t *event;
- int event_count = 0;
-
- //
- // Spin through the collected events for this connection and process them
- // individually.
- //
- event = pn_collector_peek(collector);
- while (event) {
- event_count++;
-
- switch (pn_event_type(event)) {
- case PN_CONNECTION_REMOTE_OPEN :
- if (pn_connection_state(conn) & PN_LOCAL_UNINIT)
- pn_connection_open(conn);
- qd_connection_manager_connection_opened(qd_conn);
- break;
-
- case PN_CONNECTION_REMOTE_CLOSE :
- if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED))
- pn_connection_close(conn);
- qd_connection_manager_connection_closed(qd_conn);
- break;
-
- case PN_SESSION_REMOTE_OPEN :
- ssn = pn_event_session(event);
- if (pn_session_state(ssn) & PN_LOCAL_UNINIT) {
- pn_session_set_incoming_capacity(ssn, 1000000);
- pn_session_open(ssn);
- }
- break;
-
- case PN_SESSION_REMOTE_CLOSE :
- ssn = pn_event_session(event);
- if (pn_session_state(ssn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
- // remote has nuked our session. Check for any links that were
- // left open and forcibly detach them, since no detaches will
- // arrive on this session.
- pn_connection_t *conn = pn_session_connection(ssn);
- pn_link_t *pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
- while (pn_link) {
- if (pn_link_session(pn_link) == ssn) {
- qd_link_t *qd_link = (qd_link_t *)pn_link_get_context(pn_link);
- if (qd_link && qd_link->node) {
- qd_log(container->log_source, QD_LOG_NOTICE,
- "Aborting link '%s' due to parent session end",
- pn_link_name(pn_link));
- qd_link->node->ntype->link_detach_handler(qd_link->node->context,
- qd_link, 1); // assume
- // closed?
- pn_link_close(pn_link);
- pn_link_free(pn_link);
- }
- }
- pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
- }
- pn_session_close(ssn);
- pn_session_free(ssn);
- }
- break;
-
- case PN_LINK_REMOTE_OPEN :
- pn_link = pn_event_link(event);
- if (pn_link_state(pn_link) & PN_LOCAL_UNINIT) {
- if (pn_link_is_sender(pn_link))
- setup_outgoing_link(container, pn_link);
- else
- setup_incoming_link(container, pn_link);
- } else if (pn_link_state(pn_link) & PN_LOCAL_ACTIVE)
- handle_link_open(container, pn_link);
- break;
-
- case PN_LINK_REMOTE_CLOSE :
- pn_link = pn_event_link(event);
- if (pn_link_state(pn_link) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
- qd_link = (qd_link_t*) pn_link_get_context(pn_link);
- qd_node_t *node = qd_link->node;
- if (node)
- node->ntype->link_detach_handler(node->context, qd_link, 1); // TODO - get 'closed' from detach message
- pn_link_close(pn_link);
- pn_link_free(pn_link);
- }
- break;
-
- case PN_LINK_FINAL :
- pn_link = pn_event_link(event);
- qd_link = (qd_link_t*) pn_link_get_context(pn_link);
- break;
-
- case PN_LINK_FLOW :
- pn_link = pn_event_link(event);
- qd_link = (qd_link_t*) pn_link_get_context(pn_link);
- if (qd_link && qd_link->node && qd_link->node->ntype->link_flow_handler)
- qd_link->node->ntype->link_flow_handler(qd_link->node->context, qd_link);
- break;
-
- case PN_DELIVERY :
- delivery = pn_event_delivery(event);
- if (pn_delivery_readable(delivery))
- do_receive(delivery);
-
- if (pn_delivery_updated(delivery)) {
- do_updated(delivery);
- pn_delivery_clear(delivery);
- }
- break;
-
- default :
- break;
- }
-
- pn_collector_pop(collector);
- event = pn_collector_peek(collector);
- }
+ pn_link_t *pn_link;
+ int event_count = 0;
//
// Call the attached node's writable handler for all active links
@@ -406,11 +288,123 @@ static int process_handler(qd_container_
pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
}
}
-
return event_count;
}
+int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *event, qd_connection_t *qd_conn)
+{
+ qd_container_t *container = (qd_container_t*) handler_context;
+ pn_connection_t *conn = qd_connection_pn(qd_conn);
+ pn_session_t *ssn;
+ pn_link_t *pn_link;
+ qd_link_t *qd_link;
+ pn_delivery_t *delivery;
+
+ switch (pn_event_type(event)) {
+ case PN_CONNECTION_REMOTE_OPEN :
+ if (pn_connection_state(conn) & PN_LOCAL_UNINIT)
+ pn_connection_open(conn);
+ qd_connection_manager_connection_opened(qd_conn);
+ break;
+
+ case PN_CONNECTION_REMOTE_CLOSE :
+ if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED))
+ pn_connection_close(conn);
+ qd_connection_manager_connection_closed(qd_conn);
+ break;
+
+ case PN_SESSION_REMOTE_OPEN :
+ ssn = pn_event_session(event);
+ if (pn_session_state(ssn) & PN_LOCAL_UNINIT) {
+ pn_session_set_incoming_capacity(ssn, 1000000);
+ pn_session_open(ssn);
+ }
+ break;
+
+ case PN_SESSION_REMOTE_CLOSE :
+ ssn = pn_event_session(event);
+ if (pn_session_state(ssn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
+ // remote has nuked our session. Check for any links that were
+ // left open and forcibly detach them, since no detaches will
+ // arrive on this session.
+ pn_connection_t *conn = pn_session_connection(ssn);
+ pn_link_t *pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
+ while (pn_link) {
+ if (pn_link_session(pn_link) == ssn) {
+ qd_link_t *qd_link = (qd_link_t *)pn_link_get_context(pn_link);
+ if (qd_link && qd_link->node) {
+ qd_log(container->log_source, QD_LOG_NOTICE,
+ "Aborting link '%s' due to parent session end",
+ pn_link_name(pn_link));
+ qd_link->node->ntype->link_detach_handler(qd_link->node->context,
+ qd_link, 1); // assume
+ // closed?
+ pn_link_close(pn_link);
+ pn_link_free(pn_link);
+ }
+ }
+ pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
+ }
+ pn_session_close(ssn);
+ pn_session_free(ssn);
+ }
+ break;
+
+ case PN_LINK_REMOTE_OPEN :
+ pn_link = pn_event_link(event);
+ if (pn_link_state(pn_link) & PN_LOCAL_UNINIT) {
+ if (pn_link_is_sender(pn_link))
+ setup_outgoing_link(container, pn_link);
+ else
+ setup_incoming_link(container, pn_link);
+ } else if (pn_link_state(pn_link) & PN_LOCAL_ACTIVE)
+ handle_link_open(container, pn_link);
+ break;
+
+ case PN_LINK_REMOTE_CLOSE :
+ pn_link = pn_event_link(event);
+ if (pn_link_state(pn_link) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
+ qd_link = (qd_link_t*) pn_link_get_context(pn_link);
+ qd_node_t *node = qd_link->node;
+ if (node)
+ node->ntype->link_detach_handler(node->context, qd_link, 1); // TODO - get 'closed' from detach message
+ pn_link_close(pn_link);
+ pn_link_free(pn_link);
+ }
+ break;
+
+ case PN_LINK_FINAL :
+ pn_link = pn_event_link(event);
+ qd_link = (qd_link_t*) pn_link_get_context(pn_link);
+ break;
+
+ case PN_LINK_FLOW :
+ pn_link = pn_event_link(event);
+ qd_link = (qd_link_t*) pn_link_get_context(pn_link);
+ if (qd_link && qd_link->node && qd_link->node->ntype->link_flow_handler)
+ qd_link->node->ntype->link_flow_handler(qd_link->node->context, qd_link);
+ break;
+
+ case PN_DELIVERY :
+ delivery = pn_event_delivery(event);
+ if (pn_delivery_readable(delivery))
+ do_receive(delivery);
+
+ if (pn_delivery_updated(delivery)) {
+ do_updated(delivery);
+ pn_delivery_clear(delivery);
+ }
+ break;
+
+ default :
+ break;
+ }
+
+ return 1;
+}
+
+
static void open_handler(qd_container_t *container, qd_connection_t *conn, qd_direction_t dir, void *context)
{
const qd_node_type_t *nt;
@@ -452,7 +446,7 @@ static int handler(void *handler_context
case QD_CONN_EVENT_LISTENER_OPEN: open_handler(container, qd_conn, QD_INCOMING, conn_context); break;
case QD_CONN_EVENT_CONNECTOR_OPEN: open_handler(container, qd_conn, QD_OUTGOING, conn_context); break;
case QD_CONN_EVENT_CLOSE: return close_handler(conn_context, conn, qd_conn);
- case QD_CONN_EVENT_PROCESS: return process_handler(container, conn_context, qd_conn);
+ case QD_CONN_EVENT_WRITABLE: return writable_handler(conn_context, conn, qd_conn);
}
return 0;
@@ -473,7 +467,7 @@ qd_container_t *qd_container(qd_dispatch
DEQ_INIT(container->nodes);
DEQ_INIT(container->node_type_list);
- qd_server_set_conn_handler(qd, handler, container);
+ qd_server_set_conn_handler(qd, handler, pn_event_handler, container);
qd_log(container->log_source, QD_LOG_TRACE, "Container Initialized");
return container;
Modified: qpid/dispatch/trunk/src/server.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/server.c?rev=1684210&r1=1684209&r2=1684210&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/server.c (original)
+++ qpid/dispatch/trunk/src/server.c Mon Jun 8 15:53:46 2015
@@ -358,15 +358,13 @@ static int process_connector(qd_server_t
pn_sasl_outcome(sasl) == PN_SASL_SKIPPED) {
ctx->state = CONN_STATE_OPERATIONAL;
- qd_conn_event_t ce = QD_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy
+ qd_conn_event_t ce = QD_CONN_EVENT_LISTENER_OPEN;
- if (ctx->listener) {
- ce = QD_CONN_EVENT_LISTENER_OPEN;
- } else if (ctx->connector) {
+ if (ctx->connector) {
ce = QD_CONN_EVENT_CONNECTOR_OPEN;
ctx->connector->delay = 0;
} else
- assert(0);
+ assert(ctx->listener);
qd_server->conn_handler(qd_server->conn_handler_context,
ctx->context, ce, (qd_connection_t*) qdpn_connector_context(cxtr));
@@ -391,9 +389,19 @@ static int process_connector(qd_server_t
}
else {
invoke_deferred_calls(ctx, false);
- events = qd_server->conn_handler(qd_server->conn_handler_context, ctx->context,
- QD_CONN_EVENT_PROCESS,
- (qd_connection_t*) qdpn_connector_context(cxtr));
+
+ qd_connection_t *qd_conn = (qd_connection_t*) qdpn_connector_context(cxtr);
+ pn_collector_t *collector = qd_connection_collector(qd_conn);
+ pn_event_t *event;
+
+ events = 0;
+ event = pn_collector_peek(collector);
+ while (event) {
+ events += qd_server->pn_event_handler(qd_server->conn_handler_context, ctx->context, event, qd_conn);
+ pn_collector_pop(collector);
+ event = pn_collector_peek(collector);
+ }
+ events += qd_server->conn_handler(qd_server->conn_handler_context, ctx->context, QD_CONN_EVENT_WRITABLE, qd_conn);
}
break;
@@ -836,19 +844,20 @@ qd_server_t *qd_server(qd_dispatch_t *qd
return 0;
DEQ_INIT(qd_server->connections);
- qd_server->qd = qd;
- qd_server->log_source = qd_log_source("SERVER");
- qd_server->thread_count = thread_count;
- qd_server->container_name = container_name;
- qd_server->driver = qdpn_driver();
- qd_server->start_handler = 0;
- qd_server->conn_handler = 0;
- qd_server->signal_handler = 0;
- qd_server->ufd_handler = 0;
- qd_server->start_context = 0;
- qd_server->signal_context = 0;
- qd_server->lock = sys_mutex();
- qd_server->cond = sys_cond();
+ qd_server->qd = qd;
+ qd_server->log_source = qd_log_source("SERVER");
+ qd_server->thread_count = thread_count;
+ qd_server->container_name = container_name;
+ qd_server->driver = qdpn_driver();
+ qd_server->start_handler = 0;
+ qd_server->conn_handler = 0;
+ qd_server->pn_event_handler = 0;
+ qd_server->signal_handler = 0;
+ qd_server->ufd_handler = 0;
+ qd_server->start_context = 0;
+ qd_server->signal_context = 0;
+ qd_server->lock = sys_mutex();
+ qd_server->cond = sys_cond();
qd_timer_initialize(qd_server->lock);
@@ -886,9 +895,13 @@ void qd_server_free(qd_server_t *qd_serv
}
-void qd_server_set_conn_handler(qd_dispatch_t *qd, qd_conn_handler_cb_t handler, void *handler_context)
+void qd_server_set_conn_handler(qd_dispatch_t *qd,
+ qd_conn_handler_cb_t handler,
+ qd_pn_event_handler_cb_t pn_event_handler,
+ void *handler_context)
{
qd->server->conn_handler = handler;
+ qd->server->pn_event_handler = pn_event_handler;
qd->server->conn_handler_context = handler_context;
}
Modified: qpid/dispatch/trunk/src/server_private.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/server_private.h?rev=1684210&r1=1684209&r2=1684210&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/server_private.h (original)
+++ qpid/dispatch/trunk/src/server_private.h Mon Jun 8 15:53:46 2015
@@ -141,31 +141,32 @@ DEQ_DECLARE(qd_work_item_t, qd_work_list
struct qd_server_t {
- qd_dispatch_t *qd;
- int thread_count;
- const char *container_name;
- qdpn_driver_t *driver;
- qd_log_source_t *log_source;
- qd_thread_start_cb_t start_handler;
- qd_conn_handler_cb_t conn_handler;
- qd_user_fd_handler_cb_t ufd_handler;
- void *start_context;
- void *conn_handler_context;
- sys_cond_t *cond;
- sys_mutex_t *lock;
- qd_thread_t **threads;
- qd_work_list_t work_queue;
- qd_timer_list_t pending_timers;
- bool a_thread_is_waiting;
- int threads_active;
- int pause_requests;
- int threads_paused;
- int pause_next_sequence;
- int pause_now_serving;
- qd_signal_handler_cb_t signal_handler;
- void *signal_context;
- int pending_signal;
- qd_connection_list_t connections;
+ qd_dispatch_t *qd;
+ int thread_count;
+ const char *container_name;
+ qdpn_driver_t *driver;
+ qd_log_source_t *log_source;
+ qd_thread_start_cb_t start_handler;
+ qd_conn_handler_cb_t conn_handler;
+ qd_pn_event_handler_cb_t pn_event_handler;
+ qd_user_fd_handler_cb_t ufd_handler;
+ void *start_context;
+ void *conn_handler_context;
+ sys_cond_t *cond;
+ sys_mutex_t *lock;
+ qd_thread_t **threads;
+ qd_work_list_t work_queue;
+ qd_timer_list_t pending_timers;
+ bool a_thread_is_waiting;
+ int threads_active;
+ int pause_requests;
+ int threads_paused;
+ int pause_next_sequence;
+ int pause_now_serving;
+ qd_signal_handler_cb_t signal_handler;
+ void *signal_context;
+ int pending_signal;
+ qd_connection_list_t connections;
};
ALLOC_DECLARE(qd_work_item_t);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org