You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2021/12/16 02:12:22 UTC

[qpid-proton] 02/03: PROTON-2362: epoll proactor: fix locking in listener overflow processing

This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit cfd300e7d41ddfd39ae3e130fde8d1d654cfd099
Author: Cliff Jansen <cl...@apache.org>
AuthorDate: Wed Dec 15 17:48:20 2021 -0800

    PROTON-2362: epoll proactor: fix locking in listener overflow processing
---
 c/src/proactor/epoll-internal.h |  1 +
 c/src/proactor/epoll.c          | 17 ++++++++++++-----
 2 files changed, 13 insertions(+), 5 deletions(-)

diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 155277b..f0afc9e 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -302,6 +302,7 @@ struct pn_listener_t {
   size_t pending_count;              /* number of pending accepted connections */
   size_t backlog;                 /* size of pending accepted array */
   bool close_dispatched;
+  int overflow_count;
   uint32_t sched_io_events;
 };
 
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 2363f48..6a5beb4 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -715,6 +715,7 @@ static acceptor_t *acceptor_list_next(acceptor_t **start) {
 // Add an overflowing acceptor to the overflow list. Called with listener task lock held.
 static void acceptor_set_overflow(acceptor_t *a) {
   a->overflowed = true;
+  a->listener->overflow_count++;
   pn_proactor_t *p = a->listener->task.proactor;
   lock(&p->overflow_mutex);
   acceptor_list_append(&p->overflow, a);
@@ -740,6 +741,7 @@ static void proactor_rearm_overflow(pn_proactor_t *p) {
     assert(!a->armed);
     assert(a->overflowed);
     a->overflowed = false;
+    l->overflow_count--;  /* acceptor has left the overflow list: undo the increment made in acceptor_set_overflow() */
     if (rearming) {
       rearm(p, &a->psocket.epoll_io);
       a->armed = true;
@@ -1617,7 +1619,7 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
 
 // call with lock held and task.working false
 static inline bool listener_can_free(pn_listener_t *l) {
-  return l->task.closing && l->close_dispatched && !l->task.ready && !l->active_count;
+  return l->task.closing && l->close_dispatched && !l->task.ready && !l->active_count && !l->overflow_count;
 }
 
 static inline void listener_final_free(pn_listener_t *l) {
@@ -1675,10 +1677,6 @@ static void listener_begin_close(pn_listener_t* l) {
     }
     assert(!l->pending_count);
 
-    unlock(&l->task.mutex);
-    /* Remove all acceptors from the overflow list.  closing flag prevents re-insertion.*/
-    proactor_rearm_overflow(pn_listener_proactor(l));
-    lock(&l->task.mutex);
     pn_collector_put(l->collector, PN_CLASSCLASS(pn_listener), l, PN_LISTENER_CLOSE);
   }
 }
@@ -1799,8 +1797,17 @@ static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
 static void listener_done(pn_listener_t *l) {
   pn_proactor_t *p = l->task.proactor;
   tslot_t *ts = l->task.runner;
+
   lock(&l->task.mutex);
 
+  if (l->close_dispatched && l->overflow_count) {
+    unlock(&l->task.mutex);
+    /* Remove all acceptors from the overflow list.  closing flag prevents re-insertion.*/
+    proactor_rearm_overflow(pn_listener_proactor(l));
+    lock(&l->task.mutex);
+    assert(l->overflow_count == 0);
+  }
+
   // Just in case the app didn't accept all the pending accepts
   // Shuffle the list back to start at 0
   memmove(&l->pending_accepteds[0], &l->pending_accepteds[l->pending_first], l->pending_count * sizeof(accepted_t));

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org