You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/02/23 22:51:37 UTC
[30/38] qpid-proton git commit: PROTON-1403: C libuv proactor fixes
and tests
PROTON-1403: C libuv proactor fixes and tests
- wake before connection prevented connection
- early errors (e.g. bad host/port) not handled correctly
- INACTIVE event not generated on listener closed
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/204c8474
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/204c8474
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/204c8474
Branch: refs/heads/go1
Commit: 204c8474d2b34f640189eb5c3063622b45fb5cc9
Parents: 6b6dd86
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Feb 20 19:56:19 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Feb 20 20:01:54 2017 -0500
----------------------------------------------------------------------
proton-c/src/proactor/libuv.c | 28 ++++----
proton-c/src/tests/proactor.c | 133 +++++++++++++++++++----------------
proton-c/src/tests/test_tools.h | 14 ++++
3 files changed, 102 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/204c8474/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index c7322cd..61239ac 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -235,11 +235,8 @@ static void to_leader(psocket_t *ps, void (*action)(psocket_t*)) {
/* Push to the worker thread */
static void to_worker(psocket_t *ps) {
uv_mutex_lock(&ps->proactor->lock);
- /* If already ON_WORKER do nothing */
- if (ps->next == &UNLISTED && ps->state != ON_WORKER) {
- ps->state = ON_WORKER;
- push_lh(&ps->proactor->worker_q, ps);
- }
+ ps->state = ON_WORKER;
+ push_lh(&ps->proactor->worker_q, ps);
uv_mutex_unlock(&ps->proactor->lock);
}
@@ -310,7 +307,10 @@ static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
static void leader_count(pn_proactor_t *p, int change) {
uv_mutex_lock(&p->lock);
p->count += change;
- p->inactive = (p->count == 0);
+ if (p->count == 0) {
+ p->inactive = true;
+ uv_async_send(&p->async); /* Wake leader */
+ }
uv_mutex_unlock(&p->lock);
}
@@ -329,8 +329,8 @@ static void on_close_pconnection_final(uv_handle_t *h) {
/* Close event for uv_tcp_t of a psocket_t */
static void on_close_psocket(uv_handle_t *h) {
psocket_t *ps = (psocket_t*)h->data;
+ leader_count(ps->proactor, -1);
if (ps->is_conn) {
- leader_count(ps->proactor, -1);
pconnection_t *pc = as_pconnection(ps);
uv_timer_stop(&pc->timer);
/* Delay the free till the timer handle is also closed */
@@ -465,7 +465,7 @@ static void leader_connect(psocket_t *ps) {
if (!err) {
ps->state = ON_UV;
} else {
- psocket_error(ps, err, "connecting to");
+ pconnection_error(pc, err, "connecting to");
}
}
@@ -737,16 +737,16 @@ static void leader_process_lh(pn_proactor_t *p) {
}
for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
assert(ps->state == ON_LEADER);
- if (ps->wakeup) {
- uv_mutex_unlock(&p->lock);
- ps->wakeup(ps);
- ps->wakeup = NULL;
- uv_mutex_lock(&p->lock);
- } else if (ps->action) {
+ if (ps->action) {
uv_mutex_unlock(&p->lock);
ps->action(ps);
ps->action = NULL;
uv_mutex_lock(&p->lock);
+ } else if (ps->wakeup) {
+ uv_mutex_unlock(&p->lock);
+ ps->wakeup(ps);
+ ps->wakeup = NULL;
+ uv_mutex_lock(&p->lock);
}
}
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/204c8474/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index 9f8b1fe..6b95c11 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -32,6 +32,11 @@ static pn_millis_t timeout = 5*1000; /* timeout for hanging tests */
static const char *localhost = "127.0.0.1"; /* host for connect/listen */
+struct test_events {
+ pn_proactor_t *proactor;
+ pn_event_batch_t *events;
+};
+
/* Wait for the next single event, return its type */
static pn_event_type_t wait_next(pn_proactor_t *proactor) {
pn_event_batch_t *events = pn_proactor_wait(proactor);
@@ -121,49 +126,40 @@ int proactor_test_run(proactor_test_t *pts, size_t n) {
}
-/* Simple test of client connect to a listening server */
-handler_state_t listen_connect_server(test_t *t, pn_event_t *e) {
- switch (pn_event_type(e)) {
- /* Ignore these events */
- case PN_LISTENER_OPEN:
- case PN_CONNECTION_LOCAL_OPEN:
- case PN_CONNECTION_REMOTE_OPEN:
- case PN_CONNECTION_BOUND:
- return H_CONTINUE;
-
- /* Act on these events */
- case PN_LISTENER_ACCEPT:
- pn_listener_accept(pn_event_listener(e), pn_connection());
- return H_CONTINUE;
- case PN_CONNECTION_INIT:
- pn_connection_open(pn_event_connection(e));
- return H_CONTINUE;
- case PN_CONNECTION_REMOTE_CLOSE:
- return H_FINISHED;
+/* Handler for test_listen_connect, does both sides of the connection */
+handler_state_t listen_connect_handler(test_t *t, pn_event_t *e) {
+ pn_connection_t *c = pn_event_connection(e);
+ pn_listener_t *l = pn_event_listener(e);
- default:
- TEST_CHECK(t, false, "unexpected event %s", pn_event_type_name(pn_event_type(e)));
- return H_FAILED;
- break;
- }
-}
-
-handler_state_t listen_connect_client(test_t *t, pn_event_t *e) {
switch (pn_event_type(e)) {
/* Ignore these events */
case PN_CONNECTION_LOCAL_OPEN:
case PN_CONNECTION_BOUND:
+ case PN_CONNECTION_INIT:
return H_CONTINUE;
/* Act on these events */
- case PN_CONNECTION_INIT:
- pn_connection_open(pn_event_connection(e));
+ case PN_LISTENER_ACCEPT: {
+ pn_connection_t *accepted = pn_connection();
+ pn_connection_open(accepted);
+ pn_listener_accept(l, accepted); /* Listener takes ownership of accepted */
return H_CONTINUE;
+ }
+
case PN_CONNECTION_REMOTE_OPEN:
- pn_connection_close(pn_event_connection(e));
+ if (pn_connection_state(c) | PN_LOCAL_ACTIVE) { /* Client is fully open - the test is done */
+ pn_connection_close(c);
+ return H_FINISHED;
+ } else { /* Server returns the open */
+ pn_connection_open(c);
+ }
+
+ case PN_CONNECTION_REMOTE_CLOSE:
+ if (pn_connection_state(c) | PN_LOCAL_ACTIVE) {
+ pn_connection_close(c); /* Return the close */
+ }
return H_FINISHED;
- /* Unexpected events */
default:
TEST_CHECK(t, false, "unexpected event %s", pn_event_type_name(pn_event_type(e)));
return H_FAILED;
@@ -171,38 +167,18 @@ handler_state_t listen_connect_client(test_t *t, pn_event_t *e) {
}
}
-/* Simplest client/server interaction */
-static void test_listen_connect(test_t *t) {
- proactor_test_t pts[] = { { t, listen_connect_client }, { t, listen_connect_server } };
- proactor_test_t *client = &pts[0], *server = &pts[1];
- proactor_test_init(pts, 2);
-
- sock_t sock = sock_bind0(); /* Hold a port */
- char port_str[16];
- snprintf(port_str, sizeof(port_str), "%d", sock_port(sock));
- pn_proactor_listen(server->proactor, pn_listener(), localhost, port_str, 4);
- pn_event_type_t etype = wait_for(server->proactor, PN_LISTENER_OPEN);
- if (TEST_CHECK(t, PN_LISTENER_OPEN == etype, pn_event_type_name(etype))) {
- pn_proactor_connect(client->proactor, pn_connection(), localhost, port_str);
- proactor_test_run(pts, 2);
- }
- sock_close(sock);
- pn_proactor_free(client->proactor);
- pn_proactor_free(server->proactor);
-}
-
-/* Test error handling */
-static void test_listen_connect_error(test_t *t) {
+/* Test bad-address error handling for listen and connect */
+static void test_early_error(test_t *t) {
pn_proactor_t *p = pn_proactor();
pn_proactor_set_timeout(p, timeout); /* In case of hang */
pn_connection_t *c = pn_connection();
- pn_proactor_connect(p, c, "nosuchost", "nosuchport");
+ pn_proactor_connect(p, c, "badhost", "amqp");
pn_event_type_t etype = wait_for(p, PN_TRANSPORT_CLOSED);
TEST_CHECK(t, PN_TRANSPORT_CLOSED == etype, pn_event_type_name(etype));
TEST_CHECK(t, pn_condition_is_set(pn_transport_condition(pn_connection_transport(c))), "");
pn_listener_t *l = pn_listener();
- pn_proactor_listen(p, l, "nosuchost", "nosuchport", 1);
+ pn_proactor_listen(p, l, "badhost", "amqp", 1);
etype = wait_for(p, PN_LISTENER_CLOSE);
TEST_CHECK(t, PN_LISTENER_CLOSE == etype, pn_event_type_name(etype));
TEST_CHECK(t, pn_condition_is_set(pn_listener_condition(l)), "");
@@ -210,12 +186,51 @@ static void test_listen_connect_error(test_t *t) {
pn_proactor_free(p);
}
+/* Simplest client/server interaction with 2 proactors */
+static void test_listen_connect(test_t *t) {
+ proactor_test_t pts[] = { { t, listen_connect_handler }, { t, listen_connect_handler } };
+ proactor_test_init(pts, 2);
+ pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
+ test_port_t port = test_port(); /* Hold a port */
+
+ pn_proactor_listen(server, pn_listener(), localhost, port.str, 4);
+ pn_event_type_t etype = wait_for(server, PN_LISTENER_OPEN);
+ if (TEST_CHECK(t, PN_LISTENER_OPEN == etype, pn_event_type_name(etype))) {
+ sock_close(port.sock);
+ pn_proactor_connect(client, pn_connection(), localhost, port.str);
+ proactor_test_run(pts, 2);
+ }
+ pn_proactor_free(client);
+ pn_proactor_free(server);
+}
+
+/* Test that INACTIVE event is generated when last connections/listeners closes. */
+static void test_inactive(test_t *t) {
+ proactor_test_t pts[] = { { t, listen_connect_handler }, { t, listen_connect_handler }};
+ proactor_test_init(pts, 2);
+ pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
+ test_port_t port = test_port(); /* Hold a port */
+
+ pn_listener_t *l = pn_listener();
+ pn_proactor_listen(server, l, localhost, port.str, 4);
+ pn_event_type_t etype = wait_for(server, PN_LISTENER_OPEN);
+ if (TEST_CHECK(t, PN_LISTENER_OPEN == etype, pn_event_type_name(etype))) {
+ sock_close(port.sock);
+ pn_proactor_connect(client, pn_connection(), localhost, port.str);
+ proactor_test_run(pts, 2);
+ etype = wait_for(client, PN_PROACTOR_INACTIVE);
+ pn_listener_close(l);
+ etype = wait_for(server, PN_PROACTOR_INACTIVE);
+ }
+ pn_proactor_free(client);
+ pn_proactor_free(server);
+}
+
int main(int argv, char** argc) {
int failed = 0;
- if (0) {
- RUN_TEST(failed, t, test_interrupt_timeout(&t));
- RUN_TEST(failed, t, test_listen_connect_error(&t));
- }
+ RUN_TEST(failed, t, test_inactive(&t));
+ RUN_TEST(failed, t, test_interrupt_timeout(&t));
+ RUN_TEST(failed, t, test_early_error(&t));
RUN_TEST(failed, t, test_listen_connect(&t));
return failed;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/204c8474/proton-c/src/tests/test_tools.h
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/test_tools.h b/proton-c/src/tests/test_tools.h
index 2619337..2663dcf 100644
--- a/proton-c/src/tests/test_tools.h
+++ b/proton-c/src/tests/test_tools.h
@@ -153,4 +153,18 @@ static int sock_port(sock_t sock) {
return ntohs(port);
}
+typedef struct test_port_t {
+ sock_t sock;
+ int port;
+ char str[256];
+} test_port_t;
+
+static inline test_port_t test_port(void) {
+ test_port_t tp = {0};
+ tp.sock = sock_bind0();
+ tp.port = sock_port(tp.sock);
+ snprintf(tp.str, sizeof(tp.str), "%d", tp.port);
+ return tp;
+}
+
#endif // TESTS_TEST_TOOLS_H
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org