You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/02/21 01:04:52 UTC

qpid-proton git commit: PROTON-1403: C libuv proactor fixes and tests

Repository: qpid-proton
Updated Branches:
  refs/heads/master 6b6dd8699 -> 204c8474d


PROTON-1403: C libuv proactor fixes and tests

- wake before connection prevented connection
- early errors (e.g. bad host/port) not handled correctly
- INACTIVE event not generated on listener closed


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/204c8474
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/204c8474
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/204c8474

Branch: refs/heads/master
Commit: 204c8474d2b34f640189eb5c3063622b45fb5cc9
Parents: 6b6dd86
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Feb 20 19:56:19 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Feb 20 20:01:54 2017 -0500

----------------------------------------------------------------------
 proton-c/src/proactor/libuv.c   |  28 ++++----
 proton-c/src/tests/proactor.c   | 133 +++++++++++++++++++----------------
 proton-c/src/tests/test_tools.h |  14 ++++
 3 files changed, 102 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/204c8474/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index c7322cd..61239ac 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -235,11 +235,8 @@ static void to_leader(psocket_t *ps, void (*action)(psocket_t*)) {
 /* Push to the worker thread */
 static void to_worker(psocket_t *ps) {
   uv_mutex_lock(&ps->proactor->lock);
-  /* If already ON_WORKER do nothing */
-  if (ps->next == &UNLISTED && ps->state != ON_WORKER) {
-    ps->state = ON_WORKER;
-    push_lh(&ps->proactor->worker_q, ps);
-  }
+  ps->state = ON_WORKER;
+  push_lh(&ps->proactor->worker_q, ps);
   uv_mutex_unlock(&ps->proactor->lock);
 }
 
@@ -310,7 +307,10 @@ static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
 static void leader_count(pn_proactor_t *p, int change) {
   uv_mutex_lock(&p->lock);
   p->count += change;
-  p->inactive = (p->count == 0);
+  if (p->count == 0) {
+    p->inactive = true;
+    uv_async_send(&p->async); /* Wake leader */
+  }
   uv_mutex_unlock(&p->lock);
 }
 
@@ -329,8 +329,8 @@ static void on_close_pconnection_final(uv_handle_t *h) {
 /* Close event for uv_tcp_t of a psocket_t */
 static void on_close_psocket(uv_handle_t *h) {
   psocket_t *ps = (psocket_t*)h->data;
+  leader_count(ps->proactor, -1);
   if (ps->is_conn) {
-    leader_count(ps->proactor, -1);
     pconnection_t *pc = as_pconnection(ps);
     uv_timer_stop(&pc->timer);
     /* Delay the free till the timer handle is also closed */
@@ -465,7 +465,7 @@ static void leader_connect(psocket_t *ps) {
   if (!err) {
     ps->state = ON_UV;
   } else {
-    psocket_error(ps, err, "connecting to");
+    pconnection_error(pc, err, "connecting to");
   }
 }
 
@@ -737,16 +737,16 @@ static void leader_process_lh(pn_proactor_t *p) {
   }
   for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
     assert(ps->state == ON_LEADER);
-    if (ps->wakeup) {
-      uv_mutex_unlock(&p->lock);
-      ps->wakeup(ps);
-      ps->wakeup = NULL;
-      uv_mutex_lock(&p->lock);
-    } else if (ps->action) {
+    if (ps->action) {
       uv_mutex_unlock(&p->lock);
       ps->action(ps);
       ps->action = NULL;
       uv_mutex_lock(&p->lock);
+    } else if (ps->wakeup) {
+      uv_mutex_unlock(&p->lock);
+      ps->wakeup(ps);
+      ps->wakeup = NULL;
+      uv_mutex_lock(&p->lock);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/204c8474/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index 9f8b1fe..6b95c11 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -32,6 +32,11 @@ static pn_millis_t timeout = 5*1000; /* timeout for hanging tests */
 
 static const char *localhost = "127.0.0.1"; /* host for connect/listen */
 
+struct test_events {
+  pn_proactor_t *proactor;
+  pn_event_batch_t *events;
+};
+
 /* Wait for the next single event, return its type */
 static pn_event_type_t wait_next(pn_proactor_t *proactor) {
   pn_event_batch_t *events = pn_proactor_wait(proactor);
@@ -121,49 +126,40 @@ int proactor_test_run(proactor_test_t *pts, size_t n) {
 }
 
 
-/* Simple test of client connect to a listening server */
-handler_state_t listen_connect_server(test_t *t, pn_event_t *e) {
-  switch (pn_event_type(e)) {
-    /* Ignore these events */
-   case PN_LISTENER_OPEN:
-   case PN_CONNECTION_LOCAL_OPEN:
-   case PN_CONNECTION_REMOTE_OPEN:
-   case PN_CONNECTION_BOUND:
-    return H_CONTINUE;
-
-    /* Act on these events */
-   case PN_LISTENER_ACCEPT:
-     pn_listener_accept(pn_event_listener(e), pn_connection());
-     return H_CONTINUE;
-   case PN_CONNECTION_INIT:
-    pn_connection_open(pn_event_connection(e));
-    return H_CONTINUE;
-   case PN_CONNECTION_REMOTE_CLOSE:
-    return H_FINISHED;
+/* Handler for test_listen_connect, does both sides of the connection */
+handler_state_t listen_connect_handler(test_t *t, pn_event_t *e) {
+  pn_connection_t *c = pn_event_connection(e);
+  pn_listener_t *l = pn_event_listener(e);
 
-   default:
-    TEST_CHECK(t, false, "unexpected event %s", pn_event_type_name(pn_event_type(e)));
-    return H_FAILED;
-    break;
-  }
-}
-
-handler_state_t listen_connect_client(test_t *t, pn_event_t *e) {
   switch (pn_event_type(e)) {
     /* Ignore these events */
    case PN_CONNECTION_LOCAL_OPEN:
    case PN_CONNECTION_BOUND:
+   case PN_CONNECTION_INIT:
     return H_CONTINUE;
 
     /* Act on these events */
-   case PN_CONNECTION_INIT:
-    pn_connection_open(pn_event_connection(e));
+   case PN_LISTENER_ACCEPT: {
+    pn_connection_t *accepted = pn_connection();
+    pn_connection_open(accepted);
+    pn_listener_accept(l, accepted); /* Listener takes ownership of accepted */
     return H_CONTINUE;
+   }
+
    case PN_CONNECTION_REMOTE_OPEN:
-    pn_connection_close(pn_event_connection(e));
+    if (pn_connection_state(c) | PN_LOCAL_ACTIVE) { /* Client is fully open - the test is done */
+      pn_connection_close(c);
+      return H_FINISHED;
+    }  else {                   /* Server returns the open */
+      pn_connection_open(c);
+    }
+
+   case PN_CONNECTION_REMOTE_CLOSE:
+    if (pn_connection_state(c) | PN_LOCAL_ACTIVE) {
+      pn_connection_close(c);    /* Return the close */
+    }
     return H_FINISHED;
 
-    /* Unexpected events */
    default:
     TEST_CHECK(t, false, "unexpected event %s", pn_event_type_name(pn_event_type(e)));
     return H_FAILED;
@@ -171,38 +167,18 @@ handler_state_t listen_connect_client(test_t *t, pn_event_t *e) {
   }
 }
 
-/* Simplest client/server interaction */
-static void test_listen_connect(test_t *t) {
-  proactor_test_t pts[] =  { { t, listen_connect_client }, { t, listen_connect_server } };
-  proactor_test_t *client = &pts[0], *server = &pts[1];
-  proactor_test_init(pts, 2);
-
-  sock_t sock = sock_bind0();          /* Hold a port */
-  char port_str[16];
-  snprintf(port_str, sizeof(port_str), "%d", sock_port(sock));
-  pn_proactor_listen(server->proactor, pn_listener(), localhost, port_str, 4);
-  pn_event_type_t etype = wait_for(server->proactor, PN_LISTENER_OPEN);
-  if (TEST_CHECK(t, PN_LISTENER_OPEN == etype, pn_event_type_name(etype))) {
-    pn_proactor_connect(client->proactor, pn_connection(), localhost, port_str);
-    proactor_test_run(pts, 2);
-  }
-  sock_close(sock);
-  pn_proactor_free(client->proactor);
-  pn_proactor_free(server->proactor);
-}
-
-/* Test error handling */
-static void test_listen_connect_error(test_t *t) {
+/* Test bad-address error handling for listen and connect */
+static void test_early_error(test_t *t) {
   pn_proactor_t *p = pn_proactor();
   pn_proactor_set_timeout(p, timeout); /* In case of hang */
   pn_connection_t *c = pn_connection();
-  pn_proactor_connect(p, c, "nosuchost", "nosuchport");
+  pn_proactor_connect(p, c, "badhost", "amqp");
   pn_event_type_t etype = wait_for(p, PN_TRANSPORT_CLOSED);
   TEST_CHECK(t, PN_TRANSPORT_CLOSED == etype, pn_event_type_name(etype));
   TEST_CHECK(t, pn_condition_is_set(pn_transport_condition(pn_connection_transport(c))), "");
 
   pn_listener_t *l = pn_listener();
-  pn_proactor_listen(p, l, "nosuchost", "nosuchport", 1);
+  pn_proactor_listen(p, l, "badhost", "amqp", 1);
   etype = wait_for(p, PN_LISTENER_CLOSE);
   TEST_CHECK(t, PN_LISTENER_CLOSE == etype, pn_event_type_name(etype));
   TEST_CHECK(t, pn_condition_is_set(pn_listener_condition(l)), "");
@@ -210,12 +186,51 @@ static void test_listen_connect_error(test_t *t) {
   pn_proactor_free(p);
 }
 
+/* Simplest client/server interaction with 2 proactors */
+static void test_listen_connect(test_t *t) {
+  proactor_test_t pts[] =  { { t, listen_connect_handler }, { t, listen_connect_handler } };
+  proactor_test_init(pts, 2);
+  pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
+  test_port_t port = test_port();          /* Hold a port */
+
+  pn_proactor_listen(server, pn_listener(), localhost, port.str, 4);
+  pn_event_type_t etype = wait_for(server, PN_LISTENER_OPEN);
+  if (TEST_CHECK(t, PN_LISTENER_OPEN == etype, pn_event_type_name(etype))) {
+    sock_close(port.sock);
+    pn_proactor_connect(client, pn_connection(), localhost, port.str);
+    proactor_test_run(pts, 2);
+  }
+  pn_proactor_free(client);
+  pn_proactor_free(server);
+}
+
+/* Test that INACTIVE event is generated when last connections/listeners closes. */
+static void test_inactive(test_t *t) {
+  proactor_test_t pts[] =  { { t, listen_connect_handler }, { t, listen_connect_handler }};
+  proactor_test_init(pts, 2);
+  pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
+  test_port_t port = test_port();          /* Hold a port */
+
+  pn_listener_t *l = pn_listener();
+  pn_proactor_listen(server, l, localhost, port.str,  4);
+  pn_event_type_t etype = wait_for(server, PN_LISTENER_OPEN);
+  if (TEST_CHECK(t, PN_LISTENER_OPEN == etype, pn_event_type_name(etype))) {
+    sock_close(port.sock);
+    pn_proactor_connect(client, pn_connection(), localhost, port.str);
+    proactor_test_run(pts, 2);
+    etype = wait_for(client, PN_PROACTOR_INACTIVE);
+    pn_listener_close(l);
+    etype = wait_for(server, PN_PROACTOR_INACTIVE);
+  }
+  pn_proactor_free(client);
+  pn_proactor_free(server);
+}
+
 int main(int argv, char** argc) {
   int failed = 0;
-  if (0) {
-    RUN_TEST(failed, t, test_interrupt_timeout(&t));
-    RUN_TEST(failed, t, test_listen_connect_error(&t));
-  }
+  RUN_TEST(failed, t, test_inactive(&t));
+  RUN_TEST(failed, t, test_interrupt_timeout(&t));
+  RUN_TEST(failed, t, test_early_error(&t));
   RUN_TEST(failed, t, test_listen_connect(&t));
   return failed;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/204c8474/proton-c/src/tests/test_tools.h
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/test_tools.h b/proton-c/src/tests/test_tools.h
index 2619337..2663dcf 100644
--- a/proton-c/src/tests/test_tools.h
+++ b/proton-c/src/tests/test_tools.h
@@ -153,4 +153,18 @@ static int sock_port(sock_t sock) {
   return ntohs(port);
 }
 
+typedef struct test_port_t {
+  sock_t sock;
+  int port;
+  char str[256];
+} test_port_t;
+
+static inline test_port_t test_port(void) {
+  test_port_t tp = {0};
+  tp.sock = sock_bind0();
+  tp.port = sock_port(tp.sock);
+  snprintf(tp.str, sizeof(tp.str), "%d", tp.port);
+  return tp;
+}
+
 #endif // TESTS_TEST_TOOLS_H


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org