You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@qpid.apache.org by cl...@apache.org on 2017/11/29 07:55:15 UTC

qpid-proton git commit: PROTON-1702: epoll proactor: per socket rearming and overflow for listeners

Repository: qpid-proton
Updated Branches:
  refs/heads/master 27c5bf0b7 -> a7119f56c


PROTON-1702: epoll proactor: per socket rearming and overflow for listeners


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a7119f56
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a7119f56
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a7119f56

Branch: refs/heads/master
Commit: a7119f56cf29f1bb2be261951be1120c2243f29c
Parents: 27c5bf0
Author: Clifford Jansen <cl...@apache.org>
Authored: Tue Nov 28 23:54:34 2017 -0800
Committer: Clifford Jansen <cl...@apache.org>
Committed: Tue Nov 28 23:54:34 2017 -0800

----------------------------------------------------------------------
 proton-c/src/proactor/epoll.c | 247 ++++++++++++++++++++++++++-----------
 1 file changed, 177 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a7119f56/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 69e6d0f..080cd4b 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -112,6 +112,8 @@ static void pmutex_finalize(pthread_mutex_t *m) { pthread_mutex_destroy(m); }
 static inline void lock(pmutex *m) { pthread_mutex_lock(m); }
 static inline void unlock(pmutex *m) { pthread_mutex_unlock(m); }
 
+typedef struct acceptor_t acceptor_t;
+
 typedef enum {
   WAKE,   /* see if any work to do in proactor/psocket context */
   PCONNECTION_IO,
@@ -407,8 +409,8 @@ struct pn_proactor_t {
   pcontext_t *wake_list_last;
   // Interrupts have a dedicated eventfd because they must be async-signal safe.
   int interruptfd;
-  // If the process runs out of file descriptors, disarm listeners temporarily and save them here.
-  pn_listener_t *overflow;
+  // If the process runs out of file descriptors, disarm listening sockets temporarily and save them here.
+  acceptor_t *overflow;
   pmutex overflow_mutex;
 };
 
@@ -536,24 +538,40 @@ typedef struct pconnection_t {
   pmutex rearm_mutex;                /* protects pconnection_rearm from out of order arming*/
 } pconnection_t;
 
+/*
+ * A listener can have multiple sockets (as specified in the addrinfo).  They
+ * are armed separately.  The individual psockets can be part of at most one
+ * list: the global proactor overflow retry list or the per-listener list of
+ * pending accepts (valid inbound socket obtained, but pn_listener_accept not
+ * yet called by the application).  These lists will be small and quick to
+ * traverse.
+ */
+
+struct acceptor_t{
+  psocket_t psocket;
+  int accepted_fd;
+  bool armed;
+  bool overflowed;
+  acceptor_t *next;              /* next member of whichever list holds this acceptor */
+};
+
 struct pn_listener_t {
-  psocket_t *psockets;          /* Array of listening sockets */
-  size_t psockets_size;
+  acceptor_t *acceptors;          /* Array of listening sockets */
+  size_t acceptors_size;
   pcontext_t context;
   pn_condition_t *condition;
   pn_collector_t *collector;
   pn_event_batch_t batch;
   pn_record_t *attachments;
   void *listener_context;
+  acceptor_t *pending_acceptors;  /* acceptors holding a valid inbound fd */
+  int pending_count;
+  bool unclaimed;                 /* attach event dispatched but no pn_listener_attach() call yet */
   size_t backlog;
-  int accepted_fd;              /* fd accepted but not yet handled by pn_listener_accept() */
-  psocket_t *accepted;          /* psocket from which we accpeted accepted_fd */
   bool close_dispatched;
-  bool armed;
-  pn_listener_t *overflow;       /* Next overflowed listener */
+  pmutex rearm_mutex;             /* orders rearms/disarms, nothing else */
 };
 
-
 static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool topup);
 static void write_flush(pconnection_t *pc);
 static void listener_begin_close(pn_listener_t* l);
@@ -568,6 +586,10 @@ static inline pn_listener_t *psocket_listener(psocket_t* ps) {
   return ps->listener;
 }
 
+static inline acceptor_t *psocket_acceptor(psocket_t* ps) {
+  return !ps->listener ? NULL : (acceptor_t *)ps;
+}
+
 static inline pconnection_t *pcontext_pconnection(pcontext_t *c) {
   return c->type == PCONNECTION ?
     (pconnection_t*)((char*)c - offsetof(pconnection_t, context)) : NULL;
@@ -602,7 +624,7 @@ static inline bool pconnection_has_event(pconnection_t *pc) {
 }
 
 static inline bool listener_has_event(pn_listener_t *l) {
-  return pn_collector_peek(l->collector);
+  return pn_collector_peek(l->collector) || (l->pending_count && !l->unclaimed);
 }
 
 static inline bool proactor_has_event(pn_proactor_t *p) {
@@ -648,18 +670,33 @@ static void rearm(pn_proactor_t *p, epoll_extended_t *ee) {
     EPOLL_FATAL("arming polled file descriptor", errno);
 }
 
+static void listener_list_append(acceptor_t **start, acceptor_t *item) {
+  assert(item->next == NULL);
+  if (*start) {
+    acceptor_t *end = *start;
+    while (end->next)
+      end = end->next;
+    end->next = item;
+  }
+  else *start = item;
+}
+
+static acceptor_t *listener_list_next(acceptor_t **start) {
+  acceptor_t *item = *start;
+  if (*start) *start = (*start)->next;
+  if (item) item->next = NULL;
+  return item;
+}
+
 // Add an overflowing listener to the overflow list. Called with listener context lock held.
-static void listener_set_overflow(pn_listener_t *l) {
-  pn_proactor_t *p = l->psockets[0].proactor;
+static void listener_set_overflow(acceptor_t *a) {
+  a->overflowed = true;
+  pn_proactor_t *p = a->psocket.proactor;
   lock(&p->overflow_mutex);
-  l->overflow = p->overflow;
-  p->overflow = l;
+  listener_list_append(&p->overflow, a);
   unlock(&p->overflow_mutex);
 }
 
-static const int dummy__ = 0;
-static pn_listener_t * const NO_OVERFLOW = (pn_listener_t*)&dummy__; /* Bogus pointer */
-
 /* TODO aconway 2017-06-08: we should also call proactor_rearm_overflow after a fixed delay,
    even if the proactor has not freed any file descriptors, since other parts of the process
    might have*/
@@ -667,22 +704,34 @@ static pn_listener_t * const NO_OVERFLOW = (pn_listener_t*)&dummy__; /* Bogus po
 // Activate overflowing listeners, called when there may be available file descriptors.
 static void proactor_rearm_overflow(pn_proactor_t *p) {
   lock(&p->overflow_mutex);
-  pn_listener_t *l = p->overflow;
+  acceptor_t* ovflw = p->overflow;
   p->overflow = NULL;
   unlock(&p->overflow_mutex);
-  while (l) {
+  acceptor_t *a = listener_list_next(&ovflw);
+  while (a) {
+    pn_listener_t *l = a->psocket.listener;
     lock(&l->context.mutex);
-    rearm(l->accepted->proactor, &l->accepted->epoll_io);
-    l->armed = true;
-    l->accepted = NULL;
-    pn_listener_t *next = l->overflow;
-    l->overflow = NO_OVERFLOW;
+    bool rearming = !l->context.closing;
+    bool notify = false;
+    assert(!a->armed);
+    assert(a->overflowed);
+    a->overflowed = false;
+    if (rearming) {
+      lock(&l->rearm_mutex);
+      a->armed = true;
+    }
+    else notify = wake(&l->context);
     unlock(&l->context.mutex);
-    l = next;
+    if (rearming) {
+      rearm(p, &a->psocket.epoll_io);
+      unlock(&l->rearm_mutex);
+    }
+    if (notify) wake_notify(&l->context);
+    a = listener_list_next(&ovflw);
   }
 }
 
-// Close an FD and rearm overflow listeners
+// Close an FD and rearm overflow listeners.  Call with no listener locks held.
 static int pclosefd(pn_proactor_t *p, int fd) {
   int err = close(fd);
   if (!err) proactor_rearm_overflow(p);
@@ -1338,8 +1387,6 @@ pn_listener_t *pn_event_listener(pn_event_t *e) {
 pn_listener_t *pn_listener() {
   pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t));
   if (l) {
-    l->accepted_fd = -1;
-    l->accepted = NULL;
     l->batch.next_event = listener_batch_next;
     l->collector = pn_collector();
     l->condition = pn_condition();
@@ -1350,6 +1397,7 @@ pn_listener_t *pn_listener() {
     }
     pn_proactor_t *unknown = NULL;  // won't know until pn_proactor_listen
     pcontext_init(&l->context, LISTENER, unknown, l);
+    pmutex_init(&l->rearm_mutex);
   }
   return l;
 }
@@ -1360,7 +1408,6 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
   lock(&l->context.mutex);
   l->context.proactor = p;;
   l->backlog = backlog;
-  l->overflow = NO_OVERFLOW;
 
   char addr_buf[PN_MAX_ADDR];
   const char *host, *port;
@@ -1375,9 +1422,9 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
       ++len;
     }
     assert(len > 0);            /* guaranteed by getaddrinfo */
-    l->psockets = (psocket_t*)calloc(len, sizeof(psocket_t));
-    assert(l->psockets);      /* TODO aconway 2017-05-05: memory safety */
-    l->psockets_size = 0;
+    l->acceptors = (acceptor_t*)calloc(len, sizeof(acceptor_t));
+    assert(l->acceptors);      /* TODO aconway 2017-05-05: memory safety */
+    l->acceptors_size = 0;
     /* Find working listen addresses */
     for (struct addrinfo *ai = addrinfo; ai; ai = ai->ai_next) {
       int fd = socket(ai->ai_family, SOCK_STREAM, ai->ai_protocol);
@@ -1390,13 +1437,18 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
             !bind(fd, ai->ai_addr, ai->ai_addrlen) &&
             !listen(fd, backlog))
         {
-          psocket_t *ps = &l->psockets[l->psockets_size++];
+          acceptor_t *acceptor = &l->acceptors[l->acceptors_size++];
+          acceptor->accepted_fd = -1;
+          psocket_t *ps = &acceptor->psocket;
           psocket_init(ps, p, l, addr);
           ps->sockfd = fd;
           ps->epoll_io.fd = fd;
           ps->epoll_io.wanted = EPOLLIN;
           ps->epoll_io.polling = false;
+          lock(&l->rearm_mutex);
           start_polling(&ps->epoll_io, ps->proactor->epollfd);  // TODO: check for error
+          acceptor->armed = true;
+          unlock(&l->rearm_mutex);
         } else {
           close(fd);
         }
@@ -1408,14 +1460,16 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
   }
   bool notify = wake(&l->context);
 
-  if (l->psockets_size == 0) { /* All failed, create dummy socket with an error */
-    l->psockets = (psocket_t*)realloc(l->psockets, sizeof(psocket_t));
-    memset(l->psockets, 0, sizeof(psocket_t));
-    psocket_init(l->psockets, p, l, addr);
+  if (l->acceptors_size == 0) { /* All failed, create dummy socket with an error */
+    l->acceptors = (acceptor_t*)realloc(l->acceptors, sizeof(acceptor_t));
+    l->acceptors_size = 1;
+    memset(l->acceptors, 0, sizeof(acceptor_t));
+    psocket_init(&l->acceptors[0].psocket, p, l, addr);
+    l->acceptors[0].accepted_fd = -1;
     if (gai_err) {
-      psocket_gai_error(l->psockets, gai_err, "listen on");
+      psocket_gai_error(&l->acceptors[0].psocket, gai_err, "listen on");
     } else {
-      psocket_error(l->psockets, errno, "listen on");
+      psocket_error(&l->acceptors[0].psocket, errno, "listen on");
     }
   } else {
     pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_OPEN);
@@ -1433,7 +1487,8 @@ static inline bool listener_can_free(pn_listener_t *l) {
 
 static inline void listener_final_free(pn_listener_t *l) {
   pcontext_finalize(&l->context);
-  free(l->psockets);
+  pmutex_finalize(&l->rearm_mutex);
+  free(l->acceptors);
   free(l);
 }
 
@@ -1456,18 +1511,40 @@ void pn_listener_free(pn_listener_t *l) {
   }
 }
 
+/* Call with l->context.mutex held; it is temporarily released around overflow processing. */
 static void listener_begin_close(pn_listener_t* l) {
   // TODO: switch to shutdown(sock, SHUT_RD) and wait for HUP callback per listener socket (analogous to pconnection)
+  int close_count = 0;
   if (!l->context.closing) {
     l->context.closing = true;
+
     /* Close all listening sockets */
-    for (size_t i = 0; i < l->psockets_size; ++i) {
-      psocket_t *ps = &l->psockets[i];
+    for (size_t i = 0; i < l->acceptors_size; ++i) {
+      psocket_t *ps = &l->acceptors[i].psocket;
       if (ps->sockfd >= 0) {
-        stop_polling(&ps->epoll_io, ps->proactor->epollfd);
-        pclosefd(l->psockets[0].proactor, ps->sockfd);
+        lock(&l->rearm_mutex);
+        stop_polling(&ps->epoll_io, ps->proactor->epollfd);  // race: PROTON-1531
+        unlock(&l->rearm_mutex);
+        close(ps->sockfd);
+        ps->sockfd = -1;
+        close_count++;
       }
     }
+    /* Close all sockets waiting for a pn_listener_accept() */
+    if (l->unclaimed) l->pending_count++;
+    acceptor_t *a = listener_list_next(&l->pending_acceptors);
+    while (a) {
+      close(a->accepted_fd);
+      a->accepted_fd = -1;
+      close_count++;
+      l->pending_count--;
+      a = listener_list_next(&l->pending_acceptors);
+    }
+    assert(!l->pending_count);
+
+    unlock(&l->context.mutex);
+    proactor_rearm_overflow(pn_listener_proactor(l));
+    lock(&l->context.mutex);
     pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
   }
 }
@@ -1485,7 +1562,9 @@ void pn_listener_close(pn_listener_t* l) {
 
 static void listener_forced_shutdown(pn_listener_t *l) {
   // Called by proactor_free, no competing threads, no epoll activity.
+  lock(&l->context.mutex); // needed because of interaction with proactor_rearm_overflow
   listener_begin_close(l);
+  unlock(&l->context.mutex);
   // pconnection_process will never be called again.  Zero everything.
   l->context.wake_ops = 0;
   l->close_dispatched = true;
@@ -1496,15 +1575,17 @@ static void listener_forced_shutdown(pn_listener_t *l) {
 /* Accept a connection as part of listener_process(). Called with listener context lock held. */
 static void listener_accept_lh(psocket_t *ps) {
   pn_listener_t *l = psocket_listener(ps);
-  assert(l->accepted_fd < 0); /* Shouldn't already have an accepted_fd */
-  l->accepted_fd = accept(ps->sockfd, NULL, 0);
-  l->accepted = ps;
-  if (l->accepted_fd >= 0) {
-    pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
+  acceptor_t *acceptor = psocket_acceptor(ps);
+  assert(acceptor->accepted_fd < 0); /* Shouldn't already have an accepted_fd */
+  acceptor->accepted_fd = accept(ps->sockfd, NULL, 0);
+  if (acceptor->accepted_fd >= 0) {
+    //    acceptor_t *acceptor = listener_list_next(pending_acceptors);
+    listener_list_append(&l->pending_acceptors, acceptor);
+    l->pending_count++;
   } else {
     int err = errno;
     if (err == ENFILE || err == EMFILE) {
-      listener_set_overflow(l);
+      listener_set_overflow(acceptor);
     } else {
       psocket_error(ps, err, "accept");
     }
@@ -1515,9 +1596,10 @@ static void listener_accept_lh(psocket_t *ps) {
 static pn_event_batch_t *listener_process(psocket_t *ps, uint32_t events) {
   // TODO: some parallelization of the accept mechanism.
   pn_listener_t *l = psocket_listener(ps);
+  acceptor_t *a = psocket_acceptor(ps);
   lock(&l->context.mutex);
   if (events) {
-    l->armed = false;  // TODO: armed logic should be per socket not per aggregate listener
+    a->armed = false;
     if (events & EPOLLRDHUP) {
       /* Calls listener_begin_close which closes all the listener's sockets */
       psocket_error(ps, errno, "listener epoll");
@@ -1543,6 +1625,13 @@ static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
   pn_listener_t *l = batch_listener(batch);
   lock(&l->context.mutex);
   pn_event_t *e = pn_collector_next(l->collector);
+  if (!e && l->pending_count && !l->unclaimed) {
+    // empty collector means pn_collector_put() will not coalesce
+    pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
+    l->unclaimed = true;
+    l->pending_count--;
+    e = pn_collector_next(l->collector);
+  }
   if (e && pn_event_type(e) == PN_LISTENER_CLOSE)
     l->close_dispatched = true;
   unlock(&l->context.mutex);
@@ -1560,22 +1649,14 @@ static void listener_done(pn_listener_t *l) {
       pn_listener_free(l);
       return;
     }
-  } else if (listener_has_event(l)) {
+  } else if (listener_has_event(l))
     notify = wake(&l->context);
-  } else if (l->overflow == NO_OVERFLOW &&
-             !l->context.closing && !l->armed && l->accepted_fd < 0 && l->accepted)
-  {
-    /* Don't rearm until the current socket is accepted */
-    rearm(l->accepted->proactor, &l->accepted->epoll_io);
-    l->armed = true;
-    l->accepted = NULL;
-  }
   unlock(&l->context.mutex);
   if (notify) wake_notify(&l->context);
 }
 
 pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
-  return l ? l->psockets[0].proactor : NULL;
+  return l ? l->acceptors[0].psocket.proactor : NULL;
 }
 
 pn_condition_t* pn_listener_condition(pn_listener_t* l) {
@@ -1597,25 +1678,51 @@ pn_record_t *pn_listener_attachments(pn_listener_t *l) {
 void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
   pconnection_t *pc = (pconnection_t*) pn_class_new(&pconnection_class, sizeof(pconnection_t));
   assert(pc); // TODO: memory safety
-  const char *err = pconnection_setup(pc, l->psockets[0].proactor, c, true, "");
+  const char *err = pconnection_setup(pc, pn_listener_proactor(l), c, true, "");
   if (err) {
     pn_logf("pn_listener_accept failure: %s", err);
     return;
   }
   // TODO: fuller sanity check on input args
 
+  int err2 = 0;
+  int fd = -1;
+  psocket_t *rearming_ps = NULL;
+  bool notify = false;
   lock(&l->context.mutex);
-  int fd = l->accepted_fd;
-  l->accepted_fd = -1;
-  proactor_add(&pc->context);
+  if (l->context.closing)
+    err2 = EBADF;
+  else if (l->unclaimed) {
+    l->unclaimed = false;
+    acceptor_t *a = listener_list_next(&l->pending_acceptors);
+    assert(a);
+    assert(!a->armed);
+    fd = a->accepted_fd;
+    a->accepted_fd = -1;
+    lock(&l->rearm_mutex);
+    rearming_ps = &a->psocket;
+    a->armed = true;
+  }
+  else err2 = EWOULDBLOCK;
 
+  proactor_add(&pc->context);
   lock(&pc->context.mutex);
-  configure_socket(fd);
   pc->psocket.sockfd = fd;
-  pconnection_start(pc);
+  if (fd >= 0) {
+    configure_socket(fd);
+    pconnection_start(pc);
+  }
+  else
+    psocket_error(&pc->psocket, err2, "pn_listener_accept");
+  if (!l->context.working && listener_has_event(l))
+    notify = wake(&l->context);
   unlock(&pc->context.mutex);
-
   unlock(&l->context.mutex);
+  if (rearming_ps) {
+    rearm(rearming_ps->proactor, &rearming_ps->epoll_io);
+    unlock(&l->rearm_mutex);
+  }
+  if (notify) wake_notify(&l->context);
 }
 
 
@@ -1701,7 +1808,7 @@ void pn_proactor_free(pn_proactor_t *p) {
 pn_proactor_t *pn_event_proactor(pn_event_t *e) {
   if (pn_event_class(e) == pn_proactor__class()) return (pn_proactor_t*)pn_event_context(e);
   pn_listener_t *l = pn_event_listener(e);
-  if (l) return l->psockets[0].proactor;
+  if (l) return l->acceptors[0].psocket.proactor;
   pn_connection_t *c = pn_event_connection(e);
   if (c) return pn_connection_proactor(c);
   return NULL;
@@ -1835,7 +1942,7 @@ static pn_event_batch_t *process_inbound_wake(pn_proactor_t *p, epoll_extended_t
      case PCONNECTION:
       return pconnection_process((pconnection_t *) ctx->owner, 0, false, false);
      case LISTENER:
-      return listener_process(&((pn_listener_t *) ctx->owner)->psockets[0], 0);
+      return listener_process(&((pn_listener_t *) ctx->owner)->acceptors[0].psocket, 0);
      default:
       assert(ctx->type == WAKEABLE); // TODO: implement or remove
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org