You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/01/05 16:35:53 UTC

[29/50] [abbrv] qpid-proton git commit: PROTON-1717 [C proactor] Allow initialization of transport and connection before binding

PROTON-1717 [C proactor] Allow initialization of transport and connection before binding

pn_proactor_connect and pn_listener_accept now take a pn_connection_t* and a pn_transport_t*.
Either can be NULL, in which case a connection/transport is created with no special config.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/4859de40
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/4859de40
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/4859de40

Branch: refs/heads/go1
Commit: 4859de40b478ae5c401a254b9608a79492f70588
Parents: d6e5360
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Dec 14 17:47:33 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Dec 15 09:14:50 2017 -0500

----------------------------------------------------------------------
 examples/c/broker.c                             |  2 +-
 examples/c/direct.c                             |  2 +-
 examples/c/receive.c                            |  2 +-
 examples/c/send-abort.c                         |  2 +-
 examples/c/send-ssl.c                           |  2 +-
 examples/c/send.c                               |  2 +-
 .../cpp/src/proactor_container_impl.cpp         |  4 +-
 proton-c/bindings/cpp/src/reconnect_test.cpp    |  6 +--
 proton-c/include/proton/listener.h              | 11 +++--
 proton-c/include/proton/proactor.h              | 13 +++--
 proton-c/src/proactor/epoll.c                   | 22 +++++----
 proton-c/src/proactor/libuv.c                   | 16 +++----
 proton-c/src/proactor/win_iocp.c                | 22 +++++----
 proton-c/src/tests/proactor.c                   | 50 ++++++++++----------
 14 files changed, 84 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4859de40/examples/c/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/broker.c b/examples/c/broker.c
index ceb9e96..d5a3eef 100644
--- a/examples/c/broker.c
+++ b/examples/c/broker.c
@@ -291,7 +291,7 @@ static void handle(broker_t* b, pn_event_t* e) {
     break;
 
    case PN_LISTENER_ACCEPT:
-    pn_listener_accept(pn_event_listener(e), pn_connection());
+    pn_listener_accept(pn_event_listener(e), NULL, NULL);
     break;
 
    case PN_CONNECTION_INIT:

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4859de40/examples/c/direct.c
----------------------------------------------------------------------
diff --git a/examples/c/direct.c b/examples/c/direct.c
index 104a4a1..cf5db68 100644
--- a/examples/c/direct.c
+++ b/examples/c/direct.c
@@ -234,7 +234,7 @@ static bool handle(app_data_t* app, pn_event_t* event) {
     break;
 
    case PN_LISTENER_ACCEPT:
-    pn_listener_accept(pn_event_listener(event), pn_connection());
+    pn_listener_accept(pn_event_listener(event), NULL, NULL);
     break;
 
    case PN_CONNECTION_INIT:

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4859de40/examples/c/receive.c
----------------------------------------------------------------------
diff --git a/examples/c/receive.c b/examples/c/receive.c
index e77ef27..8b25115 100644
--- a/examples/c/receive.c
+++ b/examples/c/receive.c
@@ -193,7 +193,7 @@ int main(int argc, char **argv) {
   /* Create the proactor and connect */
   app.proactor = pn_proactor();
   pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
-  pn_proactor_connect(app.proactor, pn_connection(), addr);
+  pn_proactor_connect(app.proactor, NULL, NULL, addr);
   run(&app);
   pn_proactor_free(app.proactor);
   return exit_code;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4859de40/examples/c/send-abort.c
----------------------------------------------------------------------
diff --git a/examples/c/send-abort.c b/examples/c/send-abort.c
index 2894ebe..240d459 100644
--- a/examples/c/send-abort.c
+++ b/examples/c/send-abort.c
@@ -218,7 +218,7 @@ int main(int argc, char **argv) {
 
   app.proactor = pn_proactor();
   pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
-  pn_proactor_connect(app.proactor, pn_connection(), addr);
+  pn_proactor_connect(app.proactor, NULL, NULL, addr);
   run(&app);
   pn_proactor_free(app.proactor);
   free(app.message_buffer.start);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4859de40/examples/c/send-ssl.c
----------------------------------------------------------------------
diff --git a/examples/c/send-ssl.c b/examples/c/send-ssl.c
index e307fa4..cc8c6d0 100644
--- a/examples/c/send-ssl.c
+++ b/examples/c/send-ssl.c
@@ -221,7 +221,7 @@ int main(int argc, char **argv) {
 
   app.proactor = pn_proactor();
   pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
-  pn_proactor_connect(app.proactor, pn_connection(), addr);
+  pn_proactor_connect(app.proactor, NULL, NULL, addr);
   run(&app);
 
   pn_ssl_domain_free(app.ssl_domain);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4859de40/examples/c/send.c
----------------------------------------------------------------------
diff --git a/examples/c/send.c b/examples/c/send.c
index 2f48877..d667828 100644
--- a/examples/c/send.c
+++ b/examples/c/send.c
@@ -196,7 +196,7 @@ int main(int argc, char **argv) {
 
   app.proactor = pn_proactor();
   pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
-  pn_proactor_connect(app.proactor, pn_connection(), addr);
+  pn_proactor_connect(app.proactor, NULL, NULL, addr);
   run(&app);
   pn_proactor_free(app.proactor);
   free(app.message_buffer.start);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4859de40/proton-c/bindings/cpp/src/proactor_container_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/proactor_container_impl.cpp b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
index 491b69b..25efde6 100644
--- a/proton-c/bindings/cpp/src/proactor_container_impl.cpp
+++ b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
@@ -209,7 +209,7 @@ pn_connection_t* container::impl::make_connection_lh(
 void container::impl::start_connection(const url& url, pn_connection_t *pnc) {
     char caddr[PN_MAX_ADDR];
     pn_proactor_addr(caddr, sizeof(caddr), url.host().c_str(), url.port().c_str());
-    pn_proactor_connect(proactor_, pnc, caddr); // Takes ownership of pnc
+    pn_proactor_connect(proactor_, pnc, NULL, caddr); // Takes ownership of pnc
 }
 
 void container::impl::reconnect(pn_connection_t* pnc) {
@@ -531,7 +531,7 @@ bool container::impl::handle(pn_event_t* event) {
         cc.listener_context_ = &lc;
         cc.handler = opts.handler();
         cc.work_queue_ = new container::impl::connection_work_queue(*container_.impl_, c);
-        pn_listener_accept(l, c);
+        pn_listener_accept(l, c, NULL);
         return false;
     }
     case PN_LISTENER_CLOSE: {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4859de40/proton-c/bindings/cpp/src/reconnect_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/reconnect_test.cpp b/proton-c/bindings/cpp/src/reconnect_test.cpp
index b7df213..cc44b77 100644
--- a/proton-c/bindings/cpp/src/reconnect_test.cpp
+++ b/proton-c/bindings/cpp/src/reconnect_test.cpp
@@ -192,10 +192,10 @@ int test_stop_reconnect() {
     return 0;
 }
 
-int main(int, char**) {
+int main(int argc, char** argv) {
     int failed = 0;
-    RUN_TEST(failed, test_failover_simple());
-    RUN_TEST(failed, test_stop_reconnect());
+    RUN_ARGV_TEST(failed, test_failover_simple());
+    RUN_ARGV_TEST(failed, test_stop_reconnect());
     return failed;
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4859de40/proton-c/include/proton/listener.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/listener.h b/proton-c/include/proton/listener.h
index 0994a70..ebe3614 100644
--- a/proton-c/include/proton/listener.h
+++ b/proton-c/include/proton/listener.h
@@ -56,11 +56,16 @@ PNP_EXTERN pn_listener_t *pn_listener(void);
 PNP_EXTERN void pn_listener_free(pn_listener_t *l);
 
 /**
- * Bind @p connection to a new transport accepted from @p listener.
- * Errors are returned as @ref PN_TRANSPORT_CLOSED events by pn_proactor_wait().
+ * Accept an incoming connection request on @p transport bound to @p connection.
+ * Call after a @ref PN_LISTENER_ACCEPT event.
  *
+ * @param[in] listener the listener
+ * @param[in] connection If NULL, a new connection is created.
+ * Memory management is the same as for pn_proactor_connect().
+ * @param[in] transport If NULL, a new transport is created.
+ * Memory management is the same as for pn_proactor_connect().
  */
-PNP_EXTERN void pn_listener_accept(pn_listener_t*, pn_connection_t *);
+PNP_EXTERN void pn_listener_accept(pn_listener_t *listener, pn_connection_t *connection, pn_transport_t *transport);
 
 /**
  * Get the error condition for a listener.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4859de40/proton-c/include/proton/proactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
index 52d3891..1d32668 100644
--- a/proton-c/include/proton/proactor.h
+++ b/proton-c/include/proton/proactor.h
@@ -96,27 +96,30 @@ PNP_EXTERN pn_proactor_t *pn_proactor(void);
 PNP_EXTERN void pn_proactor_free(pn_proactor_t *proactor);
 
 /**
- * Bind @p connection to a new @ref transport connected to @p addr.
+ * Connect @p transport to @p addr and bind to @p connection.
  * Errors are returned as  @ref PN_TRANSPORT_CLOSED events by pn_proactor_wait().
  *
  * @note Thread-safe
  *
  * @param[in] proactor the proactor object
  *
- * @param[in] connection @p proactor *takes ownership* of @p connection and will
+ * @param[in] connection If NULL a new connection is created.
+ * @p proactor *takes ownership* of @p connection and will
  * automatically call pn_connection_free() after the final @ref
  * PN_TRANSPORT_CLOSED event is handled, or when pn_proactor_free() is
  * called. You can prevent the automatic free with
  * pn_proactor_release_connection()
  *
+ * @param[in] transport If NULL, a new transport is created.
+ * @p proactor *takes ownership* of @p transport; it will be freed even
+ * if pn_proactor_release_connection() is called.
+ *
  * @param[in] addr the "host:port" network address, constructed by pn_proactor_addr()
  * An empty host will connect to the local host via the default protocol (IPV6 or IPV4).
  * An empty port will connect to the standard AMQP port (5672).
  *
- * @param[in] connection @ref connection to be connected to @p addr.
- *
  */
-PNP_EXTERN void pn_proactor_connect(pn_proactor_t *proactor, pn_connection_t *connection, const char *addr);
+PNP_EXTERN void pn_proactor_connect(pn_proactor_t *proactor, pn_connection_t *connection, pn_transport_t *transport, const char *addr);
 
 /**
  * Start listening for incoming connections.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4859de40/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 4f3890f..ff515b1 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -760,12 +760,18 @@ static const pn_class_t pconnection_class = PN_CLASS(pconnection);
 
 static void pconnection_tick(pconnection_t *pc);
 
-static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_connection_t *c, bool server, const char *addr)
+static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, bool server, const char *addr)
 {
+  if (pn_connection_driver_init(&pc->driver, c, t) != 0) {
+    free(pc);
+    return "pn_connection_driver_init failure";
+  }
+
   lock(&p->bind_mutex);
-  pn_record_t *r = pn_connection_attachments(c);
+  pn_record_t *r = pn_connection_attachments(pc->driver.connection);
   if (pn_record_get(r, PN_PROACTOR)) {
     unlock(&p->bind_mutex);
+    pn_connection_driver_destroy(&pc->driver);
     free(pc);
     return "pn_connection_t already in use";
   }
@@ -774,10 +780,6 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con
   pc->bound = true;
   unlock(&p->bind_mutex);
 
-  if (pn_connection_driver_init(&pc->driver, c, NULL) != 0) {
-    free(pc);
-    return "pn_connection_driver_init failure";
-  }
   pcontext_init(&pc->context, PCONNECTION, p, pc);
   psocket_init(&pc->psocket, p, NULL, addr);
   pc->new_events = 0;
@@ -1292,10 +1294,10 @@ static bool wake_if_inactive(pn_proactor_t *p) {
   return false;
 }
 
-void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr) {
+void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, const char *addr) {
   pconnection_t *pc = (pconnection_t*) pn_class_new(&pconnection_class, sizeof(pconnection_t));
   assert(pc); // TODO: memory safety
-  const char *err = pconnection_setup(pc, p, c, false, addr);
+  const char *err = pconnection_setup(pc, p, c, t, false, addr);
   if (err) {    /* TODO aconway 2017-09-13: errors must be reported as events */
     pn_logf("pn_proactor_connect failure: %s", err);
     return;
@@ -1695,10 +1697,10 @@ pn_record_t *pn_listener_attachments(pn_listener_t *l) {
   return l->attachments;
 }
 
-void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
+void pn_listener_accept(pn_listener_t *l, pn_connection_t *c, pn_transport_t *t) {
   pconnection_t *pc = (pconnection_t*) pn_class_new(&pconnection_class, sizeof(pconnection_t));
   assert(pc); // TODO: memory safety
-  const char *err = pconnection_setup(pc, pn_listener_proactor(l), c, true, "");
+  const char *err = pconnection_setup(pc, pn_listener_proactor(l), c, t, true, "");
   if (err) {
     pn_logf("pn_listener_accept failure: %s", err);
     return;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4859de40/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 750c743..6f13040 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -176,7 +176,7 @@ typedef struct pconnection_t {
   uv_connect_t connect;         /* Outgoing connection only */
   int connected;      /* 0: not connected, <0: connecting after error, 1 = connected ok */
 
-  lsocket_t *lsocket;         /* Incoming connection only */
+  lsocket_t *lsocket;           /* Incoming connection only */
 
   struct pn_netaddr_t local, remote; /* Actual addresses */
   uv_timer_t timer;
@@ -297,9 +297,9 @@ static void parse_addr(addr_t *addr, const char *str) {
   pni_parse_addr(str, addr->addr_buf, sizeof(addr->addr_buf), &addr->host, &addr->port);
 }
 
-static pconnection_t *pconnection(pn_proactor_t *p, pn_connection_t *c, bool server) {
+static pconnection_t *pconnection(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, bool server) {
   pconnection_t *pc = (pconnection_t*)calloc(1, sizeof(*pc));
-  if (!pc || pn_connection_driver_init(&pc->driver, c, NULL) != 0) {
+  if (!pc || pn_connection_driver_init(&pc->driver, c, t) != 0) {
     return NULL;
   }
   work_init(&pc->work, p,  T_CONNECTION);
@@ -865,7 +865,7 @@ static bool leader_process_pconnection(pconnection_t *pc) {
     uv_mutex_lock(&pc->lock);
     pc->wake = W_CLOSED;        /* wake() is a no-op from now on */
     uv_mutex_unlock(&pc->lock);
-    uv_safe_close((uv_handle_t*)&pc->tcp, on_close_pconnection);
+      uv_safe_close((uv_handle_t*)&pc->tcp, on_close_pconnection);
   } else {
     /* Check for events that can be generated without blocking for IO */
     check_wake(pc);
@@ -1115,8 +1115,8 @@ void pn_proactor_cancel_timeout(pn_proactor_t *p) {
   uv_mutex_unlock(&p->lock);
 }
 
-void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr) {
-  pconnection_t *pc = pconnection(p, c, false);
+void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, const char *addr) {
+  pconnection_t *pc = pconnection(p, c, t, false);
   assert(pc);                                  /* TODO aconway 2017-03-31: memory safety */
   pn_connection_open(pc->driver.connection);   /* Auto-open */
   parse_addr(&pc->addr, addr);
@@ -1270,9 +1270,9 @@ pn_record_t *pn_listener_attachments(pn_listener_t *l) {
   return l->attachments;
 }
 
-void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
+void pn_listener_accept(pn_listener_t *l, pn_connection_t *c, pn_transport_t *t) {
   uv_mutex_lock(&l->lock);
-  pconnection_t *pc = pconnection(l->work.proactor, c, true);
+  pconnection_t *pc = pconnection(l->work.proactor, c, t, true);
   assert(pc);
   /* Get the socket from the accept event that we are processing */
   pn_event_t *e = pn_collector_prev(l->collector);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4859de40/proton-c/src/proactor/win_iocp.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/win_iocp.c b/proton-c/src/proactor/win_iocp.c
index ca0c5c4..679b045 100644
--- a/proton-c/src/proactor/win_iocp.c
+++ b/proton-c/src/proactor/win_iocp.c
@@ -2078,11 +2078,17 @@ static void pconnection_finalize(void *vp_pconnection) {
 
 static const pn_class_t pconnection_class = PN_CLASS(pconnection);
 
-static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_connection_t *c, bool server, const char *addr) {
+static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, bool server, const char *addr)
+{
+  if (pn_connection_driver_init(&pc->driver, c, t) != 0) {
+    free(pc);
+    return "pn_connection_driver_init failure";
+  }
   {
     csguard g(&p->bind_lock);
-    pn_record_t *r = pn_connection_attachments(c);
+    pn_record_t *r = pn_connection_attachments(pc->driver.connection);
     if (pn_record_get(r, PN_PROACTOR)) {
+      pn_connection_driver_destroy(&pc->driver);
       free(pc);
       return "pn_connection_t already in use";
     }
@@ -2092,10 +2098,6 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con
     pc->can_wake = true;
   }
 
-  if (pn_connection_driver_init(&pc->driver, c, NULL) != 0) {
-    free(pc);
-    return "pn_connection_driver_init failure";
-  }
   pc->completion_queue = new std::queue<iocp_result_t *>();
   pc->work_queue = new std::queue<iocp_result_t *>();
   pcontext_init(&pc->context, PCONNECTION, p, pc);
@@ -2713,10 +2715,10 @@ static void connect_step_done(pconnection_t *pc, connect_result_t *result) {
   }
 }
 
-void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr) {
+void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, const char *addr) {
   pconnection_t *pc = (pconnection_t*) pn_class_new(&pconnection_class, sizeof(pconnection_t));
   assert(pc); // TODO: memory safety
-  const char *err = pconnection_setup(pc, p, c, false, addr);
+  const char *err = pconnection_setup(pc, p, c, t, false, addr);
   if (err) {
     pn_logf("pn_proactor_connect failure: %s", err);
     return;
@@ -3154,7 +3156,7 @@ static void recycle_result(accept_result_t *accept_result) {
   }
 }
 
-void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
+void pn_listener_accept(pn_listener_t *l, pn_connection_t *c, pn_transport_t *t) {
   accept_result_t *accept_result = NULL;
   DWORD err = 0;
   psocket_t *ps = NULL;
@@ -3164,7 +3166,7 @@ void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
     csguard g(&l->context.cslock);
     pconnection_t *pc = (pconnection_t*) pn_class_new(&pconnection_class, sizeof(pconnection_t));
     assert(pc);  // TODO: memory safety
-    const char *err_str = pconnection_setup(pc, p, c, true, "");
+    const char *err_str = pconnection_setup(pc, p, c, t, true, "");
     if (err_str) {
       pn_logf("pn_listener_accept failure: %s", err_str);
       return;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4859de40/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index b0efa55..332a73a 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -207,7 +207,7 @@ static pn_event_type_t common_handler(test_handler_t *th, pn_event_t *e) {
 
    case PN_LISTENER_ACCEPT:
     last_accepted = pn_connection();
-    pn_listener_accept(l, last_accepted);
+    pn_listener_accept(l, last_accepted, NULL);
     pn_listener_close(l);       /* Only accept one connection */
     return PN_EVENT_NONE;
 
@@ -257,7 +257,7 @@ static pn_event_type_t listen_handler(test_handler_t *th, pn_event_t *e) {
    case PN_LISTENER_ACCEPT:
     /* No automatic listener close/free for tests that accept multiple connections */
     last_accepted = pn_connection();
-    pn_listener_accept(pn_event_listener(e), last_accepted);
+    pn_listener_accept(pn_event_listener(e), last_accepted, NULL);
     /* No automatic close */
     return PN_EVENT_NONE;
 
@@ -285,7 +285,7 @@ static void test_client_server(test_t *t) {
   test_proactor_t tps[] ={ test_proactor(t, open_close_handler), test_proactor(t, common_handler) };
   test_listener_t l = test_listen(&tps[1], localhost);
   /* Connect and wait for close at both ends */
-  pn_proactor_connect(tps[0].proactor, pn_connection(), l.port.host_port);
+  pn_proactor_connect(tps[0].proactor, NULL, NULL, l.port.host_port);
   TEST_PROACTORS_RUN_UNTIL(tps, PN_TRANSPORT_CLOSED);
   TEST_PROACTORS_RUN_UNTIL(tps, PN_TRANSPORT_CLOSED);  
   TEST_PROACTORS_DESTROY(tps);
@@ -312,7 +312,7 @@ static void test_connection_wake(test_t *t) {
 
   pn_connection_t *c = pn_connection();
   pn_incref(c);                 /* Keep a reference for wake() after free */
-  pn_proactor_connect(client, c, l.port.host_port);
+  pn_proactor_connect(client, c, NULL, l.port.host_port);
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
   TEST_CHECK(t, pn_proactor_get(client) == NULL); /* Should be idle */
   pn_connection_wake(c);
@@ -325,7 +325,7 @@ static void test_connection_wake(test_t *t) {
 
   /* Verify we don't get a wake after close even if they happen together */
   pn_connection_t *c2 = pn_connection();
-  pn_proactor_connect(client, c2, l.port.host_port);
+  pn_proactor_connect(client, c2, NULL, l.port.host_port);
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
   pn_connection_wake(c2);
   pn_proactor_disconnect(client, NULL);
@@ -361,7 +361,7 @@ static void test_abort(test_t *t) {
   test_proactor_t tps[] = { test_proactor(t, open_close_handler), test_proactor(t, listen_abort_handler) };
   pn_proactor_t *client = tps[0].proactor;
   test_listener_t l = test_listen(&tps[1], localhost);
-  pn_proactor_connect(client, pn_connection(), l.port.host_port);
+  pn_proactor_connect(client, NULL, NULL, l.port.host_port);
 
   /* server transport closes */
   if (TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps))) {
@@ -420,7 +420,7 @@ static void test_refuse(test_t *t) {
   test_proactor_t tps[] = { test_proactor(t, open_close_handler), test_proactor(t, listen_refuse_handler) };
   pn_proactor_t *client = tps[0].proactor;
   test_listener_t l = test_listen(&tps[1], localhost);
-  pn_proactor_connect(client, pn_connection(), l.port.host_port);
+  pn_proactor_connect(client, NULL, NULL, l.port.host_port);
 
   /* client transport closes */
   TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps)); /* client */
@@ -458,7 +458,7 @@ static void test_inactive(test_t *t) {
   /* Listen, connect, disconnect */
   test_listener_t l = test_listen(&tps[1], localhost);
   pn_connection_t *c = pn_connection();
-  pn_proactor_connect(client, c, l.port.host_port);
+  pn_proactor_connect(client, c, NULL, l.port.host_port);
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
   pn_connection_wake(c);
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, TEST_PROACTORS_RUN(tps));
@@ -475,7 +475,7 @@ static void test_inactive(test_t *t) {
   /* Connect, set-timer, disconnect */
   pn_proactor_set_timeout(client, 1000000);
   c = pn_connection();
-  pn_proactor_connect(client, c, l.port.host_port);
+  pn_proactor_connect(client, c, NULL, l.port.host_port);
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
   pn_connection_wake(c);
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, TEST_PROACTORS_RUN(tps));
@@ -503,7 +503,7 @@ static void test_errors(test_t *t) {
 
   /* Invalid connect/listen service name */
   pn_connection_t *c = pn_connection();
-  pn_proactor_connect(client, c, "127.0.0.1:xxx");
+  pn_proactor_connect(client, c, NULL, "127.0.0.1:xxx");
   TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
   TEST_COND_DESC(t, "xxx", last_condition);
   TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
@@ -516,7 +516,7 @@ static void test_errors(test_t *t) {
 
   /* Invalid connect/listen host name */
   c = pn_connection();
-  pn_proactor_connect(client, c, "nosuch.example.com:");
+  pn_proactor_connect(client, c, NULL, "nosuch.example.com:");
   TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
   TEST_COND_DESC(t, "nosuch", last_condition);
   TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
@@ -544,7 +544,7 @@ static void test_errors(test_t *t) {
 
   /* Connect with no listener */
   c = pn_connection();
-  pn_proactor_connect(client, c, port.host_port);
+  pn_proactor_connect(client, c, NULL, port.host_port);
   if (TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps))) {
     TEST_COND_DESC(t, "refused", last_condition);
     TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, TEST_PROACTORS_RUN(tps));
@@ -573,7 +573,7 @@ static pn_event_type_t transport_close_connection_handler(test_handler_t *th, pn
  */
 static void test_proton_1586(test_t *t) {
   test_proactor_t tps[] =  { test_proactor(t, transport_close_connection_handler) };
-  pn_proactor_connect(tps[0].proactor, pn_connection(), ":yyy");
+  pn_proactor_connect(tps[0].proactor, NULL, NULL, ":yyy");
   TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
   TEST_COND_DESC(t, ":yyy", last_condition);
   test_handler_clear(&tps[0].handler, 0); /* Clear events */
@@ -598,7 +598,7 @@ static void test_ipv4_ipv6(test_t *t) {
   TEST_PROACTORS_DRAIN(tps);
 
 #define EXPECT_CONNECT(TP, HOST) do {                                   \
-    pn_proactor_connect(client, pn_connection(), test_port_use_host(&(TP), (HOST))); \
+    pn_proactor_connect(client, NULL, NULL, test_port_use_host(&(TP), (HOST))); \
     TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));   \
     TEST_COND_EMPTY(t, last_condition);                                 \
     TEST_PROACTORS_DRAIN(tps);                                           \
@@ -645,12 +645,12 @@ static void test_release_free(test_t *t) {
   test_listener_t l = test_listen(&tps[1], localhost);
 
   /* leave one connection to the proactor  */
-  pn_proactor_connect(client, pn_connection(), l.port.host_port);
+  pn_proactor_connect(client, NULL, NULL, l.port.host_port);
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
 
   /* release c1 and free immediately */
   pn_connection_t *c1 = pn_connection();
-  pn_proactor_connect(client, c1, l.port.host_port);
+  pn_proactor_connect(client, c1, NULL, l.port.host_port);
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
   pn_proactor_release_connection(c1); /* We free but socket should still be cleaned up */
   pn_connection_free(c1);
@@ -659,7 +659,7 @@ static void test_release_free(test_t *t) {
 
   /* release c2 and but don't free till after proactor free */
   pn_connection_t *c2 = pn_connection();
-  pn_proactor_connect(client, c2, l.port.host_port);
+  pn_proactor_connect(client, c2, NULL, l.port.host_port);
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
   pn_proactor_release_connection(c2);
   TEST_CHECK(t, pn_proactor_get(client) == NULL); /* Should be idle */
@@ -750,7 +750,7 @@ static void test_ssl(test_t *t) {
   test_listener_t l = test_listen(server, localhost);
 
   /* Basic SSL connection */
-  pn_proactor_connect(client->proactor, pn_connection(), l.port.host_port);
+  pn_proactor_connect(client->proactor, NULL, NULL, l.port.host_port);
   /* Open ok at both ends */
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
   TEST_COND_EMPTY(t, last_condition);
@@ -764,7 +764,7 @@ static void test_ssl(test_t *t) {
   TEST_INT_EQUAL(t, 0, pn_ssl_domain_set_peer_authentication(cd, PN_SSL_VERIFY_PEER_NAME, NULL));
   pn_connection_t *c = pn_connection();
   pn_connection_set_hostname(c, "test_server");
-  pn_proactor_connect(client->proactor, c, l.port.host_port);
+  pn_proactor_connect(client->proactor, c, NULL, l.port.host_port);
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
   TEST_COND_EMPTY(t, last_condition);
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
@@ -775,7 +775,7 @@ static void test_ssl(test_t *t) {
   /* Verify peer with bad hostname */
   c = pn_connection();
   pn_connection_set_hostname(c, "wrongname");
-  pn_proactor_connect(client->proactor, c, l.port.host_port);
+  pn_proactor_connect(client->proactor, c, NULL, l.port.host_port);
   TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, TEST_PROACTORS_RUN(tps));
   TEST_COND_NAME(t, "amqp:connection:framing-error",  last_condition);
   TEST_COND_DESC(t, "SSL",  last_condition);
@@ -854,7 +854,7 @@ static void test_netaddr(test_t *t) {
   /* Use IPv4 to get consistent results all platforms */
   test_listener_t l = test_listen(&tps[1], "127.0.0.1");
   pn_connection_t *c = pn_connection();
-  pn_proactor_connect(client, c, l.port.host_port);
+  pn_proactor_connect(client, c, NULL, l.port.host_port);
   if (!TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps))) {
     TEST_COND_EMPTY(t, last_condition); /* Show the last condition */
     return;                     /* don't continue if connection is closed */
@@ -910,10 +910,10 @@ static void test_disconnect(test_t *t) {
 
   /* Only wait for one connection to remote-open before disconnect */
   pn_connection_t *c = pn_connection();
-  pn_proactor_connect(client, c, l.port.host_port);
+  pn_proactor_connect(client, c, NULL, l.port.host_port);
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
   pn_connection_t *c2 = pn_connection();
-  pn_proactor_connect(client, c2, l2.port.host_port);
+  pn_proactor_connect(client, c2, NULL, l2.port.host_port);
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
   TEST_PROACTORS_DRAIN(tps);
 
@@ -945,7 +945,7 @@ static void test_disconnect(test_t *t) {
 
   /* Make sure the proactors are still functional */
   test_listener_t l3 = test_listen(&tps[1], localhost);
-  pn_proactor_connect(client, pn_connection(), l3.port.host_port);
+  pn_proactor_connect(client, NULL, NULL, l3.port.host_port);
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, TEST_PROACTORS_RUN(tps));
   pn_proactor_disconnect(client, NULL);
 
@@ -1042,7 +1042,7 @@ static void test_message_stream(test_t *t) {
   pn_message_free(m);
 
   pn_connection_t *c = pn_connection();
-  pn_proactor_connect(client, c, l.port.host_port);
+  pn_proactor_connect(client, c, NULL, l.port.host_port);
   pn_session_t *ssn = pn_session(c);
   pn_session_open(ssn);
   pn_link_t *snd = pn_sender(ssn, "x");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org