You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/06/19 17:48:38 UTC
[4/5] qpid-proton git commit: PROTON-1494: libuv proactor
PN_PROACTOR_INACTIVE behavior
PROTON-1494: libuv proactor PN_PROACTOR_INACTIVE behavior
Include timeout events in INACTIVE count.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/24092c91
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/24092c91
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/24092c91
Branch: refs/heads/master
Commit: 24092c9105fae64e7c91efdf6eaa2cb487ccf19e
Parents: 37421c4
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Jun 16 14:59:32 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Jun 19 13:15:25 2017 -0400
----------------------------------------------------------------------
proton-c/src/proactor/libuv.c | 51 ++++++++++++++++++++++----------------
proton-c/src/tests/proactor.c | 33 +++++++++++++++++++++---
2 files changed, 59 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/24092c91/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 7460194..c0a4d8d 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -240,17 +240,18 @@ struct pn_proactor_t {
/* Protected by lock */
uv_mutex_t lock;
- work_queue_t worker_q; /* ready for work, to be returned via pn_proactor_wait() */
- work_queue_t leader_q; /* waiting for attention by the leader thread */
+ work_queue_t worker_q; /* ready for work, to be returned via pn_proactor_wait() */
+ work_queue_t leader_q; /* waiting for attention by the leader thread */
timeout_state_t timeout_state;
pn_millis_t timeout;
- size_t count; /* connection/listener count for INACTIVE events */
+ size_t active; /* connection/listener count for INACTIVE events */
pn_condition_t *disconnect_cond; /* disconnect condition */
- bool disconnect; /* disconnect requested */
- bool inactive;
- bool has_leader;
- bool batch_working; /* batch is being processed in a worker thread */
- bool need_interrupt; /* Need a PN_PROACTOR_INTERRUPT event */
+
+ bool has_leader; /* A thread is working as leader */
+ bool disconnect; /* disconnect requested */
+ bool batch_working; /* batch is being processed in a worker thread */
+ bool need_interrupt; /* Need a PN_PROACTOR_INTERRUPT event */
+ bool need_inactive; /* need INACTIVE event */
};
@@ -360,19 +361,22 @@ static inline work_t *batch_work(pn_event_batch_t *batch) {
}
/* Total count of listener and connections for PN_PROACTOR_INACTIVE */
-static void leader_inc(pn_proactor_t *p) {
+static void add_active(pn_proactor_t *p) {
uv_mutex_lock(&p->lock);
- ++p->count;
+ ++p->active;
uv_mutex_unlock(&p->lock);
}
-static void leader_dec(pn_proactor_t *p) {
- uv_mutex_lock(&p->lock);
- assert(p->count > 0);
- if (--p->count == 0) {
- p->inactive = true;
- notify(p);
+static void remove_active_lh(pn_proactor_t *p) {
+ assert(p->active > 0);
+ if (--p->active == 0) {
+ p->need_inactive = true;
}
+}
+
+static void remove_active(pn_proactor_t *p) {
+ uv_mutex_lock(&p->lock);
+ remove_active_lh(p);
uv_mutex_unlock(&p->lock);
}
@@ -394,7 +398,7 @@ static void on_close_pconnection_final(uv_handle_t *h) {
will be valid, but no-ops.
*/
pconnection_t *pc = (pconnection_t*)h->data;
- leader_dec(pc->work.proactor);
+ remove_active(pc->work.proactor);
pconnection_free(pc);
}
@@ -495,7 +499,7 @@ static int pconnection_init(pconnection_t *pc) {
}
}
if (!err) {
- leader_inc(pc->work.proactor);
+ add_active(pc->work.proactor);
} else {
pconnection_error(pc, err, "initialization");
}
@@ -638,7 +642,7 @@ static int lsocket(pn_listener_t *l, struct addrinfo *ai) {
/* Listen on all available addresses */
static void leader_listen_lh(pn_listener_t *l) {
- leader_inc(l->work.proactor);
+ add_active(l->work.proactor);
int err = leader_resolve(l->work.proactor, &l->addr, true);
if (!err) {
/* Find the working addresses */
@@ -724,7 +728,7 @@ static bool leader_process_listener(pn_listener_t *l) {
case L_CLOSED: /* Closed, has LISTENER_CLOSE has been processed? */
if (!pn_collector_peek(l->collector)) {
- leader_dec(l->work.proactor);
+ remove_active(l->work.proactor);
closed = true;
}
}
@@ -824,8 +828,8 @@ static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
/* Return the next event batch or NULL if no events are available */
static pn_event_batch_t *get_batch_lh(pn_proactor_t *p) {
if (!p->batch_working) { /* Can generate proactor events */
- if (p->inactive) {
- p->inactive = false;
+ if (p->need_inactive) {
+ p->need_inactive = false;
return proactor_batch_lh(p, PN_PROACTOR_INACTIVE);
}
if (p->need_interrupt) {
@@ -834,6 +838,7 @@ static pn_event_batch_t *get_batch_lh(pn_proactor_t *p) {
}
if (p->timeout_state == TM_FIRED) {
p->timeout_state = TM_NONE;
+ remove_active_lh(p);
return proactor_batch_lh(p, PN_PROACTOR_TIMEOUT);
}
}
@@ -1114,6 +1119,7 @@ void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
uv_mutex_lock(&p->lock);
p->timeout = t;
p->timeout_state = TM_REQUEST;
+ ++p->active;
uv_mutex_unlock(&p->lock);
notify(p);
}
@@ -1122,6 +1128,7 @@ void pn_proactor_cancel_timeout(pn_proactor_t *p) {
uv_mutex_lock(&p->lock);
if (p->timeout_state != TM_NONE) {
p->timeout_state = TM_NONE;
+ remove_active_lh(p);
notify(p);
}
uv_mutex_unlock(&p->lock);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/24092c91/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index 3e1598c..f6122c1 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -130,7 +130,6 @@ static pn_event_type_t proactor_test_run(proactor_test_t *pts, size_t n) {
return e;
}
-
/* Drain and discard outstanding events from an array of proactors */
static void proactor_test_drain(proactor_test_t *pts, size_t n) {
while (proactor_test_get(pts, n))
@@ -467,17 +466,45 @@ static void test_inactive(test_t *t) {
proactor_test_t pts[] = { { open_wake_handler }, { listen_handler } };
PROACTOR_TEST_INIT(pts, t);
pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
+ /* Default test timeout will interfere with the test, cancel it */
+ pn_proactor_cancel_timeout(client);
+ pn_proactor_cancel_timeout(server);
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+
+ /* Listen, connect, disconnect */
proactor_test_listener_t l = proactor_test_listen(&pts[1], localhost);
pn_connection_t *c = pn_connection();
pn_proactor_connect(client, c, l.port.host_port);
TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
pn_connection_wake(c);
TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, PROACTOR_TEST_RUN(pts));
- /* expect TRANSPORT_CLOSED from client and server, INACTIVE from client */
+ /* Expect TRANSPORT_CLOSED from client and server, INACTIVE from client */
TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
- /* server won't be INACTIVE until listener is closed */
+
+ /* Immediate timer generates INACTIVE on client (no connections) */
+ pn_proactor_set_timeout(client, 0);
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_TIMEOUT, PROACTOR_TEST_RUN(pts));
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+
+ /* Connect, set-timer, disconnect */
+ pn_proactor_set_timeout(client, 1000000);
+ c = pn_connection();
+ pn_proactor_connect(client, c, l.port.host_port);
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
+ pn_connection_wake(c);
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, PROACTOR_TEST_RUN(pts));
+ /* Expect TRANSPORT_CLOSED from client and server */
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+ /* No INACTIVE till timer is cancelled */
+ TEST_CHECK(t, pn_proactor_get(server) == NULL);
+ pn_proactor_cancel_timeout(client);
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+
+ /* Server won't be INACTIVE until listener is closed */
TEST_CHECK(t, pn_proactor_get(server) == NULL);
pn_listener_close(l.listener);
TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org