You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/09/06 22:08:47 UTC
[2/3] qpid-proton git commit: PROTON-1574: Fix lock-order-inversion
warnings
PROTON-1574: Fix lock-order-inversion warnings
The lock introduced in commit
17d2a6f4 (PROTON-1568: c++ enable race detection for self-tests)
created lock-order problems.
Using the lock only as a simple memory barrier (a lock immediately followed
by an unlock) solves the original race and avoids the lock-order problem.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/eac4e310
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/eac4e310
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/eac4e310
Branch: refs/heads/master
Commit: eac4e3101a8e078e812e22b33b88b9b6219d8a5d
Parents: 5f4d852
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Sep 6 17:11:37 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Sep 6 17:54:44 2017 -0400
----------------------------------------------------------------------
proton-c/src/proactor/epoll.c | 67 +++++++++++++++-----------------------
1 file changed, 27 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/eac4e310/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 46effcc..6f0cc96 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -121,18 +121,25 @@ typedef enum {
// Data to use with epoll.
typedef struct epoll_extended_t {
- /* epoll_ctl()/epoll_wake() do not form a memory barrier, so cached memory
- writes to struct epoll_extended_t in the EPOLL_ADD thread might not be
- visible to epoll_wait() thread. Lock use of epoll_extended_t to be safe.
- */
- pmutex mutex;
struct psocket_t *psocket; // pconnection, listener, or NULL -> proactor
int fd;
epoll_type_t type; // io/timer/wakeup
uint32_t wanted; // events to poll for
bool polling;
+ pmutex barrier_mutex;
} epoll_extended_t;
+/* epoll_ctl()/epoll_wait() do not form a memory barrier, so cached memory
+ writes to struct epoll_extended_t in the EPOLL_CTL_ADD thread might not be
+ visible to the epoll_wait() thread. This function creates a memory barrier;
+ call it before epoll_ctl() and after epoll_wait().
+*/
+static void memory_barrier(epoll_extended_t *ee) {
+ // Mutex lock/unlock has the side-effect of being a memory barrier.
+ lock(&ee->barrier_mutex);
+ unlock(&ee->barrier_mutex);
+}
+
/*
* This timerfd logic assumes EPOLLONESHOT and there never being two
* active timeout callbacks. There can be multiple (or zero)
@@ -282,33 +289,26 @@ PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
static bool start_polling(epoll_extended_t *ee, int epollfd) {
if (ee->polling)
return false;
- pmutex_init(&ee->mutex);
- lock(&ee->mutex);
ee->polling = true;
struct epoll_event ev;
ev.data.ptr = ee;
ev.events = ee->wanted | EPOLLONESHOT;
- int fd = ee->fd;
- unlock(&ee->mutex);
- return (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev) == 0);
+ memory_barrier(ee);
+ return (epoll_ctl(epollfd, EPOLL_CTL_ADD, ee->fd, &ev) == 0);
}
static void stop_polling(epoll_extended_t *ee, int epollfd) {
// TODO: check for error, return bool or just log?
- lock(&ee->mutex);
if (ee->fd == -1 || !ee->polling || epollfd == -1)
return;
struct epoll_event ev;
ev.data.ptr = ee;
ev.events = 0;
- unlock(&ee->mutex);
+ memory_barrier(ee);
if (epoll_ctl(epollfd, EPOLL_CTL_DEL, ee->fd, &ev) == -1)
EPOLL_FATAL("EPOLL_CTL_DEL", errno);
- lock(&ee->mutex);
ee->fd = -1;
ee->polling = false;
- unlock(&ee->mutex);
- pmutex_finalize(&ee->mutex);
}
/*
@@ -642,11 +642,9 @@ static void psocket_gai_error(psocket_t *ps, int gai_err, const char* what) {
static void rearm(pn_proactor_t *p, epoll_extended_t *ee) {
struct epoll_event ev;
ev.data.ptr = ee;
- lock(&ee->mutex);
ev.events = ee->wanted | EPOLLONESHOT;
- int fd = ee->fd;
- unlock(&ee->mutex);
- if (epoll_ctl(p->epollfd, EPOLL_CTL_MOD, fd, &ev) == -1)
+ memory_barrier(ee);
+ if (epoll_ctl(p->epollfd, EPOLL_CTL_MOD, ee->fd, &ev) == -1)
EPOLL_FATAL("arming polled file descriptor", errno);
}
@@ -1185,11 +1183,9 @@ static void pconnection_start(pconnection_t *pc) {
start_polling(&pc->timer.epoll_io, efd); // TODO: check for error
epoll_extended_t *ee = &pc->psocket.epoll_io;
- lock(&ee->mutex);
ee->fd = pc->psocket.sockfd;
ee->wanted = EPOLLIN | EPOLLOUT;
ee->polling = false;
- unlock(&ee->mutex);
start_polling(ee, efd); // TODO: check for error
}
@@ -1256,6 +1252,8 @@ void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr)
lock(&pc->context.mutex);
proactor_add(&pc->context);
+ pn_connection_open(pc->driver.connection); /* Auto-open */
+
bool notify = false;
bool notify_proactor = false;
@@ -1621,14 +1619,11 @@ void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
/* Set up an epoll_extended_t to be used for wakeup or interrupts */
static void epoll_wake_init(epoll_extended_t *ee, int eventfd, int epollfd) {
- pmutex_init(&ee->mutex);
- lock(&ee->mutex);
ee->psocket = NULL;
ee->fd = eventfd;
ee->type = WAKE;
ee->wanted = EPOLLIN;
ee->polling = false;
- unlock(&ee->mutex);
start_polling(ee, epollfd); // TODO: check for error
}
@@ -1640,7 +1635,6 @@ pn_proactor_t *pn_proactor() {
pmutex_init(&p->eventfd_mutex);
pmutex_init(&p->bind_mutex);
ptimer_init(&p->timer, 0);
- pmutex_init(&p->overflow_mutex);
if ((p->epollfd = epoll_create(1)) >= 0) {
if ((p->eventfd = eventfd(0, EFD_NONBLOCK)) >= 0) {
@@ -1822,11 +1816,7 @@ static bool proactor_remove(pcontext_t *ctx) {
}
static pn_event_batch_t *process_inbound_wake(pn_proactor_t *p, epoll_extended_t *ee) {
- lock(&ee->mutex);
- int fd = ee->fd;
- unlock(&ee->mutex);
-
- if (fd == p->interruptfd) { /* Interrupts have their own dedicated eventfd */
+ if (ee->fd == p->interruptfd) { /* Interrupts have their own dedicated eventfd */
(void)read_uint64(p->interruptfd);
rearm(p, &p->epoll_interrupt);
return proactor_process(p, PN_PROACTOR_INTERRUPT);
@@ -1871,28 +1861,25 @@ static pn_event_batch_t *proactor_do_epoll(struct pn_proactor_t* p, bool can_blo
}
assert(n == 1);
epoll_extended_t *ee = (epoll_extended_t *) ev.data.ptr;
+ memory_barrier(ee);
- lock(&ee->mutex);
- epoll_type_t type = ee->type;
- psocket_t* psocket = ee->psocket;
- unlock(&ee->mutex);
- if (type == WAKE) {
+ if (ee->type == WAKE) {
batch = process_inbound_wake(p, ee);
- } else if (type == PROACTOR_TIMER) {
+ } else if (ee->type == PROACTOR_TIMER) {
batch = proactor_process(p, PN_PROACTOR_TIMEOUT);
} else {
- pconnection_t *pc = psocket_pconnection(psocket);
+ pconnection_t *pc = psocket_pconnection(ee->psocket);
if (pc) {
- if (type == PCONNECTION_IO) {
+ if (ee->type == PCONNECTION_IO) {
batch = pconnection_process(pc, ev.events, false, false);
} else {
- assert(type == PCONNECTION_TIMER);
+ assert(ee->type == PCONNECTION_TIMER);
batch = pconnection_process(pc, 0, true, false);
}
}
else {
// TODO: can any of the listener processing be parallelized like IOCP?
- batch = listener_process(psocket, ev.events);
+ batch = listener_process(ee->psocket, ev.events);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org