You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2016/11/17 18:18:53 UTC

[06/11] qpid-proton git commit: PROTON-1344: C proactor for multi-threaded proton applications

PROTON-1344: C proactor for multi-threaded proton applications

proactor.h is an asynchronous, multi-threaded replacement for reactor.h

It uses the same Proton engine APIs and events, but allows multiple application
threads to wait for events to handle, rather than calling back on handler
functions from a single thread.

The proactor ensures that events for the same AMQP connection are handled in
sequence (although possibly by different threads at different times) so event
handling code does not need to lock the use of thread-unsafe proton APIs. It
provides a "wake" feature to signal connections for processing triggered by the
application rather than proton IO.

Examples show C sender, receiver and broker, and a libuv driver implementation.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/ca454180
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/ca454180
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/ca454180

Branch: refs/heads/master
Commit: ca454180b2846b2538a60ad03a047aa6a738d3ca
Parents: cdc1baa
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Nov 2 15:38:49 2016 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Nov 16 19:52:38 2016 -0500

----------------------------------------------------------------------
 config.sh.in                                    |   2 +-
 examples/CMakeLists.txt                         |  13 +
 examples/c/CMakeLists.txt                       |   5 +
 examples/c/proactor/CMakeLists.txt              |  43 ++
 examples/c/proactor/README.dox                  |  20 +
 examples/c/proactor/broker.c                    | 476 ++++++++++++
 examples/c/proactor/libuv_proactor.c            | 747 +++++++++++++++++++
 examples/c/proactor/receive.c                   | 202 +++++
 examples/c/proactor/send.c                      | 204 +++++
 examples/c/proactor/test.py                     |  52 ++
 examples/engine/c/precv.c                       | 502 -------------
 examples/engine/c/psend.c                       | 373 ---------
 examples/exampletest.py                         | 183 +++++
 .../cpp/include/proton/io/connection_engine.hpp |   7 +-
 .../cpp/include/proton/sender_options.hpp       |   4 +-
 .../bindings/cpp/src/io/connection_engine.cpp   |  78 +-
 proton-c/bindings/cpp/src/sender_options.cpp    |   4 +-
 proton-c/docs/api/index.md                      |  38 +-
 proton-c/include/proton/cid.h                   |   5 +-
 proton-c/include/proton/condition.h             |   4 +
 proton-c/include/proton/connection.h            |  18 +
 proton-c/include/proton/connection_engine.h     | 417 +++++++----
 proton-c/include/proton/event.h                 |  48 +-
 proton-c/include/proton/extra.h                 |  69 ++
 proton-c/include/proton/listener.h              |  76 ++
 proton-c/include/proton/object.h                |  27 +
 proton-c/include/proton/proactor.h              | 174 +++++
 proton-c/include/proton/types.h                 |   2 +
 proton-c/src/core/connection_driver.c           | 163 ++++
 proton-c/src/core/connection_engine.c           | 177 +++--
 proton-c/src/core/engine.c                      |  45 +-
 proton-c/src/core/event.c                       |  13 +-
 proton-c/src/core/object/object.c               |  10 +-
 tools/cmake/Modules/FindLibuv.cmake             |  37 +
 34 files changed, 3092 insertions(+), 1146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/config.sh.in
----------------------------------------------------------------------
diff --git a/config.sh.in b/config.sh.in
index 4902b61..edb77e6 100755
--- a/config.sh.in
+++ b/config.sh.in
@@ -40,7 +40,7 @@ RUBY_BINDINGS=$PROTON_BINDINGS/ruby
 PERL_BINDINGS=$PROTON_BINDINGS/perl
 
 # Python & Jython
-COMMON_PYPATH=$PROTON_HOME/tests/python:$PROTON_HOME/proton-c/bindings/python
+COMMON_PYPATH=$PROTON_HOME/tests/python:$PROTON_HOME/proton-c/bindings/python:$PROTON_HOME/examples
 export PYTHONPATH=$COMMON_PYPATH:$PYTHON_BINDINGS
 export JYTHONPATH=$COMMON_PYPATH:$PROTON_HOME/proton-j/src/main/resources:$PROTON_JARS
 export CLASSPATH=$PROTON_JARS

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/examples/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt
index 99e8315..4d744d2 100644
--- a/examples/CMakeLists.txt
+++ b/examples/CMakeLists.txt
@@ -20,6 +20,19 @@
 set (Proton_DIR ${CMAKE_CURRENT_SOURCE_DIR})
 set (ProtonCpp_DIR ${CMAKE_CURRENT_SOURCE_DIR})
 
+# set_search_path(<result> <dir-or-path>...)
+#
+# Joins the arguments after <result> into one search-path string and stores it
+# in the variable named by <result>. This is a macro, so the variable is set
+# directly in the caller's scope.
+macro(set_search_path result)  # args after result are directories or search paths.
+  set(${result} ${ARGN})
+  if (UNIX)
+    # CMake lists are ";"-separated; on UNIX the entries are joined with ":".
+    string(REPLACE ";" ":" ${result} "${${result}}") # native search path separators.
+  endif()
+  file(TO_NATIVE_PATH "${${result}}" ${result}) # native slash separators
+endmacro()
+
+# Some non-python examples use exampletest.py to drive their self-tests.
+# Prepend this directory to whatever PYTHONPATH was set when CMake was run
+# (read at configure time) and hand the result to the tests as PYTHONPATH.
+set_search_path(EXAMPLE_PYTHONPATH "${CMAKE_CURRENT_SOURCE_DIR}" "$ENV{PYTHONPATH}")
+set(EXAMPLE_ENV "PYTHONPATH=${EXAMPLE_PYTHONPATH}")
+
 add_subdirectory(c)
 add_subdirectory(go)
 if (BUILD_CPP)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/examples/c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/CMakeLists.txt b/examples/c/CMakeLists.txt
index 1612a86..0d0c7e9 100644
--- a/examples/c/CMakeLists.txt
+++ b/examples/c/CMakeLists.txt
@@ -17,6 +17,11 @@
 # under the License.
 #
 
+find_package(Proton REQUIRED)
+include(CheckCCompilerFlag)
+
 include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include)
+
+add_subdirectory(proactor)
 add_subdirectory(messenger)
 add_subdirectory(reactor)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/examples/c/proactor/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/proactor/CMakeLists.txt b/examples/c/proactor/CMakeLists.txt
new file mode 100644
index 0000000..f701651
--- /dev/null
+++ b/examples/c/proactor/CMakeLists.txt
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+find_package(Proton REQUIRED)
+
+include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${Proton_INCLUDE_DIRS})
+
+add_definitions(${COMPILE_WARNING_FLAGS} ${WERROR} ${COMPILE_PLATFORM_FLAGS} ${LINK_TIME_OPTIMIZATION})
+
+# The libuv examples are optional: they are only built when libuv is found.
+find_package(Libuv)
+if (Libuv_FOUND)
+  # One executable per example; each is linked with the libuv proactor driver.
+  foreach(name broker send receive)
+    add_executable(libuv_${name} ${name}.c libuv_proactor.c)
+    target_link_libraries(libuv_${name} ${Proton_LIBRARIES} ${Libuv_LIBRARIES})
+    set_target_properties(libuv_${name} PROPERTIES
+      COMPILE_DEFINITIONS  "PN_PROACTOR_INCLUDE=\"libuv_proactor.h\"")
+  endforeach()
+
+  # Add a test with the correct environment to find test executables and valgrind.
+  if(WIN32)
+    # NOTE(review): the ";" below also acts as a CMake list separator once
+    # run_env is expanded unquoted - confirm the PATH value arrives in one piece.
+    set(test_path "$<TARGET_FILE_DIR:libuv_broker>;$<TARGET_FILE_DIR:qpid-proton>")
+  else()
+    set(test_path "${CMAKE_CURRENT_BINARY_DIR}:$ENV{PATH}")
+  endif()
+  set(run_env ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/proton-c/env.py ${EXAMPLE_ENV} "PATH=${test_path}" ${VALGRIND_ENV})
+  # Use the NAME/COMMAND signature: the short add_test(name cmd...) form does
+  # not evaluate generator expressions such as $<TARGET_FILE_DIR:...>.
+  add_test(NAME c-proactor-libuv
+    COMMAND ${run_env} -- ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/test.py)
+endif()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/examples/c/proactor/README.dox
----------------------------------------------------------------------
diff --git a/examples/c/proactor/README.dox b/examples/c/proactor/README.dox
new file mode 100644
index 0000000..89f7aaa
--- /dev/null
+++ b/examples/c/proactor/README.dox
@@ -0,0 +1,20 @@
+
+/** @example send.c
+
+Send a fixed number of messages to the "example" node.
+
+*/
+
+/** @example receive.c
+
+Subscribes to the 'example' node and prints the message bodies received.
+
+*/
+
+/** @example broker.c
+
+A simple multi-threaded broker that works with the send and receive examples.
+
+__Requires libuv__
+
+*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/examples/c/proactor/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
new file mode 100644
index 0000000..79f34bc
--- /dev/null
+++ b/examples/c/proactor/broker.c
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/connection_engine.h>
+#include <proton/proactor.h>
+#include <proton/engine.h>
+#include <proton/sasl.h>
+#include <proton/transport.h>
+#include <proton/url.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+/* TODO aconway 2016-10-14: this example does not require libuv IO,
+   it uses uv.h only for portable mutex and thread functions.
+*/
+#include <uv.h>
+
+bool enable_debug = false;
+
+/* Print a printf-style message followed by a newline to stderr.
+   Does nothing unless enable_debug is set. */
+void debug(const char* fmt, ...) {
+  if (enable_debug) {
+    va_list ap;
+    va_start(ap, fmt);
+    vfprintf(stderr, fmt, ap);
+    va_end(ap);                 /* every va_start needs a matching va_end */
+    fputc('\n', stderr);
+    fflush(stderr);
+  }
+}
+
+/* Exit with a message if a libuv call failed.
+   err is the return value of the libuv call (0 on success), s names the
+   operation. libuv reports failures through its return value rather than
+   errno, so the text comes from uv_strerror() instead of perror(). */
+void check(int err, const char* s) {
+  if (err != 0) {
+    fprintf(stderr, "%s: %s\n", s, uv_strerror(err));
+    exit(1);
+  }
+}
+
+/* Exit with a message if a proton call failed.
+   err is a proton error code (0 on success), s names the operation. */
+void pcheck(int err, const char* s) {
+  if (err != 0) {
+    fprintf(stderr, "%s: %s\n", s, pn_code(err)); /* newline ends the message line */
+    exit(1);
+  }
+}
+
+/* Simple re-sizable vector that acts as a queue.
+   VEC(T) declares an anonymous struct: data points to storage for cap
+   elements of type T, of which the first len are in use. */
+#define VEC(T) struct { T* data; size_t len, cap; }
+
+/* Make V an empty vector with room for 16 elements. */
+#define VEC_INIT(V)                             \
+  do {                                          \
+    V.len = 0;                                  \
+    V.cap = 16;                                 \
+    void **vp = (void**)&V.data;                \
+    *vp = malloc(V.cap * sizeof(*V.data));      \
+  } while(0)
+
+/* Free the storage of V; anything the elements point to is not freed. */
+#define VEC_FINAL(V) free(V.data)
+
+/* Append X at the back of V, doubling the capacity first when V is full. */
+#define VEC_PUSH(V, X)                                  \
+  do {                                                  \
+    if (V.len == V.cap) {                               \
+      V.cap *= 2;                                       \
+      void **vp = (void**)&V.data;                      \
+      *vp = realloc(V.data, V.cap * sizeof(*V.data));   \
+    }                                                   \
+    V.data[V.len++] = X;                                \
+  } while(0)
+
+/* Drop the front element of V, shifting the rest down. No-op when empty. */
+#define VEC_POP(V)                                              \
+  do {                                                          \
+    if (V.len > 0)                                              \
+      memmove(V.data, V.data+1, (--V.len)*sizeof(*V.data));     \
+  } while(0)
+
+/* Simple thread-safe queue implementation */
+typedef struct queue_t {
+  uv_mutex_t lock;
+  char* name;
+  VEC(pn_rwbytes_t) messages;   /* Messages on the queue_t */
+  VEC(pn_connection_t*) waiting; /* Connections waiting to send messages from this queue */
+  struct queue_t *next;            /* Next queue in chain */
+  size_t sent;                     /* Count of messages sent, used as delivery tag */
+} queue_t;
+
+/* Set up q as an empty queue with its own copy of name, linked in front of next. */
+static void queue_init(queue_t *q, const char* name, queue_t *next) {
+  debug("created queue %s", name);
+
+  /* Plain fields first, then the members that own resources. */
+  q->sent = 0;
+  q->next = next;
+  q->name = strdup(name);
+  uv_mutex_init(&q->lock);
+  VEC_INIT(q->waiting);
+  VEC_INIT(q->messages);
+}
+
+/* Release everything q owns: its lock, its name, any messages still queued
+   and its references to waiting connections. q itself is not freed. */
+static void queue_destroy(queue_t *q) {
+  size_t n;
+
+  uv_mutex_destroy(&q->lock);
+  free(q->name);
+
+  /* Messages that were never delivered. */
+  n = 0;
+  while (n < q->messages.len) {
+    free(q->messages.data[n].start);
+    ++n;
+  }
+  VEC_FINAL(q->messages);
+
+  /* Connections still registered as waiting. */
+  n = 0;
+  while (n < q->waiting.len) {
+    pn_decref(q->waiting.data[n]);
+    ++n;
+  }
+  VEC_FINAL(q->waiting);
+}
+
+/* Send a message on s, or record the connection of s as waiting if the queue
+   has no messages.
+   Called in s dispatch loop, assumes s has credit.
+   The queue lock is held only while the queue is inspected; the message is
+   written to the link after the lock has been released.
+*/
+static void queue_send(queue_t *q, pn_link_t *s) {
+  pn_rwbytes_t m = { 0 };
+  size_t tag = 0;
+  uv_mutex_lock(&q->lock);
+  if (q->messages.len == 0) { /* Empty, record connection as waiting */
+    debug("queue is empty %s", q->name);
+    /* Record connection for wake-up if not already on the list. */
+    pn_connection_t *c = pn_session_connection(pn_link_session(s));
+    size_t i = 0;
+    for (; i < q->waiting.len && q->waiting.data[i] != c; ++i)
+      ;
+    if (i == q->waiting.len) {
+      VEC_PUSH(q->waiting, c);
+    }
+  } else {
+    debug("sending from queue %s", q->name);
+    m = q->messages.data[0];    /* take ownership of the oldest message */
+    VEC_POP(q->messages);
+    tag = ++q->sent;            /* running send count doubles as delivery tag */
+  }
+  uv_mutex_unlock(&q->lock);
+  if (m.start) {                /* only set when a message was dequeued above */
+    pn_delivery_t *d = pn_delivery(s, pn_dtag((char*)&tag, sizeof(tag)));
+    pn_link_send(s, m.start, m.size);
+    pn_link_advance(s);
+    pn_delivery_settle(d);  /* Pre-settled: unreliable, there will be no ack. */
+    free(m.start);
+  }
+}
+
+/* Data associated with each broker connection */
+typedef struct broker_data_t {
+  bool check_queues;          /* Check senders on the connection for available data in queues. */
+} broker_data_t;
+
+/* Put a message on the queue, called in receiver dispatch loop.
+   If the queue was previously empty, notify waiting senders.
+   The queue stores m as passed in; queue_send() and queue_destroy() later
+   free m.start. The proactor argument d is not used in this function.
+*/
+static void queue_receive(pn_proactor_t *d, queue_t *q, pn_rwbytes_t m) {
+  debug("received to queue %s", q->name);
+  uv_mutex_lock(&q->lock);
+  VEC_PUSH(q->messages, m);
+  if (q->messages.len == 1) { /* Was empty, notify waiting connections */
+    for (size_t i = 0; i < q->waiting.len; ++i) {
+      pn_connection_t *c = q->waiting.data[i];
+      /* Flag the connection so its PN_CONNECTION_WAKE handler re-checks the queues. */
+      broker_data_t *bd = (broker_data_t*)pn_connection_get_extra(c).start;
+      bd->check_queues = true;
+      pn_connection_wake(c); /* Wake the connection */
+    }
+    q->waiting.len = 0;         /* everyone was notified, clear the waiting list */
+  }
+  uv_mutex_unlock(&q->lock);
+}
+
+/* Thread safe set of queues */
+typedef struct queues_t {
+  uv_mutex_t lock;
+  queue_t *queues;
+  size_t sent;
+} queues_t;
+
+/* Initialize qs as an empty, unlocked set of queues. */
+void queues_init(queues_t *qs) {
+  uv_mutex_init(&qs->lock);
+  qs->queues = NULL;
+  qs->sent = 0;                 /* qs may be uninitialized memory: set every field */
+}
+
+/* Destroy and free every queue in qs, then destroy the set's lock. */
+void queues_destroy(queues_t *qs) {
+  queue_t *q = qs->queues;
+  while (q) {
+    queue_t *next = q->next;    /* read the link before q is freed */
+    queue_destroy(q);
+    free(q);
+    q = next;
+  }
+  qs->queues = NULL;
+  uv_mutex_destroy(&qs->lock);
+}
+
+/** Get or create the named queue.
+ *
+ * The whole lookup and the creation run under qs->lock. A new queue is
+ * linked at the head of the list.
+ *
+ * NOTE(review): name is passed straight to strcmp(), so it must not be NULL,
+ * and the malloc() result is used without a check.
+ */
+queue_t* queues_get(queues_t *qs, const char* name) {
+  uv_mutex_lock(&qs->lock);
+  queue_t *q;
+  /* Linear search; q is NULL afterwards when no queue has this name. */
+  for (q = qs->queues; q && strcmp(q->name, name) != 0; q = q->next)
+    ;
+  if (!q) {
+    q = (queue_t*)malloc(sizeof(queue_t));
+    queue_init(q, name, qs->queues);
+    qs->queues = q;
+  }
+  uv_mutex_unlock(&qs->lock);
+  return q;
+}
+
+/* The broker implementation */
+typedef struct broker_t {
+  pn_proactor_t *proactor;
+  pn_listener_t *listener;
+  queues_t queues;
+  const char *container_id;     /* AMQP container-id */
+  size_t threads;
+  pn_millis_t heartbeat;
+} broker_t;
+
+/* Initialize broker b with a new proactor and an empty set of queues.
+   container_id is stored by pointer, not copied, so it must outlive b.
+   threads is the number of threads broker_stop() will interrupt.
+   heartbeat is used to derive the idle timeout of accepted connections. */
+void broker_init(broker_t *b, const char *container_id, size_t threads, pn_millis_t heartbeat) {
+  b->proactor = pn_proactor();
+  b->listener = NULL;
+  queues_init(&b->queues);
+  b->container_id = container_id;
+  b->threads = threads;
+  b->heartbeat = heartbeat;     /* was hard-coded to 0, ignoring the argument */
+}
+
+/* Stop the broker by sending one proactor interrupt per broker thread. */
+void broker_stop(broker_t *b) {
+  /* In this broker an interrupt stops a thread, stopping all threads stops the broker */
+  size_t remaining = b->threads;
+  while (remaining > 0) {
+    pn_proactor_interrupt(b->proactor);
+    --remaining;
+  }
+}
+
+/* Try to send if link is sender and has credit.
+   A sender link whose source has no address is skipped, as in link_unsub():
+   queues_get() compares the name with strcmp() and cannot take NULL. */
+static void link_send(broker_t *b, pn_link_t *s) {
+  if (pn_link_is_sender(s) && pn_link_credit(s) > 0) {
+    const char *qname = pn_terminus_get_address(pn_link_source(s));
+    if (qname) {
+      queue_t *q = queues_get(&b->queues, qname);
+      queue_send(q, s);
+    }
+  }
+}
+
+/* Remove connection c from the waiting list of q, if it is there.
+   The order of the remaining waiting connections is not preserved. */
+static void queue_unsub(queue_t *q, pn_connection_t *c) {
+  uv_mutex_lock(&q->lock);
+  for (size_t i = 0; i < q->waiting.len; ++i) {
+    if (q->waiting.data[i] == c){
+      /* Overwrite the match with the front entry, then pop the front:
+         the net effect is that only c disappears from the list. */
+      q->waiting.data[i] = q->waiting.data[0]; /* save old [0] */
+      VEC_POP(q->waiting);
+      break;
+    }
+  }
+  uv_mutex_unlock(&q->lock);
+}
+
+/* Remove the connection of link s from the waiting list of the queue named
+   by the link's source address. Receiver links and links without a source
+   address are ignored. */
+static void link_unsub(broker_t *b, pn_link_t *s) {
+  const char *address;
+
+  if (!pn_link_is_sender(s))
+    return;
+  address = pn_terminus_get_address(pn_link_source(s));
+  if (!address)
+    return;
+  queue_unsub(queues_get(&b->queues, address),
+              pn_session_connection(pn_link_session(s)));
+}
+
+/* Unsubscribe every link on connection c from the queue it is interested in.
+   Called from handle() when the transport closes or the peer closes the
+   connection. */
+static void connection_unsub(broker_t *b, pn_connection_t *c) {
+  for (pn_link_t *l = pn_link_head(c, 0); l != NULL; l = pn_link_next(l, 0))
+    link_unsub(b, l);
+}
+
+/* Unsubscribe the links that belong to session ssn. Walks all links of the
+   session's connection and skips those attached to other sessions. */
+static void session_unsub(broker_t *b, pn_session_t *ssn) {
+  pn_connection_t *conn = pn_session_connection(ssn);
+  pn_link_t *link = pn_link_head(conn, 0);
+  while (link != NULL) {
+    if (pn_link_session(link) == ssn)
+      link_unsub(b, link);
+    link = pn_link_next(link, 0);
+  }
+}
+
+/* If cond is set, print "<event name>: <condition name>: <description>" to
+   stderr. The event name is "UNKNOWN" when e is NULL. */
+static void check_condition(pn_event_t *e, pn_condition_t *cond) {
+  const char *ename = "UNKNOWN";
+
+  if (!pn_condition_is_set(cond))
+    return;
+  if (e)
+    ename = pn_event_type_name(pn_event_type(e));
+  fprintf(stderr, "%s: %s: %s\n", ename,
+          pn_condition_get_name(cond), pn_condition_get_description(cond));
+}
+
+const int WINDOW=10;            /* Incoming credit window */
+
+/* Handle one proactor event on behalf of broker b, then release the event
+   with pn_event_done(). Returns false when the calling thread should stop
+   (PN_PROACTOR_INTERRUPT), true otherwise. */
+static bool handle(broker_t* b, pn_event_t* e) {
+  bool more = true;
+  pn_connection_t *c = pn_event_connection(e);
+
+  switch (pn_event_type(e)) {
+
+   case PN_CONNECTION_INIT: {
+     pn_connection_set_container(c, b->container_id);
+     break;
+   }
+   case PN_CONNECTION_BOUND: {
+     /* Turn off security */
+     pn_transport_t *t = pn_connection_transport(c);
+     pn_transport_require_auth(t, false);
+     pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS");
+     pn_transport_set_idle_timeout(t, 2 * b->heartbeat);
+     break;                     /* do not fall through and open the connection early */
+   }
+   case PN_CONNECTION_REMOTE_OPEN: {
+     pn_connection_open(pn_event_connection(e)); /* Complete the open */
+     break;
+   }
+   case PN_CONNECTION_WAKE: {
+     /* queue_receive() sets check_queues before waking the connection. */
+     broker_data_t *bd = (broker_data_t*)pn_connection_get_extra(c).start;
+     if (bd->check_queues) {
+       bd->check_queues = false;
+       /* Combine the state bits with |; & of two different bits is 0. */
+       int flags = PN_LOCAL_ACTIVE|PN_REMOTE_ACTIVE;
+       for (pn_link_t *l = pn_link_head(c, flags); l != NULL; l = pn_link_next(l, flags))
+         link_send(b, l);
+     }
+     break;
+   }
+   case PN_SESSION_REMOTE_OPEN: {
+     pn_session_open(pn_event_session(e));
+     break;
+   }
+   case PN_LINK_REMOTE_OPEN: {
+     /* Mirror the address requested by the peer, then open our end. */
+     pn_link_t *l = pn_event_link(e);
+     if (pn_link_is_sender(l)) {
+       const char *source = pn_terminus_get_address(pn_link_remote_source(l));
+       pn_terminus_set_address(pn_link_source(l), source);
+     } else {
+       const char* target = pn_terminus_get_address(pn_link_remote_target(l));
+       pn_terminus_set_address(pn_link_target(l), target);
+       pn_link_flow(l, WINDOW);
+     }
+     pn_link_open(l);
+     break;
+   }
+   case PN_LINK_FLOW: {
+     link_send(b, pn_event_link(e));
+     break;
+   }
+   case PN_DELIVERY: {
+     pn_delivery_t *d = pn_event_delivery(e);
+     pn_link_t *r = pn_delivery_link(d);
+     if (pn_link_is_receiver(r) &&
+         pn_delivery_readable(d) && !pn_delivery_partial(d))
+     {
+       size_t size = pn_delivery_pending(d);
+       /* The broker does not decode the message, just forwards it. */
+       pn_rwbytes_t m = { size, (char*)malloc(size) };
+       pn_link_recv(r, m.start, m.size);
+       const char *qname = pn_terminus_get_address(pn_link_target(r));
+       queue_receive(b->proactor, queues_get(&b->queues, qname), m);
+       pn_delivery_update(d, PN_ACCEPTED);
+       pn_delivery_settle(d);
+       pn_link_flow(r, WINDOW - pn_link_credit(r)); /* top credit back up to WINDOW */
+     }
+     break;
+   }
+
+   case PN_TRANSPORT_CLOSED:
+    connection_unsub(b, pn_event_connection(e));
+    check_condition(e, pn_transport_condition(pn_event_transport(e)));
+    break;
+
+   case PN_CONNECTION_REMOTE_CLOSE:
+    check_condition(e, pn_connection_remote_condition(pn_event_connection(e)));
+    connection_unsub(b, pn_event_connection(e));
+    pn_connection_close(pn_event_connection(e));
+    break;
+
+   case PN_SESSION_REMOTE_CLOSE:
+    check_condition(e, pn_session_remote_condition(pn_event_session(e)));
+    session_unsub(b, pn_event_session(e));
+    pn_session_close(pn_event_session(e));
+    pn_session_free(pn_event_session(e));
+    break;
+
+   case PN_LINK_REMOTE_CLOSE:
+    check_condition(e, pn_link_remote_condition(pn_event_link(e)));
+    link_unsub(b, pn_event_link(e));
+    pn_link_close(pn_event_link(e));
+    pn_link_free(pn_event_link(e));
+    break;
+
+   case PN_LISTENER_CLOSE:
+    check_condition(e, pn_listener_condition(pn_event_listener(e)));
+    break;
+
+   case PN_PROACTOR_INACTIVE: /* listener and all connections closed */
+    broker_stop(b);
+    break;
+
+   case PN_PROACTOR_INTERRUPT:
+    more = false;
+    break;
+
+   default:
+    break;
+  }
+  pn_event_done(e);
+  return more;
+}
+
+/* Thread entry point: wait for proactor events and handle them until
+   handle() returns false. void_broker is the broker_t to serve. */
+static void broker_thread(void *void_broker) {
+  broker_t *broker = (broker_t*)void_broker;
+  bool keep_going;
+  do {
+    keep_going = handle(broker, pn_proactor_wait(broker->proactor));
+  } while (keep_going);
+}
+
+/* Print the command line synopsis to stderr and exit with status 1.
+   Lists every option accepted by the getopt() string in main(). */
+static void usage(const char *arg0) {
+  fprintf(stderr, "Usage: %s [-d] [-a url] [-t thread-count] [-h heartbeat] [-c container-id]\n", arg0);
+  exit(1);
+}
+
+/* Parse the command line, start listening and run the broker on
+   thread-count threads (thread-count - 1 new threads plus the main thread).
+   Returns 0 after all threads have stopped; exits with status 1 on bad
+   arguments or when a thread cannot be created or joined. */
+int main(int argc, char **argv) {
+  /* Command line options */
+  char *urlstr = NULL;
+  char container_id[256];
+  /* Default container-id is program:pid */
+  snprintf(container_id, sizeof(container_id), "%s:%d", argv[0], getpid());
+  size_t nthreads = 4;
+  pn_millis_t heartbeat = 0;
+  int opt;
+  while ((opt = getopt(argc, argv, "a:t:dh:c:")) != -1) {
+    switch (opt) {
+     case 'a': urlstr = optarg; break;
+     case 't': {
+       /* Validate as a signed value: a negative count would otherwise wrap
+          around to a huge size_t. Reject before any resources are created. */
+       int n = atoi(optarg);
+       if (n <= 0) {
+         fprintf(stderr, "invalid value -t %s, threads must be > 0\n", optarg);
+         exit(1);
+       }
+       nthreads = (size_t)n;
+       break;
+     }
+     case 'd': enable_debug = true; break;
+     case 'h': heartbeat = atoi(optarg); break;
+     case 'c':
+      /* snprintf always NUL-terminates; strncpy does not when optarg is too long */
+      snprintf(container_id, sizeof(container_id), "%s", optarg);
+      break;
+     default: usage(argv[0]); break;
+    }
+  }
+  if (optind < argc)
+    usage(argv[0]);
+
+  broker_t b;
+  broker_init(&b, container_id, nthreads, heartbeat);
+
+  /* Parse the URL or use default values */
+  pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
+  const char *host = url ? pn_url_get_host(url) : "localhost";
+  const char *port = url ? pn_url_get_port(url) : NULL;
+  if (!port) port = "amqp";
+
+  /* Initial broker_data value copied to each accepted connection */
+  broker_data_t bd = { false };
+  b.listener = pn_proactor_listen(b.proactor, host, port, 16,
+                                  pn_bytes(sizeof(bd), (char*)&bd));
+  printf("listening on '%s:%s' %zu threads\n", host, port, b.threads);
+
+  /* host and port may point into url, so free it only after their last use */
+  if (url) pn_url_free(url);
+
+  /* Start n-1 threads and use main thread */
+  uv_thread_t* threads = (uv_thread_t*)calloc(b.threads, sizeof(uv_thread_t));
+  if (!threads) {
+    perror("calloc");
+    exit(1);
+  }
+  for (size_t i = 0; i < b.threads-1; ++i) {
+    check(uv_thread_create(&threads[i], broker_thread, &b), "pthread_create");
+  }
+  broker_thread(&b);            /* Use the main thread too. */
+  for (size_t i = 0; i < b.threads-1; ++i) {
+    check(uv_thread_join(&threads[i]), "pthread_join");
+  }
+  pn_proactor_free(b.proactor);
+  free(threads);
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/examples/c/proactor/libuv_proactor.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/libuv_proactor.c b/examples/c/proactor/libuv_proactor.c
new file mode 100644
index 0000000..ce5b948
--- /dev/null
+++ b/examples/c/proactor/libuv_proactor.c
@@ -0,0 +1,747 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <uv.h>
+
+#include <proton/condition.h>
+#include <proton/connection_engine.h>
+#include <proton/engine.h>
+#include <proton/extra.h>
+#include <proton/message.h>
+#include <proton/object.h>
+#include <proton/proactor.h>
+#include <proton/transport.h>
+#include <proton/url.h>
+
+#include <assert.h>
+#include <stddef.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+/*
+  libuv loop functions are thread unsafe. The only exception is uv_async_send()
+  which is a thread safe "wakeup" that can wake the uv_loop from another thread.
+
+  To provide concurrency the proactor uses a "leader-worker-follower" model,
+  threads take turns at the roles:
+
+  - a single "leader" calls libuv functions and runs the uv_loop incrementally.
+  When there is work it hands over leadership and becomes a "worker"
+  - "workers" handle events concurrently for distinct connections/listeners
+  When the work is done they become "followers"
+  - "followers" wait for the leader to step aside, one takes over as new leader.
+
+  This model is symmetric: any thread can take on any role based on run-time
+  requirements. It also allows the IO and non-IO work associated with an IO
+  wake-up to be processed in a single thread with no context switches.
+
+  Function naming:
+  - on_ - called in leader thread via uv_run().
+  - leader_ - called in leader thread, while processing the leader_q.
+  - owner_ - called in owning thread, leader or worker but not concurrently.
+
+  Note on_ and leader_ functions can call each other, the prefix indicates the
+  path they are most often called on.
+*/
+
+const char *COND_NAME = "proactor";
+const char *AMQP_PORT = "5672";
+const char *AMQP_PORT_NAME = "amqp";
+const char *AMQPS_PORT = "5671";
+const char *AMQPS_PORT_NAME = "amqps";
+
+PN_HANDLE(PN_PROACTOR)
+
+/* pn_proactor_t and pn_listener_t are plain C structs with normal memory management.
+   Class definitions are for identification as pn_event_t context only.
+*/
+PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor)
+PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
+
+/* common to connection engine and listeners */
+typedef struct psocket_t {
+  /* Immutable */
+  pn_proactor_t *proactor;
+
+  /* Protected by proactor.lock */
+  struct psocket_t* next;
+  void (*wakeup)(struct psocket_t*); /* interrupting action for leader */
+
+  /* Only used by leader */
+  uv_tcp_t tcp;
+  void (*action)(struct psocket_t*); /* deferred action for leader */
+  bool is_conn:1;
+  char host[NI_MAXHOST];
+  char port[NI_MAXSERV];
+} psocket_t;
+
+/* Special value for psocket.next pointer when socket is not on any list. */
+psocket_t UNLISTED;
+
+/* Initialize the common socket part ps for proactor p.
+   is_conn records whether ps is embedded in a connection (true) or a
+   listener (false). host and port are copied into ps and may be NULL; the
+   service names "amqp" and "amqps" are replaced by their port numbers.
+   Over-long strings are truncated to fit the buffers in ps. */
+static void psocket_init(psocket_t* ps, pn_proactor_t* p, bool is_conn, const char *host, const char *port) {
+  ps->proactor = p;
+  ps->next = &UNLISTED;
+  ps->is_conn = is_conn;
+  ps->tcp.data = ps;
+
+  /* For platforms that don't know about "amqp" and "amqps" service names. */
+  if (port) {                   /* port may be NULL, strcmp() must not see that */
+    if (strcmp(port, AMQP_PORT_NAME) == 0)
+      port = AMQP_PORT;
+    else if (strcmp(port, AMQPS_PORT_NAME) == 0)
+      port = AMQPS_PORT;
+  }
+  /* Set to "\001" to indicate a NULL as opposed to an empty string "" */
+  strncpy(ps->host, host ? host : "\001", sizeof(ps->host) - 1);
+  ps->host[sizeof(ps->host) - 1] = '\0'; /* strncpy does not terminate on truncation */
+  strncpy(ps->port, port ? port : "\001", sizeof(ps->port) - 1);
+  ps->port[sizeof(ps->port) - 1] = '\0';
+}
+
+/* Undo the "\001" encoding used by psocket_init(): a string starting with
+   '\001' stands for NULL, anything else is returned unchanged. */
+static inline const char* fixstr(const char* str) {
+  if (str[0] == '\001')
+    return NULL;
+  return str;
+}
+
+typedef struct pconn {
+  psocket_t psocket;
+
+  /* Only used by owner thread */
+  pn_connection_engine_t ceng;
+
+  /* Only used by leader */
+  uv_connect_t connect;
+  uv_timer_t timer;
+  uv_write_t write;
+  uv_shutdown_t shutdown;
+  size_t writing;
+  bool reading:1;
+  bool server:1;                /* accept, not connect */
+} pconn;
+
+struct pn_listener_t {
+  psocket_t psocket;
+
+  /* Only used by owner thread */
+  pn_condition_t *condition;
+  pn_collector_t *collector;
+  size_t backlog;
+};
+
+PN_EXTRA_DECLARE(pn_listener_t);
+
+typedef struct queue { psocket_t *front, *back; } queue;
+
+struct pn_proactor_t {
+  /* Leader thread  */
+  uv_cond_t cond;
+  uv_loop_t loop;
+  uv_async_t async;
+
+  /* Protected by lock */
+  uv_mutex_t lock;
+  queue start_q;
+  queue worker_q;
+  queue leader_q;
+  size_t interrupt;             /* pending interrupts */
+  size_t count;                 /* psocket count */
+  bool inactive:1;
+  bool has_leader:1;
+
+  /* Immutable collectors to hold fixed events */
+  pn_collector_t *interrupt_event;
+  pn_collector_t *timeout_event;
+  pn_collector_t *inactive_event;
+};
+
+/* Append ps to the back of q. Returns false and leaves everything unchanged
+   when ps is already on a list (its next pointer is not &UNLISTED). */
+static bool push_lh(queue *q, psocket_t *ps) {
+  if (ps->next != &UNLISTED) {  /* Don't move if already listed. */
+    return false;
+  }
+  ps->next = NULL;
+  if (q->front)
+    q->back->next = ps;         /* link behind the current tail */
+  else
+    q->front = ps;              /* first element of an empty queue */
+  q->back = ps;
+  return true;
+}
+
+/* Remove and return the front of q, or return NULL when q is empty.
+   The returned socket is marked as unlisted again. */
+static psocket_t* pop_lh(queue *q) {
+  psocket_t *head = q->front;
+  if (!head)
+    return NULL;
+  q->front = head->next;
+  head->next = &UNLISTED;
+  return head;
+}
+
+/* View ps as the pconn that contains it, or NULL when ps is not a connection. */
+static inline pconn *as_pconn(psocket_t* ps) {
+  if (!ps->is_conn)
+    return NULL;
+  return (pconn*)ps;
+}
+
+/* View ps as the listener that contains it, or NULL when ps is a connection. */
+static inline pn_listener_t *as_listener(psocket_t* ps) {
+  if (ps->is_conn)
+    return NULL;
+  return (pn_listener_t*)ps;
+}
+
+/* Put ps on the leader queue for processing and wake the leader.
+   This function takes no lock itself: to_leader() locks proactor->lock
+   around the call, and leader_q is declared as protected by that lock.
+   The _lh suffix presumably stands for "lock held" - confirm. */
+static void to_leader_lh(psocket_t *ps) {
+  push_lh(&ps->proactor->leader_q, ps);
+  uv_async_send(&ps->proactor->async); /* Wake leader */
+}
+
+/* Put ps on the leader queue and wake the leader, taking the proactor lock
+   for the duration of the call. */
+static void to_leader(psocket_t *ps) {
+  uv_mutex_lock(&ps->proactor->lock);
+  to_leader_lh(ps);
+  uv_mutex_unlock(&ps->proactor->lock);
+}
+
+/* Detach from IO and put ps on the worker queue.
+   For a connection that has events pending, any write, read and timer in
+   progress are stopped before ps is queued. The push onto worker_q is done
+   under the proactor lock. */
+static void leader_to_worker(psocket_t *ps) {
+  pconn *pc = as_pconn(ps);
+  /* Don't detach if there are no events yet. */
+  if (pc && pn_connection_engine_has_event(&pc->ceng)) {
+    if (pc->writing) {
+      pc->writing  = 0;
+      /* NOTE(review): libuv documents uv_cancel() for fs, getaddrinfo,
+         getnameinfo and work requests - confirm it has an effect on a
+         uv_write_t. */
+      uv_cancel((uv_req_t*)&pc->write);
+    }
+    if (pc->reading) {
+      pc->reading = false;
+      uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
+    }
+    /* timer.data doubles as an "in use" marker here; skip timers being closed. */
+    if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
+      uv_timer_stop(&pc->timer);
+    }
+  }
+
+  /* Nothing to do for a listener, on_accept doesn't touch worker state. */
+
+  uv_mutex_lock(&ps->proactor->lock);
+  push_lh(&ps->proactor->worker_q, ps);
+  uv_mutex_unlock(&ps->proactor->lock);
+}
+
+/* Put ps back on the worker queue for further work and wake the leader,
+ * all under the proactor lock.
+ */
+static void worker_requeue(psocket_t* ps) {
+  pn_proactor_t *p = ps->proactor;
+  uv_mutex_lock(&p->lock);
+  push_lh(&p->worker_q, ps);
+  uv_async_send(&p->async);     /* Wake leader */
+  uv_mutex_unlock(&p->lock);
+}
+
+/* Allocate and initialize a proactor connection.
+ *
+ * p      - owning proactor
+ * server - if true the transport is put in server mode
+ * host, port - passed to psocket_init()
+ * extra  - bytes copied into the connection's "extra" storage (may be empty)
+ *
+ * Returns the new pconn, or NULL if allocation or connection-engine
+ * initialization fails. The pconn is attached to the pn_connection_t under
+ * the PN_PROACTOR record key so get_pconn() can find it.
+ */
+static pconn *new_pconn(pn_proactor_t *p, bool server, const char *host, const char *port, pn_bytes_t extra) {
+  pconn *pc = (pconn*)calloc(1, sizeof(*pc));
+  if (!pc) return NULL;
+  if (pn_connection_engine_init(&pc->ceng, pn_connection_with_extra(extra.size), NULL) != 0) {
+    free(pc);                   /* Don't leak the pconn when the engine can't be initialized */
+    return NULL;
+  }
+  if (extra.start && extra.size) {
+    memcpy(pn_connection_get_extra(pc->ceng.connection).start, extra.start, extra.size);
+  }
+  psocket_init(&pc->psocket, p,  true, host, port);
+  if (server) {
+    pn_transport_set_server(pc->ceng.transport);
+  }
+  pn_record_t *r = pn_connection_attachments(pc->ceng.connection);
+  pn_record_def(r, PN_PROACTOR, PN_VOID);
+  pn_record_set(r, PN_PROACTOR, pc);
+  return pc;
+}
+
+/* Allocate and initialize a listener.
+ *
+ * p          - owning proactor
+ * host, port - passed to psocket_init()
+ * backlog    - stored for the later uv_listen() call
+ * extra      - bytes copied into the listener's "extra" storage (may be empty)
+ *
+ * Returns the new listener, or NULL if any allocation fails; nothing is
+ * leaked on failure.
+ */
+pn_listener_t *new_listener(pn_proactor_t *p, const char *host, const char *port, int backlog, pn_bytes_t extra) {
+  pn_listener_t *l = (pn_listener_t*)calloc(1, PN_EXTRA_SIZEOF(pn_listener_t, extra.size));
+  if (!l) {
+    return NULL;
+  }
+  l->collector = pn_collector();
+  l->condition = pn_condition();
+  if (!l->collector || !l->condition) {
+    /* Both are dereferenced later (leader_error, get_event_lh), so fail now. */
+    if (l->collector) pn_collector_free(l->collector);
+    if (l->condition) pn_condition_free(l->condition);
+    free(l);
+    return NULL;
+  }
+  if (extra.start && extra.size) {
+    memcpy(pn_listener_get_extra(l).start, extra.start, extra.size);
+  }
+  psocket_init(&l->psocket, p, false, host, port);
+  l->backlog = backlog;
+  return l;
+}
+
+/* Adjust the psocket count by change (under the proactor lock) and record
+ * whether the proactor has become inactive, i.e. the count is zero.
+ */
+static void leader_count(pn_proactor_t *p, int change) {
+  uv_mutex_lock(&p->lock);
+  p->count = p->count + change;
+  p->inactive = !p->count;
+  uv_mutex_unlock(&p->lock);
+}
+
+/* Free the connection if it has no events and no uv callbacks pending.
+ * If events remain it is handed back to the workers instead. A handle or
+ * request counts as pending while its data pointer is non-NULL.
+ */
+static void leader_pconn_maybe_free(pconn *pc) {
+  if (pn_connection_engine_has_event(&pc->ceng)) {
+    leader_to_worker(&pc->psocket);         /* Return to worker */
+    return;
+  }
+  bool uv_pending = pc->psocket.tcp.data || pc->shutdown.data || pc->timer.data;
+  if (uv_pending) {
+    return;
+  }
+  pn_connection_engine_destroy(&pc->ceng);
+  leader_count(pc->psocket.proactor, -1);
+  free(pc);
+}
+
+/* Free the listener if it has no events and no uv callbacks pending.
+ * If events remain it is handed back to the workers instead. The tcp handle
+ * counts as pending while its data pointer is non-NULL.
+ */
+static void leader_listener_maybe_free(pn_listener_t *l) {
+    if (pn_collector_peek(l->collector)) {
+      leader_to_worker(&l->psocket);         /* Return to worker */
+    } else if (!l->psocket.tcp.data) {
+      pn_condition_free(l->condition);
+      pn_collector_free(l->collector);       /* Allocated in new_listener(), must be released too */
+      leader_count(l->psocket.proactor, -1);
+      free(l);
+    }
+}
+
+/* Dispatch to the connection or listener "maybe free" routine for ps. */
+static void leader_maybe_free(psocket_t *ps) {
+  pconn *pc = as_pconn(ps);
+  if (pc) {
+    leader_pconn_maybe_free(pc);
+    return;
+  }
+  leader_listener_maybe_free(as_listener(ps));
+}
+
+/* uv close callback: clear the handle's data pointer to mark it closed,
+ * then free the owning psocket if nothing else is pending.
+ */
+static void on_close(uv_handle_t *h) {
+  void *owner = h->data;
+  h->data = NULL;               /* Mark closed */
+  leader_maybe_free((psocket_t*)owner);
+}
+
+/* uv shutdown callback: clear the request's data pointer to mark it done,
+ * then free the owning psocket if nothing else is pending. err is ignored.
+ */
+static void on_shutdown(uv_shutdown_t *shutdown, int err) {
+  void *owner = shutdown->data;
+  shutdown->data = NULL;        /* Mark closed */
+  leader_maybe_free((psocket_t*)owner);
+}
+
+/* Start closing ps: close its tcp handle, and for a connection also close
+ * the connection engine and the tick timer. Handles that were never
+ * initialized (data is NULL) or are already closing are left alone.
+ * Finally free ps if nothing is pending.
+ */
+static inline void leader_close(psocket_t *ps) {
+  uv_handle_t *tcp = (uv_handle_t*)&ps->tcp;
+  if (ps->tcp.data && !uv_is_closing(tcp)) {
+    uv_close(tcp, on_close);
+  }
+  pconn *pc = as_pconn(ps);
+  if (pc) {
+    pn_connection_engine_close(&pc->ceng);
+    uv_handle_t *timer = (uv_handle_t*)&pc->timer;
+    if (pc->timer.data && !uv_is_closing(timer)) {
+      uv_timer_stop(&pc->timer);
+      uv_close(timer, on_close);
+    }
+  }
+  leader_maybe_free(ps);
+}
+
+/* Look up the pconn stored in c's attachments under PN_PROACTOR.
+ * Returns NULL if c is NULL.
+ */
+static pconn *get_pconn(pn_connection_t* c) {
+  return c ? (pconn*) pn_record_get(pn_connection_attachments(c), PN_PROACTOR) : NULL;
+}
+
+static void leader_error(psocket_t *ps, int err, const char* what) {
+  if (ps->is_conn) {
+    pn_connection_engine_t *ceng = &as_pconn(ps)->ceng;
+    pn_connection_engine_errorf(ceng, COND_NAME, "%s %s:%s: %s",
+                                what, fixstr(ps->host), fixstr(ps->port),
+                                uv_strerror(err));
+    pn_connection_engine_bind(ceng);
+    pn_connection_engine_close(ceng);
+  } else {
+    pn_listener_t *l = as_listener(ps);
+    pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s",
+                        what, fixstr(ps->host), fixstr(ps->port),
+                        uv_strerror(err));
+    pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
+  }
+  leader_to_worker(ps);               /* Worker to handle the error */
+}
+
+/* uv-initialization of a psocket.
+ *
+ * Counts ps in the proactor, initializes its tcp handle and, for a
+ * connection, points the connect/write/shutdown requests at ps and
+ * initializes the tick timer. On failure the error is reported through
+ * leader_error(). Returns 0 or a libuv error code.
+ */
+static int leader_init(psocket_t *ps) {
+  leader_count(ps->proactor, +1);
+  int err = uv_tcp_init(&ps->proactor->loop, &ps->tcp);
+  if (!err) {
+    pconn *pc = as_pconn(ps);
+    if (pc) {
+      pc->connect.data = pc->write.data = pc->shutdown.data = ps;
+      /* Assign to the outer err: a second declaration here would shadow it
+         and silently drop a timer initialization failure. */
+      err = uv_timer_init(&ps->proactor->loop, &pc->timer);
+      if (!err) {
+        pc->timer.data = pc;    /* Non-NULL data marks the timer as initialized */
+      }
+    }
+  }
+  if (err) {
+    leader_error(ps, err, "initialization");
+  }
+  return err;
+}
+
+/* Shared tail of on_connect and on_accept: report err if there is one,
+ * otherwise pass the connection to a worker.
+ */
+static void leader_connect_accept(pconn *pc, int err, const char *what) {
+  if (err) {
+    leader_error(&pc->psocket, err, what);
+    return;
+  }
+  leader_to_worker(&pc->psocket);
+}
+
+/* uv connect callback; the request's data pointer identifies the connection. */
+static void on_connect(uv_connect_t *connect, int err) {
+  pconn *pc = (pconn*)connect->data;
+  leader_connect_accept(pc, err, "on connect");
+}
+
+/* uv listen callback: create a server-side pconn for the incoming
+ * connection, copying the listener's host, port and extra bytes, then
+ * initialize and accept it. Failure to allocate the pconn, or a non-zero
+ * err from libuv, is reported against the listener; failures while
+ * initializing or accepting are reported against the new connection.
+ */
+static void on_accept(uv_stream_t* server, int err) {
+  pn_listener_t* l = (pn_listener_t*)server->data;
+  if (err == 0) {
+    pn_rwbytes_t extra =  pn_listener_get_extra(l);
+    pconn *pc = new_pconn(l->psocket.proactor, true,
+                          fixstr(l->psocket.host),
+                          fixstr(l->psocket.port),
+                          pn_bytes(extra.size, extra.start));
+    if (!pc) {
+      err = UV_ENOMEM;
+    } else {
+      int accept_err = leader_init(&pc->psocket);
+      if (accept_err == 0) {
+        accept_err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp);
+      }
+      leader_connect_accept(pc, accept_err, "on accept");
+    }
+  }
+  if (err != 0) {
+    leader_error(&l->psocket, err, "on accept");
+  }
+}
+
+/* Initialize ps, then resolve its host and port into info. uv_getaddrinfo()
+ * is called without a callback. Returns 0 or a libuv error code.
+ */
+static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info) {
+  int err = leader_init(ps);
+  if (err) {
+    return err;
+  }
+  return uv_getaddrinfo(&ps->proactor->loop, info, NULL,
+                        fixstr(ps->host), fixstr(ps->port), NULL);
+}
+
+/* Leader action: resolve the address of connection ps and start a TCP
+ * connect to the first result; on_connect is called on completion. Any
+ * error is reported through leader_error().
+ */
+static void leader_connect(psocket_t *ps) {
+  pconn *pc = as_pconn(ps);
+  uv_getaddrinfo_t info;
+  int err = leader_resolve(ps, &info);
+  if (err == 0) {
+    struct addrinfo *found = info.addrinfo;
+    err = uv_tcp_connect(&pc->connect, &pc->psocket.tcp, found->ai_addr, on_connect);
+    uv_freeaddrinfo(found);
+  }
+  if (err != 0) {
+    leader_error(ps, err, "connect to");
+  }
+}
+
+/* Leader action: resolve the address of listener ps, bind to the first
+ * result and start listening with the stored backlog; on_accept is called
+ * for incoming connections. Any error is reported through leader_error().
+ */
+static void leader_listen(psocket_t *ps) {
+  pn_listener_t *l = as_listener(ps);
+  uv_getaddrinfo_t info;
+  int err = leader_resolve(ps, &info);
+  if (err == 0) {
+    err = uv_tcp_bind(&l->psocket.tcp, info.addrinfo->ai_addr, 0);
+    uv_freeaddrinfo(info.addrinfo);
+    if (err == 0) {
+      err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_accept);
+    }
+  }
+  if (err != 0) {
+    leader_error(ps, err, "listen on ");
+  }
+}
+
+/* Timer callback, also called directly to re-evaluate ticks. If either a
+ * local or remote idle timeout is configured, tick the transport and re-arm
+ * the one-shot timer for the next deadline it reports (none if it returns 0).
+ */
+static void on_tick(uv_timer_t *timer) {
+  pconn *pc = (pconn*)timer->data;
+  pn_transport_t *t = pc->ceng.transport;
+  if (!pn_transport_get_idle_timeout(t) && !pn_transport_get_remote_idle_timeout(t)) {
+    return;                     /* No idle timeouts in effect */
+  }
+  uv_timer_stop(&pc->timer);
+  uint64_t now = uv_now(pc->timer.loop);
+  uint64_t deadline = pn_transport_tick(t, now);
+  if (deadline != 0) {
+    uv_timer_start(&pc->timer, on_tick, deadline - now, 0);
+  }
+}
+
+/* uv read callback. Data (nread >= 0) is given to the connection engine and
+ * the connection passed to a worker; UV_EOF closes the engine's read side;
+ * any other negative value is reported as an error.
+ */
+static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
+  pconn *pc = (pconn*)stream->data;
+  if (nread == UV_EOF) {        /* hangup */
+    pn_connection_engine_read_close(&pc->ceng);
+    leader_maybe_free(&pc->psocket);
+  } else if (nread < 0) {
+    leader_error(&pc->psocket, nread, "on read from");
+  } else {
+    pn_connection_engine_read_done(&pc->ceng, nread);
+    on_tick(&pc->timer);         /* check for tick changes. */
+    leader_to_worker(&pc->psocket);
+    /* Reading continues automatically until stopped. */
+  }
+}
+
+/* uv write callback.
+ *
+ * On success the bytes written are reported to the connection engine and the
+ * connection is passed to a worker. UV_ECANCELED may free the connection if
+ * nothing else is pending. Any other error is reported via leader_error().
+ *
+ * The "write in progress" size is cleared up front, before any call that can
+ * free pc (leader_maybe_free) or hand it to another thread (leader_to_worker,
+ * leader_error); pc must not be touched after those calls.
+ */
+static void on_write(uv_write_t* request, int err) {
+  pconn *pc = (pconn*)request->data;
+  size_t written = pc->writing;
+  pc->writing = 0;              /* Need to send a new write request */
+  if (err == 0) {
+    pn_connection_engine_write_done(&pc->ceng, written);
+    leader_to_worker(&pc->psocket);
+  } else if (err == UV_ECANCELED) {
+    leader_maybe_free(&pc->psocket);
+  } else {
+    leader_error(&pc->psocket, err, "on write to");
+  }
+}
+
+/* Read buffer "allocation" callback for uv: nothing is allocated, uv is
+ * handed the connection engine's own read buffer. size is ignored.
+ */
+static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) {
+  pconn *pc = (pconn*)stream->data;
+  pn_rwbytes_t space = pn_connection_engine_read_buffer(&pc->ceng);
+  *buf = uv_buf_init(space.start, space.size);
+}
+
+/* Leader action: resume IO for a connection after a worker is done with it.
+ *
+ * Ticks are re-evaluated and the engine's read and write buffers fetched.
+ * If that produced events the connection goes straight back to a worker.
+ * Otherwise a write is started when there is data and none is in progress
+ * (or the socket is shut down when there is no data and the write side is
+ * closed), and reading is started when there is buffer space and no read
+ * is active.
+ */
+static void leader_rewatch(psocket_t *ps) {
+  pconn *pc = as_pconn(ps);
+
+  if (pc->timer.data) {         /* uv-initialized */
+    on_tick(&pc->timer);        /* Re-enable ticks if required */
+  }
+  pn_rwbytes_t in = pn_connection_engine_read_buffer(&pc->ceng);
+  pn_bytes_t out = pn_connection_engine_write_buffer(&pc->ceng);
+
+  /* Ticks and checking buffers can generate events, process before proceeding */
+  if (pn_connection_engine_has_event(&pc->ceng)) {
+    leader_to_worker(ps);
+    return;
+  }
+
+  /* Re-watch for IO */
+  uv_stream_t *tcp = (uv_stream_t*)&pc->psocket.tcp;
+  if (out.size > 0 && !pc->writing) {
+    pc->writing = out.size;
+    uv_buf_t buf = uv_buf_init((char*)out.start, out.size);
+    uv_write(&pc->write, tcp, &buf, 1, on_write);
+  } else if (out.size == 0 && pn_connection_engine_write_closed(&pc->ceng)) {
+    uv_shutdown(&pc->shutdown, tcp, on_shutdown);
+  }
+  if (in.size > 0 && !pc->reading) {
+    pc->reading = true;
+    uv_read_start(tcp, alloc_read_buffer, on_read);
+  }
+}
+
+/* Return the next worker event, or NULL if no events are ready.
+ * Called with the proactor lock held.
+ *
+ * Priority order: the INACTIVE event (once, when the psocket count has
+ * dropped to zero), then pending interrupts, then the first psocket on the
+ * worker queue that actually has an event. A psocket returned this way stays
+ * off the worker queue. Queued psockets that turn out to have no event are
+ * passed to the leader queue and the search continues.
+ */
+static pn_event_t* get_event_lh(pn_proactor_t *p) {
+  if (p->inactive) {
+    p->inactive = false;
+    return pn_collector_peek(p->inactive_event);
+  }
+  if (p->interrupt > 0) {
+    --p->interrupt;
+    return pn_collector_peek(p->interrupt_event);
+  }
+  for (psocket_t *ps = pop_lh(&p->worker_q); ps; ps = pop_lh(&p->worker_q)) {
+    pn_event_t *e;
+    if (ps->is_conn) {
+      pconn *pc = as_pconn(ps);
+      e = pn_connection_engine_event(&pc->ceng);
+    } else {                    /* Listener */
+      pn_listener_t *l = as_listener(ps);
+      e = pn_collector_peek(l->collector);
+    }
+    if (e) {
+      return e;
+    }
+    /* No event, back to leader. Use the _lh form: the lock is already held,
+       so the locking to_leader() must not be called here. */
+    to_leader_lh(ps);
+  }
+  return NULL;
+}
+
+/* Set the wakeup action for ps, replacing any previous one, and queue ps for
+ * the leader. May be called from any thread; takes the proactor lock.
+ */
+static void wakeup(psocket_t *ps, void (*action)(psocket_t*)) {
+  uv_mutex_t *lock = &ps->proactor->lock;
+  uv_mutex_lock(lock);
+  ps->wakeup = action;
+  to_leader_lh(ps);
+  uv_mutex_unlock(lock);
+}
+
+/* Hand an action to the leader thread to run on ps. Only for non-leader
+ * threads. Asserts that no action is already pending; takes the proactor
+ * lock.
+ */
+static void owner_defer(psocket_t *ps, void (*action)(psocket_t*)) {
+  uv_mutex_t *lock = &ps->proactor->lock;
+  uv_mutex_lock(lock);
+  assert(!ps->action);
+  ps->action = action;
+  to_leader_lh(ps);
+  uv_mutex_unlock(lock);
+}
+
+/* Return the listener an event belongs to, or NULL if the event's class is
+ * not the listener class.
+ */
+pn_listener_t *pn_event_listener(pn_event_t *e) {
+  if (pn_event_class(e) != pn_listener__class()) {
+    return NULL;
+  }
+  return (pn_listener_t*)pn_event_context(e);
+}
+
+/* Return the proactor an event belongs to: the event context for proactor
+ * events, otherwise the proactor of the event's listener or connection.
+ * Returns NULL if none of these apply.
+ */
+pn_proactor_t *pn_event_proactor(pn_event_t *e) {
+  if (pn_event_class(e) == pn_proactor__class()) {
+    return (pn_proactor_t*)pn_event_context(e);
+  }
+  pn_listener_t *l = pn_event_listener(e);
+  if (l) {
+    return l->psocket.proactor;
+  }
+  pn_connection_t *c = pn_event_connection(e);
+  return c ? pn_connection_proactor(c) : NULL;
+}
+
+/* Tell the proactor that the application has finished handling e.
+ *
+ * Connection events: e is popped if it is the head of the connection's
+ * collector. The engine is bound after PN_CONNECTION_INIT. If more events
+ * remain the connection is re-queued for workers; otherwise the leader is
+ * asked to close it (engine finished) or to resume IO.
+ *
+ * Listener events: e is popped if it is the head of the listener's
+ * collector, and after PN_LISTENER_CLOSE the leader is asked to close the
+ * listener. Events of any other kind are ignored.
+ */
+void pn_event_done(pn_event_t *e) {
+  pn_event_type_t etype = pn_event_type(e);
+  pconn *pc = get_pconn(pn_event_connection(e));
+  if (pc && pn_collector_peek(pc->ceng.collector) == e) {
+    pn_connection_engine_pop_event(&pc->ceng);
+    if (etype == PN_CONNECTION_INIT) {
+      /* Bind after user has handled CONNECTION_INIT */
+      pn_connection_engine_bind(&pc->ceng);
+    }
+    if (pn_connection_engine_has_event(&pc->ceng)) {
+      /* Process all events before going back to IO.
+         Put it back on the worker queue and wake the leader.
+      */
+      worker_requeue(&pc->psocket);
+    } else if (pn_connection_engine_finished(&pc->ceng)) {
+      owner_defer(&pc->psocket, leader_close);
+    } else {
+      owner_defer(&pc->psocket, leader_rewatch);
+    }
+    return;
+  }
+
+  pn_listener_t *l = pn_event_listener(e);
+  if (!l || pn_collector_peek(l->collector) != e) {
+    return;
+  }
+  pn_collector_pop(l->collector);
+  if (etype == PN_LISTENER_CLOSE) {
+    owner_defer(&l->psocket, leader_close);
+  }
+}
+
+/* Run follower/leader loop till we can return an event and be a worker.
+ *
+ * If get_event_lh() has an event ready it is returned at once. Otherwise the
+ * calling thread waits on p->cond until no other thread is leader and then
+ * becomes the leader itself. As leader it repeatedly runs the libuv loop once
+ * (with the lock released) and drains leader_q, running each psocket's
+ * pending action and wakeup callbacks (lock released around the calls),
+ * until get_event_lh() yields an event. It then gives up leadership, signals
+ * one thread waiting on p->cond and returns the event.
+ */
+pn_event_t *pn_proactor_wait(struct pn_proactor_t* p) {
+  uv_mutex_lock(&p->lock);
+  /* Try to grab work immediately. */
+  pn_event_t *e = get_event_lh(p);
+  if (e == NULL) {
+    /* No work available, follow the leader */
+    while (p->has_leader)
+      uv_cond_wait(&p->cond, &p->lock);
+    /* Lead till there is work to do. */
+    p->has_leader = true;
+    for (e = get_event_lh(p); e == NULL; e = get_event_lh(p)) {
+      /* Run uv_loop outside the lock */
+      uv_mutex_unlock(&p->lock);
+      uv_run(&p->loop, UV_RUN_ONCE);
+      uv_mutex_lock(&p->lock);
+      /* Pop the leader work queue under the lock, run the callbacks outside it */
+      for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
+        void (*action)(psocket_t*) = ps->action; /* take and clear both callbacks while still locked */
+        ps->action = NULL;
+        void (*wakeup)(psocket_t*) = ps->wakeup;
+        ps->wakeup = NULL;
+        if (action || wakeup) {
+          uv_mutex_unlock(&p->lock);
+          if (action) action(ps);
+          if (wakeup) wakeup(ps);
+          uv_mutex_lock(&p->lock);
+        }
+      }
+    }
+    /* Signal the next leader and return to work */
+    p->has_leader = false;
+    uv_cond_signal(&p->cond);
+  }
+  uv_mutex_unlock(&p->lock);
+  return e;
+}
+
+/* Record one more pending interrupt and wake the UV loop so that a thread in
+ * pn_proactor_wait() can pick it up. Takes the proactor lock.
+ */
+void pn_proactor_interrupt(pn_proactor_t *p) {
+  uv_mutex_lock(&p->lock);
+  p->interrupt += 1;
+  uv_async_send(&p->async);   /* Interrupt the UV loop */
+  uv_mutex_unlock(&p->lock);
+}
+
+/* Create an outgoing connection to host:port and ask the leader to start
+ * connecting. extra is copied into the connection's extra storage.
+ * Returns 0 on success or PN_OUT_OF_MEMORY if the connection could not be
+ * created.
+ */
+int pn_proactor_connect(pn_proactor_t *p, const char *host, const char *port, pn_bytes_t extra) {
+  pconn *pc = new_pconn(p, false, host, port, extra);
+  if (pc) {
+    owner_defer(&pc->psocket, leader_connect); /* Process PN_CONNECTION_INIT before binding */
+    return 0;
+  }
+  return PN_OUT_OF_MEMORY;
+}
+
+/* Return the listener's extra storage, as produced by PN_EXTRA_GET. */
+pn_rwbytes_t pn_listener_get_extra(pn_listener_t *l) {
+  return PN_EXTRA_GET(pn_listener_t, l);
+}
+
+/* Create a listener for host:port and ask the leader to start listening.
+ * Returns the listener, or NULL if it could not be created.
+ */
+pn_listener_t *pn_proactor_listen(pn_proactor_t *p, const char *host, const char *port, int backlog, pn_bytes_t extra) {
+  pn_listener_t *l = new_listener(p, host, port, backlog, extra);
+  if (!l) {
+    return NULL;
+  }
+  owner_defer(&l->psocket, leader_listen);
+  return l;
+}
+
+/* Return the proactor that owns c, or NULL if c has no pconn attached. */
+pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
+  pconn *pc = get_pconn(c);
+  if (!pc) {
+    return NULL;
+  }
+  return pc->psocket.proactor;
+}
+
+/* Return the proactor that owns l, or NULL if l is NULL. */
+pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
+  if (!l) {
+    return NULL;
+  }
+  return l->psocket.proactor;
+}
+
+/* Wakeup action for connections: queue a PN_CONNECTION_WAKE event on the
+ * connection's collector and pass the connection to a worker.
+ */
+void leader_wake_connection(psocket_t *ps) {
+  pn_connection_engine_t *ceng = &as_pconn(ps)->ceng;
+  pn_collector_put(ceng->collector, PN_OBJECT, ceng->connection, PN_CONNECTION_WAKE);
+  leader_to_worker(ps);
+}
+
+/* Request a PN_CONNECTION_WAKE event for c. May be called from any thread.
+ * Does nothing if c is NULL or has no pconn attached, instead of
+ * dereferencing a NULL pconn.
+ */
+void pn_connection_wake(pn_connection_t* c) {
+  pconn *pc = get_pconn(c);
+  if (pc) {
+    wakeup(&pc->psocket, leader_wake_connection);
+  }
+}
+
+/* Ask the leader to close listener l, using the wakeup mechanism. */
+void pn_listener_close(pn_listener_t* l) {
+  psocket_t *ps = &l->psocket;
+  wakeup(ps, leader_close);
+}
+
+/* Return the listener's condition object. leader_error() fills it in when
+ * the listener fails.
+ */
+pn_condition_t* pn_listener_condition(pn_listener_t* l) {
+  pn_condition_t *cond = l->condition;
+  return cond;
+}
+
+/* Create a collector holding a single proactor event of type t. The event is
+ * only ever peeked, never popped.
+ */
+static pn_collector_t *event_holder(pn_proactor_t *p, pn_event_type_t t) {
+  pn_collector_t *holder = pn_collector();
+  pn_collector_put(holder, pn_proactor__class(), p, t);
+  return holder;
+}
+
+/* Create a proactor: a zeroed pn_proactor_t with its uv loop, mutex,
+ * condition variable and async wake-up handle initialized, plus the three
+ * fixed event holders. Returns NULL if the proactor cannot be allocated.
+ */
+pn_proactor_t *pn_proactor() {
+  pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p));
+  if (!p) return NULL;          /* Don't initialize through a NULL pointer */
+  uv_loop_init(&p->loop);
+  uv_mutex_init(&p->lock);
+  uv_cond_init(&p->cond);
+  uv_async_init(&p->loop, &p->async, NULL); /* Just wake the loop */
+  p->interrupt_event = event_holder(p, PN_PROACTOR_INTERRUPT);
+  p->inactive_event = event_holder(p, PN_PROACTOR_INACTIVE);
+  p->timeout_event = event_holder(p, PN_PROACTOR_TIMEOUT);
+  return p;
+}
+
+/* uv_walk() callback used by pn_proactor_free(): close every handle that is
+ * not already closing, and stop the loop once nothing is left alive.
+ * v is unused.
+ */
+static void on_stopping(uv_handle_t* h, void* v) {
+  (void)v;
+  if (!uv_is_closing(h))       /* uv_walk also visits handles already being closed; closing twice is invalid */
+    uv_close(h, NULL);         /* Close this handle */
+  if (!uv_loop_alive(h->loop)) /* Everything closed */
+    uv_stop(h->loop);        /* Stop the loop, pn_proactor_free() can return */
+}
+
+/* Destroy a proactor: close all uv handles and run the loop until they are
+ * closed, then release the loop, the lock, the condition variable, the
+ * fixed event holders and the proactor itself.
+ */
+void pn_proactor_free(pn_proactor_t *p) {
+  uv_walk(&p->loop, on_stopping, NULL); /* Close all handles */
+  uv_run(&p->loop, UV_RUN_DEFAULT);     /* Run till stop, all handles closed */
+  uv_loop_close(&p->loop);
+  uv_mutex_destroy(&p->lock);
+  uv_cond_destroy(&p->cond);
+
+  pn_collector_t *holders[] = { p->interrupt_event, p->inactive_event, p->timeout_event };
+  for (size_t i = 0; i < sizeof(holders)/sizeof(holders[0]); ++i) {
+    pn_collector_free(holders[i]);
+  }
+  free(p);
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/examples/c/proactor/receive.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/receive.c b/examples/c/proactor/receive.c
new file mode 100644
index 0000000..303e348
--- /dev/null
+++ b/examples/c/proactor/receive.c
@@ -0,0 +1,202 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/connection.h>
+#include <proton/connection_engine.h>
+#include <proton/delivery.h>
+#include <proton/proactor.h>
+#include <proton/link.h>
+#include <proton/message.h>
+#include <proton/session.h>
+#include <proton/transport.h>
+#include <proton/url.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+typedef char str[1024]; /* fixed-size string buffer */
+
+typedef struct app_data_t {
+    str address;                 /* link source address; main() fills it from the URL path or "example" */
+    str container_id;            /* connection container id; main() sets it to "<argv[0]>:<pid>" */
+    pn_rwbytes_t message_buffer; /* only freed in main(); nothing else here uses it */
+    int message_count;           /* messages to receive; 0 means receive forever */
+    int received;                /* messages received so far */
+    pn_proactor_t *proactor;     /* created in main() */
+} app_data_t;
+
+static const int BATCH = 100; /* Batch size for unlimited receive */
+
+static int exit_code = 0; /* process exit status; check_condition() sets it to 1 on error */
+
+/* If cond is set, print "<event type>: <name>: <description>" to stderr and
+ * make the program exit with status 1.
+ */
+static void check_condition(pn_event_t *e, pn_condition_t *cond) {
+  if (!pn_condition_is_set(cond)) {
+    return;
+  }
+  exit_code = 1;
+  const char *event_name = pn_event_type_name(pn_event_type(e));
+  fprintf(stderr, "%s: %s: %s\n", event_name,
+          pn_condition_get_name(cond), pn_condition_get_description(cond));
+}
+
+#define MAX_SIZE 1024
+
+/* Read the pending bytes of dlv, decode them as a proton message and print
+ * the message body on stdout. Deliveries with MAX_SIZE or more bytes pending
+ * are skipped, as are empty reads; messages that fail to decode print
+ * nothing.
+ */
+static void decode_message(pn_delivery_t *dlv) {
+  static char buffer[MAX_SIZE];
+  if (pn_delivery_pending(dlv) >= MAX_SIZE) {
+    return;                     /* too large for the buffer */
+  }
+  /* read in the raw data */
+  ssize_t len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE);
+  if (len <= 0) {
+    return;
+  }
+  /* decode it into a proton message */
+  pn_message_t *m = pn_message();
+  if (pn_message_decode(m, buffer, len) == PN_OK) {
+    pn_string_t *s = pn_string(NULL);
+    pn_inspect(pn_message_body(m), s);
+    printf("%s\n", pn_string_get(s));
+    pn_free(s);
+  }
+  pn_message_free(m);
+}
+
+/* Handle event, return true if we should handle more.
+ *
+ * Every event is released with pn_event_done() before returning. Only
+ * PN_PROACTOR_INACTIVE makes the function return false.
+ */
+static bool handle(app_data_t* app, pn_event_t* event) {
+  bool more = true;
+  switch (pn_event_type(event)) {
+
+   case PN_CONNECTION_INIT: { /* open connection, session and receiver link, then grant credit */
+     pn_connection_t* c = pn_event_connection(event);
+     pn_connection_set_container(c, app->container_id);
+     pn_connection_open(c);
+     pn_session_t* s = pn_session(c);
+     pn_session_open(s);
+     pn_link_t* l = pn_receiver(s, "my_receiver");
+     pn_terminus_set_address(pn_link_source(l), app->address);
+     pn_link_open(l);
+     /* cannot receive without granting credit: */
+     pn_link_flow(l, app->message_count ? app->message_count : BATCH);
+   } break;
+
+   case PN_DELIVERY: {
+     /* A message has been received */
+     pn_link_t *link = NULL;
+     pn_delivery_t *dlv = pn_event_delivery(event);
+     if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) { /* only act on complete, readable deliveries */
+       link = pn_delivery_link(dlv);
+       decode_message(dlv);
+       /* Accept the delivery */
+       pn_delivery_update(dlv, PN_ACCEPTED);
+       /* done with the delivery, move to the next and free it */
+       pn_link_advance(link);
+       pn_delivery_settle(dlv);  /* dlv is now freed */
+
+       if (app->message_count == 0) {
+         /* receive forever - see if more credit is needed */
+         if (pn_link_credit(link) < BATCH/2) {
+           /* Grant enough credit to bring it up to BATCH: */
+           pn_link_flow(link, BATCH - pn_link_credit(link));
+         }
+       } else if (++app->received >= app->message_count) {
+         /* done receiving, close the endpoints */
+         printf("%d messages received\n", app->received);
+         pn_session_t *ssn = pn_link_session(link);
+         pn_link_close(link);
+         pn_session_close(ssn);
+         pn_connection_close(pn_session_connection(ssn));
+       }
+     }
+   } break;
+
+   case PN_TRANSPORT_ERROR: /* report the transport error, if any */
+    check_condition(event, pn_transport_condition(pn_event_transport(event)));
+    break;
+
+   case PN_CONNECTION_REMOTE_CLOSE: /* report the peer's condition and close our end */
+    check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+
+   case PN_SESSION_REMOTE_CLOSE: /* a remote session close ends the whole connection */
+    check_condition(event, pn_session_remote_condition(pn_event_session(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+
+   case PN_LINK_REMOTE_CLOSE:
+   case PN_LINK_REMOTE_DETACH: /* a remote link close or detach ends the whole connection */
+    check_condition(event, pn_link_remote_condition(pn_event_link(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+
+   case PN_PROACTOR_INACTIVE: /* stop the event loop in main() */
+    more = false;
+    break;
+
+   default: break;
+  }
+  pn_event_done(event);
+  return more;
+}
+
+/* Print the command line synopsis to stderr and exit with status 1. */
+static void usage(const char *arg0) {
+    static const char synopsis[] = "Usage: %s [-a url] [-m message-count]\n";
+    fprintf(stderr, synopsis, arg0);
+    exit(1);
+}
+
+/* Receiver example entry point.
+ *
+ * Options: -a url (host, port and address path), -m message-count (default
+ * 100, 0 receives forever). Connects through a proactor and handles events
+ * until handle() returns false. Returns exit_code, which is 1 if an error
+ * condition was reported or the connection could not be created.
+ */
+int main(int argc, char **argv) {
+    /* Default values for application and connection. */
+    app_data_t app = {{0}};
+    app.message_count = 100;
+    const char* urlstr = NULL;
+
+    int opt;
+    while((opt = getopt(argc, argv, "a:m:")) != -1) {
+        switch(opt) {
+          case 'a': urlstr = optarg; break;
+          case 'm': app.message_count = atoi(optarg); break;
+          default: usage(argv[0]); break;
+        }
+    }
+    if (optind < argc)
+        usage(argv[0]);
+
+    snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid());
+
+    /* Parse the URL or use default values */
+    pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
+    const char *host = url ? pn_url_get_host(url) : NULL;
+    const char *port = url ? pn_url_get_port(url) : NULL;
+    if (!port) port = "amqp";
+    /* Copy at most size-1 bytes: app is zero-initialized, so the final byte
+       stays NUL and the address is terminated even if the path is too long. */
+    strncpy(app.address, (url && pn_url_get_path(url)) ? pn_url_get_path(url) : "example", sizeof(app.address) - 1);
+
+    /* Create the proactor and connect */
+    app.proactor = pn_proactor();
+    int err = pn_proactor_connect(app.proactor, host, port, pn_rwbytes_null);
+    if (url) pn_url_free(url);
+
+    if (err) {
+        /* No connection was created, so there are no events to wait for. */
+        fprintf(stderr, "%s: connect failed (error %d)\n", argv[0], err);
+        exit_code = 1;
+    } else {
+        while (handle(&app, pn_proactor_wait(app.proactor)))
+            ;
+    }
+    pn_proactor_free(app.proactor);
+    free(app.message_buffer.start);
+    return exit_code;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/examples/c/proactor/send.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/send.c b/examples/c/proactor/send.c
new file mode 100644
index 0000000..68ba0c8
--- /dev/null
+++ b/examples/c/proactor/send.c
@@ -0,0 +1,204 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/connection.h>
+#include <proton/connection_engine.h>
+#include <proton/delivery.h>
+#include <proton/proactor.h>
+#include <proton/link.h>
+#include <proton/message.h>
+#include <proton/session.h>
+#include <proton/transport.h>
+#include <proton/url.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+typedef char str[1024]; /* fixed-size string buffer */
+
+typedef struct app_data_t {
+    str address;                 /* link target address; main() fills it from the URL path or "example" */
+    str container_id;            /* connection container id; main() sets it to "<argv[0]>:<pid>" */
+    pn_rwbytes_t message_buffer; /* reusable encode buffer, grown on demand by encode_message(), freed in main() */
+    int message_count;           /* number of messages to send */
+    int sent;                    /* messages sent so far; also used as message id and delivery tag */
+    int acknowledged;            /* messages the peer has accepted so far */
+    pn_proactor_t *proactor;     /* created in main() */
+} app_data_t;
+
+int exit_code = 0; /* process exit status; check_condition() sets it to 1 on error */
+
+/* If cond is set, print "<event type>: <name>: <description>" to stderr and
+ * make the program exit with status 1.
+ */
+static void check_condition(pn_event_t *e, pn_condition_t *cond) {
+  if (!pn_condition_is_set(cond)) {
+    return;
+  }
+  exit_code = 1;
+  const char *event_name = pn_event_type_name(pn_event_type(e));
+  fprintf(stderr, "%s: %s: %s\n", event_name,
+          pn_condition_get_name(cond), pn_condition_get_description(cond));
+}
+
+/* Create a message with a map { "sequence" : number }, encode it and return
+ * the encoded bytes.
+ *
+ * The message id and the "sequence" value are both app->sent. The bytes are
+ * written into app->message_buffer, which is allocated on first use and
+ * doubled until the message fits; the returned pn_bytes_t points into that
+ * buffer and covers only the encoded part. Exits the process if memory runs
+ * out or encoding fails.
+ */
+static pn_bytes_t encode_message(app_data_t* app) {
+    /* Construct a message with the map { "sequence": app.sent } */
+    pn_message_t* message = pn_message();
+    pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */
+    pn_data_t* body = pn_message_body(message);
+    pn_data_put_map(body);
+    pn_data_enter(body);
+    pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
+    pn_data_put_int(body, app->sent); /* The sequence number */
+    pn_data_exit(body);
+
+    /* encode the message, expanding the encode buffer as needed */
+    if (app->message_buffer.start == NULL) {
+        static const size_t initial_size = 128;
+        char *initial = (char*)malloc(initial_size);
+        if (!initial) {
+            fprintf(stderr, "error encoding message: out of memory\n");
+            exit(1);
+        }
+        app->message_buffer = pn_rwbytes(initial_size, initial);
+    }
+    /* app->message_buffer is the total buffer space available. */
+    /* mbuf will point at just the portion used by the encoded message */
+    pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start);
+    int status = 0;
+    while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) {
+        size_t bigger = app->message_buffer.size * 2;
+        char *grown = (char*)realloc(app->message_buffer.start, bigger);
+        if (!grown) {
+            fprintf(stderr, "error encoding message: out of memory\n");
+            exit(1);
+        }
+        app->message_buffer = pn_rwbytes(bigger, grown);
+        /* realloc may have moved the buffer: refresh both start and size */
+        mbuf = pn_rwbytes(bigger, grown);
+    }
+    if (status != 0) {
+        fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message)));
+        exit(1);
+    }
+    pn_message_free(message);
+    return pn_bytes(mbuf.size, mbuf.start);
+}
+
+/* Handle event, return true if we should handle more.
+ *
+ * Every event is released with pn_event_done() before returning. Only
+ * PN_PROACTOR_INACTIVE makes the function return false.
+ */
+static bool handle(app_data_t* app, pn_event_t* event) {
+  bool more = true;
+  switch (pn_event_type(event)) {
+
+   case PN_CONNECTION_INIT: { /* open connection, session and sender link */
+     pn_connection_t* c = pn_event_connection(event);
+     pn_connection_set_container(c, app->container_id);
+     pn_connection_open(c);
+     pn_session_t* s = pn_session(c);
+     pn_session_open(s);
+     pn_link_t* l = pn_sender(s, "my_sender");
+     pn_terminus_set_address(pn_link_target(l), app->address);
+     pn_link_open(l);
+   } break;
+
+   case PN_LINK_FLOW: {
+     /* The peer has given us some credit, now we can send messages */
+     pn_link_t *sender = pn_event_link(event);
+     while (pn_link_credit(sender) > 0 && app->sent < app->message_count) { /* send while credit lasts and messages remain */
+       ++app->sent;
+       /* Use sent counter bytes as unique delivery tag. */
+       pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
+       pn_bytes_t msgbuf = encode_message(app);
+       pn_link_send(sender, msgbuf.start, msgbuf.size);
+       pn_link_advance(sender);
+     }
+   } break;
+
+   case PN_DELIVERY: {
+     /* We received acknowledgement from the peer that a message was delivered. */
+     pn_delivery_t* d = pn_event_delivery(event);
+     if (pn_delivery_remote_state(d) == PN_ACCEPTED) {
+       if (++app->acknowledged == app->message_count) { /* all messages accepted: report and close */
+         printf("%d messages sent and acknowledged\n", app->acknowledged);
+         pn_connection_close(pn_event_connection(event));
+       }
+     }
+   } break;
+
+   case PN_TRANSPORT_ERROR: /* report the transport error, if any */
+    check_condition(event, pn_transport_condition(pn_event_transport(event)));
+    break;
+
+   case PN_CONNECTION_REMOTE_CLOSE: /* report the peer's condition and close our end */
+    check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+
+   case PN_SESSION_REMOTE_CLOSE: /* a remote session close ends the whole connection */
+    check_condition(event, pn_session_remote_condition(pn_event_session(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+
+   case PN_LINK_REMOTE_CLOSE:
+   case PN_LINK_REMOTE_DETACH: /* a remote link close or detach ends the whole connection */
+    check_condition(event, pn_link_remote_condition(pn_event_link(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+
+   case PN_PROACTOR_INACTIVE: /* stop the event loop in main() */
+    more = false;
+    break;
+
+   default: break;
+  }
+  pn_event_done(event);
+  return more;
+}
+
+/* Print the command line synopsis to stderr and exit with status 1. */
+static void usage(const char *arg0) {
+    static const char synopsis[] = "Usage: %s [-a url] [-m message-count]\n";
+    fprintf(stderr, synopsis, arg0);
+    exit(1);
+}
+
+/* Sender example entry point.
+ *
+ * Options: -a url (host, port and address path), -m message-count (default
+ * 100). Connects through a proactor and handles events until handle()
+ * returns false. Returns exit_code, which is 1 if an error condition was
+ * reported or the connection could not be created.
+ */
+int main(int argc, char **argv) {
+    /* Default values for application and connection. */
+    app_data_t app = {{0}};
+    app.message_count = 100;
+    const char* urlstr = NULL;
+
+    int opt;
+    while((opt = getopt(argc, argv, "a:m:")) != -1) {
+        switch(opt) {
+          case 'a': urlstr = optarg; break;
+          case 'm': app.message_count = atoi(optarg); break;
+          default: usage(argv[0]); break;
+        }
+    }
+    if (optind < argc)
+        usage(argv[0]);
+
+    snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid());
+
+    /* Parse the URL or use default values */
+    pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
+    const char *host = url ? pn_url_get_host(url) : NULL;
+    const char *port = url ? pn_url_get_port(url) : NULL;
+    if (!port) port = "amqp";
+    /* Copy at most size-1 bytes: app is zero-initialized, so the final byte
+       stays NUL and the address is terminated even if the path is too long. */
+    strncpy(app.address, (url && pn_url_get_path(url)) ? pn_url_get_path(url) : "example", sizeof(app.address) - 1);
+
+    /* Create the proactor and connect */
+    app.proactor = pn_proactor();
+    int err = pn_proactor_connect(app.proactor, host, port, pn_rwbytes_null);
+    if (url) pn_url_free(url);
+
+    if (err) {
+        /* No connection was created, so there are no events to wait for. */
+        fprintf(stderr, "%s: connect failed (error %d)\n", argv[0], err);
+        exit_code = 1;
+    } else {
+        while (handle(&app, pn_proactor_wait(app.proactor)))
+            ;
+    }
+    pn_proactor_free(app.proactor);
+    free(app.message_buffer.start);
+    return exit_code;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/examples/c/proactor/test.py
----------------------------------------------------------------------
diff --git a/examples/c/proactor/test.py b/examples/c/proactor/test.py
new file mode 100644
index 0000000..5dc3a99
--- /dev/null
+++ b/examples/c/proactor/test.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# This is a test script to run the examples and verify that they behave as expected.
+
+from exampletest import *
+
+import unittest
+import sys
+
+def python_cmd(name):
+    dir = os.path.dirname(__file__)
+    return [sys.executable, os.path.join(dir, "..", "..", "python", name)]
+
+def receive_expect(n):
+    return ''.join('{"sequence"=%s}\n'%i for i in xrange(1, n+1)) + "%s messages received\n"%n
+
+class CExampleTest(BrokerTestCase):
+    broker_exe = ["libuv_broker"]
+
+    def test_send_receive(self):
+        """Send first then receive"""
+        s = self.proc(["libuv_send", "-a", self.addr])
+        self.assertEqual("100 messages sent and acknowledged\n", s.wait_out())
+        r = self.proc(["libuv_receive", "-a", self.addr])
+        self.assertEqual(receive_expect(100), r.wait_out())
+
+    def test_receive_send(self):
+        """Start receiving  first, then send."""
+        r = self.proc(["libuv_receive", "-a", self.addr]);
+        s = self.proc(["libuv_send", "-a", self.addr]);
+        self.assertEqual("100 messages sent and acknowledged\n", s.wait_out())
+        self.assertEqual(receive_expect(100), r.wait_out())
+
+if __name__ == "__main__":  # true only when this file is executed directly, not when it is imported
+    unittest.main()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/examples/engine/c/precv.c
----------------------------------------------------------------------
diff --git a/examples/engine/c/precv.c b/examples/engine/c/precv.c
deleted file mode 100644
index 3c79a6e..0000000
--- a/examples/engine/c/precv.c
+++ /dev/null
@@ -1,502 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-/*################################################################
-  This program is half of a pair.  Precv and Psend are meant to 
-  be simple-as-possible examples of how to use the proton-c
-  engine interface to send and receive messages over a single 
-  connection and a single session.
-
-  In addition to being examples, these programs or their 
-  descendants will be used in performance regression testing
-  for both throughput and latency, and long-term soak testing.
-*################################################################*/
-
-#include <stdio.h>
-#include <string.h>
-#include <ctype.h>
-#include <sys/time.h>
-#include <unistd.h>
-#include <stdlib.h>
-#include <time.h>
-#define __STDC_FORMAT_MACROS
-#include <inttypes.h>
-
-#include <proton/connection.h>
-#include <proton/delivery.h>
-#include <proton/driver.h>
-#include <proton/event.h>
-#include <proton/terminus.h>
-#include <proton/link.h>
-#include <proton/message.h>
-#include <proton/session.h>
-
-
-
-
-
-#define MY_BUF_SIZE  1000
-
-
-
-
-/*---------------------------------------------------------
-  These high-resolution times are used both for
-  interim timing reports -- i.e. every 'report_frequency'
-  messages -- and for the final timestamp, after all 
-  expected messages have been received.
----------------------------------------------------------*/
-static
-double
-get_time ( )
-{
-  struct timeval tv;
-  struct tm      * timeinfo;
-
-  gettimeofday ( & tv, 0 );
-  timeinfo = localtime ( & tv.tv_sec );
-
-  double time_now = 3600 * timeinfo->tm_hour +
-                      60 * timeinfo->tm_min  +
-                           timeinfo->tm_sec;
-
-  time_now += ((double)(tv.tv_usec) / 1000000.0);
-  return time_now;
-}
-
-
-
-
-
-/*----------------------------------------------------
-  These absolute timestamps are useful in soak tests,
-  where I want to align the program's output with 
-  output from top to look at CPU and memory use..
-----------------------------------------------------*/
-void
-print_timestamp_like_a_normal_person ( FILE * fp )
-{
-  char const * month_abbrevs[] = { "jan", 
-                                   "feb", 
-                                   "mar", 
-                                   "apr", 
-                                   "may", 
-                                   "jun", 
-                                   "jul", 
-                                   "aug", 
-                                   "sep", 
-                                   "oct", 
-                                   "nov", 
-                                   "dec" 
-                                 };
-  time_t rawtime;
-  struct tm * timeinfo;
-
-  time ( & rawtime );
-  timeinfo = localtime ( &rawtime );
-
-  char time_string[100];
-  sprintf ( time_string,
-            "%d-%s-%02d %02d:%02d:%02d",
-            1900 + timeinfo->tm_year,
-            month_abbrevs[timeinfo->tm_mon],
-            timeinfo->tm_mday,
-            timeinfo->tm_hour,
-            timeinfo->tm_min,
-            timeinfo->tm_sec
-          );
-
-  fprintf ( fp, "timestamp %s\n", time_string );
-}
-
-
-
-
-
-int 
-main ( int argc, char ** argv  ) 
-{
-  char info[1000];
-
-  uint64_t received = 0;
-
-  char host [1000];
-  char port [1000];
-  char output_file_name[1000];
-
-  int initial_flow   = 400;
-  int flow_increment = 200;
-
-  int       report_frequency  = 200000;
-  int64_t   messages          = 2000000,
-            delivery_count    = 0;
-
-  strcpy ( host, "0.0.0.0" );
-  strcpy ( port, "5672" );
-
-  pn_driver_t     * driver;
-  pn_listener_t   * listener;
-  pn_connector_t  * connector;
-  pn_connection_t * connection;
-  pn_collector_t  * collector;
-  pn_transport_t  * transport;
-  pn_sasl_t       * sasl;
-  pn_session_t    * session;
-  pn_event_t      * event;
-  pn_link_t       * link;
-  pn_delivery_t   * delivery;
-
-
-  double last_time,
-         this_time,
-         time_diff;
-
-  char * message_data          = (char *) malloc ( MY_BUF_SIZE );
-  int    message_data_capacity = MY_BUF_SIZE;
-
-  FILE * output_fp;
-
-
-  /*-----------------------------------------------------------
-    Read the command lines args and override initialization.
-  -----------------------------------------------------------*/
-  for ( int i = 1; i < argc; ++ i )
-  {
-    if ( ! strcmp ( "--host", argv[i] ) )
-    {
-      strcpy ( host, argv[i+1] );
-      ++ i;
-    }
-    else
-    if ( ! strcmp ( "--port", argv[i] ) )
-    {
-      strcpy ( port, argv[i+1] );
-      ++ i;
-    }
-    else
-    if ( ! strcmp ( "--messages", argv[i] ) )
-    {
-      sscanf ( argv [ i+1 ], "%" SCNd64 , & messages );
-      ++ i;
-    }
-    else
-    if ( ! strcmp ( "--output", argv[i] ) )
-    {
-      if ( ! strcmp ( "stderr", argv[i+1] ) )
-      {
-        output_fp = stderr;
-        strcpy ( output_file_name, "stderr");
-      }
-      else
-      if ( ! strcmp ( "stdout", argv[i+1] ) )
-      {
-        output_fp = stdout;
-        strcpy ( output_file_name, "stdout");
-      }
-      else
-      {
-        output_fp = fopen ( argv[i+1], "w" );
-        strcpy ( output_file_name, argv[i+1] );
-        if ( ! output_fp )
-        {
-          fprintf ( stderr, "Can't open |%s| for writing.\n", argv[i+1] );
-          exit ( 1 );
-        }
-      }
-      ++ i;
-    }
-    else
-    if ( ! strcmp ( "--report_frequency", argv[i] ) )
-    {
-      report_frequency = atoi ( argv[i+1] );
-      ++ i;
-    }
-    else
-    if ( ! strcmp ( "--initial_flow", argv[i] ) )
-    {
-      sscanf ( argv [ i+1 ], "%d", & initial_flow );
-      ++ i;
-    }
-    else
-    if ( ! strcmp ( "--flow_increment", argv[i] ) )
-    {
-      sscanf ( argv [ i+1 ], "%d", & flow_increment );
-      ++ i;
-    }
-    else
-    {
-      fprintf ( output_fp, "unknown arg |%s|\n", argv[i] );
-      exit ( 1 );
-    }
-  }
-
-  /*-----------------------------------------------
-    Show what we ended up with for all the args.
-  -----------------------------------------------*/
-  fprintf ( output_fp, "host                %s\n",          host );
-  fprintf ( output_fp, "port                %s\n",          port );
-  fprintf ( output_fp, "messages            %" PRId64 "\n", messages );
-  fprintf ( output_fp, "report_frequency    %d\n",          report_frequency );
-  fprintf ( output_fp, "initial_flow        %d\n",          initial_flow );
-  fprintf ( output_fp, "flow_increment      %d\n",          flow_increment );
-  fprintf ( output_fp, "output              %s\n",          output_file_name );
-
-
-  /*--------------------------------------------
-    Create a standard driver and listen for the 
-    initial connector.
-  --------------------------------------------*/
-  driver = pn_driver ( );
-
-  if ( ! pn_listener(driver, host, port, 0) ) 
-  {
-    fprintf ( output_fp, "precv listener creation failed.\n" );
-    exit ( 1 );
-  }
-
-  fprintf ( output_fp, "\nprecv ready...\n\n" );
-
-  while ( 1 )
-  {
-    pn_driver_wait ( driver, -1 );
-    if ( listener = pn_driver_listener(driver) )
-    {
-      if ( connector = pn_listener_accept(listener) )
-        break;
-    }
-  }
-
-  /*--------------------------------------------------------
-    Now make all the other structure around the connector,
-    and tell it that skipping sasl is OK.
-  --------------------------------------------------------*/
-  connection = pn_connection ( );
-  collector  = pn_collector  ( );
-  pn_connection_collect ( connection, collector );
-  pn_connector_set_connection ( connector, connection );
-
-  transport = pn_connector_transport ( connector );
-  sasl = pn_sasl ( transport );
-  pn_sasl_mechanisms ( sasl, "ANONYMOUS" );
-  pn_sasl_server ( sasl );
-  pn_sasl_allow_skip ( sasl, true );
-  pn_sasl_done ( sasl, PN_SASL_OK );
-
-  /*----------------------------------------------------------
-    If report_frequency is not set to zero, we will 
-    produce a timing report every report_frequency messages.
-    The timing reported will be the delta from the last_time
-    to the current time.  
-    This is useful in soak testing, where you basically 
-    never stop, but still need to see how the system is doing
-    every so often.
-  ----------------------------------------------------------*/
-  last_time = get_time();
-
-
-  /*------------------------------------------------------------
-    A triply-nested loop.
-    In the outermost one, we just wait for activity to come in 
-    from the driver.
-  ------------------------------------------------------------*/
-  while ( 1 )
-  {
-    pn_driver_wait ( driver, -1 );
-
-    /*---------------------------------------------------------------
-      In the next loop, we keep going as long as we processed
-      some events.  This is because our own processing of events
-      may have caused more to be generated that we need to handle.
-      If we go back to the outermost loop and its pn_driver_wait()
-      without handling these events, we will end up with the sender
-      and receiver programs just staring at each other with blank
-      expressions on their faces.
-    ---------------------------------------------------------------*/
-    int event_count = 1;
-    while ( event_count > 0 )
-    {
-      event_count = 0;
-      pn_connector_process ( connector );
-
-      /*-------------------------------------------------------
-        After we process the connector, it may have generated
-        a batch of events for us to handle.  As we go through 
-        this batch of events, our handling may generate other 
-        events which we must handle before going back to 
-        pn_driver_wait().
-      --------------------------------------------------------*/
-      event = pn_collector_peek(collector);
-      while ( event )
-      {
-        ++ event_count;
-        pn_event_type_t event_type = pn_event_type ( event );
-        //fprintf ( output_fp, "precv event: %s\n", pn_event_type_name(event_type));
-
-
-        switch ( event_type )
-        {
-          case PN_CONNECTION_REMOTE_OPEN:
-          break;
-
-
-          case PN_SESSION_REMOTE_OPEN:
-            session = pn_event_session(event);
-            if ( pn_session_state(session) & PN_LOCAL_UNINIT ) 
-            {
-              // big number because it's in bytes.
-              pn_session_set_incoming_capacity ( session, 1000000 );
-              pn_session_open ( session );
-            }
-          break;
-
-
-          case PN_LINK_REMOTE_OPEN:
-            /*----------------------------------------------------
-              When we first open the link, we give it an initial 
-              amount of credit in units of messages.
-              We will later increment its credit whenever credit
-              falls below some threshold.
-            ----------------------------------------------------*/
-            link = pn_event_link(event);
-            if (pn_link_state(link) & PN_LOCAL_UNINIT )
-            {
-              pn_link_open ( link );
-              pn_link_flow ( link, initial_flow );
-            }
-          break;
-
-
-          case PN_CONNECTION_BOUND:
-            if ( pn_connection_state(connection) & PN_LOCAL_UNINIT )
-            {
-              pn_connection_open ( connection );
-            }
-          break;
-
-
-          // And now the event that you've all been waiting for.....
-
-          case PN_DELIVERY:
-            link = pn_event_link ( event );
-            delivery = pn_event_delivery ( event );
-
-            /*------------------------------------------------
-              Since I want this program to be a receiver,
-              I am not interested in deliver-related events 
-              unless they are incoming, 'readable' events.
-            ------------------------------------------------*/
-            if ( pn_delivery_readable ( delivery ) )
-            {
-              // If the delivery is partial I am just going to ignore
-              // it until it becomes complete.
-              if ( ! pn_delivery_partial ( delivery ) )
-              {
-                ++ delivery_count; 
-                /*
-                if ( ! (delivery_count % report_frequency) )
-                {
-                  pn_link_t * delivery_link = pn_delivery_link ( delivery );
-                  int received_bytes = pn_delivery_pending ( delivery );
-                  pn_link_recv ( delivery_link, incoming, 1000 );
-                  fprintf ( output_fp, "received bytes: %d\n", received_bytes );
-                }
-                */
-
-                // don't bother updating.  they're pre-settled.
-                // pn_delivery_update ( delivery, PN_ACCEPTED );
-                pn_delivery_settle ( delivery );
-
-                int credit = pn_link_credit ( link );
-
-                if ( delivery_count >= messages )
-                {
-                  fprintf ( output_fp, "precv_stop %.3lf\n", get_time() );
-                  goto all_done;
-                }
-
-                // Make a report frequency of zero shut down interim reporting.
-                if ( report_frequency
-                     &&
-                     (! (delivery_count % report_frequency))
-                   )
-                {
-                  this_time = get_time();
-                  time_diff = this_time - last_time;
-
-                  print_timestamp_like_a_normal_person ( output_fp );
-                  fprintf ( output_fp, 
-                            "deliveries: %" PRIu64 "  time: %.3lf\n", 
-                            delivery_count,
-                            time_diff
-                          );
-                  fflush ( output_fp );
-                  last_time = this_time;
-                }
-
-                /*----------------------------------------------------------
-                  I replenish the credit whevere it falls below this limit
-                  because I this psend / precv pair of programs to run as
-                  fast as possible.  A real application might want to do
-                  something fancier here.
-                ----------------------------------------------------------*/
-                if ( credit <= flow_increment )
-                {
-                  pn_link_flow ( link, flow_increment );
-                }
-              }
-            }
-          break;
-
-
-          case PN_TRANSPORT:
-            // not sure why I would care...
-          break;
-
-
-          default:
-            /*
-            fprintf ( output_fp, 
-                      "precv unhandled event: %s\n", 
-                      pn_event_type_name(event_type)
-                    );
-            */
-          break;
-        }
-
-        pn_collector_pop ( collector );
-        event = pn_collector_peek(collector);
-      }
-    }
-  }
-
-
-all_done:
-  pn_session_close ( session );
-  pn_connection_close ( connection );
-  fclose ( output_fp );
-  return 0;
-}
-
-
-
-
-


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org