You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/06/19 17:48:39 UTC

[5/5] qpid-proton git commit: PROTON-1494: epoll proactor PN_PROACTOR_INACTIVE behavior

PROTON-1494: epoll proactor PN_PROACTOR_INACTIVE behavior

Issue PN_PROACTOR_INACTIVE only when there are no connections, no listeners, and
no timeout set.

Updated the doc.

Updated the tests - dropped default timeout as it interferes with tests that check
for TIMEOUT or INACTIVE events, which is most of them.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/3ae6efdf
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/3ae6efdf
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/3ae6efdf

Branch: refs/heads/master
Commit: 3ae6efdf6f0a099adb7310e1abc7ae057f537933
Parents: 24092c9
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Jun 16 16:28:35 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Jun 19 13:34:56 2017 -0400

----------------------------------------------------------------------
 proton-c/src/proactor/epoll.c | 80 ++++++++++++++++++++++----------------
 proton-c/src/tests/proactor.c | 15 +++----
 2 files changed, 52 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3ae6efdf/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 54703a5..9e8ea4d 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -376,11 +376,12 @@ struct pn_proactor_t {
   epoll_extended_t epoll_interrupt;
   pn_event_batch_t batch;
   size_t disconnects_pending;   /* unfinished proactor disconnects*/
-  bool interrupt;
-  bool inactive;
-  bool timer_expired;
-  bool timer_cancelled;
-  bool timer_armed;
+  // need_xxx flags indicate we should generate PN_PROACTOR_XXX on the next update_batch()
+  bool need_interrupt;
+  bool need_inactive;
+  bool need_timeout;
+  bool timeout_set; /* timeout has been set by user and not yet cancelled or generated event */
+  bool timer_armed; /* timer is armed in epoll */
   bool shutting_down;
   // wake subsystem
   int eventfd;
@@ -1192,6 +1193,19 @@ static int pgetaddrinfo(const char *host, const char *port, int flags, struct ad
   return getaddrinfo(host, port, &hints, res);
 }
 
+static inline bool is_inactive(pn_proactor_t *p) {
+  return (!p->contexts && !p->disconnects_pending && !p->timeout_set && !p->shutting_down);
+}
+
+/* If inactive set need_inactive and return true if the proactor needs a wakeup */
+static bool wait_if_inactive(pn_proactor_t *p) {
+  if (is_inactive(p)) {
+    p->need_inactive = true;
+    return wake(&p->context);
+  }
+  return false;
+}
+
 void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr) {
   pconnection_t *pc = new_pconnection_t(p, c, false, addr);
   assert(pc); // TODO: memory safety
@@ -1201,6 +1215,7 @@ void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr)
   pn_connection_open(pc->driver.connection); /* Auto-open */
 
   bool notify = false;
+  bool notify_proactor = false;
 
   if (pc->disconnected) {
     notify = wake(&pc->context);    /* Error during initialization */
@@ -1214,10 +1229,13 @@ void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr)
     } else {
       psocket_gai_error(&pc->psocket, gai_error, "connect to ");
       notify = wake(&pc->context);
+      notify_proactor = wait_if_inactive(p);
     }
   }
+  /* We need to issue INACTIVE on immediate failure */
   unlock(&pc->context.mutex);
   if (notify) wake_notify(&pc->context);
+  if (notify_proactor) wake_notify(&p->context);
 }
 
 static void pconnection_tick(pconnection_t *pc) {
@@ -1638,18 +1656,20 @@ static bool proactor_update_batch(pn_proactor_t *p) {
   if (proactor_has_event(p))
     return true;
 
-  if (p->timer_expired) {
-    p->timer_expired = false;
+  if (p->need_timeout) {
+    p->need_timeout = false;
+    p->timeout_set = false;
     proactor_add_event(p, PN_PROACTOR_TIMEOUT);
+    p->need_inactive = is_inactive(p);
     return true;
   }
-  if (p->interrupt) {
-    p->interrupt = false;
+  if (p->need_interrupt) {
+    p->need_interrupt = false;
     proactor_add_event(p, PN_PROACTOR_INTERRUPT);
     return true;
   }
-  if (p->inactive) {
-    p->inactive = false;
+  if (p->need_inactive) {
+    p->need_inactive = false;
     proactor_add_event(p, PN_PROACTOR_INACTIVE);
     return true;
   }
@@ -1669,11 +1689,12 @@ static pn_event_batch_t *proactor_process(pn_proactor_t *p, pn_event_type_t even
   bool timer_fired = (event == PN_PROACTOR_TIMEOUT) && ptimer_callback(&p->timer) != 0;
   lock(&p->context.mutex);
   if (event == PN_PROACTOR_INTERRUPT) {
-    p->interrupt = true;
+    p->need_interrupt = true;
   } else if (event == PN_PROACTOR_TIMEOUT) {
     p->timer_armed = false;
-    if (timer_fired && !p->timer_cancelled)
-      p->timer_expired = true;
+    if (timer_fired && p->timeout_set) {
+      p->need_timeout = true;
+    }
   } else {
     wake_done(&p->context);
   }
@@ -1708,15 +1729,11 @@ static void proactor_add(pcontext_t *ctx) {
 static bool proactor_remove(pcontext_t *ctx) {
   pn_proactor_t *p = ctx->proactor;
   lock(&p->context.mutex);
-  bool notify = false;
   bool can_free = true;
   if (ctx->disconnecting) {
     // No longer on contexts list
     if (--ctx->disconnect_ops == 0) {
-      if (--p->disconnects_pending == 0 && !p->contexts) {
-        p->inactive = true;
-        notify = wake(&p->context);
-      }
+      --p->disconnects_pending;
     }
     else                  // procator_disconnect() still processing
       can_free = false;   // this psocket
@@ -1731,14 +1748,11 @@ static bool proactor_remove(pcontext_t *ctx) {
       if (p->contexts)
         p->contexts->prev = NULL;
     }
-    if (ctx->next)
+    if (ctx->next) {
       ctx->next->prev = ctx->prev;
-
-    if (!p->contexts && !p->disconnects_pending && !p->shutting_down) {
-      p->inactive = true;
-      notify = wake(&p->context);
     }
   }
+  bool notify = wait_if_inactive(p);
   unlock(&p->context.mutex);
   if (notify) wake_notify(&p->context);
   return can_free;
@@ -1846,7 +1860,8 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
     if (proactor_has_event(p))
       notify = wake(&p->context);
     unlock(&p->context.mutex);
-    if (notify) wake_notify(&p->context);
+    if (notify)
+      wake_notify(&p->context);
     if (rearm_timer)
       rearm(p, &p->timer.epoll_io);
     return;
@@ -1864,10 +1879,10 @@ void pn_proactor_interrupt(pn_proactor_t *p) {
 void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
   bool notify = false;
   lock(&p->context.mutex);
-  p->timer_cancelled = false;
+  p->timeout_set = true;
   if (t == 0) {
     ptimer_set(&p->timer, 0);
-    p->timer_expired = true;
+    p->need_timeout = true;
     notify = wake(&p->context);
   } else {
     ptimer_set(&p->timer, t);
@@ -1878,10 +1893,12 @@ void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
 
 void pn_proactor_cancel_timeout(pn_proactor_t *p) {
   lock(&p->context.mutex);
-  p->timer_cancelled = true;  // stays cancelled until next set_timeout()
-  p->timer_expired = false;
+  p->timeout_set = false;
+  p->need_timeout = false;
   ptimer_set(&p->timer, 0);
+  bool notify = wait_if_inactive(p);
   unlock(&p->context.mutex);
+  if (notify) wake_notify(&p->context);
 }
 
 pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
@@ -1951,10 +1968,7 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
     if (--ctx->disconnect_ops == 0) {
       do_free = true;
       ctx_notify = false;
-      if (--p->disconnects_pending == 0 && !p->contexts) {
-        p->inactive = true;
-        notify = wake(&p->context);
-      }
+      notify = wait_if_inactive(p);
     } else {
       // If initiating the close, wake the pcontext to do the free.
       if (ctx_notify)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3ae6efdf/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index f6122c1..6ae633c 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -33,8 +33,6 @@
 #include <stdlib.h>
 #include <string.h>
 
-static pn_millis_t timeout = 7*1000; /* timeout for hanging tests */
-
 static const char *localhost = ""; /* host for connect/listen */
 
 typedef pn_event_type_t (*test_handler_fn)(test_t *, pn_event_t*);
@@ -57,7 +55,6 @@ static void proactor_test_init(proactor_test_t *pts, size_t n, test_t *t) {
     if (!pt->t) pt->t = t;
     if (!pt->proactor) pt->proactor = pn_proactor();
     pt->log_len = 0;
-    pn_proactor_set_timeout(pt->proactor, timeout);
   }
 }
 
@@ -175,14 +172,17 @@ static void test_interrupt_timeout(test_t *t) {
   /* Set an immediate timeout */
   pn_proactor_set_timeout(p, 0);
   TEST_ETYPE_EQUAL(t, PN_PROACTOR_TIMEOUT, wait_next(p));
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, wait_next(p)); /* Inactive because timeout expired */
 
   /* Set a (very short) timeout */
-  pn_proactor_set_timeout(p, 10);
+  pn_proactor_set_timeout(p, 1);
   TEST_ETYPE_EQUAL(t, PN_PROACTOR_TIMEOUT, wait_next(p));
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, wait_next(p));
 
   /* Set and cancel a timeout, make sure we don't get the timeout event */
-  pn_proactor_set_timeout(p, 10);
+  pn_proactor_set_timeout(p, 10000000);
   pn_proactor_cancel_timeout(p);
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, wait_next(p));
   TEST_CHECK(t, pn_proactor_get(p) == NULL); /* idle */
 
   pn_proactor_free(p);
@@ -466,11 +466,6 @@ static void test_inactive(test_t *t) {
   proactor_test_t pts[] =  { { open_wake_handler }, { listen_handler } };
   PROACTOR_TEST_INIT(pts, t);
   pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
-  /* Default test timeout will interfere with the test, cancel it */
-  pn_proactor_cancel_timeout(client);
-  pn_proactor_cancel_timeout(server);
-  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
-  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
 
   /* Listen, connect, disconnect */
   proactor_test_listener_t l = proactor_test_listen(&pts[1], localhost);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org