You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/03/29 22:22:52 UTC

[1/3] qpid-proton git commit: PROTON-1437: c proactor address info

Repository: qpid-proton
Updated Branches:
  refs/heads/master 2d9c08699 -> edebc4ecf


PROTON-1437: c proactor address info

Provides actual address information for both ends of a proactor-managed
connection using pn_proactor_addr_* functions.

- pn_proactor_addr_* functions are clearly identified as part of the proactor lib
- portably print the local/remote address as a string with no platform-specific headers
- POSIX/Windows code can call pn_proactor_addr_sockaddr() and, if it returns
  non-NULL, use the native sockaddr API

This can be extended safely to non-sockaddr platforms by making
pn_proactor_addr_sockaddr() return NULL and adding a pn_proactor_addr_foo()
accessor for the underlying address type.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/7a68a2c8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/7a68a2c8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/7a68a2c8

Branch: refs/heads/master
Commit: 7a68a2c836e242ebe27ed365654a0518d75ecd3a
Parents: 2d9c086
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Mar 29 10:54:35 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Mar 29 13:14:51 2017 -0400

----------------------------------------------------------------------
 proton-c/include/proton/proactor.h | 34 ++++++++++++++++++
 proton-c/src/proactor/libuv.c      | 41 ++++++++++++++++++++-
 proton-c/src/tests/proactor.c      | 64 ++++++++++++++++++++++++++++++---
 proton-c/src/tests/test_tools.h    |  5 +--
 4 files changed, 137 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7a68a2c8/proton-c/include/proton/proactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
index 974b432..b77feff 100644
--- a/proton-c/include/proton/proactor.h
+++ b/proton-c/include/proton/proactor.h
@@ -50,6 +50,11 @@ extern "C" {
  */
 
 /**
+ * Stores a network address in native format.
+ */
+typedef struct pn_proactor_addr_t pn_proactor_addr_t;
+
+/**
  * Create a proactor. Must be freed with pn_proactor_free()
  */
 PNP_EXTERN pn_proactor_t *pn_proactor(void);
@@ -199,6 +204,35 @@ PNP_EXTERN pn_proactor_t *pn_connection_proactor(pn_connection_t *connection);
 PNP_EXTERN pn_proactor_t *pn_event_proactor(pn_event_t *event);
 
 /**
+ * Format an address as a string in buf, with trailing NUL.
+ *
+ * @return the length of the address string.
+ * If the return value is >= len then the address was truncated to len-1 bytes.
+ * A return value of 0 means the address was invalid or NULL.
+ */
+PNP_EXTERN size_t pn_proactor_addr_str(char *buf, size_t len, pn_proactor_addr_t* addr);
+
+/**
+ * Get the local address of a transport.
+ *
+ * @return NULL if the transport is not connected or the address is not available. 
+ */
+PNP_EXTERN pn_proactor_addr_t *pn_proactor_addr_local(pn_transport_t* c);
+
+/**
+ * Get the remote address of a transport.
+ *
+ * @return NULL if the transport is not connected or the address is not available. 
+ */
+PNP_EXTERN pn_proactor_addr_t *pn_proactor_addr_remote(pn_transport_t* c);
+
+/**
+ * If the underlying implementation uses `struct sockaddr` (for example POSIX or Windows
+ * sockets) return a pointer, otherwise return NULL.
+ */
+PNP_EXTERN struct sockaddr *pn_proactor_addr_sockaddr(pn_proactor_addr_t *addr);
+
+/**
  * @}
  */
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7a68a2c8/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index aa10f83..266db98 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -170,6 +170,7 @@ typedef struct pconnection_t {
 
   lsocket_t *lsocket;         /* Incoming connection only */
 
+  struct sockaddr_storage local, remote; /* Actual addresses */
   uv_timer_t timer;
   uv_write_t write;
   size_t writing;               /* size of pending write request, 0 if none pending */
@@ -499,11 +500,20 @@ static void on_connect_fail(uv_handle_t *handle) {
   }
 }
 
+static void pconnection_addresses(pconnection_t *pc) {
+  int len;
+  len = sizeof(pc->local);
+  uv_tcp_getsockname(&pc->tcp, (struct sockaddr*)&pc->local, &len);
+  len = sizeof(pc->remote);
+  uv_tcp_getpeername(&pc->tcp, (struct sockaddr*)&pc->remote, &len);
+}
+
 /* Outgoing connection */
 static void on_connect(uv_connect_t *connect, int err) {
   pconnection_t *pc = (pconnection_t*)connect->data;
   if (!err) {
     pc->connected = 1;
+    pconnection_addresses(pc);
     work_notify(&pc->work);
     uv_freeaddrinfo(pc->addr.getaddrinfo.addrinfo); /* Done with address info */
     pc->addr.getaddrinfo.addrinfo = NULL;
@@ -658,8 +668,9 @@ static bool leader_process_listener(pn_listener_t *l) {
   /* Process accepted connections */
   for (pconnection_t *pc = pconnection_pop(&l->accept); pc; pc = pconnection_pop(&l->accept)) {
     int err = pconnection_init(pc);
+    if (!err) err = uv_accept((uv_stream_t*)&pc->lsocket->tcp, (uv_stream_t*)&pc->tcp);
     if (!err) {
-      err = uv_accept((uv_stream_t*)&pc->lsocket->tcp, (uv_stream_t*)&pc->tcp);
+      pconnection_addresses(pc);
     } else {
       listener_error(l, err, "accepting from");
       pconnection_error(pc, err, "accepting from");
@@ -1199,3 +1210,31 @@ int pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
   work_notify(&l->work);
   return 0;
 }
+
+struct sockaddr *pn_proactor_addr_sockaddr(pn_proactor_addr_t *addr) {
+  return (struct sockaddr*)addr;
+}
+
+struct pn_proactor_addr_t *pn_proactor_addr_local(pn_transport_t *t) {
+  pconnection_t *pc = get_pconnection(pn_transport_connection(t));
+  return pc ? (pn_proactor_addr_t*)&pc->local : NULL;
+}
+
+struct pn_proactor_addr_t *pn_proactor_addr_remote(pn_transport_t *t) {
+  pconnection_t *pc = get_pconnection(pn_transport_connection(t));
+  return pc ? (pn_proactor_addr_t*)&pc->remote : NULL;
+}
+
+size_t pn_proactor_addr_str(char *buf, size_t len, struct pn_proactor_addr_t* addr) {
+  struct sockaddr_storage *sa = (struct sockaddr_storage*)addr;
+  char host[NI_MAXHOST];
+  char port[NI_MAXSERV];
+  int err = getnameinfo((struct sockaddr *)sa, sizeof(*sa), host, sizeof(host), port, sizeof(port),
+                        NI_NUMERICHOST | NI_NUMERICSERV);
+  if (!err) {
+    return snprintf(buf, len, "%s:%s", host, port); /* FIXME aconway 2017-03-29: ipv6 format? */
+  } else {
+    if (buf) *buf = '\0';
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7a68a2c8/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index 80eeb9a..e87774e 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -159,6 +159,9 @@ static void test_interrupt_timeout(test_t *t) {
   pn_proactor_free(p);
 }
 
+/* Save the last connection accepted by common_handler or listen_handler */
+pn_connection_t *last_accepted = NULL;
+
 /* Common handler for simple client/server interactions,  */
 static pn_event_type_t common_handler(test_t *t, pn_event_t *e) {
   pn_connection_t *c = pn_event_connection(e);
@@ -176,13 +179,17 @@ static pn_event_type_t common_handler(test_t *t, pn_event_t *e) {
     return PN_TRANSPORT_CLOSED;
 
     /* Stop on these events */
-   case PN_LISTENER_OPEN:
    case PN_PROACTOR_INACTIVE:
    case PN_PROACTOR_TIMEOUT:
     return pn_event_type(e);
 
+   case PN_LISTENER_OPEN:
+    last_accepted = NULL;
+    return pn_event_type(e);
+
    case PN_LISTENER_ACCEPT:
-    pn_listener_accept(l, pn_connection());
+    last_accepted = pn_connection();
+    pn_listener_accept(l, last_accepted);
     pn_listener_close(l);       /* Only accept one connection */
     return PN_EVENT_NONE;
 
@@ -215,8 +222,9 @@ static pn_event_type_t common_handler(test_t *t, pn_event_t *e) {
 static pn_event_type_t listen_handler(test_t *t, pn_event_t *e) {
   switch (pn_event_type(e)) {
    case PN_LISTENER_ACCEPT:
-    /* No automatic listener close/free for the inactive test */
-    pn_listener_accept(pn_event_listener(e), pn_connection());
+    /* No automatic listener close/free for tests that accept multiple connections */
+    last_accepted = pn_connection();
+    pn_listener_accept(pn_event_listener(e), last_accepted);
     return PN_EVENT_NONE;
 
    case PN_LISTENER_CLOSE:
@@ -530,6 +538,53 @@ static void test_ssl(test_t *t) {
   PROACTOR_TEST_FREE(pts);
 }
 
+/* Test pn_proactor_addr functions */
+static void test_addr(test_t *t) {
+  /* Make sure NULL addr gives empty string */
+  char str[1024] = "not-empty";
+  pn_proactor_addr_str(str, sizeof(str), NULL);
+  TEST_STR_EQUAL(t, "", str);
+
+  proactor_test_t pts[] ={ { open_wake_handler }, { listen_handler } };
+  PROACTOR_TEST_INIT(pts, t);
+  pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
+  test_port_t port = test_port(localhost);
+  pn_listener_t *l = pn_listener();
+  pn_proactor_listen(server, l, port.host_port, 4);
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+  pn_connection_t *c = pn_connection();
+  pn_proactor_connect(client, c, port.host_port);
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
+
+  /* client remote, client local, server remote and server local address strings */
+  char cr[1024], cl[1024], sr[1024], sl[1024];
+
+  pn_transport_t *ct = pn_connection_transport(c);
+  pn_proactor_addr_str(cr, sizeof(cr), pn_proactor_addr_remote(ct));
+  TEST_STR_IN(t, test_port_use_host(&port, ""), cr); /* remote address has listening port */
+
+  pn_connection_t *s = last_accepted; /* server side of the connection */
+  pn_transport_t *st = pn_connection_transport(s);
+  if (!TEST_CHECK(t, st)) return;
+  pn_proactor_addr_str(sl, sizeof(sl), pn_proactor_addr_local(st));
+  TEST_STR_EQUAL(t, cr, sl);  /* client remote == server local */
+
+  pn_proactor_addr_str(cl, sizeof(cl), pn_proactor_addr_local(ct));
+  pn_proactor_addr_str(sr, sizeof(sr), pn_proactor_addr_remote(st));
+  TEST_STR_EQUAL(t, cl, sr);    /* client local == server remote */
+
+  /* Make sure you can use NULL, 0 to get length of address string without a crash */
+  size_t len = pn_proactor_addr_str(NULL, 0, pn_proactor_addr_local(ct));
+  TEST_CHECK(t, strlen(cl) == len);
+
+  sock_close(port.sock);
+  PROACTOR_TEST_DRAIN(pts);
+  PROACTOR_TEST_FREE(pts);
+  pn_listener_free(l);
+  pn_connection_free(c);
+  pn_connection_free(s);
+}
+
 int main(int argc, char **argv) {
   int failed = 0;
   RUN_ARGV_TEST(failed, t, test_inactive(&t));
@@ -540,5 +595,6 @@ int main(int argc, char **argv) {
   RUN_ARGV_TEST(failed, t, test_ipv4_ipv6(&t));
   RUN_ARGV_TEST(failed, t, test_free_cleanup(&t));
   RUN_ARGV_TEST(failed, t, test_ssl(&t));
+  RUN_ARGV_TEST(failed, t, test_addr(&t));
   return failed;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7a68a2c8/proton-c/src/tests/test_tools.h
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/test_tools.h b/proton-c/src/tests/test_tools.h
index 0c913ff..5404380 100644
--- a/proton-c/src/tests/test_tools.h
+++ b/proton-c/src/tests/test_tools.h
@@ -131,10 +131,11 @@ static inline bool test_etype_equal_(test_t *t, int want, int got, const char *f
 }
 
 #define TEST_STR_EQUAL(TEST, WANT, GOT) \
-  TEST_CHECKF((TEST), !strcmp((WANT), (GOT)), " got '%s'", (GOT))
+  test_check_((TEST), !strcmp((WANT), (GOT)), NULL, __FILE__, __LINE__, "want '%s', got '%s'", (WANT), (GOT))
+
 
 #define TEST_STR_IN(TEST, WANT, GOT) \
-  TEST_CHECKF((TEST), strstr((GOT), (WANT)), " got '%s'", (GOT))
+  test_check_((TEST), strstr((GOT), (WANT)), NULL, __FILE__, __LINE__, "'%s' not in '%s'", (WANT), (GOT))
 
 #define TEST_ETYPE_EQUAL(TEST, WANT, GOT) \
   test_etype_equal_((TEST), (WANT), (GOT), __FILE__, __LINE__)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/3] qpid-proton git commit: PROTON-1452: Test for closed transport without side effects

Posted by ac...@apache.org.
PROTON-1452: Test for closed transport without side effects

"closed" tests based on pn_transport_pending()/pn_transport_capacity() have
side effects: they may generate events or modify read/write buffer pointers,
which makes code using those tests very sensitive to ordering.

New pn_transport_(head|tail)_closed have no side effects.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/893cb001
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/893cb001
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/893cb001

Branch: refs/heads/master
Commit: 893cb00161d29e2e29c7e7304177f3d92418f405
Parents: 7a68a2c
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Mar 29 16:42:31 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Mar 29 17:58:56 2017 -0400

----------------------------------------------------------------------
 proton-c/include/proton/transport.h   | 19 +++++++++++--------
 proton-c/src/core/connection_driver.c |  4 ++--
 proton-c/src/core/transport.c         | 12 ++++++------
 3 files changed, 19 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/893cb001/proton-c/include/proton/transport.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/transport.h b/proton-c/include/proton/transport.h
index 09fbb03..89b6fad 100644
--- a/proton-c/include/proton/transport.h
+++ b/proton-c/include/proton/transport.h
@@ -610,14 +610,17 @@ PN_EXTERN int pn_transport_close_head(pn_transport_t *transport);
 PN_EXTERN bool pn_transport_quiesced(pn_transport_t *transport);
 
 /**
- * Check if a transport is closed.
- *
- * A transport is defined to be closed when both the tail and the head
- * are closed. In other words, when both ::pn_transport_capacity() < 0
- * and ::pn_transport_pending() < 0.
- *
- * @param[in] transport a transport object
- * @return true if the transport is closed, false otherwise
+ * True if pn_transport_close_head() has been called.
+ */
+PN_EXTERN bool pn_transport_head_closed(pn_transport_t *transport);
+
+/**
+ * True if pn_transport_close_tail() has been called.
+ */
+PN_EXTERN bool pn_transport_tail_closed(pn_transport_t *transport);
+
+/**
+ * Equivalent to pn_transport_head_closed(transport) && pn_transport_tail_closed(transport)
  */
 PN_EXTERN bool pn_transport_closed(pn_transport_t *transport);
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/893cb001/proton-c/src/core/connection_driver.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/connection_driver.c b/proton-c/src/core/connection_driver.c
index f5fddae..40af749 100644
--- a/proton-c/src/core/connection_driver.c
+++ b/proton-c/src/core/connection_driver.c
@@ -90,7 +90,7 @@ void pn_connection_driver_read_done(pn_connection_driver_t *d, size_t n) {
 }
 
 bool pn_connection_driver_read_closed(pn_connection_driver_t *d) {
-  return pn_transport_capacity(d->transport) < 0;
+  return pn_transport_tail_closed(d->transport);
 }
 
 void pn_connection_driver_read_close(pn_connection_driver_t *d) {
@@ -111,7 +111,7 @@ void pn_connection_driver_write_done(pn_connection_driver_t *d, size_t n) {
 }
 
 bool pn_connection_driver_write_closed(pn_connection_driver_t *d) {
-  return pn_transport_pending(d->transport) < 0;
+  return pn_transport_head_closed(d->transport);
 }
 
 void pn_connection_driver_write_close(pn_connection_driver_t *d) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/893cb001/proton-c/src/core/transport.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/transport.c b/proton-c/src/core/transport.c
index 444145a..5065663 100644
--- a/proton-c/src/core/transport.c
+++ b/proton-c/src/core/transport.c
@@ -3009,12 +3009,12 @@ bool pn_transport_quiesced(pn_transport_t *transport)
   return true;
 }
 
-bool pn_transport_closed(pn_transport_t *transport)
-{
-  assert(transport);
-  ssize_t capacity = pn_transport_capacity(transport);
-  ssize_t pending = pn_transport_pending(transport);
-  return capacity < 0 && pending < 0;
+bool pn_transport_head_closed(pn_transport_t *transport) { return transport->head_closed; }
+
+bool pn_transport_tail_closed(pn_transport_t *transport) { return transport->tail_closed; }
+
+bool pn_transport_closed(pn_transport_t *transport) {
+  return transport->head_closed && transport->tail_closed;
 }
 
 pn_connection_t *pn_transport_connection(pn_transport_t *transport)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/3] qpid-proton git commit: PROTON-1452: Add pn_proactor_disconnect

Posted by ac...@apache.org.
PROTON-1452: Add pn_proactor_disconnect

/**
 * Disconnect all connections and listeners currently active in the proactor.
 *
 * PN_LISTENER_CLOSE, PN_TRANSPORT_CLOSED and other events are generated as usual.
 * If no new listeners or connections are created, then a PN_PROACTOR_INACTIVE event
 * will be generated when all connections and listeners are disconnected.
 *
 * Note the proactor remains active, connections and listeners created after a call to
 * pn_proactor_disconnect() are not affected by it.
 *
 * @param condition if not NULL the condition data is copied to the transports and listeners.
 */
PNP_EXTERN void pn_proactor_disconnect(pn_proactor_t *proactor, pn_condition_t *condition);


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/edebc4ec
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/edebc4ec
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/edebc4ec

Branch: refs/heads/master
Commit: edebc4ecf693514fe3becd64bbeb1c8361b31e62
Parents: 893cb00
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Mar 29 17:58:00 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Mar 29 18:15:58 2017 -0400

----------------------------------------------------------------------
 proton-c/include/proton/proactor.h |  14 ++++
 proton-c/src/proactor/libuv.c      |  87 ++++++++++++++++----
 proton-c/src/tests/proactor.c      | 137 +++++++++++++++++++++++++-------
 3 files changed, 195 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/edebc4ec/proton-c/include/proton/proactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
index b77feff..d13c6d6 100644
--- a/proton-c/include/proton/proactor.h
+++ b/proton-c/include/proton/proactor.h
@@ -60,6 +60,20 @@ typedef struct pn_proactor_addr_t pn_proactor_addr_t;
 PNP_EXTERN pn_proactor_t *pn_proactor(void);
 
 /**
+ * Disconnect all connections and listeners currently active in the proactor.
+ *
+ * PN_LISTENER_CLOSE, PN_TRANSPORT_CLOSED and other events are generated as usual.
+ * If no new listeners or connections are created, then a PN_PROACTOR_INACTIVE event
+ * will be generated when all connections and listeners are disconnected.
+ *
+ * Note the proactor remains active, connections and listeners created after a call to
+ * pn_proactor_disconnect() are not affected by it.
+ *
+ * @param condition if not NULL the condition data is copied to the transports and listeners.
+ */
+PNP_EXTERN void pn_proactor_disconnect(pn_proactor_t *proactor, pn_condition_t *condition);
+
+/**
  * Free the proactor. Abort any open network connections and clean up all
  * associated resources.
  */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/edebc4ec/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 266db98..2f5e369 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -240,6 +240,8 @@ struct pn_proactor_t {
   timeout_state_t timeout_state;
   pn_millis_t timeout;
   size_t count;               /* connection/listener count for INACTIVE events */
+  pn_condition_t *disconnect_cond; /* disconnect condition */
+  bool disconnect;            /* disconnect requested */
   bool inactive;
   bool has_leader;
   bool batch_working;         /* batch is being processed in a worker thread */
@@ -906,6 +908,34 @@ void pconnection_detach(pconnection_t *pc) {
   }
 }
 
+static void on_proactor_disconnect(uv_handle_t* h, void* v) {
+  if (h->type == UV_TCP) {
+    switch (*(struct_type*)h->data) {
+     case T_CONNECTION: {
+       pconnection_t *pc = (pconnection_t*)h->data;
+       pn_condition_t *cond = pc->work.proactor->disconnect_cond;
+       if (cond) {
+         pn_condition_copy(pn_transport_condition(pc->driver.transport), cond);
+       }
+       pn_connection_driver_close(&pc->driver);
+       work_notify(&pc->work);
+       break;
+     }
+     case T_LSOCKET: {
+       pn_listener_t *l = ((lsocket_t*)h->data)->parent;
+       pn_condition_t *cond = l->work.proactor->disconnect_cond;
+       if (cond) {
+         pn_condition_copy(pn_listener_condition(l), cond);
+       }
+       pn_listener_close(l);
+       break;
+     }
+     default:
+      break;
+    }
+  }
+}
+
 /* Process the leader_q and the UV loop, in the leader thread */
 static pn_event_batch_t *leader_lead_lh(pn_proactor_t *p, uv_run_mode mode) {
   /* Set timeout timer if there was a request, let it count down while we process work */
@@ -914,6 +944,13 @@ static pn_event_batch_t *leader_lead_lh(pn_proactor_t *p, uv_run_mode mode) {
     uv_timer_stop(&p->timer);
     uv_timer_start(&p->timer, on_timeout, p->timeout, 0);
   }
+  /* If disconnect was requested, walk the socket list */
+  if (p->disconnect) {
+    p->disconnect = false;
+    uv_mutex_unlock(&p->lock);
+    uv_walk(&p->loop, on_proactor_disconnect, NULL);
+    uv_mutex_lock(&p->lock);
+  }
   pn_event_batch_t *batch = NULL;
   for (work_t *w = work_pop(&p->leader_q); w; w = work_pop(&p->leader_q)) {
     assert(!w->working);
@@ -1035,6 +1072,20 @@ void pn_proactor_interrupt(pn_proactor_t *p) {
   notify(p);
 }
 
+void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
+  uv_mutex_lock(&p->lock);
+  if (!p->disconnect) {
+    p->disconnect = true;
+    if (cond) {
+      pn_condition_copy(p->disconnect_cond, cond);
+    } else {
+      pn_condition_clear(p->disconnect_cond);
+    }
+    notify(p);
+  }
+  uv_mutex_unlock(&p->lock);
+}
+
 void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
   uv_mutex_lock(&p->lock);
   p->timeout = t;
@@ -1045,9 +1096,11 @@ void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
 
 void pn_proactor_cancel_timeout(pn_proactor_t *p) {
   uv_mutex_lock(&p->lock);
-  p->timeout_state = TM_NONE;
+  if (p->timeout_state != TM_NONE) {
+    p->timeout_state = TM_NONE;
+    notify(p);
+  }
   uv_mutex_unlock(&p->lock);
-  notify(p);
 }
 
 int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr) {
@@ -1071,20 +1124,6 @@ int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, int
   return 0;
 }
 
-pn_proactor_t *pn_proactor() {
-  pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(pn_proactor_t));
-  p->collector = pn_collector();
-  p->batch.next_event = &proactor_batch_next;
-  if (!p->collector) return NULL;
-  uv_loop_init(&p->loop);
-  uv_mutex_init(&p->lock);
-  uv_cond_init(&p->cond);
-  uv_async_init(&p->loop, &p->async, NULL);
-  uv_timer_init(&p->loop, &p->timer);
-  p->timer.data = p;
-  return p;
-}
-
 static void on_proactor_free(uv_handle_t* h, void* v) {
   uv_safe_close(h, NULL);       /* Close the handle */
   if (h->type == UV_TCP) {      /* Put the corresponding work item on the leader_q for cleanup */
@@ -1108,6 +1147,21 @@ static void work_free(work_t *w) {
   }
 }
 
+pn_proactor_t *pn_proactor() {
+  pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(pn_proactor_t));
+  p->collector = pn_collector();
+  p->batch.next_event = &proactor_batch_next;
+  if (!p->collector) return NULL;
+  uv_loop_init(&p->loop);
+  uv_mutex_init(&p->lock);
+  uv_cond_init(&p->cond);
+  uv_async_init(&p->loop, &p->async, NULL);
+  uv_timer_init(&p->loop, &p->timer);
+  p->timer.data = p;
+  p->disconnect_cond = pn_condition();
+  return p;
+}
+
 void pn_proactor_free(pn_proactor_t *p) {
   /* Close all open handles */
   uv_walk(&p->loop, on_proactor_free, NULL);
@@ -1125,6 +1179,7 @@ void pn_proactor_free(pn_proactor_t *p) {
   uv_mutex_destroy(&p->lock);
   uv_cond_destroy(&p->cond);
   pn_collector_free(p->collector);
+  pn_condition_free(p->disconnect_cond);
   free(p);
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/edebc4ec/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index e87774e..d22958c 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -32,13 +32,10 @@
 
 static pn_millis_t timeout = 7*1000; /* timeout for hanging tests */
 
-static const char *localhost = "127.0.0.1"; /* host for connect/listen */
+static const char *localhost = ""; /* host for connect/listen */
 
 typedef pn_event_type_t (*test_handler_fn)(test_t *, pn_event_t*);
 
-/* Save the last condition description of a handled event here  */
-char last_condition[1024] = {0};
-
 /* Proactor and handler that take part in a test */
 typedef struct proactor_test_t {
   test_handler_fn handler;
@@ -66,18 +63,22 @@ static void proactor_test_free(proactor_test_t *pts, size_t n) {
 
 #define PROACTOR_TEST_FREE(A) proactor_test_free(A, sizeof(A)/sizeof(*A))
 
+/* Set this to a pn_condition() to save condition data */
+pn_condition_t *last_condition = NULL;
+
 static void save_condition(pn_event_t *e) {
-  /* TODO aconway 2017-03-23: extend pn_event_condition to include listener */
-  last_condition[0] = '\0';
-  pn_condition_t *cond = NULL;
-  if (pn_event_listener(e)) {
-    cond = pn_listener_condition(pn_event_listener(e));
-  } else {
-    cond = pn_event_condition(e);
-  }
-  if (cond && pn_condition_is_set(cond)) {
-    const char *desc = pn_condition_get_description(cond);
-    strncpy(last_condition, desc, sizeof(last_condition));
+  if (last_condition) {
+    pn_condition_t *cond = NULL;
+    if (pn_event_listener(e)) {
+      cond = pn_listener_condition(pn_event_listener(e));
+    } else {
+      cond = pn_event_condition(e);
+    }
+    if (cond) {
+      pn_condition_copy(last_condition, cond);
+    } else {
+      pn_condition_clear(last_condition);
+    }
   }
 }
 
@@ -85,6 +86,7 @@ static void save_condition(pn_event_t *e) {
  * all proactors return NULL
  */
 static pn_event_type_t proactor_test_get(proactor_test_t *pts, size_t n) {
+  if (last_condition) pn_condition_clear(last_condition);
   while (true) {
     bool busy = false;
     for (proactor_test_t *pt = pts; pt < pts + n; ++pt) {
@@ -343,14 +345,14 @@ static void test_errors(test_t *t) {
   pn_connection_t *c = pn_connection();
   pn_proactor_connect(client, c, "127.0.0.1:xxx");
   TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
-  TEST_STR_IN(t, "xxx", last_condition);
+  TEST_STR_IN(t, "xxx", pn_condition_get_description(last_condition));
   TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
 
   pn_listener_t *l = pn_listener();
   pn_proactor_listen(server, l, "127.0.0.1:xxx", 1);
   TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
   TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
-  TEST_STR_IN(t, "xxx", last_condition);
+  TEST_STR_IN(t, "xxx", pn_condition_get_description(last_condition));
   TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
   pn_listener_free(l);
 
@@ -358,7 +360,7 @@ static void test_errors(test_t *t) {
   c = pn_connection();
   pn_proactor_connect(client, c, port.host_port);
   if (TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts))) {
-    TEST_STR_IN(t, "connection refused", last_condition);
+    TEST_STR_IN(t, "connection refused", pn_condition_get_description(last_condition));
     TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
     sock_close(port.sock);
     PROACTOR_TEST_FREE(pts);
@@ -380,7 +382,7 @@ static void test_ipv4_ipv6(test_t *t) {
   pn_event_type_t e = PROACTOR_TEST_GET(pts);
   bool has_ipv6 = (e != PN_LISTENER_CLOSE);
   if (!has_ipv6) {
-    TEST_LOGF(t, "skip IPv6 tests: %s", last_condition);
+    TEST_LOGF(t, "skip IPv6 tests: %s", pn_condition_get_description(last_condition));
   }
   PROACTOR_TEST_DRAIN(pts);
 
@@ -390,7 +392,7 @@ static void test_ipv4_ipv6(test_t *t) {
   pn_proactor_listen(server, l4, port4.host_port, 4);
   TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
   sock_close(port4.sock);
-  TEST_CHECKF(t, PROACTOR_TEST_GET(pts) != PN_LISTENER_CLOSE, "listener error: %s",  last_condition);
+  TEST_CHECKF(t, PROACTOR_TEST_GET(pts) != PN_LISTENER_CLOSE, "listener error: %s",  pn_condition_get_description(last_condition));
   PROACTOR_TEST_DRAIN(pts);
 
   /* Empty address listens on both IPv4 and IPv6 on all interfaces */
@@ -400,20 +402,20 @@ static void test_ipv4_ipv6(test_t *t) {
   TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
   sock_close(port.sock);
   e = PROACTOR_TEST_GET(pts);
-  TEST_CHECKF(t, PROACTOR_TEST_GET(pts) != PN_LISTENER_CLOSE, "listener error: %s",  last_condition);
+  TEST_CHECKF(t, PROACTOR_TEST_GET(pts) != PN_LISTENER_CLOSE, "listener error: %s",  pn_condition_get_description(last_condition));
   PROACTOR_TEST_DRAIN(pts);
 
 #define EXPECT_CONNECT(TP, HOST) do {                                   \
     pn_proactor_connect(client, pn_connection(), test_port_use_host(&(TP), (HOST))); \
-    TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));    \
-    TEST_STR_EQUAL(t, "", last_condition);                              \
+    TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));   \
+    TEST_CHECK(t, !pn_condition_is_set(last_condition));                 \
     PROACTOR_TEST_DRAIN(pts);                                           \
   } while(0)
 
 #define EXPECT_FAIL(TP, HOST) do {                                      \
     pn_proactor_connect(client, pn_connection(), test_port_use_host(&(TP), (HOST))); \
-    TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));    \
-    TEST_STR_IN(t, "refused", last_condition);                          \
+    TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));   \
+    TEST_STR_IN(t, "refused", pn_condition_get_description(last_condition)); \
     PROACTOR_TEST_DRAIN(pts);                                           \
   } while(0)
 
@@ -529,9 +531,9 @@ static void test_ssl(test_t *t) {
   pn_proactor_connect(client, c, port.host_port);
   /* Open ok at both ends */
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
-  TEST_STR_EQUAL(t,"", last_condition);
+  TEST_CHECK(t, !pn_condition_is_set(last_condition));
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
-  TEST_STR_EQUAL(t, "", last_condition);
+  TEST_CHECK(t, !pn_condition_is_set(last_condition));
   TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
   TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
 
@@ -585,8 +587,87 @@ static void test_addr(test_t *t) {
   pn_connection_free(s);
 }
 
+/* Test pn_proactor_disconnect() on client and server proactors, then check both are still usable */
+static void test_disconnect(test_t *t) {
+  proactor_test_t pts[] ={ { open_wake_handler }, { listen_handler } };
+  PROACTOR_TEST_INIT(pts, t);
+  pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
+
+  test_port_t port = test_port(localhost);
+  pn_listener_t* l = pn_listener();
+  pn_proactor_listen(server, l, port.host_port, 4);
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+  sock_close(port.sock);
+
+  test_port_t port2 = test_port(localhost);
+  pn_listener_t* l2 = pn_listener();
+  pn_proactor_listen(server, l2, port2.host_port, 4);
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+  sock_close(port2.sock);
+
+  /* We will disconnect one connection after it is remote-open */
+  pn_proactor_connect(client, pn_connection(), port.host_port);
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
+  pn_proactor_connect(client, pn_connection(), port2.host_port);
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
+
+  pn_condition_t *cond = pn_condition();
+  pn_condition_set_name(cond, "test-name");
+  pn_condition_set_description(cond, "test-description");
+
+  pn_proactor_disconnect(client, cond);
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  TEST_STR_EQUAL(t, "test-name", pn_condition_get_name(last_condition));
+  /* Note: pn_transport adds "(connection aborted)" on client side if transport closed early. */
+  TEST_STR_EQUAL(t, "test-description (connection aborted)", pn_condition_get_description(last_condition));
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+
+  pn_proactor_disconnect(server, cond);
+  int expect_tclose = 2, expect_lclose = 2;
+  while (expect_tclose || expect_lclose) {
+    pn_event_type_t et = PROACTOR_TEST_RUN(pts);
+    switch (et) {
+     case PN_TRANSPORT_CLOSED:
+      TEST_CHECK(t, --expect_tclose >= 0);
+        TEST_STR_EQUAL(t, "test-name", pn_condition_get_name(last_condition));
+        TEST_STR_EQUAL(t, "test-description", pn_condition_get_description(last_condition));
+      break;
+     case PN_LISTENER_CLOSE:
+      TEST_CHECK(t, --expect_lclose >= 0);
+      TEST_STR_EQUAL(t, "test-name", pn_condition_get_name(last_condition));
+      TEST_STR_EQUAL(t, "test-description", pn_condition_get_description(last_condition));
+      break;
+     default:
+      TEST_ERRORF(t, "%s unexpected: want %d TRANSPORT_CLOSED, %d LISTENER_CLOSE",
+                  pn_event_type_name(et), expect_tclose, expect_lclose);
+      expect_lclose = expect_tclose = 0;
+      continue;
+    }
+  }
+
+  pn_condition_free(cond);
+  pn_listener_free(l);
+  pn_listener_free(l2);
+
+  /* Make sure the proactors are still functional */
+  test_port_t port3 = test_port(localhost);
+  pn_listener_t* l3 = pn_listener();
+  pn_proactor_listen(server, l3, port3.host_port, 4);
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+  sock_close(port3.sock);
+  pn_proactor_connect(client, pn_connection(), port3.host_port);
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
+  pn_proactor_disconnect(client, NULL);
+
+  PROACTOR_TEST_DRAIN(pts);     /* Drain remaining events after the disconnect */
+  PROACTOR_TEST_FREE(pts);
+  pn_listener_free(l3);
+}
+
 int main(int argc, char **argv) {
   int failed = 0;
+  last_condition = pn_condition();
   RUN_ARGV_TEST(failed, t, test_inactive(&t));
   RUN_ARGV_TEST(failed, t, test_interrupt_timeout(&t));
   RUN_ARGV_TEST(failed, t, test_errors(&t));
@@ -596,5 +677,7 @@ int main(int argc, char **argv) {
   RUN_ARGV_TEST(failed, t, test_free_cleanup(&t));
   RUN_ARGV_TEST(failed, t, test_ssl(&t));
   RUN_ARGV_TEST(failed, t, test_addr(&t));
+  RUN_ARGV_TEST(failed, t, test_disconnect(&t));
+  pn_condition_free(last_condition);
   return failed;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org