You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2017/05/24 14:59:40 UTC
qpid-proton git commit: PROTON-1483: C epoll proactor,
change timerfd accounting
Repository: qpid-proton
Updated Branches:
refs/heads/master d6524051f -> a4e5c84d8
PROTON-1483: C epoll proactor, change timerfd accounting
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a4e5c84d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a4e5c84d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a4e5c84d
Branch: refs/heads/master
Commit: a4e5c84d88eb32a1951e058836e5f509749a955b
Parents: d652405
Author: Clifford Jansen <cl...@apache.org>
Authored: Wed May 24 07:52:42 2017 -0700
Committer: Clifford Jansen <cl...@apache.org>
Committed: Wed May 24 07:54:31 2017 -0700
----------------------------------------------------------------------
proton-c/src/proactor/epoll.c | 137 +++++++++++++++++++++++++------------
1 file changed, 93 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4e5c84d/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index a1ec1af..2f99cd9 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -117,24 +117,39 @@ typedef struct epoll_extended_t {
/*
* This timerfd logic assumes EPOLLONESHOT and there never being two
- * active timeout callbacks. There can be multiple unclaimed expiries
- * processed in a single callback.
+ * active timeout callbacks. There can be multiple (or zero)
+ * unclaimed expiries processed in a single callback.
+ *
+ * timerfd_settime() documentation implies a crisp relationship between
+ * timer expiry count and oldt's return value, but a return value of
+ * zero is ambiguous. It can lead to no EPOLLIN, EPOLLIN + expected
+ * read, or the following race:
+ *
+ * event expiry (in kernel) -> EPOLLIN
+ * cancel/settime(0) (thread A) (number of expiries resets to zero)
+ * read(timerfd) -> -1, EAGAIN (thread B servicing epoll event)
+ *
+ * The original implementation with counters to track expiry counts
+ * was abandoned in favor of "in doubt" transitions and resolution
+ * at shutdown.
*/
typedef struct ptimer_t {
pmutex mutex;
int timerfd;
epoll_extended_t epoll_io;
- int pending_count;
- int skip_count;
+ bool timer_active;
+ bool in_doubt; // 0 or 1 callbacks are possible
+ bool shutting_down;
} ptimer_t;
static bool ptimer_init(ptimer_t *pt, struct psocket_t *ps) {
pt->timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
if (pt->timerfd < 0) return false;
pmutex_init(&pt->mutex);
- pt->pending_count = 0;
- pt->skip_count = 0;
+ pt->timer_active = false;
+ pt->in_doubt = false;
+ pt->shutting_down = false;
epoll_type_t type = ps ? PCONNECTION_TIMER : PROACTOR_TIMER;
pt->epoll_io.psocket = ps;
pt->epoll_io.fd = pt->timerfd;
@@ -144,49 +159,74 @@ static bool ptimer_init(ptimer_t *pt, struct psocket_t *ps) {
return true;
}
-static void ptimer_set(ptimer_t *pt, uint64_t t_millis) {
- // t_millis == 0 -> cancel
- lock(&pt->mutex);
- if (t_millis == 0 && pt->pending_count == 0) {
- unlock(&pt->mutex);
- return; // nothing to cancel
- }
+// Call with ptimer lock held
+static void ptimer_set_lh(ptimer_t *pt, uint64_t t_millis) {
struct itimerspec newt, oldt;
memset(&newt, 0, sizeof(newt));
newt.it_value.tv_sec = t_millis / 1000;
newt.it_value.tv_nsec = (t_millis % 1000) * 1000000;
timerfd_settime(pt->timerfd, 0, &newt, &oldt);
- if (oldt.it_value.tv_sec || oldt.it_value.tv_nsec) {
- // old value cancelled
- assert (pt->pending_count > 0);
- pt->pending_count--;
- } else if (pt->pending_count) {
- // cancel instance waiting on this lock
- pt->skip_count++;
- }
- if (t_millis)
- pt->pending_count++;
- assert(pt->pending_count >= 0);
+ if (pt->timer_active && oldt.it_value.tv_nsec == 0 && oldt.it_value.tv_sec == 0) {
+ // EPOLLIN is possible but not assured
+ pt->in_doubt = true;
+ }
+ pt->timer_active = t_millis;
+}
+
+static void ptimer_set(ptimer_t *pt, uint64_t t_millis) {
+ // t_millis == 0 -> cancel
+ lock(&pt->mutex);
+ if ((t_millis == 0 && !pt->timer_active) || pt->shutting_down) {
+ unlock(&pt->mutex);
+ return; // nothing to do
+ }
+ ptimer_set_lh(pt, t_millis);
unlock(&pt->mutex);
}
-// Callback bookkeeping. Return number of uncancelled expiry events.
-static int ptimer_callback(ptimer_t *pt) {
+// Callback bookkeeping. Return true if there is an expired timer.
+static bool ptimer_callback(ptimer_t *pt) {
lock(&pt->mutex);
- uint64_t u_exp_count;
+ struct itimerspec current;
+ if (timerfd_gettime(pt->timerfd, &current) == 0) {
+ if (current.it_value.tv_nsec == 0 && current.it_value.tv_sec == 0)
+ pt->timer_active = false;
+ }
+ uint64_t u_exp_count = 0;
ssize_t l = read(pt->timerfd, &u_exp_count, sizeof(uint64_t));
- (void)l; /* Silence compiler complaints in release build */
- assert(l == sizeof(uint64_t));
- assert(u_exp_count < INT_MAX); // or test and log it?
- int exp_count = (int) u_exp_count;
- assert(exp_count >= pt->skip_count);
- assert(exp_count <= pt->pending_count);
- exp_count -= pt->skip_count;
- pt->skip_count = 0;
- pt->pending_count -= exp_count;
+ if (l != sizeof(uint64_t)) {
+ if (l == -1) {
+ if (errno != EAGAIN) {
+ EPOLL_FATAL("timer read", errno);
+ }
+ }
+ else
+ EPOLL_FATAL("timer internal error", 0);
+ }
+ if (!pt->timer_active) {
+ // Expiry counter just cleared, timer not set, timerfd not armed
+ pt->in_doubt = false;
+ }
unlock(&pt->mutex);
- return (int) exp_count;
+ return (l == sizeof(uint64_t)) && u_exp_count > 0;
+}
+
+// Return true if timerfd has and will have no pollable expiries in the current armed state
+static bool ptimer_shutdown(ptimer_t *pt, bool currently_armed) {
+ lock(&pt->mutex);
+ if (currently_armed) {
+ ptimer_set_lh(pt, 0);
+ pt->shutting_down = true;
+ if (pt->in_doubt)
+ // Force at least one callback. If two, second cannot proceed with unarmed timerfd.
+ ptimer_set_lh(pt, 1);
+ }
+ else
+ pt->shutting_down = true;
+ bool rv = !pt->in_doubt;
+ unlock(&pt->mutex);
+ return rv;
}
static void ptimer_finalize(ptimer_t *pt) {
@@ -626,7 +666,7 @@ static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bo
// Call with lock held and closing == true (i.e. pn_connection_driver_finished() == true), timer cancelled.
// Return true when all possible outstanding epoll events associated with this pconnection have been processed.
static inline bool pconnection_is_final(pconnection_t *pc) {
- return !pc->current_arm && !pc->timer.pending_count && !pc->context.wake_ops;
+ return !pc->current_arm && !pc->timer_armed && !pc->context.wake_ops;
}
static void pconnection_final_free(pconnection_t *pc) {
@@ -661,7 +701,13 @@ static void pconnection_begin_close(pconnection_t *pc) {
stop_polling(&pc->psocket.epoll_io, pc->psocket.proactor->epollfd);
pc->current_arm = 0;
pn_connection_driver_close(&pc->driver);
- ptimer_set(&pc->timer, 0);
+ if (ptimer_shutdown(&pc->timer, pc->timer_armed))
+ pc->timer_armed = false; // disarmed in the sense that the timer will never fire again
+ else if (!pc->timer_armed) {
+ // In doubt. One last callback to collect
+ rearm(pc->psocket.proactor, &pc->timer.epoll_io);
+ pc->timer_armed = true;
+ }
}
}
@@ -669,7 +715,7 @@ static void pconnection_forced_shutdown(pconnection_t *pc) {
// Called by proactor_free, no competing threads, no epoll activity.
pconnection_begin_close(pc);
// pconnection_process will never be called again. Zero everything.
- pc->timer.pending_count = 0;
+ pc->timer_armed = false;
pc->context.wake_ops = 0;
pn_connection_t *c = pc->driver.connection;
pn_collector_release(pn_connection_collector(c));
@@ -804,7 +850,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
if (timeout) {
timer_unarmed = true;
- timer_fired = (ptimer_callback(&pc->timer) != 0);
+ timer_fired = ptimer_callback(&pc->timer) != 0;
}
lock(&pc->context.mutex);
@@ -889,11 +935,12 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
pc->timer_armed = true; // about to rearm outside the lock
timer_unarmed = true; // so we remember
}
+ bool timer_shutting_down = pc->timer.shutting_down;
unlock(&pc->context.mutex);
pc->hog_count++; // working context doing work
- if (timer_unarmed) {
+ if (timer_unarmed && !timer_shutting_down) {
rearm(pc->psocket.proactor, &pc->timer.epoll_io);
timer_unarmed = false;
}
@@ -1552,7 +1599,8 @@ static pn_event_batch_t *proactor_process(pn_proactor_t *p, bool timeout) {
return &p->batch;
}
}
- bool rearm_timer = !p->timer_armed;
+ bool rearm_timer = !p->timer_armed && !p->timer.shutting_down;
+ p->timer_armed = true;
unlock(&p->context.mutex);
if (rearm_timer)
rearm(p, &p->timer.epoll_io);
@@ -1701,7 +1749,8 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
if (bp == p) {
bool notify = false;
lock(&p->context.mutex);
- bool rearm_timer = !p->timer_armed;
+ bool rearm_timer = !p->timer_armed && !p->shutting_down;
+ p->timer_armed = true;
p->context.working = false;
proactor_update_batch(p);
if (proactor_has_event(p))
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org