You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2016/11/17 18:18:55 UTC

[08/11] qpid-proton git commit: PROTON-1344: proactor batch events, rename connection_driver

PROTON-1344: proactor batch events, rename connection_driver

Renamed pn_connection_engine to pn_connection_driver.

pn_proactor_wait() now returns a pn_event_batch_t* (a batch of events) rather
than an individual pn_event_t*, to reduce thread context switching.

Added pn_collector_next() for simpler event looping.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/25706a47
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/25706a47
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/25706a47

Branch: refs/heads/master
Commit: 25706a47ea8f29c0a53f5fae44195a916173cfbd
Parents: 94bc296
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Nov 16 22:31:00 2016 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Nov 17 11:22:14 2016 -0500

----------------------------------------------------------------------
 examples/c/proactor/broker.c                    |  21 +-
 examples/c/proactor/libuv_proactor.c            | 323 +++++++++++--------
 examples/c/proactor/receive.c                   | 100 +++---
 examples/c/proactor/send.c                      | 164 +++++-----
 examples/cpp/README.dox                         |   2 +-
 examples/cpp/mt/epoll_container.cpp             |  76 ++---
 proton-c/CMakeLists.txt                         |   4 +-
 proton-c/bindings/cpp/CMakeLists.txt            |   2 +-
 proton-c/bindings/cpp/docs/io.md                |   6 +-
 proton-c/bindings/cpp/docs/main.md              |   2 +-
 .../cpp/include/proton/connection_options.hpp   |   4 +-
 .../cpp/include/proton/io/connection_driver.hpp | 211 ++++++++++++
 .../cpp/include/proton/io/connection_engine.hpp | 215 ------------
 .../cpp/include/proton/messaging_handler.hpp    |   2 +-
 .../bindings/cpp/include/proton/transport.hpp   |   2 +-
 proton-c/bindings/cpp/src/engine_test.cpp       |  12 +-
 proton-c/bindings/cpp/src/include/contexts.hpp  |   2 +-
 .../bindings/cpp/src/io/connection_driver.cpp   | 161 +++++++++
 .../bindings/cpp/src/io/connection_engine.cpp   | 160 ---------
 proton-c/bindings/cpp/src/receiver.cpp          |   2 +-
 proton-c/bindings/cpp/src/thread_safe_test.cpp  |  10 +-
 proton-c/docs/api/index.md                      |  42 ++-
 proton-c/include/proton/connection.h            |   2 +-
 proton-c/include/proton/connection_driver.h     | 243 ++++++++++++++
 proton-c/include/proton/connection_engine.h     | 313 ------------------
 proton-c/include/proton/cproton.i               |   3 +
 proton-c/include/proton/event.h                 |  56 ++++
 proton-c/include/proton/proactor.h              |  62 ++--
 proton-c/src/core/connection_driver.c           | 173 +++++-----
 proton-c/src/core/connection_engine.c           | 163 ----------
 proton-c/src/core/event.c                       |  28 +-
 proton-c/src/tests/refcount.c                   |   3 +-
 qpid-proton-cpp.syms                            |  48 +--
 33 files changed, 1259 insertions(+), 1358 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/examples/c/proactor/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
index e11a8bd..66381fc 100644
--- a/examples/c/proactor/broker.c
+++ b/examples/c/proactor/broker.c
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-#include <proton/connection_engine.h>
+#include <proton/connection_driver.h>
 #include <proton/proactor.h>
 #include <proton/engine.h>
 #include <proton/sasl.h>
@@ -220,9 +220,11 @@ typedef struct broker_t {
   const char *container_id;     /* AMQP container-id */
   size_t threads;
   pn_millis_t heartbeat;
+  bool finished;
 } broker_t;
 
 void broker_init(broker_t *b, const char *container_id, size_t threads, pn_millis_t heartbeat) {
+  memset(b, 0, sizeof(*b));
   b->proactor = pn_proactor();
   b->listener = NULL;
   queues_init(&b->queues);
@@ -293,8 +295,7 @@ static void check_condition(pn_event_t *e, pn_condition_t *cond) {
 
 const int WINDOW=10;            /* Incoming credit window */
 
-static bool handle(broker_t* b, pn_event_t* e) {
-  bool more = true;
+static void handle(broker_t* b, pn_event_t* e) {
   pn_connection_t *c = pn_event_connection(e);
 
   switch (pn_event_type(e)) {
@@ -398,20 +399,24 @@ static bool handle(broker_t* b, pn_event_t* e) {
     break;
 
    case PN_PROACTOR_INTERRUPT:
-    more = false;
+    b->finished = true;
     break;
 
    default:
     break;
   }
-  pn_event_done(e);
-  return more;
 }
 
 static void broker_thread(void *void_broker) {
   broker_t *b = (broker_t*)void_broker;
-  while (handle(b, pn_proactor_wait(b->proactor)))
-    ;
+  do {
+    pn_event_batch_t *events = pn_proactor_wait(b->proactor);
+    pn_event_t *e;
+    while ((e = pn_event_batch_next(events))) {
+      handle(b, e);
+    }
+    pn_proactor_done(b->proactor, events);
+  } while(!b->finished);
 }
 
 static void usage(const char *arg0) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/examples/c/proactor/libuv_proactor.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/libuv_proactor.c b/examples/c/proactor/libuv_proactor.c
index 8dd2706..35afd5c 100644
--- a/examples/c/proactor/libuv_proactor.c
+++ b/examples/c/proactor/libuv_proactor.c
@@ -22,7 +22,7 @@
 #include <uv.h>
 
 #include <proton/condition.h>
-#include <proton/connection_engine.h>
+#include <proton/connection_driver.h>
 #include <proton/engine.h>
 #include <proton/extra.h>
 #include <proton/message.h>
@@ -44,11 +44,15 @@
   To provide concurrency the proactor uses a "leader-worker-follower" model,
   threads take turns at the roles:
 
-  - a single "leader" calls libuv functions and runs the uv_loop incrementally.
-  When there is work it hands over leadership and becomes a "worker"
+  - a single "leader" calls libuv functions and runs the uv_loop in short bursts
+    to generate work. When there is work available it gives up leadership and
+    becomes a "worker"
+
   - "workers" handle events concurrently for distinct connections/listeners
-  When the work is done they become "followers"
-  - "followers" wait for the leader to step aside, one takes over as new leader.
+    They do as much work as they can get; when none is left they become "followers".
+
+  - "followers" wait for the leader to generate work and become workers.
+    When the leader itself becomes a worker, one of the followers takes over.
 
   This model is symmetric: any thread can take on any role based on run-time
   requirements. It also allows the IO and non-IO work associated with an IO
@@ -77,7 +81,7 @@ PN_HANDLE(PN_PROACTOR)
 PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor)
 PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
 
-/* common to connection engine and listeners */
+/* common to connection and listener */
 typedef struct psocket_t {
   /* Immutable */
   pn_proactor_t *proactor;
@@ -118,11 +122,11 @@ static inline const char* fixstr(const char* str) {
   return str[0] == '\001' ? NULL : str;
 }
 
-typedef struct pconn {
+typedef struct pconnection_t {
   psocket_t psocket;
 
   /* Only used by owner thread */
-  pn_connection_engine_t ceng;
+  pn_connection_driver_t driver;
 
   /* Only used by leader */
   uv_connect_t connect;
@@ -132,7 +136,7 @@ typedef struct pconn {
   size_t writing;
   bool reading:1;
   bool server:1;                /* accept, not connect */
-} pconn;
+} pconnection_t;
 
 struct pn_listener_t {
   psocket_t psocket;
@@ -140,6 +144,7 @@ struct pn_listener_t {
   /* Only used by owner thread */
   pn_condition_t *condition;
   pn_collector_t *collector;
+  pn_event_batch_t batch;
   size_t backlog;
 };
 
@@ -153,6 +158,10 @@ struct pn_proactor_t {
   uv_loop_t loop;
   uv_async_t async;
 
+  /* Owner thread: proactor collector and batch can belong to leader or a worker */
+  pn_collector_t *collector;
+  pn_event_batch_t batch;
+
   /* Protected by lock */
   uv_mutex_t lock;
   queue start_q;
@@ -162,11 +171,7 @@ struct pn_proactor_t {
   size_t count;                 /* psocket count */
   bool inactive:1;
   bool has_leader:1;
-
-  /* Immutable collectors to hold fixed events */
-  pn_collector_t *interrupt_event;
-  pn_collector_t *timeout_event;
-  pn_collector_t *inactive_event;
+  bool batch_working:1;          /* batch belongs to a worker.  */
 };
 
 static bool push_lh(queue *q, psocket_t *ps) {
@@ -191,8 +196,8 @@ static psocket_t* pop_lh(queue *q) {
   return ps;
 }
 
-static inline pconn *as_pconn(psocket_t* ps) {
-  return ps->is_conn ? (pconn*)ps : NULL;
+static inline pconnection_t *as_pconnection_t(psocket_t* ps) {
+  return ps->is_conn ? (pconnection_t*)ps : NULL;
 }
 
 static inline pn_listener_t *as_listener(psocket_t* ps) {
@@ -213,9 +218,9 @@ static void to_leader(psocket_t *ps) {
 
 /* Detach from IO and put ps on the worker queue */
 static void leader_to_worker(psocket_t *ps) {
-  pconn *pc = as_pconn(ps);
+  pconnection_t *pc = as_pconnection_t(ps);
   /* Don't detach if there are no events yet. */
-  if (pc && pn_connection_engine_has_event(&pc->ceng)) {
+  if (pc && pn_connection_driver_has_event(&pc->driver)) {
     if (pc->writing) {
       pc->writing  = 0;
       uv_cancel((uv_req_t*)&pc->write);
@@ -236,6 +241,28 @@ static void leader_to_worker(psocket_t *ps) {
   uv_mutex_unlock(&ps->proactor->lock);
 }
 
+/* Set a deferred action for leader, if not already set. */
+static void owner_to_leader(psocket_t *ps, void (*action)(psocket_t*)) {
+  uv_mutex_lock(&ps->proactor->lock);
+  if (!ps->action) {
+    ps->action = action;
+  }
+  to_leader_lh(ps);
+  uv_mutex_unlock(&ps->proactor->lock);
+}
+
+/* Owner thread puts ps on the worker queue. Sets the deferred action if not already set. */
+static void owner_to_worker(psocket_t *ps, void (*action)(psocket_t*)) {
+  uv_mutex_lock(&ps->proactor->lock);
+  if (!ps->action) {
+    ps->action = action;
+  }
+  push_lh(&ps->proactor->worker_q, ps);
+  uv_async_send(&ps->proactor->async); /* Wake leader */
+  uv_mutex_unlock(&ps->proactor->lock);
+}
+
+
 /* Re-queue for further work */
 static void worker_requeue(psocket_t* ps) {
   uv_mutex_lock(&ps->proactor->lock);
@@ -244,25 +271,43 @@ static void worker_requeue(psocket_t* ps) {
   uv_mutex_unlock(&ps->proactor->lock);
 }
 
-static pconn *new_pconn(pn_proactor_t *p, bool server, const char *host, const char *port, pn_bytes_t extra) {
-  pconn *pc = (pconn*)calloc(1, sizeof(*pc));
+static pconnection_t *new_pconnection_t(pn_proactor_t *p, bool server, const char *host, const char *port, pn_bytes_t extra) {
+  pconnection_t *pc = (pconnection_t*)calloc(1, sizeof(*pc));
   if (!pc) return NULL;
-  if (pn_connection_engine_init(&pc->ceng, pn_connection_with_extra(extra.size), NULL) != 0) {
+  if (pn_connection_driver_init(&pc->driver, pn_connection_with_extra(extra.size), NULL) != 0) {
     return NULL;
   }
   if (extra.start && extra.size) {
-    memcpy(pn_connection_get_extra(pc->ceng.connection).start, extra.start, extra.size);
+    memcpy(pn_connection_get_extra(pc->driver.connection).start, extra.start, extra.size);
   }
   psocket_init(&pc->psocket, p,  true, host, port);
   if (server) {
-    pn_transport_set_server(pc->ceng.transport);
+    pn_transport_set_server(pc->driver.transport);
   }
-  pn_record_t *r = pn_connection_attachments(pc->ceng.connection);
+  pn_record_t *r = pn_connection_attachments(pc->driver.connection);
   pn_record_def(r, PN_PROACTOR, PN_VOID);
   pn_record_set(r, PN_PROACTOR, pc);
   return pc;
 }
 
+static pn_event_t *listener_batch_next(pn_event_batch_t *batch);
+static pn_event_t *proactor_batch_next(pn_event_batch_t *batch);
+
+static inline pn_proactor_t *batch_proactor(pn_event_batch_t *batch) {
+  return (batch->next_event == proactor_batch_next) ?
+    (pn_proactor_t*)((char*)batch - offsetof(pn_proactor_t, batch)) : NULL;
+}
+
+static inline pn_listener_t *batch_listener(pn_event_batch_t *batch) {
+  return (batch->next_event == listener_batch_next) ?
+    (pn_listener_t*)((char*)batch - offsetof(pn_listener_t, batch)) : NULL;
+}
+
+static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
+  pn_connection_driver_t *d = pn_event_batch_connection_driver(batch);
+  return d ? (pconnection_t*)((char*)d - offsetof(pconnection_t, driver)) : NULL;
+}
+
 pn_listener_t *new_listener(pn_proactor_t *p, const char *host, const char *port, int backlog, pn_bytes_t extra) {
   pn_listener_t *l = (pn_listener_t*)calloc(1, PN_EXTRA_SIZEOF(pn_listener_t, extra.size));
   if (!l) {
@@ -278,6 +323,7 @@ pn_listener_t *new_listener(pn_proactor_t *p, const char *host, const char *port
   }
   psocket_init(&l->psocket, p, false, host, port);
   l->condition = pn_condition();
+  l->batch.next_event = listener_batch_next;
   l->backlog = backlog;
   return l;
 }
@@ -290,11 +336,12 @@ static void leader_count(pn_proactor_t *p, int change) {
 }
 
 /* Free if there are no uv callbacks pending and no events */
-static void leader_pconn_maybe_free(pconn *pc) {
-    if (pn_connection_engine_has_event(&pc->ceng)) {
+static void leader_pconnection_t_maybe_free(pconnection_t *pc) {
+    if (pn_connection_driver_has_event(&pc->driver)) {
       leader_to_worker(&pc->psocket);         /* Return to worker */
-    } else if (!(pc->psocket.tcp.data || pc->shutdown.data || pc->timer.data)) {
-      pn_connection_engine_destroy(&pc->ceng);
+    } else if (!(pc->psocket.tcp.data || pc->write.data || pc->shutdown.data || pc->timer.data)) {
+      /* All UV requests are finished */
+      pn_connection_driver_destroy(&pc->driver);
       leader_count(pc->psocket.proactor, -1);
       free(pc);
     }
@@ -314,7 +361,7 @@ static void leader_listener_maybe_free(pn_listener_t *l) {
 /* Free if there are no uv callbacks pending and no events */
 static void leader_maybe_free(psocket_t *ps) {
   if (ps->is_conn) {
-    leader_pconn_maybe_free(as_pconn(ps));
+    leader_pconnection_t_maybe_free(as_pconnection_t(ps));
   } else {
     leader_listener_maybe_free(as_listener(ps));
   }
@@ -336,9 +383,9 @@ static inline void leader_close(psocket_t *ps) {
   if (ps->tcp.data && !uv_is_closing((uv_handle_t*)&ps->tcp)) {
     uv_close((uv_handle_t*)&ps->tcp, on_close);
   }
-  pconn *pc = as_pconn(ps);
+  pconnection_t *pc = as_pconnection_t(ps);
   if (pc) {
-    pn_connection_engine_close(&pc->ceng);
+    pn_connection_driver_close(&pc->driver);
     if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
       uv_timer_stop(&pc->timer);
       uv_close((uv_handle_t*)&pc->timer, on_close);
@@ -347,20 +394,20 @@ static inline void leader_close(psocket_t *ps) {
   leader_maybe_free(ps);
 }
 
-static pconn *get_pconn(pn_connection_t* c) {
+static pconnection_t *get_pconnection_t(pn_connection_t* c) {
   if (!c) return NULL;
   pn_record_t *r = pn_connection_attachments(c);
-  return (pconn*) pn_record_get(r, PN_PROACTOR);
+  return (pconnection_t*) pn_record_get(r, PN_PROACTOR);
 }
 
 static void leader_error(psocket_t *ps, int err, const char* what) {
   if (ps->is_conn) {
-    pn_connection_engine_t *ceng = &as_pconn(ps)->ceng;
-    pn_connection_engine_errorf(ceng, COND_NAME, "%s %s:%s: %s",
+    pn_connection_driver_t *driver = &as_pconnection_t(ps)->driver;
+    pn_connection_driver_bind(driver); /* Bind so errors will be reported */
+    pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s",
                                 what, fixstr(ps->host), fixstr(ps->port),
                                 uv_strerror(err));
-    pn_connection_engine_bind(ceng);
-    pn_connection_engine_close(ceng);
+    pn_connection_driver_close(driver);
   } else {
     pn_listener_t *l = as_listener(ps);
     pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s",
@@ -376,9 +423,9 @@ static int leader_init(psocket_t *ps) {
   leader_count(ps->proactor, +1);
   int err = uv_tcp_init(&ps->proactor->loop, &ps->tcp);
   if (!err) {
-    pconn *pc = as_pconn(ps);
+    pconnection_t *pc = as_pconnection_t(ps);
     if (pc) {
-      pc->connect.data = pc->write.data = pc->shutdown.data = ps;
+      pc->connect.data = ps;
       int err = uv_timer_init(&ps->proactor->loop, &pc->timer);
       if (!err) {
         pc->timer.data = pc;
@@ -392,7 +439,7 @@ static int leader_init(psocket_t *ps) {
 }
 
 /* Common logic for on_connect and on_accept */
-static void leader_connect_accept(pconn *pc, int err, const char *what) {
+static void leader_connect_accept(pconnection_t *pc, int err, const char *what) {
   if (!err) {
     leader_to_worker(&pc->psocket);
   } else {
@@ -401,14 +448,14 @@ static void leader_connect_accept(pconn *pc, int err, const char *what) {
 }
 
 static void on_connect(uv_connect_t *connect, int err) {
-  leader_connect_accept((pconn*)connect->data, err, "on connect");
+  leader_connect_accept((pconnection_t*)connect->data, err, "on connect");
 }
 
 static void on_accept(uv_stream_t* server, int err) {
   pn_listener_t* l = (pn_listener_t*)server->data;
   if (!err) {
     pn_rwbytes_t v =  pn_listener_get_extra(l);
-    pconn *pc = new_pconn(l->psocket.proactor, true,
+    pconnection_t *pc = new_pconnection_t(l->psocket.proactor, true,
                           fixstr(l->psocket.host),
                           fixstr(l->psocket.port),
                           pn_bytes(v.size, v.start));
@@ -436,7 +483,7 @@ static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) {
 }
 
 static void leader_connect(psocket_t *ps) {
-  pconn *pc = as_pconn(ps);
+  pconnection_t *pc = as_pconnection_t(ps);
   uv_getaddrinfo_t info;
   int err = leader_resolve(ps, &info, false);
   if (!err) {
@@ -450,7 +497,7 @@ static void leader_connect(psocket_t *ps) {
 
 static void leader_listen(psocket_t *ps) {
   pn_listener_t *l = as_listener(ps);
-  uv_getaddrinfo_t info;
+   uv_getaddrinfo_t info;
   int err = leader_resolve(ps, &info, true);
   if (!err) {
     err = uv_tcp_bind(&l->psocket.tcp, info.addrinfo->ai_addr, 0);
@@ -463,8 +510,8 @@ static void leader_listen(psocket_t *ps) {
 }
 
 static void on_tick(uv_timer_t *timer) {
-  pconn *pc = (pconn*)timer->data;
-  pn_transport_t *t = pc->ceng.transport;
+  pconnection_t *pc = (pconnection_t*)timer->data;
+  pn_transport_t *t = pc->driver.transport;
   if (pn_transport_get_idle_timeout(t) || pn_transport_get_remote_idle_timeout(t)) {
     uv_timer_stop(&pc->timer);
     uint64_t now = uv_now(pc->timer.loop);
@@ -476,24 +523,25 @@ static void on_tick(uv_timer_t *timer) {
 }
 
 static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
-  pconn *pc = (pconn*)stream->data;
+  pconnection_t *pc = (pconnection_t*)stream->data;
   if (nread >= 0) {
-    pn_connection_engine_read_done(&pc->ceng, nread);
+    pn_connection_driver_read_done(&pc->driver, nread);
     on_tick(&pc->timer);         /* check for tick changes. */
     leader_to_worker(&pc->psocket);
     /* Reading continues automatically until stopped. */
   } else if (nread == UV_EOF) { /* hangup */
-    pn_connection_engine_read_close(&pc->ceng);
+    pn_connection_driver_read_close(&pc->driver);
     leader_maybe_free(&pc->psocket);
   } else {
     leader_error(&pc->psocket, nread, "on read from");
   }
 }
 
-static void on_write(uv_write_t* request, int err) {
-  pconn *pc = (pconn*)request->data;
+static void on_write(uv_write_t* write, int err) {
+  pconnection_t *pc = (pconnection_t*)write->data;
+  write->data = NULL;
   if (err == 0) {
-    pn_connection_engine_write_done(&pc->ceng, pc->writing);
+    pn_connection_driver_write_done(&pc->driver, pc->writing);
     leader_to_worker(&pc->psocket);
   } else if (err == UV_ECANCELED) {
     leader_maybe_free(&pc->psocket);
@@ -505,29 +553,31 @@ static void on_write(uv_write_t* request, int err) {
 
 // Read buffer allocation function for uv, just returns the transports read buffer.
 static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) {
-  pconn *pc = (pconn*)stream->data;
-  pn_rwbytes_t rbuf = pn_connection_engine_read_buffer(&pc->ceng);
+  pconnection_t *pc = (pconnection_t*)stream->data;
+  pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
   *buf = uv_buf_init(rbuf.start, rbuf.size);
 }
 
 static void leader_rewatch(psocket_t *ps) {
-  pconn *pc = as_pconn(ps);
+  pconnection_t *pc = as_pconnection_t(ps);
 
   if (pc->timer.data) {         /* uv-initialized */
     on_tick(&pc->timer);        /* Re-enable ticks if required */
   }
-  pn_rwbytes_t rbuf = pn_connection_engine_read_buffer(&pc->ceng);
-  pn_bytes_t wbuf = pn_connection_engine_write_buffer(&pc->ceng);
+  pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
+  pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
 
   /* Ticks and checking buffers can generate events, process before proceeding */
-  if (pn_connection_engine_has_event(&pc->ceng)) {
+  if (pn_connection_driver_has_event(&pc->driver)) {
     leader_to_worker(ps);
   } else {                      /* Re-watch for IO */
     if (wbuf.size > 0 && !pc->writing) {
       pc->writing = wbuf.size;
       uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
+      pc->write.data = ps;
       uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write);
-    } else if (wbuf.size == 0 && pn_connection_engine_write_closed(&pc->ceng)) {
+    } else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) {
+      pc->shutdown.data = ps;
       uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, on_shutdown);
     }
     if (rbuf.size > 0 && !pc->reading) {
@@ -537,23 +587,31 @@ static void leader_rewatch(psocket_t *ps) {
   }
 }
 
-/* Return the next worker event or { 0 } if no events are ready */
-static pn_event_t* get_event_lh(pn_proactor_t *p) {
-  if (p->inactive) {
-    p->inactive = false;
-    return pn_collector_peek(p->inactive_event);
-  }
-  if (p->interrupt > 0) {
-    --p->interrupt;
-    return pn_collector_peek(p->interrupt_event);
+static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t) {
+  pn_collector_put(p->collector, pn_proactor__class(), p, t);
+  p->batch_working = true;
+  return &p->batch;
+}
+
+/* Return the next event batch or 0 if no events are ready */
+static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) {
+  if (!p->batch_working) {       /* Can generate proactor events */
+    if (p->inactive) {
+      p->inactive = false;
+      return proactor_batch_lh(p, PN_PROACTOR_INACTIVE);
+    }
+    if (p->interrupt > 0) {
+      --p->interrupt;
+      return proactor_batch_lh(p, PN_PROACTOR_INTERRUPT);
+    }
   }
   for (psocket_t *ps = pop_lh(&p->worker_q); ps; ps = pop_lh(&p->worker_q)) {
     if (ps->is_conn) {
-      pconn *pc = as_pconn(ps);
-      return pn_connection_engine_event(&pc->ceng);
+      pconnection_t *pc = as_pconnection_t(ps);
+      return &pc->driver.batch;
     } else {                    /* Listener */
       pn_listener_t *l = as_listener(ps);
-      return pn_collector_peek(l->collector);
+      return &l->batch;
     }
     to_leader(ps);      /* No event, back to leader */
   }
@@ -568,15 +626,6 @@ static void wakeup(psocket_t *ps, void (*action)(psocket_t*)) {
   uv_mutex_unlock(&ps->proactor->lock);
 }
 
-/* Defer an action to the leader thread. Only from non-leader threads. */
-static void owner_defer(psocket_t *ps, void (*action)(psocket_t*)) {
-  uv_mutex_lock(&ps->proactor->lock);
-  assert(!ps->action);
-  ps->action = action;
-  to_leader_lh(ps);
-  uv_mutex_unlock(&ps->proactor->lock);
-}
-
 pn_listener_t *pn_event_listener(pn_event_t *e) {
   return (pn_event_class(e) == pn_listener__class()) ? (pn_listener_t*)pn_event_context(e) : NULL;
 }
@@ -590,57 +639,47 @@ pn_proactor_t *pn_event_proactor(pn_event_t *e) {
   return NULL;
 }
 
-void pn_event_done(pn_event_t *e) {
-  pn_event_type_t etype = pn_event_type(e);
-  pconn *pc = get_pconn(pn_event_connection(e));
-  if (pc && e == pn_collector_peek(pc->ceng.collector)) {
-    pn_connection_engine_pop_event(&pc->ceng);
-    if (etype == PN_CONNECTION_INIT) {
-      /* Bind after user has handled CONNECTION_INIT */
-      pn_connection_engine_bind(&pc->ceng);
-    }
-    if (pn_connection_engine_has_event(&pc->ceng)) {
-      /* Process all events before going back to IO.
-         Put it back on the worker queue and wake the leader.
-      */
+void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
+  pconnection_t *pc = batch_pconnection(batch);
+  if (pc) {
+    if (pn_connection_driver_has_event(&pc->driver)) {
+      /* Process all events before going back to IO. */
       worker_requeue(&pc->psocket);
-    } else if (pn_connection_engine_finished(&pc->ceng)) {
-      owner_defer(&pc->psocket, leader_close);
+    } else if (pn_connection_driver_finished(&pc->driver)) {
+      owner_to_leader(&pc->psocket, leader_close);
     } else {
-      owner_defer(&pc->psocket, leader_rewatch);
-    }
-  } else {
-    pn_listener_t *l = pn_event_listener(e);
-    if (l && e == pn_collector_peek(l->collector)) {
-      pn_collector_pop(l->collector);
-      if (etype == PN_LISTENER_CLOSE) {
-        owner_defer(&l->psocket, leader_close);
-      }
+      owner_to_leader(&pc->psocket, leader_rewatch);
     }
+    return;
+  }
+  pn_proactor_t *bp = batch_proactor(batch);
+  if (bp == p) {
+    uv_mutex_lock(&p->lock);
+    p->batch_working = false;
+    uv_async_send(&p->async); /* Wake leader */
+    uv_mutex_unlock(&p->lock);
+    return;
   }
+  /* Nothing extra to do for listener, it is always in the UV loop. */
 }
 
 /* Run follower/leader loop till we can return an event and be a worker */
-pn_event_t *pn_proactor_wait(struct pn_proactor_t* p) {
+pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
   uv_mutex_lock(&p->lock);
   /* Try to grab work immediately. */
-  pn_event_t *e = get_event_lh(p);
-  if (e == NULL) {
+  pn_event_batch_t *batch = get_batch_lh(p);
+  if (batch == NULL) {
     /* No work available, follow the leader */
-    while (p->has_leader)
+    while (p->has_leader) {
       uv_cond_wait(&p->cond, &p->lock);
+    }
     /* Lead till there is work to do. */
     p->has_leader = true;
-    for (e = get_event_lh(p); e == NULL; e = get_event_lh(p)) {
-      /* Run uv_loop outside the lock */
-      uv_mutex_unlock(&p->lock);
-      uv_run(&p->loop, UV_RUN_ONCE);
-      uv_mutex_lock(&p->lock);
-      /* Process leader work queue outside the lock */
+    while (batch == NULL) {
       for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
         void (*action)(psocket_t*) = ps->action;
-        ps->action = NULL;
         void (*wakeup)(psocket_t*) = ps->wakeup;
+        ps->action = NULL;
         ps->wakeup = NULL;
         if (action || wakeup) {
           uv_mutex_unlock(&p->lock);
@@ -649,13 +688,19 @@ pn_event_t *pn_proactor_wait(struct pn_proactor_t* p) {
           uv_mutex_lock(&p->lock);
         }
       }
+      batch = get_batch_lh(p);
+      if (batch == NULL) {
+        uv_mutex_unlock(&p->lock);
+        uv_run(&p->loop, UV_RUN_ONCE);
+        uv_mutex_lock(&p->lock);
+      }
     }
     /* Signal the next leader and return to work */
     p->has_leader = false;
     uv_cond_signal(&p->cond);
   }
   uv_mutex_unlock(&p->lock);
-  return e;
+  return batch;
 }
 
 void pn_proactor_interrupt(pn_proactor_t *p) {
@@ -666,11 +711,12 @@ void pn_proactor_interrupt(pn_proactor_t *p) {
 }
 
 int pn_proactor_connect(pn_proactor_t *p, const char *host, const char *port, pn_bytes_t extra) {
-  pconn *pc = new_pconn(p, false, host, port, extra);
+  pconnection_t *pc = new_pconnection_t(p, false, host, port, extra);
   if (!pc) {
     return PN_OUT_OF_MEMORY;
   }
-  owner_defer(&pc->psocket, leader_connect); /* Process PN_CONNECTION_INIT before binding */
+  /* Process PN_CONNECTION_INIT before binding */
+  owner_to_worker(&pc->psocket, leader_connect);
   return 0;
 }
 
@@ -678,12 +724,12 @@ pn_rwbytes_t pn_listener_get_extra(pn_listener_t *l) { return PN_EXTRA_GET(pn_li
 
 pn_listener_t *pn_proactor_listen(pn_proactor_t *p, const char *host, const char *port, int backlog, pn_bytes_t extra) {
   pn_listener_t *l = new_listener(p, host, port, backlog, extra);
-  if (l)  owner_defer(&l->psocket, leader_listen);
+  if (l)  owner_to_leader(&l->psocket, leader_listen);
   return l;
 }
 
 pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
-  pconn *pc = get_pconn(c);
+  pconnection_t *pc = get_pconnection_t(c);
   return pc ? pc->psocket.proactor : NULL;
 }
 
@@ -692,13 +738,14 @@ pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
 }
 
 void leader_wake_connection(psocket_t *ps) {
-  pconn *pc = as_pconn(ps);
-  pn_collector_put(pc->ceng.collector, PN_OBJECT, pc->ceng.connection, PN_CONNECTION_WAKE);
+  pconnection_t *pc = as_pconnection_t(ps);
+  pn_connection_t *c = pc->driver.connection;
+  pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
   leader_to_worker(ps);
 }
 
 void pn_connection_wake(pn_connection_t* c) {
-  wakeup(&get_pconn(c)->psocket, leader_wake_connection);
+  wakeup(&get_pconnection_t(c)->psocket, leader_wake_connection);
 }
 
 void pn_listener_close(pn_listener_t* l) {
@@ -710,22 +757,15 @@ pn_condition_t* pn_listener_condition(pn_listener_t* l) {
   return l->condition;
 }
 
-/* Collector to hold for a single fixed event that is never popped. */
-static pn_collector_t *event_holder(pn_proactor_t *p, pn_event_type_t t) {
-  pn_collector_t *c = pn_collector();
-  pn_collector_put(c, pn_proactor__class(), p, t);
-  return c;
-}
-
 pn_proactor_t *pn_proactor() {
   pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p));
+  p->collector = pn_collector();
+  p->batch.next_event = &proactor_batch_next;
+  if (!p->collector) return NULL;
   uv_loop_init(&p->loop);
   uv_mutex_init(&p->lock);
   uv_cond_init(&p->cond);
   uv_async_init(&p->loop, &p->async, NULL); /* Just wake the loop */
-  p->interrupt_event = event_holder(p, PN_PROACTOR_INTERRUPT);
-  p->inactive_event = event_holder(p, PN_PROACTOR_INACTIVE);
-  p->timeout_event = event_holder(p, PN_PROACTOR_TIMEOUT);
   return p;
 }
 
@@ -741,8 +781,19 @@ void pn_proactor_free(pn_proactor_t *p) {
   uv_loop_close(&p->loop);
   uv_mutex_destroy(&p->lock);
   uv_cond_destroy(&p->cond);
-  pn_collector_free(p->interrupt_event);
-  pn_collector_free(p->inactive_event);
-  pn_collector_free(p->timeout_event);
+  pn_collector_free(p->collector);
   free(p);
 }
+
+static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
+  pn_listener_t *l = batch_listener(batch);
+  pn_event_t *handled = pn_collector_prev(l->collector);
+  if (handled && pn_event_type(handled) == PN_LISTENER_CLOSE) {
+    owner_to_leader(&l->psocket, leader_close); /* Close event handled, do close */
+  }
+  return pn_collector_next(l->collector);
+}
+
+static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
+  return pn_collector_next(batch_proactor(batch)->collector);
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/examples/c/proactor/receive.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/receive.c b/examples/c/proactor/receive.c
index acdae0c..88e3456 100644
--- a/examples/c/proactor/receive.c
+++ b/examples/c/proactor/receive.c
@@ -20,7 +20,7 @@
  */
 
 #include <proton/connection.h>
-#include <proton/connection_engine.h>
+#include <proton/connection_driver.h>
 #include <proton/delivery.h>
 #include <proton/proactor.h>
 #include <proton/link.h>
@@ -37,12 +37,13 @@
 typedef char str[1024];
 
 typedef struct app_data_t {
-    str address;
-    str container_id;
-    pn_rwbytes_t message_buffer;
-    int message_count;
-    int received;
-    pn_proactor_t *proactor;
+  str address;
+  str container_id;
+  pn_rwbytes_t message_buffer;
+  int message_count;
+  int received;
+  pn_proactor_t *proactor;
+  bool finished;
 } app_data_t;
 
 static const int BATCH = 100; /* Batch size for unlimited receive */
@@ -80,9 +81,7 @@ static void decode_message(pn_delivery_t *dlv) {
   }
 }
 
-/* Handle event, return true of we should handle more */
-static bool handle(app_data_t* app, pn_event_t* event) {
-  bool more = true;
+static void handle(app_data_t* app, pn_event_t* event) {
   switch (pn_event_type(event)) {
 
    case PN_CONNECTION_INIT: {
@@ -149,53 +148,58 @@ static bool handle(app_data_t* app, pn_event_t* event) {
     break;
 
    case PN_PROACTOR_INACTIVE:
-    more = false;
+    app->finished = true;
     break;
 
    default: break;
   }
-  pn_event_done(event);
-  return more;
 }
 
 static void usage(const char *arg0) {
-    fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", arg0);
-    exit(1);
+  fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", arg0);
+  exit(1);
 }
 
 int main(int argc, char **argv) {
-    /* Default values for application and connection. */
-    app_data_t app = {{0}};
-    app.message_count = 100;
-    const char* urlstr = NULL;
-
-    int opt;
-    while((opt = getopt(argc, argv, "a:m:")) != -1) {
-        switch(opt) {
-          case 'a': urlstr = optarg; break;
-          case 'm': app.message_count = atoi(optarg); break;
-          default: usage(argv[0]); break;
-        }
+  /* Default values for application and connection. */
+  app_data_t app = {{0}};
+  app.message_count = 100;
+  const char* urlstr = NULL;
+
+  int opt;
+  while((opt = getopt(argc, argv, "a:m:")) != -1) {
+    switch(opt) {
+     case 'a': urlstr = optarg; break;
+     case 'm': app.message_count = atoi(optarg); break;
+     default: usage(argv[0]); break;
     }
-    if (optind < argc)
-        usage(argv[0]);
-
-    snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid());
-
-    /* Parse the URL or use default values */
-    pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
-    const char *host = url ? pn_url_get_host(url) : NULL;
-    const char *port = url ? pn_url_get_port(url) : "amqp";
-    strncpy(app.address, (url && pn_url_get_path(url)) ? pn_url_get_path(url) : "example", sizeof(app.address));
-
-    /* Create the proactor and connect */
-    app.proactor = pn_proactor();
-    pn_proactor_connect(app.proactor, host, port, pn_rwbytes_null);
-    if (url) pn_url_free(url);
-
-    while (handle(&app, pn_proactor_wait(app.proactor)))
-           ;
-    pn_proactor_free(app.proactor);
-    free(app.message_buffer.start);
-    return exit_code;
+  }
+  if (optind < argc)
+    usage(argv[0]);
+
+  snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid());
+
+  /* Parse the URL or use default values */
+  pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
+  const char *host = url ? pn_url_get_host(url) : NULL;
+  const char *port = url ? pn_url_get_port(url) : "amqp";
+  strncpy(app.address, (url && pn_url_get_path(url)) ? pn_url_get_path(url) : "example", sizeof(app.address));
+
+  /* Create the proactor and connect */
+  app.proactor = pn_proactor();
+  pn_proactor_connect(app.proactor, host, port, pn_rwbytes_null);
+  if (url) pn_url_free(url);
+
+  do {
+    pn_event_batch_t *events = pn_proactor_wait(app.proactor);
+    pn_event_t *e;
+    while ((e = pn_event_batch_next(events))) {
+      handle(&app, e);
+    }
+    pn_proactor_done(app.proactor, events);
+  } while(!app.finished);
+
+  pn_proactor_free(app.proactor);
+  free(app.message_buffer.start);
+  return exit_code;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/examples/c/proactor/send.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/send.c b/examples/c/proactor/send.c
index 5d58895..d64ea2d 100644
--- a/examples/c/proactor/send.c
+++ b/examples/c/proactor/send.c
@@ -20,7 +20,7 @@
  */
 
 #include <proton/connection.h>
-#include <proton/connection_engine.h>
+#include <proton/connection_driver.h>
 #include <proton/delivery.h>
 #include <proton/proactor.h>
 #include <proton/link.h>
@@ -37,13 +37,14 @@
 typedef char str[1024];
 
 typedef struct app_data_t {
-    str address;
-    str container_id;
-    pn_rwbytes_t message_buffer;
-    int message_count;
-    int sent;
-    int acknowledged;
-    pn_proactor_t *proactor;
+  str address;
+  str container_id;
+  pn_rwbytes_t message_buffer;
+  int message_count;
+  int sent;
+  int acknowledged;
+  pn_proactor_t *proactor;
+  bool finished;
 } app_data_t;
 
 int exit_code = 0;
@@ -58,41 +59,39 @@ static void check_condition(pn_event_t *e, pn_condition_t *cond) {
 
 /* Create a message with a map { "sequence" : number } encode it and return the encoded buffer. */
 static pn_bytes_t encode_message(app_data_t* app) {
-    /* Construct a message with the map { "sequence": app.sent } */
-    pn_message_t* message = pn_message();
-    pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */
-    pn_data_t* body = pn_message_body(message);
-    pn_data_put_map(body);
-    pn_data_enter(body);
-    pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
-    pn_data_put_int(body, app->sent); /* The sequence number */
-    pn_data_exit(body);
-
-    /* encode the message, expanding the encode buffer as needed */
-    if (app->message_buffer.start == NULL) {
-        static const size_t initial_size = 128;
-        app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size));
-    }
-    /* app->message_buffer is the total buffer space available. */
-    /* mbuf wil point at just the portion used by the encoded message */
-    pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start);
-    int status = 0;
-    while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) {
-        app->message_buffer.size *= 2;
-        app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size);
-        mbuf.size = app->message_buffer.size;
-    }
-    if (status != 0) {
-        fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message)));
-        exit(1);
-    }
-    pn_message_free(message);
-    return pn_bytes(mbuf.size, mbuf.start);
+  /* Construct a message with the map { "sequence": app.sent } */
+  pn_message_t* message = pn_message();
+  pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */
+  pn_data_t* body = pn_message_body(message);
+  pn_data_put_map(body);
+  pn_data_enter(body);
+  pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
+  pn_data_put_int(body, app->sent); /* The sequence number */
+  pn_data_exit(body);
+
+  /* encode the message, expanding the encode buffer as needed */
+  if (app->message_buffer.start == NULL) {
+    static const size_t initial_size = 128;
+    app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size));
+  }
+  /* app->message_buffer is the total buffer space available. */
+  /* mbuf will point at just the portion used by the encoded message */
+  pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start);
+  int status = 0;
+  while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) {
+    app->message_buffer.size *= 2;
+    app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size);
+    mbuf.size = app->message_buffer.size;
+  }
+  if (status != 0) {
+    fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message)));
+    exit(1);
+  }
+  pn_message_free(message);
+  return pn_bytes(mbuf.size, mbuf.start);
 }
 
-/* Handle event, return true of we should handle more */
-static bool handle(app_data_t* app, pn_event_t* event) {
-  bool more = true;
+static void handle(app_data_t* app, pn_event_t* event) {
   switch (pn_event_type(event)) {
 
    case PN_CONNECTION_INIT: {
@@ -130,7 +129,7 @@ static bool handle(app_data_t* app, pn_event_t* event) {
      }
    } break;
 
-   case PN_TRANSPORT_ERROR:
+   case PN_TRANSPORT_CLOSED:
     check_condition(event, pn_transport_condition(pn_event_transport(event)));
     break;
 
@@ -151,53 +150,58 @@ static bool handle(app_data_t* app, pn_event_t* event) {
     break;
 
    case PN_PROACTOR_INACTIVE:
-    more = false;
+    app->finished = true;
     break;
 
    default: break;
   }
-  pn_event_done(event);
-  return more;
 }
 
 static void usage(const char *arg0) {
-    fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", arg0);
-    exit(1);
+  fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", arg0);
+  exit(1);
 }
 
 int main(int argc, char **argv) {
-    /* Default values for application and connection. */
-    app_data_t app = {{0}};
-    app.message_count = 100;
-    const char* urlstr = NULL;
-
-    int opt;
-    while((opt = getopt(argc, argv, "a:m:")) != -1) {
-        switch(opt) {
-          case 'a': urlstr = optarg; break;
-          case 'm': app.message_count = atoi(optarg); break;
-          default: usage(argv[0]); break;
-        }
+  /* Default values for application and connection. */
+  app_data_t app = {{0}};
+  app.message_count = 100;
+  const char* urlstr = NULL;
+
+  int opt;
+  while((opt = getopt(argc, argv, "a:m:")) != -1) {
+    switch(opt) {
+     case 'a': urlstr = optarg; break;
+     case 'm': app.message_count = atoi(optarg); break;
+     default: usage(argv[0]); break;
     }
-    if (optind < argc)
-        usage(argv[0]);
-
-    snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid());
-
-    /* Parse the URL or use default values */
-    pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
-    const char *host = url ? pn_url_get_host(url) : NULL;
-    const char *port = url ? pn_url_get_port(url) : "amqp";
-    strncpy(app.address, (url && pn_url_get_path(url)) ? pn_url_get_path(url) : "example", sizeof(app.address));
-
-    /* Create the proactor and connect */
-    app.proactor = pn_proactor();
-    pn_proactor_connect(app.proactor, host, port, pn_rwbytes_null);
-    if (url) pn_url_free(url);
-
-    while (handle(&app, pn_proactor_wait(app.proactor)))
-           ;
-    pn_proactor_free(app.proactor);
-    free(app.message_buffer.start);
-    return exit_code;
+  }
+  if (optind < argc)
+    usage(argv[0]);
+
+  snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid());
+
+  /* Parse the URL or use default values */
+  pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
+  const char *host = url ? pn_url_get_host(url) : NULL;
+  const char *port = url ? pn_url_get_port(url) : "amqp";
+  strncpy(app.address, (url && pn_url_get_path(url)) ? pn_url_get_path(url) : "example", sizeof(app.address));
+
+  /* Create the proactor and connect */
+  app.proactor = pn_proactor();
+  pn_proactor_connect(app.proactor, host, port, pn_rwbytes_null);
+  if (url) pn_url_free(url);
+
+  do {
+    pn_event_batch_t *events = pn_proactor_wait(app.proactor);
+    pn_event_t *e;
+    while ((e = pn_event_batch_next(events))) {
+      handle(&app, e);
+    }
+    pn_proactor_done(app.proactor, events);
+  } while(!app.finished);
+
+  pn_proactor_free(app.proactor);
+  free(app.message_buffer.start);
+  return exit_code;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/examples/cpp/README.dox
----------------------------------------------------------------------
diff --git a/examples/cpp/README.dox b/examples/cpp/README.dox
index 1d46ec8..447d3ad 100644
--- a/examples/cpp/README.dox
+++ b/examples/cpp/README.dox
@@ -123,7 +123,7 @@ subscribe.
 /** @example mt/epoll_container.cpp
 
 An example implementation of the proton::container API that shows how
-to use the proton::io::connection_engine SPI to adapt the proton API
+to use the proton::io::connection_driver SPI to adapt the proton API
 to native IO, in this case using a multithreaded Linux epoll poller as
 the implementation.
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/examples/cpp/mt/epoll_container.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/mt/epoll_container.cpp b/examples/cpp/mt/epoll_container.cpp
index d9b9f08..7646673 100644
--- a/examples/cpp/mt/epoll_container.cpp
+++ b/examples/cpp/mt/epoll_container.cpp
@@ -25,7 +25,7 @@
 #include <proton/url.hpp>
 
 #include <proton/io/container_impl_base.hpp>
-#include <proton/io/connection_engine.hpp>
+#include <proton/io/connection_driver.hpp>
 #include <proton/io/link_namer.hpp>
 
 #include <atomic>
@@ -97,7 +97,7 @@ class unique_fd {
 };
 
 class pollable;
-class pollable_engine;
+class pollable_driver;
 class pollable_listener;
 
 class epoll_container : public proton::io::container_impl_base {
@@ -124,7 +124,7 @@ class epoll_container : public proton::io::container_impl_base {
     std::string id() const OVERRIDE { return id_; }
 
     // Functions used internally.
-    proton::connection add_engine(proton::connection_options opts, int fd, bool server);
+    proton::connection add_driver(proton::connection_options opts, int fd, bool server);
     void erase(pollable*);
 
     // Link names must be unique per container.
@@ -160,7 +160,7 @@ class epoll_container : public proton::io::container_impl_base {
 
     proton::connection_options options_;
     std::map<std::string, std::unique_ptr<pollable_listener> > listeners_;
-    std::map<pollable*, std::unique_ptr<pollable_engine> > engines_;
+    std::map<pollable*, std::unique_ptr<pollable_driver> > drivers_;
 
     std::condition_variable stopped_;
     bool stopping_;
@@ -274,21 +274,21 @@ class epoll_event_loop : public proton::event_loop {
     bool closed_;
 };
 
-// Handle epoll wakeups for a connection_engine.
-class pollable_engine : public pollable {
+// Handle epoll wakeups for a connection_driver.
+class pollable_driver : public pollable {
   public:
-    pollable_engine(epoll_container& c, int fd, int epoll_fd) :
+    pollable_driver(epoll_container& c, int fd, int epoll_fd) :
         pollable(fd, epoll_fd),
         loop_(new epoll_event_loop(*this)),
-        engine_(c, loop_)
+        driver_(c, loop_)
     {
-        proton::connection conn = engine_.connection();
+        proton::connection conn = driver_.connection();
         proton::io::set_link_namer(conn, c.link_namer);
     }
 
-    ~pollable_engine() {
+    ~pollable_driver() {
         loop_->close();                // No calls to notify() after this.
-        engine_.dispatch();            // Run any final events.
+        driver_.dispatch();            // Run any final events.
         try { write(); } catch(...) {} // Write connection close if we can.
         for (auto f : loop_->pop_all()) {// Run final queued work for side-effects.
             try { f(); } catch(...) {}
@@ -303,17 +303,17 @@ class pollable_engine : public pollable {
                 can_read = can_read && read();
                 for (auto f : loop_->pop_all()) // Run queued work
                     f();
-                engine_.dispatch();
+                driver_.dispatch();
             } while (can_read || can_write);
-            return (engine_.read_buffer().size ? EPOLLIN:0) |
-                (engine_.write_buffer().size ? EPOLLOUT:0);
+            return (driver_.read_buffer().size ? EPOLLIN:0) |
+                (driver_.write_buffer().size ? EPOLLOUT:0);
         } catch (const std::exception& e) {
-            engine_.disconnected(proton::error_condition("exception", e.what()));
+            driver_.disconnected(proton::error_condition("exception", e.what()));
         }
         return 0;               // Ending
     }
 
-    proton::io::connection_engine& engine() { return engine_; }
+    proton::io::connection_driver& driver() { return driver_; }
 
   private:
     static bool try_again(int e) {
@@ -322,11 +322,11 @@ class pollable_engine : public pollable {
     }
 
     bool write() {
-        proton::io::const_buffer wbuf(engine_.write_buffer());
+        proton::io::const_buffer wbuf(driver_.write_buffer());
         if (wbuf.size) {
             ssize_t n = ::write(fd_, wbuf.data, wbuf.size);
             if (n > 0) {
-                engine_.write_done(n);
+                driver_.write_done(n);
                 return true;
             } else if (n < 0 && !try_again(errno)) {
                 check(n, "write");
@@ -336,15 +336,15 @@ class pollable_engine : public pollable {
     }
 
     bool read() {
-        proton::io::mutable_buffer rbuf(engine_.read_buffer());
+        proton::io::mutable_buffer rbuf(driver_.read_buffer());
         if (rbuf.size) {
             ssize_t n = ::read(fd_, rbuf.data, rbuf.size);
             if (n > 0) {
-                engine_.read_done(n);
+                driver_.read_done(n);
                 return true;
             }
             else if (n == 0)
-                engine_.read_close();
+                driver_.read_close();
             else if (!try_again(errno))
                 check(n, "read");
         }
@@ -352,13 +352,13 @@ class pollable_engine : public pollable {
     }
 
     // Lifecycle note: loop_ belongs to the proton::connection, which can live
-    // longer than the engine if the application holds a reference to it, we
-    // disconnect ourselves with loop_->close() in ~connection_engine()
+    // longer than the driver if the application holds a reference to it, we
+    // disconnect ourselves with loop_->close() in ~connection_driver()
     epoll_event_loop* loop_;
-    proton::io::connection_engine engine_;
+    proton::io::connection_driver driver_;
 };
 
-// A pollable listener fd that creates pollable_engine for incoming connections.
+// A pollable listener fd that creates pollable_driver for incoming connections.
 class pollable_listener : public pollable {
   public:
     pollable_listener(
@@ -380,7 +380,7 @@ class pollable_listener : public pollable {
         }
         try {
             int accepted = check(::accept(fd_, NULL, 0), "accept");
-            container_.add_engine(listener_.on_accept(), accepted, true);
+            container_.add_driver(listener_.on_accept(), accepted, true);
             return EPOLLIN;
         } catch (const std::exception& e) {
             listener_.on_error(e.what());
@@ -424,25 +424,25 @@ epoll_container::~epoll_container() {
     } catch (...) {}
 }
 
-proton::connection epoll_container::add_engine(proton::connection_options opts, int fd, bool server)
+proton::connection epoll_container::add_driver(proton::connection_options opts, int fd, bool server)
 {
     lock_guard g(lock_);
     if (stopping_)
         throw proton::error("container is stopping");
-    std::unique_ptr<pollable_engine> eng(new pollable_engine(*this, fd, epoll_fd_));
+    std::unique_ptr<pollable_driver> eng(new pollable_driver(*this, fd, epoll_fd_));
     if (server)
-        eng->engine().accept(opts);
+        eng->driver().accept(opts);
     else
-        eng->engine().connect(opts);
-    proton::connection c = eng->engine().connection();
+        eng->driver().connect(opts);
+    proton::connection c = eng->driver().connection();
     eng->notify();
-    engines_[eng.get()] = std::move(eng);
+    drivers_[eng.get()] = std::move(eng);
     return c;
 }
 
 void epoll_container::erase(pollable* e) {
     lock_guard g(lock_);
-    if (!engines_.erase(e)) {
+    if (!drivers_.erase(e)) {
         pollable_listener* l = dynamic_cast<pollable_listener*>(e);
         if (l)
             listeners_.erase(l->addr());
@@ -451,7 +451,7 @@ void epoll_container::erase(pollable* e) {
 }
 
 void epoll_container::idle_check(const lock_guard&) {
-    if (stopping_  && engines_.empty() && listeners_.empty())
+    if (stopping_  && drivers_.empty() && listeners_.empty())
         interrupt();
 }
 
@@ -462,7 +462,7 @@ proton::returned<proton::connection> epoll_container::connect(
     unique_addrinfo ainfo(addr);
     unique_fd fd(check(::socket(ainfo->ai_family, SOCK_STREAM, 0), msg));
     check(::connect(fd, ainfo->ai_addr, ainfo->ai_addrlen), msg);
-    return make_thread_safe(add_engine(opts, fd.release(), false));
+    return make_thread_safe(add_driver(opts, fd.release(), false));
 }
 
 proton::listener epoll_container::listen(const std::string& addr, proton::listen_handler& lh) {
@@ -520,10 +520,10 @@ void epoll_container::stop(const proton::error_condition& err) {
 void epoll_container::wait() {
     std::unique_lock<std::mutex> l(lock_);
     stopped_.wait(l, [this]() { return this->threads_ == 0; } );
-    for (auto& eng : engines_)
-        eng.second->engine().disconnected(stop_err_);
+    for (auto& eng : drivers_)
+        eng.second->driver().disconnected(stop_err_);
     listeners_.clear();
-    engines_.clear();
+    drivers_.clear();
 }
 
 void epoll_container::interrupt() {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt
index ddab147..ffc6e10 100644
--- a/proton-c/CMakeLists.txt
+++ b/proton-c/CMakeLists.txt
@@ -369,7 +369,7 @@ set (qpid-proton-core
   src/core/encoder.c
 
   src/core/dispatcher.c
-  src/core/connection_engine.c
+  src/core/connection_driver.c
   src/core/engine.c
   src/core/event.c
   src/core/autodetect.c
@@ -440,7 +440,7 @@ set (qpid-proton-include
   include/proton/codec.h
   include/proton/condition.h
   include/proton/connection.h
-  include/proton/connection_engine.h
+  include/proton/connection_driver.h
   include/proton/delivery.h
   include/proton/disposition.h
   include/proton/engine.h

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt
index ed969eb..6af4319 100644
--- a/proton-c/bindings/cpp/CMakeLists.txt
+++ b/proton-c/bindings/cpp/CMakeLists.txt
@@ -48,7 +48,7 @@ set(qpid-proton-cpp-source
   src/error_condition.cpp
   src/event_loop.cpp
   src/handler.cpp
-  src/io/connection_engine.cpp
+  src/io/connection_driver.cpp
   src/io/link_namer.cpp
   src/link.cpp
   src/listener.cpp

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/docs/io.md
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/docs/io.md b/proton-c/bindings/cpp/docs/io.md
index a892e61..230e538 100644
--- a/proton-c/bindings/cpp/docs/io.md
+++ b/proton-c/bindings/cpp/docs/io.md
@@ -7,16 +7,16 @@ The `proton::io` namespace contains a service provider interface (SPI)
 that allows you to implement the Proton API over alternative IO or
 threading libraries.
 
-The `proton::io::connection_engine` class converts an AMQP-encoded
+The `proton::io::connection_driver` class converts an AMQP-encoded
 byte stream, read from any IO source, into `proton::messaging_handler`
 calls. It generates an AMQP-encoded byte stream as output that can be
 written to any IO destination.
 
-The connection engine is deliberately very simple and low level. It
+The connection driver is deliberately very simple and low level. It
 performs no IO of its own, no thread-related locking, and is written
 in simple C++98-compatible code.
 
-The connection engine can be used standalone as an AMQP translator, or
+The connection driver can be used standalone as an AMQP translator, or
 you can implement the following two interfaces to provide a complete
 implementation of the Proton API that can run any Proton application:
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/docs/main.md
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/docs/main.md b/proton-c/bindings/cpp/docs/main.md
index 011df29..93ba2c0 100644
--- a/proton-c/bindings/cpp/docs/main.md
+++ b/proton-c/bindings/cpp/docs/main.md
@@ -123,6 +123,6 @@ The default container implementation is created using
 `proton::default_container`.
 
 You can implement your own container to integrate proton with any IO
-provider using the `proton::io::connection_engine`.
+provider using the `proton::io::connection_driver`.
 
 @see @ref io_page

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/include/proton/connection_options.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection_options.hpp b/proton-c/bindings/cpp/include/proton/connection_options.hpp
index 9fbdbdc..d2deebf 100644
--- a/proton-c/bindings/cpp/include/proton/connection_options.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection_options.hpp
@@ -40,7 +40,7 @@ class proton_handler;
 class connection;
 
 namespace io {
-class connection_engine;
+class connection_driver;
 }
 
 /// Options for creating a connection.
@@ -163,7 +163,7 @@ class connection_options {
     /// @cond INTERNAL
   friend class container_impl;
   friend class connector;
-  friend class io::connection_engine;
+  friend class io::connection_driver;
   friend class connection;
     /// @endcond
 };

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
new file mode 100644
index 0000000..d5da718
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
@@ -0,0 +1,211 @@
+#ifndef PROTON_IO_CONNECTION_DRIVER_HPP
+#define PROTON_IO_CONNECTION_DRIVER_HPP
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "../internal/config.hpp"
+#include "../connection.hpp"
+#include "../connection_options.hpp"
+#include "../error.hpp"
+#include "../error_condition.hpp"
+#include "../internal/export.hpp"
+#include "../internal/pn_unique_ptr.hpp"
+#include "../transport.hpp"
+#include "../types.hpp"
+
+#include <proton/connection_driver.h>
+
+#include <cstddef>
+#include <utility>
+#include <string>
+
+namespace proton {
+
+class event_loop;
+class proton_handler;
+
+namespace io {
+
+/// **Experimental** - Pointer to a mutable memory region with a size.
+struct mutable_buffer {
+    char* data;                 ///< Beginning of the buffered data.
+    size_t size;                ///< Number of bytes in the buffer.
+
+    /// Construct a buffer starting at data_ with size_ bytes.
+    mutable_buffer(char* data_=0, size_t size_=0) : data(data_), size(size_) {}
+};
+
+/// **Experimental** - Pointer to a const memory region with a size.
+struct const_buffer {
+    const char* data;           ///< Beginning of the buffered data.
+    size_t size;                ///< Number of bytes in the buffer.
+
+    /// Construct a buffer starting at data_ with size_ bytes.
+    const_buffer(const char* data_=0, size_t size_=0) : data(data_), size(size_) {}
+};
+
+/// **Experimental** - An AMQP driver for a single connection.
+///
+/// io::connection_driver manages a single proton::connection and dispatches
+/// events to a proton::messaging_handler. It does no IO of its own, but allows you to
+/// integrate AMQP protocol handling into any IO or concurrency framework.
+///
+/// The application is coded the same way as for the
+/// proton::container. The application implements a
+/// proton::messaging_handler to respond to transport, connection,
+/// session, link, and message events. With a little care, the same
+/// handler classes can be used for both container and
+/// connection_driver. The @ref broker.cpp example illustrates this.
+///
+/// You need to write the IO code to read AMQP data to the
+/// read_buffer(). The engine parses the AMQP frames. dispatch() calls
+/// the appropriate functions on the application's proton::messaging_handler. You
+/// write output data from the engine's write_buffer() to your IO.
+///
+/// The engine is not safe for concurrent use, but you can process
+/// different engines concurrently. A common pattern for
+/// high-performance servers is to serialize read/write activity
+/// per connection and dispatch in a fixed-size thread pool.
+///
+/// The engine is designed to work with a classic reactor (e.g.,
+/// select, poll, epoll) or an async-request driven proactor (e.g.,
+/// windows completion ports, boost.asio, libuv).
+///
+/// The engine never throws exceptions.
+class
+PN_CPP_CLASS_EXTERN connection_driver {
+  public:
+    /// An engine that is not associated with a proton::container or
+    /// proton::event_loop.
+    ///
+    /// Accessing the container or event_loop for this connection in
+    /// a proton::messaging_handler will throw a proton::error exception.
+    ///
+    PN_CPP_EXTERN connection_driver();
+
+    /// Create a connection driver associated with a proton::container and
+    /// optional event_loop. If the event_loop is not provided attempts to use
+    /// it will throw proton::error.
+    ///
+    /// Takes ownership of the event_loop. Note the proton::connection created
+    /// by this connection_driver can outlive the connection_driver itself if
+    /// the user pins it in memory using the proton::thread_safe<> template.
+    /// The event_loop is deleted when, and only when, the proton::connection is.
+    ///
+    PN_CPP_EXTERN connection_driver(proton::container&, event_loop* loop = 0);
+
+    PN_CPP_EXTERN ~connection_driver();
+
+    /// Configure a connection by applying exactly the options in opts (including proton::messaging_handler)
+    /// Does not apply any default options, to apply container defaults use connect() or accept()
+    /// instead. If server==true, configure a server connection.
+    void configure(const connection_options& opts=connection_options(), bool server=false);
+
+    /// Call configure() with client options and call connection::open()
+    /// Options applied: container::id(), container::client_connection_options(), opts.
+    PN_CPP_EXTERN void connect(const connection_options& opts);
+
+    /// Call configure() with server options.
+    /// Options applied: container::id(), container::server_connection_options(), opts.
+    ///
+    /// Note this does not call connection::open(). If there is a messaging_handler in the
+    /// composed options it will receive messaging_handler::on_connection_open() and can
+    /// respond with connection::open() or connection::close()
+    PN_CPP_EXTERN void accept(const connection_options& opts);
+
+    /// The engine's read buffer. Read data into this buffer then call read_done() when complete.
+    /// Returns mutable_buffer(0, 0) if the engine cannot currently read data.
+    /// Calling dispatch() may open up more buffer space.
+    PN_CPP_EXTERN mutable_buffer read_buffer();
+
+    /// Indicate that the first n bytes of read_buffer() have valid data.
+    /// This changes the buffer, call read_buffer() to get the updated buffer.
+    PN_CPP_EXTERN void read_done(size_t n);
+
+    /// Indicate that the read side of the transport is closed and no more data will be read.
+    /// Note that there may still be events to dispatch() or data to write.
+    PN_CPP_EXTERN void read_close();
+
+    /// The engine's write buffer. Write data from this buffer then call write_done()
+    /// Returns const_buffer(0, 0) if the engine has nothing to write.
+    /// Calling dispatch() may generate more data in the write buffer.
+    PN_CPP_EXTERN const_buffer write_buffer();
+
+    /// Indicate that the first n bytes of write_buffer() have been written successfully.
+    /// This changes the buffer, call write_buffer() to get the updated buffer.
+    PN_CPP_EXTERN void write_done(size_t n);
+
+    /// Indicate that the write side of the transport has closed and no more data can be written.
+    /// Note that there may still be events to dispatch() or data to read.
+    PN_CPP_EXTERN void write_close();
+
+    /// Inform the engine that the transport has been disconnected unexpectedly,
+    /// without completing the AMQP connection close sequence.
+    ///
+    /// This calls read_close(), write_close(), sets the transport().error() and
+    /// queues an `on_transport_error` event. You must call dispatch() one more
+    /// time to dispatch the messaging_handler::on_transport_error() call and other final
+    /// events.
+    ///
+    /// Note this does not close the connection() so that a proton::messaging_handler can
+    /// distinguish between a connection close error sent by the remote peer and
+    /// a transport failure.
+    ///
+    PN_CPP_EXTERN void disconnected(const error_condition& = error_condition());
+
+    /// Dispatch all available events and call the corresponding \ref messaging_handler methods.
+    ///
+    /// Returns true if the engine is still active, false if it is finished and
+    /// can be destroyed. The engine is finished when all events are dispatched
+    /// and one of the following is true:
+    ///
+    /// - both read_close() and write_close() have been called, no more IO is possible.
+    /// - The AMQP connection() is closed AND the write_buffer() is empty.
+    ///
+    /// May modify the read_buffer() and/or the write_buffer().
+    ///
+    PN_CPP_EXTERN bool dispatch();
+
+    /// Get the AMQP connection associated with this connection_driver.
+    /// The event_loop is available via proton::thread_safe<connection>(connection())
+    PN_CPP_EXTERN proton::connection connection() const;
+
+    /// Get the transport associated with this connection_driver.
+    PN_CPP_EXTERN proton::transport transport() const;
+
+    /// Get the container associated with this connection_driver, if there is one.
+    PN_CPP_EXTERN proton::container* container() const;
+
+ private:
+    void init();
+    connection_driver(const connection_driver&);
+    connection_driver& operator=(const connection_driver&);
+
+    messaging_handler* handler_;
+    proton::container* container_;
+    pn_connection_driver_t driver_;
+};
+
+} // io
+} // proton
+
+#endif // PROTON_IO_CONNECTION_DRIVER_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp b/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
deleted file mode 100644
index d9825c2..0000000
--- a/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
+++ /dev/null
@@ -1,215 +0,0 @@
-#ifndef PROTON_IO_CONNECTION_ENGINE_HPP
-#define PROTON_IO_CONNECTION_ENGINE_HPP
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "../internal/config.hpp"
-#include "../connection.hpp"
-#include "../connection_options.hpp"
-#include "../error.hpp"
-#include "../error_condition.hpp"
-#include "../internal/export.hpp"
-#include "../internal/pn_unique_ptr.hpp"
-#include "../transport.hpp"
-#include "../types.hpp"
-
-#include <proton/connection_engine.h>
-
-#include <cstddef>
-#include <utility>
-#include <string>
-
-namespace proton {
-
-class event_loop;
-class proton_handler;
-
-namespace io {
-
-/// **Experimental** - Pointer to a mutable memory region with a size.
-struct mutable_buffer {
-    char* data;                 ///< Beginning of the buffered data.
-    size_t size;                ///< Number of bytes in the buffer.
-
-    /// Construct a buffer starting at data_ with size_ bytes.
-    mutable_buffer(char* data_=0, size_t size_=0) : data(data_), size(size_) {}
-};
-
-/// **Experimental** - Pointer to a const memory region with a size.
-struct const_buffer {
-    const char* data;           ///< Beginning of the buffered data.
-    size_t size;                ///< Number of bytes in the buffer.
-
-    /// Construct a buffer starting at data_ with size_ bytes.
-    const_buffer(const char* data_=0, size_t size_=0) : data(data_), size(size_) {}
-};
-
-/// **Experimental** - An AMQP protocol engine for a single
-/// connection.
-///
-/// A connection_engine is a protocol engine that integrates AMQP into
-/// any IO or concurrency framework.
-///
-/// io::connection_engine manages a single proton::connection and dispatches
-/// events to a proton::messaging_handler. It does no IO of its own, but allows you to
-/// integrate AMQP protocol handling into any IO or concurrency framework.
-///
-/// The application is coded the same way as for the
-/// proton::container. The application implements a
-/// proton::messaging_handler to respond to transport, connection,
-/// session, link, and message events. With a little care, the same
-/// handler classes can be used for both container and
-/// connection_engine. the @ref broker.cpp example illustrates this.
-///
-/// You need to write the IO code to read AMQP data to the
-/// read_buffer(). The engine parses the AMQP frames. dispatch() calls
-/// the appropriate functions on the applications proton::messaging_handler. You
-/// write output data from the engine's write_buffer() to your IO.
-///
-/// The engine is not safe for concurrent use, but you can process
-/// different engines concurrently. A common pattern for
-/// high-performance servers is to serialize read/write activity
-/// per connection and dispatch in a fixed-size thread pool.
-///
-/// The engine is designed to work with a classic reactor (e.g.,
-/// select, poll, epoll) or an async-request driven proactor (e.g.,
-/// windows completion ports, boost.asio, libuv).
-///
-/// The engine never throws exceptions.
-class
-PN_CPP_CLASS_EXTERN connection_engine {
-  public:
-    /// An engine that is not associated with a proton::container or
-    /// proton::event_loop.
-    ///
-    /// Accessing the container or event_loop for this connection in
-    /// a proton::messaging_handler will throw a proton::error exception.
-    ///
-    PN_CPP_EXTERN connection_engine();
-
-    /// Create a connection engine associated with a proton::container and
-    /// optional event_loop. If the event_loop is not provided attempts to use
-    /// it will throw proton::error.
-    ///
-    /// Takes ownership of the event_loop. Note the proton::connection created
-    /// by this connection_engine can outlive the connection_engine itself if
-    /// the user pins it in memory using the proton::thread_safe<> template.
-    /// The event_loop is deleted when, and only when, the proton::connection is.
-    ///
-    PN_CPP_EXTERN connection_engine(proton::container&, event_loop* loop = 0);
-
-    PN_CPP_EXTERN ~connection_engine();
-
-    /// Configure a connection by applying exactly the options in opts (including proton::messaging_handler)
-    /// Does not apply any default options, to apply container defaults use connect() or accept()
-    /// instead. If server==true, configure a server connection.
-    void configure(const connection_options& opts=connection_options(), bool server=false);
-
-    /// Call configure() with client options and call connection::open()
-    /// Options applied: container::id(), container::client_connection_options(), opts.
-    PN_CPP_EXTERN void connect(const connection_options& opts);
-
-    /// Call configure() with server options.
-    /// Options applied: container::id(), container::server_connection_options(), opts.
-    ///
-    /// Note this does not call connection::open(). If there is a messaging_handler in the
-    /// composed options it will receive messaging_handler::on_connection_open() and can
-    /// respond with connection::open() or connection::close()
-    PN_CPP_EXTERN void accept(const connection_options& opts);
-
-    /// The engine's read buffer. Read data into this buffer then call read_done() when complete.
-    /// Returns mutable_buffer(0, 0) if the engine cannot currently read data.
-    /// Calling dispatch() may open up more buffer space.
-    PN_CPP_EXTERN mutable_buffer read_buffer();
-
-    /// Indicate that the first n bytes of read_buffer() have valid data.
-    /// This changes the buffer, call read_buffer() to get the updated buffer.
-    PN_CPP_EXTERN void read_done(size_t n);
-
-    /// Indicate that the read side of the transport is closed and no more data will be read.
-    /// Note that there may still be events to dispatch() or data to write.
-    PN_CPP_EXTERN void read_close();
-
-    /// The engine's write buffer. Write data from this buffer then call write_done()
-    /// Returns const_buffer(0, 0) if the engine has nothing to write.
-    /// Calling dispatch() may generate more data in the write buffer.
-    PN_CPP_EXTERN const_buffer write_buffer();
-
-    /// Indicate that the first n bytes of write_buffer() have been written successfully.
-    /// This changes the buffer, call write_buffer() to get the updated buffer.
-    PN_CPP_EXTERN void write_done(size_t n);
-
-    /// Indicate that the write side of the transport has closed and no more data can be written.
-    /// Note that there may still be events to dispatch() or data to read.
-    PN_CPP_EXTERN void write_close();
-
-    /// Inform the engine that the transport been disconnected unexpectedly,
-    /// without completing the AMQP connection close sequence.
-    ///
-    /// This calls read_close(), write_close(), sets the transport().error() and
-    /// queues an `on_transport_error` event. You must call dispatch() one more
-    /// time to dispatch the messaging_handler::on_transport_error() call and other final
-    /// events.
-    ///
-    /// Note this does not close the connection() so that a proton::messaging_handler can
-    /// distinguish between a connection close error sent by the remote peer and
-    /// a transport failure.
-    ///
-    PN_CPP_EXTERN void disconnected(const error_condition& = error_condition());
-
-    /// Dispatch all available events and call the corresponding \ref messaging_handler methods.
-    ///
-    /// Returns true if the engine is still active, false if it is finished and
-    /// can be destroyed. The engine is finished when all events are dispatched
-    /// and one of the following is true:
-    ///
-    /// - both read_close() and write_close() have been called, no more IO is possible.
-    /// - The AMQP connection() is closed AND the write_buffer() is empty.
-    ///
-    /// May modify the read_buffer() and/or the write_buffer().
-    ///
-    PN_CPP_EXTERN bool dispatch();
-
-    /// Get the AMQP connection associated with this connection_engine.
-    /// The event_loop is availabe via proton::thread_safe<connection>(connection())
-    PN_CPP_EXTERN proton::connection connection() const;
-
-    /// Get the transport associated with this connection_engine.
-    PN_CPP_EXTERN proton::transport transport() const;
-
-    /// Get the container associated with this connection_engine, if there is one.
-    PN_CPP_EXTERN proton::container* container() const;
-
- private:
-    void init();
-    connection_engine(const connection_engine&);
-    connection_engine& operator=(const connection_engine&);
-
-    messaging_handler* handler_;
-    proton::container* container_;
-    pn_connection_engine_t engine_;
-};
-
-} // io
-} // proton
-
-#endif // PROTON_IO_CONNECTION_ENGINE_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/messaging_handler.hpp b/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
index 2c5423f..acdcd30 100644
--- a/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
+++ b/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
@@ -40,7 +40,7 @@ class message;
 class messaging_adapter;
 
 namespace io {
-class connection_engine;
+class connection_driver;
 }
 
 /// A handler for Proton messaging events.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/include/proton/transport.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/transport.hpp b/proton-c/bindings/cpp/include/proton/transport.hpp
index bcd8a2f..10641e0 100644
--- a/proton-c/bindings/cpp/include/proton/transport.hpp
+++ b/proton-c/bindings/cpp/include/proton/transport.hpp
@@ -35,7 +35,7 @@ class error_condition;
 class sasl;
 
 namespace io {
-class connection_engine;
+class connection_driver;
 }
 
 /// A network channel supporting an AMQP connection.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/src/engine_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/engine_test.cpp b/proton-c/bindings/cpp/src/engine_test.cpp
index 6c3341f..991836d 100644
--- a/proton-c/bindings/cpp/src/engine_test.cpp
+++ b/proton-c/bindings/cpp/src/engine_test.cpp
@@ -24,7 +24,7 @@
 
 #include "proton/container.hpp"
 #include "proton/uuid.hpp"
-#include "proton/io/connection_engine.hpp"
+#include "proton/io/connection_driver.hpp"
 #include "proton/io/link_namer.hpp"
 #include "proton/messaging_handler.hpp"
 #include "proton/types_fwd.hpp"
@@ -37,7 +37,7 @@ namespace {
 using namespace std;
 using namespace proton;
 
-using proton::io::connection_engine;
+using proton::io::connection_driver;
 using proton::io::const_buffer;
 using proton::io::mutable_buffer;
 
@@ -45,14 +45,14 @@ using test::dummy_container;
 
 typedef std::deque<char> byte_stream;
 
-/// In memory connection_engine that reads and writes from byte_streams
-struct in_memory_engine : public connection_engine {
+/// In memory connection_driver that reads and writes from byte_streams
+struct in_memory_engine : public connection_driver {
 
     byte_stream& reads;
     byte_stream& writes;
 
     in_memory_engine(byte_stream& rd, byte_stream& wr, class container& cont) :
-        connection_engine(cont), reads(rd), writes(wr) {}
+        connection_driver(cont), reads(rd), writes(wr) {}
 
     void do_read() {
         mutable_buffer rbuf = read_buffer();
@@ -247,7 +247,7 @@ void test_engine_disconnected() {
 
 void test_no_container() {
     // An engine with no container should throw, not crash.
-    connection_engine e;
+    connection_driver e;
     try {
         e.connection().container();
         FAIL("expected error");

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/src/include/contexts.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/contexts.hpp b/proton-c/bindings/cpp/src/include/contexts.hpp
index 74a763c..1d4194e 100644
--- a/proton-c/bindings/cpp/src/include/contexts.hpp
+++ b/proton-c/bindings/cpp/src/include/contexts.hpp
@@ -24,7 +24,7 @@
 
 #include "proton/connection.hpp"
 #include "proton/container.hpp"
-#include "proton/io/connection_engine.hpp"
+#include "proton/io/connection_driver.hpp"
 #include "proton/event_loop.hpp"
 #include "proton/listen_handler.hpp"
 #include "proton/message.hpp"

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/src/io/connection_driver.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/io/connection_driver.cpp b/proton-c/bindings/cpp/src/io/connection_driver.cpp
new file mode 100644
index 0000000..06b01d8
--- /dev/null
+++ b/proton-c/bindings/cpp/src/io/connection_driver.cpp
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "proton/io/connection_driver.hpp"
+
+#include "proton/event_loop.hpp"
+#include "proton/error.hpp"
+#include "proton/messaging_handler.hpp"
+#include "proton/uuid.hpp"
+
+#include "contexts.hpp"
+#include "messaging_adapter.hpp"
+#include "msg.hpp"
+#include "proton_bits.hpp"
+#include "proton_event.hpp"
+
+#include <proton/connection.h>
+#include <proton/transport.h>
+#include <proton/event.h>
+
+#include <algorithm>
+
+
+namespace proton {
+namespace io {
+
+// Initialize the embedded C driver with a newly created pn_connection() and
+// pn_transport(). Called by both constructors in this file.
+//
+// Throws proton::error if pn_connection_driver_init() returns non-zero.
+void connection_driver::init() {
+    if (pn_connection_driver_init(&driver_, pn_connection(), pn_transport()) != 0) {
+        // The destructor is not run when a constructor throws, so release the
+        // driver here. Call the C destroy function directly (it is all the
+        // destructor does) instead of explicitly invoking the destructor on
+        // an object that is still under construction.
+        pn_connection_driver_destroy(&driver_);
+        throw proton::error(std::string("connection_driver allocation failed"));
+    }
+}
+
+// Construct a driver with no container and no handler; both pointers start
+// out null and the C driver is set up by init().
+connection_driver::connection_driver()
+    : handler_(0),
+      container_(0)
+{
+    init();
+}
+
+// Construct a driver associated with container `cont`. The container pointer
+// and the optional event loop `loop` are stored in the connection's context.
+//
+// `loop` is handed to the connection context with reset() once the driver is
+// initialized. If init() throws before that hand-over, nothing owns `loop`
+// yet, so it is deleted here rather than leaked (deleting a null pointer is
+// harmless when no loop was supplied).
+connection_driver::connection_driver(class container& cont, event_loop* loop) : handler_(0), container_(&cont) {
+    try {
+        init();
+    } catch (...) {
+        delete loop;            // Not yet owned by the connection context.
+        throw;
+    }
+    connection_context& ctx = connection_context::get(connection());
+    ctx.container = container_;
+    ctx.event_loop.reset(loop);
+}
+
+// Tear down the embedded C driver via pn_connection_driver_destroy().
+connection_driver::~connection_driver() {
+    pn_connection_driver_destroy(&driver_);
+}
+
+// FIXME aconway 2016-11-16: rename _engine > _driver
+//
+// Apply the options in `opts` to this driver's connection. When `server` is
+// true, pn_transport_set_server() is called on the transport before binding.
+//
+// Order matters: unbound options are applied before pn_connection_driver_bind()
+// and bound options after it. The handler carried by `opts` is remembered for
+// dispatch(), and the connection's collector is stored in its context.
+void connection_driver::configure(const connection_options& opts, bool server) {
+    proton::connection conn(connection());
+    opts.apply_unbound(conn);
+    if (server) {
+        pn_transport_set_server(driver_.transport);
+    }
+    pn_connection_driver_bind(&driver_);
+    opts.apply_bound(conn);
+    handler_ = opts.handler();
+    connection_context& ctx = connection_context::get(connection());
+    ctx.collector = pn_connection_collector(driver_.connection);
+}
+
+// Configure this driver as a client and open the connection.
+//
+// Options are accumulated in this order: the container's id and its
+// client_connection_options() (only when a container is attached), then
+// `opts`. The result is passed to configure() with server == false.
+void connection_driver::connect(const connection_options& opts) {
+    connection_options merged;
+    if (container_ != 0) {
+        merged.container_id(container_->id());
+        merged.update(container_->client_connection_options());
+    }
+    merged.update(opts);
+    const bool as_server = false;
+    configure(merged, as_server);
+    connection().open();
+}
+
+// Configure this driver as a server. Unlike connect(), this does not open
+// the connection.
+//
+// Options are accumulated in this order: the container's id and its
+// server_connection_options() (only when a container is attached), then
+// `opts`. The result is passed to configure() with server == true.
+void connection_driver::accept(const connection_options& opts) {
+    connection_options merged;
+    if (container_ != 0) {
+        merged.container_id(container_->id());
+        merged.update(container_->server_connection_options());
+    }
+    merged.update(opts);
+    const bool as_server = true;
+    configure(merged, as_server);
+}
+
+// Drain every event the C driver currently has and hand each one to the
+// handler (if one was configured) through a messaging_adapter.
+//
+// A std::exception escaping the handler is caught here and recorded on the
+// transport's condition as "exception" with the what() text, unless a
+// condition is already set; dispatching then continues with the next event.
+//
+// Returns true while pn_connection_driver_finished() reports false.
+bool connection_driver::dispatch() {
+    for (pn_event_t* ev = pn_connection_driver_next_event(&driver_);
+         ev != NULL;
+         ev = pn_connection_driver_next_event(&driver_))
+    {
+        proton_event wrapped(ev, container_);
+        try {
+            if (handler_) {
+                messaging_adapter adapter(*handler_);
+                wrapped.dispatch(adapter);
+            }
+        } catch (const std::exception& ex) {
+            pn_condition_t* tcond = pn_transport_condition(driver_.transport);
+            const bool already_set = pn_condition_is_set(tcond);
+            if (!already_set) {
+                // Keep the first recorded condition; do not overwrite it.
+                pn_condition_format(tcond, "exception", "%s", ex.what());
+            }
+        }
+    }
+    const bool finished = pn_connection_driver_finished(&driver_);
+    return !finished;
+}
+
+// Return the C driver's current read buffer (start pointer and size) wrapped
+// as a mutable_buffer.
+mutable_buffer connection_driver::read_buffer() {
+    const pn_rwbytes_t rb = pn_connection_driver_read_buffer(&driver_);
+    mutable_buffer result(rb.start, rb.size);
+    return result;
+}
+
+// Tell the C driver that `n` bytes of the read buffer have been filled.
+void connection_driver::read_done(size_t n) {
+    pn_connection_driver_read_done(&driver_, n);
+}
+
+// Forward to pn_connection_driver_read_close() on the embedded C driver.
+void connection_driver::read_close() {
+    pn_connection_driver_read_close(&driver_);
+}
+
+// Return the C driver's current write buffer (start pointer and size) wrapped
+// as a const_buffer.
+const_buffer connection_driver::write_buffer() {
+    const pn_bytes_t wb = pn_connection_driver_write_buffer(&driver_);
+    const_buffer result(wb.start, wb.size);
+    return result;
+}
+
+// Tell the C driver that `n` bytes of the write buffer have been written.
+void connection_driver::write_done(size_t n) {
+    pn_connection_driver_write_done(&driver_, n);
+}
+
+// Forward to pn_connection_driver_write_close() on the embedded C driver.
+void connection_driver::write_close() {
+    pn_connection_driver_write_close(&driver_);
+}
+
+// Record `err` as the transport's condition, unless a condition is already
+// set, and then close the C driver with pn_connection_driver_close().
+void connection_driver::disconnected(const proton::error_condition& err) {
+    pn_condition_t* tcond = pn_transport_condition(driver_.transport);
+    const bool already_set = pn_condition_is_set(tcond);
+    if (!already_set) {
+        // Keep an existing condition; only fill in an empty one.
+        set_error_condition(err, tcond);
+    }
+    pn_connection_driver_close(&driver_);
+}
+
+// Return a proton::connection wrapper around the C driver's connection.
+proton::connection connection_driver::connection() const {
+    return make_wrapper(driver_.connection);
+}
+
+// Return a proton::transport wrapper around the C driver's transport.
+proton::transport connection_driver::transport() const {
+    return make_wrapper(driver_.transport);
+}
+
+// Return the container given at construction, or null when the driver was
+// default-constructed without one.
+proton::container* connection_driver::container() const {
+    return container_;
+}
+
+}}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org