You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2017/01/10 15:59:09 UTC

[48/55] [partial] qpid-proton-j git commit: PROTON-1385: retain proton-j content only, the rest remains in the other repo at: https://git-wip-us.apache.org/repos/asf/qpid-proton.git

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/c/proactor/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/proactor/CMakeLists.txt b/examples/c/proactor/CMakeLists.txt
deleted file mode 100644
index f701651..0000000
--- a/examples/c/proactor/CMakeLists.txt
+++ /dev/null
@@ -1,43 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-find_package(Proton REQUIRED)
-
-include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${Proton_INCLUDE_DIRS})
-
-add_definitions(${COMPILE_WARNING_FLAGS} ${WERROR} ${COMPILE_PLATFORM_FLAGS} ${LINK_TIME_OPTIMIZATION})
-
-find_package(Libuv)
-if (Libuv_FOUND)
-  foreach(name broker send receive)
-    add_executable(libuv_${name} ${name}.c libuv_proactor.c)
-    target_link_libraries(libuv_${name} ${Proton_LIBRARIES} ${Libuv_LIBRARIES})
-    set_target_properties(libuv_${name} PROPERTIES
-      COMPILE_DEFINITIONS  "PN_PROACTOR_INCLUDE=\"libuv_proactor.h\"")
-  endforeach()
-
-  # Add a test with the correct environment to find test executables and valgrind.
-  if(WIN32)
-    set(test_path "$<TARGET_FILE_DIR:libuv_broker>;$<TARGET_FILE_DIR:qpid-proton>")
-  else()
-    set(test_path "${CMAKE_CURRENT_BINARY_DIR}:$ENV{PATH}")
-  endif()
-  set(run_env ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/proton-c/env.py ${EXAMPLE_ENV} "PATH=${test_path}" ${VALGRIND_ENV})
-  add_test(NAME c-proactor-libuv COMMAND ${run_env} -- ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/test.py)
-endif()

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/c/proactor/README.dox
----------------------------------------------------------------------
diff --git a/examples/c/proactor/README.dox b/examples/c/proactor/README.dox
deleted file mode 100644
index 4b09cb7..0000000
--- a/examples/c/proactor/README.dox
+++ /dev/null
@@ -1,17 +0,0 @@
-/**
- * @example send.c
- *
- * Send a fixed number of messages to the "example" node.
- *
- * @example receive.c
- *
- * Subscribes to the 'example' node and prints the message bodies
- * received.
- *
- * @example broker.c
- *
- * A simple multithreaded broker that works with the send and receive
- * examples.
- *
- * __Requires C++11__
- */

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/c/proactor/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
deleted file mode 100644
index ca52336..0000000
--- a/examples/c/proactor/broker.c
+++ /dev/null
@@ -1,488 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <proton/connection_driver.h>
-#include <proton/proactor.h>
-#include <proton/engine.h>
-#include <proton/sasl.h>
-#include <proton/transport.h>
-#include <proton/url.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>
-
-/* TODO aconway 2016-10-14: this example does not require libuv IO,
-   it uses uv.h only for portable mutex and thread functions.
-*/
-#include <uv.h>
-
-bool enable_debug = false;
-
-/* Print a printf-style message followed by a newline to stderr,
-   but only when enable_debug is set. */
-void debug(const char* fmt, ...) {
-  if (enable_debug) {
-    va_list ap;
-    va_start(ap, fmt);
-    vfprintf(stderr, fmt, ap);
-    va_end(ap);                 /* Every va_start must be paired with va_end */
-    fputc('\n', stderr);
-    fflush(stderr);
-  }
-}
-
-/* Report s via perror() and exit with status 1 when err is non-zero. */
-void check(int err, const char* s) {
-  if (err == 0)
-    return;
-  perror(s);
-  exit(1);
-}
-
-void pcheck(int err, const char* s) {
-  if (err != 0) {
-    fprintf(stderr, "%s: %s", s, pn_code(err));
-    exit(1);
-  }
-}
-
-/* Simple re-sizable vector that acts as a queue */
-#define VEC(T) struct { T* data; size_t len, cap; }
-
-#define VEC_INIT(V)                             \
-  do {                                          \
-    V.len = 0;                                  \
-    V.cap = 16;                                 \
-    void **vp = (void**)&V.data;                \
-    *vp = malloc(V.cap * sizeof(*V.data));      \
-  } while(0)
-
-#define VEC_FINAL(V) free(V.data)
-
-/* Append X to the end of vector V, doubling the capacity when it is full.
-   NOTE(review): the realloc() result is not checked for NULL. */
-#define VEC_PUSH(V, X)                                  \
-  do {                                                  \
-    if (V.len == V.cap) {                               \
-      V.cap *= 2;                                       \
-      void **vp = (void**)&V.data;                      \
-      *vp = realloc(V.data, V.cap * sizeof(*V.data));   \
-    }                                                   \
-    V.data[V.len++] = X;                                \
-  } while(0)
-
-#define VEC_POP(V)                                              \
-  do {                                                          \
-    if (V.len > 0)                                              \
-      memmove(V.data, V.data+1, (--V.len)*sizeof(*V.data));     \
-  } while(0)
-
-/* Simple thread-safe queue implementation */
-typedef struct queue_t {
-  uv_mutex_t lock;
-  char* name;
-  VEC(pn_rwbytes_t) messages;   /* Messages on the queue_t */
-  VEC(pn_connection_t*) waiting; /* Connections waiting to send messages from this queue */
-  struct queue_t *next;            /* Next queue in chain */
-  size_t sent;                     /* Count of messages sent, used as delivery tag */
-} queue_t;
-
-static void queue_init(queue_t *q, const char* name, queue_t *next) {
-  debug("created queue %s", name);
-  uv_mutex_init(&q->lock);
-  q->name = strdup(name);
-  VEC_INIT(q->messages);
-  VEC_INIT(q->waiting);
-  q->next = next;
-  q->sent = 0;
-}
-
-/* Release the resources held by q: its lock, its name, the buffers of any
-   messages still queued, and both vectors. Does not free q itself. */
-static void queue_destroy(queue_t *q) {
-  uv_mutex_destroy(&q->lock);
-  free(q->name);
-  /* Each queued message owns its buffer (m.start) */
-  for (size_t i = 0; i < q->messages.len; ++i)
-    free(q->messages.data[i].start);
-  VEC_FINAL(q->messages);
-  /* NOTE(review): queue_send pushes connections onto q->waiting without a
-     visible pn_incref -- confirm this pn_decref is balanced. */
-  for (size_t i = 0; i < q->waiting.len; ++i)
-    pn_decref(q->waiting.data[i]);
-  VEC_FINAL(q->waiting);
-}
-
-/* Send a message on s, or record the connection of s as waiting if the
-   queue has no messages.
-   Called in s dispatch loop, assumes s has credit.
-*/
-static void queue_send(queue_t *q, pn_link_t *s) {
-  pn_rwbytes_t m = { 0 };
-  size_t tag = 0;
-  uv_mutex_lock(&q->lock);
-  if (q->messages.len == 0) { /* Empty, record connection as waiting */
-    debug("queue is empty %s", q->name);
-    /* Record connection for wake-up if not already on the list. */
-    pn_connection_t *c = pn_session_connection(pn_link_session(s));
-    size_t i = 0;
-    /* Linear search: stops at the index of c, or at len if c is absent */
-    for (; i < q->waiting.len && q->waiting.data[i] != c; ++i)
-      ;
-    if (i == q->waiting.len) {
-      VEC_PUSH(q->waiting, c);
-    }
-  } else {
-    debug("sending from queue %s", q->name);
-    /* Take the oldest message; the send itself happens after the unlock */
-    m = q->messages.data[0];
-    VEC_POP(q->messages);
-    tag = ++q->sent;
-  }
-  uv_mutex_unlock(&q->lock);
-  if (m.start) {
-    /* The per-queue send counter is used as the delivery tag */
-    pn_delivery_t *d = pn_delivery(s, pn_dtag((char*)&tag, sizeof(tag)));
-    pn_link_send(s, m.start, m.size);
-    pn_link_advance(s);
-    pn_delivery_settle(d);  /* Pre-settled: unreliable, there will be no ack */
-    free(m.start);
-  }
-}
-
-/* Data associated with each broker connection */
-typedef struct broker_data_t {
-  bool check_queues;          /* Check senders on the connection for available data in queues. */
-} broker_data_t;
-
-/* Use the context pointer as a boolean flag to indicate we need to check queues */
-void pn_connection_set_check_queues(pn_connection_t *c, bool check) {
-  pn_connection_set_context(c, (void*)check);
-}
-
-bool pn_connection_get_check_queues(pn_connection_t *c) {
-  return (bool)pn_connection_get_context(c);
-}
-
-/* Put a message on the queue, called in receiver dispatch loop.
-   If the queue was previously empty, notify waiting senders.
-*/
-static void queue_receive(pn_proactor_t *d, queue_t *q, pn_rwbytes_t m) {
-  debug("received to queue %s", q->name);
-  uv_mutex_lock(&q->lock);
-  VEC_PUSH(q->messages, m);
-  if (q->messages.len == 1) { /* Was empty, notify waiting connections */
-    for (size_t i = 0; i < q->waiting.len; ++i) {
-      pn_connection_t *c = q->waiting.data[i];
-      pn_connection_set_check_queues(c, true);
-      pn_connection_wake(c); /* Wake the connection */
-    }
-    q->waiting.len = 0;
-  }
-  uv_mutex_unlock(&q->lock);
-}
-
-/* Thread safe set of queues */
-typedef struct queues_t {
-  uv_mutex_t lock;
-  queue_t *queues;
-  size_t sent;
-} queues_t;
-
-void queues_init(queues_t *qs) {
-  uv_mutex_init(&qs->lock);
-  qs->queues = NULL;
-}
-
-/* Destroy and free every queue in the set, then destroy the set's lock. */
-void queues_destroy(queues_t *qs) {
-  queue_t *q = qs->queues;
-  while (q) {
-    queue_t *next = q->next;    /* Read the link before q is freed */
-    queue_destroy(q);
-    free(q);
-    q = next;
-  }
-  qs->queues = NULL;
-  uv_mutex_destroy(&qs->lock);
-}
-
-/** Get or create the named queue. */
-queue_t* queues_get(queues_t *qs, const char* name) {
-  uv_mutex_lock(&qs->lock);
-  queue_t *q;
-  for (q = qs->queues; q && strcmp(q->name, name) != 0; q = q->next)
-    ;
-  if (!q) {
-    q = (queue_t*)malloc(sizeof(queue_t));
-    queue_init(q, name, qs->queues);
-    qs->queues = q;
-  }
-  uv_mutex_unlock(&qs->lock);
-  return q;
-}
-
-/* The broker implementation */
-typedef struct broker_t {
-  pn_proactor_t *proactor;
-  queues_t queues;
-  const char *container_id;     /* AMQP container-id */
-  size_t threads;
-  pn_millis_t heartbeat;
-  bool finished;
-} broker_t;
-
-/* Initialize broker b: zero the struct, create the proactor and the queue
-   set, and record the container id, thread count and heartbeat.
-   container_id is stored by pointer, not copied. */
-void broker_init(broker_t *b, const char *container_id, size_t threads, pn_millis_t heartbeat) {
-  memset(b, 0, sizeof(*b));
-  b->proactor = pn_proactor();
-  queues_init(&b->queues);
-  b->container_id = container_id;
-  b->threads = threads;
-  b->heartbeat = heartbeat;     /* Honour the caller's value (was hard-coded to 0) */
-}
-
-void broker_stop(broker_t *b) {
-  /* In this broker an interrupt stops a thread, stopping all threads stops the broker */
-  for (size_t i = 0; i < b->threads; ++i)
-    pn_proactor_interrupt(b->proactor);
-}
-
-/* Try to send if link is sender and has credit */
-/* If s is a sender with credit and a source address, try to send one
-   message from the queue named by that address. */
-static void link_send(broker_t *b, pn_link_t *s) {
-  if (pn_link_is_sender(s) && pn_link_credit(s) > 0) {
-    const char *qname = pn_terminus_get_address(pn_link_source(s));
-    /* No source address: nothing to look up (queues_get would strcmp a NULL name) */
-    if (qname) {
-      queue_t *q = queues_get(&b->queues, qname);
-      queue_send(q, s);
-    }
-  }
-}
-
-/* Remove connection c from the waiting list of q, if it is present. */
-static void queue_unsub(queue_t *q, pn_connection_t *c) {
-  uv_mutex_lock(&q->lock);
-  for (size_t i = 0; i < q->waiting.len; ++i) {
-    if (q->waiting.data[i] == c){
-      /* Overwrite c's slot with the front entry, then drop the front:
-         removes c without losing the old [0]. */
-      q->waiting.data[i] = q->waiting.data[0]; /* save old [0] */
-      VEC_POP(q->waiting);
-      break;
-    }
-  }
-  uv_mutex_unlock(&q->lock);
-}
-
-/* Unsubscribe from the queue of interest to this link. */
-static void link_unsub(broker_t *b, pn_link_t *s) {
-  if (pn_link_is_sender(s)) {
-    const char *qname = pn_terminus_get_address(pn_link_source(s));
-    if (qname) {
-      queue_t *q = queues_get(&b->queues, qname);
-      queue_unsub(q, pn_session_connection(pn_link_session(s)));
-    }
-  }
-}
-
-/* Unsubscribe every link on connection c from the queue it is waiting on. */
-static void connection_unsub(broker_t *b, pn_connection_t *c) {
-  for (pn_link_t *l = pn_link_head(c, 0); l != NULL; l = pn_link_next(l, 0))
-    link_unsub(b, l);
-}
-
-/* Unsubscribe the links that belong to session ssn, found by walking
-   all links of the session's connection. */
-static void session_unsub(broker_t *b, pn_session_t *ssn) {
-  pn_link_t *link = pn_link_head(pn_session_connection(ssn), 0);
-  while (link != NULL) {
-    if (pn_link_session(link) == ssn)
-      link_unsub(b, link);
-    link = pn_link_next(link, 0);
-  }
-}
-
-/* If cond is set, print "<event type>: <name>: <description>" to stderr.
-   The event type is reported as "UNKNOWN" when e is NULL. */
-static void check_condition(pn_event_t *e, pn_condition_t *cond) {
-  if (!pn_condition_is_set(cond))
-    return;
-  const char *ename = "UNKNOWN";
-  if (e)
-    ename = pn_event_type_name(pn_event_type(e));
-  fprintf(stderr, "%s: %s: %s\n", ename,
-          pn_condition_get_name(cond), pn_condition_get_description(cond));
-}
-
-const int WINDOW=10;            /* Incoming credit window */
-
-/* Handle a single proactor event e for broker b.
-   Completes remote opens, routes deliveries into queues, sends queued
-   messages to senders with credit, and cleans up subscriptions on close.
-*/
-static void handle(broker_t* b, pn_event_t* e) {
-  pn_connection_t *c = pn_event_connection(e);
-
-  switch (pn_event_type(e)) {
-
-   case PN_LISTENER_ACCEPT:
-    pn_listener_accept(pn_event_listener(e), pn_connection());
-    break;
-
-   case PN_CONNECTION_INIT:
-     pn_connection_set_container(c, b->container_id);
-     break;
-
-   case PN_CONNECTION_BOUND: {
-     /* Turn off security */
-     pn_transport_t *t = pn_connection_transport(c);
-     pn_transport_require_auth(t, false);
-     pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS");
-     pn_transport_set_idle_timeout(t, 2 * b->heartbeat);
-     break;                     /* Do not fall through: open only on REMOTE_OPEN */
-   }
-   case PN_CONNECTION_REMOTE_OPEN: {
-     pn_connection_open(pn_event_connection(e)); /* Complete the open */
-     break;
-   }
-   case PN_CONNECTION_WAKE: {
-     if (pn_connection_get_check_queues(c)) {
-       pn_connection_set_check_queues(c, false);
-       /* Combine the flags with |: the two masks share no bits, so & is 0 */
-       int flags = PN_LOCAL_ACTIVE|PN_REMOTE_ACTIVE;
-       for (pn_link_t *l = pn_link_head(c, flags); l != NULL; l = pn_link_next(l, flags))
-         link_send(b, l);
-     }
-     break;
-   }
-   case PN_SESSION_REMOTE_OPEN: {
-     pn_session_open(pn_event_session(e));
-     break;
-   }
-   case PN_LINK_REMOTE_OPEN: {
-     /* Mirror the remote terminus address locally, then open our end */
-     pn_link_t *l = pn_event_link(e);
-     if (pn_link_is_sender(l)) {
-       const char *source = pn_terminus_get_address(pn_link_remote_source(l));
-       pn_terminus_set_address(pn_link_source(l), source);
-     } else {
-       const char* target = pn_terminus_get_address(pn_link_remote_target(l));
-       pn_terminus_set_address(pn_link_target(l), target);
-       pn_link_flow(l, WINDOW);
-     }
-     pn_link_open(l);
-     break;
-   }
-   case PN_LINK_FLOW: {
-     link_send(b, pn_event_link(e));
-     break;
-   }
-   case PN_DELIVERY: {
-     pn_delivery_t *d = pn_event_delivery(e);
-     pn_link_t *r = pn_delivery_link(d);
-     /* Only act once a complete message is readable on a receiver */
-     if (pn_link_is_receiver(r) &&
-         pn_delivery_readable(d) && !pn_delivery_partial(d))
-     {
-       size_t size = pn_delivery_pending(d);
-       /* The broker does not decode the message, just forwards it. */
-       /* NOTE(review): malloc result and a NULL target address are not checked here */
-       pn_rwbytes_t m = { size, (char*)malloc(size) };
-       pn_link_recv(r, m.start, m.size);
-       const char *qname = pn_terminus_get_address(pn_link_target(r));
-       queue_receive(b->proactor, queues_get(&b->queues, qname), m);
-       pn_delivery_update(d, PN_ACCEPTED);
-       pn_delivery_settle(d);
-       /* Top the link credit back up to WINDOW */
-       pn_link_flow(r, WINDOW - pn_link_credit(r));
-     }
-     break;
-   }
-
-   case PN_TRANSPORT_CLOSED:
-    connection_unsub(b, pn_event_connection(e));
-    check_condition(e, pn_transport_condition(pn_event_transport(e)));
-    break;
-
-   case PN_CONNECTION_REMOTE_CLOSE:
-    check_condition(e, pn_connection_remote_condition(pn_event_connection(e)));
-    connection_unsub(b, pn_event_connection(e));
-    pn_connection_close(pn_event_connection(e));
-    break;
-
-   case PN_SESSION_REMOTE_CLOSE:
-    check_condition(e, pn_session_remote_condition(pn_event_session(e)));
-    session_unsub(b, pn_event_session(e));
-    pn_session_close(pn_event_session(e));
-    pn_session_free(pn_event_session(e));
-    break;
-
-   case PN_LINK_REMOTE_CLOSE:
-    check_condition(e, pn_link_remote_condition(pn_event_link(e)));
-    link_unsub(b, pn_event_link(e));
-    pn_link_close(pn_event_link(e));
-    pn_link_free(pn_event_link(e));
-    break;
-
-   case PN_LISTENER_CLOSE:
-    check_condition(e, pn_listener_condition(pn_event_listener(e)));
-    break;
-
-   case PN_PROACTOR_INACTIVE: /* listener and all connections closed */
-    broker_stop(b);
-    break;
-
-   case PN_PROACTOR_INTERRUPT:
-    b->finished = true;
-    break;
-
-   default:
-    break;
-  }
-}
-
-/* Thread body: wait for a batch of events, handle each one, hand the batch
-   back to the proactor, and repeat until the broker is marked finished. */
-static void broker_thread(void *void_broker) {
-  broker_t *b = (broker_t*)void_broker;
-  for (;;) {
-    pn_event_batch_t *events = pn_proactor_wait(b->proactor);
-    for (pn_event_t *e = pn_event_batch_next(events); e; e = pn_event_batch_next(events))
-      handle(b, e);
-    pn_proactor_done(b->proactor, events);
-    if (b->finished)
-      break;
-  }
-}
-
-/* Print the command line synopsis to stderr and exit with status 1.
-   Lists every option accepted by the getopt string in main(). */
-static void usage(const char *arg0) {
-  fprintf(stderr, "Usage: %s [-d] [-a url] [-t thread-count] [-h heartbeat] [-c container-id]\n", arg0);
-  exit(1);
-}
-
-/* Parse options, start listening, and run the broker on -t threads
-   (n-1 new threads plus the main thread) until all of them finish. */
-int main(int argc, char **argv) {
-  /* Command line options */
-  char *urlstr = NULL;
-  char container_id[256];
-  /* Default container-id is program:pid */
-  snprintf(container_id, sizeof(container_id), "%s:%d", argv[0], (int)getpid());
-  size_t nthreads = 4;
-  pn_millis_t heartbeat = 0;
-  int opt;
-  while ((opt = getopt(argc, argv, "a:t:dh:c:")) != -1) {
-    switch (opt) {
-     case 'a': urlstr = optarg; break;
-     case 't': {
-       /* Validate as a signed value: a negative count must not wrap to a huge size_t */
-       int n = atoi(optarg);
-       if (n <= 0) {
-         fprintf(stderr, "invalid value -t %s, threads must be > 0\n", optarg);
-         exit(1);
-       }
-       nthreads = (size_t)n;
-       break;
-     }
-     case 'd': enable_debug = true; break;
-     case 'h': heartbeat = atoi(optarg); break;
-     case 'c':
-      /* strncpy does not terminate the destination on truncation */
-      strncpy(container_id, optarg, sizeof(container_id) - 1);
-      container_id[sizeof(container_id) - 1] = '\0';
-      break;
-     default: usage(argv[0]); break;
-    }
-  }
-  if (optind < argc)
-    usage(argv[0]);
-
-  broker_t b;
-  broker_init(&b, container_id, nthreads, heartbeat);
-
-  /* Parse the URL or use default values */
-  pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
-  /* Listen on IPv6 wildcard. On systems that do not set IPV6ONLY by default,
-     this will also listen for mapped IPv4 on the same port.
-     The defaults also apply when the URL omits the host or the port.
-  */
-  const char *host = url ? pn_url_get_host(url) : NULL;
-  const char *port = url ? pn_url_get_port(url) : NULL;
-  if (!host) host = "::";
-  if (!port) port = "amqp";
-  pn_proactor_listen(b.proactor, pn_listener(), host, port, 16);
-  printf("listening on '%s:%s' %zu threads\n", host, port, b.threads);
-
-  if (url) pn_url_free(url);    /* host and port are not used past this point */
-
-  /* Start n-1 threads and use main thread */
-  uv_thread_t* threads = (uv_thread_t*)calloc(b.threads, sizeof(uv_thread_t));
-  if (!threads) {
-    perror("calloc");
-    exit(1);
-  }
-  for (size_t i = 0; i < b.threads-1; ++i) {
-    check(uv_thread_create(&threads[i], broker_thread, &b), "pthread_create");
-  }
-  broker_thread(&b);            /* Use the main thread too. */
-  for (size_t i = 0; i < b.threads-1; ++i) {
-    check(uv_thread_join(&threads[i]), "pthread_join");
-  }
-  pn_proactor_free(b.proactor);
-  free(threads);
-  return 0;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/c/proactor/libuv_proactor.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/libuv_proactor.c b/examples/c/proactor/libuv_proactor.c
deleted file mode 100644
index 42bbfab..0000000
--- a/examples/c/proactor/libuv_proactor.c
+++ /dev/null
@@ -1,873 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <uv.h>
-
-#include <proton/condition.h>
-#include <proton/connection_driver.h>
-#include <proton/engine.h>
-#include <proton/message.h>
-#include <proton/object.h>
-#include <proton/proactor.h>
-#include <proton/transport.h>
-#include <proton/url.h>
-
-#include <assert.h>
-#include <stddef.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-/*
-  libuv loop functions are thread unsafe. The only exception is uv_async_send()
-  which is a thread safe "wakeup" that can wake the uv_loop from another thread.
-
-  To provide concurrency the proactor uses a "leader-worker-follower" model,
-  threads take turns at the roles:
-
-  - a single "leader" calls libuv functions and runs the uv_loop in short bursts
-    to generate work. When there is work available it gives up leadership and
-    becomes a "worker"
-
-  - "workers" handle events concurrently for distinct connections/listeners
-    They do as much work as they can get, when none is left they become "followers"
-
-  - "followers" wait for the leader to generate work and become workers.
-    When the leader itself becomes a worker, one of the followers takes over.
-
-  This model is symmetric: any thread can take on any role based on run-time
-  requirements. It also allows the IO and non-IO work associated with an IO
-  wake-up to be processed in a single thread with no context switches.
-
-  Function naming:
-  - on_ - called in leader thread via uv_run().
-  - leader_ - called in leader thread, while processing the leader_q.
-  - owner_ - called in owning thread, leader or worker but not concurrently.
-
-  Note on_ and leader_ functions can call each other, the prefix indicates the
-  path they are most often called on.
-*/
-
-const char *COND_NAME = "proactor";
-const char *AMQP_PORT = "5672";
-const char *AMQP_PORT_NAME = "amqp";
-const char *AMQPS_PORT = "5671";
-const char *AMQPS_PORT_NAME = "amqps";
-
-PN_HANDLE(PN_PROACTOR)
-
-/* pn_proactor_t and pn_listener_t are plain C structs with normal memory management.
-   Class definitions are for identification as pn_event_t context only.
-*/
-PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor)
-PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
-
-/* common to connection and listener */
-typedef struct psocket_t {
-  /* Immutable */
-  pn_proactor_t *proactor;
-
-  /* Protected by proactor.lock */
-  struct psocket_t* next;
-  void (*wakeup)(struct psocket_t*); /* interrupting action for leader */
-
-  /* Only used by leader */
-  uv_tcp_t tcp;
-  void (*action)(struct psocket_t*); /* deferred action for leader */
-  bool is_conn:1;
-  char host[NI_MAXHOST];
-  char port[NI_MAXSERV];
-} psocket_t;
-
-/* Special value for psocket.next pointer when socket is not on any list. */
-psocket_t UNLISTED;
-
-/* Initialize the fields of ps common to connections and listeners.
-   host and port may be NULL; they are copied into ps, with NULL stored as
-   the marker string "\001" (see fixstr). */
-static void psocket_init(psocket_t* ps, pn_proactor_t* p, bool is_conn, const char *host, const char *port) {
-  ps->proactor = p;
-  ps->next = &UNLISTED;
-  ps->is_conn = is_conn;
-  ps->tcp.data = ps;
-
-  /* For platforms that don't know about "amqp" and "amqps" service names. */
-  if (port && strcmp(port, AMQP_PORT_NAME) == 0)
-    port = AMQP_PORT;
-  else if (port && strcmp(port, AMQPS_PORT_NAME) == 0)
-    port = AMQPS_PORT;
-  /* Set to "\001" to indicate a NULL as opposed to an empty string "" */
-  strncpy(ps->host, host ? host : "\001", sizeof(ps->host) - 1);
-  ps->host[sizeof(ps->host) - 1] = '\0'; /* strncpy does not terminate on truncation */
-  strncpy(ps->port, port ? port : "\001", sizeof(ps->port) - 1);
-  ps->port[sizeof(ps->port) - 1] = '\0';
-}
-
-/* Turn "\001" back to NULL */
-static inline const char* fixstr(const char* str) {
-  return str[0] == '\001' ? NULL : str;
-}
-
-typedef struct pconnection_t {
-  psocket_t psocket;
-
-  /* Only used by owner thread */
-  pn_connection_driver_t driver;
-
-  /* Only used by leader */
-  uv_connect_t connect;
-  uv_timer_t timer;
-  uv_write_t write;
-  uv_shutdown_t shutdown;
-  size_t writing;
-  bool reading:1;
-  bool server:1;                /* accept, not connect */
-} pconnection_t;
-
-struct pn_listener_t {
-  psocket_t psocket;
-
-  /* Only used by owner thread */
-  pconnection_t *accepting;     /* accept in progress */
-  pn_condition_t *condition;
-  pn_collector_t *collector;
-  pn_event_batch_t batch;
-  pn_record_t *attachments;
-  void *context;
-  size_t backlog;
-};
-
-
-typedef struct queue { psocket_t *front, *back; } queue;
-
-struct pn_proactor_t {
-  /* Leader thread  */
-  uv_cond_t cond;
-  uv_loop_t loop;
-  uv_async_t async;
-  uv_timer_t timer;
-
-  /* Owner thread: proactor collector and batch can belong to leader or a worker */
-  pn_collector_t *collector;
-  pn_event_batch_t batch;
-
-  /* Protected by lock */
-  uv_mutex_t lock;
-  queue start_q;
-  queue worker_q;
-  queue leader_q;
-  size_t interrupt;             /* pending interrupts */
-  pn_millis_t timeout;
-  size_t count;                 /* psocket count */
-  bool inactive:1;
-  bool timeout_request:1;
-  bool timeout_elapsed:1;
-  bool has_leader:1;
-  bool batch_working:1;          /* batch belongs to a worker.  */
-};
-
-/* Append ps to the back of q unless it is already on a list.
-   Returns true if ps was added, false if it was left where it is. */
-static bool push_lh(queue *q, psocket_t *ps) {
-  if (ps->next != &UNLISTED) {
-    return false;               /* Already listed, don't move it. */
-  }
-  ps->next = NULL;
-  if (q->front) {
-    q->back->next = ps;
-  } else {
-    q->front = ps;
-  }
-  q->back = ps;
-  return true;
-}
-
-/* Remove and return the front of q, marking it as unlisted.
-   Returns NULL when q is empty. */
-static psocket_t* pop_lh(queue *q) {
-  psocket_t *head = q->front;
-  if (!head) {
-    return NULL;
-  }
-  q->front = head->next;
-  head->next = &UNLISTED;
-  return head;
-}
-
-static inline pconnection_t *as_pconnection_t(psocket_t* ps) {
-  return ps->is_conn ? (pconnection_t*)ps : NULL;
-}
-
-static inline pn_listener_t *as_listener(psocket_t* ps) {
-  return ps->is_conn ? NULL: (pn_listener_t*)ps;
-}
-
-/* Put ps on the leader queue for processing. Thread safe. */
-static void to_leader_lh(psocket_t *ps) {
-  push_lh(&ps->proactor->leader_q, ps);
-  uv_async_send(&ps->proactor->async); /* Wake leader */
-}
-
-static void to_leader(psocket_t *ps) {
-  uv_mutex_lock(&ps->proactor->lock);
-  to_leader_lh(ps);
-  uv_mutex_unlock(&ps->proactor->lock);
-}
-
-/* Detach from IO and put ps on the worker queue */
-static void leader_to_worker(psocket_t *ps) {
-  if (ps->is_conn) {
-    pconnection_t *pc = as_pconnection_t(ps);
-    /* Don't detach if there are no events yet. */
-    if (pn_connection_driver_has_event(&pc->driver)) {
-      if (pc->writing) {
-        pc->writing  = 0;
-        uv_cancel((uv_req_t*)&pc->write);
-      }
-      if (pc->reading) {
-        pc->reading = false;
-        uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
-      }
-      if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
-        uv_timer_stop(&pc->timer);
-      }
-    }
-  } else {
-    pn_listener_t *l = as_listener(ps);
-    uv_read_stop((uv_stream_t*)&l->psocket.tcp);
-  }
-  uv_mutex_lock(&ps->proactor->lock);
-  push_lh(&ps->proactor->worker_q, ps);
-  uv_mutex_unlock(&ps->proactor->lock);
-}
-
-/* Set a deferred action for leader, if not already set. */
-static void owner_to_leader(psocket_t *ps, void (*action)(psocket_t*)) {
-  uv_mutex_lock(&ps->proactor->lock);
-  if (!ps->action) {
-    ps->action = action;
-  }
-  to_leader_lh(ps);
-  uv_mutex_unlock(&ps->proactor->lock);
-}
-
-/* Owner thread send to worker thread. Set deferred action if not already set. */
-static void owner_to_worker(psocket_t *ps, void (*action)(psocket_t*)) {
-  uv_mutex_lock(&ps->proactor->lock);
-  if (!ps->action) {
-    ps->action = action;
-  }
-  push_lh(&ps->proactor->worker_q, ps);
-  uv_async_send(&ps->proactor->async); /* Wake leader */
-  uv_mutex_unlock(&ps->proactor->lock);
-}
-
-
-/* Re-queue for further work */
-static void worker_requeue(psocket_t* ps) {
-  uv_mutex_lock(&ps->proactor->lock);
-  push_lh(&ps->proactor->worker_q, ps);
-  uv_async_send(&ps->proactor->async); /* Wake leader */
-  uv_mutex_unlock(&ps->proactor->lock);
-}
-
-/* Allocate a pconnection_t for connection c on proactor p and attach it to
-   c's attachments record under PN_PROACTOR.
-   Returns NULL if allocation or connection-driver initialization fails. */
-static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bool server, const char *host, const char *port) {
-  pconnection_t *pc = (pconnection_t*)calloc(1, sizeof(*pc));
-  if (!pc) return NULL;
-  if (pn_connection_driver_init(&pc->driver, c, NULL) != 0) {
-    free(pc);                   /* Don't leak the allocation on failure */
-    return NULL;
-  }
-  psocket_init(&pc->psocket, p,  true, host, port);
-  if (server) {
-    pn_transport_set_server(pc->driver.transport);
-  }
-  /* Record pc on the connection so it can be looked up from the pn_connection_t */
-  pn_record_t *r = pn_connection_attachments(pc->driver.connection);
-  pn_record_def(r, PN_PROACTOR, PN_VOID);
-  pn_record_set(r, PN_PROACTOR, pc);
-  return pc;
-}
-
-static pn_event_t *listener_batch_next(pn_event_batch_t *batch);
-static pn_event_t *proactor_batch_next(pn_event_batch_t *batch);
-
-static inline pn_proactor_t *batch_proactor(pn_event_batch_t *batch) {
-  return (batch->next_event == proactor_batch_next) ?
-    (pn_proactor_t*)((char*)batch - offsetof(pn_proactor_t, batch)) : NULL;
-}
-
-static inline pn_listener_t *batch_listener(pn_event_batch_t *batch) {
-  return (batch->next_event == listener_batch_next) ?
-    (pn_listener_t*)((char*)batch - offsetof(pn_listener_t, batch)) : NULL;
-}
-
-static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
-  pn_connection_driver_t *d = pn_event_batch_connection_driver(batch);
-  return d ? (pconnection_t*)((char*)d - offsetof(pconnection_t, driver)) : NULL;
-}
-
-static void leader_count(pn_proactor_t *p, int change) {
-  uv_mutex_lock(&p->lock);
-  p->count += change;
-  p->inactive = (p->count == 0);
-  uv_mutex_unlock(&p->lock);
-}
-
-/* Free if there are no uv callbacks pending and no events */
-static void leader_pconnection_t_maybe_free(pconnection_t *pc) {
-    if (pn_connection_driver_has_event(&pc->driver)) {
-      leader_to_worker(&pc->psocket);         /* Return to worker */
-    } else if (!(pc->psocket.tcp.data || pc->write.data || pc->shutdown.data || pc->timer.data)) {
-      /* All UV requests are finished */
-      pn_connection_driver_destroy(&pc->driver);
-      leader_count(pc->psocket.proactor, -1);
-      free(pc);
-    }
-}
-
-/* Free if there are no uv callbacks pending and no events */
-static void leader_listener_maybe_free(pn_listener_t *l) {
-    if (pn_collector_peek(l->collector)) {
-      leader_to_worker(&l->psocket);         /* Return to worker */
-    } else if (!l->psocket.tcp.data) {
-      pn_condition_free(l->condition);
-      leader_count(l->psocket.proactor, -1);
-      free(l);
-    }
-}
-
-/* Free if there are no uv callbacks pending and no events */
-static void leader_maybe_free(psocket_t *ps) {
-  if (ps->is_conn) {
-    leader_pconnection_t_maybe_free(as_pconnection_t(ps));
-  } else {
-    leader_listener_maybe_free(as_listener(ps));
-  }
-}
-
-static void on_close(uv_handle_t *h) {
-  psocket_t *ps = (psocket_t*)h->data;
-  h->data = NULL;               /* Mark closed */
-  leader_maybe_free(ps);
-}
-
-static void on_shutdown(uv_shutdown_t *shutdown, int err) {
-  psocket_t *ps = (psocket_t*)shutdown->data;
-  shutdown->data = NULL;        /* Mark closed */
-  leader_maybe_free(ps);
-}
-
-/* Begin closing a socket: close its tcp handle and, for a connection, close
- * the driver and the tick timer. Handles whose data pointer is unset or that
- * are already closing are left alone. */
-static inline void leader_close(psocket_t *ps) {
-  if (ps->tcp.data && !uv_is_closing((uv_handle_t*)&ps->tcp)) {
-    uv_close((uv_handle_t*)&ps->tcp, on_close);
-  }
-  /* NOTE(review): presumably NULL when ps is a listener - confirm */
-  pconnection_t *pc = as_pconnection_t(ps);
-  if (pc) {
-    pn_connection_driver_close(&pc->driver);
-    if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
-      uv_timer_stop(&pc->timer);
-      uv_close((uv_handle_t*)&pc->timer, on_close);
-    }
-  }
-  /* Frees immediately if nothing is pending, else requeues or waits */
-  leader_maybe_free(ps);
-}
-
-/* Look up the pconnection_t stored in a connection's attachments under the
- * PN_PROACTOR key. A NULL connection yields NULL. */
-static pconnection_t *get_pconnection_t(pn_connection_t* c) {
-  if (c) {
-    pn_record_t *attachments = pn_connection_attachments(c);
-    return (pconnection_t*) pn_record_get(attachments, PN_PROACTOR);
-  }
-  return NULL;
-}
-
-/* Record a uv error against the socket's listener or connection, then pass
- * the socket to a worker so the application sees the failure. */
-static void leader_error(psocket_t *ps, int err, const char* what) {
-  if (!ps->is_conn) {
-    pn_listener_t *l = as_listener(ps);
-    pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s",
-                        what, fixstr(ps->host), fixstr(ps->port),
-                        uv_strerror(err));
-    pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
-  } else {
-    pn_connection_driver_t *d = &as_pconnection_t(ps)->driver;
-    pn_connection_driver_bind(d); /* Bind so errors will be reported */
-    pn_connection_driver_errorf(d, COND_NAME, "%s %s:%s: %s",
-                                what, fixstr(ps->host), fixstr(ps->port),
-                                uv_strerror(err));
-    pn_connection_driver_close(d);
-  }
-  leader_to_worker(ps);               /* Worker to handle the error */
-}
-
-/* uv-initialization: count the socket as active and initialise its tcp
- * handle and, for connections, the tick timer. Any failure is reported via
- * leader_error() and returned to the caller; 0 means success. */
-static int leader_init(psocket_t *ps) {
-  leader_count(ps->proactor, +1);
-  int err = uv_tcp_init(&ps->proactor->loop, &ps->tcp);
-  if (!err) {
-    pconnection_t *pc = as_pconnection_t(ps);
-    if (pc) {
-      pc->connect.data = ps;
-      /* Assign to the function-level err: a second declaration here would
-       * hide a timer failure from the check and the return below. */
-      err = uv_timer_init(&ps->proactor->loop, &pc->timer);
-      if (!err) {
-        pc->timer.data = pc;
-      }
-    }
-  }
-  if (err) {
-    leader_error(ps, err, "initialization");
-  }
-  return err;
-}
-
-/* Shared tail of connect and accept: report a failure, or hand the new
- * connection to a worker on success. */
-static void leader_connect_accept(pconnection_t *pc, int err, const char *what) {
-  if (err) {
-    leader_error(&pc->psocket, err, what);
-    return;
-  }
-  leader_to_worker(&pc->psocket);
-}
-
-/* uv connect callback: forward the result to the common connect/accept path */
-static void on_connect(uv_connect_t *connect, int err) {
-  pconnection_t *pc = (pconnection_t*)connect->data;
-  leader_connect_accept(pc, err, "on connect");
-}
-
-/* uv connection callback for a listening socket. On success queue
- * PN_LISTENER_ACCEPT so the application can call pn_listener_accept(); on
- * failure only report the error. */
-static void on_accept(uv_stream_t* server, int err) {
-  pn_listener_t *l = (pn_listener_t*) server->data;
-  if (err) {
-    /* leader_error() queues PN_LISTENER_CLOSE and hands the listener to a
-     * worker itself, so do not also queue an ACCEPT or hand it over twice. */
-    leader_error(&l->psocket, err, "on accept");
-    return;
-  }
-  pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
-  leader_to_worker(&l->psocket); /* Let user call pn_listener_accept */
-}
-
-/* Leader action: take the listener's pending connection (if any), initialise
- * it and accept the incoming stream onto it. */
-static void leader_accept(psocket_t *ps) {
-  pn_listener_t * l = as_listener(ps);
-  pconnection_t *incoming = l->accepting;
-  l->accepting = NULL;
-  if (!incoming) {
-    return;                     /* Nothing waiting to be accepted */
-  }
-  int err = leader_init(&incoming->psocket);
-  if (!err) {
-    err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&incoming->psocket.tcp);
-  }
-  leader_connect_accept(incoming, err, "on accept");
-}
-
-/* Initialise the socket and resolve its host and port into *info. AI_PASSIVE
- * is requested for server sockets. No callback is passed to uv_getaddrinfo(),
- * and callers read info->addrinfo as soon as this returns 0. */
-static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) {
-  struct addrinfo hints = { 0 };
-  if (server) {
-    hints.ai_flags = AI_PASSIVE;
-  }
-  int err = leader_init(ps);
-  if (err) {
-    return err;
-  }
-  return uv_getaddrinfo(&ps->proactor->loop, info, NULL, fixstr(ps->host), fixstr(ps->port), &hints);
-}
-
-/* Leader action: resolve the peer address and start a tcp connect to the
- * first result; any failure is reported with leader_error(). */
-static void leader_connect(psocket_t *ps) {
-  uv_getaddrinfo_t info;
-  int err = leader_resolve(ps, &info, false);
-  if (err == 0) {
-    pconnection_t *pc = as_pconnection_t(ps);
-    err = uv_tcp_connect(&pc->connect, &pc->psocket.tcp, info.addrinfo->ai_addr, on_connect);
-    uv_freeaddrinfo(info.addrinfo);
-  }
-  if (err != 0) {
-    leader_error(ps, err, "connect to");
-  }
-}
-
-/* Leader action: resolve the local address, bind the listener's tcp handle
- * to the first result and start listening; any failure is reported with
- * leader_error(). */
-static void leader_listen(psocket_t *ps) {
-  pn_listener_t *l = as_listener(ps);
-  uv_getaddrinfo_t info;
-  int err = leader_resolve(ps, &info, true);
-  if (err == 0) {
-    err = uv_tcp_bind(&l->psocket.tcp, info.addrinfo->ai_addr, 0);
-    uv_freeaddrinfo(info.addrinfo);
-  }
-  if (err == 0) {
-    err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_accept);
-  }
-  if (err != 0) {
-    leader_error(ps, err, "listen on ");
-  }
-}
-
-/* Timer callback, also called directly after IO: when either idle timeout is
- * set, tick the transport and re-arm the timer for the deadline it returns. */
-static void on_tick(uv_timer_t *timer) {
-  pconnection_t *pc = (pconnection_t*)timer->data;
-  pn_transport_t *t = pc->driver.transport;
-  if (!pn_transport_get_idle_timeout(t) && !pn_transport_get_remote_idle_timeout(t)) {
-    return;                     /* No idle timeout in either direction */
-  }
-  uv_timer_stop(&pc->timer);
-  uint64_t now = uv_now(pc->timer.loop);
-  uint64_t deadline = pn_transport_tick(t, now);
-  if (deadline) {
-    uv_timer_start(&pc->timer, on_tick, deadline - now, 0);
-  }
-}
-
-/* uv read callback: pass received bytes to the driver, close the read side
- * on end-of-file, or report any other negative status as an error. */
-static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
-  pconnection_t *pc = (pconnection_t*)stream->data;
-  if (nread == UV_EOF) {        /* hangup */
-    pn_connection_driver_read_close(&pc->driver);
-    leader_maybe_free(&pc->psocket);
-    return;
-  }
-  if (nread < 0) {
-    leader_error(&pc->psocket, nread, "on read from");
-    return;
-  }
-  pn_connection_driver_read_done(&pc->driver, nread);
-  on_tick(&pc->timer);          /* check for tick changes. */
-  leader_to_worker(&pc->psocket);
-  /* Reading continues automatically until stopped. */
-}
-
-/* uv write callback: mark the write request idle, then either report the
- * bytes written to the driver, check whether a cancelled connection can be
- * freed, or report the error. */
-static void on_write(uv_write_t* write, int err) {
-  pconnection_t *pc = (pconnection_t*)write->data;
-  size_t written = pc->writing;
-  write->data = NULL;
-  /* Reset before the calls below: leader_maybe_free() may free pc and
-   * leader_to_worker() gives it to another thread, so pc must not be
-   * touched after them. */
-  pc->writing = 0;              /* Need to send a new write request */
-  if (err == 0) {
-    pn_connection_driver_write_done(&pc->driver, written);
-    leader_to_worker(&pc->psocket);
-  } else if (err == UV_ECANCELED) {
-    leader_maybe_free(&pc->psocket);
-  } else {
-    leader_error(&pc->psocket, err, "on write to");
-  }
-}
-
-/* Proactor timer callback: flag, under the lock, that the timeout elapsed */
-static void on_timeout(uv_timer_t *timer) {
-  pn_proactor_t *proactor = (pn_proactor_t*)timer->data;
-  uv_mutex_lock(&proactor->lock);
-  proactor->timeout_elapsed = true;
-  uv_mutex_unlock(&proactor->lock);
-}
-
-/* Read buffer allocation function for uv: no allocation is done, uv is
- * simply given the connection driver's own read buffer. */
-static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) {
-  pconnection_t *pc = (pconnection_t*)stream->data;
-  pn_rwbytes_t space = pn_connection_driver_read_buffer(&pc->driver);
-  *buf = uv_buf_init(space.start, space.size);
-}
-
-/* Leader action run after a worker finishes a batch: resume IO on the socket.
- * For a connection this re-checks ticks, then starts a write, a shutdown
- * and/or reading as the driver's buffers require; for a listener it calls
- * uv_listen() again. Errors from uv_read_start()/uv_listen() are reported with
- * leader_error(). */
-static void leader_rewatch(psocket_t *ps) {
-  int err = 0;
-  if (ps->is_conn) {
-    pconnection_t *pc = as_pconnection_t(ps);
-    if (pc->timer.data) {         /* uv-initialized */
-      on_tick(&pc->timer);        /* Re-enable ticks if required */
-    }
-    pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
-    pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
-
-    /* Ticks and checking buffers can generate events, process before proceeding */
-    if (pn_connection_driver_has_event(&pc->driver)) {
-      leader_to_worker(ps);
-    } else {                      /* Re-watch for IO */
-      if (wbuf.size > 0 && !pc->writing) {
-        /* pc->writing remembers the request size until on_write() runs */
-        pc->writing = wbuf.size;
-        uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
-        pc->write.data = ps;
-        /* NOTE(review): the uv_write() result is not checked */
-        uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write);
-      } else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) {
-        /* Nothing left to write and the write side is closed: shut down */
-        pc->shutdown.data = ps;
-        /* NOTE(review): the uv_shutdown() result is not checked */
-        uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, on_shutdown);
-      }
-      if (rbuf.size > 0 && !pc->reading) {
-        pc->reading = true;
-        err = uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read);
-      }
-    }
-  } else {
-    pn_listener_t *l = as_listener(ps);
-    err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_accept);
-  }
-  if (err) {
-    leader_error(ps, err, "rewatch");
-  }
-}
-
-/* Queue event type t on the proactor's collector, mark the proactor's batch
- * as being worked on and return that batch. */
-static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t) {
-  p->batch_working = true;
-  pn_collector_put(p->collector, pn_proactor__class(), p, t);
-  return &p->batch;
-}
-
-/* Return the next event batch or 0 if no events are ready.
- * Pending proactor events (inactive, interrupt, timeout - in that order) are
- * served first, provided the proactor's own batch is not already being
- * worked on; otherwise the batch of the first socket on the worker queue is
- * returned. */
-static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) {
-  if (!p->batch_working) {       /* Can generate proactor events */
-    if (p->inactive) {
-      p->inactive = false;
-      return proactor_batch_lh(p, PN_PROACTOR_INACTIVE);
-    }
-    if (p->interrupt > 0) {
-      --p->interrupt;
-      return proactor_batch_lh(p, PN_PROACTOR_INTERRUPT);
-    }
-    if (p->timeout_elapsed) {
-      p->timeout_elapsed = false;
-      return proactor_batch_lh(p, PN_PROACTOR_TIMEOUT);
-    }
-  }
-  /* Every queued socket yields a batch, so a single pop is enough; there is
-   * no "no event" case that would send a socket back to the leader. */
-  psocket_t *ps = pop_lh(&p->worker_q);
-  if (!ps) {
-    return 0;
-  }
-  if (ps->is_conn) {
-    pconnection_t *pc = as_pconnection_t(ps);
-    return &pc->driver.batch;
-  }
-  pn_listener_t *l = as_listener(ps);   /* Listener */
-  return &l->batch;
-}
-
-/* Callable from any thread: store a wakeup action on the socket, replacing
- * any earlier one, and queue the socket for the leader, all under the lock. */
-static void wakeup(psocket_t *ps, void (*action)(psocket_t*)) {
-  pn_proactor_t *owner = ps->proactor;
-  uv_mutex_lock(&owner->lock);
-  ps->wakeup = action;
-  to_leader_lh(ps);
-  uv_mutex_unlock(&owner->lock);
-}
-
-/* Return the listener an event belongs to, or NULL if the event's class is
- * not the listener class. */
-pn_listener_t *pn_event_listener(pn_event_t *e) {
-  if (pn_event_class(e) != pn_listener__class()) {
-    return NULL;
-  }
-  return (pn_listener_t*)pn_event_context(e);
-}
-
-/* Return the proactor behind an event: the event context itself for proactor
- * events, otherwise the proactor of the event's listener or connection, or
- * NULL if the event has neither. */
-pn_proactor_t *pn_event_proactor(pn_event_t *e) {
-  if (pn_event_class(e) == pn_proactor__class()) {
-    return (pn_proactor_t*)pn_event_context(e);
-  }
-  pn_listener_t *l = pn_event_listener(e);
-  if (l) {
-    return l->psocket.proactor;
-  }
-  pn_connection_t *c = pn_event_connection(e);
-  return c ? pn_connection_proactor(c) : NULL;
-}
-
-/* Called by a worker when it has finished with a batch. A connection batch
- * is requeued while events remain, otherwise handed to the leader to close
- * or re-watch; a listener batch is handed to the leader to re-watch; the
- * proactor's own batch is released and the leader woken. */
-void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
-  pconnection_t *pc = batch_pconnection(batch);
-  if (pc) {
-    if (pn_connection_driver_has_event(&pc->driver)) {
-      /* Process all events before going back to IO. */
-      worker_requeue(&pc->psocket);
-      return;
-    }
-    if (pn_connection_driver_finished(&pc->driver)) {
-      owner_to_leader(&pc->psocket, leader_close);
-      return;
-    }
-    owner_to_leader(&pc->psocket, leader_rewatch);
-    return;
-  }
-  pn_listener_t *l = batch_listener(batch);
-  if (l) {
-    owner_to_leader(&l->psocket, leader_rewatch);
-    return;
-  }
-  if (batch_proactor(batch) == p) {
-    uv_mutex_lock(&p->lock);
-    p->batch_working = false;
-    uv_async_send(&p->async); /* Wake leader */
-    uv_mutex_unlock(&p->lock);
-  }
-}
-
-/* Run follower/leader loop till we can return an event and be a worker.
- * The calling thread first tries to take a ready batch. Failing that it waits
- * until no other thread is leader, becomes leader, and alternates between
- * running queued leader actions and running the uv loop once until a batch
- * is available. It then gives up leadership, signals one waiting thread and
- * returns the batch. */
-pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
-  uv_mutex_lock(&p->lock);
-  /* Try to grab work immediately. */
-  pn_event_batch_t *batch = get_batch_lh(p);
-  if (batch == NULL) {
-    /* No work available, follow the leader */
-    while (p->has_leader) {
-      uv_cond_wait(&p->cond, &p->lock);
-    }
-    /* Lead till there is work to do. */
-    p->has_leader = true;
-    while (batch == NULL) {
-      /* Apply a timeout change requested by pn_proactor_set_timeout() */
-      if (p->timeout_request) {
-        p->timeout_request = false;
-        if (p->timeout) {
-          uv_timer_start(&p->timer, on_timeout, p->timeout, 0);
-        } else {
-          uv_timer_stop(&p->timer);
-        }
-      }
-      /* Drain the leader queue, running each socket's pending actions */
-      for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
-        /* Copy and clear both actions while the lock is held */
-        void (*action)(psocket_t*) = ps->action;
-        void (*wakeup)(psocket_t*) = ps->wakeup; /* local, shadows wakeup() */
-        ps->action = NULL;
-        ps->wakeup = NULL;
-        if (action || wakeup) {
-          /* The actions run with the lock released */
-          uv_mutex_unlock(&p->lock);
-          if (action) action(ps);
-          if (wakeup) wakeup(ps);
-          uv_mutex_lock(&p->lock);
-        }
-      }
-      batch = get_batch_lh(p);
-      if (batch == NULL) {
-        /* Nothing ready yet: run one pass of the uv loop, lock released */
-        uv_mutex_unlock(&p->lock);
-        uv_run(&p->loop, UV_RUN_ONCE);
-        uv_mutex_lock(&p->lock);
-      }
-    }
-    /* Signal the next leader and return to work */
-    p->has_leader = false;
-    uv_cond_signal(&p->cond);
-  }
-  uv_mutex_unlock(&p->lock);
-  return batch;
-}
-
-/* Request one more PN_PROACTOR_INTERRUPT event and wake the uv loop */
-void pn_proactor_interrupt(pn_proactor_t *p) {
-  uv_mutex_lock(&p->lock);
-  p->interrupt += 1;
-  uv_async_send(&p->async);   /* Interrupt the UV loop */
-  uv_mutex_unlock(&p->lock);
-}
-
-/* Record a new timeout value and flag the change so that the leader applies
- * it to the proactor timer; then wake the uv loop. */
-void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
-  uv_mutex_lock(&p->lock);
-  p->timeout_request = true;
-  p->timeout = t;
-  uv_async_send(&p->async);   /* Interrupt the UV loop */
-  uv_mutex_unlock(&p->lock);
-}
-
-/* Create the proactor-side state for an outgoing connection and schedule the
- * connect. Returns 0, or PN_OUT_OF_MEMORY if the state cannot be allocated. */
-int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host, const char *port) {
-  pconnection_t *pc = new_pconnection_t(p, c, false, host, port);
-  if (pc) {
-    /* Process PN_CONNECTION_INIT before binding */
-    owner_to_worker(&pc->psocket, leader_connect);
-    return 0;
-  }
-  return PN_OUT_OF_MEMORY;
-}
-
-/* Attach a listener to the proactor for host:port with the given backlog and
- * schedule the listen on the leader. Always returns 0. */
-int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *host, const char *port, int backlog)
-{
-  psocket_t *ps = &l->psocket;
-  psocket_init(ps, p, false, host, port);
-  l->backlog = backlog;
-  owner_to_leader(ps, leader_listen);
-  return 0;
-}
-
-/* Return the proactor a connection is attached to, or NULL if it has none */
-pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
-  pconnection_t *pc = get_pconnection_t(c);
-  if (!pc) {
-    return NULL;
-  }
-  return pc->psocket.proactor;
-}
-
-/* Leader-side wakeup action: queue PN_CONNECTION_WAKE on the connection's
- * collector and hand the socket to a worker. */
-void leader_wake_connection(psocket_t *ps) {
-  pn_connection_t *conn = as_pconnection_t(ps)->driver.connection;
-  pn_collector_put(pn_connection_collector(conn), PN_OBJECT, conn, PN_CONNECTION_WAKE);
-  leader_to_worker(ps);
-}
-
-/* Ask the proactor to deliver PN_CONNECTION_WAKE to the connection. Does
- * nothing for a NULL connection or one that has no proactor state attached,
- * since get_pconnection_t() returns NULL in those cases. */
-void pn_connection_wake(pn_connection_t* c) {
-  pconnection_t *pc = get_pconnection_t(c);
-  if (pc) {
-    wakeup(&pc->psocket, leader_wake_connection);
-  }
-}
-
-/* Allocate and initialise a proactor: its collector, uv loop, lock,
- * condition variable, async wakeup handle and timeout timer.
- * Returns NULL if the proactor or its collector cannot be allocated. */
-pn_proactor_t *pn_proactor() {
-  pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p));
-  if (!p) return NULL;          /* Check before the first dereference */
-  p->collector = pn_collector();
-  if (!p->collector) {
-    free(p);                    /* Do not leak the half-built proactor */
-    return NULL;
-  }
-  p->batch.next_event = &proactor_batch_next;
-  uv_loop_init(&p->loop);
-  uv_mutex_init(&p->lock);
-  uv_cond_init(&p->cond);
-  uv_async_init(&p->loop, &p->async, NULL);
-  uv_timer_init(&p->loop, &p->timer); /* Just wake the loop */
-  p->timer.data = p;
-  return p;
-}
-
-/* uv_walk() callback used by pn_proactor_free(): close the handle unless it
- * is already closing, and stop the loop once nothing on it is alive. */
-static void on_stopping(uv_handle_t* h, void* v) {
-  /* Same guard as leader_close(): uv_close() must not be called on a handle
-   * that is already being closed. */
-  if (!uv_is_closing(h)) {
-    uv_close(h, NULL);           /* Close this handle */
-  }
-  if (!uv_loop_alive(h->loop)) /* Everything closed */
-    uv_stop(h->loop);        /* Stop the loop, pn_proactor_free() can return */
-}
-
-/* Destroy a proactor: close every handle on its loop, run the loop until it
- * stops, then release the loop, the lock, the condition variable, the
- * collector and the proactor itself. */
-void pn_proactor_free(pn_proactor_t *p) {
-  uv_walk(&p->loop, on_stopping, NULL); /* Close all handles */
-  uv_run(&p->loop, UV_RUN_DEFAULT);     /* Run till stop, all handles closed */
-  uv_loop_close(&p->loop);
-  uv_mutex_destroy(&p->lock);
-  uv_cond_destroy(&p->cond);
-  pn_collector_free(p->collector);
-  free(p);
-}
-
-/* Batch iterator for listener events. If the event the caller has just
- * handled was PN_LISTENER_CLOSE, schedule the actual close before returning
- * the next event. */
-static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
-  pn_listener_t *l = batch_listener(batch);
-  pn_event_t *prev = pn_collector_prev(l->collector);
-  bool close_handled = prev && pn_event_type(prev) == PN_LISTENER_CLOSE;
-  if (close_handled) {
-    owner_to_leader(&l->psocket, leader_close); /* Close event handled, do close */
-  }
-  return pn_collector_next(l->collector);
-}
-
-/* Batch iterator for proactor events: next event from the proactor's collector */
-static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
-  pn_proactor_t *p = batch_proactor(batch);
-  return pn_collector_next(p->collector);
-}
-
-/* Release a listener and whichever of its collector, condition and
- * attachments were successfully allocated. Accepts NULL and a partially
- * constructed listener. */
-static void pn_listener_free(pn_listener_t *l) {
-  if (l) {
-    /* Free the members that exist; testing for NULL here would free nothing */
-    if (l->collector) pn_collector_free(l->collector);
-    if (l->condition) pn_condition_free(l->condition);
-    if (l->attachments) pn_free(l->attachments);
-    free(l);
-  }
-}
-
-/* Allocate a listener together with its collector, condition and attachments
- * record. Returns NULL if any allocation fails. */
-pn_listener_t *pn_listener() {
-  pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t));
-  if (!l) {
-    return NULL;
-  }
-  l->batch.next_event = listener_batch_next;
-  l->collector = pn_collector();
-  l->condition = pn_condition();
-  l->attachments = pn_record();
-  if (l->condition && l->collector && l->attachments) {
-    return l;
-  }
-  pn_listener_free(l);
-  return NULL;
-}
-
-/* Request that the listener be closed: leader_close is stored as the
- * listener's wakeup action and run later by the leader. */
-void pn_listener_close(pn_listener_t* l) {
-  wakeup(&l->psocket, leader_close);
-}
-
-/* Return the listener's proactor, or NULL for a NULL listener */
-pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
-  if (!l) {
-    return NULL;
-  }
-  return l->psocket.proactor;
-}
-
-/* Return the listener's condition object; l must not be NULL */
-pn_condition_t* pn_listener_condition(pn_listener_t* l) {
-  return l->condition;
-}
-
-/* Return the context pointer stored by pn_listener_set_context(); l must not be NULL */
-void *pn_listener_get_context(pn_listener_t *l) {
-  return l->context;
-}
-
-/* Store a caller-supplied context pointer on the listener; l must not be NULL */
-void pn_listener_set_context(pn_listener_t *l, void *context) {
-  l->context = context;
-}
-
-/* Return the listener's attachments record; l must not be NULL */
-pn_record_t *pn_listener_attachments(pn_listener_t *l) {
-  return l->attachments;
-}
-
-/* Accept the next incoming connection on the listener, using connection c.
- * Returns 0 on success, PN_STATE_ERR if an accept is already in progress,
- * or PN_OUT_OF_MEMORY if the connection state cannot be allocated. */
-int pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
-  if (l->accepting) {
-    return PN_STATE_ERR;        /* Only one at a time */
-  }
-  l->accepting = new_pconnection_t(
-      l->psocket.proactor, c, true, l->psocket.host, l->psocket.port);
-  if (!l->accepting) {
-    /* Report a proton error code, as pn_proactor_connect() does, rather
-     * than a libuv one. */
-    return PN_OUT_OF_MEMORY;
-  }
-  owner_to_leader(&l->psocket, leader_accept);
-  return 0;
-}
-

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/c/proactor/receive.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/receive.c b/examples/c/proactor/receive.c
deleted file mode 100644
index b8edcd6..0000000
--- a/examples/c/proactor/receive.c
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <proton/connection.h>
-#include <proton/connection_driver.h>
-#include <proton/delivery.h>
-#include <proton/proactor.h>
-#include <proton/link.h>
-#include <proton/message.h>
-#include <proton/session.h>
-#include <proton/transport.h>
-#include <proton/url.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>
-
-typedef char str[1024];
-
-/* Application state shared between main() and the event handler */
-typedef struct app_data_t {
-  str address;                  /* Source address to receive from, set in main() */
-  str container_id;             /* Container id, formatted as "argv[0]:pid" in main() */
-  pn_rwbytes_t message_buffer;  /* Never filled in this example; only freed at exit */
-  int message_count;            /* Messages to receive; 0 means receive forever */
-  int received;                 /* Messages received so far (counted only when message_count != 0) */
-  pn_proactor_t *proactor;      /* Proactor driving the connection */
-  bool finished;                /* Set on PN_PROACTOR_INACTIVE to end the main loop */
-} app_data_t;
-
-static const int BATCH = 100; /* Batch size for unlimited receive */
-
-static int exit_code = 0;
-
-/* If the condition is set, print it to stderr prefixed with the event's type
- * name and make the program exit with status 1. */
-static void check_condition(pn_event_t *e, pn_condition_t *cond) {
-  if (!pn_condition_is_set(cond)) {
-    return;
-  }
-  exit_code = 1;
-  const char *event_name = pn_event_type_name(pn_event_type(e));
-  fprintf(stderr, "%s: %s: %s\n", event_name,
-          pn_condition_get_name(cond), pn_condition_get_description(cond));
-}
-
-#define MAX_SIZE 1024
-
-/* Read a delivery's raw bytes, decode them as a message and print the
- * message body to stdout. Deliveries with MAX_SIZE or more pending bytes,
- * and empty or failed reads, are skipped silently. */
-static void decode_message(pn_delivery_t *dlv) {
-  static char buffer[MAX_SIZE];
-  /* only messages that fit in the buffer are decoded */
-  if (pn_delivery_pending(dlv) >= MAX_SIZE) {
-    return;
-  }
-  /* read in the raw data */
-  ssize_t len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE);
-  if (len <= 0) {
-    return;
-  }
-  /* decode it into a proton message */
-  pn_message_t *m = pn_message();
-  if (pn_message_decode(m, buffer, len) == PN_OK) {
-    pn_string_t *text = pn_string(NULL);
-    pn_inspect(pn_message_body(m), text);
-    printf("%s\n", pn_string_get(text));
-    pn_free(text);
-  }
-  pn_message_free(m);
-}
-
-/* Handle one proactor event for the receiver example: set up the connection,
- * session and receiving link on PN_CONNECTION_INIT, accept and print incoming
- * messages on PN_DELIVERY, report error conditions and close the connection on
- * remote close/detach events, and mark the application finished when the
- * proactor becomes inactive. All other events are ignored. */
-static void handle(app_data_t* app, pn_event_t* event) {
-  switch (pn_event_type(event)) {
-
-   case PN_CONNECTION_INIT: {
-     pn_connection_t* c = pn_event_connection(event);
-     pn_connection_set_container(c, app->container_id);
-     pn_connection_open(c);
-     pn_session_t* s = pn_session(c);
-     pn_session_open(s);
-     pn_link_t* l = pn_receiver(s, "my_receiver");
-     pn_terminus_set_address(pn_link_source(l), app->address);
-     pn_link_open(l);
-     /* cannot receive without granting credit: */
-     pn_link_flow(l, app->message_count ? app->message_count : BATCH);
-   } break;
-
-   case PN_DELIVERY: {
-     /* A message has been received */
-     pn_link_t *link = NULL;
-     pn_delivery_t *dlv = pn_event_delivery(event);
-     /* Only act once the whole message has arrived */
-     if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) {
-       link = pn_delivery_link(dlv);
-       decode_message(dlv);
-       /* Accept the delivery */
-       pn_delivery_update(dlv, PN_ACCEPTED);
-       /* done with the delivery, move to the next and free it */
-       pn_link_advance(link);
-       pn_delivery_settle(dlv);  /* dlv is now freed */
-
-       if (app->message_count == 0) {
-         /* receive forever - see if more credit is needed */
-         if (pn_link_credit(link) < BATCH/2) {
-           /* Grant enough credit to bring it up to BATCH: */
-           pn_link_flow(link, BATCH - pn_link_credit(link));
-         }
-       } else if (++app->received >= app->message_count) {
-         /* done receiving, close the endpoints */
-         printf("%d messages received\n", app->received);
-         pn_session_t *ssn = pn_link_session(link);
-         pn_link_close(link);
-         pn_session_close(ssn);
-         pn_connection_close(pn_session_connection(ssn));
-       }
-     }
-   } break;
-
-   case PN_TRANSPORT_ERROR:
-    check_condition(event, pn_transport_condition(pn_event_transport(event)));
-    break;
-
-   case PN_CONNECTION_REMOTE_CLOSE:
-    check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
-    pn_connection_close(pn_event_connection(event));
-    break;
-
-   case PN_SESSION_REMOTE_CLOSE:
-    check_condition(event, pn_session_remote_condition(pn_event_session(event)));
-    pn_connection_close(pn_event_connection(event));
-    break;
-
-   case PN_LINK_REMOTE_CLOSE:
-   case PN_LINK_REMOTE_DETACH:
-    check_condition(event, pn_link_remote_condition(pn_event_link(event)));
-    pn_connection_close(pn_event_connection(event));
-    break;
-
-   case PN_PROACTOR_INACTIVE:
-    /* Ends the event loop in main() */
-    app->finished = true;
-    break;
-
-   default: break;
-  }
-}
-
-/* Print the command-line synopsis to stderr and exit with status 1 */
-static void usage(const char *arg0) {
-  fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", arg0);
-  exit(1);
-}
-
-/* Receiver example entry point: parse -a (url) and -m (message count),
- * connect through a proactor and process event batches until the proactor
- * reports that it is inactive. Returns 1 if any error condition was seen. */
-int main(int argc, char **argv) {
-  /* Default values for application and connection. */
-  app_data_t app = {{0}};
-  app.message_count = 100;
-  const char* urlstr = NULL;
-
-  int opt;
-  while((opt = getopt(argc, argv, "a:m:")) != -1) {
-    switch(opt) {
-     case 'a': urlstr = optarg; break;
-     case 'm': app.message_count = atoi(optarg); break;
-     default: usage(argv[0]); break;
-    }
-  }
-  if (optind < argc)
-    usage(argv[0]);
-
-  snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid());
-
-  /* Parse the URL or use default values */
-  pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
-  const char *host = url ? pn_url_get_host(url) : NULL;
-  const char *port = url ? pn_url_get_port(url) : "amqp";
-  /* Copy at most size-1 bytes: app is zero-initialised, so the final byte
-   * stays '\0' and the address is terminated even for an over-long path. */
-  strncpy(app.address, (url && pn_url_get_path(url)) ? pn_url_get_path(url) : "example", sizeof(app.address) - 1);
-
-  /* Create the proactor and connect */
-  app.proactor = pn_proactor();
-  pn_proactor_connect(app.proactor, pn_connection(), host, port);
-  if (url) pn_url_free(url);
-
-  /* Each batch must be handed back with pn_proactor_done() once handled */
-  do {
-    pn_event_batch_t *events = pn_proactor_wait(app.proactor);
-    pn_event_t *e;
-    while ((e = pn_event_batch_next(events))) {
-      handle(&app, e);
-    }
-    pn_proactor_done(app.proactor, events);
-  } while(!app.finished);
-
-  pn_proactor_free(app.proactor);
-  free(app.message_buffer.start);
-  return exit_code;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/c/proactor/send.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/send.c b/examples/c/proactor/send.c
deleted file mode 100644
index d611b3d..0000000
--- a/examples/c/proactor/send.c
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <proton/connection.h>
-#include <proton/connection_driver.h>
-#include <proton/delivery.h>
-#include <proton/proactor.h>
-#include <proton/link.h>
-#include <proton/message.h>
-#include <proton/session.h>
-#include <proton/transport.h>
-#include <proton/url.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>
-
-typedef char str[1024];
-
-/* Application state shared between main() and the event handler */
-typedef struct app_data_t {
-  str address;                  /* Target address to send to, set in main() */
-  str container_id;             /* Container id, formatted as "argv[0]:pid" in main() */
-  pn_rwbytes_t message_buffer;  /* Reusable encode buffer, grown on demand by encode_message() */
-  int message_count;            /* Total number of messages to send */
-  int sent;                     /* Messages sent so far; also the sequence number and delivery tag */
-  int acknowledged;             /* Messages the peer has accepted */
-  pn_proactor_t *proactor;      /* Proactor driving the connection */
-  pn_millis_t delay;            /* Delay between sends from -d; 0 means none */
-  bool delaying;                /* True while waiting for the delay timeout */
-  pn_link_t *sender;            /* Sending link, recorded on PN_LINK_FLOW */
-  bool finished;                /* Set on PN_PROACTOR_INACTIVE to end the main loop */
-} app_data_t;
-
-int exit_code = 0;
-
-/* If the condition is set, print it to stderr prefixed with the event's type
- * name and make the program exit with status 1. */
-static void check_condition(pn_event_t *e, pn_condition_t *cond) {
-  if (!pn_condition_is_set(cond)) {
-    return;
-  }
-  exit_code = 1;
-  const char *event_name = pn_event_type_name(pn_event_type(e));
-  fprintf(stderr, "%s: %s: %s\n", event_name,
-          pn_condition_get_name(cond), pn_condition_get_description(cond));
-}
-
-/* Create a message with a map { "sequence" : number }, encode it and return
- * the encoded bytes. The bytes live in app->message_buffer, which is grown
- * as needed and reused by the next call. Exits the program if encoding fails
- * or memory runs out. */
-static pn_bytes_t encode_message(app_data_t* app) {
-  /* Construct a message with the map { "sequence": app.sent } */
-  pn_message_t* message = pn_message();
-  pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */
-  pn_data_t* body = pn_message_body(message);
-  pn_data_put_map(body);
-  pn_data_enter(body);
-  pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
-  pn_data_put_int(body, app->sent); /* The sequence number */
-  pn_data_exit(body);
-
-  /* encode the message, expanding the encode buffer as needed */
-  if (app->message_buffer.start == NULL) {
-    static const size_t initial_size = 128;
-    app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size));
-    if (app->message_buffer.start == NULL) {
-      fprintf(stderr, "error encoding message: out of memory\n");
-      exit(1);
-    }
-  }
-  /* app->message_buffer is the total buffer space available. */
-  /* mbuf will point at just the portion used by the encoded message */
-  pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start);
-  int status = 0;
-  while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) {
-    size_t bigger = app->message_buffer.size * 2;
-    char *grown = (char*)realloc(app->message_buffer.start, bigger);
-    if (grown == NULL) {
-      fprintf(stderr, "error encoding message: out of memory\n");
-      exit(1);
-    }
-    app->message_buffer = pn_rwbytes(bigger, grown);
-    /* realloc may move the block: refresh the pointer as well as the size,
-     * otherwise the retry would encode into freed memory. */
-    mbuf = pn_rwbytes(bigger, grown);
-  }
-  if (status != 0) {
-    fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message)));
-    exit(1);
-  }
-  pn_message_free(message);
-  return pn_bytes(mbuf.size, mbuf.start);
-}
-
-/* Send messages while the link has credit and messages remain. When a delay
- * is configured, send one message, arm the proactor timeout and stop until
- * the application is woken again. */
-static void send(app_data_t* app) {
-  for (;;) {
-    if (pn_link_credit(app->sender) <= 0) break;
-    if (app->sent >= app->message_count) break;
-    ++app->sent;
-    /* Use sent counter bytes as unique delivery tag. */
-    pn_delivery(app->sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
-    pn_bytes_t encoded = encode_message(app);
-    pn_link_send(app->sender, encoded.start, encoded.size);
-    pn_link_advance(app->sender);
-    if (!app->delay || app->sent >= app->message_count) {
-      continue;
-    }
-    /* If delay is set, wait for TIMEOUT event to send more */
-    app->delaying = true;
-    pn_proactor_set_timeout(app->proactor, app->delay);
-    break;
-  }
-}
-
-/* Handle one proactor event for the sender example: set up the connection,
- * session and sending link on PN_CONNECTION_INIT, send when credit arrives or
- * after a delay timeout, count acknowledgements, report error conditions and
- * close the connection on remote close/detach events, and mark the
- * application finished when the proactor becomes inactive. */
-static void handle(app_data_t* app, pn_event_t* event) {
-  switch (pn_event_type(event)) {
-
-   case PN_CONNECTION_INIT: {
-     pn_connection_t* c = pn_event_connection(event);
-     pn_connection_set_container(c, app->container_id);
-     pn_connection_open(c);
-     pn_session_t* s = pn_session(c);
-     pn_session_open(s);
-     pn_link_t* l = pn_sender(s, "my_sender");
-     pn_terminus_set_address(pn_link_target(l), app->address);
-     pn_link_open(l);
-   } break;
-
-   case PN_LINK_FLOW:
-    /* The peer has given us some credit, now we can send messages */
-    if (!app->delaying) {
-      app->sender = pn_event_link(event);
-      send(app);
-    }
-    break;
-
-   case PN_PROACTOR_TIMEOUT:
-    /* Wake the sender's connection */
-    pn_connection_wake(pn_session_connection(pn_link_session(app->sender)));
-    break;
-
-   case PN_CONNECTION_WAKE:
-    /* Timeout, we can send more. */
-    app->delaying = false;
-    send(app);
-    break;
-
-   case PN_DELIVERY: {
-     /* We received acknowledgement from the peer that a message was delivered. */
-     pn_delivery_t* d = pn_event_delivery(event);
-     if (pn_delivery_remote_state(d) == PN_ACCEPTED) {
-       /* Close the connection once every message has been accepted */
-       if (++app->acknowledged == app->message_count) {
-         printf("%d messages sent and acknowledged\n", app->acknowledged);
-         pn_connection_close(pn_event_connection(event));
-       }
-     }
-   } break;
-
-   case PN_TRANSPORT_CLOSED:
-    check_condition(event, pn_transport_condition(pn_event_transport(event)));
-    break;
-
-   case PN_CONNECTION_REMOTE_CLOSE:
-    check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
-    pn_connection_close(pn_event_connection(event));
-    break;
-
-   case PN_SESSION_REMOTE_CLOSE:
-    check_condition(event, pn_session_remote_condition(pn_event_session(event)));
-    pn_connection_close(pn_event_connection(event));
-    break;
-
-   case PN_LINK_REMOTE_CLOSE:
-   case PN_LINK_REMOTE_DETACH:
-    check_condition(event, pn_link_remote_condition(pn_event_link(event)));
-    pn_connection_close(pn_event_connection(event));
-    break;
-
-   case PN_PROACTOR_INACTIVE:
-    /* Ends the event loop in main() */
-    app->finished = true;
-    break;
-
-   default: break;
-  }
-}
-
-/* Print the command-line synopsis to stderr and exit with status 1 */
-static void usage(const char *arg0) {
-  fprintf(stderr, "Usage: %s [-a url] [-m message-count] [-d delay-ms]\n", arg0);
-  exit(1);
-}
-
-/* Sender example entry point: parse -a (url), -m (message count) and -d
- * (delay in ms), connect through a proactor and process event batches until
- * the proactor reports that it is inactive. Returns 1 if any error condition
- * was seen. */
-int main(int argc, char **argv) {
-  /* Default values for application and connection. */
-  app_data_t app = {{0}};
-  app.message_count = 100;
-  const char* urlstr = NULL;
-
-  int opt;
-  while((opt = getopt(argc, argv, "a:m:d:")) != -1) {
-    switch(opt) {
-     case 'a': urlstr = optarg; break;
-     case 'm': app.message_count = atoi(optarg); break;
-     case 'd': app.delay = atoi(optarg); break;
-     default: usage(argv[0]); break;
-    }
-  }
-  if (optind < argc)
-    usage(argv[0]);
-
-  snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid());
-
-  /* Parse the URL or use default values */
-  pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
-  const char *host = url ? pn_url_get_host(url) : NULL;
-  const char *port = url ? pn_url_get_port(url) : "amqp";
-  /* Copy at most size-1 bytes: app is zero-initialised, so the final byte
-   * stays '\0' and the address is terminated even for an over-long path. */
-  strncpy(app.address, (url && pn_url_get_path(url)) ? pn_url_get_path(url) : "example", sizeof(app.address) - 1);
-
-  /* Create the proactor and connect */
-  app.proactor = pn_proactor();
-  pn_proactor_connect(app.proactor, pn_connection(), host, port);
-  if (url) pn_url_free(url);
-
-  /* Each batch must be handed back with pn_proactor_done() once handled */
-  do {
-    pn_event_batch_t *events = pn_proactor_wait(app.proactor);
-    pn_event_t *e;
-    while ((e = pn_event_batch_next(events))) {
-      handle(&app, e);
-    }
-    pn_proactor_done(app.proactor, events);
-  } while(!app.finished);
-
-  pn_proactor_free(app.proactor);
-  free(app.message_buffer.start);
-  return exit_code;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/c/proactor/test.py
----------------------------------------------------------------------
diff --git a/examples/c/proactor/test.py b/examples/c/proactor/test.py
deleted file mode 100644
index a86425d..0000000
--- a/examples/c/proactor/test.py
+++ /dev/null
@@ -1,60 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License
-#
-
-# This is a test script to run the examples and verify that they behave as expected.
-
-from exampletest import *
-
-import unittest
-import sys
-
-def python_cmd(name):
-    """Command line (interpreter plus script path) for the Python example `name`."""
-    return [sys.executable, os.path.join(os.path.dirname(__file__), "..", "..", "python", name)]
-
-def receive_expect(n):  # stdout the tests expect from libuv_receive after n messages
-    return ''.join('{"sequence"=%s}\n'%i for i in range(1, n+1)) + "%s messages received\n"%n
-
-class CExampleTest(BrokerTestCase):
-    broker_exe = ["libuv_broker"]  # presumably the broker BrokerTestCase launches; verify in exampletest
-
-    def test_send_receive(self):
-        """The sender runs to completion before the receiver is started."""
-        sender = self.proc(["libuv_send", "-a", self.addr])
-        self.assertEqual("100 messages sent and acknowledged\n", sender.wait_out())
-        receiver = self.proc(["libuv_receive", "-a", self.addr])
-        self.assertEqual(receive_expect(100), receiver.wait_out())
-
-    def test_receive_send(self):
-        """The receiver is started first, then the sender."""
-        receiver = self.proc(["libuv_receive", "-a", self.addr])
-        sender = self.proc(["libuv_send", "-a", self.addr])
-        self.assertEqual("100 messages sent and acknowledged\n", sender.wait_out())
-        self.assertEqual(receive_expect(100), receiver.wait_out())
-
-    def test_timed_send(self):
-        """Three messages are sent with the -d100 delay option, then received."""
-        sender = self.proc(["libuv_send", "-a", self.addr, "-d100", "-m3"])
-        self.assertEqual("3 messages sent and acknowledged\n", sender.wait_out())
-        receiver = self.proc(["libuv_receive", "-a", self.addr, "-m3"])
-        self.assertEqual(receive_expect(3), receiver.wait_out())
-
-
-if __name__ == "__main__":
-    unittest.main()

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/c/reactor/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/reactor/CMakeLists.txt b/examples/c/reactor/CMakeLists.txt
deleted file mode 100644
index bd6163f..0000000
--- a/examples/c/reactor/CMakeLists.txt
+++ /dev/null
@@ -1,45 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-find_package(Proton REQUIRED)
-
-# Sources of the reactor examples; each one is built into its own executable.
-set(reactor-examples sender.c receiver.c)
-
-# Compile every example with the warning, language and link-time-optimization
-# flags supplied through the variables below.
-set_source_files_properties(${reactor-examples} PROPERTIES
-  COMPILE_FLAGS "${COMPILE_WARNING_FLAGS} ${COMPILE_LANGUAGE_FLAGS} ${LINK_TIME_OPTIMIZATION}"
-  )
-
-# When BUILD_WITH_CXX is set, treat the C sources as C++.
-if(BUILD_WITH_CXX)
-  set_source_files_properties(${reactor-examples} PROPERTIES LANGUAGE CXX)
-endif()
-
-include_directories(${Proton_INCLUDE_DIRS})
-
-# One executable per example, named after its source file and linked
-# against the Proton libraries.  The names here must stay in step with
-# the reactor-examples list above.
-foreach(example sender receiver)
-  add_executable(${example} ${example}.c)
-  target_link_libraries(${example} ${Proton_LIBRARIES})
-endforeach()
-

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/c/reactor/README
----------------------------------------------------------------------
diff --git a/examples/c/reactor/README b/examples/c/reactor/README
deleted file mode 100644
index 8d61893..0000000
--- a/examples/c/reactor/README
+++ /dev/null
@@ -1,30 +0,0 @@
-These example clients require a broker or similar intermediary that
-supports the AMQP 1.0 protocol, allows anonymous connections and
-accepts links to and from a node named 'examples'.
-
-------------------------------------------------------------------
-
-sender.c
-
-A simple message sending client.  This example sends all messages but
-the last as pre-settled (no ack required).  It then waits for the last
-message to be acknowledged before exiting.
-
-Use the '-h' command line option for a list of supported parameters.
-
-------------------------------------------------------------------
-
-receiver.c
-
-A simple message consuming client.  This example receives messages
-from a source node (default 'examples').  Received messages are
-acknowledged if they are sent un-settled.  The client will try to
-decode the message payload, assuming it has been generated by the
-sender example.
-
-Use the '-h' command line option for a list of supported parameters.
-
-
-
-
-

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/c/reactor/receiver.c
----------------------------------------------------------------------
diff --git a/examples/c/reactor/receiver.c b/examples/c/reactor/receiver.c
deleted file mode 100644
index 35c5a70..0000000
--- a/examples/c/reactor/receiver.c
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <stdlib.h>
-#include <stdio.h>
-#include <string.h>
-
-#include "pncompat/misc_funcs.inc"
-
-#include "proton/reactor.h"
-#include "proton/message.h"
-#include "proton/connection.h"
-#include "proton/session.h"
-#include "proton/link.h"
-#include "proton/delivery.h"
-#include "proton/event.h"
-#include "proton/handlers.h"
-#include "proton/transport.h"
-#include "proton/url.h"
-
-static int quiet = 0;  // set to 1 by -q: skip decoding and printing received messages
-
-// Credit granted up front, and topped back up to, when receiving forever (-c 0)
-static const int CAPACITY = 100;
-#define MAX_SIZE 512  // receive buffer size; larger messages are not decoded
-
-// Example application data.  It lives in the memory block attached to the
-// event handler (see GET_APP_DATA) and is available during event processing.
-// In this example it holds configuration and state information.
-//
-typedef struct {
-    int count;          // # of messages still to receive; 0 = receive forever
-    const char *source;     // name of the source node to receive from
-    pn_message_t *message;  // scratch message each received payload is decoded into
-} app_data_t;
-
-// helper to pull the pointer to the app_data_t instance out of a pn_handler_t
-//
-#define GET_APP_DATA(handler) ((app_data_t *)pn_handler_mem(handler))
-
-// Cleanup callback passed to pn_handler_new() in main(): releases the
-// message held in the handler's app_data_t, if there is one.
-static void delete_handler(pn_handler_t *handler)
-{
-    app_data_t *app = GET_APP_DATA(handler);
-    if (app->message == NULL)
-        return;
-    pn_decref(app->message);
-    app->message = NULL;
-}
-
-
-/* Process each event posted by the reactor: open the endpoints on PN_CONNECTION_INIT,
- * consume messages on PN_DELIVERY and report PN_TRANSPORT_ERROR; other events are ignored. */
-static void event_handler(pn_handler_t *handler,
-                          pn_event_t *event,
-                          pn_event_type_t type)
-{
-    app_data_t *data = GET_APP_DATA(handler);
-
-    switch (type) {
-
-    case PN_CONNECTION_INIT: {
-        // Create and open all the endpoints needed to receive messages
-        //
-        pn_connection_t *conn;
-        pn_session_t *ssn;
-        pn_link_t *receiver;
-
-        conn = pn_event_connection(event);
-        pn_connection_open(conn);
-        ssn = pn_session(conn);
-        pn_session_open(ssn);
-        receiver = pn_receiver(ssn, "MyReceiver");
-        pn_terminus_set_address(pn_link_source(receiver), data->source);
-        pn_link_open(receiver);
-        // cannot receive without granting credit (the -c count, or CAPACITY if 0):
-        pn_link_flow(receiver, data->count ? data->count : CAPACITY);
-    } break;
-
-    case PN_DELIVERY: {
-        // A message has been received
-        //
-        pn_link_t *link = NULL;
-        pn_delivery_t *dlv = pn_event_delivery(event);
-        if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) {
-            // A full message has arrived
-            if (!quiet) {
-                static char buffer[MAX_SIZE];
-                ssize_t len;
-                pn_bytes_t bytes;
-                bool found = false;
-
-                // try to decode the message body (only if it fits in buffer)
-                if (pn_delivery_pending(dlv) < MAX_SIZE) {
-                    // read in the raw data
-                    len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE);
-                    if (len > 0) {
-                        // decode it into a proton message
-                        pn_message_clear(data->message);
-                        if (PN_OK == pn_message_decode(data->message, buffer,
-                                                       len)) {
-                            // Assuming the message came from the sender
-                            // example, try to parse out a single string from
-                            // the payload
-                            //
-                            pn_data_scan(pn_message_body(data->message), "?S",
-                                         &found, &bytes);
-                        }
-                    }
-                }
-                if (found) {
-                    fprintf(stdout, "Message: [%.*s]\n", (int)bytes.size,
-                            bytes.start);
-                } else {
-                    fprintf(stdout, "Message received!\n");
-                }
-            }
-
-            link = pn_delivery_link(dlv);
-
-            if (!pn_delivery_settled(dlv)) {
-                // remote has not settled, so it is tracking the delivery.  Ack
-                // it.
-                pn_delivery_update(dlv, PN_ACCEPTED);
-            }
-
-            // done with the delivery, move to the next and free it
-            pn_link_advance(link);
-            pn_delivery_settle(dlv);  // dlv is now freed
-
-            if (data->count == 0) {
-                // receive forever (-c 0) - see if more credit is needed
-                if (pn_link_credit(link) < CAPACITY/2) {
-                    // Grant enough credit to bring it up to CAPACITY:
-                    pn_link_flow(link, CAPACITY - pn_link_credit(link));
-                }
-            } else if (--data->count == 0) {
-                // done receiving, close the endpoints
-                pn_session_t *ssn = pn_link_session(link);
-				pn_link_close(link);
-                pn_session_close(ssn);
-                pn_connection_close(pn_session_connection(ssn));
-            }
-        }
-    } break;
-
-    case PN_TRANSPORT_ERROR: {
-        // The connection to the peer failed.
-        //
-        pn_transport_t *tport = pn_event_transport(event);
-        pn_condition_t *cond = pn_transport_condition(tport);
-        fprintf(stderr, "Network transport failed!\n");
-        if (pn_condition_is_set(cond)) {
-            const char *name = pn_condition_get_name(cond);
-            const char *desc = pn_condition_get_description(cond);
-            fprintf(stderr, "    Error: %s  Description: %s\n",
-                    (name) ? name : "<error name not provided>",
-                    (desc) ? desc : "<no description provided>");
-        }
-        // pn_reactor_process() will exit with a false return value, stopping
-        // the main loop.
-    } break;
-
-    default:
-        break;
-    }
-}
-
-static void usage(void)
-{   /* write the option summary to stdout, then exit with status 1 */
-  fputs("Usage: receiver <options>\n"
-        "-a      \tThe host address [localhost:5672]\n"
-        "-c      \t# of messages to receive, 0=receive forever [1]\n"
-        "-s      \tSource address [examples]\n"
-        "-i      \tContainer name [ReceiveExample]\n"
-        "-q      \tQuiet - turn off stdout\n", stdout);
-  exit(1);
-}
-
-int main(int argc, char** argv)
-{
-    const char *address = "localhost";
-    const char *container = "ReceiveExample";
-    int c;
-    pn_reactor_t *reactor = NULL;
-    pn_url_t *url = NULL;
-    pn_connection_t *conn = NULL;
-
-    /* Create a handler for the connection's events.
-     * event_handler will be called for each event.  The handler is created
-     * with room for an app_data_t instance, which can be accessed through
-     * GET_APP_DATA() whenever event_handler is called.
-     */
-    pn_handler_t *handler = pn_handler_new(event_handler,
-                                           sizeof(app_data_t),
-                                           delete_handler);
-
-    /* set up the application data with defaults (overridden by options below) */
-    app_data_t *app_data = GET_APP_DATA(handler);
-    memset(app_data, 0, sizeof(app_data_t));
-    app_data->count = 1;
-    app_data->source = "examples";
-    app_data->message = pn_message();
-
-    /* Attach the pn_handshaker() handler.  This handler deals with endpoint
-     * events from the peer so we don't have to.
-     */
-    {
-        pn_handler_t *handshaker = pn_handshaker();
-        pn_handler_add(handler, handshaker);
-        pn_decref(handshaker);
-    }
-
-    /* command line options; opterr = 0 silences getopt's own error messages */
-    opterr = 0;
-    while((c = getopt(argc, argv, "i:a:c:s:qh")) != -1) {
-        switch(c) {
-        case 'h': usage(); break;
-        case 'a': address = optarg; break;
-        case 'c':
-            app_data->count = atoi(optarg);
-            if (app_data->count < 0) usage();
-            break;
-        case 's': app_data->source = optarg; break;
-        case 'i': container = optarg; break;
-        case 'q': quiet = 1; break;
-        default:
-            usage();
-            break;
-        }
-    }
-
-    reactor = pn_reactor();
-
-    url = pn_url_parse(address);
-    if (url == NULL) {
-        fprintf(stderr, "Invalid host address %s\n", address);
-        exit(1);
-    }
-    conn = pn_reactor_connection_to_host(reactor,
-                                         pn_url_get_host(url),
-                                         pn_url_get_port(url),
-                                         handler);
-    pn_decref(url);
-    pn_decref(handler);
-
-    // the container name should be unique for each client (set with -i)
-    pn_connection_set_container(conn, container);
-
-    // wait up to 5 seconds for activity before returning from
-    // pn_reactor_process()
-    pn_reactor_set_timeout(reactor, 5000);
-
-    pn_reactor_start(reactor);
-
-    while (pn_reactor_process(reactor)) {
-        /* Returns 'true' until the connection is shut down.
-         * pn_reactor_process() will return true at least once every 5 seconds
-         * (due to the timeout).  If no timeout was configured,
-         * pn_reactor_process() returns as soon as it finishes processing all
-         * pending I/O and events.  Once the connection has closed,
-         * pn_reactor_process() will return false, which ends this loop.
-         */
-    }
-    pn_decref(reactor);
-
-    return 0;
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org