You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2014/04/11 23:03:48 UTC

svn commit: r1586773 - in /qpid/dispatch/trunk: include/qpid/dispatch/server.h src/container.c src/server.c src/server_private.h

Author: tross
Date: Fri Apr 11 21:03:48 2014
New Revision: 1586773

URL: http://svn.apache.org/r1586773
Log:
DISPATCH-41 - Container now uses the Proton Engine Event Collector API.

Modified:
    qpid/dispatch/trunk/include/qpid/dispatch/server.h
    qpid/dispatch/trunk/src/container.c
    qpid/dispatch/trunk/src/server.c
    qpid/dispatch/trunk/src/server_private.h

Modified: qpid/dispatch/trunk/include/qpid/dispatch/server.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/server.h?rev=1586773&r1=1586772&r2=1586773&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/server.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/server.h Fri Apr 11 21:03:48 2014
@@ -408,6 +408,15 @@ pn_connection_t *qd_connection_pn(qd_con
 
 
 /**
+ * \brief Get the event collector for a connection.
+ *
+ * @param conn Connection object supplied in QD_CONN_EVENT_{LISTENER,CONNECTOR}_OPEN
+ * @return The pn_collector associated with the connection.
+ */
+pn_collector_t *qd_connection_collector(qd_connection_t *conn);
+
+
+/**
  * \brief Get the configuration that was used in the setup of this connection.
  *
  * @param conn Connection object supplied in QD_CONN_EVENT_{LISTENER,CONNETOR}_OPEN

Modified: qpid/dispatch/trunk/src/container.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/container.c?rev=1586773&r1=1586772&r2=1586773&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/container.c (original)
+++ qpid/dispatch/trunk/src/container.c Fri Apr 11 21:03:48 2014
@@ -25,6 +25,8 @@
 #include <qpid/dispatch/message.h>
 #include <proton/engine.h>
 #include <proton/message.h>
+#include <proton/connection.h>
+#include <proton/event.h>
 #include <qpid/dispatch/ctools.h>
 #include <qpid/dispatch/hash.h>
 #include <qpid/dispatch/threading.h>
@@ -279,100 +281,89 @@ static int close_handler(void* unused, p
 }
 
 
-static int process_handler(qd_container_t *container, void* unused, pn_connection_t *conn)
+static int process_handler(qd_container_t *container, void* unused, qd_connection_t *qd_conn)
 {
     pn_session_t    *ssn;
     pn_link_t       *pn_link;
     pn_delivery_t   *delivery;
+    pn_collector_t  *collector   = qd_connection_collector(qd_conn);
+    pn_connection_t *conn        = qd_connection_pn(qd_conn);
+    pn_event_t      *event;
     int              event_count = 0;
 
-    // Step 1: setup the engine's connection, and any sessions and links
-    // that may be pending.
-
-    // initialize the connection if it's new
-    if (pn_connection_state(conn) & PN_LOCAL_UNINIT) {
-        pn_connection_open(conn);
-        event_count++;
-    }
-
-    // open all pending sessions
-    ssn = pn_session_head(conn, PN_LOCAL_UNINIT);
-    while (ssn) {
-        pn_session_set_incoming_capacity(ssn, 1000000);
-        pn_session_open(ssn);
-        ssn = pn_session_next(ssn, PN_LOCAL_UNINIT);
-        event_count++;
-    }
-
-    // configure and open any pending links
-    pn_link = pn_link_head(conn, PN_LOCAL_UNINIT);
-    while (pn_link) {
-        if (pn_link_is_sender(pn_link))
-            setup_outgoing_link(container, pn_link);
-        else
-            setup_incoming_link(container, pn_link);
-        pn_link = pn_link_next(pn_link, PN_LOCAL_UNINIT);
+    //
+    // Spin through the collected events for this connection and process them
+    // individually.
+    //
+    event = pn_collector_peek(collector);
+    while (event) {
         event_count++;
-    }
 
+        switch (pn_event_type(event)) {
+        case PN_CONNECTION_REMOTE_STATE :
+            if (pn_connection_state(conn) & PN_LOCAL_UNINIT)
+                pn_connection_open(conn);
+            else if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED))
+                pn_connection_close(conn);
+            break;
+
+        case PN_SESSION_REMOTE_STATE :
+            ssn = pn_event_session(event);
+            if (pn_session_state(ssn) & PN_LOCAL_UNINIT) {
+                pn_session_set_incoming_capacity(ssn, 1000000);
+                pn_session_open(ssn);
+            } else if (pn_session_state(ssn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED))
+                pn_session_close(ssn);
+            break;
+
+        case PN_LINK_REMOTE_STATE :
+            pn_link = pn_event_link(event);
+            if (pn_link_state(pn_link) & PN_LOCAL_UNINIT) {
+                if (pn_link_is_sender(pn_link))
+                    setup_outgoing_link(container, pn_link);
+                else
+                    setup_incoming_link(container, pn_link);
+            } else if (pn_link_state(pn_link) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
+                qd_link_t *link = (qd_link_t*) pn_link_get_context(pn_link);
+                qd_node_t *node = link->node;
+                if (node)
+                    node->ntype->link_detach_handler(node->context, link, 1); // TODO - get 'closed' from detach message
+                pn_link_close(pn_link);
+            }
+            break;
 
-    // Step 2: Now drain all the pending deliveries from the connection's
-    // work queue and process them
+        case PN_DELIVERY :
+            delivery = pn_event_delivery(event);
+            if (pn_delivery_readable(delivery))
+                do_receive(delivery);
+
+            if (pn_delivery_updated(delivery)) {
+                do_updated(delivery);
+                pn_delivery_clear(delivery);
+            }
+            break;
 
-    delivery = pn_work_head(conn);
-    while (delivery) {
-        if (pn_delivery_readable(delivery))
-            do_receive(delivery);
-
-        if (pn_delivery_updated(delivery)) {
-            do_updated(delivery);
-            pn_delivery_clear(delivery);
+        default :
+            break;
         }
-        delivery = pn_work_next(delivery);
-        event_count++;
+
+        pn_collector_pop(collector);
+        event = pn_collector_peek(collector);
     }
 
     //
-    // Step 2.5: Call the attached node's writable handler for all active links
+    // Call the attached node's writable handler for all active links
     // on the connection.  Note that in Dispatch, links are considered
     // bidirectional.  Incoming and outgoing only pertains to deliveries and
     // deliveries are a subset of the traffic that flows both directions on links.
     //
-    pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
-    while (pn_link) {
-        assert(pn_session_connection(pn_link_session(pn_link)) == conn);
-        event_count += do_writable(pn_link);
-        pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
-    }
-
-    // Step 3: Clean up any links or sessions that have been closed by the
-    // remote.  If the connection has been closed remotely, clean that up
-    // also.
-
-    // teardown any terminating links
-    pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
-    while (pn_link) {
-        qd_link_t *link = (qd_link_t*) pn_link_get_context(pn_link);
-        qd_node_t *node = link->node;
-        if (node)
-            node->ntype->link_detach_handler(node->context, link, 1); // TODO - get 'closed' from detach message
-        pn_link_close(pn_link);
-        pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
-        event_count++;
-    }
-
-    // teardown any terminating sessions
-    ssn = pn_session_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
-    while (ssn) {
-        pn_session_close(ssn);
-        ssn = pn_session_next(ssn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
-        event_count++;
-    }
-
-    // teardown the connection if it's terminating
-    if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
-        pn_connection_close(conn);
-        event_count++;
+    if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE)) {
+        pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
+        while (pn_link) {
+            assert(pn_session_connection(pn_link_session(pn_link)) == conn);
+            event_count += do_writable(pn_link);
+            pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
+        }
     }
 
     return event_count;
@@ -420,7 +411,7 @@ static int handler(void *handler_context
     case QD_CONN_EVENT_LISTENER_OPEN:  open_handler(container, qd_conn, QD_INCOMING, conn_context);   break;
     case QD_CONN_EVENT_CONNECTOR_OPEN: open_handler(container, qd_conn, QD_OUTGOING, conn_context);   break;
     case QD_CONN_EVENT_CLOSE:          return close_handler(conn_context, conn);
-    case QD_CONN_EVENT_PROCESS:        return process_handler(container, conn_context, conn);
+    case QD_CONN_EVENT_PROCESS:        return process_handler(container, conn_context, qd_conn);
     }
 
     return 0;

Modified: qpid/dispatch/trunk/src/server.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/server.c?rev=1586773&r1=1586772&r2=1586773&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/server.c (original)
+++ qpid/dispatch/trunk/src/server.c Fri Apr 11 21:03:48 2014
@@ -69,6 +69,7 @@ static void thread_process_listeners(qd_
         ctx->owner_thread = CONTEXT_NO_OWNER;
         ctx->enqueued     = 0;
         ctx->pn_cxtr      = cxtr;
+        ctx->collector    = 0;
         ctx->listener     = (qd_listener_t*) pn_listener_context(listener);
         ctx->connector    = 0;
         ctx->context      = ctx->listener->context;
@@ -77,6 +78,8 @@ static void thread_process_listeners(qd_
         ctx->ufd          = 0;
 
         pn_connection_t *conn = pn_connection();
+        ctx->collector = pn_collector();
+        pn_connection_collect(conn, ctx->collector);
         pn_connection_set_container(conn, qd_server->container_name);
         pn_connector_set_connection(cxtr, conn);
         pn_connection_set_context(conn, ctx);
@@ -195,6 +198,8 @@ static int process_connector(qd_server_t
             }
 
             pn_connection_t *conn = pn_connection();
+            ctx->collector = pn_collector();
+            pn_connection_collect(conn, ctx->collector);
             pn_connection_set_container(conn, qd_server->container_name);
             pn_connector_set_connection(cxtr, conn);
             pn_connection_set_context(conn, ctx);
@@ -486,6 +491,8 @@ static void *thread_run(void *arg)
 
                 sys_mutex_lock(qd_server->lock);
                 DEQ_REMOVE(qd_server->connections, ctx);
+                if (ctx->collector)
+                    pn_collector_free(ctx->collector);
                 free_qd_connection_t(ctx);
                 pn_connector_free(cxtr);
                 if (conn)
@@ -567,6 +574,7 @@ static void cxtr_try_open(void *context)
     ctx->owner_thread = CONTEXT_NO_OWNER;
     ctx->enqueued     = 0;
     ctx->pn_conn      = 0;
+    ctx->collector    = 0;
     ctx->listener     = 0;
     ctx->connector    = ct;
     ctx->context      = ct->context;
@@ -878,6 +886,12 @@ pn_connection_t *qd_connection_pn(qd_con
 }
 
 
+pn_collector_t *qd_connection_collector(qd_connection_t *conn)
+{
+    return conn->collector;
+}
+
+
 const qd_server_config_t *qd_connection_config(const qd_connection_t *conn)
 {
     if (conn->listener)
@@ -975,6 +989,7 @@ qd_user_fd_t *qd_user_fd(qd_dispatch_t *
     ctx->owner_thread = CONTEXT_NO_OWNER;
     ctx->enqueued     = 0;
     ctx->pn_conn      = 0;
+    ctx->collector    = 0;
     ctx->listener     = 0;
     ctx->connector    = 0;
     ctx->context      = 0;

Modified: qpid/dispatch/trunk/src/server_private.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/server_private.h?rev=1586773&r1=1586772&r2=1586773&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/server_private.h (original)
+++ qpid/dispatch/trunk/src/server_private.h Fri Apr 11 21:03:48 2014
@@ -26,6 +26,7 @@
 #include <qpid/dispatch/log.h>
 #include <proton/driver.h>
 #include <proton/engine.h>
+#include <proton/event.h>
 #include <proton/driver_extras.h>
 
 #include "dispatch_private.h"
@@ -79,6 +80,7 @@ struct qd_connection_t {
     int              enqueued;
     pn_connector_t  *pn_cxtr;
     pn_connection_t *pn_conn;
+    pn_collector_t  *collector;
     qd_listener_t   *listener;
     qd_connector_t  *connector;
     void            *context; // Copy of context from listener or connector



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org