You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/04/04 13:34:27 UTC

[1/3] qpid-proton git commit: PROTON-1454: C proactor API doc overhaul, minor API improvements.

Repository: qpid-proton
Updated Branches:
  refs/heads/master ed82ea86f -> 7dfd0533d


PROTON-1454: C proactor API doc overhaul, minor API improvements.

Reviewed and improved all API doc in proactor.h and listener.h

API improvements:
- Reverted a85c89a "PROTON-1445: Change proactor ownership model" - it was a bad idea.
- pn_proactor_connect|listen return void, all errors reported by events
- rename pn_proactor_disown_connection -> pn_proactor_release_connection
- document the list of event types used by the proactor.
- const fixes for pn_proactor_addr functions
- verify all proactor library functions are in proactor.h or listener.h


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/7dfd0533
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/7dfd0533
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/7dfd0533

Branch: refs/heads/master
Commit: 7dfd0533ddc47c9652881e835abf37de41f4274d
Parents: 588e2e4
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Apr 3 17:59:23 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Apr 4 09:14:55 2017 -0400

----------------------------------------------------------------------
 examples/c/proactor/broker.c       |   2 -
 examples/c/proactor/direct.c       |   2 -
 examples/c/proactor/receive.c      |   4 -
 examples/c/proactor/send.c         |   1 -
 proton-c/docs/api/CMakeLists.txt   |   2 +-
 proton-c/docs/api/index.md         |  15 +-
 proton-c/docs/api/user.doxygen.in  |   2 +
 proton-c/include/proton/event.h    |   8 +-
 proton-c/include/proton/listener.h |  36 ++--
 proton-c/include/proton/proactor.h | 364 +++++++++++++++++++++-----------
 proton-c/include/proton/types.h    |   4 -
 proton-c/src/proactor/libuv.c      |  52 ++---
 proton-c/src/tests/proactor.c      | 190 +++++++++++------
 proton-c/src/tests/test_tools.h    |   6 +-
 14 files changed, 424 insertions(+), 264 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7dfd0533/examples/c/proactor/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
index 6a7d1eb..6501927 100644
--- a/examples/c/proactor/broker.c
+++ b/examples/c/proactor/broker.c
@@ -340,7 +340,6 @@ static void handle(broker_t* b, pn_event_t* e) {
    case PN_TRANSPORT_CLOSED:
     connection_unsub(b, pn_event_connection(e));
     check_condition(e, pn_transport_condition(pn_event_transport(e)));
-    pn_connection_free(pn_event_connection(e));
     break;
 
    case PN_CONNECTION_REMOTE_CLOSE:
@@ -366,7 +365,6 @@ static void handle(broker_t* b, pn_event_t* e) {
    case PN_LISTENER_CLOSE:
     check_condition(e, pn_listener_condition(pn_event_listener(e)));
     broker_stop(b);
-    pn_listener_free(pn_event_listener(e));
     break;
 
  break;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7dfd0533/examples/c/proactor/direct.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/direct.c b/examples/c/proactor/direct.c
index bda66db..f76895c 100644
--- a/examples/c/proactor/direct.c
+++ b/examples/c/proactor/direct.c
@@ -246,7 +246,6 @@ static bool handle(app_data_t* app, pn_event_t* event) {
    case PN_TRANSPORT_CLOSED:
     check_condition(event, pn_transport_condition(pn_event_transport(event)));
     pn_listener_close(app->listener); /* Finished */
-    pn_connection_free(pn_event_connection(event));
     break;
 
    case PN_CONNECTION_REMOTE_CLOSE:
@@ -272,7 +271,6 @@ static bool handle(app_data_t* app, pn_event_t* event) {
 
    case PN_LISTENER_CLOSE:
     check_condition(event, pn_listener_condition(pn_event_listener(event)));
-    pn_listener_free(pn_event_listener(event));
     break;
 
    case PN_PROACTOR_INACTIVE:

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7dfd0533/examples/c/proactor/receive.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/receive.c b/examples/c/proactor/receive.c
index 6b4f02c..c8d3363 100644
--- a/examples/c/proactor/receive.c
+++ b/examples/c/proactor/receive.c
@@ -126,10 +126,6 @@ static bool handle(app_data_t* app, pn_event_t* event) {
    } break;
 
    case PN_TRANSPORT_CLOSED:
-    pn_connection_free(pn_event_connection(event));
-    break;
-
-   case PN_TRANSPORT_ERROR:
     check_condition(event, pn_transport_condition(pn_event_transport(event)));
     break;
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7dfd0533/examples/c/proactor/send.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/send.c b/examples/c/proactor/send.c
index 0b2e68f..c21ac68 100644
--- a/examples/c/proactor/send.c
+++ b/examples/c/proactor/send.c
@@ -137,7 +137,6 @@ static bool handle(app_data_t* app, pn_event_t* event) {
 
    case PN_TRANSPORT_CLOSED:
     check_condition(event, pn_transport_condition(pn_event_transport(event)));
-    pn_connection_free(pn_event_connection(event));
     break;
 
    case PN_CONNECTION_REMOTE_CLOSE:

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7dfd0533/proton-c/docs/api/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/docs/api/CMakeLists.txt b/proton-c/docs/api/CMakeLists.txt
index 7756e48..5324fc4 100644
--- a/proton-c/docs/api/CMakeLists.txt
+++ b/proton-c/docs/api/CMakeLists.txt
@@ -22,7 +22,7 @@ if (DOXYGEN_FOUND)
   configure_file (${CMAKE_CURRENT_SOURCE_DIR}/user.doxygen.in
                   ${CMAKE_CURRENT_BINARY_DIR}/user.doxygen)
   add_custom_target (docs-c COMMAND ${DOXYGEN_EXECUTABLE} user.doxygen)
-  add_dependencies (docs docs-c)
+  add_dependencies (docs docs-c user.doxygen.in)
 
   # HTML files are generated to ./html - put those in the install.
   install (DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}/html/"

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7dfd0533/proton-c/docs/api/index.md
----------------------------------------------------------------------
diff --git a/proton-c/docs/api/index.md b/proton-c/docs/api/index.md
index cf35c33..18accd4 100644
--- a/proton-c/docs/api/index.md
+++ b/proton-c/docs/api/index.md
@@ -20,9 +20,7 @@ api\_types.
 
 @ref codec has functions for AMQP data encoding and decoding.
 
-## IO
-
-@ref io holds interfaces for integrating Proton with platform IO.
+## Proactor
 
 The @ref proactor is a portable, proactive, asynchronous API for
 single- or multithreaded applications. It associates AMQP @ref
@@ -30,8 +28,9 @@ connection "connections" with network connections (@ref transport
 "transports") and allows one or more threads to handle @ref event
 "events".
 
-**Low-level integration** - The @ref connection\_driver provides a
-low-level SPI to feed byte streams from any source to the protocol
-engine. You can use it to integrate Proton directly with a foreign
-event loop or IO library, or to implement your own @ref proactor to
-transparently replace Proton's IO layer.
+## Low-level IO integration
+
+The @ref connection\_driver Connection Driver is a low-level SPI to feed byte
+streams from any source to the protocol engine. You can use it to integrate
+Proton directly with a foreign event loop or IO library, or to implement your
+own @ref proactor to transparently replace Proton's IO layer.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7dfd0533/proton-c/docs/api/user.doxygen.in
----------------------------------------------------------------------
diff --git a/proton-c/docs/api/user.doxygen.in b/proton-c/docs/api/user.doxygen.in
index 4a02bd2..686ac59 100644
--- a/proton-c/docs/api/user.doxygen.in
+++ b/proton-c/docs/api/user.doxygen.in
@@ -35,6 +35,8 @@ HIDE_COMPOUND_REFERENCE = YES
 HIDE_SCOPE_NAMES        = YES
 MAX_INITIALIZER_LINES   = 0
 ALPHABETICAL_INDEX      = NO
+# Use declaration order, don't alphabetize
+SORT_MEMBER_DOCS       = NO
 
 # Redefine protected as private and strip out the PN_EXTERN and
 # PNX_EXTERN macros

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7dfd0533/proton-c/include/proton/event.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/event.h b/proton-c/include/proton/event.h
index d11bdfc..d75988c 100644
--- a/proton-c/include/proton/event.h
+++ b/proton-c/include/proton/event.h
@@ -260,7 +260,7 @@ typedef enum {
   PN_TRANSPORT,
 
   /**
-   * The transport has authenticated, if this is received by a server
+   * The transport has authenticated. If this is received by a server
    * the associated transport has authenticated an incoming connection
    * and pn_transport_get_user() can be used to obtain the authenticated
    * user.
@@ -322,7 +322,7 @@ typedef enum {
   PN_LISTENER_CLOSE,
 
   /**
-   * Indicates pn_proactor_interrupt() was called to interrupt a proactor thread
+   * Indicates pn_proactor_interrupt() was called to interrupt a proactor thread.
    * Events of this type point to the @ref pn_proactor_t.
    */
   PN_PROACTOR_INTERRUPT,
@@ -334,7 +334,7 @@ typedef enum {
   PN_PROACTOR_TIMEOUT,
 
   /**
-   * The proactor becaome inactive: all listeners and connections are closed and
+   * The proactor is inactive. All listeners and connections are closed and
    * their events processed, the timeout is expired.
    *
    * Events of this type point to the @ref pn_proactor_t.
@@ -342,7 +342,7 @@ typedef enum {
   PN_PROACTOR_INACTIVE,
 
   /**
-   * Indicates the listener is listeneing.
+   * The listener is listeneing.
    * Events of this type point to the @ref pn_listener_t.
    */
   PN_LISTENER_OPEN

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7dfd0533/proton-c/include/proton/listener.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/listener.h b/proton-c/include/proton/listener.h
index bf52150..d06adb9 100644
--- a/proton-c/include/proton/listener.h
+++ b/proton-c/include/proton/listener.h
@@ -30,35 +30,40 @@ extern "C" {
 
 /**
  * @file
+ * @copydoc listener
  *
- * **Experimental** - A listener for incoming connections for the @ref
- * proactor.
+ * @defgroup listener Listener
+ * @ingroup proactor
+ *
+ * **Experimental** - A listener for incoming connections for the @ref proactor.
+ *
+ * @note Thread safety: Listener has the same thread-safety rules as a @ref core
+ * object; calls to a single listener must be serialized with the exception of
+ * pn_listener_close()
  *
- * @addtogroup proactor
  * @{
  */
 
 /**
  * Create a listener to pass to pn_proactor_listen()
  *
- * Must be freed with pn_listener_free()
- *
- * You can use pn_listener_set_context() or pn_listener_attachments() to set
- * application data that can be accessed when accepting connections.
+ * You can use pn_listener_attachments() to set application data that can be
+ * accessed when accepting connections.
  */
 PNP_EXTERN pn_listener_t *pn_listener(void);
 
 /**
- * Free a listener. Must not be in use, see pn_proactor_listen()
+ * Free a listener. You don't need to call this unless you create a listener
+ * with pn_listen() but never pass it to pn_proactor_listen()
  */
 PNP_EXTERN void pn_listener_free(pn_listener_t *l);
 
 /**
- * Asynchronously accept a connection using the listener.
+ * Bind @p connection to a new transport accepted from @p listener.
+ * Errors are returned as @ref PN_TRANSPORT_CLOSED events by pn_proactor_wait().
  *
- * @param[in] connection the listener takes ownership, do not free.
  */
-PNP_EXTERN int pn_listener_accept(pn_listener_t*, pn_connection_t *connection);
+PNP_EXTERN void pn_listener_accept(pn_listener_t*, pn_connection_t *);
 
 /**
  * Get the error condition for a listener.
@@ -93,10 +98,11 @@ PNP_EXTERN void pn_listener_set_context(pn_listener_t *listener, void *context);
 PNP_EXTERN pn_record_t *pn_listener_attachments(pn_listener_t *listener);
 
 /**
- * Close the listener (thread safe).
- *
+ * Close the listener.
  * The PN_LISTENER_CLOSE event is generated when the listener has stopped listening.
  *
+ * @note Thread safe. Must not be called after the PN_LISTENER_CLOSE event has
+ * been handled as the listener may be freed .
  */
 PNP_EXTERN void pn_listener_close(pn_listener_t *l);
 
@@ -106,7 +112,9 @@ PNP_EXTERN void pn_listener_close(pn_listener_t *l);
 PNP_EXTERN pn_proactor_t *pn_listener_proactor(pn_listener_t *c);
 
 /**
- * Return the listener associated with an event or NULL.
+ * Return the listener associated with an event.
+ *
+ * @return NULL if the event is not associated with a listener.
  */
 PNP_EXTERN pn_listener_t *pn_event_listener(pn_event_t *event);
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7dfd0533/proton-c/include/proton/proactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
index d03589c..8eba94f 100644
--- a/proton-c/include/proton/proactor.h
+++ b/proton-c/include/proton/proactor.h
@@ -30,230 +30,356 @@ extern "C" {
 
 /**
  * @file
+ * @copydoc proactor
  *
- * **Experimental** - Multithreaded IO
+ * @addtogroup proactor Proactor
  *
- * The proactor associates a @ref connection with a @ref transport,
- * either by making an outgoing connection or accepting an incoming
- * one.  It delivers @ref event "events" to application threads for
- * handling.
+ * The proactor associates an abstract AMQP protocol @ref connection with a
+ * concrete IO @ref transport implementation for outgoing and incoming
+ * connections. pn_proactor_wait() returns @ref proactor_events to application
+ * threads for handling.
  *
- * ## Multi-threading
+ * The @ref pn_proactor* functions are thread-safe, but to handle @ref proactor_events you
+ * must also use the @ref core APIs which are not. @ref core objects associated
+ * with different connections can be used concurrently, but objects associated
+ * with a single connection cannot.
  *
- * The @ref proactor is thread-safe, but the protocol engine is not.
- * The proactor ensures that each @ref connection and its associated
- * values (@ref session, @ref link etc.) is handle sequentially, even
- * if there are multiple application threads. See pn_proactor_wait().
+ * The proactor *serializes* @ref proactor_events for each connection - it never returns
+ * @ref proactor_events for the same connection concurrently in different
+ * threads. Event-handling code can safely use any @ref core object obtained
+ * from the current event. You can attach application data to @ref core objects
+ * (for example with pn_connection_attachments()).
+ *
+ * pn_connection_wake() allows any thread to "wake up" a connection. It causes
+ * pn_proactor_wait() to return a @ref PN_CONNECTION_WAKE event that is
+ * serialized with the connection's other @ref proactor_events. You can use this to implement
+ * communication between different connections, or from non-proactor threads.
+ *
+ * Serialization and pn_connection_wake() simplify building applications with a
+ * shared thread pool, which serialize work per connection. Many other
+ * variations are possible, but you are responsible for any additional
+ * synchronization needed.
  *
  * @addtogroup proactor
  * @{
  */
 
 /**
- * Stores a network address in native format.
+ * Create a proactor. Must be freed with pn_proactor_free()
  */
-typedef struct pn_proactor_addr_t pn_proactor_addr_t;
+PNP_EXTERN pn_proactor_t *pn_proactor(void);
 
 /**
- * Create a proactor. Must be freed with pn_proactor_free()
+ * Free the proactor. Abort open connections/listeners, clean up all resources.
  */
-PNP_EXTERN pn_proactor_t *pn_proactor(void);
+PNP_EXTERN void pn_proactor_free(pn_proactor_t *proactor);
 
 /**
- * Disconnect all connections and listeners currently active in the proactor.
+ * Bind @p connection to a new @ref transport connected to @p addr.
+ * Errors are returned as  @ref PN_TRANSPORT_CLOSED events by pn_proactor_wait().
  *
- * PN_LISTENER_CLOSE, PN_TRANSPORT_CLOSED and other events are generated as usual.
- * If no new listeners or connections are created, then a PN_PROACTOR_INACTIVE event
- * will be generated when all connections and listeners are disconnected.
+ * @note Thread safe.
  *
- * Note the proactor remains active, connections and listeners created after a call to
- * pn_proactor_disconnect() are not affected by it.
+ * @param connection @p proactor *takes ownership* of @p connection and will
+ * automatically call pn_connection_free() after the final @ref
+ * PN_TRANSPORT_CLOSED event is handled, or when pn_proactor_free() is
+ * called. You can prevent the automatic free with
+ * pn_proactor_release_connection()
  *
- * @param condition if not NULL the condition data is copied to the transports and listeners.
- */
-PNP_EXTERN void pn_proactor_disconnect(pn_proactor_t *proactor, pn_condition_t *condition);
-
-/**
- * Free the proactor. Abort any open network connections and clean up all
- * associated resources.
+ * @param[in] addr the network address in the form "host:port" or as a URL
+ * For a URL *only* the host and port fields are used, the rest is ignored.
+ *
+ * Three special cases are allowed:
+ *
+ * - "host": Connect to "host" on the standard AMQP port (5672).
+ * - ":port": Connect to the local host on "port" using the default protocol.
+ * - "": Connect to the local host on the AMQP port using the default protocol.
+ *
+ * @note The network address @p addr and AMQP address are different things. The
+ * network address enables connection to a remote host, the AMQP address
+ * identifies an AMQP node (such as a queue or topic) *after* you have
+ * established the connection.
+ * The special case ":port" connects to the local host via the default protocol.
+ * The special case "" connects to the local host on the AMQP standard port.
+ *
+ * It is common to combine the two into a URL like this:
+ *
+ *     amqp[s]://user:pass@host:port/amqp_address
+ *
+ * The proactor will extract the host and port only. If you want to use other
+ * fields (e.g. to set up security) you must call the relevant functions on @p
+ * connection before pn_proactor_connect() and handle @ref PN_CONNECTION_BOUND
+ * to set up the @ref transport.
+ *
+ * @note Thread safe.
+ *
+ * @param[in] proactor the proactor object
+ *
+ * @param[in] connection @ref connection to be connected to @p addr.
  */
-PNP_EXTERN void pn_proactor_free(pn_proactor_t *proactor);
+PNP_EXTERN void pn_proactor_connect(pn_proactor_t *proactor, pn_connection_t *connection, const char *addr);
 
 /**
- * Connect connection to host/port. Connection and transport events will be
- * returned by pn_proactor_wait()
+ * Start listening for incoming connections.
+ * Errors are returned as @ref proactor_events by pn_proactor_wait().
+ *
+ * @note Thread safe.
  *
  * @param[in] proactor the proactor object
- * @param[in] connection must not be freed until after the final PN_TRANSPORT_CLOSED event or pn_proactor_free()
- * @param[in] addr the network address (not AMQP address) to connect to. May
- * be in the form "host:port" or an "amqp://" or "amqps://" URL. The `/path` part of
- * the URL is ignored.
- * @param[in] port port to connect to
- *
- * @return error on immediate error, e.g. an allocation failure.
- * Other errors are indicated by connection or transport events via
-PNP_EXTERN  * pn_proactor_wait()
+ *
+ * @param[in] listener @p proactor *takes ownership* of @p listener, and will
+ * automatically call pn_listener_free() after the final PN_LISTENER_CLOSE event
+ * is handled, or when pn_proactor_free() is called.
+ *
+ * @param[in] addr the network address in the form "host:port" or as a URL
+ * For a URL *only* the host and port fields are used, the rest is ignored.
+ *
+ * Three special cases are allowed:
+ *
+ * - "host": Listen on the standard AMQP port (5672) on the interface and protocol identified by "host"
+ * - ":port": Listen on "port", on all local interfaces, for all protocols.
+ * - "": Listen on the standard AMQP port, on all local interfaces, for all protocols.
+
+ * @param[in] backlog of un-handled connection requests to allow before refusing
+ * connections. If @p addr resolves to multiple interface/protocol combinations,
+ * the backlog applies to each separately.
  */
-PNP_EXTERN int pn_proactor_connect(
-  pn_proactor_t *proactor, pn_connection_t *connection, const char *addr);
+PNP_EXTERN void pn_proactor_listen(pn_proactor_t *proactor, pn_listener_t *listener, const char *addr, int backlog);
 
 /**
- * Start listening with listener.
- *
- * pn_proactor_wait() will return a PN_LISTENER_ACCEPT event when a connection can be
- * accepted.
+ * Disconnect all connections and listeners belonging to the proactor.
  *
+ * @ref PN_LISTENER_CLOSE, @ref PN_TRANSPORT_CLOSED and other @ref proactor_events are
+ * generated as usual.  If no new listeners or connections are created, then a
+ * @ref PN_PROACTOR_INACTIVE event will be generated when all connections and
+ * listeners are disconnected.
  *
- * @param[in] proactor the proactor object
- * @param[in] listener must not be freed until after the final PN_LISTENER_CLOSE event or pn_proactor_free()
- * @param[in] addr the network address (not AMQP address) to connect to in "host:port"
+ * Note the proactor remains active, connections and listeners created after a call to
+ * pn_proactor_disconnect() are not affected by it.
  *
- * The host can be a host name, IPV4 or IPV6 literal, or the empty string. The empty
- * string listens on all local addresses. A host name listens on all addresses associated
- * with the name. An IPV6 literal address (or wildcard '[::]') listens only for IPV6. An
- * IPV4 literal address (or wildcard '0.0.0.0') listens only for IPV4."
+ * @note Thread safe.
  *
- * @param[in] backlog number of connection requests to queue. If the host resolves
- * to multiple addresses, this backlog applies to each address.
+ * @param proactor the proactor
  *
- * @return error on immediate error, e.g. an allocation failure.
- * Other errors are indicated by pn_listener_condition() on the
- * PN_LISTENER_CLOSE event.
+ * @param condition if not NULL the condition data is copied to each
+ * disconnected transports and listener and is available in the close event.
  */
-PNP_EXTERN int pn_proactor_listen(
-  pn_proactor_t *proactor, pn_listener_t *listener, const char *addr, int backlog);
+PNP_EXTERN void pn_proactor_disconnect(pn_proactor_t *proactor, pn_condition_t *condition);
 
 /**
- * Wait until there is at least one event to handle.
- * Always returns a non-empty batch of events.
+ * Wait until there are @ref proactor_events to handle.
  *
  * You must call pn_proactor_done() when you are finished with the batch, you
  * must not use the batch pointer after calling pn_proactor_done().
  *
- * Normally it is most efficient to handle the entire batch in one thread, but
- * you can call pn_proactor_done() on an unfinished the batch. The remaining
- * events will be returned by another call to pn_proactor_done(), possibly in a
- * different thread.
+ * Normally it is most efficient to handle the entire batch in the calling
+ * thread and then call pn_proactor_done(), but see pn_proactor_done() for more options.
  *
- * @note You can generate events to force threads to wake up from
- * pn_proactor_wait() using pn_proactor_interrupt(), pn_proactor_set_timeout()
- * and pn_connection_wake()
+ * pn_proactor_get() is a non-blocking version of this call.
+ *
+ * @note Thread Safe.
+ *
+ * @return a non-empty batch of events that must be processed in sequence.
  *
- * @note Thread-safe: can be called concurrently. Events in a single
- * batch must be handled in sequence, but batches returned by separate
- * calls can be handled concurrently.
  */
 PNP_EXTERN pn_event_batch_t *pn_proactor_wait(pn_proactor_t *proactor);
 
 /**
- * Return a batch of events if one is available immediately, otherwise return NULL.  If it
- * does return an event batch, the rules are the same as for pn_proactor_wait()
+ * Return @ref proactor_events if any are available immediately.  If not, return NULL.
+ * If the return value is not NULL, the behavior is the same as pn_proactor_wait()
+ *
+ * @note Thread Safe.
  */
 PNP_EXTERN pn_event_batch_t *pn_proactor_get(pn_proactor_t *proactor);
 
 /**
- * Call when done handling a batch of events.
+ * Call when finished handling a batch of events.
  *
- * Must be called exactly once to match each call to
- * pn_proactor_wait().
+ * Must be called exactly once to match each call to pn_proactor_wait().
  *
- * @note Thread-safe: may be called from any thread provided the
- * exactly once rule is respected.
+ * @note Thread-safe: May be called from any thread provided the exactly once
+ * rule is respected.
  */
 PNP_EXTERN void pn_proactor_done(pn_proactor_t *proactor, pn_event_batch_t *events);
 
 /**
- * Cause PN_PROACTOR_INTERRUPT to be returned to exactly one call of
- * pn_proactor_wait().
+ * Return a @ref PN_PROACTOR_INTERRUPT event as soon as possible.
  *
- * If threads are blocked in pn_proactor_wait(), one of them will be
- * interrupted, otherwise the interrupt will be returned by a future
- * call to pn_proactor_wait(). Calling pn_proactor_interrupt() N times
- * will return PN_PROACTOR_INTERRUPT to N current or future calls of
- * pn_proactor_wait()
+ * Exactly one @ref PN_PROACTOR_INTERRUPT event is generated for each call to
+ * pn_proactor_interrupt().  If threads are blocked in pn_proactor_wait(), one
+ * of them will be interrupted, otherwise the interrupt will be returned by a
+ * future call to pn_proactor_wait(). Calling pn_proactor_interrupt().
  *
- * @note Thread-safe.
+ * @note Thread safe
  */
 PNP_EXTERN void pn_proactor_interrupt(pn_proactor_t *proactor);
 
 /**
- * Cause PN_PROACTOR_TIMEOUT to be returned to a thread calling wait()
- * after timeout milliseconds. Thread-safe.
+ * Return a @ref PN_PROACTOR_TIMEOUT after @p timeout milliseconds elapse. If no
+ * threads are blocked in pn_proactor_wait() when the timeout elapses, the event
+ * will be delivered to the next available thread.
  *
- * Note: calling pn_proactor_set_timeout() again before the
- * PN_PROACTOR_TIMEOUT is delivered will cancel the previous timeout
- * and deliver an event only after the new
- * timeout.
+ * Calling pn_proactor_set_timeout() again before the PN_PROACTOR_TIMEOUT
+ * is delivered will cancel the previous timeout and deliver an event only after
+ * the new timeout.
  *
- * Note: PN_PROACTOR_TIMEOUT events will be delivered in series, never
- * concurrently.
+ * @note Thread safe
  */
 PNP_EXTERN void pn_proactor_set_timeout(pn_proactor_t *proactor, pn_millis_t timeout);
 
 /**
- * Cancel the pending timeout set by pn_proactor_set_timeout() if there is one.
+ * Cancel the pending timeout set by pn_proactor_set_timeout(). Does nothing
+ * if no timeout is set.
+ *
+ * @note Thread safe
  */
 PNP_EXTERN void pn_proactor_cancel_timeout(pn_proactor_t *proactor);
 
 /**
- * Cause a PN_CONNECTION_WAKE event to be returned by the proactor, even if
- * there are no IO events pending for the connection.
+ * Release ownership of @p connection, disassociate it from its proactor.
  *
- * @note Thread-safe: this is the only pn_connection_ function that
- * can be called concurrently.
+ * The connection and related objects (@ref session "sessions", @ref link "links"
+ * and so on) remain intact, but the transport is closed and unbound. The
+ * proactor will not return any more events for this connection. The caller must
+ * call pn_connection_free(), either directly or indirectly by re-using @p
+ * connection in another call to pn_proactor_connect() or pn_proactor_listen().
  *
- * Wakes can be "coalesced" - if several pn_connection_wake() calls happen
- * concurrently, there may be only one PN_CONNECTION_WAKE event.
+ * @note If @p connection does not belong to a proactor, this call does nothing.
+ *
+ * @note Thread safe.
+ */
+PNP_EXTERN void pn_proactor_release_connection(pn_connection_t *connection);
+
+/**
+ * Return a @ref PN_CONNECTION_WAKE event for @p connection as soon as possible.
+ *
+ * At least one wake event will be returned, serialized with other @ref proactor_events
+ * for the same connection.  Wakes can be "coalesced" - if several
+ * pn_connection_wake() calls happen close together, there may be only one
+ * PN_CONNECTION_WAKE event that occurs after all of them.
+ *
+ * @note If @p connection does not belong to a proactor, this call does nothing.
+ *
+ * @note Thread safe
  */
 PNP_EXTERN void pn_connection_wake(pn_connection_t *connection);
 
 /**
- * Return the proactor associated with a connection or NULL.
+ * Return the proactor associated with a connection.
+ *
+ * @note Not Thread safe.
+ *
+ * @return the proactor or NULL if the connection does not belong to a proactor.
  */
 PNP_EXTERN pn_proactor_t *pn_connection_proactor(pn_connection_t *connection);
 
 /**
- * Return the proactor associated with an event or NULL.
+ * Return the proactor associated with an event.
+ *
+ * @note Not Thread safe.
+ *
+ * @return the proactor or NULL if the connection does not belong to a proactor.
  */
 PNP_EXTERN pn_proactor_t *pn_event_proactor(pn_event_t *event);
 
 /**
- * Format a network address as a human-readable string in buf, with trailing NUL.
- *
- * @return the length of the full address string, the string is copied to buf.
+ * Stores a network address in native format.
+ */
+typedef struct pn_proactor_addr_t pn_proactor_addr_t;
+
+/**
+ * Format a network address as a human-readable string in buf.
  *
- * If the address string is longer than len it is truncated to len-1 bytes, but the
- * full length is returned.
+ * @return the length of the full address string (including trailing NUL). The
+ * string is copied to buf. If the address string is longer than len it is
+ * truncated to len-1 bytes, but the full length is returned.
  *
  * If len == 0 the length of the address string is returned, buf is ignored.
  *
- * If addr is NULL or points to an invalid address, 0 is returned and buf (if not
- * NULL) is set to the empty string.
+ * If @p addr is not a pointer to a valid address, buf is set to "" and 0 is returned.
+ *
+ * @note Thread safe.
  */
-PNP_EXTERN size_t pn_proactor_addr_str(char *buf, size_t len, pn_proactor_addr_t* addr);
+PNP_EXTERN size_t pn_proactor_addr_str(const pn_proactor_addr_t* addr, char *buf, size_t len);
 
 /**
  * Get the local address of a transport.
  *
- * @return NULL if the transport is not connected or the address is not available. 
+ * @return NULL if the address is not available. Address is immutable, returned
+ * pointer is valid until @p transport is closed.
+ *
+ * @note Thread safe.
  */
-PNP_EXTERN pn_proactor_addr_t *pn_proactor_addr_local(pn_transport_t* c);
+PNP_EXTERN const pn_proactor_addr_t *pn_proactor_addr_local(pn_transport_t* c);
 
 /**
  * Get the remote address of a transport.
  *
- * @return NULL if the transport is not connected or the address is not available. 
+ * @return NULL if the address is not available. Address is immutable, returned
+ * pointer is valid until @p transport is closed.
+ *
+ * @note Thread safe.
  */
-PNP_EXTERN pn_proactor_addr_t *pn_proactor_addr_remote(pn_transport_t* c);
+PNP_EXTERN const pn_proactor_addr_t *pn_proactor_addr_remote(pn_transport_t* c);
 
 /**
- * If the underlying implementation uses `struct sockaddr` (for example POSIX or Windows
- * sockets) return a pointer to a `struct sockaddr_storage` containing the address info,
- * otherwise return NULL.
+ * If the proactor implementation uses `struct sockaddr` (for example on POSIX
+ * or Windows sockets) return a pointer to a `struct sockaddr_storage`
+ * containing the address info, otherwise return NULL.
+ *
+ * @note Thread safe.
  */
-PNP_EXTERN struct sockaddr_storage *pn_proactor_addr_sockaddr(pn_proactor_addr_t *addr);
+PNP_EXTERN const struct sockaddr_storage *pn_proactor_addr_sockaddr(const pn_proactor_addr_t *addr);
+
 
 /**
+ * @defgroup proactor_events Events
+ *
+ * **Experimental** - Events returned by pn_proactor_wait().
+ * pn_proactor_wait() returns a subset of the event types defined by @ref pn_event_type_t.
+ * The PN_REACTOR_..., PN_SELECTABLE_... and PN_..._FINAL events are not returned.
+ *
+ * Enumeration | Brief description, see @ref pn_event_type_t for more
+ * :-- | :--
+ * @ref PN_CONNECTION_INIT | @copybrief PN_CONNECTION_INIT
+ * @ref PN_CONNECTION_BOUND |  @copybrief PN_CONNECTION_BOUND
+ * @ref PN_TIMER_TASK | @copybrief PN_TIMER_TASK
+ * @ref PN_CONNECTION_INIT | @copybrief PN_CONNECTION_INIT
+ * @ref PN_CONNECTION_BOUND | @copybrief PN_CONNECTION_BOUND
+ * @ref PN_CONNECTION_UNBOUND | @copybrief PN_CONNECTION_UNBOUND
+ * @ref PN_CONNECTION_LOCAL_OPEN | @copybrief PN_CONNECTION_LOCAL_OPEN
+ * @ref PN_CONNECTION_REMOTE_OPEN | @copybrief PN_CONNECTION_REMOTE_OPEN
+ * @ref PN_CONNECTION_LOCAL_CLOSE | @copybrief PN_CONNECTION_LOCAL_CLOSE
+ * @ref PN_CONNECTION_REMOTE_CLOSE | @copybrief PN_CONNECTION_REMOTE_CLOSE
+ * @ref PN_SESSION_INIT | @copybrief PN_SESSION_INIT
+ * @ref PN_SESSION_LOCAL_OPEN | @copybrief PN_SESSION_LOCAL_OPEN
+ * @ref PN_SESSION_REMOTE_OPEN | @copybrief PN_SESSION_REMOTE_OPEN
+ * @ref PN_SESSION_LOCAL_CLOSE | @copybrief PN_SESSION_LOCAL_CLOSE
+ * @ref PN_SESSION_REMOTE_CLOSE | @copybrief PN_SESSION_REMOTE_CLOSE
+ * @ref PN_LINK_INIT | @copybrief PN_LINK_INIT
+ * @ref PN_LINK_LOCAL_OPEN | @copybrief PN_LINK_LOCAL_OPEN
+ * @ref PN_LINK_REMOTE_OPEN | @copybrief PN_LINK_REMOTE_OPEN
+ * @ref PN_LINK_LOCAL_CLOSE | @copybrief PN_LINK_LOCAL_CLOSE
+ * @ref PN_LINK_REMOTE_CLOSE | @copybrief PN_LINK_REMOTE_CLOSE
+ * @ref PN_LINK_LOCAL_DETACH | @copybrief PN_LINK_LOCAL_DETACH
+ * @ref PN_LINK_REMOTE_DETACH | @copybrief PN_LINK_REMOTE_DETACH
+ * @ref PN_LINK_FLOW | @copybrief PN_LINK_FLOW
+ * @ref PN_DELIVERY | @copybrief PN_DELIVERY
+ * @ref PN_TRANSPORT | @copybrief PN_TRANSPORT
+ * @ref PN_TRANSPORT_AUTHENTICATED | @copybrief PN_TRANSPORT_AUTHENTICATED
+ * @ref PN_TRANSPORT_ERROR | @copybrief PN_TRANSPORT_ERROR
+ * @ref PN_TRANSPORT_HEAD_CLOSED | @copybrief PN_TRANSPORT_HEAD_CLOSED
+ * @ref PN_TRANSPORT_TAIL_CLOSED | @copybrief PN_TRANSPORT_TAIL_CLOSED
+ * @ref PN_TRANSPORT_CLOSED | The final event for a proactor connection, the transport is closed.
+ * @ref PN_LISTENER_OPEN | @copybrief PN_LISTENER_OPEN
+ * @ref PN_LISTENER_ACCEPT | @copybrief PN_LISTENER_ACCEPT
+ * @ref PN_LISTENER_CLOSE | @copybrief PN_LISTENER_CLOSE
+ * @ref PN_PROACTOR_INTERRUPT | @copybrief PN_PROACTOR_INTERRUPT
+ * @ref PN_PROACTOR_TIMEOUT | @copybrief PN_PROACTOR_TIMEOUT
+ * @ref PN_PROACTOR_INACTIVE | @copybrief PN_PROACTOR_INACTIVE
+ * @ref PN_CONNECTION_WAKE | @copybrief PN_CONNECTION_WAKE
+ *
+ * @}
  * @}
  */
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7dfd0533/proton-c/include/proton/types.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/types.h b/proton-c/include/proton/types.h
index 1abe9e6..5504e8d 100644
--- a/proton-c/include/proton/types.h
+++ b/proton-c/include/proton/types.h
@@ -100,16 +100,12 @@
  * @brief A data structure for AMQP data
  * @ingroup codec
  *
- * @defgroup io IO
- * @brief IO integration interfaces
- *
  * @defgroup proactor Proactor
  * @brief **Experimental** - Multithreaded IO
  * @ingroup io
  *
  * @defgroup connection_driver Connection driver
  * @brief **Experimental** - Low-level IO integration
- * @ingroup io
  *
  * @defgroup messenger Messenger
  * @deprecated

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7dfd0533/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 45863c3..3db31e9 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -216,7 +216,6 @@ struct pn_listener_t {
   pn_collector_t *collector;
   pconnection_queue_t accept;   /* pconnection_t list for accepting */
   listener_state state;
-  int refcount;                 /* Free when proactor and user are done */
 };
 
 typedef enum { TM_NONE, TM_REQUEST, TM_PENDING, TM_FIRED } timeout_state_t;
@@ -308,7 +307,6 @@ static pconnection_t *pconnection(pn_proactor_t *p, pn_connection_t *c, bool ser
   if (!pc || pn_connection_driver_init(&pc->driver, c, NULL) != 0) {
     return NULL;
   }
-  pn_incref(c);                 /* User owns original ref, make one for the proactor */
   work_init(&pc->work, p,  T_CONNECTION);
   pc->next = pconnection_unqueued;
   pc->write.data = &pc->work;
@@ -371,16 +369,10 @@ static void pconnection_free(pconnection_t *pc) {
   if (pc->addr.getaddrinfo.addrinfo) {
     uv_freeaddrinfo(pc->addr.getaddrinfo.addrinfo); /* Interrupted after resolve */
   }
-  /* Don't let driver_destroy call pn_connection_free(), the user does that */
-  pn_connection_t *c = pc->driver.connection;
-  pc->driver.connection = NULL;
-  pn_collector_t *collector = pn_connection_collector(c);
-  if (collector) {
-    pn_collector_release(collector); /* Break circular refs */
-  }
+  pn_incref(pc);                /* Make sure we don't do a circular free */
   pn_connection_driver_destroy(&pc->driver);
-  /* The user either has or will call pn_connection_free(), drop our ref. */
-  pn_decref(c);
+  pn_decref(pc);
+  /* Now pc is freed iff the connection is, otherwise remains till the pn_connection_t is freed. */
 }
 
 /* Final close event for for a pconnection_t, disconnects from proactor */
@@ -648,7 +640,7 @@ static void leader_listen_lh(pn_listener_t *l) {
 }
 
 void pn_listener_free(pn_listener_t *l) {
-  if (l && --l->refcount == 0) {
+  if (l) {
     if (l->addr.getaddrinfo.addrinfo) { /* Interrupted after resolve */
       uv_freeaddrinfo(l->addr.getaddrinfo.addrinfo);
     }
@@ -1103,25 +1095,19 @@ void pn_proactor_cancel_timeout(pn_proactor_t *p) {
   uv_mutex_unlock(&p->lock);
 }
 
-int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr) {
+void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr) {
   pconnection_t *pc = pconnection(p, c, false);
-  if (pc) {
-    pn_connection_open(pc->driver.connection); /* Auto-open */
-    parse_addr(&pc->addr, addr);
-    work_start(&pc->work);
-  } else {
-    return PN_OUT_OF_MEMORY;
-  }
-  return 0;
+  assert(pc);                                  /* FIXME aconway 2017-03-31: memory safety */
+  pn_connection_open(pc->driver.connection); /* Auto-open */
+  parse_addr(&pc->addr, addr);
+  work_start(&pc->work);
 }
 
-int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, int backlog) {
+void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, int backlog) {
   work_init(&l->work, p, T_LISTENER);
   parse_addr(&l->addr, addr);
   l->backlog = backlog;
-  ++l->refcount;
   work_start(&l->work);
-  return 0;
 }
 
 static void on_proactor_free(uv_handle_t* h, void* v) {
@@ -1208,7 +1194,6 @@ void pn_connection_wake(pn_connection_t* c) {
 pn_listener_t *pn_listener(void) {
   pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t));
   if (l) {
-    l->refcount = 1;
     l->batch.next_event = listener_batch_next;
     l->collector = pn_collector();
     l->condition = pn_condition();
@@ -1248,12 +1233,10 @@ pn_record_t *pn_listener_attachments(pn_listener_t *l) {
   return l->attachments;
 }
 
-int pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
+void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
   uv_mutex_lock(&l->lock);
   pconnection_t *pc = pconnection(l->work.proactor, c, true);
-  if (!pc) {
-    return PN_OUT_OF_MEMORY;
-  }
+  assert(pc);
   /* Get the socket from the accept event that we are processing */
   pn_event_t *e = pn_collector_prev(l->collector);
   assert(pn_event_type(e) == PN_LISTENER_ACCEPT);
@@ -1263,24 +1246,23 @@ int pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
   pconnection_push(&l->accept, pc);
   uv_mutex_unlock(&l->lock);
   work_notify(&l->work);
-  return 0;
 }
 
-struct sockaddr_storage *pn_proactor_addr_sockaddr(pn_proactor_addr_t *addr) {
-  return (struct sockaddr_storage*)addr;
+const struct sockaddr_storage *pn_proactor_addr_sockaddr(const pn_proactor_addr_t *addr) {
+  return (const struct sockaddr_storage*)addr;
 }
 
-struct pn_proactor_addr_t *pn_proactor_addr_local(pn_transport_t *t) {
+const struct pn_proactor_addr_t *pn_proactor_addr_local(pn_transport_t *t) {
   pconnection_t *pc = get_pconnection(pn_transport_connection(t));
   return pc ? (pn_proactor_addr_t*)&pc->local : NULL;
 }
 
-struct pn_proactor_addr_t *pn_proactor_addr_remote(pn_transport_t *t) {
+const struct pn_proactor_addr_t *pn_proactor_addr_remote(pn_transport_t *t) {
   pconnection_t *pc = get_pconnection(pn_transport_connection(t));
   return pc ? (pn_proactor_addr_t*)&pc->remote : NULL;
 }
 
-size_t pn_proactor_addr_str(char *buf, size_t len, struct pn_proactor_addr_t* addr) {
+size_t pn_proactor_addr_str(const struct pn_proactor_addr_t* addr, char *buf, size_t len) {
   struct sockaddr_storage *sa = (struct sockaddr_storage*)addr;
   char host[NI_MAXHOST];
   char port[NI_MAXSERV];

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7dfd0533/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index ef12a81..6d81ecb 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -68,15 +68,6 @@ static void proactor_test_free(proactor_test_t *pts, size_t n) {
 
 #define PROACTOR_TEST_FREE(A) proactor_test_free(A, sizeof(A)/sizeof(*A))
 
-/* Clear the event logs in an array of proactors */
-static void proactor_test_clear_logs(proactor_test_t *pts, size_t n) {
-  for (proactor_test_t *pt = pts; pt < pts + n; ++pt) {
-    pt->log_len = 0;
-  }
-}
-
-#define PROACTOR_TEST_CLEAR_LOGS(A) proactor_test_clear_logs(A, sizeof(A)/sizeof(*A))
-
 #define TEST_LOG_EQUAL(T, A, PT) \
   TEST_ETYPES_EQUAL((T), (A), sizeof(A)/sizeof(*A), (PT).log, (PT).log_len)
 
@@ -155,6 +146,21 @@ static void proactor_test_drain(proactor_test_t *pts, size_t n) {
 #define PROACTOR_TEST_RUN(A) proactor_test_run((A), sizeof(A)/sizeof(*A))
 #define PROACTOR_TEST_DRAIN(A) proactor_test_drain((A), sizeof(A)/sizeof(*A))
 
+/* Combine a test_port with a pn_listener */
+typedef struct proactor_test_listener_t {
+  test_port_t port;
+  pn_listener_t *listener;
+} proactor_test_listener_t;
+
+proactor_test_listener_t proactor_test_listen(proactor_test_t *pt, const char *host) {
+  proactor_test_listener_t l = { test_port(host), pn_listener() };
+  pn_proactor_listen(pt->proactor, l.listener, l.port.host_port, 4);
+  TEST_ETYPE_EQUAL(pt->t, PN_LISTENER_OPEN, proactor_test_run(pt, 1));
+  sock_close(l.port.sock);
+  return l;
+}
+
+
 /* Wait for the next single event, return its type */
 static pn_event_type_t wait_next(pn_proactor_t *proactor) {
   pn_event_batch_t *events = pn_proactor_wait(proactor);
@@ -197,22 +203,11 @@ static pn_event_type_t common_handler(test_t *t, pn_event_t *e) {
 
   switch (pn_event_type(e)) {
 
-    /* Cleanup events */
-   case PN_LISTENER_CLOSE:
-    pn_listener_free(pn_event_listener(e));
-    return PN_EVENT_NONE;
-
-   case PN_TRANSPORT_CLOSED:
-    pn_connection_free(pn_event_connection(e));
-    return PN_TRANSPORT_CLOSED;
-
     /* Stop on these events */
+   case PN_TRANSPORT_CLOSED:
    case PN_PROACTOR_INACTIVE:
    case PN_PROACTOR_TIMEOUT:
-    return pn_event_type(e);
-
    case PN_LISTENER_OPEN:
-    last_accepted = NULL;
     return pn_event_type(e);
 
    case PN_LISTENER_ACCEPT:
@@ -229,11 +224,12 @@ static pn_event_type_t common_handler(test_t *t, pn_event_t *e) {
     pn_connection_close(c);     /* Return the close */
     return PN_EVENT_NONE;
 
-    /* Ignored these events */
+    /* Ignore these events */
    case PN_CONNECTION_INIT:
    case PN_CONNECTION_BOUND:
    case PN_CONNECTION_LOCAL_OPEN:
    case PN_CONNECTION_LOCAL_CLOSE:
+   case PN_LISTENER_CLOSE:
    case PN_TRANSPORT:
    case PN_TRANSPORT_ERROR:
    case PN_TRANSPORT_HEAD_CLOSED:
@@ -279,13 +275,9 @@ static pn_event_type_t open_close_handler(test_t *t, pn_event_t *e) {
 static void test_client_server(test_t *t) {
   proactor_test_t pts[] ={ { open_close_handler }, { common_handler } };
   PROACTOR_TEST_INIT(pts, t);
-  pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
-  test_port_t port = test_port(localhost);
-  pn_proactor_listen(server, pn_listener(), port.host_port, 4);
-  TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
-  sock_close(port.sock);
+  proactor_test_listener_t l = proactor_test_listen(&pts[1], localhost);
   /* Connect and wait for close at both ends */
-  pn_proactor_connect(client, pn_connection(), port.host_port);
+  pn_proactor_connect(pts[0].proactor, pn_connection(), l.port.host_port);
   TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
   TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
   PROACTOR_TEST_FREE(pts);
@@ -315,6 +307,7 @@ static void test_connection_wake(test_t *t) {
   sock_close(port.sock);
 
   pn_connection_t *c = pn_connection();
+  pn_incref(c);                 /* Keep a reference for wake() after free */
   pn_proactor_connect(client, c, port.host_port);
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
   TEST_CHECK(t, pn_proactor_get(client) == NULL); /* Should be idle */
@@ -327,15 +320,16 @@ static void test_connection_wake(test_t *t) {
   PROACTOR_TEST_FREE(pts);
   /* The pn_connection_t is still valid so wake is legal but a no-op */
   pn_connection_wake(c);
-  pn_connection_free(c);
+  pn_decref(c);
 }
 
-/* Close the transport to abort a connection, i.e. close the socket abruptly */
-static pn_event_type_t open_abort_handler(test_t *t, pn_event_t *e) {
+/* Close the transport to abort a connection, i.e. close the socket without an AMQP close */
+static pn_event_type_t listen_abort_handler(test_t *t, pn_event_t *e) {
   switch (pn_event_type(e)) {
    case PN_CONNECTION_REMOTE_OPEN:
     /* Close the transport - abruptly closes the socket */
-    pn_transport_close(pn_connection_transport(pn_event_connection(e)));
+    pn_transport_close_tail(pn_connection_transport(pn_event_connection(e)));
+    pn_transport_close_head(pn_connection_transport(pn_event_connection(e)));
     return PN_EVENT_NONE;
 
    default:
@@ -344,9 +338,9 @@ static pn_event_type_t open_abort_handler(test_t *t, pn_event_t *e) {
   }
 }
 
-/* Test an aborted connection */
+/* Verify that pn_transport_close_head/tail aborts a connection without an AMQP protoocol close */
 static void test_abort(test_t *t) {
-  proactor_test_t pts[] ={ { open_close_handler }, { open_abort_handler } };
+  proactor_test_t pts[] ={ { open_close_handler }, { listen_abort_handler } };
   PROACTOR_TEST_INIT(pts, t);
   pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
   test_port_t port = test_port(localhost);
@@ -354,9 +348,6 @@ static void test_abort(test_t *t) {
   pn_proactor_listen(server, l, port.host_port, 4);
   TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
   sock_close(port.sock);
-
-  /* Run to completion, then examine logs */
-  PROACTOR_TEST_CLEAR_LOGS(pts);
   pn_proactor_connect(client, pn_connection(), port.host_port);
   /* server transport closes */
   TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
@@ -370,15 +361,96 @@ static void test_abort(test_t *t) {
     TEST_STR_EQUAL(t, "amqp:connection:framing-error", pn_condition_get_name(last_condition));
     TEST_STR_IN(t, "abort", pn_condition_get_description(last_condition));
   }
+  pn_listener_close(l);
+  PROACTOR_TEST_DRAIN(pts);
 
-  /* Make sure there are PN_CONNECTION_CLOSE events in the shutdown sequence */
-  static const pn_event_type_t want_client[] = { PN_CONNECTION_INIT, PN_CONNECTION_LOCAL_OPEN, PN_CONNECTION_BOUND, PN_TRANSPORT_TAIL_CLOSED, PN_TRANSPORT_ERROR, PN_TRANSPORT_HEAD_CLOSED, PN_TRANSPORT_CLOSED };
+  /* Verify expected event sequences, no unexpected events */
+  static const pn_event_type_t want_client[] = {
+    PN_CONNECTION_INIT,
+    PN_CONNECTION_LOCAL_OPEN,
+    PN_CONNECTION_BOUND,
+    PN_TRANSPORT_TAIL_CLOSED,
+    PN_TRANSPORT_ERROR,
+    PN_TRANSPORT_HEAD_CLOSED,
+    PN_TRANSPORT_CLOSED
+  };
   TEST_LOG_EQUAL(t, want_client, pts[0]);
-  static const pn_event_type_t want_server[] = { PN_LISTENER_ACCEPT, PN_CONNECTION_INIT, PN_CONNECTION_BOUND, PN_CONNECTION_REMOTE_OPEN, PN_TRANSPORT_TAIL_CLOSED, PN_TRANSPORT_ERROR, PN_TRANSPORT_HEAD_CLOSED, PN_TRANSPORT_CLOSED };
+
+  static const pn_event_type_t want_server[] = {
+    PN_LISTENER_OPEN,
+    PN_LISTENER_ACCEPT,
+    PN_CONNECTION_INIT,
+    PN_CONNECTION_BOUND,
+    PN_CONNECTION_REMOTE_OPEN,
+    PN_TRANSPORT_TAIL_CLOSED,
+    PN_TRANSPORT_ERROR,
+    PN_TRANSPORT_HEAD_CLOSED,
+    PN_TRANSPORT_CLOSED,
+    PN_LISTENER_CLOSE
+  };
+  TEST_LOG_EQUAL(t, want_server, pts[1]);
+
+  PROACTOR_TEST_FREE(pts);
+}
+
+/* Refuse a connection: abort before the AMQP open sequence begins. */
+static pn_event_type_t listen_refuse_handler(test_t *t, pn_event_t *e) {
+  switch (pn_event_type(e)) {
+
+   case PN_CONNECTION_BOUND:
+    /* Close the transport - abruptly closes the socket */
+    pn_transport_close_tail(pn_connection_transport(pn_event_connection(e)));
+    pn_transport_close_head(pn_connection_transport(pn_event_connection(e)));
+    return PN_EVENT_NONE;
+
+   default:
+    /* Don't auto-close the listener to keep the event sequences simple */
+    return listen_handler(t, e);
+  }
+}
+
+/* Verify that pn_transport_close_head/tail aborts a connection without an AMQP protoocol close */
+static void test_refuse(test_t *t) {
+  proactor_test_t pts[] = { { open_close_handler }, { listen_refuse_handler } };
+  PROACTOR_TEST_INIT(pts, t);
+  pn_proactor_t *client = pts[0].proactor;
+  proactor_test_listener_t l = proactor_test_listen(&pts[1], localhost);
+  pn_proactor_connect(client, pn_connection(), l.port.host_port);
+
+  /* client transport closes */
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); /* client */
+  if (TEST_CHECK(t, last_condition) && TEST_CHECK(t, pn_condition_is_set(last_condition))) {
+    TEST_STR_EQUAL(t, "amqp:connection:framing-error", pn_condition_get_name(last_condition));
+  }
+  pn_listener_close(l.listener);
+  PROACTOR_TEST_DRAIN(pts);
+
+  /* Verify expected event sequences, no unexpected events */
+  static const pn_event_type_t want_client[] = {
+    PN_CONNECTION_INIT,
+    PN_CONNECTION_LOCAL_OPEN,
+    PN_CONNECTION_BOUND,
+    PN_TRANSPORT_TAIL_CLOSED,
+    PN_TRANSPORT_ERROR,
+    PN_TRANSPORT_HEAD_CLOSED,
+    PN_TRANSPORT_CLOSED
+  };
+  TEST_LOG_EQUAL(t, want_client, pts[0]);
+
+  static const pn_event_type_t want_server[] = {
+    PN_LISTENER_OPEN,
+    PN_LISTENER_ACCEPT,
+    PN_CONNECTION_INIT,
+    PN_CONNECTION_BOUND,
+    PN_TRANSPORT_TAIL_CLOSED,
+    PN_TRANSPORT_ERROR,
+    PN_TRANSPORT_HEAD_CLOSED,
+    PN_TRANSPORT_CLOSED,
+    PN_LISTENER_CLOSE
+  };
   TEST_LOG_EQUAL(t, want_server, pts[1]);
 
   PROACTOR_TEST_FREE(pts);
-  pn_listener_free(l);
 }
 
 /* Test that INACTIVE event is generated when last connections/listeners closes. */
@@ -405,7 +477,6 @@ static void test_inactive(test_t *t) {
   pn_listener_close(l);
   TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
   TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
-  pn_listener_free(l);
 
   sock_close(port.sock);
   PROACTOR_TEST_FREE(pts);
@@ -431,7 +502,6 @@ static void test_errors(test_t *t) {
   TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
   TEST_STR_IN(t, "xxx", pn_condition_get_description(last_condition));
   TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
-  pn_listener_free(l);
 
   /* Connect with no listener */
   c = pn_connection();
@@ -514,15 +584,10 @@ static void test_ipv4_ipv6(test_t *t) {
 
   pn_listener_close(l);
   TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
-  pn_listener_free(l);
-
   pn_listener_close(l6);
   TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
-  pn_listener_free(l6);
-
   pn_listener_close(l4);
   TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
-  pn_listener_free(l4);
 
   PROACTOR_TEST_FREE(pts);
 }
@@ -544,11 +609,7 @@ static void test_free_cleanup(test_t *t) {
     pn_proactor_connect(client, c[i], ports[i].host_port);
   }
   PROACTOR_TEST_FREE(pts);
-  /* Safe to free after proactor is gone */
-  for (int i = 0; i < 3; ++i) {
-    pn_listener_free(l[i]);
-    pn_connection_free(c[i]);
-  }
+
   /* Freeing an unused listener/connector should be safe */
   pn_listener_free(pn_listener());
   pn_connection_free(pn_connection());
@@ -630,7 +691,7 @@ static void test_ssl(test_t *t) {
 static void test_addr(test_t *t) {
   /* Make sure NULL addr gives empty string */
   char str[1024] = "not-empty";
-  pn_proactor_addr_str(str, sizeof(str), NULL);
+  pn_proactor_addr_str(NULL, str, sizeof(str));
   TEST_STR_EQUAL(t, "", str);
 
   proactor_test_t pts[] ={ { open_wake_handler }, { listen_handler } };
@@ -648,21 +709,21 @@ static void test_addr(test_t *t) {
   char cr[1024], cl[1024], sr[1024], sl[1024];
 
   pn_transport_t *ct = pn_connection_transport(c);
-  pn_proactor_addr_str(cr, sizeof(cr), pn_proactor_addr_remote(ct));
+  pn_proactor_addr_str(pn_proactor_addr_remote(ct), cr, sizeof(cr));
   TEST_STR_IN(t, test_port_use_host(&port, ""), cr); /* remote address has listening port */
 
   pn_connection_t *s = last_accepted; /* server side of the connection */
   pn_transport_t *st = pn_connection_transport(s);
   if (!TEST_CHECK(t, st)) return;
-  pn_proactor_addr_str(sl, sizeof(sl), pn_proactor_addr_local(st));
+  pn_proactor_addr_str(pn_proactor_addr_local(st), sl, sizeof(sl));
   TEST_STR_EQUAL(t, cr, sl);  /* client remote == server local */
 
-  pn_proactor_addr_str(cl, sizeof(cl), pn_proactor_addr_local(ct));
-  pn_proactor_addr_str(sr, sizeof(sr), pn_proactor_addr_remote(st));
+  pn_proactor_addr_str(pn_proactor_addr_local(ct), cl, sizeof(cl));
+  pn_proactor_addr_str(pn_proactor_addr_remote(st), sr, sizeof(sr));
   TEST_STR_EQUAL(t, cl, sr);    /* client local == server remote */
 
   /* Examine as sockaddr */
-  struct sockaddr_storage* addr = pn_proactor_addr_sockaddr(pn_proactor_addr_remote(ct));
+  const struct sockaddr_storage* addr = pn_proactor_addr_sockaddr(pn_proactor_addr_remote(ct));
   TEST_CHECK(t, AF_INET == addr->ss_family);
   char host[NI_MAXHOST] = "";
   char serv[NI_MAXSERV] = "";
@@ -675,15 +736,12 @@ static void test_addr(test_t *t) {
   TEST_STR_EQUAL(t, port.str, serv);
 
   /* Make sure you can use NULL, 0 to get length of address string without a crash */
-  size_t len = pn_proactor_addr_str(NULL, 0, pn_proactor_addr_local(ct));
+  size_t len = pn_proactor_addr_str(pn_proactor_addr_local(ct), NULL, 0);
   TEST_CHECK(t, strlen(cl) == len);
 
   sock_close(port.sock);
   PROACTOR_TEST_DRAIN(pts);
   PROACTOR_TEST_FREE(pts);
-  pn_listener_free(l);
-  pn_connection_free(c);
-  pn_connection_free(s);
 }
 
 /* Test pn_proactor_disconnect */
@@ -746,8 +804,6 @@ static void test_disconnect(test_t *t) {
   }
 
   pn_condition_free(cond);
-  pn_listener_free(l);
-  pn_listener_free(l2);
 
   /* Make sure the proactors are still functional */
   test_port_t port3 = test_port(localhost);
@@ -761,7 +817,6 @@ static void test_disconnect(test_t *t) {
 
   PROACTOR_TEST_DRAIN(pts);     /* Drain will  */
   PROACTOR_TEST_FREE(pts);
-  pn_listener_free(l3);
 }
 
 int main(int argc, char **argv) {
@@ -771,13 +826,14 @@ int main(int argc, char **argv) {
   RUN_ARGV_TEST(failed, t, test_interrupt_timeout(&t));
   RUN_ARGV_TEST(failed, t, test_errors(&t));
   RUN_ARGV_TEST(failed, t, test_client_server(&t));
-  RUN_ARGV_TEST(failed, t, test_abort(&t));
   RUN_ARGV_TEST(failed, t, test_connection_wake(&t));
   RUN_ARGV_TEST(failed, t, test_ipv4_ipv6(&t));
   RUN_ARGV_TEST(failed, t, test_free_cleanup(&t));
   RUN_ARGV_TEST(failed, t, test_ssl(&t));
   RUN_ARGV_TEST(failed, t, test_addr(&t));
   RUN_ARGV_TEST(failed, t, test_disconnect(&t));
+  RUN_ARGV_TEST(failed, t, test_abort(&t));
+  RUN_ARGV_TEST(failed, t, test_refuse(&t));
   pn_condition_free(last_condition);
   return failed;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7dfd0533/proton-c/src/tests/test_tools.h
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/test_tools.h b/proton-c/src/tests/test_tools.h
index 4b75cbc..c5c8364 100644
--- a/proton-c/src/tests/test_tools.h
+++ b/proton-c/src/tests/test_tools.h
@@ -132,7 +132,7 @@ bool test_etype_equal_(test_t *t, pn_event_type_t want, pn_event_type_t got, con
 
 void print_bad_etypes(const char* prefix, const pn_event_type_t* seq, size_t len, size_t bad) {
   fprintf(stderr, "%s", prefix);
-  for (int i = 0; i < len; ++i) {
+  for (size_t i = 0; i < len; ++i) {
     fprintf(stderr, (i == bad) ? ">>>>%s" : "%s", pn_event_type_name(seq[i]));
     if (i < len-1) fprintf(stderr, ", ");
   }
@@ -142,7 +142,7 @@ void print_bad_etypes(const char* prefix, const pn_event_type_t* seq, size_t len
 
 bool test_etypes_equal_(test_t *t, const pn_event_type_t* want, size_t want_len, const pn_event_type_t* got, size_t got_len, const char *file, int line) {
   size_t len = want_len < got_len ? want_len : got_len;
-  for (int i = 0; i < len; ++i) {
+  for (size_t i = 0; i < len; ++i) {
     if (want[i] != got[i]) {
       test_errorf_(t, NULL, file, line, "event sequences don't match:");
       print_bad_etypes("  want: ", want, want_len, i);
@@ -261,7 +261,7 @@ int sock_port(sock_t sock) {
   return ntohs(port);
 }
 
-/* Combines includes a sock_t with the int and char* versions of the port for convenience */
+/* Combines a sock_t with the int and char* versions of the port for convenience */
 typedef struct test_port_t {
   sock_t sock;
   int port;                     /* port as integer */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/3] qpid-proton git commit: NO-JIRA: proactor test: verify expected behavior transport close

Posted by ac...@apache.org.
NO-JIRA: proactor test: verify expected behavior transport close


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/588e2e40
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/588e2e40
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/588e2e40

Branch: refs/heads/master
Commit: 588e2e40afa5eaf9c34df67684a9578d1de5fd82
Parents: f086571
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Mar 31 11:08:49 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Apr 4 09:14:55 2017 -0400

----------------------------------------------------------------------
 proton-c/src/tests/proactor.c   | 81 +++++++++++++++++++++++++++++++++++-
 proton-c/src/tests/test_tools.h | 57 ++++++++++++++++++-------
 2 files changed, 122 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/588e2e40/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index be32b35..ef12a81 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -36,11 +36,15 @@ static const char *localhost = ""; /* host for connect/listen */
 
 typedef pn_event_type_t (*test_handler_fn)(test_t *, pn_event_t*);
 
+#define MAX_EVENT_LOG 2048         /* Max number of event types stored per proactor_test */
+
 /* Proactor and handler that take part in a test */
 typedef struct proactor_test_t {
   test_handler_fn handler;
   test_t *t;
   pn_proactor_t *proactor;
+  pn_event_type_t log[MAX_EVENT_LOG]; /* Log of event types generated by proactor */
+  size_t log_len;                     /* Number of events in the log */
 } proactor_test_t;
 
 
@@ -49,6 +53,7 @@ static void proactor_test_init(proactor_test_t *pts, size_t n, test_t *t) {
   for (proactor_test_t *pt = pts; pt < pts + n; ++pt) {
     if (!pt->t) pt->t = t;
     if (!pt->proactor) pt->proactor = pn_proactor();
+    pt->log_len = 0;
     pn_proactor_set_timeout(pt->proactor, timeout);
   }
 }
@@ -63,6 +68,25 @@ static void proactor_test_free(proactor_test_t *pts, size_t n) {
 
 #define PROACTOR_TEST_FREE(A) proactor_test_free(A, sizeof(A)/sizeof(*A))
 
+/* Clear the event logs in an array of proactors */
+static void proactor_test_clear_logs(proactor_test_t *pts, size_t n) {
+  for (proactor_test_t *pt = pts; pt < pts + n; ++pt) {
+    pt->log_len = 0;
+  }
+}
+
+#define PROACTOR_TEST_CLEAR_LOGS(A) proactor_test_clear_logs(A, sizeof(A)/sizeof(*A))
+
+#define TEST_LOG_EQUAL(T, A, PT) \
+  TEST_ETYPES_EQUAL((T), (A), sizeof(A)/sizeof(*A), (PT).log, (PT).log_len)
+
+#if 0                           /* FIXME aconway 2017-03-31:  */
+/* Return the last event in the proactor_test's log or PN_EVENT_NONE if it is empty */
+static pn_event_type_t  proactor_test_last_event(proactor_test_t *pt) {
+  return pt->log_len ? pt->log[pt->log_len - 1] : PN_EVENT_NONE;
+}
+#endif
+
 /* Set this to a pn_condition() to save condition data */
 pn_condition_t *last_condition = NULL;
 
@@ -95,6 +119,8 @@ static pn_event_type_t proactor_test_get(proactor_test_t *pts, size_t n) {
         busy = true;
         pn_event_type_t ret = PN_EVENT_NONE;
         for (pn_event_t* e = pn_event_batch_next(eb); e; e = pn_event_batch_next(eb)) {
+          TEST_ASSERT(pt->log_len < MAX_EVENT_LOG);
+          pt->log[pt->log_len++] = pn_event_type(e);
           save_condition(e);
           ret = pt->handler(pt->t, e);
           if (ret) break;
@@ -304,6 +330,57 @@ static void test_connection_wake(test_t *t) {
   pn_connection_free(c);
 }
 
+/* Close the transport to abort a connection, i.e. close the socket abruptly */
+static pn_event_type_t open_abort_handler(test_t *t, pn_event_t *e) {
+  switch (pn_event_type(e)) {
+   case PN_CONNECTION_REMOTE_OPEN:
+    /* Close the transport - abruptly closes the socket */
+    pn_transport_close(pn_connection_transport(pn_event_connection(e)));
+    return PN_EVENT_NONE;
+
+   default:
+    /* Don't auto-close the listener to keep the event sequences simple */
+    return listen_handler(t, e);
+  }
+}
+
+/* Test an aborted connection */
+static void test_abort(test_t *t) {
+  proactor_test_t pts[] ={ { open_close_handler }, { open_abort_handler } };
+  PROACTOR_TEST_INIT(pts, t);
+  pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
+  test_port_t port = test_port(localhost);
+  pn_listener_t *l = pn_listener();
+  pn_proactor_listen(server, l, port.host_port, 4);
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+  sock_close(port.sock);
+
+  /* Run to completion, then examine logs */
+  PROACTOR_TEST_CLEAR_LOGS(pts);
+  pn_proactor_connect(client, pn_connection(), port.host_port);
+  /* server transport closes */
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  if (TEST_CHECK(t, last_condition) && TEST_CHECK(t, pn_condition_is_set(last_condition))) {
+    TEST_STR_EQUAL(t, "amqp:connection:framing-error", pn_condition_get_name(last_condition));
+    TEST_STR_IN(t, "abort", pn_condition_get_description(last_condition));
+  }
+  /* client transport closes */
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); /* client */
+  if (TEST_CHECK(t, last_condition) && TEST_CHECK(t, pn_condition_is_set(last_condition))) {
+    TEST_STR_EQUAL(t, "amqp:connection:framing-error", pn_condition_get_name(last_condition));
+    TEST_STR_IN(t, "abort", pn_condition_get_description(last_condition));
+  }
+
+  /* Make sure there are PN_CONNECTION_CLOSE events in the shutdown sequence */
+  static const pn_event_type_t want_client[] = { PN_CONNECTION_INIT, PN_CONNECTION_LOCAL_OPEN, PN_CONNECTION_BOUND, PN_TRANSPORT_TAIL_CLOSED, PN_TRANSPORT_ERROR, PN_TRANSPORT_HEAD_CLOSED, PN_TRANSPORT_CLOSED };
+  TEST_LOG_EQUAL(t, want_client, pts[0]);
+  static const pn_event_type_t want_server[] = { PN_LISTENER_ACCEPT, PN_CONNECTION_INIT, PN_CONNECTION_BOUND, PN_CONNECTION_REMOTE_OPEN, PN_TRANSPORT_TAIL_CLOSED, PN_TRANSPORT_ERROR, PN_TRANSPORT_HEAD_CLOSED, PN_TRANSPORT_CLOSED };
+  TEST_LOG_EQUAL(t, want_server, pts[1]);
+
+  PROACTOR_TEST_FREE(pts);
+  pn_listener_free(l);
+}
+
 /* Test that INACTIVE event is generated when last connections/listeners closes. */
 static void test_inactive(test_t *t) {
   proactor_test_t pts[] =  { { open_wake_handler }, { listen_handler } };
@@ -540,6 +617,7 @@ static void test_ssl(test_t *t) {
   PROACTOR_TEST_FREE(pts);
 }
 
+
 /* Test pn_proactor_addr funtions */
 
 /* FIXME aconway 2017-03-30: windows will need winsock2.h etc.
@@ -608,7 +686,7 @@ static void test_addr(test_t *t) {
   pn_connection_free(s);
 }
 
-/* Test simple client/server connection with 2 proactors */
+/* Test pn_proactor_disconnect */
 static void test_disconnect(test_t *t) {
   proactor_test_t pts[] ={ { open_wake_handler }, { listen_handler } };
   PROACTOR_TEST_INIT(pts, t);
@@ -693,6 +771,7 @@ int main(int argc, char **argv) {
   RUN_ARGV_TEST(failed, t, test_interrupt_timeout(&t));
   RUN_ARGV_TEST(failed, t, test_errors(&t));
   RUN_ARGV_TEST(failed, t, test_client_server(&t));
+  RUN_ARGV_TEST(failed, t, test_abort(&t));
   RUN_ARGV_TEST(failed, t, test_connection_wake(&t));
   RUN_ARGV_TEST(failed, t, test_ipv4_ipv6(&t));
   RUN_ARGV_TEST(failed, t, test_free_cleanup(&t));

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/588e2e40/proton-c/src/tests/test_tools.h
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/test_tools.h b/proton-c/src/tests/test_tools.h
index 5404380..4b75cbc 100644
--- a/proton-c/src/tests/test_tools.h
+++ b/proton-c/src/tests/test_tools.h
@@ -41,7 +41,7 @@ typedef struct test_t {
    All output from test marcros goes to stderr so it interleaves with PN_TRACE logs.
 */
 
-static void test_vlogf_(test_t *t, const char *prefix, const char* expr,
+void test_vlogf_(test_t *t, const char *prefix, const char* expr,
                         const char* file, int line, const char *fmt, va_list ap)
 {
   fprintf(stderr, "%s:%d", file, line);
@@ -56,7 +56,7 @@ static void test_vlogf_(test_t *t, const char *prefix, const char* expr,
   fflush(stdout);
 }
 
-static void test_errorf_(test_t *t, const char* expr,
+void test_errorf_(test_t *t, const char* expr,
                          const char* file, int line, const char *fmt, ...) {
   ++t->errors;
   va_list ap;
@@ -65,7 +65,7 @@ static void test_errorf_(test_t *t, const char* expr,
   va_end(ap);
 }
 
-static bool test_check_(test_t *t, bool expr, const char *sexpr,
+bool test_check_(test_t *t, bool expr, const char *sexpr,
                         const char *file, int line, const char* fmt, ...) {
   if (!expr) {
     ++t->errors;
@@ -77,7 +77,7 @@ static bool test_check_(test_t *t, bool expr, const char *sexpr,
   return expr;
 }
 
-static void test_logf_(test_t *t, const char *prefix, const char* expr,
+void test_logf_(test_t *t, const char *prefix, const char* expr,
                        const char* file, int line, const char *fmt, ...) {
   va_list ap;
   va_start(ap, fmt);
@@ -86,7 +86,7 @@ static void test_logf_(test_t *t, const char *prefix, const char* expr,
 }
 
 /* Call via TEST_ASSERT macros */
-static void assert_fail_(const char* expr, const char* file, int line, const char *fmt, ...) {
+void assert_fail_(const char* expr, const char* file, int line, const char *fmt, ...) {
   va_list ap;
   va_start(ap, fmt);
   test_vlogf_(NULL, "assertion failed", expr, file, line, fmt, ap);
@@ -124,10 +124,33 @@ static void assert_fail_(const char* expr, const char* file, int line, const cha
 #define TEST_CHECK(TEST, EXPR) \
   test_check_((TEST), (EXPR), #EXPR, __FILE__, __LINE__, "")
 
-static inline bool test_etype_equal_(test_t *t, int want, int got, const char *file, int line) {
+bool test_etype_equal_(test_t *t, pn_event_type_t want, pn_event_type_t got, const char *file, int line) {
   return test_check_(t, want == got, NULL, file, line, "want %s got %s",
-                     pn_event_type_name((pn_event_type_t)want),
-                     pn_event_type_name((pn_event_type_t)got));
+                     pn_event_type_name(want),
+                     pn_event_type_name(got));
+}
+
+void print_bad_etypes(const char* prefix, const pn_event_type_t* seq, size_t len, size_t bad) {
+  fprintf(stderr, "%s", prefix);
+  for (int i = 0; i < len; ++i) {
+    fprintf(stderr, (i == bad) ? ">>>>%s" : "%s", pn_event_type_name(seq[i]));
+    if (i < len-1) fprintf(stderr, ", ");
+  }
+  if (bad > len) fprintf(stderr, " >>>>");
+  fprintf(stderr, "\n");
+}
+
+bool test_etypes_equal_(test_t *t, const pn_event_type_t* want, size_t want_len, const pn_event_type_t* got, size_t got_len, const char *file, int line) {
+  size_t len = want_len < got_len ? want_len : got_len;
+  for (int i = 0; i < len; ++i) {
+    if (want[i] != got[i]) {
+      test_errorf_(t, NULL, file, line, "event sequences don't match:");
+      print_bad_etypes("  want: ", want, want_len, i);
+      print_bad_etypes("  got:  ", got, got_len, i);
+      return false;
+    }
+  }
+  return want_len == got_len;
 }
 
 #define TEST_STR_EQUAL(TEST, WANT, GOT) \
@@ -140,7 +163,11 @@ static inline bool test_etype_equal_(test_t *t, int want, int got, const char *f
 #define TEST_ETYPE_EQUAL(TEST, WANT, GOT) \
   test_etype_equal_((TEST), (WANT), (GOT), __FILE__, __LINE__)
 
-static inline pn_event_t *test_event_type_(test_t *t, pn_event_type_t want, pn_event_t *got, const char *file, int line) {
+/* Compare arrays of pn_event_type_t */
+#define TEST_ETYPES_EQUAL(TEST, WANT, WLEN, GOT, GLEN)                       \
+  test_etypes_equal_((TEST), (WANT), (WLEN), (GOT), (GLEN), __FILE__, __LINE__)
+
+pn_event_t *test_event_type_(test_t *t, pn_event_type_t want, pn_event_t *got, const char *file, int line) {
   test_check_(t, want == pn_event_type(got), NULL, file, line, "want %s got %s",
               pn_event_type_name(want),
               pn_event_type_name(pn_event_type(got)));
@@ -192,14 +219,14 @@ static inline pn_event_t *test_event_type_(test_t *t, pn_event_type_t want, pn_e
 #include <winsock2.h>
 #include <ws2tcpip.h>
 typedef SOCKET sock_t;
-static inline void sock_close(sock_t sock) { closesocket(sock); }
+void sock_close(sock_t sock) { closesocket(sock); }
 
 #else  /* POSIX */
 
 typedef int sock_t;
 # include <netinet/in.h>
 # include <unistd.h>
-static inline void sock_close(sock_t sock) { close(sock); }
+void sock_close(sock_t sock) { close(sock); }
 #endif
 
 
@@ -208,7 +235,7 @@ static inline void sock_close(sock_t sock) { close(sock); }
    Close the returned fd when the other process is listening.
    Asserts on error.
 */
-static sock_t sock_bind0(void) {
+sock_t sock_bind0(void) {
   int sock =  socket(AF_INET, SOCK_STREAM, 0);
   TEST_ASSERT_ERRNO(sock >= 0, errno);
   int on = 1;
@@ -221,7 +248,7 @@ static sock_t sock_bind0(void) {
   return sock;
 }
 
-static int sock_port(sock_t sock) {
+int sock_port(sock_t sock) {
   struct sockaddr addr = {0};
   socklen_t len = sizeof(addr);
   TEST_ASSERT_ERRNO(getsockname(sock, &addr, &len) == 0, errno);
@@ -243,13 +270,13 @@ typedef struct test_port_t {
 } test_port_t;
 
 /* Modifies tp->host_port to use host, returns the new tp->host_port */
-static const char *test_port_use_host(test_port_t *tp, const char *host) {
+const char *test_port_use_host(test_port_t *tp, const char *host) {
   snprintf(tp->host_port, sizeof(tp->host_port), "%s:%d", host, tp->port);
   return tp->host_port;
 }
 
 /* Create a test_port_t  */
-static inline test_port_t test_port(const char* host) {
+test_port_t test_port(const char* host) {
   test_port_t tp = {0};
   tp.sock = sock_bind0();
   tp.port = sock_port(tp.sock);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/3] qpid-proton git commit: PROTON-1437: pn_proactor_addr_sockaddr returns known struct

Posted by ac...@apache.org.
PROTON-1437: pn_proactor_addr_sockaddr returns known struct

To use the return value of `pn_proactor_addr_sockaddr()` in API calls like
`getnameinfo` you must know its size. Rather than complicate the API to return a
size, return a pointer to the standard `struct sockaddr_storage` which is large
enough to hold any supported address type.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/f086571c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/f086571c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/f086571c

Branch: refs/heads/master
Commit: f086571c9db91bdd25db4693921f0a1a085add39
Parents: ed82ea8
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 30 16:51:05 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Apr 4 09:14:55 2017 -0400

----------------------------------------------------------------------
 proton-c/include/proton/proactor.h |  5 +++--
 proton-c/src/proactor/libuv.c      |  4 ++--
 proton-c/src/tests/proactor.c      | 23 ++++++++++++++++++++++-
 3 files changed, 27 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f086571c/proton-c/include/proton/proactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
index 2d91661..d03589c 100644
--- a/proton-c/include/proton/proactor.h
+++ b/proton-c/include/proton/proactor.h
@@ -248,9 +248,10 @@ PNP_EXTERN pn_proactor_addr_t *pn_proactor_addr_remote(pn_transport_t* c);
 
 /**
  * If the underlying implementation uses `struct sockaddr` (for example POSIX or Windows
- * sockets) return a pointer, otherwise return NULL.
+ * sockets) return a pointer to a `struct sockaddr_storage` containing the address info,
+ * otherwise return NULL.
  */
-PNP_EXTERN struct sockaddr *pn_proactor_addr_sockaddr(pn_proactor_addr_t *addr);
+PNP_EXTERN struct sockaddr_storage *pn_proactor_addr_sockaddr(pn_proactor_addr_t *addr);
 
 /**
  * @}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f086571c/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 2f5e369..45863c3 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -1266,8 +1266,8 @@ int pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
   return 0;
 }
 
-struct sockaddr *pn_proactor_addr_sockaddr(pn_proactor_addr_t *addr) {
-  return (struct sockaddr*)addr;
+struct sockaddr_storage *pn_proactor_addr_sockaddr(pn_proactor_addr_t *addr) {
+  return (struct sockaddr_storage*)addr;
 }
 
 struct pn_proactor_addr_t *pn_proactor_addr_local(pn_transport_t *t) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f086571c/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index d22958c..be32b35 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -541,6 +541,14 @@ static void test_ssl(test_t *t) {
 }
 
 /* Test pn_proactor_addr funtions */
+
+/* FIXME aconway 2017-03-30: windows will need winsock2.h etc.
+   These headers are *only* needed for test_addr and only for the getnameinfo part.
+   This is the only non-portable part of the proactor test suite.
+   */
+#include <sys/socket.h>         /* For socket_storage */
+#include <netdb.h>              /* For NI_MAXHOST/NI_MAXSERV */
+
 static void test_addr(test_t *t) {
   /* Make sure NULL addr gives empty string */
   char str[1024] = "not-empty";
@@ -550,7 +558,7 @@ static void test_addr(test_t *t) {
   proactor_test_t pts[] ={ { open_wake_handler }, { listen_handler } };
   PROACTOR_TEST_INIT(pts, t);
   pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
-  test_port_t port = test_port(localhost);
+  test_port_t port = test_port("127.0.0.1"); /* Use IPv4 to get consistent results all platforms */
   pn_listener_t *l = pn_listener();
   pn_proactor_listen(server, l, port.host_port, 4);
   TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
@@ -575,6 +583,19 @@ static void test_addr(test_t *t) {
   pn_proactor_addr_str(sr, sizeof(sr), pn_proactor_addr_remote(st));
   TEST_STR_EQUAL(t, cl, sr);    /* client local == server remote */
 
+  /* Examine as sockaddr */
+  struct sockaddr_storage* addr = pn_proactor_addr_sockaddr(pn_proactor_addr_remote(ct));
+  TEST_CHECK(t, AF_INET == addr->ss_family);
+  char host[NI_MAXHOST] = "";
+  char serv[NI_MAXSERV] = "";
+  int err = getnameinfo((struct sockaddr*)addr, sizeof(*addr),
+                        host, sizeof(host),
+                        serv, sizeof(serv),
+                        NI_NUMERICHOST | NI_NUMERICSERV);
+  TEST_CHECK(t, 0 == err);
+  TEST_STR_EQUAL(t, "127.0.0.1", host);
+  TEST_STR_EQUAL(t, port.str, serv);
+
   /* Make sure you can use NULL, 0 to get length of address string without a crash */
   size_t len = pn_proactor_addr_str(NULL, 0, pn_proactor_addr_local(ct));
   TEST_CHECK(t, strlen(cl) == len);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org