You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/06/09 01:25:57 UTC
[34/50] [abbrv] qpid-proton git commit: PROTON-1493: c-proactor make
pn_proactor_interrupt async-signal-safe
PROTON-1493: c-proactor make pn_proactor_interrupt async-signal-safe
pn_proactor_interrupt() will often be used from signal handlers so must be
async-signal-safe. Updated the documentation and modified the implementations
of pn_proactor_interrupt() to use only async-signal-safe calls, no locks.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/5f8738f5
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/5f8738f5
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/5f8738f5
Branch: refs/heads/go1
Commit: 5f8738f573c3e9c39608714453b2425e3a105ec7
Parents: 8d862be
Author: Alan Conway <ac...@redhat.com>
Authored: Mon May 29 17:12:36 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed May 31 10:49:36 2017 -0400
----------------------------------------------------------------------
INSTALL.md | 11 +++
examples/c/proactor/broker.c | 8 +--
proton-c/include/proton/proactor.h | 11 +--
proton-c/src/proactor/epoll.c | 122 +++++++++++++++++---------------
proton-c/src/proactor/libuv.c | 37 +++++++---
5 files changed, 112 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5f8738f5/INSTALL.md
----------------------------------------------------------------------
diff --git a/INSTALL.md b/INSTALL.md
index 8de93fe..e5e5db6 100644
--- a/INSTALL.md
+++ b/INSTALL.md
@@ -137,6 +137,17 @@ Note that if you wish to build debug version of proton for use with
swig bindings on Windows, you must have the appropriate debug target
libraries to link against.
+Other platforms
+---------------
+
+Proton can use the http://libuv.org IO library on any platform where
+it is available. Install the libuv library and header files and adapt
+the instructions for building on Linux.
+
+The libuv library is not required on Linux or Windows but if you wish
+you can use it instead of the default native IO by running cmake with
+`-Dproactor=libuv`
+
Installing Language Bindings
----------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5f8738f5/examples/c/proactor/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
index 7d95e7f..d9285db 100644
--- a/examples/c/proactor/broker.c
+++ b/examples/c/proactor/broker.c
@@ -195,9 +195,8 @@ typedef struct broker_t {
} broker_t;
void broker_stop(broker_t *b) {
- /* In this broker an interrupt stops a thread, stopping all threads stops the broker */
- for (size_t i = 0; i < b->threads; ++i)
- pn_proactor_interrupt(b->proactor);
+ /* Interrupt the proactor to stop the working threads. */
+ pn_proactor_interrupt(b->proactor);
}
/* Try to send if link is sender and has credit */
@@ -369,12 +368,13 @@ static void handle(broker_t* b, pn_event_t* e) {
break;
- case PN_PROACTOR_INACTIVE: /* listener and all connections closed */
+ case PN_PROACTOR_INACTIVE: /* listener and all connections closed */
broker_stop(b);
break;
case PN_PROACTOR_INTERRUPT:
b->finished = true;
+ pn_proactor_interrupt(b->proactor); /* Pass along the interrupt to the other threads */
break;
default:
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5f8738f5/proton-c/include/proton/proactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
index 861afbe..9c7ce59 100644
--- a/proton-c/include/proton/proactor.h
+++ b/proton-c/include/proton/proactor.h
@@ -196,12 +196,13 @@ PNP_EXTERN void pn_proactor_done(pn_proactor_t *proactor, pn_event_batch_t *even
/**
* Return a @ref PN_PROACTOR_INTERRUPT event as soon as possible.
*
- * Exactly one @ref PN_PROACTOR_INTERRUPT event is generated for each call to
- * pn_proactor_interrupt(). If threads are blocked in pn_proactor_wait(), one
- * of them will be interrupted, otherwise the interrupt will be returned by a
- * future call to pn_proactor_wait(). Calling pn_proactor_interrupt().
+ * At least one PN_PROACTOR_INTERRUPT event will be returned after this call.
+ * Interrupts can be "coalesced" - if several pn_proactor_interrupt() calls
+ * happen close together, there may be only one PN_PROACTOR_INTERRUPT event that
+ * occurs after all of them.
*
- * @note Thread safe
+ * @note Thread-safe and async-signal-safe: can be called in a signal handler.
+ * This is the only pn_proactor function that is async-signal-safe.
*/
PNP_EXTERN void pn_proactor_interrupt(pn_proactor_t *proactor);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5f8738f5/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 7490ecd..b258da3 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -302,11 +302,13 @@ static void stop_polling(epoll_extended_t *ee, int epollfd) {
* thread. Conversely, a thread must never stop working without
* checking if it has newly arrived work.
*
- * External wake operations, like pn_connection_wake() and
- * pn_proactor_interrupt(), are built on top of the internal wake
- * mechanism. The former coalesces multiple wakes until event
- * delivery, the latter does not. The WAKEABLE implementation can be
- * modeled on whichever is more suited.
+ * External wake operations, like pn_connection_wake() and are built on top of
+ * the internal wake mechanism. The former coalesces multiple wakes until event
+ * delivery, the latter does not. The WAKEABLE implementation can be modeled on
+ * whichever is more suited.
+ *
+ * pn_proactor_interrupt() must be async-signal-safe so it has a dedicated
+ * eventfd to allow a lock-free pn_proactor_interrupt() implementation.
*/
typedef enum {
PROACTOR,
@@ -360,10 +362,10 @@ struct pn_proactor_t {
pn_collector_t *collector;
pcontext_t *contexts; /* in-use contexts for PN_PROACTOR_INACTIVE and cleanup */
epoll_extended_t epoll_wake;
+ epoll_extended_t epoll_interrupt;
pn_event_batch_t batch;
- size_t interrupts; /* total pending interrupts */
- size_t deferred_interrupts; /* interrupts for current batch */
size_t disconnects_pending; /* unfinished proactor disconnects*/
+ bool interrupt;
bool inactive;
bool timer_expired;
bool timer_cancelled;
@@ -375,6 +377,8 @@ struct pn_proactor_t {
bool wakes_in_progress;
pcontext_t *wake_list_first;
pcontext_t *wake_list_last;
+ // Interrupts have a dedicated eventfd because they must be async-signal safe.
+ int interruptfd;
};
static void rearm(pn_proactor_t *p, epoll_extended_t *ee);
@@ -1470,6 +1474,16 @@ void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
// proactor
// ========================================================================
+/* Set up an epoll_extended_t to be used for wakeup or interrupts */
+static void epoll_wake_init(epoll_extended_t *ee, int eventfd, int epollfd) {
+ ee->psocket = NULL;
+ ee->fd = eventfd;
+ ee->type = WAKE;
+ ee->wanted = EPOLLIN;
+ ee->polling = false;
+ start_polling(ee, epollfd); // TODO: check for error
+}
+
pn_proactor_t *pn_proactor() {
pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p));
if (!p) return NULL;
@@ -1478,26 +1492,24 @@ pn_proactor_t *pn_proactor() {
pmutex_init(&p->eventfd_mutex);
ptimer_init(&p->timer, 0);
- if ((p->epollfd = epoll_create(1)) >= 0)
+ if ((p->epollfd = epoll_create(1)) >= 0) {
if ((p->eventfd = eventfd(0, EFD_NONBLOCK)) >= 0) {
- if (p->timer.timerfd >= 0)
- if ((p->collector = pn_collector()) != NULL) {
- p->batch.next_event = &proactor_batch_next;
- start_polling(&p->timer.epoll_io, p->epollfd); // TODO: check for error
- p->timer_armed = true;
-
- p->epoll_wake.psocket = NULL;
- p->epoll_wake.fd = p->eventfd;
- p->epoll_wake.type = WAKE;
- p->epoll_wake.wanted = EPOLLIN;
- p->epoll_wake.polling = false;
- start_polling(&p->epoll_wake, p->epollfd); // TODO: check for error
- return p;
- }
+ if ((p->interruptfd = eventfd(0, EFD_NONBLOCK)) >= 0) {
+ if (p->timer.timerfd >= 0)
+ if ((p->collector = pn_collector()) != NULL) {
+ p->batch.next_event = &proactor_batch_next;
+ start_polling(&p->timer.epoll_io, p->epollfd); // TODO: check for error
+ p->timer_armed = true;
+ epoll_wake_init(&p->epoll_wake, p->eventfd, p->epollfd);
+ epoll_wake_init(&p->epoll_interrupt, p->interruptfd, p->epollfd);
+ return p;
+ }
+ }
}
-
+ }
if (p->epollfd >= 0) close(p->epollfd);
if (p->eventfd >= 0) close(p->eventfd);
+ if (p->interruptfd >= 0) close(p->eventfd);
ptimer_finalize(&p->timer);
if (p->collector) pn_free(p->collector);
free (p);
@@ -1510,6 +1522,8 @@ void pn_proactor_free(pn_proactor_t *p) {
p->epollfd = -1;
close(p->eventfd);
p->eventfd = -1;
+ close(p->interruptfd);
+ p->interruptfd = -1;
ptimer_finalize(&p->timer);
while (p->contexts) {
pcontext_t *ctx = p->contexts;
@@ -1551,34 +1565,23 @@ static void proactor_add_event(pn_proactor_t *p, pn_event_type_t t) {
static bool proactor_update_batch(pn_proactor_t *p) {
if (proactor_has_event(p))
return true;
- if (p->deferred_interrupts > 0) {
- // drain these first
- --p->deferred_interrupts;
- --p->interrupts;
- proactor_add_event(p, PN_PROACTOR_INTERRUPT);
- return true;
- }
if (p->timer_expired) {
p->timer_expired = false;
proactor_add_event(p, PN_PROACTOR_TIMEOUT);
return true;
}
-
- int ec = 0;
- if (p->interrupts > 0) {
- --p->interrupts;
+ if (p->interrupt) {
+ p->interrupt = false;
proactor_add_event(p, PN_PROACTOR_INTERRUPT);
- ec++;
- if (p->interrupts > 0)
- p->deferred_interrupts = p->interrupts;
+ return true;
}
- if (p->inactive && ec == 0) {
+ if (p->inactive) {
p->inactive = false;
- ec++;
proactor_add_event(p, PN_PROACTOR_INACTIVE);
+ return true;
}
- return ec > 0;
+ return false;
}
static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
@@ -1590,10 +1593,12 @@ static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
return log_event(p, e);
}
-static pn_event_batch_t *proactor_process(pn_proactor_t *p, bool timeout) {
- bool timer_fired = timeout && ptimer_callback(&p->timer) != 0;
+static pn_event_batch_t *proactor_process(pn_proactor_t *p, pn_event_type_t event) {
+ bool timer_fired = (event == PN_PROACTOR_TIMEOUT) && ptimer_callback(&p->timer) != 0;
lock(&p->context.mutex);
- if (timeout) {
+ if (event == PN_PROACTOR_INTERRUPT) {
+ p->interrupt = true;
+ } else if (event == PN_PROACTOR_TIMEOUT) {
p->timer_armed = false;
if (timer_fired && !p->timer_cancelled)
p->timer_expired = true;
@@ -1667,17 +1672,20 @@ static bool proactor_remove(pcontext_t *ctx) {
return can_free;
}
-static pn_event_batch_t *process_inbound_wake(pn_proactor_t *p) {
+static pn_event_batch_t *process_inbound_wake(pn_proactor_t *p, epoll_extended_t *ee) {
+ if (ee->fd == p->interruptfd) { /* Interrupts have their own dedicated eventfd */
+ return proactor_process(p, PN_PROACTOR_INTERRUPT);
+ }
pcontext_t *ctx = wake_pop_front(p);
if (ctx) {
switch (ctx->type) {
- case PROACTOR:
- return proactor_process(p, false);
- case PCONNECTION:
+ case PROACTOR:
+ return proactor_process(p, PN_EVENT_NONE);
+ case PCONNECTION:
return pconnection_process((pconnection_t *) ctx->owner, 0, false, false);
- case LISTENER:
- return listener_process(&((pn_listener_t *) ctx->owner)->psockets[0], 0);
- default:
+ case LISTENER:
+ return listener_process(&((pn_listener_t *) ctx->owner)->psockets[0], 0);
+ default:
assert(ctx->type == WAKEABLE); // TODO: implement or remove
}
}
@@ -1710,9 +1718,9 @@ static pn_event_batch_t *proactor_do_epoll(struct pn_proactor_t* p, bool can_blo
epoll_extended_t *ee = (epoll_extended_t *) ev.data.ptr;
if (ee->type == WAKE) {
- batch = process_inbound_wake(p);
+ batch = process_inbound_wake(p, ee);
} else if (ee->type == PROACTOR_TIMER) {
- batch = proactor_process(p, true);
+ batch = proactor_process(p, PN_PROACTOR_TIMEOUT);
} else {
pconnection_t *pc = psocket_pconnection(ee->psocket);
if (pc) {
@@ -1772,11 +1780,11 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
}
void pn_proactor_interrupt(pn_proactor_t *p) {
- lock(&p->context.mutex);
- ++p->interrupts;
- bool notify = wake(&p->context);
- unlock(&p->context.mutex);
- if (notify) wake_notify(&p->context);
+ if (p->interruptfd == -1)
+ return;
+ uint64_t increment = 1;
+ if (write(p->interruptfd, &increment, sizeof(uint64_t)) != sizeof(uint64_t))
+ EPOLL_FATAL("setting eventfd", errno);
}
void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5f8738f5/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index cf7a31b..8cd6dd7 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -52,7 +52,7 @@
libuv functions are thread unsafe, we use a"leader-worker-follower" model as follows:
- At most one thread at a time is the "leader". The leader runs the UV loop till there
- are events to process and then becomes a "worker"n
+ are events to process and then becomes a "worker"
- Concurrent "worker" threads process events for separate connections or listeners.
When they run out of work they become "followers"
@@ -227,10 +227,13 @@ struct pn_listener_t {
typedef enum { TM_NONE, TM_REQUEST, TM_PENDING, TM_FIRED } timeout_state_t;
struct pn_proactor_t {
+ /* Notification */
+ uv_async_t notify;
+ uv_async_t interrupt;
+
/* Leader thread */
uv_cond_t cond;
uv_loop_t loop;
- uv_async_t async;
uv_timer_t timer;
/* Owner thread: proactor collector and batch can belong to leader or a worker */
@@ -241,7 +244,6 @@ struct pn_proactor_t {
uv_mutex_t lock;
work_queue_t worker_q; /* ready for work, to be returned via pn_proactor_wait() */
work_queue_t leader_q; /* waiting for attention by the leader thread */
- size_t interrupt; /* pending interrupts */
timeout_state_t timeout_state;
pn_millis_t timeout;
size_t count; /* connection/listener count for INACTIVE events */
@@ -250,12 +252,21 @@ struct pn_proactor_t {
bool inactive;
bool has_leader;
bool batch_working; /* batch is being processed in a worker thread */
+ bool need_interrupt; /* Need a PN_PROACTOR_INTERRUPT event */
};
/* Notify the leader thread that there is something to do outside of uv_run() */
static inline void notify(pn_proactor_t* p) {
- uv_async_send(&p->async);
+ uv_async_send(&p->notify);
+}
+
+/* Set the interrupt flag in the leader thread to avoid race conditions. */
+void on_interrupt(uv_async_t *async) {
+ if (async->data) {
+ pn_proactor_t *p = (pn_proactor_t*)async->data;
+ p->need_interrupt = true;
+ }
}
/* Notify that this work item needs attention from the leader at the next opportunity */
@@ -814,8 +825,8 @@ static pn_event_batch_t *get_batch_lh(pn_proactor_t *p) {
p->inactive = false;
return proactor_batch_lh(p, PN_PROACTOR_INACTIVE);
}
- if (p->interrupt > 0) {
- --p->interrupt;
+ if (p->need_interrupt) {
+ p->need_interrupt = false;
return proactor_batch_lh(p, PN_PROACTOR_INTERRUPT);
}
if (p->timeout_state == TM_FIRED) {
@@ -1072,10 +1083,12 @@ pn_proactor_t *pn_event_proactor(pn_event_t *e) {
}
void pn_proactor_interrupt(pn_proactor_t *p) {
- uv_mutex_lock(&p->lock);
- ++p->interrupt;
- uv_mutex_unlock(&p->lock);
- notify(p);
+ /* NOTE: pn_proactor_interrupt must be async-signal-safe so we cannot use
+ locks to update shared proactor state here. Instead we use a dedicated
+ uv_async, the on_interrupt() callback will set the interrupt flag in the
+ safety of the leader thread.
+ */
+ uv_async_send(&p->interrupt);
}
void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
@@ -1155,7 +1168,9 @@ pn_proactor_t *pn_proactor() {
uv_loop_init(&p->loop);
uv_mutex_init(&p->lock);
uv_cond_init(&p->cond);
- uv_async_init(&p->loop, &p->async, NULL);
+ uv_async_init(&p->loop, &p->notify, NULL);
+ uv_async_init(&p->loop, &p->interrupt, on_interrupt);
+ p->interrupt.data = p;
uv_timer_init(&p->loop, &p->timer);
p->timer.data = p;
p->disconnect_cond = pn_condition();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org