You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2014/04/07 21:24:34 UTC

svn commit: r1585571 - in /qpid/dispatch/trunk: CMakeLists.txt src/server.c src/server_private.h src/work_queue.c src/work_queue.h

Author: tross
Date: Mon Apr  7 19:24:33 2014
New Revision: 1585571

URL: http://svn.apache.org/r1585571
Log:
DISPATCH-41 - Prep for adding the Proton event API: Removed old-style work queue in server.

Removed:
    qpid/dispatch/trunk/src/work_queue.c
    qpid/dispatch/trunk/src/work_queue.h
Modified:
    qpid/dispatch/trunk/CMakeLists.txt
    qpid/dispatch/trunk/src/server.c
    qpid/dispatch/trunk/src/server_private.h

Modified: qpid/dispatch/trunk/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/CMakeLists.txt?rev=1585571&r1=1585570&r2=1585571&view=diff
==============================================================================
--- qpid/dispatch/trunk/CMakeLists.txt (original)
+++ qpid/dispatch/trunk/CMakeLists.txt Mon Apr  7 19:24:33 2014
@@ -132,7 +132,6 @@ set(server_SOURCES
     src/server.c
     src/timer.c
     src/waypoint.c
-    src/work_queue.c
     )
 
 set_property(SOURCE src/python_embedded.c src/router_pynode.c

Modified: qpid/dispatch/trunk/src/server.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/server.c?rev=1585571&r1=1585570&r2=1585571&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/server.c (original)
+++ qpid/dispatch/trunk/src/server.c Mon Apr  7 19:24:33 2014
@@ -30,6 +30,7 @@
 
 static __thread qd_server_t *thread_server = 0;
 
+ALLOC_DEFINE(qd_work_item_t);
 ALLOC_DEFINE(qd_listener_t);
 ALLOC_DEFINE(qd_connector_t);
 ALLOC_DEFINE(qd_connection_t);
@@ -272,7 +273,8 @@ static void *thread_run(void *arg)
 {
     qd_thread_t     *thread    = (qd_thread_t*) arg;
     qd_server_t     *qd_server = thread->qd_server;
-    pn_connector_t  *work;
+    qd_work_item_t  *work;
+    pn_connector_t  *cxtr;
     pn_connection_t *conn;
     qd_connection_t *ctx;
     int              error;
@@ -342,7 +344,7 @@ static void *thread_run(void *arg)
         //
         // Check the work queue for connectors scheduled for processing.
         //
-        work = work_queue_get(qd_server->work_queue);
+        work = DEQ_HEAD(qd_server->work_queue);
         if (!work) {
             //
             // There is no pending work to do
@@ -414,15 +416,18 @@ static void *thread_run(void *arg)
                 // being processed by another thread, put it in the work queue and signal the
                 // condition variable.
                 //
-                work = pn_driver_connector(qd_server->driver);
-                while (work) {
-                    ctx = pn_connector_context(work);
+                cxtr = pn_driver_connector(qd_server->driver);
+                while (cxtr) {
+                    ctx = pn_connector_context(cxtr);
                     if (!ctx->enqueued && ctx->owner_thread == CONTEXT_NO_OWNER) {
                         ctx->enqueued = 1;
-                        work_queue_put(qd_server->work_queue, work);
+                        qd_work_item_t *workitem = new_qd_work_item_t();
+                        DEQ_ITEM_INIT(workitem);
+                        workitem->cxtr = cxtr;
+                        DEQ_INSERT_TAIL(qd_server->work_queue, workitem);
                         sys_cond_signal(qd_server->cond);
                     }
-                    work = pn_driver_connector(qd_server->driver);
+                    cxtr = pn_driver_connector(qd_server->driver);
                 }
 
                 //
@@ -436,18 +441,21 @@ static void *thread_run(void *arg)
         // If we were given a connector to work on from the work queue, mark it as
         // owned by this thread and as no longer enqueued.
         //
+        cxtr = 0;
         if (work) {
-            ctx = pn_connector_context(work);
+            DEQ_REMOVE_HEAD(qd_server->work_queue);
+            ctx = pn_connector_context(work->cxtr);
             if (ctx->owner_thread == CONTEXT_NO_OWNER) {
                 ctx->owner_thread = thread->thread_id;
                 ctx->enqueued = 0;
                 qd_server->threads_active++;
+                cxtr = work->cxtr;
+                free_qd_work_item_t(work);
             } else {
                 //
                 // This connector is being processed by another thread, re-queue it.
                 //
-                work_queue_put(qd_server->work_queue, work);
-                work = 0;
+                DEQ_INSERT_TAIL(qd_server->work_queue, work);
             }
         }
         sys_mutex_unlock(qd_server->lock);
@@ -455,17 +463,17 @@ static void *thread_run(void *arg)
         //
         // Process the connector that we now have exclusive access to.
         //
-        if (work) {
-            int work_done = process_connector(qd_server, work);
+        if (cxtr) {
+            int work_done = process_connector(qd_server, cxtr);
 
             //
             // Check to see if the connector was closed during processing
             //
-            if (pn_connector_closed(work)) {
+            if (pn_connector_closed(cxtr)) {
                 //
                 // Connector is closed.  Free the context and the connector.
                 //
-                conn = pn_connector_connection(work);
+                conn = pn_connector_connection(cxtr);
 
                 //
                 // If this is a dispatch connector, schedule the re-connect timer
@@ -479,7 +487,7 @@ static void *thread_run(void *arg)
                 sys_mutex_lock(qd_server->lock);
                 DEQ_REMOVE(qd_server->connections, ctx);
                 free_qd_connection_t(ctx);
-                pn_connector_free(work);
+                pn_connector_free(cxtr);
                 if (conn)
                     pn_connection_free(conn);
                 qd_server->threads_active--;
@@ -644,7 +652,7 @@ qd_server_t *qd_server(int thread_count,
     for (i = 0; i < thread_count; i++)
         qd_server->threads[i] = thread(qd_server, i);
 
-    qd_server->work_queue          = work_queue();
+    DEQ_INIT(qd_server->work_queue);
     DEQ_INIT(qd_server->pending_timers);
     qd_server->a_thread_is_waiting = false;
     qd_server->threads_active      = 0;
@@ -669,8 +677,6 @@ void qd_server_free(qd_server_t *qd_serv
     for (i = 0; i < qd_server->thread_count; i++)
         thread_free(qd_server->threads[i]);
 
-    work_queue_free(qd_server->work_queue);
-
     pn_driver_free(qd_server->driver);
     sys_mutex_free(qd_server->lock);
     sys_cond_free(qd_server->cond);

Modified: qpid/dispatch/trunk/src/server_private.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/server_private.h?rev=1585571&r1=1585570&r2=1585571&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/server_private.h (original)
+++ qpid/dispatch/trunk/src/server_private.h Mon Apr  7 19:24:33 2014
@@ -24,7 +24,6 @@
 #include <qpid/dispatch/alloc.h>
 #include <qpid/dispatch/ctools.h>
 #include <qpid/dispatch/log.h>
-#include "work_queue.h"
 #include <proton/driver.h>
 #include <proton/engine.h>
 #include <proton/driver_extras.h>
@@ -109,6 +108,14 @@ typedef struct qd_thread_t {
 } qd_thread_t;
 
 
+typedef struct qd_work_item_t {
+    DEQ_LINKS(struct qd_work_item_t);
+    pn_connector_t *cxtr;
+} qd_work_item_t;
+
+DEQ_DECLARE(qd_work_item_t, qd_work_list_t);
+
+
 struct qd_server_t {
     int                      thread_count;
     const char              *container_name;
@@ -122,7 +129,7 @@ struct qd_server_t {
     sys_cond_t              *cond;
     sys_mutex_t             *lock;
     qd_thread_t            **threads;
-    work_queue_t            *work_queue;
+    qd_work_list_t           work_queue;
     qd_timer_list_t          pending_timers;
     bool                     a_thread_is_waiting;
     int                      threads_active;
@@ -136,7 +143,7 @@ struct qd_server_t {
     qd_connection_list_t     connections;
 };
 
-
+ALLOC_DECLARE(qd_work_item_t);
 ALLOC_DECLARE(qd_listener_t);
 ALLOC_DECLARE(qd_connector_t);
 ALLOC_DECLARE(qd_connection_t);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org