You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/03/23 01:44:39 UTC

qpid-proton git commit: PROTON-1443: c proactor - pn_proactor_set_timeout(0) to mean immediate

Repository: qpid-proton
Updated Branches:
  refs/heads/master b143db26f -> cfa366356


PROTON-1443: c proactor - pn_proactor_set_timeout(0) to mean immediate

Also added pn_proactor_cancel_timeout() to cancel a pending timeout, since a zero timeout no longer cancels.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/cfa36635
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/cfa36635
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/cfa36635

Branch: refs/heads/master
Commit: cfa3663568e3717e7978a3c0223a8c3b0f945656
Parents: b143db2
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Mar 22 21:41:19 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Mar 22 21:41:19 2017 -0400

----------------------------------------------------------------------
 proton-c/include/proton/proactor.h |  8 +++++--
 proton-c/src/proactor/libuv.c      | 40 ++++++++++++++++++++-------------
 proton-c/src/tests/proactor.c      | 14 +++++++++++-
 3 files changed, 43 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cfa36635/proton-c/include/proton/proactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
index 185a1af..345e3fc 100644
--- a/proton-c/include/proton/proactor.h
+++ b/proton-c/include/proton/proactor.h
@@ -164,8 +164,7 @@ PNP_EXTERN void pn_proactor_interrupt(pn_proactor_t *proactor);
  * Note: calling pn_proactor_set_timeout() again before the
  * PN_PROACTOR_TIMEOUT is delivered will cancel the previous timeout
  * and deliver an event only after the new
- * timeout. `pn_proactor_set_timeout(0)` will cancel the timeout
- * without setting a new one.
+ * timeout. A timeout of 0 requests an immediate PN_PROACTOR_TIMEOUT event.
  *
  * Note: PN_PROACTOR_TIMEOUT events will be delivered in series, never
  * concurrently.
@@ -173,6 +172,11 @@ PNP_EXTERN void pn_proactor_interrupt(pn_proactor_t *proactor);
 PNP_EXTERN void pn_proactor_set_timeout(pn_proactor_t *proactor, pn_millis_t timeout);
 
 /**
+ * Cancel the pending timeout set by pn_proactor_set_timeout() if there is one.
+ */
+PNP_EXTERN void pn_proactor_cancel_timeout(pn_proactor_t *proactor);
+
+/**
  * Cause a PN_CONNECTION_WAKE event to be returned by the proactor, even if
  * there are no IO events pending for the connection.
  *

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cfa36635/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 8fc3eaf..1ba840a 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -217,6 +217,8 @@ struct pn_listener_t {
   listener_state state;
 };
 
+typedef enum { TM_NONE, TM_REQUEST, TM_PENDING, TM_FIRED } timeout_state_t;
+
 struct pn_proactor_t {
   /* Leader thread  */
   uv_cond_t cond;
@@ -230,16 +232,15 @@ struct pn_proactor_t {
 
   /* Protected by lock */
   uv_mutex_t lock;
-  work_queue_t worker_q;               /* ready for work, to be returned via pn_proactor_wait()  */
-  work_queue_t leader_q;               /* waiting for attention by the leader thread */
-  size_t interrupt;             /* pending interrupts */
+  work_queue_t worker_q;      /* ready for work, to be returned via pn_proactor_wait()  */
+  work_queue_t leader_q;      /* waiting for attention by the leader thread */
+  size_t interrupt;           /* pending interrupts */
+  timeout_state_t timeout_state;
   pn_millis_t timeout;
-  size_t count;                 /* connection/listener count for INACTIVE events */
+  size_t count;               /* connection/listener count for INACTIVE events */
   bool inactive;
-  bool timeout_request;
-  bool timeout_elapsed;
   bool has_leader;
-  bool batch_working;          /* batch is being processed in a worker thread */
+  bool batch_working;         /* batch is being processed in a worker thread */
 };
 
 
@@ -736,7 +737,9 @@ static void on_write(uv_write_t* write, int err) {
 static void on_timeout(uv_timer_t *timer) {
   pn_proactor_t *p = (pn_proactor_t*)timer->data;
   uv_mutex_lock(&p->lock);
-  p->timeout_elapsed = true;
+  if (p->timeout_state == TM_PENDING) { /* Only fire if still pending */
+    p->timeout_state = TM_FIRED;
+  }
   uv_stop(&p->loop);            /* UV does not always stop after on_timeout without this */
   uv_mutex_unlock(&p->lock);
 }
@@ -787,8 +790,8 @@ static pn_event_batch_t *get_batch_lh(pn_proactor_t *p) {
       --p->interrupt;
       return proactor_batch_lh(p, PN_PROACTOR_INTERRUPT);
     }
-    if (p->timeout_elapsed) {
-      p->timeout_elapsed = false;
+    if (p->timeout_state == TM_FIRED) {
+      p->timeout_state = TM_NONE;
       return proactor_batch_lh(p, PN_PROACTOR_TIMEOUT);
     }
   }
@@ -883,12 +886,10 @@ void pconnection_detach(pconnection_t *pc) {
 /* Process the leader_q and the UV loop, in the leader thread */
 static pn_event_batch_t *leader_lead_lh(pn_proactor_t *p, uv_run_mode mode) {
   /* Set timeout timer if there was a request, let it count down while we process work */
-  if (p->timeout_request) {
-    p->timeout_request = false;
+  if (p->timeout_state == TM_REQUEST) {
+    p->timeout_state = TM_PENDING;
     uv_timer_stop(&p->timer);
-    if (p->timeout) {
-      uv_timer_start(&p->timer, on_timeout, p->timeout, 0);
-    }
+    uv_timer_start(&p->timer, on_timeout, p->timeout, 0);
   }
   pn_event_batch_t *batch = NULL;
   for (work_t *w = work_pop(&p->leader_q); w; w = work_pop(&p->leader_q)) {
@@ -1014,7 +1015,14 @@ void pn_proactor_interrupt(pn_proactor_t *p) {
 void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
   uv_mutex_lock(&p->lock);
   p->timeout = t;
-  p->timeout_request = true;
+  p->timeout_state = TM_REQUEST;
+  uv_mutex_unlock(&p->lock);
+  notify(p);
+}
+
+void pn_proactor_cancel_timeout(pn_proactor_t *p) {
+  uv_mutex_lock(&p->lock);
+  p->timeout_state = TM_NONE;
   uv_mutex_unlock(&p->lock);
   notify(p);
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cfa36635/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index 38237b0..34a1880 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -121,8 +121,20 @@ static void test_interrupt_timeout(test_t *t) {
   pn_proactor_interrupt(p);
   TEST_ETYPE_EQUAL(t, PN_PROACTOR_INTERRUPT, wait_next(p));
   TEST_CHECK(t, pn_proactor_get(p) == NULL); /* idle */
-  pn_proactor_set_timeout(p, 1); /* very short timeout */
+
+  /* Set an immediate timeout */
+  pn_proactor_set_timeout(p, 0);
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_TIMEOUT, wait_next(p));
+
+  /* Set a (very short) timeout */
+  pn_proactor_set_timeout(p, 10);
   TEST_ETYPE_EQUAL(t, PN_PROACTOR_TIMEOUT, wait_next(p));
+
+  /* Set and cancel a timeout, make sure we don't get the timeout event */
+  pn_proactor_set_timeout(p, 10);
+  pn_proactor_cancel_timeout(p);
+  TEST_CHECK(t, pn_proactor_get(p) == NULL); /* idle */
+
   pn_proactor_free(p);
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org