You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/03/29 22:22:52 UTC
[1/3] qpid-proton git commit: PROTON-1437: c proactor address info
Repository: qpid-proton
Updated Branches:
refs/heads/master 2d9c08699 -> edebc4ecf
PROTON-1437: c proactor address info
Provides actual address information for both ends of a proactor-managed
connection using pn_proactor_addr_* functions.
- pn_proactor_addr_* functions are clearly identified as part of proactor lib
- portable print local/remote address as string with no platform-specific headers
- POSIX/windows can test pn_proactor_addr_is_sockaddr() to use native sockaddr API
This can be extended safely to non-sockaddr platforms by making
pn_proactor_addr_is_sockaddr() return false and adding pn_proactor_addr_is_foo()
to indicate the underlying address type
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/7a68a2c8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/7a68a2c8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/7a68a2c8
Branch: refs/heads/master
Commit: 7a68a2c836e242ebe27ed365654a0518d75ecd3a
Parents: 2d9c086
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Mar 29 10:54:35 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Mar 29 13:14:51 2017 -0400
----------------------------------------------------------------------
proton-c/include/proton/proactor.h | 34 ++++++++++++++++++
proton-c/src/proactor/libuv.c | 41 ++++++++++++++++++++-
proton-c/src/tests/proactor.c | 64 ++++++++++++++++++++++++++++++---
proton-c/src/tests/test_tools.h | 5 +--
4 files changed, 137 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7a68a2c8/proton-c/include/proton/proactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
index 974b432..b77feff 100644
--- a/proton-c/include/proton/proactor.h
+++ b/proton-c/include/proton/proactor.h
@@ -50,6 +50,11 @@ extern "C" {
*/
/**
+ * Stores a network address in native format.
+ */
+typedef struct pn_proactor_addr_t pn_proactor_addr_t;
+
+/**
* Create a proactor. Must be freed with pn_proactor_free()
*/
PNP_EXTERN pn_proactor_t *pn_proactor(void);
@@ -199,6 +204,35 @@ PNP_EXTERN pn_proactor_t *pn_connection_proactor(pn_connection_t *connection);
PNP_EXTERN pn_proactor_t *pn_event_proactor(pn_event_t *event);
/**
+ * Format an address as a string in buf, with trailing NUL.
+ *
+ * @return the length of the addresss string.
+ * If the return value is >= len then the address was truncated to len-1 bytes.
+ * A return value of 0 means the address was invalid or NULL.
+ */
+PNP_EXTERN size_t pn_proactor_addr_str(char *buf, size_t len, pn_proactor_addr_t* addr);
+
+/**
+ * Get the local address of a transport.
+ *
+ * @return NULL if the transport is not connected or the address is not available.
+ */
+PNP_EXTERN pn_proactor_addr_t *pn_proactor_addr_local(pn_transport_t* c);
+
+/**
+ * Get the remote address of a transport.
+ *
+ * @return NULL if the transport is not connected or the address is not available.
+ */
+PNP_EXTERN pn_proactor_addr_t *pn_proactor_addr_remote(pn_transport_t* c);
+
+/**
+ * If the underlying implementation uses `struct sockaddr` (for example POSIX or Windows
+ * sockets) return a pointer, otherwise return NULL.
+ */
+PNP_EXTERN struct sockaddr *pn_proactor_addr_sockaddr(pn_proactor_addr_t *addr);
+
+/**
* @}
*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7a68a2c8/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index aa10f83..266db98 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -170,6 +170,7 @@ typedef struct pconnection_t {
lsocket_t *lsocket; /* Incoming connection only */
+ struct sockaddr_storage local, remote; /* Actual addresses */
uv_timer_t timer;
uv_write_t write;
size_t writing; /* size of pending write request, 0 if none pending */
@@ -499,11 +500,20 @@ static void on_connect_fail(uv_handle_t *handle) {
}
}
+static void pconnection_addresses(pconnection_t *pc) {
+ int len;
+ len = sizeof(pc->local);
+ uv_tcp_getsockname(&pc->tcp, (struct sockaddr*)&pc->local, &len);
+ len = sizeof(pc->remote);
+ uv_tcp_getpeername(&pc->tcp, (struct sockaddr*)&pc->remote, &len);
+}
+
/* Outgoing connection */
static void on_connect(uv_connect_t *connect, int err) {
pconnection_t *pc = (pconnection_t*)connect->data;
if (!err) {
pc->connected = 1;
+ pconnection_addresses(pc);
work_notify(&pc->work);
uv_freeaddrinfo(pc->addr.getaddrinfo.addrinfo); /* Done with address info */
pc->addr.getaddrinfo.addrinfo = NULL;
@@ -658,8 +668,9 @@ static bool leader_process_listener(pn_listener_t *l) {
/* Process accepted connections */
for (pconnection_t *pc = pconnection_pop(&l->accept); pc; pc = pconnection_pop(&l->accept)) {
int err = pconnection_init(pc);
+ if (!err) err = uv_accept((uv_stream_t*)&pc->lsocket->tcp, (uv_stream_t*)&pc->tcp);
if (!err) {
- err = uv_accept((uv_stream_t*)&pc->lsocket->tcp, (uv_stream_t*)&pc->tcp);
+ pconnection_addresses(pc);
} else {
listener_error(l, err, "accepting from");
pconnection_error(pc, err, "accepting from");
@@ -1199,3 +1210,31 @@ int pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
work_notify(&l->work);
return 0;
}
+
+struct sockaddr *pn_proactor_addr_sockaddr(pn_proactor_addr_t *addr) {
+ return (struct sockaddr*)addr;
+}
+
+struct pn_proactor_addr_t *pn_proactor_addr_local(pn_transport_t *t) {
+ pconnection_t *pc = get_pconnection(pn_transport_connection(t));
+ return pc ? (pn_proactor_addr_t*)&pc->local : NULL;
+}
+
+struct pn_proactor_addr_t *pn_proactor_addr_remote(pn_transport_t *t) {
+ pconnection_t *pc = get_pconnection(pn_transport_connection(t));
+ return pc ? (pn_proactor_addr_t*)&pc->remote : NULL;
+}
+
+size_t pn_proactor_addr_str(char *buf, size_t len, struct pn_proactor_addr_t* addr) {
+ struct sockaddr_storage *sa = (struct sockaddr_storage*)addr;
+ char host[NI_MAXHOST];
+ char port[NI_MAXSERV];
+ int err = getnameinfo((struct sockaddr *)sa, sizeof(*sa), host, sizeof(host), port, sizeof(port),
+ NI_NUMERICHOST | NI_NUMERICSERV);
+ if (!err) {
+ return snprintf(buf, len, "%s:%s", host, port); /* FIXME aconway 2017-03-29: ipv6 format? */
+ } else {
+ if (buf) *buf = '\0';
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7a68a2c8/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index 80eeb9a..e87774e 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -159,6 +159,9 @@ static void test_interrupt_timeout(test_t *t) {
pn_proactor_free(p);
}
+/* Save the last connection accepted by the common_handler */
+pn_connection_t *last_accepted = NULL;
+
/* Common handler for simple client/server interactions, */
static pn_event_type_t common_handler(test_t *t, pn_event_t *e) {
pn_connection_t *c = pn_event_connection(e);
@@ -176,13 +179,17 @@ static pn_event_type_t common_handler(test_t *t, pn_event_t *e) {
return PN_TRANSPORT_CLOSED;
/* Stop on these events */
- case PN_LISTENER_OPEN:
case PN_PROACTOR_INACTIVE:
case PN_PROACTOR_TIMEOUT:
return pn_event_type(e);
+ case PN_LISTENER_OPEN:
+ last_accepted = NULL;
+ return pn_event_type(e);
+
case PN_LISTENER_ACCEPT:
- pn_listener_accept(l, pn_connection());
+ last_accepted = pn_connection();
+ pn_listener_accept(l, last_accepted);
pn_listener_close(l); /* Only accept one connection */
return PN_EVENT_NONE;
@@ -215,8 +222,9 @@ static pn_event_type_t common_handler(test_t *t, pn_event_t *e) {
static pn_event_type_t listen_handler(test_t *t, pn_event_t *e) {
switch (pn_event_type(e)) {
case PN_LISTENER_ACCEPT:
- /* No automatic listener close/free for the inactive test */
- pn_listener_accept(pn_event_listener(e), pn_connection());
+ /* No automatic listener close/free for tests that accept multiple connections */
+ last_accepted = pn_connection();
+ pn_listener_accept(pn_event_listener(e), last_accepted);
return PN_EVENT_NONE;
case PN_LISTENER_CLOSE:
@@ -530,6 +538,53 @@ static void test_ssl(test_t *t) {
PROACTOR_TEST_FREE(pts);
}
+/* Test pn_proactor_addr funtions */
+static void test_addr(test_t *t) {
+ /* Make sure NULL addr gives empty string */
+ char str[1024] = "not-empty";
+ pn_proactor_addr_str(str, sizeof(str), NULL);
+ TEST_STR_EQUAL(t, "", str);
+
+ proactor_test_t pts[] ={ { open_wake_handler }, { listen_handler } };
+ PROACTOR_TEST_INIT(pts, t);
+ pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
+ test_port_t port = test_port(localhost);
+ pn_listener_t *l = pn_listener();
+ pn_proactor_listen(server, l, port.host_port, 4);
+ TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+ pn_connection_t *c = pn_connection();
+ pn_proactor_connect(client, c, port.host_port);
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
+
+ /* client remote, client local, server remote and server local address strings */
+ char cr[1024], cl[1024], sr[1024], sl[1024];
+
+ pn_transport_t *ct = pn_connection_transport(c);
+ pn_proactor_addr_str(cr, sizeof(cr), pn_proactor_addr_remote(ct));
+ TEST_STR_IN(t, test_port_use_host(&port, ""), cr); /* remote address has listening port */
+
+ pn_connection_t *s = last_accepted; /* server side of the connection */
+ pn_transport_t *st = pn_connection_transport(s);
+ if (!TEST_CHECK(t, st)) return;
+ pn_proactor_addr_str(sl, sizeof(sl), pn_proactor_addr_local(st));
+ TEST_STR_EQUAL(t, cr, sl); /* client remote == server local */
+
+ pn_proactor_addr_str(cl, sizeof(cl), pn_proactor_addr_local(ct));
+ pn_proactor_addr_str(sr, sizeof(sr), pn_proactor_addr_remote(st));
+ TEST_STR_EQUAL(t, cl, sr); /* client local == server remote */
+
+ /* Make sure you can use NULL, 0 to get length of address string without a crash */
+ size_t len = pn_proactor_addr_str(NULL, 0, pn_proactor_addr_local(ct));
+ TEST_CHECK(t, strlen(cl) == len);
+
+ sock_close(port.sock);
+ PROACTOR_TEST_DRAIN(pts);
+ PROACTOR_TEST_FREE(pts);
+ pn_listener_free(l);
+ pn_connection_free(c);
+ pn_connection_free(s);
+}
+
int main(int argc, char **argv) {
int failed = 0;
RUN_ARGV_TEST(failed, t, test_inactive(&t));
@@ -540,5 +595,6 @@ int main(int argc, char **argv) {
RUN_ARGV_TEST(failed, t, test_ipv4_ipv6(&t));
RUN_ARGV_TEST(failed, t, test_free_cleanup(&t));
RUN_ARGV_TEST(failed, t, test_ssl(&t));
+ RUN_ARGV_TEST(failed, t, test_addr(&t));
return failed;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7a68a2c8/proton-c/src/tests/test_tools.h
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/test_tools.h b/proton-c/src/tests/test_tools.h
index 0c913ff..5404380 100644
--- a/proton-c/src/tests/test_tools.h
+++ b/proton-c/src/tests/test_tools.h
@@ -131,10 +131,11 @@ static inline bool test_etype_equal_(test_t *t, int want, int got, const char *f
}
#define TEST_STR_EQUAL(TEST, WANT, GOT) \
- TEST_CHECKF((TEST), !strcmp((WANT), (GOT)), " got '%s'", (GOT))
+ test_check_((TEST), !strcmp((WANT), (GOT)), NULL, __FILE__, __LINE__, "want '%s', got '%s'", (WANT), (GOT))
+
#define TEST_STR_IN(TEST, WANT, GOT) \
- TEST_CHECKF((TEST), strstr((GOT), (WANT)), " got '%s'", (GOT))
+ test_check_((TEST), strstr((GOT), (WANT)), NULL, __FILE__, __LINE__, "'%s' not in '%s'", (WANT), (GOT))
#define TEST_ETYPE_EQUAL(TEST, WANT, GOT) \
test_etype_equal_((TEST), (WANT), (GOT), __FILE__, __LINE__)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/3] qpid-proton git commit: PROTON-1452: Test for closed transport
without side effects
Posted by ac...@apache.org.
PROTON-1452: Test for closed transport without side effects
"closed" tests based on pn_transport_pending/available have side effects: may
generate events or modify read/write buffer pointers, which makes using those
tests very sensitive to ordering.
New pn_transport_(head|tail)_closed have no side effects.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/893cb001
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/893cb001
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/893cb001
Branch: refs/heads/master
Commit: 893cb00161d29e2e29c7e7304177f3d92418f405
Parents: 7a68a2c
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Mar 29 16:42:31 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Mar 29 17:58:56 2017 -0400
----------------------------------------------------------------------
proton-c/include/proton/transport.h | 19 +++++++++++--------
proton-c/src/core/connection_driver.c | 4 ++--
proton-c/src/core/transport.c | 12 ++++++------
3 files changed, 19 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/893cb001/proton-c/include/proton/transport.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/transport.h b/proton-c/include/proton/transport.h
index 09fbb03..89b6fad 100644
--- a/proton-c/include/proton/transport.h
+++ b/proton-c/include/proton/transport.h
@@ -610,14 +610,17 @@ PN_EXTERN int pn_transport_close_head(pn_transport_t *transport);
PN_EXTERN bool pn_transport_quiesced(pn_transport_t *transport);
/**
- * Check if a transport is closed.
- *
- * A transport is defined to be closed when both the tail and the head
- * are closed. In other words, when both ::pn_transport_capacity() < 0
- * and ::pn_transport_pending() < 0.
- *
- * @param[in] transport a transport object
- * @return true if the transport is closed, false otherwise
+ * True if pn_transport_close_head() has been called.
+ */
+PN_EXTERN bool pn_transport_head_closed(pn_transport_t *transport);
+
+/**
+ * True if pn_transport_close_tail() has been called.
+ */
+PN_EXTERN bool pn_transport_tail_closed(pn_transport_t *transport);
+
+/**
+ * Equivalent to pn_transport_head_closed(transport) && pn_transport_tail_closed(transport)
*/
PN_EXTERN bool pn_transport_closed(pn_transport_t *transport);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/893cb001/proton-c/src/core/connection_driver.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/connection_driver.c b/proton-c/src/core/connection_driver.c
index f5fddae..40af749 100644
--- a/proton-c/src/core/connection_driver.c
+++ b/proton-c/src/core/connection_driver.c
@@ -90,7 +90,7 @@ void pn_connection_driver_read_done(pn_connection_driver_t *d, size_t n) {
}
bool pn_connection_driver_read_closed(pn_connection_driver_t *d) {
- return pn_transport_capacity(d->transport) < 0;
+ return pn_transport_tail_closed(d->transport);
}
void pn_connection_driver_read_close(pn_connection_driver_t *d) {
@@ -111,7 +111,7 @@ void pn_connection_driver_write_done(pn_connection_driver_t *d, size_t n) {
}
bool pn_connection_driver_write_closed(pn_connection_driver_t *d) {
- return pn_transport_pending(d->transport) < 0;
+ return pn_transport_head_closed(d->transport);
}
void pn_connection_driver_write_close(pn_connection_driver_t *d) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/893cb001/proton-c/src/core/transport.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/transport.c b/proton-c/src/core/transport.c
index 444145a..5065663 100644
--- a/proton-c/src/core/transport.c
+++ b/proton-c/src/core/transport.c
@@ -3009,12 +3009,12 @@ bool pn_transport_quiesced(pn_transport_t *transport)
return true;
}
-bool pn_transport_closed(pn_transport_t *transport)
-{
- assert(transport);
- ssize_t capacity = pn_transport_capacity(transport);
- ssize_t pending = pn_transport_pending(transport);
- return capacity < 0 && pending < 0;
+bool pn_transport_head_closed(pn_transport_t *transport) { return transport->head_closed; }
+
+bool pn_transport_tail_closed(pn_transport_t *transport) { return transport->tail_closed; }
+
+bool pn_transport_closed(pn_transport_t *transport) {
+ return transport->head_closed && transport->tail_closed;
}
pn_connection_t *pn_transport_connection(pn_transport_t *transport)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/3] qpid-proton git commit: PROTON-1452: Add pn_proactor_disconnect
Posted by ac...@apache.org.
PROTON-1452: Add pn_proactor_disconnect
/**
* Disconnect all connections and listeners currently active in the proactor.
*
* PN_LISTENER_CLOSE, PN_TRANSPORT_CLOSED and other events are generated as usual.
* If no new listeners or connections are created, then a PN_PROACTOR_INACTIVE event
* will be generated when all connections and listeners are disconnected.
*
* Note the proactor remains active, connections and listeners created after a call to
* pn_proactor_disconnect() are not affected by it.
*
* @param condition if not NULL the condition data is copied to the transports and listeners.
*/
PNP_EXTERN void pn_proactor_disconnect(pn_proactor_t *proactor, pn_condition_t *condition);
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/edebc4ec
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/edebc4ec
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/edebc4ec
Branch: refs/heads/master
Commit: edebc4ecf693514fe3becd64bbeb1c8361b31e62
Parents: 893cb00
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Mar 29 17:58:00 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Mar 29 18:15:58 2017 -0400
----------------------------------------------------------------------
proton-c/include/proton/proactor.h | 14 ++++
proton-c/src/proactor/libuv.c | 87 ++++++++++++++++----
proton-c/src/tests/proactor.c | 137 +++++++++++++++++++++++++-------
3 files changed, 195 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/edebc4ec/proton-c/include/proton/proactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
index b77feff..d13c6d6 100644
--- a/proton-c/include/proton/proactor.h
+++ b/proton-c/include/proton/proactor.h
@@ -60,6 +60,20 @@ typedef struct pn_proactor_addr_t pn_proactor_addr_t;
PNP_EXTERN pn_proactor_t *pn_proactor(void);
/**
+ * Disconnect all connections and listeners currently active in the proactor.
+ *
+ * PN_LISTENER_CLOSE, PN_TRANSPORT_CLOSED and other events are generated as usual.
+ * If no new listeners or connections are created, then a PN_PROACTOR_INACTIVE event
+ * will be generated when all connections and listeners are disconnected.
+ *
+ * Note the proactor remains active, connections and listeners created after a call to
+ * pn_proactor_disconnect() are not affected by it.
+ *
+ * @param condition if not NULL the condition data is copied to the transports and listeners.
+ */
+PNP_EXTERN void pn_proactor_disconnect(pn_proactor_t *proactor, pn_condition_t *condition);
+
+/**
* Free the proactor. Abort any open network connections and clean up all
* associated resources.
*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/edebc4ec/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 266db98..2f5e369 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -240,6 +240,8 @@ struct pn_proactor_t {
timeout_state_t timeout_state;
pn_millis_t timeout;
size_t count; /* connection/listener count for INACTIVE events */
+ pn_condition_t *disconnect_cond; /* disconnect condition */
+ bool disconnect; /* disconnect requested */
bool inactive;
bool has_leader;
bool batch_working; /* batch is being processed in a worker thread */
@@ -906,6 +908,34 @@ void pconnection_detach(pconnection_t *pc) {
}
}
+static void on_proactor_disconnect(uv_handle_t* h, void* v) {
+ if (h->type == UV_TCP) {
+ switch (*(struct_type*)h->data) {
+ case T_CONNECTION: {
+ pconnection_t *pc = (pconnection_t*)h->data;
+ pn_condition_t *cond = pc->work.proactor->disconnect_cond;
+ if (cond) {
+ pn_condition_copy(pn_transport_condition(pc->driver.transport), cond);
+ }
+ pn_connection_driver_close(&pc->driver);
+ work_notify(&pc->work);
+ break;
+ }
+ case T_LSOCKET: {
+ pn_listener_t *l = ((lsocket_t*)h->data)->parent;
+ pn_condition_t *cond = l->work.proactor->disconnect_cond;
+ if (cond) {
+ pn_condition_copy(pn_listener_condition(l), cond);
+ }
+ pn_listener_close(l);
+ break;
+ }
+ default:
+ break;
+ }
+ }
+}
+
/* Process the leader_q and the UV loop, in the leader thread */
static pn_event_batch_t *leader_lead_lh(pn_proactor_t *p, uv_run_mode mode) {
/* Set timeout timer if there was a request, let it count down while we process work */
@@ -914,6 +944,13 @@ static pn_event_batch_t *leader_lead_lh(pn_proactor_t *p, uv_run_mode mode) {
uv_timer_stop(&p->timer);
uv_timer_start(&p->timer, on_timeout, p->timeout, 0);
}
+ /* If disconnect was requested, walk the socket list */
+ if (p->disconnect) {
+ p->disconnect = false;
+ uv_mutex_unlock(&p->lock);
+ uv_walk(&p->loop, on_proactor_disconnect, NULL);
+ uv_mutex_lock(&p->lock);
+ }
pn_event_batch_t *batch = NULL;
for (work_t *w = work_pop(&p->leader_q); w; w = work_pop(&p->leader_q)) {
assert(!w->working);
@@ -1035,6 +1072,20 @@ void pn_proactor_interrupt(pn_proactor_t *p) {
notify(p);
}
+void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
+ uv_mutex_lock(&p->lock);
+ if (!p->disconnect) {
+ p->disconnect = true;
+ if (cond) {
+ pn_condition_copy(p->disconnect_cond, cond);
+ } else {
+ pn_condition_clear(p->disconnect_cond);
+ }
+ notify(p);
+ }
+ uv_mutex_unlock(&p->lock);
+}
+
void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
uv_mutex_lock(&p->lock);
p->timeout = t;
@@ -1045,9 +1096,11 @@ void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
void pn_proactor_cancel_timeout(pn_proactor_t *p) {
uv_mutex_lock(&p->lock);
- p->timeout_state = TM_NONE;
+ if (p->timeout_state != TM_NONE) {
+ p->timeout_state = TM_NONE;
+ notify(p);
+ }
uv_mutex_unlock(&p->lock);
- notify(p);
}
int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr) {
@@ -1071,20 +1124,6 @@ int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, int
return 0;
}
-pn_proactor_t *pn_proactor() {
- pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(pn_proactor_t));
- p->collector = pn_collector();
- p->batch.next_event = &proactor_batch_next;
- if (!p->collector) return NULL;
- uv_loop_init(&p->loop);
- uv_mutex_init(&p->lock);
- uv_cond_init(&p->cond);
- uv_async_init(&p->loop, &p->async, NULL);
- uv_timer_init(&p->loop, &p->timer);
- p->timer.data = p;
- return p;
-}
-
static void on_proactor_free(uv_handle_t* h, void* v) {
uv_safe_close(h, NULL); /* Close the handle */
if (h->type == UV_TCP) { /* Put the corresponding work item on the leader_q for cleanup */
@@ -1108,6 +1147,21 @@ static void work_free(work_t *w) {
}
}
+pn_proactor_t *pn_proactor() {
+ pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(pn_proactor_t));
+ p->collector = pn_collector();
+ p->batch.next_event = &proactor_batch_next;
+ if (!p->collector) return NULL;
+ uv_loop_init(&p->loop);
+ uv_mutex_init(&p->lock);
+ uv_cond_init(&p->cond);
+ uv_async_init(&p->loop, &p->async, NULL);
+ uv_timer_init(&p->loop, &p->timer);
+ p->timer.data = p;
+ p->disconnect_cond = pn_condition();
+ return p;
+}
+
void pn_proactor_free(pn_proactor_t *p) {
/* Close all open handles */
uv_walk(&p->loop, on_proactor_free, NULL);
@@ -1125,6 +1179,7 @@ void pn_proactor_free(pn_proactor_t *p) {
uv_mutex_destroy(&p->lock);
uv_cond_destroy(&p->cond);
pn_collector_free(p->collector);
+ pn_condition_free(p->disconnect_cond);
free(p);
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/edebc4ec/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index e87774e..d22958c 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -32,13 +32,10 @@
static pn_millis_t timeout = 7*1000; /* timeout for hanging tests */
-static const char *localhost = "127.0.0.1"; /* host for connect/listen */
+static const char *localhost = ""; /* host for connect/listen */
typedef pn_event_type_t (*test_handler_fn)(test_t *, pn_event_t*);
-/* Save the last condition description of a handled event here */
-char last_condition[1024] = {0};
-
/* Proactor and handler that take part in a test */
typedef struct proactor_test_t {
test_handler_fn handler;
@@ -66,18 +63,22 @@ static void proactor_test_free(proactor_test_t *pts, size_t n) {
#define PROACTOR_TEST_FREE(A) proactor_test_free(A, sizeof(A)/sizeof(*A))
+/* Set this to a pn_condition() to save condition data */
+pn_condition_t *last_condition = NULL;
+
static void save_condition(pn_event_t *e) {
- /* TODO aconway 2017-03-23: extend pn_event_condition to include listener */
- last_condition[0] = '\0';
- pn_condition_t *cond = NULL;
- if (pn_event_listener(e)) {
- cond = pn_listener_condition(pn_event_listener(e));
- } else {
- cond = pn_event_condition(e);
- }
- if (cond && pn_condition_is_set(cond)) {
- const char *desc = pn_condition_get_description(cond);
- strncpy(last_condition, desc, sizeof(last_condition));
+ if (last_condition) {
+ pn_condition_t *cond = NULL;
+ if (pn_event_listener(e)) {
+ cond = pn_listener_condition(pn_event_listener(e));
+ } else {
+ cond = pn_event_condition(e);
+ }
+ if (cond) {
+ pn_condition_copy(last_condition, cond);
+ } else {
+ pn_condition_clear(last_condition);
+ }
}
}
@@ -85,6 +86,7 @@ static void save_condition(pn_event_t *e) {
* all proactors return NULL
*/
static pn_event_type_t proactor_test_get(proactor_test_t *pts, size_t n) {
+ if (last_condition) pn_condition_clear(last_condition);
while (true) {
bool busy = false;
for (proactor_test_t *pt = pts; pt < pts + n; ++pt) {
@@ -343,14 +345,14 @@ static void test_errors(test_t *t) {
pn_connection_t *c = pn_connection();
pn_proactor_connect(client, c, "127.0.0.1:xxx");
TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
- TEST_STR_IN(t, "xxx", last_condition);
+ TEST_STR_IN(t, "xxx", pn_condition_get_description(last_condition));
TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
pn_listener_t *l = pn_listener();
pn_proactor_listen(server, l, "127.0.0.1:xxx", 1);
TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
- TEST_STR_IN(t, "xxx", last_condition);
+ TEST_STR_IN(t, "xxx", pn_condition_get_description(last_condition));
TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
pn_listener_free(l);
@@ -358,7 +360,7 @@ static void test_errors(test_t *t) {
c = pn_connection();
pn_proactor_connect(client, c, port.host_port);
if (TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts))) {
- TEST_STR_IN(t, "connection refused", last_condition);
+ TEST_STR_IN(t, "connection refused", pn_condition_get_description(last_condition));
TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
sock_close(port.sock);
PROACTOR_TEST_FREE(pts);
@@ -380,7 +382,7 @@ static void test_ipv4_ipv6(test_t *t) {
pn_event_type_t e = PROACTOR_TEST_GET(pts);
bool has_ipv6 = (e != PN_LISTENER_CLOSE);
if (!has_ipv6) {
- TEST_LOGF(t, "skip IPv6 tests: %s", last_condition);
+ TEST_LOGF(t, "skip IPv6 tests: %s", pn_condition_get_description(last_condition));
}
PROACTOR_TEST_DRAIN(pts);
@@ -390,7 +392,7 @@ static void test_ipv4_ipv6(test_t *t) {
pn_proactor_listen(server, l4, port4.host_port, 4);
TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
sock_close(port4.sock);
- TEST_CHECKF(t, PROACTOR_TEST_GET(pts) != PN_LISTENER_CLOSE, "listener error: %s", last_condition);
+ TEST_CHECKF(t, PROACTOR_TEST_GET(pts) != PN_LISTENER_CLOSE, "listener error: %s", pn_condition_get_description(last_condition));
PROACTOR_TEST_DRAIN(pts);
/* Empty address listens on both IPv4 and IPv6 on all interfaces */
@@ -400,20 +402,20 @@ static void test_ipv4_ipv6(test_t *t) {
TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
sock_close(port.sock);
e = PROACTOR_TEST_GET(pts);
- TEST_CHECKF(t, PROACTOR_TEST_GET(pts) != PN_LISTENER_CLOSE, "listener error: %s", last_condition);
+ TEST_CHECKF(t, PROACTOR_TEST_GET(pts) != PN_LISTENER_CLOSE, "listener error: %s", pn_condition_get_description(last_condition));
PROACTOR_TEST_DRAIN(pts);
#define EXPECT_CONNECT(TP, HOST) do { \
pn_proactor_connect(client, pn_connection(), test_port_use_host(&(TP), (HOST))); \
- TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); \
- TEST_STR_EQUAL(t, "", last_condition); \
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); \
+ TEST_CHECK(t, !pn_condition_is_set(last_condition)); \
PROACTOR_TEST_DRAIN(pts); \
} while(0)
#define EXPECT_FAIL(TP, HOST) do { \
pn_proactor_connect(client, pn_connection(), test_port_use_host(&(TP), (HOST))); \
- TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); \
- TEST_STR_IN(t, "refused", last_condition); \
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); \
+ TEST_STR_IN(t, "refused", pn_condition_get_description(last_condition)); \
PROACTOR_TEST_DRAIN(pts); \
} while(0)
@@ -529,9 +531,9 @@ static void test_ssl(test_t *t) {
pn_proactor_connect(client, c, port.host_port);
/* Open ok at both ends */
TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
- TEST_STR_EQUAL(t,"", last_condition);
+ TEST_CHECK(t, !pn_condition_is_set(last_condition));
TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
- TEST_STR_EQUAL(t, "", last_condition);
+ TEST_CHECK(t, !pn_condition_is_set(last_condition));
TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
@@ -585,8 +587,87 @@ static void test_addr(test_t *t) {
pn_connection_free(s);
}
+/* Test simple client/server connection with 2 proactors */
+static void test_disconnect(test_t *t) {
+ proactor_test_t pts[] ={ { open_wake_handler }, { listen_handler } };
+ PROACTOR_TEST_INIT(pts, t);
+ pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
+
+ test_port_t port = test_port(localhost);
+ pn_listener_t* l = pn_listener();
+ pn_proactor_listen(server, l, port.host_port, 4);
+ TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+ sock_close(port.sock);
+
+ test_port_t port2 = test_port(localhost);
+ pn_listener_t* l2 = pn_listener();
+ pn_proactor_listen(server, l2, port2.host_port, 4);
+ TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+ sock_close(port2.sock);
+
+ /* We will disconnect one connection after it is remote-open */
+ pn_proactor_connect(client, pn_connection(), port.host_port);
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
+ pn_proactor_connect(client, pn_connection(), port2.host_port);
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
+
+ pn_condition_t *cond = pn_condition();
+ pn_condition_set_name(cond, "test-name");
+ pn_condition_set_description(cond, "test-description");
+
+ pn_proactor_disconnect(client, cond);
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+ TEST_STR_EQUAL(t, "test-name", pn_condition_get_name(last_condition));
+ /* Note: pn_transport adds "(connection aborted)" on client side if transport closed early. */
+ TEST_STR_EQUAL(t, "test-description (connection aborted)", pn_condition_get_description(last_condition));
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+
+ pn_proactor_disconnect(server, cond);
+ int expect_tclose = 2, expect_lclose = 2;
+ while (expect_tclose || expect_lclose) {
+ pn_event_type_t et = PROACTOR_TEST_RUN(pts);
+ switch (et) {
+ case PN_TRANSPORT_CLOSED:
+ TEST_CHECK(t, --expect_tclose >= 0);
+ TEST_STR_EQUAL(t, "test-name", pn_condition_get_name(last_condition));
+ TEST_STR_EQUAL(t, "test-description", pn_condition_get_description(last_condition));
+ break;
+ case PN_LISTENER_CLOSE:
+ TEST_CHECK(t, --expect_lclose >= 0);
+ TEST_STR_EQUAL(t, "test-name", pn_condition_get_name(last_condition));
+ TEST_STR_EQUAL(t, "test-description", pn_condition_get_description(last_condition));
+ break;
+ default:
+ TEST_ERRORF(t, "%s unexpected: want %d TRANSPORT_CLOSED, %d LISTENER_CLOSE",
+ pn_event_type_name(et), expect_tclose, expect_lclose);
+ expect_lclose = expect_tclose = 0;
+ continue;
+ }
+ }
+
+ pn_condition_free(cond);
+ pn_listener_free(l);
+ pn_listener_free(l2);
+
+ /* Make sure the proactors are still functional */
+ test_port_t port3 = test_port(localhost);
+ pn_listener_t* l3 = pn_listener();
+ pn_proactor_listen(server, l3, port3.host_port, 4);
+ TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+ sock_close(port3.sock);
+ pn_proactor_connect(client, pn_connection(), port3.host_port);
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
+ pn_proactor_disconnect(client, NULL);
+
+ PROACTOR_TEST_DRAIN(pts); /* Drain will */
+ PROACTOR_TEST_FREE(pts);
+ pn_listener_free(l3);
+}
+
int main(int argc, char **argv) {
int failed = 0;
+ last_condition = pn_condition();
RUN_ARGV_TEST(failed, t, test_inactive(&t));
RUN_ARGV_TEST(failed, t, test_interrupt_timeout(&t));
RUN_ARGV_TEST(failed, t, test_errors(&t));
@@ -596,5 +677,7 @@ int main(int argc, char **argv) {
RUN_ARGV_TEST(failed, t, test_free_cleanup(&t));
RUN_ARGV_TEST(failed, t, test_ssl(&t));
RUN_ARGV_TEST(failed, t, test_addr(&t));
+ RUN_ARGV_TEST(failed, t, test_disconnect(&t));
+ pn_condition_free(last_condition);
return failed;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org