You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/06/19 17:48:39 UTC
[5/5] qpid-proton git commit: PROTON-1494: epoll proactor
PN_PROACTOR_INACTIVE behavior
PROTON-1494: epoll proactor PN_PROACTOR_INACTIVE behavior
Issue PN_PROACTOR_INACTIVE only when there are no connections, listeners OR
timeout set.
Updated the doc.
Updated the tests - dropped default timeout as it interferes with tests that check
for TIMEOUT or INACTIVE events, which is most of them.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/3ae6efdf
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/3ae6efdf
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/3ae6efdf
Branch: refs/heads/master
Commit: 3ae6efdf6f0a099adb7310e1abc7ae057f537933
Parents: 24092c9
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Jun 16 16:28:35 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Jun 19 13:34:56 2017 -0400
----------------------------------------------------------------------
proton-c/src/proactor/epoll.c | 80 ++++++++++++++++++++++----------------
proton-c/src/tests/proactor.c | 15 +++----
2 files changed, 52 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3ae6efdf/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 54703a5..9e8ea4d 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -376,11 +376,12 @@ struct pn_proactor_t {
epoll_extended_t epoll_interrupt;
pn_event_batch_t batch;
size_t disconnects_pending; /* unfinished proactor disconnects*/
- bool interrupt;
- bool inactive;
- bool timer_expired;
- bool timer_cancelled;
- bool timer_armed;
+ // need_xxx flags indicate we should generate PN_PROACTOR_XXX on the next update_batch()
+ bool need_interrupt;
+ bool need_inactive;
+ bool need_timeout;
+ bool timeout_set; /* timeout has been set by user and not yet cancelled or generated event */
+ bool timer_armed; /* timer is armed in epoll */
bool shutting_down;
// wake subsystem
int eventfd;
@@ -1192,6 +1193,19 @@ static int pgetaddrinfo(const char *host, const char *port, int flags, struct ad
return getaddrinfo(host, port, &hints, res);
}
+static inline bool is_inactive(pn_proactor_t *p) {
+ return (!p->contexts && !p->disconnects_pending && !p->timeout_set && !p->shutting_down);
+}
+
+/* If inactive set need_inactive and return true if the proactor needs a wakeup */
+static bool wait_if_inactive(pn_proactor_t *p) {
+ if (is_inactive(p)) {
+ p->need_inactive = true;
+ return wake(&p->context);
+ }
+ return false;
+}
+
void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr) {
pconnection_t *pc = new_pconnection_t(p, c, false, addr);
assert(pc); // TODO: memory safety
@@ -1201,6 +1215,7 @@ void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr)
pn_connection_open(pc->driver.connection); /* Auto-open */
bool notify = false;
+ bool notify_proactor = false;
if (pc->disconnected) {
notify = wake(&pc->context); /* Error during initialization */
@@ -1214,10 +1229,13 @@ void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr)
} else {
psocket_gai_error(&pc->psocket, gai_error, "connect to ");
notify = wake(&pc->context);
+ notify_proactor = wait_if_inactive(p);
}
}
+ /* We need to issue INACTIVE on immediate failure */
unlock(&pc->context.mutex);
if (notify) wake_notify(&pc->context);
+ if (notify_proactor) wake_notify(&p->context);
}
static void pconnection_tick(pconnection_t *pc) {
@@ -1638,18 +1656,20 @@ static bool proactor_update_batch(pn_proactor_t *p) {
if (proactor_has_event(p))
return true;
- if (p->timer_expired) {
- p->timer_expired = false;
+ if (p->need_timeout) {
+ p->need_timeout = false;
+ p->timeout_set = false;
proactor_add_event(p, PN_PROACTOR_TIMEOUT);
+ p->need_inactive = is_inactive(p);
return true;
}
- if (p->interrupt) {
- p->interrupt = false;
+ if (p->need_interrupt) {
+ p->need_interrupt = false;
proactor_add_event(p, PN_PROACTOR_INTERRUPT);
return true;
}
- if (p->inactive) {
- p->inactive = false;
+ if (p->need_inactive) {
+ p->need_inactive = false;
proactor_add_event(p, PN_PROACTOR_INACTIVE);
return true;
}
@@ -1669,11 +1689,12 @@ static pn_event_batch_t *proactor_process(pn_proactor_t *p, pn_event_type_t even
bool timer_fired = (event == PN_PROACTOR_TIMEOUT) && ptimer_callback(&p->timer) != 0;
lock(&p->context.mutex);
if (event == PN_PROACTOR_INTERRUPT) {
- p->interrupt = true;
+ p->need_interrupt = true;
} else if (event == PN_PROACTOR_TIMEOUT) {
p->timer_armed = false;
- if (timer_fired && !p->timer_cancelled)
- p->timer_expired = true;
+ if (timer_fired && p->timeout_set) {
+ p->need_timeout = true;
+ }
} else {
wake_done(&p->context);
}
@@ -1708,15 +1729,11 @@ static void proactor_add(pcontext_t *ctx) {
static bool proactor_remove(pcontext_t *ctx) {
pn_proactor_t *p = ctx->proactor;
lock(&p->context.mutex);
- bool notify = false;
bool can_free = true;
if (ctx->disconnecting) {
// No longer on contexts list
if (--ctx->disconnect_ops == 0) {
- if (--p->disconnects_pending == 0 && !p->contexts) {
- p->inactive = true;
- notify = wake(&p->context);
- }
+ --p->disconnects_pending;
}
else // procator_disconnect() still processing
can_free = false; // this psocket
@@ -1731,14 +1748,11 @@ static bool proactor_remove(pcontext_t *ctx) {
if (p->contexts)
p->contexts->prev = NULL;
}
- if (ctx->next)
+ if (ctx->next) {
ctx->next->prev = ctx->prev;
-
- if (!p->contexts && !p->disconnects_pending && !p->shutting_down) {
- p->inactive = true;
- notify = wake(&p->context);
}
}
+ bool notify = wait_if_inactive(p);
unlock(&p->context.mutex);
if (notify) wake_notify(&p->context);
return can_free;
@@ -1846,7 +1860,8 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
if (proactor_has_event(p))
notify = wake(&p->context);
unlock(&p->context.mutex);
- if (notify) wake_notify(&p->context);
+ if (notify)
+ wake_notify(&p->context);
if (rearm_timer)
rearm(p, &p->timer.epoll_io);
return;
@@ -1864,10 +1879,10 @@ void pn_proactor_interrupt(pn_proactor_t *p) {
void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
bool notify = false;
lock(&p->context.mutex);
- p->timer_cancelled = false;
+ p->timeout_set = true;
if (t == 0) {
ptimer_set(&p->timer, 0);
- p->timer_expired = true;
+ p->need_timeout = true;
notify = wake(&p->context);
} else {
ptimer_set(&p->timer, t);
@@ -1878,10 +1893,12 @@ void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
void pn_proactor_cancel_timeout(pn_proactor_t *p) {
lock(&p->context.mutex);
- p->timer_cancelled = true; // stays cancelled until next set_timeout()
- p->timer_expired = false;
+ p->timeout_set = false;
+ p->need_timeout = false;
ptimer_set(&p->timer, 0);
+ bool notify = wait_if_inactive(p);
unlock(&p->context.mutex);
+ if (notify) wake_notify(&p->context);
}
pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
@@ -1951,10 +1968,7 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
if (--ctx->disconnect_ops == 0) {
do_free = true;
ctx_notify = false;
- if (--p->disconnects_pending == 0 && !p->contexts) {
- p->inactive = true;
- notify = wake(&p->context);
- }
+ notify = wait_if_inactive(p);
} else {
// If initiating the close, wake the pcontext to do the free.
if (ctx_notify)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3ae6efdf/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index f6122c1..6ae633c 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -33,8 +33,6 @@
#include <stdlib.h>
#include <string.h>
-static pn_millis_t timeout = 7*1000; /* timeout for hanging tests */
-
static const char *localhost = ""; /* host for connect/listen */
typedef pn_event_type_t (*test_handler_fn)(test_t *, pn_event_t*);
@@ -57,7 +55,6 @@ static void proactor_test_init(proactor_test_t *pts, size_t n, test_t *t) {
if (!pt->t) pt->t = t;
if (!pt->proactor) pt->proactor = pn_proactor();
pt->log_len = 0;
- pn_proactor_set_timeout(pt->proactor, timeout);
}
}
@@ -175,14 +172,17 @@ static void test_interrupt_timeout(test_t *t) {
/* Set an immediate timeout */
pn_proactor_set_timeout(p, 0);
TEST_ETYPE_EQUAL(t, PN_PROACTOR_TIMEOUT, wait_next(p));
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, wait_next(p)); /* Inactive because timeout expired */
/* Set a (very short) timeout */
- pn_proactor_set_timeout(p, 10);
+ pn_proactor_set_timeout(p, 1);
TEST_ETYPE_EQUAL(t, PN_PROACTOR_TIMEOUT, wait_next(p));
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, wait_next(p));
/* Set and cancel a timeout, make sure we don't get the timeout event */
- pn_proactor_set_timeout(p, 10);
+ pn_proactor_set_timeout(p, 10000000);
pn_proactor_cancel_timeout(p);
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, wait_next(p));
TEST_CHECK(t, pn_proactor_get(p) == NULL); /* idle */
pn_proactor_free(p);
@@ -466,11 +466,6 @@ static void test_inactive(test_t *t) {
proactor_test_t pts[] = { { open_wake_handler }, { listen_handler } };
PROACTOR_TEST_INIT(pts, t);
pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
- /* Default test timeout will interfere with the test, cancel it */
- pn_proactor_cancel_timeout(client);
- pn_proactor_cancel_timeout(server);
- TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
- TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
/* Listen, connect, disconnect */
proactor_test_listener_t l = proactor_test_listen(&pts[1], localhost);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org