You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2014/04/07 21:24:34 UTC
svn commit: r1585571 - in /qpid/dispatch/trunk: CMakeLists.txt src/server.c src/server_private.h src/work_queue.c src/work_queue.h
Author: tross
Date: Mon Apr 7 19:24:33 2014
New Revision: 1585571
URL: http://svn.apache.org/r1585571
Log:
DISPATCH-41 - Prep for adding the Proton event API: Removed old-style work queue in server.
Removed:
qpid/dispatch/trunk/src/work_queue.c
qpid/dispatch/trunk/src/work_queue.h
Modified:
qpid/dispatch/trunk/CMakeLists.txt
qpid/dispatch/trunk/src/server.c
qpid/dispatch/trunk/src/server_private.h
Modified: qpid/dispatch/trunk/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/CMakeLists.txt?rev=1585571&r1=1585570&r2=1585571&view=diff
==============================================================================
--- qpid/dispatch/trunk/CMakeLists.txt (original)
+++ qpid/dispatch/trunk/CMakeLists.txt Mon Apr 7 19:24:33 2014
@@ -132,7 +132,6 @@ set(server_SOURCES
src/server.c
src/timer.c
src/waypoint.c
- src/work_queue.c
)
set_property(SOURCE src/python_embedded.c src/router_pynode.c
Modified: qpid/dispatch/trunk/src/server.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/server.c?rev=1585571&r1=1585570&r2=1585571&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/server.c (original)
+++ qpid/dispatch/trunk/src/server.c Mon Apr 7 19:24:33 2014
@@ -30,6 +30,7 @@
static __thread qd_server_t *thread_server = 0;
+ALLOC_DEFINE(qd_work_item_t);
ALLOC_DEFINE(qd_listener_t);
ALLOC_DEFINE(qd_connector_t);
ALLOC_DEFINE(qd_connection_t);
@@ -272,7 +273,8 @@ static void *thread_run(void *arg)
{
qd_thread_t *thread = (qd_thread_t*) arg;
qd_server_t *qd_server = thread->qd_server;
- pn_connector_t *work;
+ qd_work_item_t *work;
+ pn_connector_t *cxtr;
pn_connection_t *conn;
qd_connection_t *ctx;
int error;
@@ -342,7 +344,7 @@ static void *thread_run(void *arg)
//
// Check the work queue for connectors scheduled for processing.
//
- work = work_queue_get(qd_server->work_queue);
+ work = DEQ_HEAD(qd_server->work_queue);
if (!work) {
//
// There is no pending work to do
@@ -414,15 +416,18 @@ static void *thread_run(void *arg)
// being processed by another thread, put it in the work queue and signal the
// condition variable.
//
- work = pn_driver_connector(qd_server->driver);
- while (work) {
- ctx = pn_connector_context(work);
+ cxtr = pn_driver_connector(qd_server->driver);
+ while (cxtr) {
+ ctx = pn_connector_context(cxtr);
if (!ctx->enqueued && ctx->owner_thread == CONTEXT_NO_OWNER) {
ctx->enqueued = 1;
- work_queue_put(qd_server->work_queue, work);
+ qd_work_item_t *workitem = new_qd_work_item_t();
+ DEQ_ITEM_INIT(workitem);
+ workitem->cxtr = cxtr;
+ DEQ_INSERT_TAIL(qd_server->work_queue, workitem);
sys_cond_signal(qd_server->cond);
}
- work = pn_driver_connector(qd_server->driver);
+ cxtr = pn_driver_connector(qd_server->driver);
}
//
@@ -436,18 +441,21 @@ static void *thread_run(void *arg)
// If we were given a connector to work on from the work queue, mark it as
// owned by this thread and as no longer enqueued.
//
+ cxtr = 0;
if (work) {
- ctx = pn_connector_context(work);
+ DEQ_REMOVE_HEAD(qd_server->work_queue);
+ ctx = pn_connector_context(work->cxtr);
if (ctx->owner_thread == CONTEXT_NO_OWNER) {
ctx->owner_thread = thread->thread_id;
ctx->enqueued = 0;
qd_server->threads_active++;
+ cxtr = work->cxtr;
+ free_qd_work_item_t(work);
} else {
//
// This connector is being processed by another thread, re-queue it.
//
- work_queue_put(qd_server->work_queue, work);
- work = 0;
+ DEQ_INSERT_TAIL(qd_server->work_queue, work);
}
}
sys_mutex_unlock(qd_server->lock);
@@ -455,17 +463,17 @@ static void *thread_run(void *arg)
//
// Process the connector that we now have exclusive access to.
//
- if (work) {
- int work_done = process_connector(qd_server, work);
+ if (cxtr) {
+ int work_done = process_connector(qd_server, cxtr);
//
// Check to see if the connector was closed during processing
//
- if (pn_connector_closed(work)) {
+ if (pn_connector_closed(cxtr)) {
//
// Connector is closed. Free the context and the connector.
//
- conn = pn_connector_connection(work);
+ conn = pn_connector_connection(cxtr);
//
// If this is a dispatch connector, schedule the re-connect timer
@@ -479,7 +487,7 @@ static void *thread_run(void *arg)
sys_mutex_lock(qd_server->lock);
DEQ_REMOVE(qd_server->connections, ctx);
free_qd_connection_t(ctx);
- pn_connector_free(work);
+ pn_connector_free(cxtr);
if (conn)
pn_connection_free(conn);
qd_server->threads_active--;
@@ -644,7 +652,7 @@ qd_server_t *qd_server(int thread_count,
for (i = 0; i < thread_count; i++)
qd_server->threads[i] = thread(qd_server, i);
- qd_server->work_queue = work_queue();
+ DEQ_INIT(qd_server->work_queue);
DEQ_INIT(qd_server->pending_timers);
qd_server->a_thread_is_waiting = false;
qd_server->threads_active = 0;
@@ -669,8 +677,6 @@ void qd_server_free(qd_server_t *qd_serv
for (i = 0; i < qd_server->thread_count; i++)
thread_free(qd_server->threads[i]);
- work_queue_free(qd_server->work_queue);
-
pn_driver_free(qd_server->driver);
sys_mutex_free(qd_server->lock);
sys_cond_free(qd_server->cond);
Modified: qpid/dispatch/trunk/src/server_private.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/server_private.h?rev=1585571&r1=1585570&r2=1585571&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/server_private.h (original)
+++ qpid/dispatch/trunk/src/server_private.h Mon Apr 7 19:24:33 2014
@@ -24,7 +24,6 @@
#include <qpid/dispatch/alloc.h>
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/log.h>
-#include "work_queue.h"
#include <proton/driver.h>
#include <proton/engine.h>
#include <proton/driver_extras.h>
@@ -109,6 +108,14 @@ typedef struct qd_thread_t {
} qd_thread_t;
+typedef struct qd_work_item_t {
+ DEQ_LINKS(struct qd_work_item_t);
+ pn_connector_t *cxtr;
+} qd_work_item_t;
+
+DEQ_DECLARE(qd_work_item_t, qd_work_list_t);
+
+
struct qd_server_t {
int thread_count;
const char *container_name;
@@ -122,7 +129,7 @@ struct qd_server_t {
sys_cond_t *cond;
sys_mutex_t *lock;
qd_thread_t **threads;
- work_queue_t *work_queue;
+ qd_work_list_t work_queue;
qd_timer_list_t pending_timers;
bool a_thread_is_waiting;
int threads_active;
@@ -136,7 +143,7 @@ struct qd_server_t {
qd_connection_list_t connections;
};
-
+ALLOC_DECLARE(qd_work_item_t);
ALLOC_DECLARE(qd_listener_t);
ALLOC_DECLARE(qd_connector_t);
ALLOC_DECLARE(qd_connection_t);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org