You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/02/23 22:51:15 UTC
[08/38] qpid-proton git commit: PROTON-1403: c proactor library
PROTON-1403: c proactor library
Move the libuv example proactor into an installed library.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/afacb165
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/afacb165
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/afacb165
Branch: refs/heads/go1
Commit: afacb16527e9f231ae76d5e16ca0d9ac7edcff86
Parents: b9a57e8
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Feb 10 21:43:05 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Feb 10 21:49:32 2017 -0500
----------------------------------------------------------------------
examples/c/proactor/CMakeLists.txt | 33 +-
examples/c/proactor/broker.c | 44 +-
examples/c/proactor/libuv_proactor.c | 873 ------------------------
examples/c/proactor/test.py | 14 +-
examples/c/proactor/thread.h | 49 ++
proton-c/CMakeLists.txt | 74 +-
proton-c/include/proton/import_export.h | 7 +
proton-c/include/proton/listener.h | 4 +-
proton-c/include/proton/proactor.h | 26 +-
proton-c/include/proton/types.h | 17 +-
proton-c/src/libqpid-proton-proactor.pc.in | 30 +
proton-c/src/proactor/libuv.c | 873 ++++++++++++++++++++++++
12 files changed, 1074 insertions(+), 970 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/examples/c/proactor/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/proactor/CMakeLists.txt b/examples/c/proactor/CMakeLists.txt
index f701651..2ed4f94 100644
--- a/examples/c/proactor/CMakeLists.txt
+++ b/examples/c/proactor/CMakeLists.txt
@@ -23,21 +23,20 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${Proton_INCLUDE_DIRS})
add_definitions(${COMPILE_WARNING_FLAGS} ${WERROR} ${COMPILE_PLATFORM_FLAGS} ${LINK_TIME_OPTIMIZATION})
-find_package(Libuv)
-if (Libuv_FOUND)
- foreach(name broker send receive)
- add_executable(libuv_${name} ${name}.c libuv_proactor.c)
- target_link_libraries(libuv_${name} ${Proton_LIBRARIES} ${Libuv_LIBRARIES})
- set_target_properties(libuv_${name} PROPERTIES
- COMPILE_DEFINITIONS "PN_PROACTOR_INCLUDE=\"libuv_proactor.h\"")
- endforeach()
+# Add a test with the correct environment to find test executables and valgrind.
+if(WIN32)
+ set(test_path "$<TARGET_FILE_DIR:broker>;$<TARGET_FILE_DIR:qpid-proton>")
+else(WIN32)
+ set(test_path "${CMAKE_CURRENT_BINARY_DIR}:$ENV{PATH}")
+ set(PLATFORM_LIBS pthread)
+endif(WIN32)
+
+foreach(name broker send receive)
+ add_executable(proactor-${name} ${name}.c)
+ target_link_libraries(proactor-${name} ${Proton_LIBRARIES} ${PLATFORM_LIBS})
+ set_target_properties(proactor-${name} PROPERTIES OUTPUT_NAME ${name})
+endforeach()
+
+set(run_env ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/proton-c/env.py ${EXAMPLE_ENV} "PATH=${test_path}" ${VALGRIND_ENV})
+add_test(c-proactor ${run_env} -- ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/test.py -v)
- # Add a test with the correct environment to find test executables and valgrind.
- if(WIN32)
- set(test_path "$<TARGET_FILE_DIR:libuv_broker>;$<TARGET_FILE_DIR:qpid-proton>")
- else(WIN32)
- set(test_path "${CMAKE_CURRENT_BINARY_DIR}:$ENV{PATH}")
- endif(WIN32)
- set(run_env ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/proton-c/env.py ${EXAMPLE_ENV} "PATH=${test_path}" ${VALGRIND_ENV})
- add_test(c-proactor-libuv ${run_env} -- ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/test.py)
-endif()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/examples/c/proactor/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
index ca52336..d6261f4 100644
--- a/examples/c/proactor/broker.c
+++ b/examples/c/proactor/broker.c
@@ -17,6 +17,8 @@
* under the License.
*/
+#include "thread.h"
+
#include <proton/connection_driver.h>
#include <proton/proactor.h>
#include <proton/engine.h>
@@ -29,11 +31,6 @@
#include <string.h>
#include <unistd.h>
-/* TODO aconway 2016-10-14: this example does not require libuv IO,
- it uses uv.h only for portable mutex and thread functions.
-*/
-#include <uv.h>
-
bool enable_debug = false;
void debug(const char* fmt, ...) {
@@ -91,7 +88,7 @@ void pcheck(int err, const char* s) {
/* Simple thread-safe queue implementation */
typedef struct queue_t {
- uv_mutex_t lock;
+ pthread_mutex_t lock;
char* name;
VEC(pn_rwbytes_t) messages; /* Messages on the queue_t */
VEC(pn_connection_t*) waiting; /* Connections waiting to send messages from this queue */
@@ -101,7 +98,7 @@ typedef struct queue_t {
static void queue_init(queue_t *q, const char* name, queue_t *next) {
debug("created queue %s", name);
- uv_mutex_init(&q->lock);
+ pthread_mutex_init(&q->lock, NULL);
q->name = strdup(name);
VEC_INIT(q->messages);
VEC_INIT(q->waiting);
@@ -110,7 +107,7 @@ static void queue_init(queue_t *q, const char* name, queue_t *next) {
}
static void queue_destroy(queue_t *q) {
- uv_mutex_destroy(&q->lock);
+ pthread_mutex_destroy(&q->lock);
free(q->name);
for (size_t i = 0; i < q->messages.len; ++i)
free(q->messages.data[i].start);
@@ -126,7 +123,7 @@ static void queue_destroy(queue_t *q) {
static void queue_send(queue_t *q, pn_link_t *s) {
pn_rwbytes_t m = { 0 };
size_t tag = 0;
- uv_mutex_lock(&q->lock);
+ pthread_mutex_lock(&q->lock);
if (q->messages.len == 0) { /* Empty, record connection as waiting */
debug("queue is empty %s", q->name);
/* Record connection for wake-up if not already on the list. */
@@ -143,7 +140,7 @@ static void queue_send(queue_t *q, pn_link_t *s) {
VEC_POP(q->messages);
tag = ++q->sent;
}
- uv_mutex_unlock(&q->lock);
+ pthread_mutex_unlock(&q->lock);
if (m.start) {
pn_delivery_t *d = pn_delivery(s, pn_dtag((char*)&tag, sizeof(tag)));
pn_link_send(s, m.start, m.size);
@@ -172,7 +169,7 @@ bool pn_connection_get_check_queues(pn_connection_t *c) {
*/
static void queue_receive(pn_proactor_t *d, queue_t *q, pn_rwbytes_t m) {
debug("received to queue %s", q->name);
- uv_mutex_lock(&q->lock);
+ pthread_mutex_lock(&q->lock);
VEC_PUSH(q->messages, m);
if (q->messages.len == 1) { /* Was empty, notify waiting connections */
for (size_t i = 0; i < q->waiting.len; ++i) {
@@ -182,18 +179,18 @@ static void queue_receive(pn_proactor_t *d, queue_t *q, pn_rwbytes_t m) {
}
q->waiting.len = 0;
}
- uv_mutex_unlock(&q->lock);
+ pthread_mutex_unlock(&q->lock);
}
/* Thread safe set of queues */
typedef struct queues_t {
- uv_mutex_t lock;
+ pthread_mutex_t lock;
queue_t *queues;
size_t sent;
} queues_t;
void queues_init(queues_t *qs) {
- uv_mutex_init(&qs->lock);
+ pthread_mutex_init(&qs->lock, NULL);
qs->queues = NULL;
}
@@ -202,12 +199,12 @@ void queues_destroy(queues_t *qs) {
queue_destroy(q);
free(q);
}
- uv_mutex_destroy(&qs->lock);
+ pthread_mutex_destroy(&qs->lock);
}
/** Get or create the named queue. */
queue_t* queues_get(queues_t *qs, const char* name) {
- uv_mutex_lock(&qs->lock);
+ pthread_mutex_lock(&qs->lock);
queue_t *q;
for (q = qs->queues; q && strcmp(q->name, name) != 0; q = q->next)
;
@@ -216,7 +213,7 @@ queue_t* queues_get(queues_t *qs, const char* name) {
queue_init(q, name, qs->queues);
qs->queues = q;
}
- uv_mutex_unlock(&qs->lock);
+ pthread_mutex_unlock(&qs->lock);
return q;
}
@@ -255,7 +252,7 @@ static void link_send(broker_t *b, pn_link_t *s) {
}
static void queue_unsub(queue_t *q, pn_connection_t *c) {
- uv_mutex_lock(&q->lock);
+ pthread_mutex_lock(&q->lock);
for (size_t i = 0; i < q->waiting.len; ++i) {
if (q->waiting.data[i] == c){
q->waiting.data[i] = q->waiting.data[0]; /* save old [0] */
@@ -263,7 +260,7 @@ static void queue_unsub(queue_t *q, pn_connection_t *c) {
break;
}
}
- uv_mutex_unlock(&q->lock);
+ pthread_mutex_unlock(&q->lock);
}
/* Unsubscribe from the queue of interest to this link. */
@@ -416,7 +413,7 @@ static void handle(broker_t* b, pn_event_t* e) {
}
}
-static void broker_thread(void *void_broker) {
+static void* broker_thread(void *void_broker) {
broker_t *b = (broker_t*)void_broker;
do {
pn_event_batch_t *events = pn_proactor_wait(b->proactor);
@@ -426,6 +423,7 @@ static void broker_thread(void *void_broker) {
}
pn_proactor_done(b->proactor, events);
} while(!b->finished);
+ return NULL;
}
static void usage(const char *arg0) {
@@ -474,13 +472,13 @@ int main(int argc, char **argv) {
exit(1);
}
/* Start n-1 threads and use main thread */
- uv_thread_t* threads = (uv_thread_t*)calloc(sizeof(uv_thread_t), b.threads);
+ pthread_t* threads = (pthread_t*)calloc(sizeof(pthread_t), b.threads);
for (size_t i = 0; i < b.threads-1; ++i) {
- check(uv_thread_create(&threads[i], broker_thread, &b), "pthread_create");
+ check(pthread_create(&threads[i], NULL, broker_thread, &b), "pthread_create");
}
broker_thread(&b); /* Use the main thread too. */
for (size_t i = 0; i < b.threads-1; ++i) {
- check(uv_thread_join(&threads[i]), "pthread_join");
+ check(pthread_join(threads[i], NULL), "pthread_join");
}
pn_proactor_free(b.proactor);
free(threads);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/examples/c/proactor/libuv_proactor.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/libuv_proactor.c b/examples/c/proactor/libuv_proactor.c
deleted file mode 100644
index 42bbfab..0000000
--- a/examples/c/proactor/libuv_proactor.c
+++ /dev/null
@@ -1,873 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <uv.h>
-
-#include <proton/condition.h>
-#include <proton/connection_driver.h>
-#include <proton/engine.h>
-#include <proton/message.h>
-#include <proton/object.h>
-#include <proton/proactor.h>
-#include <proton/transport.h>
-#include <proton/url.h>
-
-#include <assert.h>
-#include <stddef.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-/*
- libuv loop functions are thread unsafe. The only exception is uv_async_send()
- which is a thread safe "wakeup" that can wake the uv_loop from another thread.
-
- To provide concurrency the proactor uses a "leader-worker-follower" model,
- threads take turns at the roles:
-
- - a single "leader" calls libuv functions and runs the uv_loop in short bursts
- to generate work. When there is work available it gives up leadership and
- becomes a "worker"
-
- - "workers" handle events concurrently for distinct connections/listeners
- They do as much work as they can get, when none is left they become "followers"
-
- - "followers" wait for the leader to generate work and become workers.
- When the leader itself becomes a worker, one of the followers takes over.
-
- This model is symmetric: any thread can take on any role based on run-time
- requirements. It also allows the IO and non-IO work associated with an IO
- wake-up to be processed in a single thread with no context switches.
-
- Function naming:
- - on_ - called in leader thread via uv_run().
- - leader_ - called in leader thread, while processing the leader_q.
- - owner_ - called in owning thread, leader or worker but not concurrently.
-
- Note on_ and leader_ functions can call each other, the prefix indicates the
- path they are most often called on.
-*/
-
-const char *COND_NAME = "proactor";
-const char *AMQP_PORT = "5672";
-const char *AMQP_PORT_NAME = "amqp";
-const char *AMQPS_PORT = "5671";
-const char *AMQPS_PORT_NAME = "amqps";
-
-PN_HANDLE(PN_PROACTOR)
-
-/* pn_proactor_t and pn_listener_t are plain C structs with normal memory management.
- Class definitions are for identification as pn_event_t context only.
-*/
-PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor)
-PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
-
-/* common to connection and listener */
-typedef struct psocket_t {
- /* Immutable */
- pn_proactor_t *proactor;
-
- /* Protected by proactor.lock */
- struct psocket_t* next;
- void (*wakeup)(struct psocket_t*); /* interrupting action for leader */
-
- /* Only used by leader */
- uv_tcp_t tcp;
- void (*action)(struct psocket_t*); /* deferred action for leader */
- bool is_conn:1;
- char host[NI_MAXHOST];
- char port[NI_MAXSERV];
-} psocket_t;
-
-/* Special value for psocket.next pointer when socket is not on any any list. */
-psocket_t UNLISTED;
-
-static void psocket_init(psocket_t* ps, pn_proactor_t* p, bool is_conn, const char *host, const char *port) {
- ps->proactor = p;
- ps->next = &UNLISTED;
- ps->is_conn = is_conn;
- ps->tcp.data = ps;
-
- /* For platforms that don't know about "amqp" and "amqps" service names. */
- if (strcmp(port, AMQP_PORT_NAME) == 0)
- port = AMQP_PORT;
- else if (strcmp(port, AMQPS_PORT_NAME) == 0)
- port = AMQPS_PORT;
- /* Set to "\001" to indicate a NULL as opposed to an empty string "" */
- strncpy(ps->host, host ? host : "\001", sizeof(ps->host));
- strncpy(ps->port, port ? port : "\001", sizeof(ps->port));
-}
-
-/* Turn "\001" back to NULL */
-static inline const char* fixstr(const char* str) {
- return str[0] == '\001' ? NULL : str;
-}
-
-typedef struct pconnection_t {
- psocket_t psocket;
-
- /* Only used by owner thread */
- pn_connection_driver_t driver;
-
- /* Only used by leader */
- uv_connect_t connect;
- uv_timer_t timer;
- uv_write_t write;
- uv_shutdown_t shutdown;
- size_t writing;
- bool reading:1;
- bool server:1; /* accept, not connect */
-} pconnection_t;
-
-struct pn_listener_t {
- psocket_t psocket;
-
- /* Only used by owner thread */
- pconnection_t *accepting; /* accept in progress */
- pn_condition_t *condition;
- pn_collector_t *collector;
- pn_event_batch_t batch;
- pn_record_t *attachments;
- void *context;
- size_t backlog;
-};
-
-
-typedef struct queue { psocket_t *front, *back; } queue;
-
-struct pn_proactor_t {
- /* Leader thread */
- uv_cond_t cond;
- uv_loop_t loop;
- uv_async_t async;
- uv_timer_t timer;
-
- /* Owner thread: proactor collector and batch can belong to leader or a worker */
- pn_collector_t *collector;
- pn_event_batch_t batch;
-
- /* Protected by lock */
- uv_mutex_t lock;
- queue start_q;
- queue worker_q;
- queue leader_q;
- size_t interrupt; /* pending interrupts */
- pn_millis_t timeout;
- size_t count; /* psocket count */
- bool inactive:1;
- bool timeout_request:1;
- bool timeout_elapsed:1;
- bool has_leader:1;
- bool batch_working:1; /* batch belongs to a worker. */
-};
-
-static bool push_lh(queue *q, psocket_t *ps) {
- if (ps->next != &UNLISTED) /* Don't move if already listed. */
- return false;
- ps->next = NULL;
- if (!q->front) {
- q->front = q->back = ps;
- } else {
- q->back->next = ps;
- q->back = ps;
- }
- return true;
-}
-
-static psocket_t* pop_lh(queue *q) {
- psocket_t *ps = q->front;
- if (ps) {
- q->front = ps->next;
- ps->next = &UNLISTED;
- }
- return ps;
-}
-
-static inline pconnection_t *as_pconnection_t(psocket_t* ps) {
- return ps->is_conn ? (pconnection_t*)ps : NULL;
-}
-
-static inline pn_listener_t *as_listener(psocket_t* ps) {
- return ps->is_conn ? NULL: (pn_listener_t*)ps;
-}
-
-/* Put ps on the leader queue for processing. Thread safe. */
-static void to_leader_lh(psocket_t *ps) {
- push_lh(&ps->proactor->leader_q, ps);
- uv_async_send(&ps->proactor->async); /* Wake leader */
-}
-
-static void to_leader(psocket_t *ps) {
- uv_mutex_lock(&ps->proactor->lock);
- to_leader_lh(ps);
- uv_mutex_unlock(&ps->proactor->lock);
-}
-
-/* Detach from IO and put ps on the worker queue */
-static void leader_to_worker(psocket_t *ps) {
- if (ps->is_conn) {
- pconnection_t *pc = as_pconnection_t(ps);
- /* Don't detach if there are no events yet. */
- if (pn_connection_driver_has_event(&pc->driver)) {
- if (pc->writing) {
- pc->writing = 0;
- uv_cancel((uv_req_t*)&pc->write);
- }
- if (pc->reading) {
- pc->reading = false;
- uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
- }
- if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
- uv_timer_stop(&pc->timer);
- }
- }
- } else {
- pn_listener_t *l = as_listener(ps);
- uv_read_stop((uv_stream_t*)&l->psocket.tcp);
- }
- uv_mutex_lock(&ps->proactor->lock);
- push_lh(&ps->proactor->worker_q, ps);
- uv_mutex_unlock(&ps->proactor->lock);
-}
-
-/* Set a deferred action for leader, if not already set. */
-static void owner_to_leader(psocket_t *ps, void (*action)(psocket_t*)) {
- uv_mutex_lock(&ps->proactor->lock);
- if (!ps->action) {
- ps->action = action;
- }
- to_leader_lh(ps);
- uv_mutex_unlock(&ps->proactor->lock);
-}
-
-/* Owner thread send to worker thread. Set deferred action if not already set. */
-static void owner_to_worker(psocket_t *ps, void (*action)(psocket_t*)) {
- uv_mutex_lock(&ps->proactor->lock);
- if (!ps->action) {
- ps->action = action;
- }
- push_lh(&ps->proactor->worker_q, ps);
- uv_async_send(&ps->proactor->async); /* Wake leader */
- uv_mutex_unlock(&ps->proactor->lock);
-}
-
-
-/* Re-queue for further work */
-static void worker_requeue(psocket_t* ps) {
- uv_mutex_lock(&ps->proactor->lock);
- push_lh(&ps->proactor->worker_q, ps);
- uv_async_send(&ps->proactor->async); /* Wake leader */
- uv_mutex_unlock(&ps->proactor->lock);
-}
-
-static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bool server, const char *host, const char *port) {
- pconnection_t *pc = (pconnection_t*)calloc(1, sizeof(*pc));
- if (!pc) return NULL;
- if (pn_connection_driver_init(&pc->driver, c, NULL) != 0) {
- return NULL;
- }
- psocket_init(&pc->psocket, p, true, host, port);
- if (server) {
- pn_transport_set_server(pc->driver.transport);
- }
- pn_record_t *r = pn_connection_attachments(pc->driver.connection);
- pn_record_def(r, PN_PROACTOR, PN_VOID);
- pn_record_set(r, PN_PROACTOR, pc);
- return pc;
-}
-
-static pn_event_t *listener_batch_next(pn_event_batch_t *batch);
-static pn_event_t *proactor_batch_next(pn_event_batch_t *batch);
-
-static inline pn_proactor_t *batch_proactor(pn_event_batch_t *batch) {
- return (batch->next_event == proactor_batch_next) ?
- (pn_proactor_t*)((char*)batch - offsetof(pn_proactor_t, batch)) : NULL;
-}
-
-static inline pn_listener_t *batch_listener(pn_event_batch_t *batch) {
- return (batch->next_event == listener_batch_next) ?
- (pn_listener_t*)((char*)batch - offsetof(pn_listener_t, batch)) : NULL;
-}
-
-static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
- pn_connection_driver_t *d = pn_event_batch_connection_driver(batch);
- return d ? (pconnection_t*)((char*)d - offsetof(pconnection_t, driver)) : NULL;
-}
-
-static void leader_count(pn_proactor_t *p, int change) {
- uv_mutex_lock(&p->lock);
- p->count += change;
- p->inactive = (p->count == 0);
- uv_mutex_unlock(&p->lock);
-}
-
-/* Free if there are no uv callbacks pending and no events */
-static void leader_pconnection_t_maybe_free(pconnection_t *pc) {
- if (pn_connection_driver_has_event(&pc->driver)) {
- leader_to_worker(&pc->psocket); /* Return to worker */
- } else if (!(pc->psocket.tcp.data || pc->write.data || pc->shutdown.data || pc->timer.data)) {
- /* All UV requests are finished */
- pn_connection_driver_destroy(&pc->driver);
- leader_count(pc->psocket.proactor, -1);
- free(pc);
- }
-}
-
-/* Free if there are no uv callbacks pending and no events */
-static void leader_listener_maybe_free(pn_listener_t *l) {
- if (pn_collector_peek(l->collector)) {
- leader_to_worker(&l->psocket); /* Return to worker */
- } else if (!l->psocket.tcp.data) {
- pn_condition_free(l->condition);
- leader_count(l->psocket.proactor, -1);
- free(l);
- }
-}
-
-/* Free if there are no uv callbacks pending and no events */
-static void leader_maybe_free(psocket_t *ps) {
- if (ps->is_conn) {
- leader_pconnection_t_maybe_free(as_pconnection_t(ps));
- } else {
- leader_listener_maybe_free(as_listener(ps));
- }
-}
-
-static void on_close(uv_handle_t *h) {
- psocket_t *ps = (psocket_t*)h->data;
- h->data = NULL; /* Mark closed */
- leader_maybe_free(ps);
-}
-
-static void on_shutdown(uv_shutdown_t *shutdown, int err) {
- psocket_t *ps = (psocket_t*)shutdown->data;
- shutdown->data = NULL; /* Mark closed */
- leader_maybe_free(ps);
-}
-
-static inline void leader_close(psocket_t *ps) {
- if (ps->tcp.data && !uv_is_closing((uv_handle_t*)&ps->tcp)) {
- uv_close((uv_handle_t*)&ps->tcp, on_close);
- }
- pconnection_t *pc = as_pconnection_t(ps);
- if (pc) {
- pn_connection_driver_close(&pc->driver);
- if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
- uv_timer_stop(&pc->timer);
- uv_close((uv_handle_t*)&pc->timer, on_close);
- }
- }
- leader_maybe_free(ps);
-}
-
-static pconnection_t *get_pconnection_t(pn_connection_t* c) {
- if (!c) return NULL;
- pn_record_t *r = pn_connection_attachments(c);
- return (pconnection_t*) pn_record_get(r, PN_PROACTOR);
-}
-
-static void leader_error(psocket_t *ps, int err, const char* what) {
- if (ps->is_conn) {
- pn_connection_driver_t *driver = &as_pconnection_t(ps)->driver;
- pn_connection_driver_bind(driver); /* Bind so errors will be reported */
- pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s",
- what, fixstr(ps->host), fixstr(ps->port),
- uv_strerror(err));
- pn_connection_driver_close(driver);
- } else {
- pn_listener_t *l = as_listener(ps);
- pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s",
- what, fixstr(ps->host), fixstr(ps->port),
- uv_strerror(err));
- pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
- }
- leader_to_worker(ps); /* Worker to handle the error */
-}
-
-/* uv-initialization */
-static int leader_init(psocket_t *ps) {
- leader_count(ps->proactor, +1);
- int err = uv_tcp_init(&ps->proactor->loop, &ps->tcp);
- if (!err) {
- pconnection_t *pc = as_pconnection_t(ps);
- if (pc) {
- pc->connect.data = ps;
- int err = uv_timer_init(&ps->proactor->loop, &pc->timer);
- if (!err) {
- pc->timer.data = pc;
- }
- }
- }
- if (err) {
- leader_error(ps, err, "initialization");
- }
- return err;
-}
-
-/* Common logic for on_connect and on_accept */
-static void leader_connect_accept(pconnection_t *pc, int err, const char *what) {
- if (!err) {
- leader_to_worker(&pc->psocket);
- } else {
- leader_error(&pc->psocket, err, what);
- }
-}
-
-static void on_connect(uv_connect_t *connect, int err) {
- leader_connect_accept((pconnection_t*)connect->data, err, "on connect");
-}
-
-static void on_accept(uv_stream_t* server, int err) {
- pn_listener_t *l = (pn_listener_t*) server->data;
- if (err) {
- leader_error(&l->psocket, err, "on accept");
- }
- pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
- leader_to_worker(&l->psocket); /* Let user call pn_listener_accept */
-}
-
-static void leader_accept(psocket_t *ps) {
- pn_listener_t * l = as_listener(ps);
- pconnection_t *pc = l->accepting;
- l->accepting = NULL;
- if (pc) {
- int err = leader_init(&pc->psocket);
- if (!err) err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp);
- leader_connect_accept(pc, err, "on accept");
- }
-}
-
-static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) {
- int err = leader_init(ps);
- struct addrinfo hints = { 0 };
- if (server) hints.ai_flags = AI_PASSIVE;
- if (!err) {
- err = uv_getaddrinfo(&ps->proactor->loop, info, NULL, fixstr(ps->host), fixstr(ps->port), &hints);
- }
- return err;
-}
-
-static void leader_connect(psocket_t *ps) {
- pconnection_t *pc = as_pconnection_t(ps);
- uv_getaddrinfo_t info;
- int err = leader_resolve(ps, &info, false);
- if (!err) {
- err = uv_tcp_connect(&pc->connect, &pc->psocket.tcp, info.addrinfo->ai_addr, on_connect);
- uv_freeaddrinfo(info.addrinfo);
- }
- if (err) {
- leader_error(ps, err, "connect to");
- }
-}
-
-static void leader_listen(psocket_t *ps) {
- pn_listener_t *l = as_listener(ps);
- uv_getaddrinfo_t info;
- int err = leader_resolve(ps, &info, true);
- if (!err) {
- err = uv_tcp_bind(&l->psocket.tcp, info.addrinfo->ai_addr, 0);
- uv_freeaddrinfo(info.addrinfo);
- }
- if (!err) err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_accept);
- if (err) {
- leader_error(ps, err, "listen on ");
- }
-}
-
-static void on_tick(uv_timer_t *timer) {
- pconnection_t *pc = (pconnection_t*)timer->data;
- pn_transport_t *t = pc->driver.transport;
- if (pn_transport_get_idle_timeout(t) || pn_transport_get_remote_idle_timeout(t)) {
- uv_timer_stop(&pc->timer);
- uint64_t now = uv_now(pc->timer.loop);
- uint64_t next = pn_transport_tick(t, now);
- if (next) {
- uv_timer_start(&pc->timer, on_tick, next - now, 0);
- }
- }
-}
-
-static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
- pconnection_t *pc = (pconnection_t*)stream->data;
- if (nread >= 0) {
- pn_connection_driver_read_done(&pc->driver, nread);
- on_tick(&pc->timer); /* check for tick changes. */
- leader_to_worker(&pc->psocket);
- /* Reading continues automatically until stopped. */
- } else if (nread == UV_EOF) { /* hangup */
- pn_connection_driver_read_close(&pc->driver);
- leader_maybe_free(&pc->psocket);
- } else {
- leader_error(&pc->psocket, nread, "on read from");
- }
-}
-
-static void on_write(uv_write_t* write, int err) {
- pconnection_t *pc = (pconnection_t*)write->data;
- write->data = NULL;
- if (err == 0) {
- pn_connection_driver_write_done(&pc->driver, pc->writing);
- leader_to_worker(&pc->psocket);
- } else if (err == UV_ECANCELED) {
- leader_maybe_free(&pc->psocket);
- } else {
- leader_error(&pc->psocket, err, "on write to");
- }
- pc->writing = 0; /* Need to send a new write request */
-}
-
-static void on_timeout(uv_timer_t *timer) {
- pn_proactor_t *p = (pn_proactor_t*)timer->data;
- uv_mutex_lock(&p->lock);
- p->timeout_elapsed = true;
- uv_mutex_unlock(&p->lock);
-}
-
-// Read buffer allocation function for uv, just returns the transports read buffer.
-static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) {
- pconnection_t *pc = (pconnection_t*)stream->data;
- pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
- *buf = uv_buf_init(rbuf.start, rbuf.size);
-}
-
-static void leader_rewatch(psocket_t *ps) {
- int err = 0;
- if (ps->is_conn) {
- pconnection_t *pc = as_pconnection_t(ps);
- if (pc->timer.data) { /* uv-initialized */
- on_tick(&pc->timer); /* Re-enable ticks if required */
- }
- pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
- pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
-
- /* Ticks and checking buffers can generate events, process before proceeding */
- if (pn_connection_driver_has_event(&pc->driver)) {
- leader_to_worker(ps);
- } else { /* Re-watch for IO */
- if (wbuf.size > 0 && !pc->writing) {
- pc->writing = wbuf.size;
- uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
- pc->write.data = ps;
- uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write);
- } else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) {
- pc->shutdown.data = ps;
- uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, on_shutdown);
- }
- if (rbuf.size > 0 && !pc->reading) {
- pc->reading = true;
- err = uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read);
- }
- }
- } else {
- pn_listener_t *l = as_listener(ps);
- err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_accept);
- }
- if (err) {
- leader_error(ps, err, "rewatch");
- }
-}
-
-/* Set the event in the proactor's batch */
-static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t) {
- pn_collector_put(p->collector, pn_proactor__class(), p, t);
- p->batch_working = true;
- return &p->batch;
-}
-
-/* Return the next event batch or 0 if no events are ready */
-static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) {
- if (!p->batch_working) { /* Can generate proactor events */
- if (p->inactive) {
- p->inactive = false;
- return proactor_batch_lh(p, PN_PROACTOR_INACTIVE);
- }
- if (p->interrupt > 0) {
- --p->interrupt;
- return proactor_batch_lh(p, PN_PROACTOR_INTERRUPT);
- }
- if (p->timeout_elapsed) {
- p->timeout_elapsed = false;
- return proactor_batch_lh(p, PN_PROACTOR_TIMEOUT);
- }
- }
- for (psocket_t *ps = pop_lh(&p->worker_q); ps; ps = pop_lh(&p->worker_q)) {
- if (ps->is_conn) {
- pconnection_t *pc = as_pconnection_t(ps);
- return &pc->driver.batch;
- } else { /* Listener */
- pn_listener_t *l = as_listener(ps);
- return &l->batch;
- }
- to_leader(ps); /* No event, back to leader */
- }
- return 0;
-}
-
-/* Called in any thread to set a wakeup action. Replaces any previous wakeup action. */
-static void wakeup(psocket_t *ps, void (*action)(psocket_t*)) {
- uv_mutex_lock(&ps->proactor->lock);
- ps->wakeup = action;
- to_leader_lh(ps);
- uv_mutex_unlock(&ps->proactor->lock);
-}
-
-pn_listener_t *pn_event_listener(pn_event_t *e) {
- return (pn_event_class(e) == pn_listener__class()) ? (pn_listener_t*)pn_event_context(e) : NULL;
-}
-
-pn_proactor_t *pn_event_proactor(pn_event_t *e) {
- if (pn_event_class(e) == pn_proactor__class()) return (pn_proactor_t*)pn_event_context(e);
- pn_listener_t *l = pn_event_listener(e);
- if (l) return l->psocket.proactor;
- pn_connection_t *c = pn_event_connection(e);
- if (c) return pn_connection_proactor(pn_event_connection(e));
- return NULL;
-}
-
-void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
- pconnection_t *pc = batch_pconnection(batch);
- if (pc) {
- if (pn_connection_driver_has_event(&pc->driver)) {
- /* Process all events before going back to IO. */
- worker_requeue(&pc->psocket);
- } else if (pn_connection_driver_finished(&pc->driver)) {
- owner_to_leader(&pc->psocket, leader_close);
- } else {
- owner_to_leader(&pc->psocket, leader_rewatch);
- }
- return;
- }
- pn_listener_t *l = batch_listener(batch);
- if (l) {
- owner_to_leader(&l->psocket, leader_rewatch);
- return;
- }
- pn_proactor_t *bp = batch_proactor(batch);
- if (bp == p) {
- uv_mutex_lock(&p->lock);
- p->batch_working = false;
- uv_async_send(&p->async); /* Wake leader */
- uv_mutex_unlock(&p->lock);
- return;
- }
-}
-
-/* Run follower/leader loop till we can return an event and be a worker */
-pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
- uv_mutex_lock(&p->lock);
- /* Try to grab work immediately. */
- pn_event_batch_t *batch = get_batch_lh(p);
- if (batch == NULL) {
- /* No work available, follow the leader */
- while (p->has_leader) {
- uv_cond_wait(&p->cond, &p->lock);
- }
- /* Lead till there is work to do. */
- p->has_leader = true;
- while (batch == NULL) {
- if (p->timeout_request) {
- p->timeout_request = false;
- if (p->timeout) {
- uv_timer_start(&p->timer, on_timeout, p->timeout, 0);
- } else {
- uv_timer_stop(&p->timer);
- }
- }
- for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
- void (*action)(psocket_t*) = ps->action;
- void (*wakeup)(psocket_t*) = ps->wakeup;
- ps->action = NULL;
- ps->wakeup = NULL;
- if (action || wakeup) {
- uv_mutex_unlock(&p->lock);
- if (action) action(ps);
- if (wakeup) wakeup(ps);
- uv_mutex_lock(&p->lock);
- }
- }
- batch = get_batch_lh(p);
- if (batch == NULL) {
- uv_mutex_unlock(&p->lock);
- uv_run(&p->loop, UV_RUN_ONCE);
- uv_mutex_lock(&p->lock);
- }
- }
- /* Signal the next leader and return to work */
- p->has_leader = false;
- uv_cond_signal(&p->cond);
- }
- uv_mutex_unlock(&p->lock);
- return batch;
-}
-
-void pn_proactor_interrupt(pn_proactor_t *p) {
- uv_mutex_lock(&p->lock);
- ++p->interrupt;
- uv_async_send(&p->async); /* Interrupt the UV loop */
- uv_mutex_unlock(&p->lock);
-}
-
-void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
- uv_mutex_lock(&p->lock);
- p->timeout = t;
- p->timeout_request = true;
- uv_async_send(&p->async); /* Interrupt the UV loop */
- uv_mutex_unlock(&p->lock);
-}
-
-int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host, const char *port) {
- pconnection_t *pc = new_pconnection_t(p, c, false, host, port);
- if (!pc) {
- return PN_OUT_OF_MEMORY;
- }
- /* Process PN_CONNECTION_INIT before binding */
- owner_to_worker(&pc->psocket, leader_connect);
- return 0;
-}
-
-int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *host, const char *port, int backlog)
-{
- psocket_init(&l->psocket, p, false, host, port);
- l->backlog = backlog;
- owner_to_leader(&l->psocket, leader_listen);
- return 0;
-}
-
-pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
- pconnection_t *pc = get_pconnection_t(c);
- return pc ? pc->psocket.proactor : NULL;
-}
-
-void leader_wake_connection(psocket_t *ps) {
- pconnection_t *pc = as_pconnection_t(ps);
- pn_connection_t *c = pc->driver.connection;
- pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
- leader_to_worker(ps);
-}
-
-void pn_connection_wake(pn_connection_t* c) {
- wakeup(&get_pconnection_t(c)->psocket, leader_wake_connection);
-}
-
-pn_proactor_t *pn_proactor() {
- pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p));
- p->collector = pn_collector();
- p->batch.next_event = &proactor_batch_next;
- if (!p->collector) return NULL;
- uv_loop_init(&p->loop);
- uv_mutex_init(&p->lock);
- uv_cond_init(&p->cond);
- uv_async_init(&p->loop, &p->async, NULL);
- uv_timer_init(&p->loop, &p->timer); /* Just wake the loop */
- p->timer.data = p;
- return p;
-}
-
-static void on_stopping(uv_handle_t* h, void* v) {
- uv_close(h, NULL); /* Close this handle */
- if (!uv_loop_alive(h->loop)) /* Everything closed */
- uv_stop(h->loop); /* Stop the loop, pn_proactor_destroy() can return */
-}
-
-void pn_proactor_free(pn_proactor_t *p) {
- uv_walk(&p->loop, on_stopping, NULL); /* Close all handles */
- uv_run(&p->loop, UV_RUN_DEFAULT); /* Run till stop, all handles closed */
- uv_loop_close(&p->loop);
- uv_mutex_destroy(&p->lock);
- uv_cond_destroy(&p->cond);
- pn_collector_free(p->collector);
- free(p);
-}
-
-static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
- pn_listener_t *l = batch_listener(batch);
- pn_event_t *handled = pn_collector_prev(l->collector);
- if (handled && pn_event_type(handled) == PN_LISTENER_CLOSE) {
- owner_to_leader(&l->psocket, leader_close); /* Close event handled, do close */
- }
- return pn_collector_next(l->collector);
-}
-
-static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
- return pn_collector_next(batch_proactor(batch)->collector);
-}
-
-static void pn_listener_free(pn_listener_t *l) {
- if (l) {
- if (!l->collector) pn_collector_free(l->collector);
- if (!l->condition) pn_condition_free(l->condition);
- if (!l->attachments) pn_free(l->attachments);
- free(l);
- }
-}
-
-pn_listener_t *pn_listener() {
- pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t));
- if (l) {
- l->batch.next_event = listener_batch_next;
- l->collector = pn_collector();
- l->condition = pn_condition();
- l->attachments = pn_record();
- if (!l->condition || !l->collector || !l->attachments) {
- pn_listener_free(l);
- return NULL;
- }
- }
- return l;
-}
-
-void pn_listener_close(pn_listener_t* l) {
- wakeup(&l->psocket, leader_close);
-}
-
-pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
- return l ? l->psocket.proactor : NULL;
-}
-
-pn_condition_t* pn_listener_condition(pn_listener_t* l) {
- return l->condition;
-}
-
-void *pn_listener_get_context(pn_listener_t *l) {
- return l->context;
-}
-
-void pn_listener_set_context(pn_listener_t *l, void *context) {
- l->context = context;
-}
-
-pn_record_t *pn_listener_attachments(pn_listener_t *l) {
- return l->attachments;
-}
-
-int pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
- if (l->accepting) {
- return PN_STATE_ERR; /* Only one at a time */
- }
- l->accepting = new_pconnection_t(
- l->psocket.proactor, c, true, l->psocket.host, l->psocket.port);
- if (!l->accepting) {
- return UV_ENOMEM;
- }
- owner_to_leader(&l->psocket, leader_accept);
- return 0;
-}
-
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/examples/c/proactor/test.py
----------------------------------------------------------------------
diff --git a/examples/c/proactor/test.py b/examples/c/proactor/test.py
index a86425d..29aa327 100644
--- a/examples/c/proactor/test.py
+++ b/examples/c/proactor/test.py
@@ -32,27 +32,27 @@ def receive_expect(n):
return ''.join('{"sequence"=%s}\n'%i for i in xrange(1, n+1)) + "%s messages received\n"%n
class CExampleTest(BrokerTestCase):
- broker_exe = ["libuv_broker"]
+ broker_exe = ["broker"]
def test_send_receive(self):
"""Send first then receive"""
- s = self.proc(["libuv_send", "-a", self.addr])
+ s = self.proc(["send", "-a", self.addr])
self.assertEqual("100 messages sent and acknowledged\n", s.wait_out())
- r = self.proc(["libuv_receive", "-a", self.addr])
+ r = self.proc(["receive", "-a", self.addr])
self.assertEqual(receive_expect(100), r.wait_out())
def test_receive_send(self):
"""Start receiving first, then send."""
- r = self.proc(["libuv_receive", "-a", self.addr]);
- s = self.proc(["libuv_send", "-a", self.addr]);
+ r = self.proc(["receive", "-a", self.addr]);
+ s = self.proc(["send", "-a", self.addr]);
self.assertEqual("100 messages sent and acknowledged\n", s.wait_out())
self.assertEqual(receive_expect(100), r.wait_out())
def test_timed_send(self):
"""Send with timed delay"""
- s = self.proc(["libuv_send", "-a", self.addr, "-d100", "-m3"])
+ s = self.proc(["send", "-a", self.addr, "-d100", "-m3"])
self.assertEqual("3 messages sent and acknowledged\n", s.wait_out())
- r = self.proc(["libuv_receive", "-a", self.addr, "-m3"])
+ r = self.proc(["receive", "-a", self.addr, "-m3"])
self.assertEqual(receive_expect(3), r.wait_out())
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/examples/c/proactor/thread.h
----------------------------------------------------------------------
diff --git a/examples/c/proactor/thread.h b/examples/c/proactor/thread.h
new file mode 100644
index 0000000..3b9f19e
--- /dev/null
+++ b/examples/c/proactor/thread.h
@@ -0,0 +1,49 @@
+#ifndef _PROTON_EXAMPLES_C_PROACTOR_THREADS_H
+#define _PROTON_EXAMPLES_C_PROACTOR_THREADS_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* EXAMPLE USE ONLY. Simulate the subset of POSIX threads used by examples for windows */
+
+#ifdef _WIN32
+
+#include <windows.h>
+#include <time.h>
+#define _WIN32_WINNT 0x500 /* WINBASE.H - Enable SignalObjectAndWait */
+#include <process.h>
+#include <windows.h>
+
+#define pthread_function DWORD WINAPI
+#define pthread_function_return DWORD
+#define pthread_t HANDLE
+#define pthread_create(thhandle,attr,thfunc,tharg) (int)((*thhandle=(HANDLE)_beginthreadex(NULL,0,(DWORD WINAPI(*)())thfunc,tharg,0,NULL))==NULL)
+#define pthread_join(thread, result) ((WaitForSingleObject((thread),INFINITE)!=WAIT_OBJECT_0) || !CloseHandle(thread))
+#define pthread_mutex_T HANDLE
+#define pthread_mutex_init(pobject,pattr) (*pobject=CreateMutex(NULL,FALSE,NULL))
+#define pthread_mutex_destroy(pobject) CloseHandle(*pobject)
+#define pthread_mutex_lock(pobject) WaitForSingleObject(*pobject,INFINITE)
+#define pthread_mutex_unlock(pobject) ReleaseMutex(*pobject)
+
+#else
+
+#include <pthread.h>
+
+#endif
+
+#endif
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/proton-c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt
index 8edb661..e5552c5 100644
--- a/proton-c/CMakeLists.txt
+++ b/proton-c/CMakeLists.txt
@@ -105,6 +105,13 @@ else(PN_WINAPI)
set (pn_selector_impl src/reactor/io/posix/selector.c)
endif(PN_WINAPI)
+# Select proactor impl
+find_package(Libuv)
+if (Libuv_FOUND)
+ set (qpid-proton-proactor src/proactor/libuv.c)
+ set (PROACTOR_LIBS ${Libuv_LIBRARIES})
+endif()
+
# Link in SASL if present
if (SASL_IMPL STREQUAL cyrus)
set(pn_sasl_impl src/sasl/sasl.c src/sasl/cyrus_sasl.c)
@@ -116,7 +123,7 @@ endif ()
# Set Compiler extra flags for Solaris when using SunStudio
if(CMAKE_CXX_COMPILER_ID STREQUAL "SunPro" )
- set( CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mt" )
+ set( CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mt" )
endif()
if(CMAKE_C_COMPILER_ID STREQUAL "SunPro" )
@@ -327,6 +334,7 @@ set (qpid-proton-platform-all
src/reactor/io/windows/selector.c
src/reactor/io/posix/io.c
src/reactor/io/posix/selector.c
+ src/proactor/libuv.c
)
# platform specific library build:
@@ -379,7 +387,7 @@ set (qpid-proton-core
src/core/autodetect.c
src/core/transport.c
src/core/message.c
- )
+)
set (qpid-proton-include-generated
${CMAKE_CURRENT_BINARY_DIR}/src/encodings.h
@@ -455,6 +463,7 @@ set (qpid-proton-include
include/proton/log.h
include/proton/message.h
include/proton/object.h
+ include/proton/proactor.h
include/proton/sasl.h
include/proton/session.h
include/proton/ssl.h
@@ -495,9 +504,15 @@ set_source_files_properties (
COMPILE_DEFINITIONS "${PLATFORM_DEFINITIONS}"
)
+set_source_files_properties (${qpid-proton-proactor} PROPERTIES
+ # Skip COMPILE_LANGUAGE_FLAGS, libuv.h won't compile with --std=c99
+ COMPILE_FLAGS "${COMPILE_WARNING_FLAGS} ${LTO} "
+ )
+
if (BUILD_WITH_CXX)
set_source_files_properties (
${qpid-proton-core}
+ ${qpid-proton-proactor}
${qpid-proton-layers}
${qpid-proton-extra}
${qpid-proton-platform}
@@ -526,6 +541,12 @@ set_target_properties (
LINK_FLAGS "${CATCH_UNDEFINED} ${LTO}"
)
+add_library (
+ qpid-proton-proactor SHARED
+ ${qpid-proton-proactor}
+ )
+target_link_libraries (qpid-proton-proactor qpid-proton-core ${PROACTOR_LIBS})
+
add_library(
qpid-proton SHARED
# Proton Core
@@ -534,7 +555,8 @@ add_library(
${qpid-proton-platform}
${qpid-proton-include}
${qpid-proton-include-generated}
-
+ # Proactor
+ ${qpid-proton-proactor}
# Proton Reactor/Messenger
${qpid-proton-extra}
${qpid-proton-platform-io}
@@ -550,7 +572,7 @@ if (MSVC)
add_dependencies(qpid-proton qpid-proton-core)
endif (MSVC)
-target_link_libraries (qpid-proton ${UUID_LIB} ${SSL_LIB} ${SASL_LIB} ${TIME_LIB} ${PLATFORM_LIBS})
+target_link_libraries (qpid-proton ${UUID_LIB} ${SSL_LIB} ${SASL_LIB} ${TIME_LIB} ${PLATFORM_LIBS} ${PROACTOR_LIBS})
set_target_properties (
qpid-proton
@@ -586,32 +608,26 @@ install (FILES ${headers} DESTINATION ${INCLUDE_INSTALL_DIR}/proton)
install (FILES ${CMAKE_CURRENT_BINARY_DIR}/include/proton/version.h
DESTINATION ${INCLUDE_INSTALL_DIR}/proton)
-# Pkg config file
-configure_file(
- ${CMAKE_CURRENT_SOURCE_DIR}/src/libqpid-proton.pc.in
- ${CMAKE_CURRENT_BINARY_DIR}/libqpid-proton.pc @ONLY)
-install (FILES
- ${CMAKE_CURRENT_BINARY_DIR}/libqpid-proton.pc
- DESTINATION ${LIB_INSTALL_DIR}/pkgconfig)
-configure_file(
- ${CMAKE_CURRENT_SOURCE_DIR}/src/libqpid-proton-core.pc.in
- ${CMAKE_CURRENT_BINARY_DIR}/libqpid-proton-core.pc @ONLY)
-install (FILES
- ${CMAKE_CURRENT_BINARY_DIR}/libqpid-proton-core.pc
- DESTINATION ${LIB_INSTALL_DIR}/pkgconfig)
-
+# Set ${VAR}/${VAR}DEBUG variables, configure and install the packageconf files for LIB
+macro(configure_lib VAR LIB)
+ if(DEFINED CMAKE_IMPORT_LIBRARY_PREFIX)
+ set(LIB_PREFIX ${CMAKE_IMPORT_LIBRARY_PREFIX})
+ set(LIB_SUFFIX ${CMAKE_IMPORT_LIBRARY_SUFFIX})
+ else()
+ set(LIB_PREFIX ${CMAKE_SHARED_LIBRARY_PREFIX})
+ set(LIB_SUFFIX ${CMAKE_SHARED_LIBRARY_SUFFIX})
+ endif()
+ set(${VAR} ${LIB_PREFIX}${LIB}${LIB_SUFFIX})
+ set("${VAR}DEBUG" ${LIB_PREFIX}${LIB}${CMAKE_DEBUG_POSTFIX}${LIB_SUFFIX})
+ configure_file(
+ ${CMAKE_CURRENT_SOURCE_DIR}/src/lib${LIB}.pc.in
+ ${CMAKE_CURRENT_BINARY_DIR}/lib${LIB}.pc @ONLY)
+ install (FILES ${CMAKE_CURRENT_BINARY_DIR}/lib${LIB}.pc DESTINATION ${LIB_INSTALL_DIR}/pkgconfig)
+endmacro()
-if (DEFINED CMAKE_IMPORT_LIBRARY_PREFIX)
-set(PROTONLIB ${CMAKE_IMPORT_LIBRARY_PREFIX}qpid-proton${CMAKE_IMPORT_LIBRARY_SUFFIX})
-set(PROTONLIBDEBUG ${CMAKE_IMPORT_LIBRARY_PREFIX}qpid-proton${CMAKE_DEBUG_POSTFIX}${CMAKE_IMPORT_LIBRARY_SUFFIX})
-set(PROTONCORELIB ${CMAKE_IMPORT_LIBRARY_PREFIX}qpid-proton-core${CMAKE_IMPORT_LIBRARY_SUFFIX})
-set(PROTONCORELIBDEBUG ${CMAKE_IMPORT_LIBRARY_PREFIX}qpid-proton-core${CMAKE_DEBUG_POSTFIX}${CMAKE_IMPORT_LIBRARY_SUFFIX})
-else ()
-set(PROTONLIB ${CMAKE_SHARED_LIBRARY_PREFIX}qpid-proton${CMAKE_SHARED_LIBRARY_SUFFIX})
-set(PROTONLIBDEBUG ${CMAKE_SHARED_LIBRARY_PREFIX}qpid-proton${CMAKE_DEBUG_POSTFIX}${CMAKE_SHARED_LIBRARY_SUFFIX})
-set(PROTONCORELIB ${CMAKE_SHARED_LIBRARY_PREFIX}qpid-proton-core${CMAKE_SHARED_LIBRARY_SUFFIX})
-set(PROTONCORELIBDEBUG ${CMAKE_SHARED_LIBRARY_PREFIX}qpid-proton-core${CMAKE_DEBUG_POSTFIX}${CMAKE_SHARED_LIBRARY_SUFFIX})
-endif ()
+configure_lib(PROTONLIB qpid-proton)
+configure_lib(PROTONCORELIB qpid-proton-core)
+configure_lib(PROTONPROACTORLIB qpid-proton-proactor)
include(WriteBasicConfigVersionFile)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/proton-c/include/proton/import_export.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/import_export.h b/proton-c/include/proton/import_export.h
index 6547a07..86776cd 100644
--- a/proton-c/include/proton/import_export.h
+++ b/proton-c/include/proton/import_export.h
@@ -56,6 +56,13 @@
# define PN_EXTERN PN_IMPORT
#endif
+// For proactor proton symbols
+#if defined(qpid_proton_proactor_EXPORTS) || defined(qpid_proton_EXPORTS)
+# define PNP_EXTERN PN_EXPORT
+#else
+# define PNP_EXTERN PN_IMPORT
+#endif
+
// For extra proton symbols
#if defined(qpid_proton_EXPORTS)
# define PNX_EXTERN PN_EXPORT
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/proton-c/include/proton/listener.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/listener.h b/proton-c/include/proton/listener.h
index 729c095..4656ee4 100644
--- a/proton-c/include/proton/listener.h
+++ b/proton-c/include/proton/listener.h
@@ -63,7 +63,7 @@ PN_EXTERN pn_condition_t *pn_listener_condition(pn_listener_t *l);
/**
* @cond INTERNAL
*/
-
+
/**
* @deprecated
*
@@ -81,7 +81,7 @@ PN_EXTERN void pn_listener_set_context(pn_listener_t *listener, void *context);
/**
* @endcond
*/
-
+
/**
* Get the attachments that are associated with a listener object.
*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/proton-c/include/proton/proactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
index 695bbb1..71a7dda 100644
--- a/proton-c/include/proton/proactor.h
+++ b/proton-c/include/proton/proactor.h
@@ -53,13 +53,13 @@ extern "C" {
/**
* Create a proactor. Must be freed with pn_proactor_free()
*/
-pn_proactor_t *pn_proactor(void);
+PNP_EXTERN pn_proactor_t *pn_proactor(void);
/**
* Free the proactor. Abort any open network connections and clean up all
* associated resources.
*/
-void pn_proactor_free(pn_proactor_t *proactor);
+PNP_EXTERN void pn_proactor_free(pn_proactor_t *proactor);
/**
* Connect connection to host/port. Connection and transport events will be
@@ -72,9 +72,9 @@ void pn_proactor_free(pn_proactor_t *proactor);
*
* @return error on immediate error, e.g. an allocation failure.
* Other errors are indicated by connection or transport events via
- * pn_proactor_wait()
+ * pn_proactor_wait()
*/
-int pn_proactor_connect(pn_proactor_t *proactor, pn_connection_t *connection,
+PNP_EXTERN int pn_proactor_connect(pn_proactor_t *proactor, pn_connection_t *connection,
const char *host, const char *port);
/**
@@ -91,7 +91,7 @@ int pn_proactor_connect(pn_proactor_t *proactor, pn_connection_t *connection,
* Other errors are indicated by pn_listener_condition() on the
* PN_LISTENER_CLOSE event.
*/
-int pn_proactor_listen(pn_proactor_t *proactor, pn_listener_t *listener,
+PNP_EXTERN int pn_proactor_listen(pn_proactor_t *proactor, pn_listener_t *listener,
const char *host, const char *port, int backlog);
/**
@@ -111,7 +111,7 @@ int pn_proactor_listen(pn_proactor_t *proactor, pn_listener_t *listener,
* batch must be handled in sequence, but batches returned by separate
* calls to pn_proactor_wait() can be handled concurrently.
*/
-pn_event_batch_t *pn_proactor_wait(pn_proactor_t *proactor);
+PNP_EXTERN pn_event_batch_t *pn_proactor_wait(pn_proactor_t *proactor);
/**
* Call when done handling a batch of events.
@@ -122,7 +122,7 @@ pn_event_batch_t *pn_proactor_wait(pn_proactor_t *proactor);
* @note Thread-safe: may be called from any thread provided the
* exactly once rule is respected.
*/
-void pn_proactor_done(pn_proactor_t *proactor, pn_event_batch_t *events);
+PNP_EXTERN void pn_proactor_done(pn_proactor_t *proactor, pn_event_batch_t *events);
/**
* Cause PN_PROACTOR_INTERRUPT to be returned to exactly one call of
@@ -136,7 +136,7 @@ void pn_proactor_done(pn_proactor_t *proactor, pn_event_batch_t *events);
*
* @note Thread-safe.
*/
-void pn_proactor_interrupt(pn_proactor_t *proactor);
+PNP_EXTERN void pn_proactor_interrupt(pn_proactor_t *proactor);
/**
* Cause PN_PROACTOR_TIMEOUT to be returned to a thread calling wait()
@@ -148,7 +148,7 @@ void pn_proactor_interrupt(pn_proactor_t *proactor);
* timeout. `pn_proactor_set_timeout(0)` will cancel the timeout
* without setting a new one.
*/
-void pn_proactor_set_timeout(pn_proactor_t *proactor, pn_millis_t timeout);
+PNP_EXTERN void pn_proactor_set_timeout(pn_proactor_t *proactor, pn_millis_t timeout);
/**
* Cause a PN_CONNECTION_WAKE event to be returned by the proactor, even if
@@ -160,22 +160,22 @@ void pn_proactor_set_timeout(pn_proactor_t *proactor, pn_millis_t timeout);
* Wakes can be "coalesced" - if several pn_connection_wake() calls happen
* concurrently, there may be only one PN_CONNECTION_WAKE event.
*/
-void pn_connection_wake(pn_connection_t *connection);
+PNP_EXTERN void pn_connection_wake(pn_connection_t *connection);
/**
* Return the proactor associated with a connection or NULL.
*/
-pn_proactor_t *pn_connection_proactor(pn_connection_t *connection);
+PNP_EXTERN pn_proactor_t *pn_connection_proactor(pn_connection_t *connection);
/**
* Return the proactor associated with an event or NULL.
*/
-pn_proactor_t *pn_event_proactor(pn_event_t *event);
+PNP_EXTERN pn_proactor_t *pn_event_proactor(pn_event_t *event);
/**
* Return the listener associated with an event or NULL.
*/
-pn_listener_t *pn_event_listener(pn_event_t *event);
+PNP_EXTERN pn_listener_t *pn_event_listener(pn_event_t *event);
/**
* @}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/proton-c/include/proton/types.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/types.h b/proton-c/include/proton/types.h
index 4400393..1abe9e6 100644
--- a/proton-c/include/proton/types.h
+++ b/proton-c/include/proton/types.h
@@ -407,6 +407,12 @@ typedef struct pn_delivery_t pn_delivery_t;
typedef struct pn_collector_t pn_collector_t;
/**
+ * A listener accepts connections.
+ * @ingroup listener
+ */
+typedef struct pn_listener_t pn_listener_t;
+
+/**
* An AMQP Transport object.
*
* A pn_transport_t encapsulates the transport related state of all
@@ -419,6 +425,11 @@ typedef struct pn_collector_t pn_collector_t;
typedef struct pn_transport_t pn_transport_t;
/**
+ * The proactor, see pn_proactor()
+ */
+typedef struct pn_proactor_t pn_proactor_t;
+
+/**
* @cond INTERNAL
*
* An event handler
@@ -426,12 +437,6 @@ typedef struct pn_transport_t pn_transport_t;
* A pn_handler_t is target of ::pn_event_t dispatched by the pn_reactor_t
*/
typedef struct pn_handler_t pn_handler_t;
-
-/**
- *
- */
-typedef struct pn_proactor_t pn_proactor_t;
-typedef struct pn_listener_t pn_listener_t;
/**
* @endcond
*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/proton-c/src/libqpid-proton-proactor.pc.in
----------------------------------------------------------------------
diff --git a/proton-c/src/libqpid-proton-proactor.pc.in b/proton-c/src/libqpid-proton-proactor.pc.in
new file mode 100644
index 0000000..19007a8
--- /dev/null
+++ b/proton-c/src/libqpid-proton-proactor.pc.in
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+prefix=@PREFIX@
+exec_prefix=@EXEC_PREFIX@
+libdir=@LIBDIR@
+includedir=@INCLUDEDIR@
+
+Name: Proton Proactor
+Description: Qpid Proton C proactive IO library
+Version: @PN_VERSION@
+URL: http://qpid.apache.org/proton/
+Libs: -L${libdir} -lqpid-proton-proactor
+Cflags: -I${includedir}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
new file mode 100644
index 0000000..42bbfab
--- /dev/null
+++ b/proton-c/src/proactor/libuv.c
@@ -0,0 +1,873 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <uv.h>
+
+#include <proton/condition.h>
+#include <proton/connection_driver.h>
+#include <proton/engine.h>
+#include <proton/message.h>
+#include <proton/object.h>
+#include <proton/proactor.h>
+#include <proton/transport.h>
+#include <proton/url.h>
+
+#include <assert.h>
+#include <stddef.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+/*
+ libuv loop functions are thread unsafe. The only exception is uv_async_send()
+ which is a thread safe "wakeup" that can wake the uv_loop from another thread.
+
+ To provide concurrency the proactor uses a "leader-worker-follower" model,
+ threads take turns at the roles:
+
+ - a single "leader" calls libuv functions and runs the uv_loop in short bursts
+ to generate work. When there is work available it gives up leadership and
+ becomes a "worker"
+
+ - "workers" handle events concurrently for distinct connections/listeners
+ They do as much work as they can get, when none is left they become "followers"
+
+ - "followers" wait for the leader to generate work and become workers.
+ When the leader itself becomes a worker, one of the followers takes over.
+
+ This model is symmetric: any thread can take on any role based on run-time
+ requirements. It also allows the IO and non-IO work associated with an IO
+ wake-up to be processed in a single thread with no context switches.
+
+ Function naming:
+ - on_ - called in leader thread via uv_run().
+ - leader_ - called in leader thread, while processing the leader_q.
+ - owner_ - called in owning thread, leader or worker but not concurrently.
+
+ Note on_ and leader_ functions can call each other, the prefix indicates the
+ path they are most often called on.
+*/
+
+const char *COND_NAME = "proactor";
+const char *AMQP_PORT = "5672";
+const char *AMQP_PORT_NAME = "amqp";
+const char *AMQPS_PORT = "5671";
+const char *AMQPS_PORT_NAME = "amqps";
+
+PN_HANDLE(PN_PROACTOR)
+
+/* pn_proactor_t and pn_listener_t are plain C structs with normal memory management.
+ Class definitions are for identification as pn_event_t context only.
+*/
+PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor)
+PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
+
+/* common to connection and listener */
+typedef struct psocket_t {
+ /* Immutable */
+ pn_proactor_t *proactor;
+
+ /* Protected by proactor.lock */
+ struct psocket_t* next;
+ void (*wakeup)(struct psocket_t*); /* interrupting action for leader */
+
+ /* Only used by leader */
+ uv_tcp_t tcp;
+ void (*action)(struct psocket_t*); /* deferred action for leader */
+ bool is_conn:1;
+ char host[NI_MAXHOST];
+ char port[NI_MAXSERV];
+} psocket_t;
+
+/* Special value for psocket.next pointer when socket is not on any any list. */
+psocket_t UNLISTED;
+
+static void psocket_init(psocket_t* ps, pn_proactor_t* p, bool is_conn, const char *host, const char *port) {
+ ps->proactor = p;
+ ps->next = &UNLISTED;
+ ps->is_conn = is_conn;
+ ps->tcp.data = ps;
+
+ /* For platforms that don't know about "amqp" and "amqps" service names. */
+ if (strcmp(port, AMQP_PORT_NAME) == 0)
+ port = AMQP_PORT;
+ else if (strcmp(port, AMQPS_PORT_NAME) == 0)
+ port = AMQPS_PORT;
+ /* Set to "\001" to indicate a NULL as opposed to an empty string "" */
+ strncpy(ps->host, host ? host : "\001", sizeof(ps->host));
+ strncpy(ps->port, port ? port : "\001", sizeof(ps->port));
+}
+
+/* Turn "\001" back to NULL */
+static inline const char* fixstr(const char* str) {
+ return str[0] == '\001' ? NULL : str;
+}
+
+typedef struct pconnection_t {
+ psocket_t psocket;
+
+ /* Only used by owner thread */
+ pn_connection_driver_t driver;
+
+ /* Only used by leader */
+ uv_connect_t connect;
+ uv_timer_t timer;
+ uv_write_t write;
+ uv_shutdown_t shutdown;
+ size_t writing;
+ bool reading:1;
+ bool server:1; /* accept, not connect */
+} pconnection_t;
+
+struct pn_listener_t {
+ psocket_t psocket;
+
+ /* Only used by owner thread */
+ pconnection_t *accepting; /* accept in progress */
+ pn_condition_t *condition;
+ pn_collector_t *collector;
+ pn_event_batch_t batch;
+ pn_record_t *attachments;
+ void *context;
+ size_t backlog;
+};
+
+
+typedef struct queue { psocket_t *front, *back; } queue;
+
+struct pn_proactor_t {
+ /* Leader thread */
+ uv_cond_t cond;
+ uv_loop_t loop;
+ uv_async_t async;
+ uv_timer_t timer;
+
+ /* Owner thread: proactor collector and batch can belong to leader or a worker */
+ pn_collector_t *collector;
+ pn_event_batch_t batch;
+
+ /* Protected by lock */
+ uv_mutex_t lock;
+ queue start_q;
+ queue worker_q;
+ queue leader_q;
+ size_t interrupt; /* pending interrupts */
+ pn_millis_t timeout;
+ size_t count; /* psocket count */
+ bool inactive:1;
+ bool timeout_request:1;
+ bool timeout_elapsed:1;
+ bool has_leader:1;
+ bool batch_working:1; /* batch belongs to a worker. */
+};
+
+static bool push_lh(queue *q, psocket_t *ps) {
+ if (ps->next != &UNLISTED) /* Don't move if already listed. */
+ return false;
+ ps->next = NULL;
+ if (!q->front) {
+ q->front = q->back = ps;
+ } else {
+ q->back->next = ps;
+ q->back = ps;
+ }
+ return true;
+}
+
+static psocket_t* pop_lh(queue *q) {
+ psocket_t *ps = q->front;
+ if (ps) {
+ q->front = ps->next;
+ ps->next = &UNLISTED;
+ }
+ return ps;
+}
+
+static inline pconnection_t *as_pconnection_t(psocket_t* ps) {
+ return ps->is_conn ? (pconnection_t*)ps : NULL;
+}
+
+static inline pn_listener_t *as_listener(psocket_t* ps) {
+ return ps->is_conn ? NULL: (pn_listener_t*)ps;
+}
+
+/* Put ps on the leader queue for processing. Thread safe. */
+static void to_leader_lh(psocket_t *ps) {
+ push_lh(&ps->proactor->leader_q, ps);
+ uv_async_send(&ps->proactor->async); /* Wake leader */
+}
+
+static void to_leader(psocket_t *ps) {
+ uv_mutex_lock(&ps->proactor->lock);
+ to_leader_lh(ps);
+ uv_mutex_unlock(&ps->proactor->lock);
+}
+
+/* Detach from IO and put ps on the worker queue */
+static void leader_to_worker(psocket_t *ps) {
+ if (ps->is_conn) {
+ pconnection_t *pc = as_pconnection_t(ps);
+ /* Don't detach if there are no events yet. */
+ if (pn_connection_driver_has_event(&pc->driver)) {
+ if (pc->writing) {
+ pc->writing = 0;
+ uv_cancel((uv_req_t*)&pc->write);
+ }
+ if (pc->reading) {
+ pc->reading = false;
+ uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
+ }
+ if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
+ uv_timer_stop(&pc->timer);
+ }
+ }
+ } else {
+ pn_listener_t *l = as_listener(ps);
+ uv_read_stop((uv_stream_t*)&l->psocket.tcp);
+ }
+ uv_mutex_lock(&ps->proactor->lock);
+ push_lh(&ps->proactor->worker_q, ps);
+ uv_mutex_unlock(&ps->proactor->lock);
+}
+
+/* Set a deferred action for leader, if not already set. */
+static void owner_to_leader(psocket_t *ps, void (*action)(psocket_t*)) {
+ uv_mutex_lock(&ps->proactor->lock);
+ if (!ps->action) {
+ ps->action = action;
+ }
+ to_leader_lh(ps);
+ uv_mutex_unlock(&ps->proactor->lock);
+}
+
+/* Owner thread send to worker thread. Set deferred action if not already set. */
+static void owner_to_worker(psocket_t *ps, void (*action)(psocket_t*)) {
+ uv_mutex_lock(&ps->proactor->lock);
+ if (!ps->action) {
+ ps->action = action;
+ }
+ push_lh(&ps->proactor->worker_q, ps);
+ uv_async_send(&ps->proactor->async); /* Wake leader */
+ uv_mutex_unlock(&ps->proactor->lock);
+}
+
+
+/* Re-queue for further work */
+static void worker_requeue(psocket_t* ps) {
+ uv_mutex_lock(&ps->proactor->lock);
+ push_lh(&ps->proactor->worker_q, ps);
+ uv_async_send(&ps->proactor->async); /* Wake leader */
+ uv_mutex_unlock(&ps->proactor->lock);
+}
+
+static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bool server, const char *host, const char *port) {
+ pconnection_t *pc = (pconnection_t*)calloc(1, sizeof(*pc));
+ if (!pc) return NULL;
+ if (pn_connection_driver_init(&pc->driver, c, NULL) != 0) {
+ return NULL;
+ }
+ psocket_init(&pc->psocket, p, true, host, port);
+ if (server) {
+ pn_transport_set_server(pc->driver.transport);
+ }
+ pn_record_t *r = pn_connection_attachments(pc->driver.connection);
+ pn_record_def(r, PN_PROACTOR, PN_VOID);
+ pn_record_set(r, PN_PROACTOR, pc);
+ return pc;
+}
+
+static pn_event_t *listener_batch_next(pn_event_batch_t *batch);
+static pn_event_t *proactor_batch_next(pn_event_batch_t *batch);
+
+static inline pn_proactor_t *batch_proactor(pn_event_batch_t *batch) {
+ return (batch->next_event == proactor_batch_next) ?
+ (pn_proactor_t*)((char*)batch - offsetof(pn_proactor_t, batch)) : NULL;
+}
+
+static inline pn_listener_t *batch_listener(pn_event_batch_t *batch) {
+ return (batch->next_event == listener_batch_next) ?
+ (pn_listener_t*)((char*)batch - offsetof(pn_listener_t, batch)) : NULL;
+}
+
+static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
+ pn_connection_driver_t *d = pn_event_batch_connection_driver(batch);
+ return d ? (pconnection_t*)((char*)d - offsetof(pconnection_t, driver)) : NULL;
+}
+
/* Adjust the count of live sockets on the proactor. When it drops to
 * zero the proactor is flagged inactive, which later produces a
 * PN_PROACTOR_INACTIVE event (see get_batch_lh). */
static void leader_count(pn_proactor_t *p, int change) {
  uv_mutex_lock(&p->lock);
  p->count += change;
  p->inactive = (p->count == 0);
  uv_mutex_unlock(&p->lock);
}
+
/* Leader: free the connection once it is fully quiescent.
 * If the driver still holds unprocessed events, send it back to a worker
 * instead. Frees only when every uv request (tcp, write, shutdown, timer)
 * has completed, signalled by its .data pointer being cleared. */
static void leader_pconnection_t_maybe_free(pconnection_t *pc) {
  if (pn_connection_driver_has_event(&pc->driver)) {
    leader_to_worker(&pc->psocket); /* Return to worker */
  } else if (!(pc->psocket.tcp.data || pc->write.data || pc->shutdown.data || pc->timer.data)) {
    /* All UV requests are finished */
    pn_connection_driver_destroy(&pc->driver);
    leader_count(pc->psocket.proactor, -1);
    free(pc);
  }
}
+
+/* Free if there are no uv callbacks pending and no events */
+static void leader_listener_maybe_free(pn_listener_t *l) {
+ if (pn_collector_peek(l->collector)) {
+ leader_to_worker(&l->psocket); /* Return to worker */
+ } else if (!l->psocket.tcp.data) {
+ pn_condition_free(l->condition);
+ leader_count(l->psocket.proactor, -1);
+ free(l);
+ }
+}
+
+/* Free if there are no uv callbacks pending and no events */
+static void leader_maybe_free(psocket_t *ps) {
+ if (ps->is_conn) {
+ leader_pconnection_t_maybe_free(as_pconnection_t(ps));
+ } else {
+ leader_listener_maybe_free(as_listener(ps));
+ }
+}
+
+static void on_close(uv_handle_t *h) {
+ psocket_t *ps = (psocket_t*)h->data;
+ h->data = NULL; /* Mark closed */
+ leader_maybe_free(ps);
+}
+
+static void on_shutdown(uv_shutdown_t *shutdown, int err) {
+ psocket_t *ps = (psocket_t*)shutdown->data;
+ shutdown->data = NULL; /* Mark closed */
+ leader_maybe_free(ps);
+}
+
/* Leader: begin closing a socket's uv handles.
 * uv_close() completes asynchronously; on_close() clears each handle's
 * .data marker and the object is freed once all requests have drained. */
static inline void leader_close(psocket_t *ps) {
  if (ps->tcp.data && !uv_is_closing((uv_handle_t*)&ps->tcp)) {
    uv_close((uv_handle_t*)&ps->tcp, on_close);
  }
  /* NOTE(review): assumes as_pconnection_t returns NULL for listeners —
     consistent with its other uses; confirm against its definition. */
  pconnection_t *pc = as_pconnection_t(ps);
  if (pc) {
    pn_connection_driver_close(&pc->driver);
    if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
      uv_timer_stop(&pc->timer);
      uv_close((uv_handle_t*)&pc->timer, on_close);
    }
  }
  leader_maybe_free(ps);
}
+
+static pconnection_t *get_pconnection_t(pn_connection_t* c) {
+ if (!c) return NULL;
+ pn_record_t *r = pn_connection_attachments(c);
+ return (pconnection_t*) pn_record_get(r, PN_PROACTOR);
+}
+
/* Leader: record a uv error on the socket and route it to a worker.
 * Connections get the error on their transport (bound first so it is
 * reported); listeners get it on their condition plus a
 * PN_LISTENER_CLOSE event. `what` describes the failed operation. */
static void leader_error(psocket_t *ps, int err, const char* what) {
  if (ps->is_conn) {
    pn_connection_driver_t *driver = &as_pconnection_t(ps)->driver;
    pn_connection_driver_bind(driver); /* Bind so errors will be reported */
    pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s",
                                what, fixstr(ps->host), fixstr(ps->port),
                                uv_strerror(err));
    pn_connection_driver_close(driver);
  } else {
    pn_listener_t *l = as_listener(ps);
    pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s",
                        what, fixstr(ps->host), fixstr(ps->port),
                        uv_strerror(err));
    pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
  }
  leader_to_worker(ps); /* Worker to handle the error */
}
+
+/* uv-initialization */
+static int leader_init(psocket_t *ps) {
+ leader_count(ps->proactor, +1);
+ int err = uv_tcp_init(&ps->proactor->loop, &ps->tcp);
+ if (!err) {
+ pconnection_t *pc = as_pconnection_t(ps);
+ if (pc) {
+ pc->connect.data = ps;
+ int err = uv_timer_init(&ps->proactor->loop, &pc->timer);
+ if (!err) {
+ pc->timer.data = pc;
+ }
+ }
+ }
+ if (err) {
+ leader_error(ps, err, "initialization");
+ }
+ return err;
+}
+
+/* Common logic for on_connect and on_accept */
+static void leader_connect_accept(pconnection_t *pc, int err, const char *what) {
+ if (!err) {
+ leader_to_worker(&pc->psocket);
+ } else {
+ leader_error(&pc->psocket, err, what);
+ }
+}
+
+static void on_connect(uv_connect_t *connect, int err) {
+ leader_connect_accept((pconnection_t*)connect->data, err, "on connect");
+}
+
/* uv callback: an incoming connection is ready on a listening socket. */
static void on_accept(uv_stream_t* server, int err) {
  pn_listener_t *l = (pn_listener_t*) server->data;
  if (err) {
    leader_error(&l->psocket, err, "on accept");
  }
  /* NOTE(review): the ACCEPT event is queued even when err != 0 (no
     else/return above) — confirm whether that is intentional. */
  pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
  leader_to_worker(&l->psocket); /* Let user call pn_listener_accept */
}
+
/* Leader: complete a pn_listener_accept() request.
 * Detaches the pending pconnection from the listener, gives it uv
 * handles, and accepts the queued incoming stream onto it. */
static void leader_accept(psocket_t *ps) {
  pn_listener_t * l = as_listener(ps);
  pconnection_t *pc = l->accepting;
  l->accepting = NULL;
  if (pc) {
    int err = leader_init(&pc->psocket);
    if (!err) err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp);
    leader_connect_accept(pc, err, "on accept");
  }
}
+
/* Leader: set up uv state for the socket, then resolve its host/port.
   server==true requests a passive (bindable) address via AI_PASSIVE. */
static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) {
  struct addrinfo hints = { 0 };
  if (server) {
    hints.ai_flags = AI_PASSIVE;
  }
  int err = leader_init(ps);
  if (err) return err;
  return uv_getaddrinfo(&ps->proactor->loop, info, NULL, fixstr(ps->host), fixstr(ps->port), &hints);
}
+
/* Leader: resolve the peer address and start an outgoing TCP connect.
 * NOTE(review): uv_getaddrinfo is invoked (inside leader_resolve) with a
 * NULL callback, i.e. synchronously, which is what makes the stack-local
 * `info` safe here — confirm against the libuv version in use. */
static void leader_connect(psocket_t *ps) {
  pconnection_t *pc = as_pconnection_t(ps);
  uv_getaddrinfo_t info;
  int err = leader_resolve(ps, &info, false);
  if (!err) {
    /* Connect uses only the first resolved address */
    err = uv_tcp_connect(&pc->connect, &pc->psocket.tcp, info.addrinfo->ai_addr, on_connect);
    uv_freeaddrinfo(info.addrinfo);
  }
  if (err) {
    leader_error(ps, err, "connect to");
  }
}
+
+static void leader_listen(psocket_t *ps) {
+ pn_listener_t *l = as_listener(ps);
+ uv_getaddrinfo_t info;
+ int err = leader_resolve(ps, &info, true);
+ if (!err) {
+ err = uv_tcp_bind(&l->psocket.tcp, info.addrinfo->ai_addr, 0);
+ uv_freeaddrinfo(info.addrinfo);
+ }
+ if (!err) err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_accept);
+ if (err) {
+ leader_error(ps, err, "listen on ");
+ }
+}
+
/* Timer callback: drive the transport's idle-timeout processing and
 * re-arm the timer for the next deadline pn_transport_tick reports.
 * Does nothing if neither local nor remote idle timeout is configured. */
static void on_tick(uv_timer_t *timer) {
  pconnection_t *pc = (pconnection_t*)timer->data;
  pn_transport_t *t = pc->driver.transport;
  if (pn_transport_get_idle_timeout(t) || pn_transport_get_remote_idle_timeout(t)) {
    uv_timer_stop(&pc->timer);
    uint64_t now = uv_now(pc->timer.loop);
    uint64_t next = pn_transport_tick(t, now);
    if (next) {
      /* assumes next >= now — TODO confirm pn_transport_tick's contract */
      uv_timer_start(&pc->timer, on_tick, next - now, 0);
    }
  }
}
+
/* uv read callback: feed received bytes (or EOF/error) to the driver.
 * Data was read directly into the driver's buffer (see alloc_read_buffer),
 * so only the byte count needs to be reported. */
static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
  pconnection_t *pc = (pconnection_t*)stream->data;
  if (nread >= 0) {
    pn_connection_driver_read_done(&pc->driver, nread);
    on_tick(&pc->timer); /* check for tick changes. */
    leader_to_worker(&pc->psocket);
    /* Reading continues automatically until stopped. */
  } else if (nread == UV_EOF) { /* hangup */
    pn_connection_driver_read_close(&pc->driver);
    leader_maybe_free(&pc->psocket);
  } else {
    /* nread is a negative uv error code on this path */
    leader_error(&pc->psocket, nread, "on read from");
  }
}
+
/* uv write-completion callback: report flushed bytes to the driver.
 * pc->writing holds the size of the request issued in leader_rewatch;
 * clearing it allows a new write to be started. */
static void on_write(uv_write_t* write, int err) {
  pconnection_t *pc = (pconnection_t*)write->data;
  write->data = NULL; /* Mark the write request finished */
  if (err == 0) {
    pn_connection_driver_write_done(&pc->driver, pc->writing);
    leader_to_worker(&pc->psocket);
  } else if (err == UV_ECANCELED) {
    leader_maybe_free(&pc->psocket); /* Socket is going away */
  } else {
    leader_error(&pc->psocket, err, "on write to");
  }
  pc->writing = 0; /* Need to send a new write request */
}
+
+static void on_timeout(uv_timer_t *timer) {
+ pn_proactor_t *p = (pn_proactor_t*)timer->data;
+ uv_mutex_lock(&p->lock);
+ p->timeout_elapsed = true;
+ uv_mutex_unlock(&p->lock);
+}
+
+// Read buffer allocation function for uv, just returns the transports read buffer.
+static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) {
+ pconnection_t *pc = (pconnection_t*)stream->data;
+ pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
+ *buf = uv_buf_init(rbuf.start, rbuf.size);
+}
+
/* Leader: re-arm IO for a socket after a worker pass.
 * For connections: restart ticks, then either return to a worker (if the
 * driver produced events) or start pending write/shutdown/read requests.
 * For listeners: resume accepting. */
static void leader_rewatch(psocket_t *ps) {
  int err = 0;
  if (ps->is_conn) {
    pconnection_t *pc = as_pconnection_t(ps);
    if (pc->timer.data) { /* uv-initialized */
      on_tick(&pc->timer); /* Re-enable ticks if required */
    }
    pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
    pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);

    /* Ticks and checking buffers can generate events, process before proceeding */
    if (pn_connection_driver_has_event(&pc->driver)) {
      leader_to_worker(ps);
    } else { /* Re-watch for IO */
      if (wbuf.size > 0 && !pc->writing) {
        /* One outstanding write at a time; pc->writing records its size
           so on_write can report it to the driver. */
        pc->writing = wbuf.size;
        uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
        pc->write.data = ps;
        uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write);
      } else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) {
        /* Nothing left to send and the driver closed its write side */
        pc->shutdown.data = ps;
        uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, on_shutdown);
      }
      if (rbuf.size > 0 && !pc->reading) {
        pc->reading = true;
        err = uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read);
      }
    }
  } else {
    pn_listener_t *l = as_listener(ps);
    /* NOTE(review): calls uv_listen on an already-listening socket to
       resume accepting — confirm this is the intended libuv usage. */
    err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_accept);
  }
  if (err) {
    leader_error(ps, err, "rewatch");
  }
}
+
+/* Set the event in the proactor's batch */
+static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t) {
+ pn_collector_put(p->collector, pn_proactor__class(), p, t);
+ p->batch_working = true;
+ return &p->batch;
+}
+
+/* Return the next event batch or 0 if no events are ready */
+static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) {
+ if (!p->batch_working) { /* Can generate proactor events */
+ if (p->inactive) {
+ p->inactive = false;
+ return proactor_batch_lh(p, PN_PROACTOR_INACTIVE);
+ }
+ if (p->interrupt > 0) {
+ --p->interrupt;
+ return proactor_batch_lh(p, PN_PROACTOR_INTERRUPT);
+ }
+ if (p->timeout_elapsed) {
+ p->timeout_elapsed = false;
+ return proactor_batch_lh(p, PN_PROACTOR_TIMEOUT);
+ }
+ }
+ for (psocket_t *ps = pop_lh(&p->worker_q); ps; ps = pop_lh(&p->worker_q)) {
+ if (ps->is_conn) {
+ pconnection_t *pc = as_pconnection_t(ps);
+ return &pc->driver.batch;
+ } else { /* Listener */
+ pn_listener_t *l = as_listener(ps);
+ return &l->batch;
+ }
+ to_leader(ps); /* No event, back to leader */
+ }
+ return 0;
+}
+
+/* Called in any thread to set a wakeup action. Replaces any previous wakeup action. */
+static void wakeup(psocket_t *ps, void (*action)(psocket_t*)) {
+ uv_mutex_lock(&ps->proactor->lock);
+ ps->wakeup = action;
+ to_leader_lh(ps);
+ uv_mutex_unlock(&ps->proactor->lock);
+}
+
+pn_listener_t *pn_event_listener(pn_event_t *e) {
+ return (pn_event_class(e) == pn_listener__class()) ? (pn_listener_t*)pn_event_context(e) : NULL;
+}
+
+pn_proactor_t *pn_event_proactor(pn_event_t *e) {
+ if (pn_event_class(e) == pn_proactor__class()) return (pn_proactor_t*)pn_event_context(e);
+ pn_listener_t *l = pn_event_listener(e);
+ if (l) return l->psocket.proactor;
+ pn_connection_t *c = pn_event_connection(e);
+ if (c) return pn_connection_proactor(pn_event_connection(e));
+ return NULL;
+}
+
/* A worker is done with `batch`; decide where its owner goes next:
 * - connection with remaining events: back onto the worker queue
 * - finished connection: schedule close on the leader
 * - otherwise (connection or listener): leader re-watches for IO
 * - the proactor's own batch: re-enable proactor events, wake the leader */
void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
  pconnection_t *pc = batch_pconnection(batch);
  if (pc) {
    if (pn_connection_driver_has_event(&pc->driver)) {
      /* Process all events before going back to IO. */
      worker_requeue(&pc->psocket);
    } else if (pn_connection_driver_finished(&pc->driver)) {
      owner_to_leader(&pc->psocket, leader_close);
    } else {
      owner_to_leader(&pc->psocket, leader_rewatch);
    }
    return;
  }
  pn_listener_t *l = batch_listener(batch);
  if (l) {
    owner_to_leader(&l->psocket, leader_rewatch);
    return;
  }
  pn_proactor_t *bp = batch_proactor(batch);
  if (bp == p) {
    uv_mutex_lock(&p->lock);
    p->batch_working = false;
    uv_async_send(&p->async); /* Wake leader */
    uv_mutex_unlock(&p->lock);
    return;
  }
}
+
/* Run the follower/leader loop until an event batch can be returned.
 * Threads with no immediate work wait as followers; exactly one thread
 * leads at a time, running deferred actions/wakeups and the uv loop until
 * work appears, then signals the next leader and becomes a worker. */
pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
  uv_mutex_lock(&p->lock);
  /* Try to grab work immediately. */
  pn_event_batch_t *batch = get_batch_lh(p);
  if (batch == NULL) {
    /* No work available, follow the leader */
    while (p->has_leader) {
      uv_cond_wait(&p->cond, &p->lock);
    }
    /* Lead till there is work to do. */
    p->has_leader = true;
    while (batch == NULL) {
      /* Apply a timeout change requested via pn_proactor_set_timeout */
      if (p->timeout_request) {
        p->timeout_request = false;
        if (p->timeout) {
          uv_timer_start(&p->timer, on_timeout, p->timeout, 0);
        } else {
          uv_timer_stop(&p->timer);
        }
      }
      /* Drain deferred actions/wakeups queued for the leader. The local
         `wakeup` intentionally shadows the wakeup() helper function. */
      for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
        void (*action)(psocket_t*) = ps->action;
        void (*wakeup)(psocket_t*) = ps->wakeup;
        ps->action = NULL;
        ps->wakeup = NULL;
        if (action || wakeup) {
          /* Run callbacks unlocked; they may take the lock themselves */
          uv_mutex_unlock(&p->lock);
          if (action) action(ps);
          if (wakeup) wakeup(ps);
          uv_mutex_lock(&p->lock);
        }
      }
      batch = get_batch_lh(p);
      if (batch == NULL) {
        /* Still nothing: run one turn of the uv event loop, unlocked */
        uv_mutex_unlock(&p->lock);
        uv_run(&p->loop, UV_RUN_ONCE);
        uv_mutex_lock(&p->lock);
      }
    }
    /* Signal the next leader and return to work */
    p->has_leader = false;
    uv_cond_signal(&p->cond);
  }
  uv_mutex_unlock(&p->lock);
  return batch;
}
+
+void pn_proactor_interrupt(pn_proactor_t *p) {
+ uv_mutex_lock(&p->lock);
+ ++p->interrupt;
+ uv_async_send(&p->async); /* Interrupt the UV loop */
+ uv_mutex_unlock(&p->lock);
+}
+
+void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
+ uv_mutex_lock(&p->lock);
+ p->timeout = t;
+ p->timeout_request = true;
+ uv_async_send(&p->async); /* Interrupt the UV loop */
+ uv_mutex_unlock(&p->lock);
+}
+
+int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host, const char *port) {
+ pconnection_t *pc = new_pconnection_t(p, c, false, host, port);
+ if (!pc) {
+ return PN_OUT_OF_MEMORY;
+ }
+ /* Process PN_CONNECTION_INIT before binding */
+ owner_to_worker(&pc->psocket, leader_connect);
+ return 0;
+}
+
/* Begin listening on host:port with the given backlog; l becomes managed
 * by proactor p. The listen happens asynchronously on the leader; errors
 * surface through the listener's condition (see leader_error). */
int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *host, const char *port, int backlog)
{
  psocket_init(&l->psocket, p, false, host, port);
  l->backlog = backlog;
  owner_to_leader(&l->psocket, leader_listen);
  return 0;
}
+
+pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
+ pconnection_t *pc = get_pconnection_t(c);
+ return pc ? pc->psocket.proactor : NULL;
+}
+
+void leader_wake_connection(psocket_t *ps) {
+ pconnection_t *pc = as_pconnection_t(ps);
+ pn_connection_t *c = pc->driver.connection;
+ pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
+ leader_to_worker(ps);
+}
+
+void pn_connection_wake(pn_connection_t* c) {
+ wakeup(&get_pconnection_t(c)->psocket, leader_wake_connection);
+}
+
+pn_proactor_t *pn_proactor() {
+ pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p));
+ p->collector = pn_collector();
+ p->batch.next_event = &proactor_batch_next;
+ if (!p->collector) return NULL;
+ uv_loop_init(&p->loop);
+ uv_mutex_init(&p->lock);
+ uv_cond_init(&p->cond);
+ uv_async_init(&p->loop, &p->async, NULL);
+ uv_timer_init(&p->loop, &p->timer); /* Just wake the loop */
+ p->timer.data = p;
+ return p;
+}
+
+static void on_stopping(uv_handle_t* h, void* v) {
+ uv_close(h, NULL); /* Close this handle */
+ if (!uv_loop_alive(h->loop)) /* Everything closed */
+ uv_stop(h->loop); /* Stop the loop, pn_proactor_destroy() can return */
+}
+
/* Destroy the proactor: close every uv handle, drain the loop until all
 * closes complete, then release uv primitives, the collector, and p.
 * Caller must ensure no other threads are still using p. */
void pn_proactor_free(pn_proactor_t *p) {
  uv_walk(&p->loop, on_stopping, NULL); /* Close all handles */
  uv_run(&p->loop, UV_RUN_DEFAULT);  /* Run till stop, all handles closed */
  uv_loop_close(&p->loop);
  uv_mutex_destroy(&p->lock);
  uv_cond_destroy(&p->cond);
  pn_collector_free(p->collector);
  free(p);
}
+
/* Batch iterator for listener events.
 * Before yielding the next event, inspect the one just handled: once the
 * application has seen PN_LISTENER_CLOSE, schedule the actual close on
 * the leader. */
static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
  pn_listener_t *l = batch_listener(batch);
  pn_event_t *handled = pn_collector_prev(l->collector);
  if (handled && pn_event_type(handled) == PN_LISTENER_CLOSE) {
    owner_to_leader(&l->psocket, leader_close); /* Close event handled, do close */
  }
  return pn_collector_next(l->collector);
}
+
+static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
+ return pn_collector_next(batch_proactor(batch)->collector);
+}
+
+static void pn_listener_free(pn_listener_t *l) {
+ if (l) {
+ if (!l->collector) pn_collector_free(l->collector);
+ if (!l->condition) pn_condition_free(l->condition);
+ if (!l->attachments) pn_free(l->attachments);
+ free(l);
+ }
+}
+
+pn_listener_t *pn_listener() {
+ pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t));
+ if (l) {
+ l->batch.next_event = listener_batch_next;
+ l->collector = pn_collector();
+ l->condition = pn_condition();
+ l->attachments = pn_record();
+ if (!l->condition || !l->collector || !l->attachments) {
+ pn_listener_free(l);
+ return NULL;
+ }
+ }
+ return l;
+}
+
/* Thread-safe: ask the leader thread to close the listener's socket. */
void pn_listener_close(pn_listener_t* l) {
  wakeup(&l->psocket, leader_close);
}
+
+pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
+ return l ? l->psocket.proactor : NULL;
+}
+
/* The condition carrying the listener's most recent error (see leader_error). */
pn_condition_t* pn_listener_condition(pn_listener_t* l) {
  return l->condition;
}
+
/* Retrieve the opaque application context set with pn_listener_set_context. */
void *pn_listener_get_context(pn_listener_t *l) {
  return l->context;
}
+
/* Attach an opaque application context to the listener. */
void pn_listener_set_context(pn_listener_t *l, void *context) {
  l->context = context;
}
+
/* The listener's attachment record (created in pn_listener()). */
pn_record_t *pn_listener_attachments(pn_listener_t *l) {
  return l->attachments;
}
+
+int pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
+ if (l->accepting) {
+ return PN_STATE_ERR; /* Only one at a time */
+ }
+ l->accepting = new_pconnection_t(
+ l->psocket.proactor, c, true, l->psocket.host, l->psocket.port);
+ if (!l->accepting) {
+ return UV_ENOMEM;
+ }
+ owner_to_leader(&l->psocket, leader_accept);
+ return 0;
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org