You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2016/11/17 18:18:55 UTC
[08/11] qpid-proton git commit: PROTON-1344: proactor batch events,
rename connection_driver
PROTON-1344: proactor batch events, rename connection_driver
Renamed pn_connection_engine to pn_connection_driver.
pn_proactor_wait() returns pn_event_batch_t* rather than individual pn_event_t*
to reduce thread-context switching.
Added pn_collector_next() for simpler event looping.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/25706a47
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/25706a47
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/25706a47
Branch: refs/heads/master
Commit: 25706a47ea8f29c0a53f5fae44195a916173cfbd
Parents: 94bc296
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Nov 16 22:31:00 2016 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Nov 17 11:22:14 2016 -0500
----------------------------------------------------------------------
examples/c/proactor/broker.c | 21 +-
examples/c/proactor/libuv_proactor.c | 323 +++++++++++--------
examples/c/proactor/receive.c | 100 +++---
examples/c/proactor/send.c | 164 +++++-----
examples/cpp/README.dox | 2 +-
examples/cpp/mt/epoll_container.cpp | 76 ++---
proton-c/CMakeLists.txt | 4 +-
proton-c/bindings/cpp/CMakeLists.txt | 2 +-
proton-c/bindings/cpp/docs/io.md | 6 +-
proton-c/bindings/cpp/docs/main.md | 2 +-
.../cpp/include/proton/connection_options.hpp | 4 +-
.../cpp/include/proton/io/connection_driver.hpp | 211 ++++++++++++
.../cpp/include/proton/io/connection_engine.hpp | 215 ------------
.../cpp/include/proton/messaging_handler.hpp | 2 +-
.../bindings/cpp/include/proton/transport.hpp | 2 +-
proton-c/bindings/cpp/src/engine_test.cpp | 12 +-
proton-c/bindings/cpp/src/include/contexts.hpp | 2 +-
.../bindings/cpp/src/io/connection_driver.cpp | 161 +++++++++
.../bindings/cpp/src/io/connection_engine.cpp | 160 ---------
proton-c/bindings/cpp/src/receiver.cpp | 2 +-
proton-c/bindings/cpp/src/thread_safe_test.cpp | 10 +-
proton-c/docs/api/index.md | 42 ++-
proton-c/include/proton/connection.h | 2 +-
proton-c/include/proton/connection_driver.h | 243 ++++++++++++++
proton-c/include/proton/connection_engine.h | 313 ------------------
proton-c/include/proton/cproton.i | 3 +
proton-c/include/proton/event.h | 56 ++++
proton-c/include/proton/proactor.h | 62 ++--
proton-c/src/core/connection_driver.c | 173 +++++-----
proton-c/src/core/connection_engine.c | 163 ----------
proton-c/src/core/event.c | 28 +-
proton-c/src/tests/refcount.c | 3 +-
qpid-proton-cpp.syms | 48 +--
33 files changed, 1259 insertions(+), 1358 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/examples/c/proactor/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
index e11a8bd..66381fc 100644
--- a/examples/c/proactor/broker.c
+++ b/examples/c/proactor/broker.c
@@ -17,7 +17,7 @@
* under the License.
*/
-#include <proton/connection_engine.h>
+#include <proton/connection_driver.h>
#include <proton/proactor.h>
#include <proton/engine.h>
#include <proton/sasl.h>
@@ -220,9 +220,11 @@ typedef struct broker_t {
const char *container_id; /* AMQP container-id */
size_t threads;
pn_millis_t heartbeat;
+ bool finished;
} broker_t;
void broker_init(broker_t *b, const char *container_id, size_t threads, pn_millis_t heartbeat) {
+ memset(b, 0, sizeof(*b));
b->proactor = pn_proactor();
b->listener = NULL;
queues_init(&b->queues);
@@ -293,8 +295,7 @@ static void check_condition(pn_event_t *e, pn_condition_t *cond) {
const int WINDOW=10; /* Incoming credit window */
-static bool handle(broker_t* b, pn_event_t* e) {
- bool more = true;
+static void handle(broker_t* b, pn_event_t* e) {
pn_connection_t *c = pn_event_connection(e);
switch (pn_event_type(e)) {
@@ -398,20 +399,24 @@ static bool handle(broker_t* b, pn_event_t* e) {
break;
case PN_PROACTOR_INTERRUPT:
- more = false;
+ b->finished = true;
break;
default:
break;
}
- pn_event_done(e);
- return more;
}
static void broker_thread(void *void_broker) {
broker_t *b = (broker_t*)void_broker;
- while (handle(b, pn_proactor_wait(b->proactor)))
- ;
+ do {
+ pn_event_batch_t *events = pn_proactor_wait(b->proactor);
+ pn_event_t *e;
+ while ((e = pn_event_batch_next(events))) {
+ handle(b, e);
+ }
+ pn_proactor_done(b->proactor, events);
+ } while(!b->finished);
}
static void usage(const char *arg0) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/examples/c/proactor/libuv_proactor.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/libuv_proactor.c b/examples/c/proactor/libuv_proactor.c
index 8dd2706..35afd5c 100644
--- a/examples/c/proactor/libuv_proactor.c
+++ b/examples/c/proactor/libuv_proactor.c
@@ -22,7 +22,7 @@
#include <uv.h>
#include <proton/condition.h>
-#include <proton/connection_engine.h>
+#include <proton/connection_driver.h>
#include <proton/engine.h>
#include <proton/extra.h>
#include <proton/message.h>
@@ -44,11 +44,15 @@
To provide concurrency the proactor uses a "leader-worker-follower" model,
threads take turns at the roles:
- - a single "leader" calls libuv functions and runs the uv_loop incrementally.
- When there is work it hands over leadership and becomes a "worker"
+ - a single "leader" calls libuv functions and runs the uv_loop in short bursts
+ to generate work. When there is work available it gives up leadership and
+ becomes a "worker"
+
- "workers" handle events concurrently for distinct connections/listeners
- When the work is done they become "followers"
- - "followers" wait for the leader to step aside, one takes over as new leader.
+ They do as much work as they can get; when none is left they become "followers"
+
+ - "followers" wait for the leader to generate work and become workers.
+ When the leader itself becomes a worker, one of the followers takes over.
This model is symmetric: any thread can take on any role based on run-time
requirements. It also allows the IO and non-IO work associated with an IO
@@ -77,7 +81,7 @@ PN_HANDLE(PN_PROACTOR)
PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor)
PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
-/* common to connection engine and listeners */
+/* common to connection and listener */
typedef struct psocket_t {
/* Immutable */
pn_proactor_t *proactor;
@@ -118,11 +122,11 @@ static inline const char* fixstr(const char* str) {
return str[0] == '\001' ? NULL : str;
}
-typedef struct pconn {
+typedef struct pconnection_t {
psocket_t psocket;
/* Only used by owner thread */
- pn_connection_engine_t ceng;
+ pn_connection_driver_t driver;
/* Only used by leader */
uv_connect_t connect;
@@ -132,7 +136,7 @@ typedef struct pconn {
size_t writing;
bool reading:1;
bool server:1; /* accept, not connect */
-} pconn;
+} pconnection_t;
struct pn_listener_t {
psocket_t psocket;
@@ -140,6 +144,7 @@ struct pn_listener_t {
/* Only used by owner thread */
pn_condition_t *condition;
pn_collector_t *collector;
+ pn_event_batch_t batch;
size_t backlog;
};
@@ -153,6 +158,10 @@ struct pn_proactor_t {
uv_loop_t loop;
uv_async_t async;
+ /* Owner thread: proactor collector and batch can belong to leader or a worker */
+ pn_collector_t *collector;
+ pn_event_batch_t batch;
+
/* Protected by lock */
uv_mutex_t lock;
queue start_q;
@@ -162,11 +171,7 @@ struct pn_proactor_t {
size_t count; /* psocket count */
bool inactive:1;
bool has_leader:1;
-
- /* Immutable collectors to hold fixed events */
- pn_collector_t *interrupt_event;
- pn_collector_t *timeout_event;
- pn_collector_t *inactive_event;
+ bool batch_working:1; /* batch belongs to a worker. */
};
static bool push_lh(queue *q, psocket_t *ps) {
@@ -191,8 +196,8 @@ static psocket_t* pop_lh(queue *q) {
return ps;
}
-static inline pconn *as_pconn(psocket_t* ps) {
- return ps->is_conn ? (pconn*)ps : NULL;
+static inline pconnection_t *as_pconnection_t(psocket_t* ps) {
+ return ps->is_conn ? (pconnection_t*)ps : NULL;
}
static inline pn_listener_t *as_listener(psocket_t* ps) {
@@ -213,9 +218,9 @@ static void to_leader(psocket_t *ps) {
/* Detach from IO and put ps on the worker queue */
static void leader_to_worker(psocket_t *ps) {
- pconn *pc = as_pconn(ps);
+ pconnection_t *pc = as_pconnection_t(ps);
/* Don't detach if there are no events yet. */
- if (pc && pn_connection_engine_has_event(&pc->ceng)) {
+ if (pc && pn_connection_driver_has_event(&pc->driver)) {
if (pc->writing) {
pc->writing = 0;
uv_cancel((uv_req_t*)&pc->write);
@@ -236,6 +241,28 @@ static void leader_to_worker(psocket_t *ps) {
uv_mutex_unlock(&ps->proactor->lock);
}
+/* Set a deferred action for leader, if not already set. */
+static void owner_to_leader(psocket_t *ps, void (*action)(psocket_t*)) {
+ uv_mutex_lock(&ps->proactor->lock);
+ if (!ps->action) {
+ ps->action = action;
+ }
+ to_leader_lh(ps);
+ uv_mutex_unlock(&ps->proactor->lock);
+}
+
+/* Owner thread hands ps to the worker queue. Set deferred action if not already set. */
+static void owner_to_worker(psocket_t *ps, void (*action)(psocket_t*)) {
+ uv_mutex_lock(&ps->proactor->lock);
+ if (!ps->action) {
+ ps->action = action;
+ }
+ push_lh(&ps->proactor->worker_q, ps);
+ uv_async_send(&ps->proactor->async); /* Wake leader */
+ uv_mutex_unlock(&ps->proactor->lock);
+}
+
+
/* Re-queue for further work */
static void worker_requeue(psocket_t* ps) {
uv_mutex_lock(&ps->proactor->lock);
@@ -244,25 +271,43 @@ static void worker_requeue(psocket_t* ps) {
uv_mutex_unlock(&ps->proactor->lock);
}
-static pconn *new_pconn(pn_proactor_t *p, bool server, const char *host, const char *port, pn_bytes_t extra) {
- pconn *pc = (pconn*)calloc(1, sizeof(*pc));
+static pconnection_t *new_pconnection_t(pn_proactor_t *p, bool server, const char *host, const char *port, pn_bytes_t extra) {
+ pconnection_t *pc = (pconnection_t*)calloc(1, sizeof(*pc));
if (!pc) return NULL;
- if (pn_connection_engine_init(&pc->ceng, pn_connection_with_extra(extra.size), NULL) != 0) {
+ if (pn_connection_driver_init(&pc->driver, pn_connection_with_extra(extra.size), NULL) != 0) {
return NULL;
}
if (extra.start && extra.size) {
- memcpy(pn_connection_get_extra(pc->ceng.connection).start, extra.start, extra.size);
+ memcpy(pn_connection_get_extra(pc->driver.connection).start, extra.start, extra.size);
}
psocket_init(&pc->psocket, p, true, host, port);
if (server) {
- pn_transport_set_server(pc->ceng.transport);
+ pn_transport_set_server(pc->driver.transport);
}
- pn_record_t *r = pn_connection_attachments(pc->ceng.connection);
+ pn_record_t *r = pn_connection_attachments(pc->driver.connection);
pn_record_def(r, PN_PROACTOR, PN_VOID);
pn_record_set(r, PN_PROACTOR, pc);
return pc;
}
+static pn_event_t *listener_batch_next(pn_event_batch_t *batch);
+static pn_event_t *proactor_batch_next(pn_event_batch_t *batch);
+
+static inline pn_proactor_t *batch_proactor(pn_event_batch_t *batch) {
+ return (batch->next_event == proactor_batch_next) ?
+ (pn_proactor_t*)((char*)batch - offsetof(pn_proactor_t, batch)) : NULL;
+}
+
+static inline pn_listener_t *batch_listener(pn_event_batch_t *batch) {
+ return (batch->next_event == listener_batch_next) ?
+ (pn_listener_t*)((char*)batch - offsetof(pn_listener_t, batch)) : NULL;
+}
+
+static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
+ pn_connection_driver_t *d = pn_event_batch_connection_driver(batch);
+ return d ? (pconnection_t*)((char*)d - offsetof(pconnection_t, driver)) : NULL;
+}
+
pn_listener_t *new_listener(pn_proactor_t *p, const char *host, const char *port, int backlog, pn_bytes_t extra) {
pn_listener_t *l = (pn_listener_t*)calloc(1, PN_EXTRA_SIZEOF(pn_listener_t, extra.size));
if (!l) {
@@ -278,6 +323,7 @@ pn_listener_t *new_listener(pn_proactor_t *p, const char *host, const char *port
}
psocket_init(&l->psocket, p, false, host, port);
l->condition = pn_condition();
+ l->batch.next_event = listener_batch_next;
l->backlog = backlog;
return l;
}
@@ -290,11 +336,12 @@ static void leader_count(pn_proactor_t *p, int change) {
}
/* Free if there are no uv callbacks pending and no events */
-static void leader_pconn_maybe_free(pconn *pc) {
- if (pn_connection_engine_has_event(&pc->ceng)) {
+static void leader_pconnection_t_maybe_free(pconnection_t *pc) {
+ if (pn_connection_driver_has_event(&pc->driver)) {
leader_to_worker(&pc->psocket); /* Return to worker */
- } else if (!(pc->psocket.tcp.data || pc->shutdown.data || pc->timer.data)) {
- pn_connection_engine_destroy(&pc->ceng);
+ } else if (!(pc->psocket.tcp.data || pc->write.data || pc->shutdown.data || pc->timer.data)) {
+ /* All UV requests are finished */
+ pn_connection_driver_destroy(&pc->driver);
leader_count(pc->psocket.proactor, -1);
free(pc);
}
@@ -314,7 +361,7 @@ static void leader_listener_maybe_free(pn_listener_t *l) {
/* Free if there are no uv callbacks pending and no events */
static void leader_maybe_free(psocket_t *ps) {
if (ps->is_conn) {
- leader_pconn_maybe_free(as_pconn(ps));
+ leader_pconnection_t_maybe_free(as_pconnection_t(ps));
} else {
leader_listener_maybe_free(as_listener(ps));
}
@@ -336,9 +383,9 @@ static inline void leader_close(psocket_t *ps) {
if (ps->tcp.data && !uv_is_closing((uv_handle_t*)&ps->tcp)) {
uv_close((uv_handle_t*)&ps->tcp, on_close);
}
- pconn *pc = as_pconn(ps);
+ pconnection_t *pc = as_pconnection_t(ps);
if (pc) {
- pn_connection_engine_close(&pc->ceng);
+ pn_connection_driver_close(&pc->driver);
if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
uv_timer_stop(&pc->timer);
uv_close((uv_handle_t*)&pc->timer, on_close);
@@ -347,20 +394,20 @@ static inline void leader_close(psocket_t *ps) {
leader_maybe_free(ps);
}
-static pconn *get_pconn(pn_connection_t* c) {
+static pconnection_t *get_pconnection_t(pn_connection_t* c) {
if (!c) return NULL;
pn_record_t *r = pn_connection_attachments(c);
- return (pconn*) pn_record_get(r, PN_PROACTOR);
+ return (pconnection_t*) pn_record_get(r, PN_PROACTOR);
}
static void leader_error(psocket_t *ps, int err, const char* what) {
if (ps->is_conn) {
- pn_connection_engine_t *ceng = &as_pconn(ps)->ceng;
- pn_connection_engine_errorf(ceng, COND_NAME, "%s %s:%s: %s",
+ pn_connection_driver_t *driver = &as_pconnection_t(ps)->driver;
+ pn_connection_driver_bind(driver); /* Bind so errors will be reported */
+ pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s",
what, fixstr(ps->host), fixstr(ps->port),
uv_strerror(err));
- pn_connection_engine_bind(ceng);
- pn_connection_engine_close(ceng);
+ pn_connection_driver_close(driver);
} else {
pn_listener_t *l = as_listener(ps);
pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s",
@@ -376,9 +423,9 @@ static int leader_init(psocket_t *ps) {
leader_count(ps->proactor, +1);
int err = uv_tcp_init(&ps->proactor->loop, &ps->tcp);
if (!err) {
- pconn *pc = as_pconn(ps);
+ pconnection_t *pc = as_pconnection_t(ps);
if (pc) {
- pc->connect.data = pc->write.data = pc->shutdown.data = ps;
+ pc->connect.data = ps;
int err = uv_timer_init(&ps->proactor->loop, &pc->timer);
if (!err) {
pc->timer.data = pc;
@@ -392,7 +439,7 @@ static int leader_init(psocket_t *ps) {
}
/* Common logic for on_connect and on_accept */
-static void leader_connect_accept(pconn *pc, int err, const char *what) {
+static void leader_connect_accept(pconnection_t *pc, int err, const char *what) {
if (!err) {
leader_to_worker(&pc->psocket);
} else {
@@ -401,14 +448,14 @@ static void leader_connect_accept(pconn *pc, int err, const char *what) {
}
static void on_connect(uv_connect_t *connect, int err) {
- leader_connect_accept((pconn*)connect->data, err, "on connect");
+ leader_connect_accept((pconnection_t*)connect->data, err, "on connect");
}
static void on_accept(uv_stream_t* server, int err) {
pn_listener_t* l = (pn_listener_t*)server->data;
if (!err) {
pn_rwbytes_t v = pn_listener_get_extra(l);
- pconn *pc = new_pconn(l->psocket.proactor, true,
+ pconnection_t *pc = new_pconnection_t(l->psocket.proactor, true,
fixstr(l->psocket.host),
fixstr(l->psocket.port),
pn_bytes(v.size, v.start));
@@ -436,7 +483,7 @@ static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) {
}
static void leader_connect(psocket_t *ps) {
- pconn *pc = as_pconn(ps);
+ pconnection_t *pc = as_pconnection_t(ps);
uv_getaddrinfo_t info;
int err = leader_resolve(ps, &info, false);
if (!err) {
@@ -450,7 +497,7 @@ static void leader_connect(psocket_t *ps) {
static void leader_listen(psocket_t *ps) {
pn_listener_t *l = as_listener(ps);
- uv_getaddrinfo_t info;
+ uv_getaddrinfo_t info;
int err = leader_resolve(ps, &info, true);
if (!err) {
err = uv_tcp_bind(&l->psocket.tcp, info.addrinfo->ai_addr, 0);
@@ -463,8 +510,8 @@ static void leader_listen(psocket_t *ps) {
}
static void on_tick(uv_timer_t *timer) {
- pconn *pc = (pconn*)timer->data;
- pn_transport_t *t = pc->ceng.transport;
+ pconnection_t *pc = (pconnection_t*)timer->data;
+ pn_transport_t *t = pc->driver.transport;
if (pn_transport_get_idle_timeout(t) || pn_transport_get_remote_idle_timeout(t)) {
uv_timer_stop(&pc->timer);
uint64_t now = uv_now(pc->timer.loop);
@@ -476,24 +523,25 @@ static void on_tick(uv_timer_t *timer) {
}
static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
- pconn *pc = (pconn*)stream->data;
+ pconnection_t *pc = (pconnection_t*)stream->data;
if (nread >= 0) {
- pn_connection_engine_read_done(&pc->ceng, nread);
+ pn_connection_driver_read_done(&pc->driver, nread);
on_tick(&pc->timer); /* check for tick changes. */
leader_to_worker(&pc->psocket);
/* Reading continues automatically until stopped. */
} else if (nread == UV_EOF) { /* hangup */
- pn_connection_engine_read_close(&pc->ceng);
+ pn_connection_driver_read_close(&pc->driver);
leader_maybe_free(&pc->psocket);
} else {
leader_error(&pc->psocket, nread, "on read from");
}
}
-static void on_write(uv_write_t* request, int err) {
- pconn *pc = (pconn*)request->data;
+static void on_write(uv_write_t* write, int err) {
+ pconnection_t *pc = (pconnection_t*)write->data;
+ write->data = NULL;
if (err == 0) {
- pn_connection_engine_write_done(&pc->ceng, pc->writing);
+ pn_connection_driver_write_done(&pc->driver, pc->writing);
leader_to_worker(&pc->psocket);
} else if (err == UV_ECANCELED) {
leader_maybe_free(&pc->psocket);
@@ -505,29 +553,31 @@ static void on_write(uv_write_t* request, int err) {
// Read buffer allocation function for uv, just returns the transports read buffer.
static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) {
- pconn *pc = (pconn*)stream->data;
- pn_rwbytes_t rbuf = pn_connection_engine_read_buffer(&pc->ceng);
+ pconnection_t *pc = (pconnection_t*)stream->data;
+ pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
*buf = uv_buf_init(rbuf.start, rbuf.size);
}
static void leader_rewatch(psocket_t *ps) {
- pconn *pc = as_pconn(ps);
+ pconnection_t *pc = as_pconnection_t(ps);
if (pc->timer.data) { /* uv-initialized */
on_tick(&pc->timer); /* Re-enable ticks if required */
}
- pn_rwbytes_t rbuf = pn_connection_engine_read_buffer(&pc->ceng);
- pn_bytes_t wbuf = pn_connection_engine_write_buffer(&pc->ceng);
+ pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
+ pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
/* Ticks and checking buffers can generate events, process before proceeding */
- if (pn_connection_engine_has_event(&pc->ceng)) {
+ if (pn_connection_driver_has_event(&pc->driver)) {
leader_to_worker(ps);
} else { /* Re-watch for IO */
if (wbuf.size > 0 && !pc->writing) {
pc->writing = wbuf.size;
uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
+ pc->write.data = ps;
uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write);
- } else if (wbuf.size == 0 && pn_connection_engine_write_closed(&pc->ceng)) {
+ } else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) {
+ pc->shutdown.data = ps;
uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, on_shutdown);
}
if (rbuf.size > 0 && !pc->reading) {
@@ -537,23 +587,31 @@ static void leader_rewatch(psocket_t *ps) {
}
}
-/* Return the next worker event or { 0 } if no events are ready */
-static pn_event_t* get_event_lh(pn_proactor_t *p) {
- if (p->inactive) {
- p->inactive = false;
- return pn_collector_peek(p->inactive_event);
- }
- if (p->interrupt > 0) {
- --p->interrupt;
- return pn_collector_peek(p->interrupt_event);
+static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t) {
+ pn_collector_put(p->collector, pn_proactor__class(), p, t);
+ p->batch_working = true;
+ return &p->batch;
+}
+
+/* Return the next event batch or 0 if no events are ready */
+static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) {
+ if (!p->batch_working) { /* Can generate proactor events */
+ if (p->inactive) {
+ p->inactive = false;
+ return proactor_batch_lh(p, PN_PROACTOR_INACTIVE);
+ }
+ if (p->interrupt > 0) {
+ --p->interrupt;
+ return proactor_batch_lh(p, PN_PROACTOR_INTERRUPT);
+ }
}
for (psocket_t *ps = pop_lh(&p->worker_q); ps; ps = pop_lh(&p->worker_q)) {
if (ps->is_conn) {
- pconn *pc = as_pconn(ps);
- return pn_connection_engine_event(&pc->ceng);
+ pconnection_t *pc = as_pconnection_t(ps);
+ return &pc->driver.batch;
} else { /* Listener */
pn_listener_t *l = as_listener(ps);
- return pn_collector_peek(l->collector);
+ return &l->batch;
}
to_leader(ps); /* No event, back to leader */
}
@@ -568,15 +626,6 @@ static void wakeup(psocket_t *ps, void (*action)(psocket_t*)) {
uv_mutex_unlock(&ps->proactor->lock);
}
-/* Defer an action to the leader thread. Only from non-leader threads. */
-static void owner_defer(psocket_t *ps, void (*action)(psocket_t*)) {
- uv_mutex_lock(&ps->proactor->lock);
- assert(!ps->action);
- ps->action = action;
- to_leader_lh(ps);
- uv_mutex_unlock(&ps->proactor->lock);
-}
-
pn_listener_t *pn_event_listener(pn_event_t *e) {
return (pn_event_class(e) == pn_listener__class()) ? (pn_listener_t*)pn_event_context(e) : NULL;
}
@@ -590,57 +639,47 @@ pn_proactor_t *pn_event_proactor(pn_event_t *e) {
return NULL;
}
-void pn_event_done(pn_event_t *e) {
- pn_event_type_t etype = pn_event_type(e);
- pconn *pc = get_pconn(pn_event_connection(e));
- if (pc && e == pn_collector_peek(pc->ceng.collector)) {
- pn_connection_engine_pop_event(&pc->ceng);
- if (etype == PN_CONNECTION_INIT) {
- /* Bind after user has handled CONNECTION_INIT */
- pn_connection_engine_bind(&pc->ceng);
- }
- if (pn_connection_engine_has_event(&pc->ceng)) {
- /* Process all events before going back to IO.
- Put it back on the worker queue and wake the leader.
- */
+void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
+ pconnection_t *pc = batch_pconnection(batch);
+ if (pc) {
+ if (pn_connection_driver_has_event(&pc->driver)) {
+ /* Process all events before going back to IO. */
worker_requeue(&pc->psocket);
- } else if (pn_connection_engine_finished(&pc->ceng)) {
- owner_defer(&pc->psocket, leader_close);
+ } else if (pn_connection_driver_finished(&pc->driver)) {
+ owner_to_leader(&pc->psocket, leader_close);
} else {
- owner_defer(&pc->psocket, leader_rewatch);
- }
- } else {
- pn_listener_t *l = pn_event_listener(e);
- if (l && e == pn_collector_peek(l->collector)) {
- pn_collector_pop(l->collector);
- if (etype == PN_LISTENER_CLOSE) {
- owner_defer(&l->psocket, leader_close);
- }
+ owner_to_leader(&pc->psocket, leader_rewatch);
}
+ return;
+ }
+ pn_proactor_t *bp = batch_proactor(batch);
+ if (bp == p) {
+ uv_mutex_lock(&p->lock);
+ p->batch_working = false;
+ uv_async_send(&p->async); /* Wake leader */
+ uv_mutex_unlock(&p->lock);
+ return;
}
+ /* Nothing extra to do for listener, it is always in the UV loop. */
}
/* Run follower/leader loop till we can return an event and be a worker */
-pn_event_t *pn_proactor_wait(struct pn_proactor_t* p) {
+pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
uv_mutex_lock(&p->lock);
/* Try to grab work immediately. */
- pn_event_t *e = get_event_lh(p);
- if (e == NULL) {
+ pn_event_batch_t *batch = get_batch_lh(p);
+ if (batch == NULL) {
/* No work available, follow the leader */
- while (p->has_leader)
+ while (p->has_leader) {
uv_cond_wait(&p->cond, &p->lock);
+ }
/* Lead till there is work to do. */
p->has_leader = true;
- for (e = get_event_lh(p); e == NULL; e = get_event_lh(p)) {
- /* Run uv_loop outside the lock */
- uv_mutex_unlock(&p->lock);
- uv_run(&p->loop, UV_RUN_ONCE);
- uv_mutex_lock(&p->lock);
- /* Process leader work queue outside the lock */
+ while (batch == NULL) {
for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
void (*action)(psocket_t*) = ps->action;
- ps->action = NULL;
void (*wakeup)(psocket_t*) = ps->wakeup;
+ ps->action = NULL;
ps->wakeup = NULL;
if (action || wakeup) {
uv_mutex_unlock(&p->lock);
@@ -649,13 +688,19 @@ pn_event_t *pn_proactor_wait(struct pn_proactor_t* p) {
uv_mutex_lock(&p->lock);
}
}
+ batch = get_batch_lh(p);
+ if (batch == NULL) {
+ uv_mutex_unlock(&p->lock);
+ uv_run(&p->loop, UV_RUN_ONCE);
+ uv_mutex_lock(&p->lock);
+ }
}
/* Signal the next leader and return to work */
p->has_leader = false;
uv_cond_signal(&p->cond);
}
uv_mutex_unlock(&p->lock);
- return e;
+ return batch;
}
void pn_proactor_interrupt(pn_proactor_t *p) {
@@ -666,11 +711,12 @@ void pn_proactor_interrupt(pn_proactor_t *p) {
}
int pn_proactor_connect(pn_proactor_t *p, const char *host, const char *port, pn_bytes_t extra) {
- pconn *pc = new_pconn(p, false, host, port, extra);
+ pconnection_t *pc = new_pconnection_t(p, false, host, port, extra);
if (!pc) {
return PN_OUT_OF_MEMORY;
}
- owner_defer(&pc->psocket, leader_connect); /* Process PN_CONNECTION_INIT before binding */
+ /* Process PN_CONNECTION_INIT before binding */
+ owner_to_worker(&pc->psocket, leader_connect);
return 0;
}
@@ -678,12 +724,12 @@ pn_rwbytes_t pn_listener_get_extra(pn_listener_t *l) { return PN_EXTRA_GET(pn_li
pn_listener_t *pn_proactor_listen(pn_proactor_t *p, const char *host, const char *port, int backlog, pn_bytes_t extra) {
pn_listener_t *l = new_listener(p, host, port, backlog, extra);
- if (l) owner_defer(&l->psocket, leader_listen);
+ if (l) owner_to_leader(&l->psocket, leader_listen);
return l;
}
pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
- pconn *pc = get_pconn(c);
+ pconnection_t *pc = get_pconnection_t(c);
return pc ? pc->psocket.proactor : NULL;
}
@@ -692,13 +738,14 @@ pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
}
void leader_wake_connection(psocket_t *ps) {
- pconn *pc = as_pconn(ps);
- pn_collector_put(pc->ceng.collector, PN_OBJECT, pc->ceng.connection, PN_CONNECTION_WAKE);
+ pconnection_t *pc = as_pconnection_t(ps);
+ pn_connection_t *c = pc->driver.connection;
+ pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
leader_to_worker(ps);
}
void pn_connection_wake(pn_connection_t* c) {
- wakeup(&get_pconn(c)->psocket, leader_wake_connection);
+ wakeup(&get_pconnection_t(c)->psocket, leader_wake_connection);
}
void pn_listener_close(pn_listener_t* l) {
@@ -710,22 +757,15 @@ pn_condition_t* pn_listener_condition(pn_listener_t* l) {
return l->condition;
}
-/* Collector to hold for a single fixed event that is never popped. */
-static pn_collector_t *event_holder(pn_proactor_t *p, pn_event_type_t t) {
- pn_collector_t *c = pn_collector();
- pn_collector_put(c, pn_proactor__class(), p, t);
- return c;
-}
-
pn_proactor_t *pn_proactor() {
pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p));
+ p->collector = pn_collector();
+ p->batch.next_event = &proactor_batch_next;
+ if (!p->collector) return NULL;
uv_loop_init(&p->loop);
uv_mutex_init(&p->lock);
uv_cond_init(&p->cond);
uv_async_init(&p->loop, &p->async, NULL); /* Just wake the loop */
- p->interrupt_event = event_holder(p, PN_PROACTOR_INTERRUPT);
- p->inactive_event = event_holder(p, PN_PROACTOR_INACTIVE);
- p->timeout_event = event_holder(p, PN_PROACTOR_TIMEOUT);
return p;
}
@@ -741,8 +781,19 @@ void pn_proactor_free(pn_proactor_t *p) {
uv_loop_close(&p->loop);
uv_mutex_destroy(&p->lock);
uv_cond_destroy(&p->cond);
- pn_collector_free(p->interrupt_event);
- pn_collector_free(p->inactive_event);
- pn_collector_free(p->timeout_event);
+ pn_collector_free(p->collector);
free(p);
}
+
+static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
+ pn_listener_t *l = batch_listener(batch);
+ pn_event_t *handled = pn_collector_prev(l->collector);
+ if (handled && pn_event_type(handled) == PN_LISTENER_CLOSE) {
+ owner_to_leader(&l->psocket, leader_close); /* Close event handled, do close */
+ }
+ return pn_collector_next(l->collector);
+}
+
+static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
+ return pn_collector_next(batch_proactor(batch)->collector);
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/examples/c/proactor/receive.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/receive.c b/examples/c/proactor/receive.c
index acdae0c..88e3456 100644
--- a/examples/c/proactor/receive.c
+++ b/examples/c/proactor/receive.c
@@ -20,7 +20,7 @@
*/
#include <proton/connection.h>
-#include <proton/connection_engine.h>
+#include <proton/connection_driver.h>
#include <proton/delivery.h>
#include <proton/proactor.h>
#include <proton/link.h>
@@ -37,12 +37,13 @@
typedef char str[1024];
typedef struct app_data_t {
- str address;
- str container_id;
- pn_rwbytes_t message_buffer;
- int message_count;
- int received;
- pn_proactor_t *proactor;
+ str address;
+ str container_id;
+ pn_rwbytes_t message_buffer;
+ int message_count;
+ int received;
+ pn_proactor_t *proactor;
+ bool finished;
} app_data_t;
static const int BATCH = 100; /* Batch size for unlimited receive */
@@ -80,9 +81,7 @@ static void decode_message(pn_delivery_t *dlv) {
}
}
-/* Handle event, return true of we should handle more */
-static bool handle(app_data_t* app, pn_event_t* event) {
- bool more = true;
+static void handle(app_data_t* app, pn_event_t* event) {
switch (pn_event_type(event)) {
case PN_CONNECTION_INIT: {
@@ -149,53 +148,58 @@ static bool handle(app_data_t* app, pn_event_t* event) {
break;
case PN_PROACTOR_INACTIVE:
- more = false;
+ app->finished = true;
break;
default: break;
}
- pn_event_done(event);
- return more;
}
static void usage(const char *arg0) {
- fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", arg0);
- exit(1);
+ fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", arg0);
+ exit(1);
}
int main(int argc, char **argv) {
- /* Default values for application and connection. */
- app_data_t app = {{0}};
- app.message_count = 100;
- const char* urlstr = NULL;
-
- int opt;
- while((opt = getopt(argc, argv, "a:m:")) != -1) {
- switch(opt) {
- case 'a': urlstr = optarg; break;
- case 'm': app.message_count = atoi(optarg); break;
- default: usage(argv[0]); break;
- }
+ /* Default values for application and connection. */
+ app_data_t app = {{0}};
+ app.message_count = 100;
+ const char* urlstr = NULL;
+
+ int opt;
+ while((opt = getopt(argc, argv, "a:m:")) != -1) {
+ switch(opt) {
+ case 'a': urlstr = optarg; break;
+ case 'm': app.message_count = atoi(optarg); break;
+ default: usage(argv[0]); break;
}
- if (optind < argc)
- usage(argv[0]);
-
- snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid());
-
- /* Parse the URL or use default values */
- pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
- const char *host = url ? pn_url_get_host(url) : NULL;
- const char *port = url ? pn_url_get_port(url) : "amqp";
- strncpy(app.address, (url && pn_url_get_path(url)) ? pn_url_get_path(url) : "example", sizeof(app.address));
-
- /* Create the proactor and connect */
- app.proactor = pn_proactor();
- pn_proactor_connect(app.proactor, host, port, pn_rwbytes_null);
- if (url) pn_url_free(url);
-
- while (handle(&app, pn_proactor_wait(app.proactor)))
- ;
- pn_proactor_free(app.proactor);
- free(app.message_buffer.start);
- return exit_code;
+ }
+ if (optind < argc)
+ usage(argv[0]);
+
+ snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid());
+
+ /* Parse the URL or use default values */
+ pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
+ const char *host = url ? pn_url_get_host(url) : NULL;
+ const char *port = url ? pn_url_get_port(url) : "amqp";
+ strncpy(app.address, (url && pn_url_get_path(url)) ? pn_url_get_path(url) : "example", sizeof(app.address));
+
+ /* Create the proactor and connect */
+ app.proactor = pn_proactor();
+ pn_proactor_connect(app.proactor, host, port, pn_rwbytes_null);
+ if (url) pn_url_free(url);
+
+ do {
+ pn_event_batch_t *events = pn_proactor_wait(app.proactor);
+ pn_event_t *e;
+ while ((e = pn_event_batch_next(events))) {
+ handle(&app, e);
+ }
+ pn_proactor_done(app.proactor, events);
+ } while(!app.finished);
+
+ pn_proactor_free(app.proactor);
+ free(app.message_buffer.start);
+ return exit_code;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/examples/c/proactor/send.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/send.c b/examples/c/proactor/send.c
index 5d58895..d64ea2d 100644
--- a/examples/c/proactor/send.c
+++ b/examples/c/proactor/send.c
@@ -20,7 +20,7 @@
*/
#include <proton/connection.h>
-#include <proton/connection_engine.h>
+#include <proton/connection_driver.h>
#include <proton/delivery.h>
#include <proton/proactor.h>
#include <proton/link.h>
@@ -37,13 +37,14 @@
typedef char str[1024];
typedef struct app_data_t {
- str address;
- str container_id;
- pn_rwbytes_t message_buffer;
- int message_count;
- int sent;
- int acknowledged;
- pn_proactor_t *proactor;
+ str address;
+ str container_id;
+ pn_rwbytes_t message_buffer;
+ int message_count;
+ int sent;
+ int acknowledged;
+ pn_proactor_t *proactor;
+ bool finished;
} app_data_t;
int exit_code = 0;
@@ -58,41 +59,39 @@ static void check_condition(pn_event_t *e, pn_condition_t *cond) {
/* Create a message with a map { "sequence" : number } encode it and return the encoded buffer. */
static pn_bytes_t encode_message(app_data_t* app) {
- /* Construct a message with the map { "sequence": app.sent } */
- pn_message_t* message = pn_message();
- pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */
- pn_data_t* body = pn_message_body(message);
- pn_data_put_map(body);
- pn_data_enter(body);
- pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
- pn_data_put_int(body, app->sent); /* The sequence number */
- pn_data_exit(body);
-
- /* encode the message, expanding the encode buffer as needed */
- if (app->message_buffer.start == NULL) {
- static const size_t initial_size = 128;
- app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size));
- }
- /* app->message_buffer is the total buffer space available. */
- /* mbuf wil point at just the portion used by the encoded message */
- pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start);
- int status = 0;
- while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) {
- app->message_buffer.size *= 2;
- app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size);
- mbuf.size = app->message_buffer.size;
- }
- if (status != 0) {
- fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message)));
- exit(1);
- }
- pn_message_free(message);
- return pn_bytes(mbuf.size, mbuf.start);
+ /* Construct a message with the map { "sequence": app.sent } */
+ pn_message_t* message = pn_message();
+ pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */
+ pn_data_t* body = pn_message_body(message);
+ pn_data_put_map(body);
+ pn_data_enter(body);
+ pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
+ pn_data_put_int(body, app->sent); /* The sequence number */
+ pn_data_exit(body);
+
+ /* encode the message, expanding the encode buffer as needed */
+ if (app->message_buffer.start == NULL) {
+ static const size_t initial_size = 128;
+ app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size));
+ }
+ /* app->message_buffer is the total buffer space available. */
+ /* mbuf will point at just the portion used by the encoded message */
+ pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start);
+ int status = 0;
+ while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) {
+ app->message_buffer.size *= 2;
+ app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size);
+ mbuf.size = app->message_buffer.size;
+ }
+ if (status != 0) {
+ fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message)));
+ exit(1);
+ }
+ pn_message_free(message);
+ return pn_bytes(mbuf.size, mbuf.start);
}
-/* Handle event, return true of we should handle more */
-static bool handle(app_data_t* app, pn_event_t* event) {
- bool more = true;
+static void handle(app_data_t* app, pn_event_t* event) {
switch (pn_event_type(event)) {
case PN_CONNECTION_INIT: {
@@ -130,7 +129,7 @@ static bool handle(app_data_t* app, pn_event_t* event) {
}
} break;
- case PN_TRANSPORT_ERROR:
+ case PN_TRANSPORT_CLOSED:
check_condition(event, pn_transport_condition(pn_event_transport(event)));
break;
@@ -151,53 +150,58 @@ static bool handle(app_data_t* app, pn_event_t* event) {
break;
case PN_PROACTOR_INACTIVE:
- more = false;
+ app->finished = true;
break;
default: break;
}
- pn_event_done(event);
- return more;
}
static void usage(const char *arg0) {
- fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", arg0);
- exit(1);
+ fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", arg0);
+ exit(1);
}
int main(int argc, char **argv) {
- /* Default values for application and connection. */
- app_data_t app = {{0}};
- app.message_count = 100;
- const char* urlstr = NULL;
-
- int opt;
- while((opt = getopt(argc, argv, "a:m:")) != -1) {
- switch(opt) {
- case 'a': urlstr = optarg; break;
- case 'm': app.message_count = atoi(optarg); break;
- default: usage(argv[0]); break;
- }
+ /* Default values for application and connection. */
+ app_data_t app = {{0}};
+ app.message_count = 100;
+ const char* urlstr = NULL;
+
+ int opt;
+ while((opt = getopt(argc, argv, "a:m:")) != -1) {
+ switch(opt) {
+ case 'a': urlstr = optarg; break;
+ case 'm': app.message_count = atoi(optarg); break;
+ default: usage(argv[0]); break;
}
- if (optind < argc)
- usage(argv[0]);
-
- snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid());
-
- /* Parse the URL or use default values */
- pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
- const char *host = url ? pn_url_get_host(url) : NULL;
- const char *port = url ? pn_url_get_port(url) : "amqp";
- strncpy(app.address, (url && pn_url_get_path(url)) ? pn_url_get_path(url) : "example", sizeof(app.address));
-
- /* Create the proactor and connect */
- app.proactor = pn_proactor();
- pn_proactor_connect(app.proactor, host, port, pn_rwbytes_null);
- if (url) pn_url_free(url);
-
- while (handle(&app, pn_proactor_wait(app.proactor)))
- ;
- pn_proactor_free(app.proactor);
- free(app.message_buffer.start);
- return exit_code;
+ }
+ if (optind < argc)
+ usage(argv[0]);
+
+ snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid());
+
+ /* Parse the URL or use default values */
+ pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
+ const char *host = url ? pn_url_get_host(url) : NULL;
+ const char *port = url ? pn_url_get_port(url) : "amqp";
+ strncpy(app.address, (url && pn_url_get_path(url)) ? pn_url_get_path(url) : "example", sizeof(app.address));
+
+ /* Create the proactor and connect */
+ app.proactor = pn_proactor();
+ pn_proactor_connect(app.proactor, host, port, pn_rwbytes_null);
+ if (url) pn_url_free(url);
+
+ do {
+ pn_event_batch_t *events = pn_proactor_wait(app.proactor);
+ pn_event_t *e;
+ while ((e = pn_event_batch_next(events))) {
+ handle(&app, e);
+ }
+ pn_proactor_done(app.proactor, events);
+ } while(!app.finished);
+
+ pn_proactor_free(app.proactor);
+ free(app.message_buffer.start);
+ return exit_code;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/examples/cpp/README.dox
----------------------------------------------------------------------
diff --git a/examples/cpp/README.dox b/examples/cpp/README.dox
index 1d46ec8..447d3ad 100644
--- a/examples/cpp/README.dox
+++ b/examples/cpp/README.dox
@@ -123,7 +123,7 @@ subscribe.
/** @example mt/epoll_container.cpp
An example implementation of the proton::container API that shows how
-to use the proton::io::connection_engine SPI to adapt the proton API
+to use the proton::io::connection_driver SPI to adapt the proton API
to native IO, in this case using a multithreaded Linux epoll poller as
the implementation.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/examples/cpp/mt/epoll_container.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/mt/epoll_container.cpp b/examples/cpp/mt/epoll_container.cpp
index d9b9f08..7646673 100644
--- a/examples/cpp/mt/epoll_container.cpp
+++ b/examples/cpp/mt/epoll_container.cpp
@@ -25,7 +25,7 @@
#include <proton/url.hpp>
#include <proton/io/container_impl_base.hpp>
-#include <proton/io/connection_engine.hpp>
+#include <proton/io/connection_driver.hpp>
#include <proton/io/link_namer.hpp>
#include <atomic>
@@ -97,7 +97,7 @@ class unique_fd {
};
class pollable;
-class pollable_engine;
+class pollable_driver;
class pollable_listener;
class epoll_container : public proton::io::container_impl_base {
@@ -124,7 +124,7 @@ class epoll_container : public proton::io::container_impl_base {
std::string id() const OVERRIDE { return id_; }
// Functions used internally.
- proton::connection add_engine(proton::connection_options opts, int fd, bool server);
+ proton::connection add_driver(proton::connection_options opts, int fd, bool server);
void erase(pollable*);
// Link names must be unique per container.
@@ -160,7 +160,7 @@ class epoll_container : public proton::io::container_impl_base {
proton::connection_options options_;
std::map<std::string, std::unique_ptr<pollable_listener> > listeners_;
- std::map<pollable*, std::unique_ptr<pollable_engine> > engines_;
+ std::map<pollable*, std::unique_ptr<pollable_driver> > drivers_;
std::condition_variable stopped_;
bool stopping_;
@@ -274,21 +274,21 @@ class epoll_event_loop : public proton::event_loop {
bool closed_;
};
-// Handle epoll wakeups for a connection_engine.
-class pollable_engine : public pollable {
+// Handle epoll wakeups for a connection_driver.
+class pollable_driver : public pollable {
public:
- pollable_engine(epoll_container& c, int fd, int epoll_fd) :
+ pollable_driver(epoll_container& c, int fd, int epoll_fd) :
pollable(fd, epoll_fd),
loop_(new epoll_event_loop(*this)),
- engine_(c, loop_)
+ driver_(c, loop_)
{
- proton::connection conn = engine_.connection();
+ proton::connection conn = driver_.connection();
proton::io::set_link_namer(conn, c.link_namer);
}
- ~pollable_engine() {
+ ~pollable_driver() {
loop_->close(); // No calls to notify() after this.
- engine_.dispatch(); // Run any final events.
+ driver_.dispatch(); // Run any final events.
try { write(); } catch(...) {} // Write connection close if we can.
for (auto f : loop_->pop_all()) {// Run final queued work for side-effects.
try { f(); } catch(...) {}
@@ -303,17 +303,17 @@ class pollable_engine : public pollable {
can_read = can_read && read();
for (auto f : loop_->pop_all()) // Run queued work
f();
- engine_.dispatch();
+ driver_.dispatch();
} while (can_read || can_write);
- return (engine_.read_buffer().size ? EPOLLIN:0) |
- (engine_.write_buffer().size ? EPOLLOUT:0);
+ return (driver_.read_buffer().size ? EPOLLIN:0) |
+ (driver_.write_buffer().size ? EPOLLOUT:0);
} catch (const std::exception& e) {
- engine_.disconnected(proton::error_condition("exception", e.what()));
+ driver_.disconnected(proton::error_condition("exception", e.what()));
}
return 0; // Ending
}
- proton::io::connection_engine& engine() { return engine_; }
+ proton::io::connection_driver& driver() { return driver_; }
private:
static bool try_again(int e) {
@@ -322,11 +322,11 @@ class pollable_engine : public pollable {
}
bool write() {
- proton::io::const_buffer wbuf(engine_.write_buffer());
+ proton::io::const_buffer wbuf(driver_.write_buffer());
if (wbuf.size) {
ssize_t n = ::write(fd_, wbuf.data, wbuf.size);
if (n > 0) {
- engine_.write_done(n);
+ driver_.write_done(n);
return true;
} else if (n < 0 && !try_again(errno)) {
check(n, "write");
@@ -336,15 +336,15 @@ class pollable_engine : public pollable {
}
bool read() {
- proton::io::mutable_buffer rbuf(engine_.read_buffer());
+ proton::io::mutable_buffer rbuf(driver_.read_buffer());
if (rbuf.size) {
ssize_t n = ::read(fd_, rbuf.data, rbuf.size);
if (n > 0) {
- engine_.read_done(n);
+ driver_.read_done(n);
return true;
}
else if (n == 0)
- engine_.read_close();
+ driver_.read_close();
else if (!try_again(errno))
check(n, "read");
}
@@ -352,13 +352,13 @@ class pollable_engine : public pollable {
}
// Lifecycle note: loop_ belongs to the proton::connection, which can live
- // longer than the engine if the application holds a reference to it, we
- // disconnect ourselves with loop_->close() in ~connection_engine()
+ // longer than the driver if the application holds a reference to it. We
+ // disconnect ourselves with loop_->close() in ~pollable_driver().
epoll_event_loop* loop_;
- proton::io::connection_engine engine_;
+ proton::io::connection_driver driver_;
};
-// A pollable listener fd that creates pollable_engine for incoming connections.
+// A pollable listener fd that creates pollable_driver for incoming connections.
class pollable_listener : public pollable {
public:
pollable_listener(
@@ -380,7 +380,7 @@ class pollable_listener : public pollable {
}
try {
int accepted = check(::accept(fd_, NULL, 0), "accept");
- container_.add_engine(listener_.on_accept(), accepted, true);
+ container_.add_driver(listener_.on_accept(), accepted, true);
return EPOLLIN;
} catch (const std::exception& e) {
listener_.on_error(e.what());
@@ -424,25 +424,25 @@ epoll_container::~epoll_container() {
} catch (...) {}
}
-proton::connection epoll_container::add_engine(proton::connection_options opts, int fd, bool server)
+proton::connection epoll_container::add_driver(proton::connection_options opts, int fd, bool server)
{
lock_guard g(lock_);
if (stopping_)
throw proton::error("container is stopping");
- std::unique_ptr<pollable_engine> eng(new pollable_engine(*this, fd, epoll_fd_));
+ std::unique_ptr<pollable_driver> eng(new pollable_driver(*this, fd, epoll_fd_));
if (server)
- eng->engine().accept(opts);
+ eng->driver().accept(opts);
else
- eng->engine().connect(opts);
- proton::connection c = eng->engine().connection();
+ eng->driver().connect(opts);
+ proton::connection c = eng->driver().connection();
eng->notify();
- engines_[eng.get()] = std::move(eng);
+ drivers_[eng.get()] = std::move(eng);
return c;
}
void epoll_container::erase(pollable* e) {
lock_guard g(lock_);
- if (!engines_.erase(e)) {
+ if (!drivers_.erase(e)) {
pollable_listener* l = dynamic_cast<pollable_listener*>(e);
if (l)
listeners_.erase(l->addr());
@@ -451,7 +451,7 @@ void epoll_container::erase(pollable* e) {
}
void epoll_container::idle_check(const lock_guard&) {
- if (stopping_ && engines_.empty() && listeners_.empty())
+ if (stopping_ && drivers_.empty() && listeners_.empty())
interrupt();
}
@@ -462,7 +462,7 @@ proton::returned<proton::connection> epoll_container::connect(
unique_addrinfo ainfo(addr);
unique_fd fd(check(::socket(ainfo->ai_family, SOCK_STREAM, 0), msg));
check(::connect(fd, ainfo->ai_addr, ainfo->ai_addrlen), msg);
- return make_thread_safe(add_engine(opts, fd.release(), false));
+ return make_thread_safe(add_driver(opts, fd.release(), false));
}
proton::listener epoll_container::listen(const std::string& addr, proton::listen_handler& lh) {
@@ -520,10 +520,10 @@ void epoll_container::stop(const proton::error_condition& err) {
void epoll_container::wait() {
std::unique_lock<std::mutex> l(lock_);
stopped_.wait(l, [this]() { return this->threads_ == 0; } );
- for (auto& eng : engines_)
- eng.second->engine().disconnected(stop_err_);
+ for (auto& eng : drivers_)
+ eng.second->driver().disconnected(stop_err_);
listeners_.clear();
- engines_.clear();
+ drivers_.clear();
}
void epoll_container::interrupt() {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt
index ddab147..ffc6e10 100644
--- a/proton-c/CMakeLists.txt
+++ b/proton-c/CMakeLists.txt
@@ -369,7 +369,7 @@ set (qpid-proton-core
src/core/encoder.c
src/core/dispatcher.c
- src/core/connection_engine.c
+ src/core/connection_driver.c
src/core/engine.c
src/core/event.c
src/core/autodetect.c
@@ -440,7 +440,7 @@ set (qpid-proton-include
include/proton/codec.h
include/proton/condition.h
include/proton/connection.h
- include/proton/connection_engine.h
+ include/proton/connection_driver.h
include/proton/delivery.h
include/proton/disposition.h
include/proton/engine.h
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt
index ed969eb..6af4319 100644
--- a/proton-c/bindings/cpp/CMakeLists.txt
+++ b/proton-c/bindings/cpp/CMakeLists.txt
@@ -48,7 +48,7 @@ set(qpid-proton-cpp-source
src/error_condition.cpp
src/event_loop.cpp
src/handler.cpp
- src/io/connection_engine.cpp
+ src/io/connection_driver.cpp
src/io/link_namer.cpp
src/link.cpp
src/listener.cpp
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/docs/io.md
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/docs/io.md b/proton-c/bindings/cpp/docs/io.md
index a892e61..230e538 100644
--- a/proton-c/bindings/cpp/docs/io.md
+++ b/proton-c/bindings/cpp/docs/io.md
@@ -7,16 +7,16 @@ The `proton::io` namespace contains a service provider interface (SPI)
that allows you to implement the Proton API over alternative IO or
threading libraries.
-The `proton::io::connection_engine` class converts an AMQP-encoded
+The `proton::io::connection_driver` class converts an AMQP-encoded
byte stream, read from any IO source, into `proton::messaging_handler`
calls. It generates an AMQP-encoded byte stream as output that can be
written to any IO destination.
-The connection engine is deliberately very simple and low level. It
+The connection driver is deliberately very simple and low level. It
performs no IO of its own, no thread-related locking, and is written
in simple C++98-compatible code.
-The connection engine can be used standalone as an AMQP translator, or
+The connection driver can be used standalone as an AMQP translator, or
you can implement the following two interfaces to provide a complete
implementation of the Proton API that can run any Proton application:
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/docs/main.md
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/docs/main.md b/proton-c/bindings/cpp/docs/main.md
index 011df29..93ba2c0 100644
--- a/proton-c/bindings/cpp/docs/main.md
+++ b/proton-c/bindings/cpp/docs/main.md
@@ -123,6 +123,6 @@ The default container implementation is created using
`proton::default_container`.
You can implement your own container to integrate proton with any IO
-provider using the `proton::io::connection_engine`.
+provider using the `proton::io::connection_driver`.
@see @ref io_page
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/include/proton/connection_options.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection_options.hpp b/proton-c/bindings/cpp/include/proton/connection_options.hpp
index 9fbdbdc..d2deebf 100644
--- a/proton-c/bindings/cpp/include/proton/connection_options.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection_options.hpp
@@ -40,7 +40,7 @@ class proton_handler;
class connection;
namespace io {
-class connection_engine;
+class connection_driver;
}
/// Options for creating a connection.
@@ -163,7 +163,7 @@ class connection_options {
/// @cond INTERNAL
friend class container_impl;
friend class connector;
- friend class io::connection_engine;
+ friend class io::connection_driver;
friend class connection;
/// @endcond
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
new file mode 100644
index 0000000..d5da718
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
@@ -0,0 +1,211 @@
+#ifndef PROTON_IO_CONNECTION_DRIVER_HPP
+#define PROTON_IO_CONNECTION_DRIVER_HPP
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "../internal/config.hpp"
+#include "../connection.hpp"
+#include "../connection_options.hpp"
+#include "../error.hpp"
+#include "../error_condition.hpp"
+#include "../internal/export.hpp"
+#include "../internal/pn_unique_ptr.hpp"
+#include "../transport.hpp"
+#include "../types.hpp"
+
+#include <proton/connection_driver.h>
+
+#include <cstddef>
+#include <utility>
+#include <string>
+
+namespace proton {
+
+class event_loop;
+class proton_handler;
+
+namespace io {
+
+/// **Experimental** - Pointer to a mutable memory region with a size.
+struct mutable_buffer {
+ char* data; ///< Beginning of the buffered data.
+ size_t size; ///< Number of bytes in the buffer.
+
+ /// Construct a buffer starting at data_ with size_ bytes.
+ mutable_buffer(char* data_=0, size_t size_=0) : data(data_), size(size_) {}
+};
+
+/// **Experimental** - Pointer to a const memory region with a size.
+struct const_buffer {
+ const char* data; ///< Beginning of the buffered data.
+ size_t size; ///< Number of bytes in the buffer.
+
+ /// Construct a buffer starting at data_ with size_ bytes.
+ const_buffer(const char* data_=0, size_t size_=0) : data(data_), size(size_) {}
+};
+
+/// **Experimental** - An AMQP driver for a single connection.
+///
+/// io::connection_driver manages a single proton::connection and dispatches
+/// events to a proton::messaging_handler. It does no IO of its own, but allows you to
+/// integrate AMQP protocol handling into any IO or concurrency framework.
+///
+/// The application is coded the same way as for the
+/// proton::container. The application implements a
+/// proton::messaging_handler to respond to transport, connection,
+/// session, link, and message events. With a little care, the same
+/// handler classes can be used for both container and
+ /// connection_driver. The @ref broker.cpp example illustrates this.
+///
+ /// You need to write the IO code to read AMQP data into the
+ /// read_buffer(). The driver parses the AMQP frames. dispatch() calls
+ /// the appropriate functions on the application's proton::messaging_handler. You
+ /// write output data from the driver's write_buffer() to your IO.
+///
+ /// The driver is not safe for concurrent use, but you can process
+ /// different drivers concurrently. A common pattern for
+ /// high-performance servers is to serialize read/write activity
+ /// per connection and dispatch in a fixed-size thread pool.
+///
+ /// The driver is designed to work with a classic reactor (e.g.,
+/// select, poll, epoll) or an async-request driven proactor (e.g.,
+/// windows completion ports, boost.asio, libuv).
+///
+ /// The driver never throws exceptions.
+class
+PN_CPP_CLASS_EXTERN connection_driver {
+ public:
+ /// A driver that is not associated with a proton::container or
+ /// proton::event_loop.
+ ///
+ /// Accessing the container or event_loop for this connection in
+ /// a proton::messaging_handler will throw a proton::error exception.
+ ///
+ PN_CPP_EXTERN connection_driver();
+
+ /// Create a connection driver associated with a proton::container and
+ /// optional event_loop. If the event_loop is not provided attempts to use
+ /// it will throw proton::error.
+ ///
+ /// Takes ownership of the event_loop. Note the proton::connection created
+ /// by this connection_driver can outlive the connection_driver itself if
+ /// the user pins it in memory using the proton::thread_safe<> template.
+ /// The event_loop is deleted when, and only when, the proton::connection is.
+ ///
+ PN_CPP_EXTERN connection_driver(proton::container&, event_loop* loop = 0);
+
+ PN_CPP_EXTERN ~connection_driver();
+
+ /// Configure a connection by applying exactly the options in opts (including proton::messaging_handler).
+ /// Does not apply any default options; to apply container defaults use connect() or accept()
+ /// instead. If server==true, configure a server connection.
+ void configure(const connection_options& opts=connection_options(), bool server=false);
+
+ /// Call configure() with client options and call connection::open()
+ /// Options applied: container::id(), container::client_connection_options(), opts.
+ PN_CPP_EXTERN void connect(const connection_options& opts);
+
+ /// Call configure() with server options.
+ /// Options applied: container::id(), container::server_connection_options(), opts.
+ ///
+ /// Note this does not call connection::open(). If there is a messaging_handler in the
+ /// composed options it will receive messaging_handler::on_connection_open() and can
+ /// respond with connection::open() or connection::close()
+ PN_CPP_EXTERN void accept(const connection_options& opts);
+
+ /// The driver's read buffer. Read data into this buffer then call read_done() when complete.
+ /// Returns mutable_buffer(0, 0) if the driver cannot currently read data.
+ /// Calling dispatch() may open up more buffer space.
+ PN_CPP_EXTERN mutable_buffer read_buffer();
+
+ /// Indicate that the first n bytes of read_buffer() have valid data.
+ /// This changes the buffer, call read_buffer() to get the updated buffer.
+ PN_CPP_EXTERN void read_done(size_t n);
+
+ /// Indicate that the read side of the transport is closed and no more data will be read.
+ /// Note that there may still be events to dispatch() or data to write.
+ PN_CPP_EXTERN void read_close();
+
+ /// The driver's write buffer. Write data from this buffer then call write_done().
+ /// Returns const_buffer(0, 0) if the driver has nothing to write.
+ /// Calling dispatch() may generate more data in the write buffer.
+ PN_CPP_EXTERN const_buffer write_buffer();
+
+ /// Indicate that the first n bytes of write_buffer() have been written successfully.
+ /// This changes the buffer, call write_buffer() to get the updated buffer.
+ PN_CPP_EXTERN void write_done(size_t n);
+
+ /// Indicate that the write side of the transport has closed and no more data can be written.
+ /// Note that there may still be events to dispatch() or data to read.
+ PN_CPP_EXTERN void write_close();
+
+ /// Inform the driver that the transport has been disconnected unexpectedly,
+ /// without completing the AMQP connection close sequence.
+ ///
+ /// This calls read_close(), write_close(), sets the transport().error() and
+ /// queues an `on_transport_error` event. You must call dispatch() one more
+ /// time to dispatch the messaging_handler::on_transport_error() call and other final
+ /// events.
+ ///
+ /// Note this does not close the connection() so that a proton::messaging_handler can
+ /// distinguish between a connection close error sent by the remote peer and
+ /// a transport failure.
+ ///
+ PN_CPP_EXTERN void disconnected(const error_condition& = error_condition());
+
+ /// Dispatch all available events and call the corresponding \ref messaging_handler methods.
+ ///
+ /// Returns true if the driver is still active, false if it is finished and
+ /// can be destroyed. The driver is finished when all events are dispatched
+ /// and one of the following is true:
+ ///
+ /// - both read_close() and write_close() have been called, no more IO is possible.
+ /// - The AMQP connection() is closed AND the write_buffer() is empty.
+ ///
+ /// May modify the read_buffer() and/or the write_buffer().
+ ///
+ PN_CPP_EXTERN bool dispatch();
+
+ /// Get the AMQP connection associated with this connection_driver.
+ /// The event_loop is available via proton::thread_safe<connection>(connection())
+ PN_CPP_EXTERN proton::connection connection() const;
+
+ /// Get the transport associated with this connection_driver.
+ PN_CPP_EXTERN proton::transport transport() const;
+
+ /// Get the container associated with this connection_driver, if there is one.
+ PN_CPP_EXTERN proton::container* container() const;
+
+ private:
+ void init();
+ connection_driver(const connection_driver&);
+ connection_driver& operator=(const connection_driver&);
+
+ messaging_handler* handler_;
+ proton::container* container_;
+ pn_connection_driver_t driver_;
+};
+
+} // io
+} // proton
+
+#endif // PROTON_IO_CONNECTION_DRIVER_HPP
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp b/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
deleted file mode 100644
index d9825c2..0000000
--- a/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
+++ /dev/null
@@ -1,215 +0,0 @@
-#ifndef PROTON_IO_CONNECTION_ENGINE_HPP
-#define PROTON_IO_CONNECTION_ENGINE_HPP
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "../internal/config.hpp"
-#include "../connection.hpp"
-#include "../connection_options.hpp"
-#include "../error.hpp"
-#include "../error_condition.hpp"
-#include "../internal/export.hpp"
-#include "../internal/pn_unique_ptr.hpp"
-#include "../transport.hpp"
-#include "../types.hpp"
-
-#include <proton/connection_engine.h>
-
-#include <cstddef>
-#include <utility>
-#include <string>
-
-namespace proton {
-
-class event_loop;
-class proton_handler;
-
-namespace io {
-
-/// **Experimental** - Pointer to a mutable memory region with a size.
-struct mutable_buffer {
- char* data; ///< Beginning of the buffered data.
- size_t size; ///< Number of bytes in the buffer.
-
- /// Construct a buffer starting at data_ with size_ bytes.
- mutable_buffer(char* data_=0, size_t size_=0) : data(data_), size(size_) {}
-};
-
-/// **Experimental** - Pointer to a const memory region with a size.
-struct const_buffer {
- const char* data; ///< Beginning of the buffered data.
- size_t size; ///< Number of bytes in the buffer.
-
- /// Construct a buffer starting at data_ with size_ bytes.
- const_buffer(const char* data_=0, size_t size_=0) : data(data_), size(size_) {}
-};
-
-/// **Experimental** - An AMQP protocol engine for a single
-/// connection.
-///
-/// A connection_engine is a protocol engine that integrates AMQP into
-/// any IO or concurrency framework.
-///
-/// io::connection_engine manages a single proton::connection and dispatches
-/// events to a proton::messaging_handler. It does no IO of its own, but allows you to
-/// integrate AMQP protocol handling into any IO or concurrency framework.
-///
-/// The application is coded the same way as for the
-/// proton::container. The application implements a
-/// proton::messaging_handler to respond to transport, connection,
-/// session, link, and message events. With a little care, the same
-/// handler classes can be used for both container and
-/// connection_engine. the @ref broker.cpp example illustrates this.
-///
-/// You need to write the IO code to read AMQP data to the
-/// read_buffer(). The engine parses the AMQP frames. dispatch() calls
-/// the appropriate functions on the applications proton::messaging_handler. You
-/// write output data from the engine's write_buffer() to your IO.
-///
-/// The engine is not safe for concurrent use, but you can process
-/// different engines concurrently. A common pattern for
-/// high-performance servers is to serialize read/write activity
-/// per connection and dispatch in a fixed-size thread pool.
-///
-/// The engine is designed to work with a classic reactor (e.g.,
-/// select, poll, epoll) or an async-request driven proactor (e.g.,
-/// windows completion ports, boost.asio, libuv).
-///
-/// The engine never throws exceptions.
-class
-PN_CPP_CLASS_EXTERN connection_engine {
- public:
- /// An engine that is not associated with a proton::container or
- /// proton::event_loop.
- ///
- /// Accessing the container or event_loop for this connection in
- /// a proton::messaging_handler will throw a proton::error exception.
- ///
- PN_CPP_EXTERN connection_engine();
-
- /// Create a connection engine associated with a proton::container and
- /// optional event_loop. If the event_loop is not provided attempts to use
- /// it will throw proton::error.
- ///
- /// Takes ownership of the event_loop. Note the proton::connection created
- /// by this connection_engine can outlive the connection_engine itself if
- /// the user pins it in memory using the proton::thread_safe<> template.
- /// The event_loop is deleted when, and only when, the proton::connection is.
- ///
- PN_CPP_EXTERN connection_engine(proton::container&, event_loop* loop = 0);
-
- PN_CPP_EXTERN ~connection_engine();
-
- /// Configure a connection by applying exactly the options in opts (including proton::messaging_handler)
- /// Does not apply any default options, to apply container defaults use connect() or accept()
- /// instead. If server==true, configure a server connection.
- void configure(const connection_options& opts=connection_options(), bool server=false);
-
- /// Call configure() with client options and call connection::open()
- /// Options applied: container::id(), container::client_connection_options(), opts.
- PN_CPP_EXTERN void connect(const connection_options& opts);
-
- /// Call configure() with server options.
- /// Options applied: container::id(), container::server_connection_options(), opts.
- ///
- /// Note this does not call connection::open(). If there is a messaging_handler in the
- /// composed options it will receive messaging_handler::on_connection_open() and can
- /// respond with connection::open() or connection::close()
- PN_CPP_EXTERN void accept(const connection_options& opts);
-
- /// The engine's read buffer. Read data into this buffer then call read_done() when complete.
- /// Returns mutable_buffer(0, 0) if the engine cannot currently read data.
- /// Calling dispatch() may open up more buffer space.
- PN_CPP_EXTERN mutable_buffer read_buffer();
-
- /// Indicate that the first n bytes of read_buffer() have valid data.
- /// This changes the buffer, call read_buffer() to get the updated buffer.
- PN_CPP_EXTERN void read_done(size_t n);
-
- /// Indicate that the read side of the transport is closed and no more data will be read.
- /// Note that there may still be events to dispatch() or data to write.
- PN_CPP_EXTERN void read_close();
-
- /// The engine's write buffer. Write data from this buffer then call write_done()
- /// Returns const_buffer(0, 0) if the engine has nothing to write.
- /// Calling dispatch() may generate more data in the write buffer.
- PN_CPP_EXTERN const_buffer write_buffer();
-
- /// Indicate that the first n bytes of write_buffer() have been written successfully.
- /// This changes the buffer, call write_buffer() to get the updated buffer.
- PN_CPP_EXTERN void write_done(size_t n);
-
- /// Indicate that the write side of the transport has closed and no more data can be written.
- /// Note that there may still be events to dispatch() or data to read.
- PN_CPP_EXTERN void write_close();
-
- /// Inform the engine that the transport been disconnected unexpectedly,
- /// without completing the AMQP connection close sequence.
- ///
- /// This calls read_close(), write_close(), sets the transport().error() and
- /// queues an `on_transport_error` event. You must call dispatch() one more
- /// time to dispatch the messaging_handler::on_transport_error() call and other final
- /// events.
- ///
- /// Note this does not close the connection() so that a proton::messaging_handler can
- /// distinguish between a connection close error sent by the remote peer and
- /// a transport failure.
- ///
- PN_CPP_EXTERN void disconnected(const error_condition& = error_condition());
-
- /// Dispatch all available events and call the corresponding \ref messaging_handler methods.
- ///
- /// Returns true if the engine is still active, false if it is finished and
- /// can be destroyed. The engine is finished when all events are dispatched
- /// and one of the following is true:
- ///
- /// - both read_close() and write_close() have been called, no more IO is possible.
- /// - The AMQP connection() is closed AND the write_buffer() is empty.
- ///
- /// May modify the read_buffer() and/or the write_buffer().
- ///
- PN_CPP_EXTERN bool dispatch();
-
- /// Get the AMQP connection associated with this connection_engine.
- /// The event_loop is availabe via proton::thread_safe<connection>(connection())
- PN_CPP_EXTERN proton::connection connection() const;
-
- /// Get the transport associated with this connection_engine.
- PN_CPP_EXTERN proton::transport transport() const;
-
- /// Get the container associated with this connection_engine, if there is one.
- PN_CPP_EXTERN proton::container* container() const;
-
- private:
- void init();
- connection_engine(const connection_engine&);
- connection_engine& operator=(const connection_engine&);
-
- messaging_handler* handler_;
- proton::container* container_;
- pn_connection_engine_t engine_;
-};
-
-} // io
-} // proton
-
-#endif // PROTON_IO_CONNECTION_ENGINE_HPP
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/messaging_handler.hpp b/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
index 2c5423f..acdcd30 100644
--- a/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
+++ b/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
@@ -40,7 +40,7 @@ class message;
class messaging_adapter;
namespace io {
-class connection_engine;
+class connection_driver;
}
/// A handler for Proton messaging events.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/include/proton/transport.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/transport.hpp b/proton-c/bindings/cpp/include/proton/transport.hpp
index bcd8a2f..10641e0 100644
--- a/proton-c/bindings/cpp/include/proton/transport.hpp
+++ b/proton-c/bindings/cpp/include/proton/transport.hpp
@@ -35,7 +35,7 @@ class error_condition;
class sasl;
namespace io {
-class connection_engine;
+class connection_driver;
}
/// A network channel supporting an AMQP connection.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/src/engine_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/engine_test.cpp b/proton-c/bindings/cpp/src/engine_test.cpp
index 6c3341f..991836d 100644
--- a/proton-c/bindings/cpp/src/engine_test.cpp
+++ b/proton-c/bindings/cpp/src/engine_test.cpp
@@ -24,7 +24,7 @@
#include "proton/container.hpp"
#include "proton/uuid.hpp"
-#include "proton/io/connection_engine.hpp"
+#include "proton/io/connection_driver.hpp"
#include "proton/io/link_namer.hpp"
#include "proton/messaging_handler.hpp"
#include "proton/types_fwd.hpp"
@@ -37,7 +37,7 @@ namespace {
using namespace std;
using namespace proton;
-using proton::io::connection_engine;
+using proton::io::connection_driver;
using proton::io::const_buffer;
using proton::io::mutable_buffer;
@@ -45,14 +45,14 @@ using test::dummy_container;
typedef std::deque<char> byte_stream;
-/// In memory connection_engine that reads and writes from byte_streams
-struct in_memory_engine : public connection_engine {
+/// In memory connection_driver that reads and writes from byte_streams
+struct in_memory_engine : public connection_driver {
byte_stream& reads;
byte_stream& writes;
in_memory_engine(byte_stream& rd, byte_stream& wr, class container& cont) :
- connection_engine(cont), reads(rd), writes(wr) {}
+ connection_driver(cont), reads(rd), writes(wr) {}
void do_read() {
mutable_buffer rbuf = read_buffer();
@@ -247,7 +247,7 @@ void test_engine_disconnected() {
void test_no_container() {
// An engine with no container should throw, not crash.
- connection_engine e;
+ connection_driver e;
try {
e.connection().container();
FAIL("expected error");
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/src/include/contexts.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/contexts.hpp b/proton-c/bindings/cpp/src/include/contexts.hpp
index 74a763c..1d4194e 100644
--- a/proton-c/bindings/cpp/src/include/contexts.hpp
+++ b/proton-c/bindings/cpp/src/include/contexts.hpp
@@ -24,7 +24,7 @@
#include "proton/connection.hpp"
#include "proton/container.hpp"
-#include "proton/io/connection_engine.hpp"
+#include "proton/io/connection_driver.hpp"
#include "proton/event_loop.hpp"
#include "proton/listen_handler.hpp"
#include "proton/message.hpp"
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/src/io/connection_driver.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/io/connection_driver.cpp b/proton-c/bindings/cpp/src/io/connection_driver.cpp
new file mode 100644
index 0000000..06b01d8
--- /dev/null
+++ b/proton-c/bindings/cpp/src/io/connection_driver.cpp
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "proton/io/connection_driver.hpp"
+
+#include "proton/event_loop.hpp"
+#include "proton/error.hpp"
+#include "proton/messaging_handler.hpp"
+#include "proton/uuid.hpp"
+
+#include "contexts.hpp"
+#include "messaging_adapter.hpp"
+#include "msg.hpp"
+#include "proton_bits.hpp"
+#include "proton_event.hpp"
+
+#include <proton/connection.h>
+#include <proton/transport.h>
+#include <proton/event.h>
+
+#include <algorithm>
+
+
+namespace proton {
+namespace io {
+
+// Shared constructor helper: initialize driver_ with a newly created
+// pn_connection() and pn_transport().
+// Throws proton::error if pn_connection_driver_init() returns non-zero.
+void connection_driver::init() {
+ if (pn_connection_driver_init(&driver_, pn_connection(), pn_transport()) == 0) {
+ return;
+ }
+ // A throw from a constructor skips the destructor, so run it by hand first.
+ this->~connection_driver();
+ throw proton::error(std::string("connection_driver allocation failed"));
+}
+
+// Construct a driver with no messaging_handler and no container, then
+// initialize driver_ via init().
+connection_driver::connection_driver() : handler_(0), container_(0) { init(); }
+
+// Construct a driver associated with cont. After init(), the connection's
+// context records the container and stores loop through event_loop.reset().
+connection_driver::connection_driver(class container& cont, event_loop* loop) : handler_(0), container_(&cont) {
+ init();
+ connection_context& context = connection_context::get(connection());
+ context.event_loop.reset(loop);
+ context.container = container_;
+}
+
+// Release the underlying C driver via pn_connection_driver_destroy().
+connection_driver::~connection_driver() {
+ pn_connection_driver_destroy(&driver_);
+}
+
+// FIXME aconway 2016-11-16: rename _engine > _driver
+// Apply opts to this driver's connection: unbound options first, then bind
+// the driver, then bound options. When server is true the transport is put
+// in server mode before binding. Also remembers opts.handler() for dispatch()
+// and stores the connection's collector in the connection context.
+void connection_driver::configure(const connection_options& opts, bool server) {
+ proton::connection conn(connection());
+ opts.apply_unbound(conn);
+ if (server) {
+ pn_transport_set_server(driver_.transport);
+ }
+ pn_connection_driver_bind(&driver_);
+ opts.apply_bound(conn);
+ handler_ = opts.handler();
+ connection_context& context = connection_context::get(connection());
+ context.collector = pn_connection_collector(driver_.connection);
+}
+
+// Configure as a client and open the connection.
+// Options are layered in this order: the container id and the container's
+// client connection options (only when a container is set), then opts.
+void connection_driver::connect(const connection_options& opts) {
+ connection_options merged;
+ if (container_ != 0) {
+ merged.container_id(container_->id());
+ merged.update(container_->client_connection_options());
+ }
+ merged.update(opts);
+ configure(merged, false);
+ connection().open();
+}
+
+// Configure as a server. Unlike connect(), this does not open the connection.
+// Options are layered in this order: the container id and the container's
+// server connection options (only when a container is set), then opts.
+void connection_driver::accept(const connection_options& opts) {
+ connection_options merged;
+ if (container_ != 0) {
+ merged.container_id(container_->id());
+ merged.update(container_->server_connection_options());
+ }
+ merged.update(opts);
+ configure(merged, true);
+}
+
+// Drain every pending driver event, forwarding each one to the configured
+// messaging_handler (if any) through a messaging_adapter.
+// A std::exception escaping the handler is recorded as an "exception"
+// condition on the transport unless a condition is already set.
+// Returns true while the driver is not finished.
+bool connection_driver::dispatch() {
+ for (pn_event_t* ev = pn_connection_driver_next_event(&driver_);
+ ev != NULL;
+ ev = pn_connection_driver_next_event(&driver_)) {
+ proton_event cpp_event(ev, container_);
+ if (handler_ == 0) {
+ continue; // Events are still consumed when nobody is listening.
+ }
+ try {
+ messaging_adapter adapter(*handler_);
+ cpp_event.dispatch(adapter);
+ } catch (const std::exception& e) {
+ pn_condition_t *transport_cond = pn_transport_condition(driver_.transport);
+ // Keep the first error: do not overwrite an existing condition.
+ if (!pn_condition_is_set(transport_cond)) {
+ pn_condition_format(transport_cond, "exception", "%s", e.what());
+ }
+ }
+ }
+ return !pn_connection_driver_finished(&driver_);
+}
+
+// Return the driver's current read buffer as a mutable_buffer (start, size).
+mutable_buffer connection_driver::read_buffer() {
+ pn_rwbytes_t raw = pn_connection_driver_read_buffer(&driver_);
+ mutable_buffer result(raw.start, raw.size);
+ return result;
+}
+
+// Tell the driver that n bytes have been placed in the read buffer.
+void connection_driver::read_done(size_t n) {
+ pn_connection_driver_read_done(&driver_, n);
+}
+
+// Forward to pn_connection_driver_read_close() to close the read side.
+void connection_driver::read_close() {
+ pn_connection_driver_read_close(&driver_);
+}
+
+// Return the driver's current write buffer as a const_buffer (start, size).
+const_buffer connection_driver::write_buffer() {
+ pn_bytes_t raw = pn_connection_driver_write_buffer(&driver_);
+ const_buffer result(raw.start, raw.size);
+ return result;
+}
+
+// Tell the driver that n bytes of the write buffer have been written.
+void connection_driver::write_done(size_t n) {
+ pn_connection_driver_write_done(&driver_, n);
+}
+
+// Forward to pn_connection_driver_write_close() to close the write side.
+void connection_driver::write_close() {
+ pn_connection_driver_write_close(&driver_);
+}
+
+// Report an unexpected transport disconnect: copy err into the transport's
+// condition unless one is already set, then close the driver.
+void connection_driver::disconnected(const proton::error_condition& err) {
+ pn_condition_t* transport_cond = pn_transport_condition(driver_.transport);
+ const bool already_set = pn_condition_is_set(transport_cond);
+ if (!already_set) {
+ set_error_condition(err, transport_cond);
+ }
+ pn_connection_driver_close(&driver_);
+}
+
+// Wrap the driver's pn_connection_t in a proton::connection.
+proton::connection connection_driver::connection() const {
+ return make_wrapper(driver_.connection);
+}
+
+// Wrap the driver's pn_transport_t in a proton::transport.
+proton::transport connection_driver::transport() const {
+ return make_wrapper(driver_.transport);
+}
+
+// Return the associated container, or a null pointer if the driver was
+// default-constructed without one.
+proton::container* connection_driver::container() const {
+ return container_;
+}
+
+}}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org