You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/06/08 17:53:47 UTC

svn commit: r1684210 - in /qpid/dispatch/trunk: include/qpid/dispatch/server.h src/container.c src/server.c src/server_private.h

Author: tross
Date: Mon Jun  8 15:53:46 2015
New Revision: 1684210

URL: http://svn.apache.org/r1684210
Log:
DISPATCH-142 - Move proton event handling from container down to server.

Modified:
    qpid/dispatch/trunk/include/qpid/dispatch/server.h
    qpid/dispatch/trunk/src/container.c
    qpid/dispatch/trunk/src/server.c
    qpid/dispatch/trunk/src/server_private.h

Modified: qpid/dispatch/trunk/include/qpid/dispatch/server.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/server.h?rev=1684210&r1=1684209&r2=1684210&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/server.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/server.h Mon Jun  8 15:53:46 2015
@@ -21,6 +21,7 @@
 
 #include <qpid/dispatch/dispatch.h>
 #include <proton/engine.h>
+#include <proton/event.h>
 
 /**@file
  * Control server threads, signals and connections.
@@ -224,8 +225,8 @@ typedef enum {
     /// The connection was closed at the transport level (not cleanly).
     QD_CONN_EVENT_CLOSE,
 
-    /// The connection requires processing.
-    QD_CONN_EVENT_PROCESS
+    /// The connection is writable.
+    QD_CONN_EVENT_WRITABLE
 } qd_conn_event_t;
 
 
@@ -367,6 +368,19 @@ typedef struct qd_server_config_t {
  */
 typedef int (*qd_conn_handler_cb_t)(void *handler_context, void* conn_context, qd_conn_event_t event, qd_connection_t *conn);
 
+/**
+ * Proton Event Handler
+ *
+ * This callback is invoked when proton events for a connection require
+ * processing.
+ *
+ * @param handler_context The handler context supplied in qd_server_set_conn_handler.
+ * @param conn_context The connection context supplied in qd_server_{connect,listen}.
+ * @param event The proton event being raised.
+ * @param conn The connection associated with this proton event.
+ */
+typedef int (*qd_pn_event_handler_cb_t)(void *handler_context, void* conn_context, pn_event_t *event, qd_connection_t *conn);
+
 
 /**
  * Set the connection event handler callback.
@@ -376,9 +390,10 @@ typedef int (*qd_conn_handler_cb_t)(void
  *
  * @param qd The dispatch handle returned by qd_dispatch.
  * @param conn_handler The handler for processing connection-related events.
+ * @param pn_event_handler The handler for proton events.
  * @param handler_context Context data to associate with the handler.
  */
-void qd_server_set_conn_handler(qd_dispatch_t *qd, qd_conn_handler_cb_t conn_handler, void *handler_context);
+void qd_server_set_conn_handler(qd_dispatch_t *qd, qd_conn_handler_cb_t conn_handler, qd_pn_event_handler_cb_t pn_event_handler, void *handler_context);
 
 
 /**

Modified: qpid/dispatch/trunk/src/container.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/container.c?rev=1684210&r1=1684209&r2=1684210&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/container.c (original)
+++ qpid/dispatch/trunk/src/container.c Mon Jun  8 15:53:46 2015
@@ -269,128 +269,10 @@ static int close_handler(void* unused, p
 }
 
 
-static int process_handler(qd_container_t *container, void* unused, qd_connection_t *qd_conn)
+static int writable_handler(void* unused, pn_connection_t *conn, qd_connection_t* qd_conn)
 {
-    pn_session_t    *ssn;
-    pn_link_t       *pn_link;
-    qd_link_t       *qd_link;
-    pn_delivery_t   *delivery;
-    pn_collector_t  *collector   = qd_connection_collector(qd_conn);
-    pn_connection_t *conn        = qd_connection_pn(qd_conn);
-    pn_event_t      *event;
-    int              event_count = 0;
-
-    //
-    // Spin through the collected events for this connection and process them
-    // individually.
-    //
-    event = pn_collector_peek(collector);
-    while (event) {
-        event_count++;
-
-        switch (pn_event_type(event)) {
-        case PN_CONNECTION_REMOTE_OPEN :
-            if (pn_connection_state(conn) & PN_LOCAL_UNINIT)
-                pn_connection_open(conn);
-            qd_connection_manager_connection_opened(qd_conn);
-            break;
-
-        case PN_CONNECTION_REMOTE_CLOSE :
-            if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED))
-                pn_connection_close(conn);
-            qd_connection_manager_connection_closed(qd_conn);
-            break;
-
-        case PN_SESSION_REMOTE_OPEN :
-            ssn = pn_event_session(event);
-            if (pn_session_state(ssn) & PN_LOCAL_UNINIT) {
-                pn_session_set_incoming_capacity(ssn, 1000000);
-                pn_session_open(ssn);
-            }
-            break;
-
-        case PN_SESSION_REMOTE_CLOSE :
-            ssn = pn_event_session(event);
-            if (pn_session_state(ssn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
-                // remote has nuked our session.  Check for any links that were
-                // left open and forcibly detach them, since no detaches will
-                // arrive on this session.
-                pn_connection_t *conn = pn_session_connection(ssn);
-                pn_link_t *pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
-                while (pn_link) {
-                    if (pn_link_session(pn_link) == ssn) {
-                        qd_link_t *qd_link = (qd_link_t *)pn_link_get_context(pn_link);
-                        if (qd_link && qd_link->node) {
-                            qd_log(container->log_source, QD_LOG_NOTICE,
-                                   "Aborting link '%s' due to parent session end",
-                                   pn_link_name(pn_link));
-                            qd_link->node->ntype->link_detach_handler(qd_link->node->context,
-                                                                      qd_link, 1); // assume
-                                                                                   // closed?
-                            pn_link_close(pn_link);
-                            pn_link_free(pn_link);
-                        }
-                    }
-                    pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
-                }
-                pn_session_close(ssn);
-                pn_session_free(ssn);
-            }
-            break;
-
-        case PN_LINK_REMOTE_OPEN :
-            pn_link = pn_event_link(event);
-            if (pn_link_state(pn_link) & PN_LOCAL_UNINIT) {
-                if (pn_link_is_sender(pn_link))
-                    setup_outgoing_link(container, pn_link);
-                else
-                    setup_incoming_link(container, pn_link);
-            } else if (pn_link_state(pn_link) & PN_LOCAL_ACTIVE)
-                handle_link_open(container, pn_link);
-            break;
-
-        case PN_LINK_REMOTE_CLOSE :
-            pn_link = pn_event_link(event);
-            if (pn_link_state(pn_link) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
-                qd_link = (qd_link_t*) pn_link_get_context(pn_link);
-                qd_node_t *node = qd_link->node;
-                if (node)
-                    node->ntype->link_detach_handler(node->context, qd_link, 1); // TODO - get 'closed' from detach message
-                pn_link_close(pn_link);
-                pn_link_free(pn_link);
-            }
-            break;
-
-        case PN_LINK_FINAL :
-            pn_link = pn_event_link(event);
-            qd_link = (qd_link_t*) pn_link_get_context(pn_link);
-            break;
-
-        case PN_LINK_FLOW :
-            pn_link = pn_event_link(event);
-            qd_link = (qd_link_t*) pn_link_get_context(pn_link);
-            if (qd_link && qd_link->node && qd_link->node->ntype->link_flow_handler)
-                qd_link->node->ntype->link_flow_handler(qd_link->node->context, qd_link);
-            break;
-
-        case PN_DELIVERY :
-            delivery = pn_event_delivery(event);
-            if (pn_delivery_readable(delivery))
-                do_receive(delivery);
-
-            if (pn_delivery_updated(delivery)) {
-                do_updated(delivery);
-                pn_delivery_clear(delivery);
-            }
-            break;
-
-        default :
-            break;
-        }
-
-        pn_collector_pop(collector);
-        event = pn_collector_peek(collector);
-    }
+    pn_link_t *pn_link;
+    int        event_count = 0;
 
     //
     // Call the attached node's writable handler for all active links
@@ -406,11 +288,123 @@ static int process_handler(qd_container_
             pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
         }
     }
-
     return event_count;
 }
 
 
+int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *event, qd_connection_t *qd_conn)
+{
+    qd_container_t  *container = (qd_container_t*) handler_context;
+    pn_connection_t *conn      = qd_connection_pn(qd_conn);
+    pn_session_t    *ssn;
+    pn_link_t       *pn_link;
+    qd_link_t       *qd_link;
+    pn_delivery_t   *delivery;
+
+    switch (pn_event_type(event)) {
+    case PN_CONNECTION_REMOTE_OPEN :
+        if (pn_connection_state(conn) & PN_LOCAL_UNINIT)
+            pn_connection_open(conn);
+        qd_connection_manager_connection_opened(qd_conn);
+        break;
+
+    case PN_CONNECTION_REMOTE_CLOSE :
+        if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED))
+            pn_connection_close(conn);
+        qd_connection_manager_connection_closed(qd_conn);
+        break;
+
+    case PN_SESSION_REMOTE_OPEN :
+        ssn = pn_event_session(event);
+        if (pn_session_state(ssn) & PN_LOCAL_UNINIT) {
+            pn_session_set_incoming_capacity(ssn, 1000000);
+            pn_session_open(ssn);
+        }
+        break;
+
+    case PN_SESSION_REMOTE_CLOSE :
+        ssn = pn_event_session(event);
+        if (pn_session_state(ssn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
+            // remote has nuked our session.  Check for any links that were
+            // left open and forcibly detach them, since no detaches will
+            // arrive on this session.
+            pn_connection_t *conn = pn_session_connection(ssn);
+            pn_link_t *pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
+            while (pn_link) {
+                if (pn_link_session(pn_link) == ssn) {
+                    qd_link_t *qd_link = (qd_link_t *)pn_link_get_context(pn_link);
+                    if (qd_link && qd_link->node) {
+                        qd_log(container->log_source, QD_LOG_NOTICE,
+                               "Aborting link '%s' due to parent session end",
+                               pn_link_name(pn_link));
+                        qd_link->node->ntype->link_detach_handler(qd_link->node->context,
+                                                                  qd_link, 1); // assume
+                        // closed?
+                        pn_link_close(pn_link);
+                        pn_link_free(pn_link);
+                    }
+                }
+                pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
+            }
+            pn_session_close(ssn);
+            pn_session_free(ssn);
+        }
+        break;
+
+    case PN_LINK_REMOTE_OPEN :
+        pn_link = pn_event_link(event);
+        if (pn_link_state(pn_link) & PN_LOCAL_UNINIT) {
+            if (pn_link_is_sender(pn_link))
+                setup_outgoing_link(container, pn_link);
+            else
+                setup_incoming_link(container, pn_link);
+        } else if (pn_link_state(pn_link) & PN_LOCAL_ACTIVE)
+            handle_link_open(container, pn_link);
+        break;
+
+    case PN_LINK_REMOTE_CLOSE :
+        pn_link = pn_event_link(event);
+        if (pn_link_state(pn_link) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
+            qd_link = (qd_link_t*) pn_link_get_context(pn_link);
+            qd_node_t *node = qd_link->node;
+            if (node)
+                node->ntype->link_detach_handler(node->context, qd_link, 1); // TODO - get 'closed' from detach message
+            pn_link_close(pn_link);
+            pn_link_free(pn_link);
+        }
+        break;
+
+    case PN_LINK_FINAL :
+        pn_link = pn_event_link(event);
+        qd_link = (qd_link_t*) pn_link_get_context(pn_link);
+        break;
+
+    case PN_LINK_FLOW :
+        pn_link = pn_event_link(event);
+        qd_link = (qd_link_t*) pn_link_get_context(pn_link);
+        if (qd_link && qd_link->node && qd_link->node->ntype->link_flow_handler)
+            qd_link->node->ntype->link_flow_handler(qd_link->node->context, qd_link);
+        break;
+
+    case PN_DELIVERY :
+        delivery = pn_event_delivery(event);
+        if (pn_delivery_readable(delivery))
+            do_receive(delivery);
+
+        if (pn_delivery_updated(delivery)) {
+            do_updated(delivery);
+            pn_delivery_clear(delivery);
+        }
+        break;
+
+    default :
+        break;
+    }
+
+    return 1;
+}
+
+
 static void open_handler(qd_container_t *container, qd_connection_t *conn, qd_direction_t dir, void *context)
 {
     const qd_node_type_t *nt;
@@ -452,7 +446,7 @@ static int handler(void *handler_context
     case QD_CONN_EVENT_LISTENER_OPEN:  open_handler(container, qd_conn, QD_INCOMING, conn_context);   break;
     case QD_CONN_EVENT_CONNECTOR_OPEN: open_handler(container, qd_conn, QD_OUTGOING, conn_context);   break;
     case QD_CONN_EVENT_CLOSE:          return close_handler(conn_context, conn, qd_conn);
-    case QD_CONN_EVENT_PROCESS:        return process_handler(container, conn_context, qd_conn);
+    case QD_CONN_EVENT_WRITABLE:       return writable_handler(conn_context, conn, qd_conn);
     }
 
     return 0;
@@ -473,7 +467,7 @@ qd_container_t *qd_container(qd_dispatch
     DEQ_INIT(container->nodes);
     DEQ_INIT(container->node_type_list);
 
-    qd_server_set_conn_handler(qd, handler, container);
+    qd_server_set_conn_handler(qd, handler, pn_event_handler, container);
 
     qd_log(container->log_source, QD_LOG_TRACE, "Container Initialized");
     return container;

Modified: qpid/dispatch/trunk/src/server.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/server.c?rev=1684210&r1=1684209&r2=1684210&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/server.c (original)
+++ qpid/dispatch/trunk/src/server.c Mon Jun  8 15:53:46 2015
@@ -358,15 +358,13 @@ static int process_connector(qd_server_t
                 pn_sasl_outcome(sasl) == PN_SASL_SKIPPED) {
                 ctx->state = CONN_STATE_OPERATIONAL;
 
-                qd_conn_event_t ce = QD_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy
+                qd_conn_event_t ce = QD_CONN_EVENT_LISTENER_OPEN;
 
-                if (ctx->listener) {
-                    ce = QD_CONN_EVENT_LISTENER_OPEN;
-                } else if (ctx->connector) {
+                if (ctx->connector) {
                     ce = QD_CONN_EVENT_CONNECTOR_OPEN;
                     ctx->connector->delay = 0;
                 } else
-                    assert(0);
+                    assert(ctx->listener);
 
                 qd_server->conn_handler(qd_server->conn_handler_context,
                                         ctx->context, ce, (qd_connection_t*) qdpn_connector_context(cxtr));
@@ -391,9 +389,19 @@ static int process_connector(qd_server_t
             }
             else {
                 invoke_deferred_calls(ctx, false);
-                events = qd_server->conn_handler(qd_server->conn_handler_context, ctx->context,
-                                                 QD_CONN_EVENT_PROCESS,
-                                                 (qd_connection_t*) qdpn_connector_context(cxtr));
+
+                qd_connection_t *qd_conn   = (qd_connection_t*) qdpn_connector_context(cxtr);
+                pn_collector_t  *collector = qd_connection_collector(qd_conn);
+                pn_event_t      *event;
+
+                events = 0;
+                event = pn_collector_peek(collector);
+                while (event) {
+                    events += qd_server->pn_event_handler(qd_server->conn_handler_context, ctx->context, event, qd_conn);
+                    pn_collector_pop(collector);
+                    event = pn_collector_peek(collector);
+                }
+                events += qd_server->conn_handler(qd_server->conn_handler_context, ctx->context, QD_CONN_EVENT_WRITABLE, qd_conn);
             }
             break;
 
@@ -836,19 +844,20 @@ qd_server_t *qd_server(qd_dispatch_t *qd
         return 0;
 
     DEQ_INIT(qd_server->connections);
-    qd_server->qd              = qd;
-    qd_server->log_source      = qd_log_source("SERVER");
-    qd_server->thread_count    = thread_count;
-    qd_server->container_name  = container_name;
-    qd_server->driver          = qdpn_driver();
-    qd_server->start_handler   = 0;
-    qd_server->conn_handler    = 0;
-    qd_server->signal_handler  = 0;
-    qd_server->ufd_handler     = 0;
-    qd_server->start_context   = 0;
-    qd_server->signal_context  = 0;
-    qd_server->lock            = sys_mutex();
-    qd_server->cond            = sys_cond();
+    qd_server->qd               = qd;
+    qd_server->log_source       = qd_log_source("SERVER");
+    qd_server->thread_count     = thread_count;
+    qd_server->container_name   = container_name;
+    qd_server->driver           = qdpn_driver();
+    qd_server->start_handler    = 0;
+    qd_server->conn_handler     = 0;
+    qd_server->pn_event_handler = 0;
+    qd_server->signal_handler   = 0;
+    qd_server->ufd_handler      = 0;
+    qd_server->start_context    = 0;
+    qd_server->signal_context   = 0;
+    qd_server->lock             = sys_mutex();
+    qd_server->cond             = sys_cond();
 
     qd_timer_initialize(qd_server->lock);
 
@@ -886,9 +895,13 @@ void qd_server_free(qd_server_t *qd_serv
 }
 
 
-void qd_server_set_conn_handler(qd_dispatch_t *qd, qd_conn_handler_cb_t handler, void *handler_context)
+void qd_server_set_conn_handler(qd_dispatch_t            *qd,
+                                qd_conn_handler_cb_t      handler,
+                                qd_pn_event_handler_cb_t  pn_event_handler,
+                                void                     *handler_context)
 {
     qd->server->conn_handler         = handler;
+    qd->server->pn_event_handler     = pn_event_handler;
     qd->server->conn_handler_context = handler_context;
 }
 

Modified: qpid/dispatch/trunk/src/server_private.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/server_private.h?rev=1684210&r1=1684209&r2=1684210&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/server_private.h (original)
+++ qpid/dispatch/trunk/src/server_private.h Mon Jun  8 15:53:46 2015
@@ -141,31 +141,32 @@ DEQ_DECLARE(qd_work_item_t, qd_work_list
 
 
 struct qd_server_t {
-    qd_dispatch_t           *qd;
-    int                      thread_count;
-    const char              *container_name;
-    qdpn_driver_t           *driver;
-    qd_log_source_t         *log_source;
-    qd_thread_start_cb_t     start_handler;
-    qd_conn_handler_cb_t     conn_handler;
-    qd_user_fd_handler_cb_t  ufd_handler;
-    void                    *start_context;
-    void                    *conn_handler_context;
-    sys_cond_t              *cond;
-    sys_mutex_t             *lock;
-    qd_thread_t            **threads;
-    qd_work_list_t           work_queue;
-    qd_timer_list_t          pending_timers;
-    bool                     a_thread_is_waiting;
-    int                      threads_active;
-    int                      pause_requests;
-    int                      threads_paused;
-    int                      pause_next_sequence;
-    int                      pause_now_serving;
-    qd_signal_handler_cb_t   signal_handler;
-    void                    *signal_context;
-    int                      pending_signal;
-    qd_connection_list_t     connections;
+    qd_dispatch_t            *qd;
+    int                       thread_count;
+    const char               *container_name;
+    qdpn_driver_t            *driver;
+    qd_log_source_t          *log_source;
+    qd_thread_start_cb_t      start_handler;
+    qd_conn_handler_cb_t      conn_handler;
+    qd_pn_event_handler_cb_t  pn_event_handler;
+    qd_user_fd_handler_cb_t   ufd_handler;
+    void                     *start_context;
+    void                     *conn_handler_context;
+    sys_cond_t               *cond;
+    sys_mutex_t              *lock;
+    qd_thread_t             **threads;
+    qd_work_list_t            work_queue;
+    qd_timer_list_t           pending_timers;
+    bool                      a_thread_is_waiting;
+    int                       threads_active;
+    int                       pause_requests;
+    int                       threads_paused;
+    int                       pause_next_sequence;
+    int                       pause_now_serving;
+    qd_signal_handler_cb_t    signal_handler;
+    void                     *signal_context;
+    int                       pending_signal;
+    qd_connection_list_t      connections;
 };
 
 ALLOC_DECLARE(qd_work_item_t);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org