Posted to commits@qpid.apache.org by cl...@apache.org on 2017/11/29 19:53:23 UTC

qpid-proton git commit: PROTON-1531: epoll proactor: use shutdown to avoid race on close

Repository: qpid-proton
Updated Branches:
  refs/heads/master a7119f56c -> 584d3afc3


PROTON-1531: epoll proactor: use shutdown to avoid race on close
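
The change replaces a direct close() of a listening socket that may still be
armed in epoll with shutdown(SHUT_RD): the shutdown forces one last epoll
event, and the polling thread itself deregisters and closes the fd, so the fd
number cannot be recycled while another thread still holds events for it.
Below is a minimal sketch of that idiom. The names (lsocket, begin_close,
on_event) are illustrative, and locking and error handling are omitted; this
is not the proactor code itself.

    /*
     * Sketch of the close-via-shutdown idiom: the closing thread never
     * calls close() on a socket that is still registered with epoll; it
     * calls shutdown(SHUT_RD) and lets the polling thread do the final
     * EPOLL_CTL_DEL and close().
     */
    #include <stdbool.h>
    #include <stdint.h>
    #include <sys/epoll.h>
    #include <sys/socket.h>
    #include <unistd.h>

    struct lsocket {
      int fd;
      bool closing;  /* set by the closing thread, observed by the poller */
      bool armed;    /* true while the fd is registered with epoll        */
    };

    /* Closing thread: never close() an armed socket directly -- the fd
     * number could be recycled while the poller still holds events for
     * it (the PROTON-1531 race).  shutdown(SHUT_RD) instead forces a
     * final epoll event that the polling thread will consume. */
    void begin_close(struct lsocket *s, int epollfd) {
      s->closing = true;
      if (s->armed) {
        shutdown(s->fd, SHUT_RD);                    /* wake the poller */
      } else {
        epoll_ctl(epollfd, EPOLL_CTL_DEL, s->fd, NULL);
        close(s->fd);
        s->fd = -1;
      }
    }

    /* Polling thread: the forced event lands here, where it is safe to
     * deregister and close because no other thread can still be
     * handling an event for this fd. */
    void on_event(struct lsocket *s, int epollfd, uint32_t events) {
      s->armed = false;
      if (s->closing) {
        epoll_ctl(epollfd, EPOLL_CTL_DEL, s->fd, NULL);
        close(s->fd);
        s->fd = -1;
      } else if (events & EPOLLIN) {
        /* accept() incoming connections as usual */
      }
    }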


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/584d3afc
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/584d3afc
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/584d3afc

Branch: refs/heads/master
Commit: 584d3afc32ffa9a3e92b0287da7c90e390265a9d
Parents: a7119f5
Author: Clifford Jansen <cl...@apache.org>
Authored: Wed Nov 29 11:52:02 2017 -0800
Committer: Clifford Jansen <cl...@apache.org>
Committed: Wed Nov 29 11:52:02 2017 -0800

----------------------------------------------------------------------
 proton-c/src/proactor/epoll.c | 66 +++++++++++++++++++++++++-------------
 1 file changed, 43 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/584d3afc/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 080cd4b..1fb0c9e 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -558,6 +558,7 @@ struct acceptor_t{
 struct pn_listener_t {
   acceptor_t *acceptors;          /* Array of listening sockets */
   size_t acceptors_size;
+  int active_count;               /* Number of listener sockets registered with epoll */
   pcontext_t context;
   pn_condition_t *condition;
   pn_collector_t *collector;
@@ -1447,6 +1448,7 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
           ps->epoll_io.polling = false;
           lock(&l->rearm_mutex);
           start_polling(&ps->epoll_io, ps->proactor->epollfd);  // TODO: check for error
+          l->active_count++;
           acceptor->armed = true;
           unlock(&l->rearm_mutex);
         } else {
@@ -1480,9 +1482,9 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
   return;
 }
 
-// call with lock held
+// call with lock held and context.working false
 static inline bool listener_can_free(pn_listener_t *l) {
-  return l->context.closing && l->close_dispatched && !l->context.wake_ops;
+  return l->context.closing && l->close_dispatched && !l->context.wake_ops && !l->active_count;
 }
 
 static inline void listener_final_free(pn_listener_t *l) {
@@ -1513,21 +1515,24 @@ void pn_listener_free(pn_listener_t *l) {
 
 /* Always call with lock held so it can be unlocked around overflow processing. */
 static void listener_begin_close(pn_listener_t* l) {
-  // TODO: switch to shutdown(sock, SHUT_RD) and wait for HUP callback per listener socket (analogous to pconnection)
-  int close_count = 0;
   if (!l->context.closing) {
     l->context.closing = true;
 
     /* Close all listening sockets */
     for (size_t i = 0; i < l->acceptors_size; ++i) {
-      psocket_t *ps = &l->acceptors[i].psocket;
+      acceptor_t *a = &l->acceptors[i];
+      psocket_t *ps = &a->psocket;
       if (ps->sockfd >= 0) {
         lock(&l->rearm_mutex);
-        stop_polling(&ps->epoll_io, ps->proactor->epollfd);  // race: PROTON-1531
+        if (a->armed) {
+          shutdown(ps->sockfd, SHUT_RD);  // Force epoll event and callback
+        } else {
+          stop_polling(&ps->epoll_io, ps->proactor->epollfd);
+          close(ps->sockfd);
+          ps->sockfd = -1;
+          l->active_count--;
+        }
         unlock(&l->rearm_mutex);
-        close(ps->sockfd);
-        ps->sockfd = -1;
-        close_count++;
       }
     }
     /* Close all sockets waiting for a pn_listener_accept() */
@@ -1536,13 +1541,13 @@ static void listener_begin_close(pn_listener_t* l) {
     while (a) {
       close(a->accepted_fd);
       a->accepted_fd = -1;
-      close_count++;
       l->pending_count--;
       a = listener_list_next(&l->pending_acceptors);
     }
     assert(!l->pending_count);
 
     unlock(&l->context.mutex);
+    /* Remove all acceptors from the overflow list.  closing flag prevents re-insertion.*/
     proactor_rearm_overflow(pn_listener_proactor(l));
     lock(&l->context.mutex);
     pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
@@ -1568,6 +1573,7 @@ static void listener_forced_shutdown(pn_listener_t *l) {
   // pconnection_process will never be called again.  Zero everything.
   l->context.wake_ops = 0;
   l->close_dispatched = true;
+  l->active_count = 0;
   assert(listener_can_free(l));
   pn_listener_free(l);
 }
@@ -1600,11 +1606,21 @@ static pn_event_batch_t *listener_process(psocket_t *ps, uint32_t events) {
   lock(&l->context.mutex);
   if (events) {
     a->armed = false;
-    if (events & EPOLLRDHUP) {
-      /* Calls listener_begin_close which closes all the listener's sockets */
-      psocket_error(ps, errno, "listener epoll");
-    } else if (!l->context.closing && events & EPOLLIN) {
-      listener_accept_lh(ps);
+    if (l->context.closing) {
+      lock(&l->rearm_mutex);
+      stop_polling(&ps->epoll_io, ps->proactor->epollfd);
+      unlock(&l->rearm_mutex);
+      close(ps->sockfd);
+      ps->sockfd = -1;
+      l->active_count--;
+    }
+    else {
+      if (events & EPOLLRDHUP) {
+        /* Calls listener_begin_close which closes all the listener's sockets */
+        psocket_error(ps, errno, "listener epoll");
+      } else if (!l->context.closing && events & EPOLLIN) {
+        listener_accept_lh(ps);
+      }
     }
   } else {
     wake_done(&l->context); // callback accounting
@@ -1614,8 +1630,14 @@ static pn_event_batch_t *listener_process(psocket_t *ps, uint32_t events) {
     l->context.working = true;
     if (listener_has_event(l))
       lb = &l->batch;
-    else
+    else {
       l->context.working = false;
+      if (listener_can_free(l)) {
+        unlock(&l->context.mutex);
+        pn_listener_free(l);
+        return NULL;
+      }
+    }
   }
   unlock(&l->context.mutex);
   return lb;
@@ -1626,7 +1648,7 @@ static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
   lock(&l->context.mutex);
   pn_event_t *e = pn_collector_next(l->collector);
   if (!e && l->pending_count && !l->unclaimed) {
-    // empty collector means pn_collector_put() will not coallesce
+    // empty collector means pn_collector_put() will not coalesce
     pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
     l->unclaimed = true;
     l->pending_count--;
@@ -1643,12 +1665,10 @@ static void listener_done(pn_listener_t *l) {
   lock(&l->context.mutex);
   l->context.working = false;
 
-  if (l->close_dispatched) {
-    if (listener_can_free(l)) {
-      unlock(&l->context.mutex);
-      pn_listener_free(l);
-      return;
-    }
+  if (listener_can_free(l)) {
+    unlock(&l->context.mutex);
+    pn_listener_free(l);
+    return;
   } else if (listener_has_event(l))
     notify = wake(&l->context);
   unlock(&l->context.mutex);

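The new active_count field is what makes the final free safe: it counts the
listener sockets still registered with epoll, listener_can_free() now refuses
to free the listener until it drops to zero, and listener_process() frees the
listener itself once the last forced callback has been drained, so
pn_listener_free() can no longer race with a late epoll event.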

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org