You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@qpid.apache.org by cl...@apache.org on 2017/05/24 14:59:40 UTC

qpid-proton git commit: PROTON-1483: C epoll proactor, change timerfd accounting

Repository: qpid-proton
Updated Branches:
  refs/heads/master d6524051f -> a4e5c84d8


PROTON-1483: C epoll proactor, change timerfd accounting


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a4e5c84d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a4e5c84d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a4e5c84d

Branch: refs/heads/master
Commit: a4e5c84d88eb32a1951e058836e5f509749a955b
Parents: d652405
Author: Clifford Jansen <cl...@apache.org>
Authored: Wed May 24 07:52:42 2017 -0700
Committer: Clifford Jansen <cl...@apache.org>
Committed: Wed May 24 07:54:31 2017 -0700

----------------------------------------------------------------------
 proton-c/src/proactor/epoll.c | 137 +++++++++++++++++++++++++------------
 1 file changed, 93 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4e5c84d/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index a1ec1af..2f99cd9 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -117,24 +117,39 @@ typedef struct epoll_extended_t {
 
 /*
  * This timerfd logic assumes EPOLLONESHOT and there never being two
- * active timeout callbacks.  There can be multiple unclaimed expiries
- * processed in a single callback.
+ * active timeout callbacks.  There can be multiple (or zero)
+ * unclaimed expiries processed in a single callback.
+ *
+ * timerfd_settime() documentation implies a crisp relationship between
+ * timer expiry count and oldt's return value, but a return value of
+ * zero is ambiguous.  It can lead to no EPOLLIN, EPOLLIN + expected
+ * read, or the following race:
+ *
+ *   event expiry (in kernel) -> EPOLLIN
+ *   cancel/settime(0) (thread A) (number of expiries resets to zero)
+ *   read(timerfd) -> -1, EAGAIN  (thread B servicing epoll event)
+ *
+ * The original implementation with counters to track expiry counts
+ * was abandoned in favor of "in doubt" transitions and resolution
+ * at shutdown.
  */
 
 typedef struct ptimer_t {
   pmutex mutex;
   int timerfd;
   epoll_extended_t epoll_io;
-  int pending_count;
-  int skip_count;
+  bool timer_active;  // true after a nonzero ptimer_set_lh(); cleared by a 0 set or when ptimer_callback() finds timerfd disarmed
+  bool in_doubt;  // 0 or 1 callbacks are possible: an expiry may or may not still produce EPOLLIN
+  bool shutting_down;  // once set, ptimer_set() is a no-op
 } ptimer_t;
 
 static bool ptimer_init(ptimer_t *pt, struct psocket_t *ps) {
   pt->timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
   if (pt->timerfd < 0) return false;
   pmutex_init(&pt->mutex);
-  pt->pending_count = 0;
-  pt->skip_count = 0;
+  pt->timer_active = false;
+  pt->in_doubt = false;
+  pt->shutting_down = false;
   epoll_type_t type = ps ? PCONNECTION_TIMER : PROACTOR_TIMER;
   pt->epoll_io.psocket = ps;
   pt->epoll_io.fd = pt->timerfd;
@@ -144,49 +159,74 @@ static bool ptimer_init(ptimer_t *pt, struct psocket_t *ps) {
   return true;
 }
 
-static void ptimer_set(ptimer_t *pt, uint64_t t_millis) {
-  // t_millis == 0 -> cancel
-  lock(&pt->mutex);
-  if (t_millis == 0 && pt->pending_count == 0) {
-    unlock(&pt->mutex);
-    return;  // nothing to cancel
-  }
+// Call with ptimer lock held.  Set timerfd to fire in t_millis ms (0 disarms); mark in_doubt if an active timer had already expired.
+static void ptimer_set_lh(ptimer_t *pt, uint64_t t_millis) {
   struct itimerspec newt, oldt;
   memset(&newt, 0, sizeof(newt));
   newt.it_value.tv_sec = t_millis / 1000;
   newt.it_value.tv_nsec = (t_millis % 1000) * 1000000;
 
   timerfd_settime(pt->timerfd, 0, &newt, &oldt);
-  if (oldt.it_value.tv_sec || oldt.it_value.tv_nsec) {
-    // old value cancelled
-    assert (pt->pending_count > 0);
-    pt->pending_count--;
-  } else if (pt->pending_count) {
-    // cancel instance waiting on this lock
-    pt->skip_count++;
-  }
-  if (t_millis)
-    pt->pending_count++;
-  assert(pt->pending_count >= 0);
+  if (pt->timer_active && oldt.it_value.tv_nsec == 0 && oldt.it_value.tv_sec == 0) {
+    // Active timer had already expired: EPOLLIN is possible but not assured
+    pt->in_doubt = true;
+  }
+  pt->timer_active = t_millis;  // any nonzero t_millis converts to true
+}
+
+static void ptimer_set(ptimer_t *pt, uint64_t t_millis) {
+  // t_millis == 0 -> cancel
+  lock(&pt->mutex);
+  if ((t_millis == 0 && !pt->timer_active) || pt->shutting_down) {
+    unlock(&pt->mutex);
+    return;  // nothing to cancel, or timer is shutting down
+  }
+  ptimer_set_lh(pt, t_millis);
   unlock(&pt->mutex);
 }
 
-// Callback bookkeeping. Return number of uncancelled expiry events.
-static int ptimer_callback(ptimer_t *pt) {
+// Callback bookkeeping. Return true if read() collected at least one expiry (false on EAGAIN).
+static bool ptimer_callback(ptimer_t *pt) {
   lock(&pt->mutex);
-  uint64_t u_exp_count;
+  struct itimerspec current;
+  if (timerfd_gettime(pt->timerfd, &current) == 0) {
+    if (current.it_value.tv_nsec == 0 && current.it_value.tv_sec == 0)
+      pt->timer_active = false;
+  }
+  uint64_t u_exp_count = 0;
   ssize_t l = read(pt->timerfd, &u_exp_count, sizeof(uint64_t));
-  (void)l; /* Silence compiler complaints in release build */
-  assert(l == sizeof(uint64_t));
-  assert(u_exp_count < INT_MAX);  // or test and log it?
-  int exp_count = (int) u_exp_count;
-  assert(exp_count >= pt->skip_count);
-  assert(exp_count <= pt->pending_count);
-  exp_count -= pt->skip_count;
-  pt->skip_count = 0;
-  pt->pending_count -= exp_count;
+  if (l != sizeof(uint64_t)) {
+    if (l == -1) {
+      if (errno != EAGAIN) {  // EAGAIN: expiry count was reset by a racing settime (see race above)
+        EPOLL_FATAL("timer read", errno);
+      }
+    }
+    else
+      EPOLL_FATAL("timer internal error", 0);
+  }
+  if (!pt->timer_active) {
+    // Expiry counter just cleared, timer not set, timerfd not armed
+    pt->in_doubt = false;
+  }
   unlock(&pt->mutex);
-  return (int) exp_count;
+  return (l == sizeof(uint64_t)) && u_exp_count > 0;
+}
+
+// Begin timer shutdown.  Return true if timerfd has and will have no pollable expiries in the current armed state
+static bool ptimer_shutdown(ptimer_t *pt, bool currently_armed) {
+  lock(&pt->mutex);
+  if (currently_armed) {
+    ptimer_set_lh(pt, 0);
+    pt->shutting_down = true;
+    if (pt->in_doubt)
+      // Force at least one callback (1 ms timer).  If two, second cannot proceed with unarmed timerfd.
+      ptimer_set_lh(pt, 1);
+  }
+  else
+    pt->shutting_down = true;
+  bool rv = !pt->in_doubt;
+  unlock(&pt->mutex);
+  return rv;
 }
 
 static void ptimer_finalize(ptimer_t *pt) {
@@ -626,7 +666,7 @@ static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bo
 // Call with lock held and closing == true (i.e. pn_connection_driver_finished() == true), timer cancelled.
 // Return true when all possible outstanding epoll events associated with this pconnection have been processed.
 static inline bool pconnection_is_final(pconnection_t *pc) {
-  return !pc->current_arm && !pc->timer.pending_count && !pc->context.wake_ops;
+  return !pc->current_arm && !pc->timer_armed && !pc->context.wake_ops;
 }
 
 static void pconnection_final_free(pconnection_t *pc) {
@@ -661,7 +701,13 @@ static void pconnection_begin_close(pconnection_t *pc) {
     stop_polling(&pc->psocket.epoll_io, pc->psocket.proactor->epollfd);
     pc->current_arm = 0;
     pn_connection_driver_close(&pc->driver);
-    ptimer_set(&pc->timer, 0);
+    if (ptimer_shutdown(&pc->timer, pc->timer_armed))
+      pc->timer_armed = false;  // disarmed in the sense that the timer will never fire again
+    else if (!pc->timer_armed) {
+      // In doubt.  Rearm so the one possible outstanding callback can be collected
+      rearm(pc->psocket.proactor, &pc->timer.epoll_io);
+      pc->timer_armed = true;
+    }
   }
 }
 
@@ -669,7 +715,7 @@ static void pconnection_forced_shutdown(pconnection_t *pc) {
   // Called by proactor_free, no competing threads, no epoll activity.
   pconnection_begin_close(pc);
   // pconnection_process will never be called again.  Zero everything.
-  pc->timer.pending_count = 0;
+  pc->timer_armed = false;
   pc->context.wake_ops = 0;
   pn_connection_t *c = pc->driver.connection;
   pn_collector_release(pn_connection_collector(c));
@@ -804,7 +850,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
 
   if (timeout) {
     timer_unarmed = true;
-    timer_fired = (ptimer_callback(&pc->timer) != 0);
+    timer_fired = ptimer_callback(&pc->timer) != 0;  // NOTE(review): ptimer_callback() now returns bool; "!= 0" is redundant
   }
   lock(&pc->context.mutex);
 
@@ -889,11 +935,12 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
     pc->timer_armed = true;  // about to rearm outside the lock
     timer_unarmed = true;    // so we remember
   }
+  bool timer_shutting_down = pc->timer.shutting_down;  // NOTE(review): read under pc->context.mutex, but ptimer_shutdown() writes it under pc->timer.mutex -- confirm no race
 
   unlock(&pc->context.mutex);
   pc->hog_count++; // working context doing work
 
-  if (timer_unarmed) {
+  if (timer_unarmed && !timer_shutting_down) {
     rearm(pc->psocket.proactor, &pc->timer.epoll_io);
     timer_unarmed = false;
   }
@@ -1552,7 +1599,8 @@ static pn_event_batch_t *proactor_process(pn_proactor_t *p, bool timeout) {
       return &p->batch;
     }
   }
-  bool rearm_timer = !p->timer_armed;
+  bool rearm_timer = !p->timer_armed && !p->timer.shutting_down;
+  p->timer_armed = true;  // NOTE(review): set even when the rearm is skipped for shutdown -- confirm intended
   unlock(&p->context.mutex);
   if (rearm_timer)
     rearm(p, &p->timer.epoll_io);
@@ -1701,7 +1749,8 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
   if (bp == p) {
     bool notify = false;
     lock(&p->context.mutex);
-    bool rearm_timer = !p->timer_armed;
+    bool rearm_timer = !p->timer_armed && !p->shutting_down;  // NOTE(review): proactor_process() tests p->timer.shutting_down instead -- confirm which flag is intended
+    p->timer_armed = true;
     p->context.working = false;
     proactor_update_batch(p);
     if (proactor_has_event(p))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org