You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/02/28 00:47:17 UTC

[1/4] qpid-proton git commit: NO-JIRA: Removed out-of-date FIXME comment

Repository: qpid-proton
Updated Branches:
  refs/heads/master 13e40e9e6 -> b173c3a81


NO-JIRA: Removed out-of-date FIXME comment


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/0c6563fe
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/0c6563fe
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/0c6563fe

Branch: refs/heads/master
Commit: 0c6563fe0d63365ee70590654490636910a9d7b4
Parents: 4417e35
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Feb 27 17:09:21 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Feb 27 19:45:13 2017 -0500

----------------------------------------------------------------------
 proton-c/bindings/go/src/qpid.apache.org/electron/link_test.go | 1 -
 1 file changed, 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c6563fe/proton-c/bindings/go/src/qpid.apache.org/electron/link_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/link_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/link_test.go
index 133faad..2576046 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/link_test.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/link_test.go
@@ -30,7 +30,6 @@ import (
 func TestLinkSettings(t *testing.T) {
 	cConn, sConn := net.Pipe()
 	done := make(chan error)
-	// FIXME aconway 2017-02-23: bug in timeout conversion (pn_second_t)
 	settings := TerminusSettings{Durability: 1, Expiry: 2, Timeout: 42 * time.Second, Dynamic: true}
 	go func() { // Server
 		close(done)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/4] qpid-proton git commit: PROTON-1418: pn_vlogf missing newline after log.

Posted by ac...@apache.org.
PROTON-1418: pn_vlogf missing newline after log.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/4417e356
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/4417e356
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/4417e356

Branch: refs/heads/master
Commit: 4417e3566b80cdde992b0a32491892a4c736f2bd
Parents: 0e81224
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Feb 27 16:22:20 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Feb 27 19:45:13 2017 -0500

----------------------------------------------------------------------
 proton-c/src/core/log.c | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4417e356/proton-c/src/core/log.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/log.c b/proton-c/src/core/log.c
index 60c72e7..db83e7e 100644
--- a/proton-c/src/core/log.c
+++ b/proton-c/src/core/log.c
@@ -50,6 +50,7 @@ void pn_log_logger(pn_logger_t new_logger) {
 
 void pn_vlogf_impl(const char *fmt, va_list ap) {
     vfprintf(stderr, fmt, ap);
+    fprintf(stderr, "\n");
 }
 
 /**@internal


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/4] qpid-proton git commit: NO-JIRA: Fixed bogus cmake test for existence of proactor in examples

Posted by ac...@apache.org.
NO-JIRA: Fixed bogus cmake test for existence of proactor in examples


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/0e81224d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/0e81224d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/0e81224d

Branch: refs/heads/master
Commit: 0e81224d3112c018e11ad3e4bc66eca016ef5656
Parents: 13e40e9
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Feb 27 19:15:53 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Feb 27 19:45:13 2017 -0500

----------------------------------------------------------------------
 examples/c/proactor/CMakeLists.txt | 5 -----
 proton-c/CMakeLists.txt            | 1 +
 2 files changed, 1 insertion(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e81224d/examples/c/proactor/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/proactor/CMakeLists.txt b/examples/c/proactor/CMakeLists.txt
index bb6cb0f..2cb7ad9 100644
--- a/examples/c/proactor/CMakeLists.txt
+++ b/examples/c/proactor/CMakeLists.txt
@@ -21,11 +21,6 @@ find_package(Proton REQUIRED)
 include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include)
 include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${Proton_INCLUDE_DIRS})
 
-# Check if the proton library has a proactor implementation.
-include(CheckFunctionExists)
-set(CMAKE_REQUIRED_LIBRARIES ${Proton_LIBRARIES})
-check_function_exists(pn_proactor HAS_PROACTOR)
-
 if(HAS_PROACTOR)
 
 add_definitions(${COMPILE_WARNING_FLAGS} ${WERROR} ${COMPILE_PLATFORM_FLAGS} ${LINK_TIME_OPTIMIZATION})

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e81224d/proton-c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt
index 60e739d..c56c03f 100644
--- a/proton-c/CMakeLists.txt
+++ b/proton-c/CMakeLists.txt
@@ -578,6 +578,7 @@ endif(MSVC)
 
 if (qpid-proton-proactor)
   set(HAS_PROACTOR 1)
+  set(HAS_PROACTOR 1 PARENT_SCOPE) # Visible to examples
   add_library (
     qpid-proton-proactor SHARED
     ${qpid-proton-proactor}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[4/4] qpid-proton git commit: PROTON-1421: c proactor to take a connection URL string

Posted by ac...@apache.org.
PROTON-1421: c proactor to take a connection URL string

Instead of host, port take a single string in "host:port" or URL format for
pn_proactor_connect and pn_proactor_listen


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/b173c3a8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/b173c3a8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/b173c3a8

Branch: refs/heads/master
Commit: b173c3a811a36c1dd48d492b8cb6f8b9579ac417
Parents: 0c6563f
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Feb 27 19:32:01 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Feb 27 19:45:19 2017 -0500

----------------------------------------------------------------------
 examples/c/proactor/broker.c       | 121 +++++-------------------
 examples/c/proactor/direct.c       | 156 ++++++++++++-------------------
 examples/c/proactor/receive.c      |  95 +++++++------------
 examples/c/proactor/send.c         | 160 ++++++++++++--------------------
 examples/c/proactor/test.py        |  44 ++++-----
 proton-c/include/proton/proactor.h |  17 ++--
 proton-c/src/proactor/libuv.c      |  30 +++---
 proton-c/src/tests/proactor.c      |  28 +++---
 proton-c/src/tests/test_tools.h    |   8 +-
 9 files changed, 237 insertions(+), 422 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b173c3a8/examples/c/proactor/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
index 370d2e8..302bdec 100644
--- a/examples/c/proactor/broker.c
+++ b/examples/c/proactor/broker.c
@@ -19,45 +19,16 @@
 
 #include "thread.h"
 
-#include <proton/connection_driver.h>
-#include <proton/proactor.h>
 #include <proton/engine.h>
 #include <proton/listener.h>
+#include <proton/proactor.h>
 #include <proton/sasl.h>
 #include <proton/transport.h>
-#include <proton/url.h>
-#include "pncompat/misc_funcs.inc"
 
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 
-bool enable_debug = false;
-
-void debug(const char* fmt, ...) {
-  if (enable_debug) {
-    va_list(ap);
-    va_start(ap, fmt);
-    vfprintf(stderr, fmt, ap);
-    fputc('\n', stderr);
-    fflush(stderr);
-  }
-}
-
-void check(int err, const char* s) {
-  if (err != 0) {
-    perror(s);
-    exit(1);
-  }
-}
-
-void pcheck(int err, const char* s) {
-  if (err != 0) {
-    fprintf(stderr, "%s: %s", s, pn_code(err));
-    exit(1);
-  }
-}
-
 /* Simple re-sizable vector that acts as a queue */
 #define VEC(T) struct { T* data; size_t len, cap; }
 
@@ -98,7 +69,6 @@ typedef struct queue_t {
 } queue_t;
 
 static void queue_init(queue_t *q, const char* name, queue_t *next) {
-  debug("created queue %s", name);
   pthread_mutex_init(&q->lock, NULL);
   q->name = strdup(name);
   VEC_INIT(q->messages);
@@ -126,7 +96,6 @@ static void queue_send(queue_t *q, pn_link_t *s) {
   size_t tag = 0;
   pthread_mutex_lock(&q->lock);
   if (q->messages.len == 0) { /* Empty, record connection as waiting */
-    debug("queue is empty %s", q->name);
     /* Record connection for wake-up if not already on the list. */
     pn_connection_t *c = pn_session_connection(pn_link_session(s));
     size_t i = 0;
@@ -136,7 +105,6 @@ static void queue_send(queue_t *q, pn_link_t *s) {
       VEC_PUSH(q->waiting, c);
     }
   } else {
-    debug("sending from queue %s", q->name);
     m = q->messages.data[0];
     VEC_POP(q->messages);
     tag = ++q->sent;
@@ -169,7 +137,6 @@ bool pn_connection_get_check_queues(pn_connection_t *c) {
    If the queue was previously empty, notify waiting senders.
 */
 static void queue_receive(pn_proactor_t *d, queue_t *q, pn_rwbytes_t m) {
-  debug("received to queue %s", q->name);
   pthread_mutex_lock(&q->lock);
   VEC_PUSH(q->messages, m);
   if (q->messages.len == 1) { /* Was empty, notify waiting connections */
@@ -221,22 +188,12 @@ queue_t* queues_get(queues_t *qs, const char* name) {
 /* The broker implementation */
 typedef struct broker_t {
   pn_proactor_t *proactor;
-  queues_t queues;
-  const char *container_id;     /* AMQP container-id */
   size_t threads;
-  pn_millis_t heartbeat;
+  const char *container_id;     /* AMQP container-id */
+  queues_t queues;
   bool finished;
 } broker_t;
 
-void broker_init(broker_t *b, const char *container_id, size_t threads, pn_millis_t heartbeat) {
-  memset(b, 0, sizeof(*b));
-  b->proactor = pn_proactor();
-  queues_init(&b->queues);
-  b->container_id = container_id;
-  b->threads = threads;
-  b->heartbeat = 0;
-}
-
 void broker_stop(broker_t *b) {
   /* In this broker an interrupt stops a thread, stopping all threads stops the broker */
   for (size_t i = 0; i < b->threads; ++i)
@@ -293,10 +250,10 @@ static int exit_code = 0;
 
 static void check_condition(pn_event_t *e, pn_condition_t *cond) {
   if (pn_condition_is_set(cond)) {
-    exit_code = 1;
-    const char *ename = e ? pn_event_type_name(pn_event_type(e)) : "UNKNOWN";
-    fprintf(stderr, "%s: %s: %s\n", ename,
+    fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
             pn_condition_get_name(cond), pn_condition_get_description(cond));
+    pn_connection_close(pn_event_connection(e));
+    exit_code = 1;
   }
 }
 
@@ -307,6 +264,11 @@ static void handle(broker_t* b, pn_event_t* e) {
 
   switch (pn_event_type(e)) {
 
+   case PN_LISTENER_OPEN:
+    printf("listening\n");
+    fflush(stdout);
+    break;
+
    case PN_LISTENER_ACCEPT:
     pn_listener_accept(pn_event_listener(e), pn_connection());
     break;
@@ -320,7 +282,6 @@ static void handle(broker_t* b, pn_event_t* e) {
      pn_transport_t *t = pn_connection_transport(c);
      pn_transport_require_auth(t, false);
      pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS");
-     pn_transport_set_idle_timeout(t, 2 * b->heartbeat);
    }
    case PN_CONNECTION_REMOTE_OPEN: {
      pn_connection_open(pn_event_connection(e)); /* Complete the open */
@@ -431,62 +392,26 @@ static void* broker_thread(void *void_broker) {
   return NULL;
 }
 
-static void usage(const char *arg0) {
-  fprintf(stderr, "Usage: %s [-d] [-a url] [-t thread-count]\n", arg0);
-  exit(1);
-}
-
 int main(int argc, char **argv) {
-  /* Command line options */
-  char *urlstr = NULL;
-  char container_id[256];
-  /* Note container-id should be unique */
-  snprintf(container_id, sizeof(container_id), "%s", argv[0]);
-  size_t nthreads = 4;
-  pn_millis_t heartbeat = 0;
-  int opt;
-  while ((opt = getopt(argc, argv, "a:t:dh:c:")) != -1) {
-    switch (opt) {
-     case 'a': urlstr = optarg; break;
-     case 't': nthreads = atoi(optarg); break;
-     case 'd': enable_debug = true; break;
-     case 'h': heartbeat = atoi(optarg); break;
-     case 'c': strncpy(container_id, optarg, sizeof(container_id)); break;
-     default: usage(argv[0]); break;
-    }
-  }
-  if (optind < argc)
-    usage(argv[0]);
-
-  broker_t b;
-  broker_init(&b, container_id, nthreads, heartbeat);
-
-  /* Parse the URL or use default values */
-  const char *host = "0.0.0.0";
-  const char *port = "amqp";
-  pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
-  if (url) {
-    if (pn_url_get_host(url)) host = pn_url_get_host(url);
-    if (pn_url_get_port(url)) port = (pn_url_get_port(url));
-  }
+  broker_t b = {0};
+  b.proactor = pn_proactor();
+  queues_init(&b.queues);
+  b.container_id = argv[0];
+  b.threads = 4;
+  const char *addr = (argc > 1) ? argv[1] : "127.0.0.1:amqp";
 
-  pn_proactor_listen(b.proactor, pn_listener(), host, port, 16);
-  printf("listening on '%s:%s' %zd threads\n", host, port, b.threads);
-  fflush(stdout);
+  /* Listen on addr */
+  pn_proactor_listen(b.proactor, pn_listener(), addr, 16);
 
-  if (url) pn_url_free(url);
-  if (b.threads <= 0) {
-    fprintf(stderr, "invalid value -t %zu, threads must be > 0\n", b.threads);
-    exit(1);
-  }
-  /* Start n-1 threads and use main thread */
+  /* Start n-1 threads */
   pthread_t* threads = (pthread_t*)calloc(sizeof(pthread_t), b.threads);
   for (size_t i = 0; i < b.threads-1; ++i) {
-    check(pthread_create(&threads[i], NULL, broker_thread, &b), "pthread_create");
+    pthread_create(&threads[i], NULL, broker_thread, &b);
   }
   broker_thread(&b);            /* Use the main thread too. */
+  /* Join the other threads */
   for (size_t i = 0; i < b.threads-1; ++i) {
-    check(pthread_join(threads[i], NULL), "pthread_join");
+    pthread_join(threads[i], NULL);
   }
   pn_proactor_free(b.proactor);
   free(threads);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b173c3a8/examples/c/proactor/direct.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/direct.c b/examples/c/proactor/direct.c
index 126ae4f..f76895c 100644
--- a/examples/c/proactor/direct.c
+++ b/examples/c/proactor/direct.c
@@ -19,8 +19,8 @@
  *
  */
 
+#include <proton/condition.h>
 #include <proton/connection.h>
-#include <proton/connection_driver.h>
 #include <proton/delivery.h>
 #include <proton/link.h>
 #include <proton/listener.h>
@@ -29,30 +29,24 @@
 #include <proton/sasl.h>
 #include <proton/session.h>
 #include <proton/transport.h>
-#include <proton/url.h>
-#include "pncompat/misc_funcs.inc"
 
 #include <stdio.h>
 #include <stdlib.h>
-#include <string.h>
-
-typedef char str[1024];
 
 typedef struct app_data_t {
-  /* Common values */
+  const char *connection_address;
+  const char *amqp_address;
+  const char *container_id;
+  int message_count;
+
   pn_proactor_t *proactor;
-  bool finished;
-  str address;
-  str container_id;
+  pn_listener_t *listener;
   pn_rwbytes_t message_buffer;
-  int message_count;
 
   /* Sender values */
   int sent;
   int acknowledged;
   pn_link_t *sender;
-  pn_millis_t delay;
-  bool delaying;
 
   /* Receiver values */
   int received;
@@ -64,9 +58,10 @@ static int exit_code = 0;
 
 static void check_condition(pn_event_t *e, pn_condition_t *cond) {
   if (pn_condition_is_set(cond)) {
-    exit_code = 1;
     fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
             pn_condition_get_name(cond), pn_condition_get_description(cond));
+    pn_connection_close(pn_event_connection(e));
+    exit_code = 1;
   }
 }
 
@@ -104,23 +99,6 @@ static pn_bytes_t encode_message(app_data_t* app) {
   return pn_bytes(mbuf.size, mbuf.start);
 }
 
-static void send(app_data_t* app) {
-  while (pn_link_credit(app->sender) > 0 && app->sent < app->message_count) {
-    ++app->sent;
-    // Use sent counter bytes as unique delivery tag.
-    pn_delivery(app->sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
-    pn_bytes_t msgbuf = encode_message(app);
-    pn_link_send(app->sender, msgbuf.start, msgbuf.size);
-    pn_link_advance(app->sender);
-    if (app->delay && app->sent < app->message_count) {
-      /* If delay is set, wait for TIMEOUT event to send more */
-      app->delaying = true;
-      pn_proactor_set_timeout(app->proactor, app->delay);
-      break;
-    }
-  }
-}
-
 #define MAX_SIZE 1024
 
 static void decode_message(pn_delivery_t *dlv) {
@@ -195,16 +173,23 @@ static void handle_send(app_data_t* app, pn_event_t* event) {
 
    case PN_LINK_REMOTE_OPEN: {
      pn_link_t* l = pn_event_link(event);
-     pn_terminus_set_address(pn_link_target(l), app->address);
+     pn_terminus_set_address(pn_link_target(l), app->amqp_address);
      pn_link_open(l);
    } break;
 
-   case PN_LINK_FLOW:
-    /* The peer has given us some credit, now we can send messages */
-    if (!app->delaying) {
-      app->sender = pn_event_link(event);
-      send(app);
-    } break;
+   case PN_LINK_FLOW: {
+     /* The peer has given us some credit, now we can send messages */
+     pn_link_t *sender = pn_event_link(event);
+     while (pn_link_credit(sender) > 0 && app->sent < app->message_count) {
+       ++app->sent;
+       // Use sent counter as unique delivery tag.
+       pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
+       pn_bytes_t msgbuf = encode_message(app);
+       pn_link_send(sender, msgbuf.start, msgbuf.size);
+       pn_link_advance(sender);
+     }
+     break;
+   }
 
    case PN_DELIVERY: {
      /* We received acknowledgedment from the peer that a message was delivered. */
@@ -213,6 +198,7 @@ static void handle_send(app_data_t* app, pn_event_t* event) {
        if (++app->acknowledged == app->message_count) {
          printf("%d messages sent and acknowledged\n", app->acknowledged);
          pn_connection_close(pn_event_connection(event));
+         /* Continue handling events till we receive TRANSPORT_CLOSED */
        }
      }
    } break;
@@ -222,10 +208,17 @@ static void handle_send(app_data_t* app, pn_event_t* event) {
   }
 }
 
-/* Handle all events, delegate to handle_send or handle_receive depending on link mode */
-static void handle(app_data_t* app, pn_event_t* event) {
+/* Handle all events, delegate to handle_send or handle_receive depending on link mode.
+   Return true to continue, false to exit
+*/
+static bool handle(app_data_t* app, pn_event_t* event) {
   switch (pn_event_type(event)) {
 
+   case PN_LISTENER_OPEN:
+    printf("listening\n");
+    fflush(stdout);
+    break;
+
    case PN_LISTENER_ACCEPT:
     pn_listener_accept(pn_event_listener(event), pn_connection());
     break;
@@ -252,7 +245,7 @@ static void handle(app_data_t* app, pn_event_t* event) {
 
    case PN_TRANSPORT_CLOSED:
     check_condition(event, pn_transport_condition(pn_event_transport(event)));
-    app->finished = true;
+    pn_listener_close(app->listener); /* Finished */
     break;
 
    case PN_CONNECTION_REMOTE_CLOSE:
@@ -276,19 +269,12 @@ static void handle(app_data_t* app, pn_event_t* event) {
     pn_connection_wake(pn_session_connection(pn_link_session(app->sender)));
     break;
 
-   case PN_CONNECTION_WAKE:
-    /* Timeout, we can send more. */
-    app->delaying = false;
-    send(app);
+   case PN_LISTENER_CLOSE:
+    check_condition(event, pn_listener_condition(pn_event_listener(event)));
     break;
 
    case PN_PROACTOR_INACTIVE:
-    app->finished = true;
-    break;
-
-   case PN_LISTENER_CLOSE:
-    check_condition(event, pn_listener_condition(pn_event_listener(event)));
-    app->finished = true;
+    return false;
     break;
 
    default: {
@@ -302,60 +288,34 @@ static void handle(app_data_t* app, pn_event_t* event) {
      }
    }
   }
+  return true;
 }
 
-static void usage(const char *arg0) {
-  fprintf(stderr, "Usage: %s [-a URL] [-m message-count] [-d delay-ms]\n", arg0);
-  fprintf(stderr, "Demonstrates direct peer-to-peer AMQP communication without a broker. Accepts a connection from either the send.c or receive.c client and provides the complementary behavior (receive or send.");
-  exit(1);
+void run(app_data_t *app) {
+  /* Loop and handle events */
+  do {
+    pn_event_batch_t *events = pn_proactor_wait(app->proactor);
+    for (pn_event_t *e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
+      if (!handle(app, e)) {
+        return;
+      }
+    }
+    pn_proactor_done(app->proactor, events);
+  } while(true);
 }
 
 int main(int argc, char **argv) {
-  /* Default values for application and connection. */
-  app_data_t app = {0};
-  app.message_count = 100;
-  const char* urlstr = NULL;
-
-  int opt;
-  while((opt = getopt(argc, argv, "a:m:d:")) != -1) {
-    switch(opt) {
-     case 'a': urlstr = optarg; break;
-     case 'm': app.message_count = atoi(optarg); break;
-     case 'd': app.delay = atoi(optarg); break;
-     default: usage(argv[0]); break;
-    }
-  }
-  if (optind < argc)
-    usage(argv[0]);
-  /* Note container-id should be unique */
-  snprintf(app.container_id, sizeof(app.container_id), "%s", argv[0]);
-
-  /* Parse the URL or use default values */
-  const char *host = "0.0.0.0";
-  const char *port = "amqp";
-  strncpy(app.address, "example", sizeof(app.address));
-  pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
-  if (url) {
-    if (pn_url_get_host(url)) host = pn_url_get_host(url);
-    if (pn_url_get_port(url)) port = (pn_url_get_port(url));
-    if (pn_url_get_path(url)) strncpy(app.address, pn_url_get_path(url), sizeof(app.address));
-  }
+  struct app_data_t app = {0};
+  app.container_id = argv[0];   /* Should be unique */
+  app.connection_address = (argc > 1) ? argv[1] : "127.0.0.1:amqp";
+  app.amqp_address = (argc > 2) ? argv[2] : "example";
+  app.message_count = (argc > 3) ? atoi(argv[3]) : 10;
 
+  /* Create the proactor and connect */
   app.proactor = pn_proactor();
-  pn_proactor_listen(app.proactor, pn_listener(), host, port, 16);
-  printf("listening on '%s:%s'\n", host, port);
-  fflush(stdout);
-  if (url) pn_url_free(url);
-
-  do {
-    pn_event_batch_t *events = pn_proactor_wait(app.proactor);
-    pn_event_t *e;
-    while ((e = pn_event_batch_next(events))) {
-      handle(&app, e);
-    }
-    pn_proactor_done(app.proactor, events);
-  } while(!app.finished);
-
+  app.listener = pn_listener();
+  pn_proactor_listen(app.proactor, app.listener, app.connection_address, 16);
+  run(&app);
   pn_proactor_free(app.proactor);
   free(app.message_buffer.start);
   return exit_code;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b173c3a8/examples/c/proactor/receive.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/receive.c b/examples/c/proactor/receive.c
index 65ec069..43a68cd 100644
--- a/examples/c/proactor/receive.c
+++ b/examples/c/proactor/receive.c
@@ -20,29 +20,25 @@
  */
 
 #include <proton/connection.h>
-#include <proton/connection_driver.h>
+#include <proton/condition.h>
 #include <proton/delivery.h>
-#include <proton/proactor.h>
 #include <proton/link.h>
 #include <proton/message.h>
+#include <proton/proactor.h>
 #include <proton/session.h>
 #include <proton/transport.h>
-#include <proton/url.h>
-#include "pncompat/misc_funcs.inc"
 
 #include <stdio.h>
 #include <stdlib.h>
-#include <string.h>
-
-typedef char str[1024];
 
 typedef struct app_data_t {
-  str address;
-  str container_id;
-  pn_rwbytes_t message_buffer;
+  const char *connection_address;
+  const char *amqp_address;
+  const char *container_id;
   int message_count;
-  int received;
+
   pn_proactor_t *proactor;
+  int received;
   bool finished;
 } app_data_t;
 
@@ -52,9 +48,10 @@ static int exit_code = 0;
 
 static void check_condition(pn_event_t *e, pn_condition_t *cond) {
   if (pn_condition_is_set(cond)) {
-    exit_code = 1;
     fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
             pn_condition_get_name(cond), pn_condition_get_description(cond));
+    pn_connection_close(pn_event_connection(e));
+    exit_code = 1;
   }
 }
 
@@ -81,7 +78,8 @@ static void decode_message(pn_delivery_t *dlv) {
   }
 }
 
-static void handle(app_data_t* app, pn_event_t* event) {
+/* Return true to continue, false to exit */
+static bool handle(app_data_t* app, pn_event_t* event) {
   switch (pn_event_type(event)) {
 
    case PN_CONNECTION_INIT: {
@@ -91,7 +89,7 @@ static void handle(app_data_t* app, pn_event_t* event) {
      pn_session_t* s = pn_session(c);
      pn_session_open(s);
      pn_link_t* l = pn_receiver(s, "my_receiver");
-     pn_terminus_set_address(pn_link_source(l), app->address);
+     pn_terminus_set_address(pn_link_source(l), app->amqp_address);
      pn_link_open(l);
      /* cannot receive without granting credit: */
      pn_link_flow(l, app->message_count ? app->message_count : BATCH);
@@ -148,63 +146,38 @@ static void handle(app_data_t* app, pn_event_t* event) {
     break;
 
    case PN_PROACTOR_INACTIVE:
-    app->finished = true;
+    return false;
     break;
 
-   default: break;
+   default:
+    break;
   }
+    return true;
 }
 
-static void usage(const char *arg0) {
-  fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", arg0);
-  exit(1);
+void run(app_data_t *app) {
+  /* Loop and handle events */
+  do {
+    pn_event_batch_t *events = pn_proactor_wait(app->proactor);
+    for (pn_event_t *e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
+      if (!handle(app, e)) {
+        return;
+      }
+    }
+    pn_proactor_done(app->proactor, events);
+  } while(true);
 }
 
 int main(int argc, char **argv) {
-  /* Default values for application and connection. */
-  app_data_t app = {{0}};
-  app.message_count = 100;
-  const char* urlstr = NULL;
-
-  int opt;
-  while((opt = getopt(argc, argv, "a:m:")) != -1) {
-    switch(opt) {
-     case 'a': urlstr = optarg; break;
-     case 'm': app.message_count = atoi(optarg); break;
-     default: usage(argv[0]); break;
-    }
-  }
-  if (optind < argc)
-    usage(argv[0]);
-  /* Note container-id should be unique */
-  snprintf(app.container_id, sizeof(app.container_id), "%s", argv[0]);
-
-  /* Parse the URL or use default values */
-  const char *host = "127.0.0.1";
-  const char *port = "amqp";
-  strncpy(app.address, "example", sizeof(app.address));
-  pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
-  if (url) {
-    if (pn_url_get_host(url)) host = pn_url_get_host(url);
-    if (pn_url_get_port(url)) port = (pn_url_get_port(url));
-    if (pn_url_get_path(url)) strncpy(app.address, pn_url_get_path(url), sizeof(app.address));
-  }
+  struct app_data_t app = {0};
+  app.container_id = argv[0];   /* Should be unique */
+  app.connection_address = (argc > 1) ? argv[1] : "127.0.0.1:amqp";
+  app.amqp_address = (argc > 2) ? argv[2] : "example";
+  app.message_count = (argc > 3) ? atoi(argv[3]) : 10;
 
   /* Create the proactor and connect */
   app.proactor = pn_proactor();
-  pn_proactor_connect(app.proactor, pn_connection(), host, port);
-  if (url) pn_url_free(url);
-
-  do {
-    pn_event_batch_t *events = pn_proactor_wait(app.proactor);
-    pn_event_t *e;
-    while ((e = pn_event_batch_next(events))) {
-      handle(&app, e);
-    }
-    pn_proactor_done(app.proactor, events);
-  } while(!app.finished);
-
+  pn_proactor_connect(app.proactor, pn_connection(), app.connection_address);
+  run(&app);
   pn_proactor_free(app.proactor);
-  free(app.message_buffer.start);
-  return exit_code;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b173c3a8/examples/c/proactor/send.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/send.c b/examples/c/proactor/send.c
index 0786f19..c21ac68 100644
--- a/examples/c/proactor/send.c
+++ b/examples/c/proactor/send.c
@@ -20,43 +20,37 @@
  */
 
 #include <proton/connection.h>
-#include <proton/connection_driver.h>
+#include <proton/condition.h>
 #include <proton/delivery.h>
-#include <proton/proactor.h>
 #include <proton/link.h>
 #include <proton/message.h>
+#include <proton/proactor.h>
 #include <proton/session.h>
 #include <proton/transport.h>
-#include <proton/url.h>
-#include "pncompat/misc_funcs.inc"
 
 #include <stdio.h>
 #include <stdlib.h>
-#include <string.h>
-
-typedef char str[1024];
 
 typedef struct app_data_t {
-  str address;
-  str container_id;
-  pn_rwbytes_t message_buffer;
+  const char *connection_address;
+  const char *amqp_address;
+  const char *container_id;
   int message_count;
+
+  pn_proactor_t *proactor;
+  pn_rwbytes_t message_buffer;
   int sent;
   int acknowledged;
-  pn_proactor_t *proactor;
-  pn_millis_t delay;
-  bool delaying;
-  pn_link_t *sender;
-  bool finished;
 } app_data_t;
 
 static int exit_code = 0;
 
 static void check_condition(pn_event_t *e, pn_condition_t *cond) {
   if (pn_condition_is_set(cond)) {
-    exit_code = 1;
     fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
             pn_condition_get_name(cond), pn_condition_get_description(cond));
+    pn_connection_close(pn_event_connection(e));
+    exit_code = 1;
   }
 }
 
@@ -94,55 +88,35 @@ static pn_bytes_t encode_message(app_data_t* app) {
   return pn_bytes(mbuf.size, mbuf.start);
 }
 
-static void send(app_data_t* app) {
-  while (pn_link_credit(app->sender) > 0 && app->sent < app->message_count) {
-    ++app->sent;
-    // Use sent counter bytes as unique delivery tag.
-    pn_delivery(app->sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
-    pn_bytes_t msgbuf = encode_message(app);
-    pn_link_send(app->sender, msgbuf.start, msgbuf.size);
-    pn_link_advance(app->sender);
-    if (app->delay && app->sent < app->message_count) {
-      /* If delay is set, wait for TIMEOUT event to send more */
-      app->delaying = true;
-      pn_proactor_set_timeout(app->proactor, app->delay);
-      break;
-    }
-  }
-}
-
-static void handle(app_data_t* app, pn_event_t* event) {
+/* Returns true to continue, false if finished */
+static bool handle(app_data_t* app, pn_event_t* event) {
   switch (pn_event_type(event)) {
 
    case PN_CONNECTION_INIT: {
      pn_connection_t* c = pn_event_connection(event);
      pn_connection_set_container(c, app->container_id);
      pn_connection_open(c);
-     pn_session_t* s = pn_session(c);
+     pn_session_t* s = pn_session(pn_event_connection(event));
      pn_session_open(s);
      pn_link_t* l = pn_sender(s, "my_sender");
-     pn_terminus_set_address(pn_link_target(l), app->address);
+     pn_terminus_set_address(pn_link_target(l), app->amqp_address);
      pn_link_open(l);
-   } break;
-
-   case PN_LINK_FLOW:
-    /* The peer has given us some credit, now we can send messages */
-    if (!app->delaying) {
-      app->sender = pn_event_link(event);
-      send(app);
-    }
-    break;
-
-   case PN_PROACTOR_TIMEOUT:
-    /* Wake the sender's connection */
-    pn_connection_wake(pn_session_connection(pn_link_session(app->sender)));
-    break;
-
-   case PN_CONNECTION_WAKE:
-    /* Timeout, we can send more. */
-    app->delaying = false;
-    send(app);
-    break;
+     break;
+   }
+
+   case PN_LINK_FLOW: {
+     /* The peer has given us some credit, now we can send messages */
+     pn_link_t *sender = pn_event_link(event);
+     while (pn_link_credit(sender) > 0 && app->sent < app->message_count) {
+       ++app->sent;
+       // Use sent counter as unique delivery tag.
+       pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
+       pn_bytes_t msgbuf = encode_message(app);
+       pn_link_send(sender, msgbuf.start, msgbuf.size);
+       pn_link_advance(sender);
+     }
+     break;
+   }
 
    case PN_DELIVERY: {
      /* We received acknowledgedment from the peer that a message was delivered. */
@@ -151,9 +125,15 @@ static void handle(app_data_t* app, pn_event_t* event) {
        if (++app->acknowledged == app->message_count) {
          printf("%d messages sent and acknowledged\n", app->acknowledged);
          pn_connection_close(pn_event_connection(event));
+         /* Continue handling events till we receive TRANSPORT_CLOSED */
        }
+     } else {
+       fprintf(stderr, "unexpected delivery state %d\n", (int)pn_delivery_remote_state(d));
+       pn_connection_close(pn_event_connection(event));
+       exit_code=1;
      }
-   } break;
+     break;
+   }
 
    case PN_TRANSPORT_CLOSED:
     check_condition(event, pn_transport_condition(pn_event_transport(event)));
@@ -176,63 +156,37 @@ static void handle(app_data_t* app, pn_event_t* event) {
     break;
 
    case PN_PROACTOR_INACTIVE:
-    app->finished = true;
-    break;
+    return false;
 
    default: break;
   }
+  return true;
 }
 
-static void usage(const char *arg0) {
-  fprintf(stderr, "Usage: %s [-a url] [-m message-count] [-d delay-ms]\n", arg0);
-  exit(1);
+void run(app_data_t *app) {
+  /* Loop and handle events */
+  do {
+    pn_event_batch_t *events = pn_proactor_wait(app->proactor);
+    for (pn_event_t *e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
+      if (!handle(app, e)) {
+        return;
+      }
+    }
+    pn_proactor_done(app->proactor, events);
+  } while(true);
 }
 
 int main(int argc, char **argv) {
-  /* Default values for application and connection. */
-  app_data_t app = {{0}};
-  app.message_count = 100;
-  const char* urlstr = NULL;
-
-  int opt;
-  while((opt = getopt(argc, argv, "a:m:d:")) != -1) {
-    switch(opt) {
-     case 'a': urlstr = optarg; break;
-     case 'm': app.message_count = atoi(optarg); break;
-     case 'd': app.delay = atoi(optarg); break;
-     default: usage(argv[0]); break;
-    }
-  }
-  if (optind < argc)
-    usage(argv[0]);
-  /* Note container-id should be unique */
-  snprintf(app.container_id, sizeof(app.container_id), "%s", argv[0]);
-
-  /* Parse the URL or use default values */
-  const char *host = "127.0.0.1";
-  const char *port = "amqp";
-  strncpy(app.address, "example", sizeof(app.address));
-  pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
-  if (url) {
-    if (pn_url_get_host(url)) host = pn_url_get_host(url);
-    if (pn_url_get_port(url)) port = (pn_url_get_port(url));
-    if (pn_url_get_path(url)) strncpy(app.address, pn_url_get_path(url), sizeof(app.address));
-  }
+  struct app_data_t app = {0};
+  app.container_id = argv[0];   /* Should be unique */
+  app.connection_address = (argc > 1) ? argv[1] : "127.0.0.1:amqp";
+  app.amqp_address = (argc > 2) ? argv[2] : "example";
+  app.message_count = (argc > 3) ? atoi(argv[3]) : 10;
 
   /* Create the proactor and connect */
   app.proactor = pn_proactor();
-  pn_proactor_connect(app.proactor, pn_connection(), host, port);
-  if (url) pn_url_free(url);
-
-  do {
-    pn_event_batch_t *events = pn_proactor_wait(app.proactor);
-    pn_event_t *e;
-    while ((e = pn_event_batch_next(events))) {
-      handle(&app, e);
-    }
-    pn_proactor_done(app.proactor, events);
-  } while(!app.finished);
-
+  pn_proactor_connect(app.proactor, pn_connection(), app.connection_address);
+  run(&app);
   pn_proactor_free(app.proactor);
   free(app.message_buffer.start);
   return exit_code;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b173c3a8/examples/c/proactor/test.py
----------------------------------------------------------------------
diff --git a/examples/c/proactor/test.py b/examples/c/proactor/test.py
index 692e4be..84c519e 100644
--- a/examples/c/proactor/test.py
+++ b/examples/c/proactor/test.py
@@ -36,7 +36,7 @@ class Broker(object):
     def __enter__(self):
         with bind0() as sock:
             self.addr = "127.0.0.1:%s/examples" % (sock.port())
-            self.proc = self.test.proc(["broker", "-a", self.addr])
+            self.proc = self.test.proc(["broker", self.addr])
             self.proc.wait_re("listening")
             return self
 
@@ -52,44 +52,36 @@ class CExampleTest(ExampleTestCase):
     def test_send_receive(self):
         """Send first then receive"""
         with Broker(self) as b:
-            s = self.proc(["send", "-a", b.addr])
-            self.assertEqual("100 messages sent and acknowledged\n", s.wait_out())
-            r = self.proc(["receive", "-a", b.addr])
-            self.assertEqual(receive_expect(100), r.wait_out())
+            s = self.proc(["send", b.addr])
+            self.assertEqual("10 messages sent and acknowledged\n", s.wait_out())
+            r = self.proc(["receive", b.addr])
+            self.assertEqual(receive_expect(10), r.wait_out())
 
     def test_receive_send(self):
         """Start receiving  first, then send."""
         with Broker(self) as b:
-            r = self.proc(["receive", "-a", b.addr]);
-            s = self.proc(["send", "-a", b.addr]);
-            self.assertEqual("100 messages sent and acknowledged\n", s.wait_out())
-            self.assertEqual(receive_expect(100), r.wait_out())
-
-    def test_timed_send(self):
-        """Send with timed delay"""
-        with Broker(self) as b:
-            s = self.proc(["send", "-a", b.addr, "-d100", "-m3"])
-            self.assertEqual("3 messages sent and acknowledged\n", s.wait_out())
-            r = self.proc(["receive", "-a", b.addr, "-m3"])
-            self.assertEqual(receive_expect(3), r.wait_out())
+            r = self.proc(["receive", b.addr]);
+            s = self.proc(["send", b.addr]);
+            self.assertEqual("10 messages sent and acknowledged\n", s.wait_out())
+            self.assertEqual(receive_expect(10), r.wait_out())
 
     def test_send_direct(self):
         """Send to direct server"""
         with bind0() as sock:
-            addr = "127.0.0.1:%s/examples" % sock.port()
-            d = self.proc(["direct", "-a", addr])
+            addr = "127.0.0.1:%s" % sock.port()
+            d = self.proc(["direct", addr])
             d.wait_re("listening")
-            self.assertEqual("100 messages sent and acknowledged\n", self.proc(["send", "-a", addr]).wait_out())
-            self.assertIn(receive_expect(100), d.wait_out())
+            self.assertEqual("10 messages sent and acknowledged\n", self.proc(["send", addr]).wait_out())
+            self.assertIn(receive_expect(10), d.wait_out())
 
     def test_receive_direct(self):
         """Receive from direct server"""
         with bind0() as sock:
-            addr = "127.0.0.1:%s/examples" % sock.port()
-            d = self.proc(["direct", "-a", addr])
-            d.wait_re("listenin")
-            self.assertEqual(receive_expect(100), self.proc(["receive", "-a", addr]).wait_out())
-            self.assertIn("100 messages sent and acknowledged\n", d.wait_out())
+            addr = "127.0.0.1:%s" % sock.port()
+            d = self.proc(["direct", addr])
+            d.wait_re("listening")
+            self.assertEqual(receive_expect(10), self.proc(["receive", addr]).wait_out())
+            self.assertIn("10 messages sent and acknowledged\n", d.wait_out())
 
 
 if __name__ == "__main__":

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b173c3a8/proton-c/include/proton/proactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
index 94c4891..9a8a0ad 100644
--- a/proton-c/include/proton/proactor.h
+++ b/proton-c/include/proton/proactor.h
@@ -66,15 +66,17 @@ PNP_EXTERN void pn_proactor_free(pn_proactor_t *proactor);
  *
  * @param[in] proactor the proactor object
  * @param[in] connection the proactor takes ownership, do not free
- * @param[in] host address to connect on
+ * @param[in] addr the network address (not AMQP address) to connect to. May
+ * be in the form "host:port" or an "amqp://" or "amqps://" URL. The `/path` part of
+ * the URL is ignored.
  * @param[in] port port to connect to
  *
  * @return error on immediate error, e.g. an allocation failure.
  * Other errors are indicated by connection or transport events via
 PNP_EXTERN  * pn_proactor_wait()
  */
-PNP_EXTERN int pn_proactor_connect(pn_proactor_t *proactor, pn_connection_t *connection,
-                        const char *host, const char *port);
+PNP_EXTERN int pn_proactor_connect(
+  pn_proactor_t *proactor, pn_connection_t *connection, const char *addr);
 
 /**
  * Start listening with listener.  pn_proactor_wait() will return a
@@ -82,16 +84,17 @@ PNP_EXTERN int pn_proactor_connect(pn_proactor_t *proactor, pn_connection_t *con
  *
  * @param[in] proactor the proactor object
  * @param[in] listener proactor takes ownership of listener, do not free
- * @param[in] host address to listen on
- * @param[in] port port to listen on
+ * @param[in] addr the network address (not AMQP address) to connect to. May
+ * be in the form "host:port" or an "amqp://" or "amqps://" URL. The `/path` part of
+ * the URL is ignored.
  * @param[in] backlog number of connection requests to queue
  *
  * @return error on immediate error, e.g. an allocation failure.
  * Other errors are indicated by pn_listener_condition() on the
  * PN_LISTENER_CLOSE event.
  */
-PNP_EXTERN int pn_proactor_listen(pn_proactor_t *proactor, pn_listener_t *listener,
-                       const char *host, const char *port, int backlog);
+PNP_EXTERN int pn_proactor_listen(
+  pn_proactor_t *proactor, pn_listener_t *listener, const char *addr, int backlog);
 
 /**
  * Wait until there is at least one event to handle.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b173c3a8/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 2fafbb3..393df27 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -366,8 +366,6 @@ static void psocket_error(psocket_t *ps, int err, const char* what) {
   }
 }
 
-/* FIXME aconway 2017-02-25: split socket/queue */
-
 /* psocket uv-initialization */
 static int leader_init(psocket_t *ps) {
   ps->working = false;
@@ -418,7 +416,7 @@ static void on_connect(uv_connect_t *connect, int err) {
   pconnection_t *pc = (pconnection_t*)connect->data;
   assert(!pc->psocket.working);
   if (err) pconnection_error(pc, err, "on connect to");
-  pconnection_detach(pc);       /* FIXME aconway 2017-02-25: detach AFTER error or vv */
+  pconnection_detach(pc);
 }
 
 /* Incoming connection ready to be accepted */
@@ -438,8 +436,6 @@ static void on_connection(uv_stream_t* server, int err) {
   }
 }
 
-// #error FIXME REVIW UPWARDS FROM HERE ^^^^
-
 /* Common address resolution for leader_listen and leader_connect */
 static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) {
   int err = leader_init(ps);
@@ -532,7 +528,6 @@ static void on_tick(uv_timer_t *timer) {
   assert(!pc->psocket.working);
   leader_tick(pc);
   pconnection_detach(pc);
-  /* FIXME aconway 2017-02-25: optimize - don't detach if no work. Need to check for finished? */
 }
 
 static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
@@ -768,7 +763,7 @@ pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
 
 void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
   uv_mutex_lock(&p->lock);
-  psocket_t *ps = batch_psocket(batch); /* FIXME aconway 2017-02-26: replace with switch? */
+  psocket_t *ps = batch_psocket(batch);
   if (ps) {
     assert(ps->working);
     assert(ps->next == &UNLISTED);
@@ -817,8 +812,13 @@ void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
   notify(p);
 }
 
-int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host, const char *port) {
-  pconnection_t *pc = pconnection(p, c, false, host, port);
+int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr) {
+  pn_url_t *url = pn_url_parse(addr);
+  if (!url) {
+    return PN_OUT_OF_MEMORY;
+  }
+  pconnection_t *pc = pconnection(p, c, false, pn_url_get_host(url), pn_url_get_port(url));
+  pn_url_free(url);
   if (!pc) {
     return PN_OUT_OF_MEMORY;
   }
@@ -826,10 +826,14 @@ int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host,
   return 0;
 }
 
-int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *host, const char *port, int backlog)
-{
+int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, int backlog) {
   assert(!l->closed);
-  psocket_init(&l->psocket, p, false, host, port);
+  pn_url_t *url = pn_url_parse(addr);
+  if (!url) {
+    return PN_OUT_OF_MEMORY;
+  }
+  psocket_init(&l->psocket, p, false, pn_url_get_host(url), pn_url_get_port(url));
+  pn_url_free(url);
   l->backlog = backlog;
   psocket_start(&l->psocket);
   return 0;
@@ -861,7 +865,7 @@ void pn_proactor_free(pn_proactor_t *p) {
   uv_mutex_destroy(&p->lock);
   uv_cond_destroy(&p->cond);
   pn_collector_free(p->collector);
-  /* FIXME aconway 2017-02-25: restore */
+  /* FIXME aconway 2017-02-25: memory leaks, need to clean up the queues */
   /* assert(!p->worker_q.front); */
   /* assert(!p->leader_q.front); */
   free(p);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b173c3a8/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index a0ddcda..84524b5 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -163,10 +163,10 @@ static void test_client_server(test_t *t) {
   proactor_test_t pts[] ={ { open_close_handler }, { common_handler } };
   PROACTOR_TEST_INIT(pts, t);
   pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
-  test_port_t port = test_port();
-  pn_proactor_listen(server, pn_listener(), localhost, port.str, 4);
+  test_port_t port = test_port(localhost);
+  pn_proactor_listen(server, pn_listener(), port.host_port, 4);
   TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
-  pn_proactor_connect(client, pn_connection(), localhost, port.str);
+  pn_proactor_connect(client, pn_connection(), port.host_port);
   TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
   sock_close(port.sock);
   PROACTOR_TEST_FREE(pts);
@@ -190,11 +190,11 @@ static void test_connection_wake(test_t *t) {
   proactor_test_t pts[] =  { { open_wake_handler }, { common_handler } };
   PROACTOR_TEST_INIT(pts, t);
   pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
-  test_port_t port = test_port();          /* Hold a port */
-  pn_proactor_listen(server, pn_listener(), localhost, port.str, 4);
+  test_port_t port = test_port(localhost);          /* Hold a port */
+  pn_proactor_listen(server, pn_listener(), port.host_port, 4);
   TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
   pn_connection_t *c = pn_connection();
-  pn_proactor_connect(client, c, localhost, port.str);
+  pn_proactor_connect(client, c, port.host_port);
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
   TEST_CHECK(t, pn_proactor_get(client) == NULL); /* Should be idle */
   pn_connection_wake(c);
@@ -209,13 +209,13 @@ static void test_inactive(test_t *t) {
   proactor_test_t pts[] =  { { open_wake_handler }, { common_handler } };
   PROACTOR_TEST_INIT(pts, t);
   pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
-  test_port_t port = test_port();          /* Hold a port */
+  test_port_t port = test_port(localhost);          /* Hold a port */
 
   pn_listener_t *l = pn_listener();
-  pn_proactor_listen(server, l, localhost, port.str,  4);
+  pn_proactor_listen(server, l, port.host_port,  4);
   TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
   pn_connection_t *c = pn_connection();
-  pn_proactor_connect(client, c, localhost, port.str);
+  pn_proactor_connect(client, c, port.host_port);
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
   pn_connection_wake(c);
   TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, PROACTOR_TEST_RUN(pts));
@@ -246,17 +246,17 @@ static void test_errors(test_t *t) {
   proactor_test_t pts[] =  { { open_wake_handler }, { common_handler } };
   PROACTOR_TEST_INIT(pts, t);
   pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
-  test_port_t port = test_port();          /* Hold a port */
+  test_port_t port = test_port(localhost);          /* Hold a port */
 
   /* Invalid connect/listen parameters */
   pn_connection_t *c = pn_connection();
-  pn_proactor_connect(client, c, localhost, "xxx");
+  pn_proactor_connect(client, c, "127.0.0.1:xxx");
   TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
   TEST_CHECK_ERROR(t, "xxx", pn_transport_condition(pn_connection_transport(c)));
   TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
 
   pn_listener_t *l = pn_listener();
-  pn_proactor_listen(server, l, localhost, "xxx", 1);
+  pn_proactor_listen(server, l, "127.0.0.1:xxx", 1);
   TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
   TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
   TEST_CHECK_ERROR(t, "xxx", pn_listener_condition(l));
@@ -264,7 +264,7 @@ static void test_errors(test_t *t) {
 
   /* Connect with no listener */
   c = pn_connection();
-  pn_proactor_connect(client, c, localhost, port.str);
+  pn_proactor_connect(client, c, port.host_port);
   TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
   TEST_CHECK(t, pn_condition_is_set(pn_transport_condition(pn_connection_transport(c))));
   TEST_CHECK_ERROR(t, "connection refused", pn_transport_condition(pn_connection_transport(c)));
@@ -274,6 +274,8 @@ static void test_errors(test_t *t) {
   PROACTOR_TEST_FREE(pts);
 }
 
+/* Tests for use of URLs */
+
 
 int main(int argc, char **argv) {
   int failed = 0;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b173c3a8/proton-c/src/tests/test_tools.h
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/test_tools.h b/proton-c/src/tests/test_tools.h
index 97dac3f..9fe679c 100644
--- a/proton-c/src/tests/test_tools.h
+++ b/proton-c/src/tests/test_tools.h
@@ -202,16 +202,18 @@ static int sock_port(sock_t sock) {
 /* Combines includes a sock_t with the int and char* versions of the port for convenience */
 typedef struct test_port_t {
   sock_t sock;
-  int port;
-  char str[256];
+  int port;                     /* port as integer */
+  char str[256];                /* port as string */
+  char host_port[256];          /* host:port string */
 } test_port_t;
 
 /* Create a test_port_t  */
-static inline test_port_t test_port(void) {
+static inline test_port_t test_port(const char* host) {
   test_port_t tp = {0};
   tp.sock = sock_bind0();
   tp.port = sock_port(tp.sock);
   snprintf(tp.str, sizeof(tp.str), "%d", tp.port);
+  snprintf(tp.host_port, sizeof(tp.host_port), "%s:%d", host, tp.port);
   return tp;
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org