You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/06/19 17:48:38 UTC

[4/5] qpid-proton git commit: PROTON-1494: libuv proactor PN_PROACTOR_INACTIVE behavior

PROTON-1494: libuv proactor PN_PROACTOR_INACTIVE behavior

Include timeout events in INACTIVE count.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/24092c91
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/24092c91
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/24092c91

Branch: refs/heads/master
Commit: 24092c9105fae64e7c91efdf6eaa2cb487ccf19e
Parents: 37421c4
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Jun 16 14:59:32 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Jun 19 13:15:25 2017 -0400

----------------------------------------------------------------------
 proton-c/src/proactor/libuv.c | 51 ++++++++++++++++++++++----------------
 proton-c/src/tests/proactor.c | 33 +++++++++++++++++++++---
 2 files changed, 59 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/24092c91/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 7460194..c0a4d8d 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -240,17 +240,18 @@ struct pn_proactor_t {
 
   /* Protected by lock */
   uv_mutex_t lock;
-  work_queue_t worker_q;      /* ready for work, to be returned via pn_proactor_wait()  */
-  work_queue_t leader_q;      /* waiting for attention by the leader thread */
+  work_queue_t worker_q; /* ready for work, to be returned via pn_proactor_wait()  */
+  work_queue_t leader_q; /* waiting for attention by the leader thread */
   timeout_state_t timeout_state;
   pn_millis_t timeout;
-  size_t count;               /* connection/listener count for INACTIVE events */
+  size_t active;         /* connection/listener count for INACTIVE events */
   pn_condition_t *disconnect_cond; /* disconnect condition */
-  bool disconnect;            /* disconnect requested */
-  bool inactive;
-  bool has_leader;
-  bool batch_working;         /* batch is being processed in a worker thread */
-  bool need_interrupt;        /* Need a PN_PROACTOR_INTERRUPT event */
+
+  bool has_leader;             /* A thread is working as leader */
+  bool disconnect;             /* disconnect requested */
+  bool batch_working;          /* batch is being processed in a worker thread */
+  bool need_interrupt;         /* Need a PN_PROACTOR_INTERRUPT event */
+  bool need_inactive;          /* need INACTIVE event */
 };
 
 
@@ -360,19 +361,22 @@ static inline work_t *batch_work(pn_event_batch_t *batch) {
 }
 
 /* Total count of listener and connections for PN_PROACTOR_INACTIVE */
-static void leader_inc(pn_proactor_t *p) {
+static void add_active(pn_proactor_t *p) {
   uv_mutex_lock(&p->lock);
-  ++p->count;
+  ++p->active;
   uv_mutex_unlock(&p->lock);
 }
 
-static void leader_dec(pn_proactor_t *p) {
-  uv_mutex_lock(&p->lock);
-  assert(p->count > 0);
-  if (--p->count == 0) {
-    p->inactive = true;
-    notify(p);
+static void remove_active_lh(pn_proactor_t *p) {
+  assert(p->active > 0);
+  if (--p->active == 0) {
+    p->need_inactive = true;
   }
+}
+
+static void remove_active(pn_proactor_t *p) {
+  uv_mutex_lock(&p->lock);
+  remove_active_lh(p);
   uv_mutex_unlock(&p->lock);
 }
 
@@ -394,7 +398,7 @@ static void on_close_pconnection_final(uv_handle_t *h) {
      will be valid, but no-ops.
   */
   pconnection_t *pc = (pconnection_t*)h->data;
-  leader_dec(pc->work.proactor);
+  remove_active(pc->work.proactor);
   pconnection_free(pc);
 }
 
@@ -495,7 +499,7 @@ static int pconnection_init(pconnection_t *pc) {
     }
   }
   if (!err) {
-    leader_inc(pc->work.proactor);
+    add_active(pc->work.proactor);
   } else {
     pconnection_error(pc, err, "initialization");
   }
@@ -638,7 +642,7 @@ static int lsocket(pn_listener_t *l, struct addrinfo *ai) {
 
 /* Listen on all available addresses */
 static void leader_listen_lh(pn_listener_t *l) {
-  leader_inc(l->work.proactor);
+  add_active(l->work.proactor);
   int err = leader_resolve(l->work.proactor, &l->addr, true);
   if (!err) {
     /* Find the working addresses */
@@ -724,7 +728,7 @@ static bool leader_process_listener(pn_listener_t *l) {
 
    case L_CLOSED:              /* Closed, has LISTENER_CLOSE has been processed? */
     if (!pn_collector_peek(l->collector)) {
-      leader_dec(l->work.proactor);
+      remove_active(l->work.proactor);
       closed = true;
     }
   }
@@ -824,8 +828,8 @@ static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
 /* Return the next event batch or NULL if no events are available */
 static pn_event_batch_t *get_batch_lh(pn_proactor_t *p) {
   if (!p->batch_working) {       /* Can generate proactor events */
-    if (p->inactive) {
-      p->inactive = false;
+    if (p->need_inactive) {
+      p->need_inactive = false;
       return proactor_batch_lh(p, PN_PROACTOR_INACTIVE);
     }
     if (p->need_interrupt) {
@@ -834,6 +838,7 @@ static pn_event_batch_t *get_batch_lh(pn_proactor_t *p) {
     }
     if (p->timeout_state == TM_FIRED) {
       p->timeout_state = TM_NONE;
+      remove_active_lh(p);
       return proactor_batch_lh(p, PN_PROACTOR_TIMEOUT);
     }
   }
@@ -1114,6 +1119,7 @@ void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
   uv_mutex_lock(&p->lock);
   p->timeout = t;
   p->timeout_state = TM_REQUEST;
+  ++p->active;
   uv_mutex_unlock(&p->lock);
   notify(p);
 }
@@ -1122,6 +1128,7 @@ void pn_proactor_cancel_timeout(pn_proactor_t *p) {
   uv_mutex_lock(&p->lock);
   if (p->timeout_state != TM_NONE) {
     p->timeout_state = TM_NONE;
+    remove_active_lh(p);
     notify(p);
   }
   uv_mutex_unlock(&p->lock);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/24092c91/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index 3e1598c..f6122c1 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -130,7 +130,6 @@ static pn_event_type_t proactor_test_run(proactor_test_t *pts, size_t n) {
   return e;
 }
 
-
 /* Drain and discard outstanding events from an array of proactors */
 static void proactor_test_drain(proactor_test_t *pts, size_t n) {
   while (proactor_test_get(pts, n))
@@ -467,17 +466,45 @@ static void test_inactive(test_t *t) {
   proactor_test_t pts[] =  { { open_wake_handler }, { listen_handler } };
   PROACTOR_TEST_INIT(pts, t);
   pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
+  /* Default test timeout will interfere with the test, cancel it */
+  pn_proactor_cancel_timeout(client);
+  pn_proactor_cancel_timeout(server);
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+
+  /* Listen, connect, disconnect */
   proactor_test_listener_t l = proactor_test_listen(&pts[1], localhost);
   pn_connection_t *c = pn_connection();
   pn_proactor_connect(client, c, l.port.host_port);
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
   pn_connection_wake(c);
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, PROACTOR_TEST_RUN(pts));
-  /* expect TRANSPORT_CLOSED from client and server, INACTIVE from client */
+  /* Expect TRANSPORT_CLOSED from client and server, INACTIVE from client */
   TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
   TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
   TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
-  /* server won't be INACTIVE until listener is closed */
+
+  /* Immediate timer generates INACTIVE on client (no connections) */
+  pn_proactor_set_timeout(client, 0);
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_TIMEOUT, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+
+  /* Connect, set-timer, disconnect */
+  pn_proactor_set_timeout(client, 1000000);
+  c = pn_connection();
+  pn_proactor_connect(client, c, l.port.host_port);
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
+  pn_connection_wake(c);
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, PROACTOR_TEST_RUN(pts));
+  /* Expect TRANSPORT_CLOSED from client and server */
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  /* No INACTIVE till timer is cancelled */
+  TEST_CHECK(t, pn_proactor_get(server) == NULL);
+  pn_proactor_cancel_timeout(client);
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+
+  /* Server won't be INACTIVE until listener is closed */
   TEST_CHECK(t, pn_proactor_get(server) == NULL);
   pn_listener_close(l.listener);
   TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org