You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/09/06 22:08:47 UTC

[2/3] qpid-proton git commit: PROTON-1574: Fix lock-order-inversion warnings

PROTON-1574: Fix lock-order-inversion warnings

The lock introduced in

    17d2a6f4 PROTON-1568: c++ enable race detection for self-tests

created lock-order problems.

Using the lock as a simple memory barrier solves the original race and the
lock-order problem.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/eac4e310
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/eac4e310
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/eac4e310

Branch: refs/heads/master
Commit: eac4e3101a8e078e812e22b33b88b9b6219d8a5d
Parents: 5f4d852
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Sep 6 17:11:37 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Sep 6 17:54:44 2017 -0400

----------------------------------------------------------------------
 proton-c/src/proactor/epoll.c | 67 +++++++++++++++-----------------------
 1 file changed, 27 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/eac4e310/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 46effcc..6f0cc96 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -121,18 +121,25 @@ typedef enum {
 
 // Data to use with epoll.
 typedef struct epoll_extended_t {
-  /* epoll_ctl()/epoll_wake() do not form a memory barrier, so cached memory
-     writes to struct epoll_extended_t in the EPOLL_ADD thread might not be
-     visible to epoll_wait() thread. Lock use of epoll_extended_t to be safe.
-  */
-  pmutex mutex;
   struct psocket_t *psocket;  // pconnection, listener, or NULL -> proactor
   int fd;
   epoll_type_t type;   // io/timer/wakeup
   uint32_t wanted;     // events to poll for
   bool polling;
+  pmutex barrier_mutex;
 } epoll_extended_t;
 
+/* epoll_ctl()/epoll_wake() do not form a memory barrier, so cached memory
+   writes to struct epoll_extended_t in the EPOLL_ADD thread might not be
+   visible to epoll_wait() thread. This function creates a memory barrier,
+   called before epoll_ctl() and after epoll_wait()
+*/
+static void memory_barrier(epoll_extended_t *ee) {
+  // Mutex lock/unlock has the side-effect of being a memory barrier.
+  lock(&ee->barrier_mutex);
+  unlock(&ee->barrier_mutex);
+}
+
 /*
  * This timerfd logic assumes EPOLLONESHOT and there never being two
  * active timeout callbacks.  There can be multiple (or zero)
@@ -282,33 +289,26 @@ PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
 static bool start_polling(epoll_extended_t *ee, int epollfd) {
   if (ee->polling)
     return false;
-  pmutex_init(&ee->mutex);
-  lock(&ee->mutex);
   ee->polling = true;
   struct epoll_event ev;
   ev.data.ptr = ee;
   ev.events = ee->wanted | EPOLLONESHOT;
-  int fd = ee->fd;
-  unlock(&ee->mutex);
-  return (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev) == 0);
+  memory_barrier(ee);
+  return (epoll_ctl(epollfd, EPOLL_CTL_ADD, ee->fd, &ev) == 0);
 }
 
 static void stop_polling(epoll_extended_t *ee, int epollfd) {
   // TODO: check for error, return bool or just log?
-  lock(&ee->mutex);
   if (ee->fd == -1 || !ee->polling || epollfd == -1)
     return;
   struct epoll_event ev;
   ev.data.ptr = ee;
   ev.events = 0;
-  unlock(&ee->mutex);
+  memory_barrier(ee);
   if (epoll_ctl(epollfd, EPOLL_CTL_DEL, ee->fd, &ev) == -1)
     EPOLL_FATAL("EPOLL_CTL_DEL", errno);
-  lock(&ee->mutex);
   ee->fd = -1;
   ee->polling = false;
-  unlock(&ee->mutex);
-  pmutex_finalize(&ee->mutex);
 }
 
 /*
@@ -642,11 +642,9 @@ static void psocket_gai_error(psocket_t *ps, int gai_err, const char* what) {
 static void rearm(pn_proactor_t *p, epoll_extended_t *ee) {
   struct epoll_event ev;
   ev.data.ptr = ee;
-  lock(&ee->mutex);
   ev.events = ee->wanted | EPOLLONESHOT;
-  int fd = ee->fd;
-  unlock(&ee->mutex);
-  if (epoll_ctl(p->epollfd, EPOLL_CTL_MOD, fd, &ev) == -1)
+  memory_barrier(ee);
+  if (epoll_ctl(p->epollfd, EPOLL_CTL_MOD, ee->fd, &ev) == -1)
     EPOLL_FATAL("arming polled file descriptor", errno);
 }
 
@@ -1185,11 +1183,9 @@ static void pconnection_start(pconnection_t *pc) {
 
   start_polling(&pc->timer.epoll_io, efd);  // TODO: check for error
   epoll_extended_t *ee = &pc->psocket.epoll_io;
-  lock(&ee->mutex);
   ee->fd = pc->psocket.sockfd;
   ee->wanted = EPOLLIN | EPOLLOUT;
   ee->polling = false;
-  unlock(&ee->mutex);
   start_polling(ee, efd);  // TODO: check for error
 }
 
@@ -1256,6 +1252,8 @@ void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr)
 
   lock(&pc->context.mutex);
   proactor_add(&pc->context);
+  pn_connection_open(pc->driver.connection); /* Auto-open */
+
   bool notify = false;
   bool notify_proactor = false;
 
@@ -1621,14 +1619,11 @@ void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
 
 /* Set up an epoll_extended_t to be used for wakeup or interrupts */
 static void epoll_wake_init(epoll_extended_t *ee, int eventfd, int epollfd) {
-  pmutex_init(&ee->mutex);
-  lock(&ee->mutex);
   ee->psocket = NULL;
   ee->fd = eventfd;
   ee->type = WAKE;
   ee->wanted = EPOLLIN;
   ee->polling = false;
-  unlock(&ee->mutex);
   start_polling(ee, epollfd);  // TODO: check for error
 }
 
@@ -1640,7 +1635,6 @@ pn_proactor_t *pn_proactor() {
   pmutex_init(&p->eventfd_mutex);
   pmutex_init(&p->bind_mutex);
   ptimer_init(&p->timer, 0);
-  pmutex_init(&p->overflow_mutex);
 
   if ((p->epollfd = epoll_create(1)) >= 0) {
     if ((p->eventfd = eventfd(0, EFD_NONBLOCK)) >= 0) {
@@ -1822,11 +1816,7 @@ static bool proactor_remove(pcontext_t *ctx) {
 }
 
 static pn_event_batch_t *process_inbound_wake(pn_proactor_t *p, epoll_extended_t *ee) {
-  lock(&ee->mutex);
-  int fd = ee->fd;
-  unlock(&ee->mutex);
-
-  if  (fd == p->interruptfd) {        /* Interrupts have their own dedicated eventfd */
+  if  (ee->fd == p->interruptfd) {        /* Interrupts have their own dedicated eventfd */
     (void)read_uint64(p->interruptfd);
     rearm(p, &p->epoll_interrupt);
     return proactor_process(p, PN_PROACTOR_INTERRUPT);
@@ -1871,28 +1861,25 @@ static pn_event_batch_t *proactor_do_epoll(struct pn_proactor_t* p, bool can_blo
     }
     assert(n == 1);
     epoll_extended_t *ee = (epoll_extended_t *) ev.data.ptr;
+    memory_barrier(ee);
 
-    lock(&ee->mutex);
-    epoll_type_t type = ee->type;
-    psocket_t* psocket = ee->psocket;
-    unlock(&ee->mutex);
-    if (type == WAKE) {
+    if (ee->type == WAKE) {
       batch = process_inbound_wake(p, ee);
-    } else if (type == PROACTOR_TIMER) {
+    } else if (ee->type == PROACTOR_TIMER) {
       batch = proactor_process(p, PN_PROACTOR_TIMEOUT);
     } else {
-      pconnection_t *pc = psocket_pconnection(psocket);
+      pconnection_t *pc = psocket_pconnection(ee->psocket);
       if (pc) {
-        if (type == PCONNECTION_IO) {
+        if (ee->type == PCONNECTION_IO) {
           batch = pconnection_process(pc, ev.events, false, false);
         } else {
-          assert(type == PCONNECTION_TIMER);
+          assert(ee->type == PCONNECTION_TIMER);
           batch = pconnection_process(pc, 0, true, false);
         }
       }
       else {
         // TODO: can any of the listener processing be parallelized like IOCP?
-        batch = listener_process(psocket, ev.events);
+        batch = listener_process(ee->psocket, ev.events);
       }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org