You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2016/11/25 21:01:31 UTC
[35/48] qpid-proton git commit: PROTON-1344: proactor
listener/conneciton configuration
PROTON-1344: proactor listener/conneciton configuration
Dropped extra bytes mechanism, may be re-introduced later.
Added context and attachments to pn_listener_t, consistent with pn_connection_t
Configure connection/listener before calling proactor connect/listen.
Added PN_LISTENER_ACCEPT event so accepted connections can be configured.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/aadfcbbb
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/aadfcbbb
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/aadfcbbb
Branch: refs/heads/go1
Commit: aadfcbbb7a7b75eb442df4d4de8ef97eb2e7a754
Parents: f2c8a3a
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Nov 17 00:14:12 2016 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Nov 17 11:22:50 2016 -0500
----------------------------------------------------------------------
examples/c/proactor/broker.c | 33 ++--
examples/c/proactor/libuv_proactor.c | 246 +++++++++++++++++-------------
examples/c/proactor/receive.c | 2 +-
examples/c/proactor/send.c | 2 +-
proton-c/include/proton/connection.h | 12 --
proton-c/include/proton/event.h | 12 +-
proton-c/include/proton/extra.h | 69 ---------
proton-c/include/proton/listener.h | 43 +++++-
proton-c/include/proton/proactor.h | 32 ++--
proton-c/src/core/engine.c | 16 +-
10 files changed, 226 insertions(+), 241 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aadfcbbb/examples/c/proactor/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
index 66381fc..ca52336 100644
--- a/examples/c/proactor/broker.c
+++ b/examples/c/proactor/broker.c
@@ -158,6 +158,15 @@ typedef struct broker_data_t {
bool check_queues; /* Check senders on the connection for available data in queues. */
} broker_data_t;
+/* Use the context pointer as a boolean flag to indicate we need to check queues */
+void pn_connection_set_check_queues(pn_connection_t *c, bool check) {
+ pn_connection_set_context(c, (void*)check);
+}
+
+bool pn_connection_get_check_queues(pn_connection_t *c) {
+ return (bool)pn_connection_get_context(c);
+}
+
/* Put a message on the queue, called in receiver dispatch loop.
If the queue was previously empty, notify waiting senders.
*/
@@ -168,8 +177,7 @@ static void queue_receive(pn_proactor_t *d, queue_t *q, pn_rwbytes_t m) {
if (q->messages.len == 1) { /* Was empty, notify waiting connections */
for (size_t i = 0; i < q->waiting.len; ++i) {
pn_connection_t *c = q->waiting.data[i];
- broker_data_t *bd = (broker_data_t*)pn_connection_get_extra(c).start;
- bd->check_queues = true;
+ pn_connection_set_check_queues(c, true);
pn_connection_wake(c); /* Wake the connection */
}
q->waiting.len = 0;
@@ -215,7 +223,6 @@ queue_t* queues_get(queues_t *qs, const char* name) {
/* The broker implementation */
typedef struct broker_t {
pn_proactor_t *proactor;
- pn_listener_t *listener;
queues_t queues;
const char *container_id; /* AMQP container-id */
size_t threads;
@@ -226,7 +233,6 @@ typedef struct broker_t {
void broker_init(broker_t *b, const char *container_id, size_t threads, pn_millis_t heartbeat) {
memset(b, 0, sizeof(*b));
b->proactor = pn_proactor();
- b->listener = NULL;
queues_init(&b->queues);
b->container_id = container_id;
b->threads = threads;
@@ -300,10 +306,14 @@ static void handle(broker_t* b, pn_event_t* e) {
switch (pn_event_type(e)) {
- case PN_CONNECTION_INIT: {
+ case PN_LISTENER_ACCEPT:
+ pn_listener_accept(pn_event_listener(e), pn_connection());
+ break;
+
+ case PN_CONNECTION_INIT:
pn_connection_set_container(c, b->container_id);
break;
- }
+
case PN_CONNECTION_BOUND: {
/* Turn off security */
pn_transport_t *t = pn_connection_transport(c);
@@ -316,9 +326,8 @@ static void handle(broker_t* b, pn_event_t* e) {
break;
}
case PN_CONNECTION_WAKE: {
- broker_data_t *bd = (broker_data_t*)pn_connection_get_extra(c).start;
- if (bd->check_queues) {
- bd->check_queues = false;
+ if (pn_connection_get_check_queues(c)) {
+ pn_connection_set_check_queues(c, false);
int flags = PN_LOCAL_ACTIVE&PN_REMOTE_ACTIVE;
for (pn_link_t *l = pn_link_head(c, flags); l != NULL; l = pn_link_next(l, flags))
link_send(b, l);
@@ -456,11 +465,7 @@ int main(int argc, char **argv) {
*/
const char *host = url ? pn_url_get_host(url) : "::";
const char *port = url ? pn_url_get_port(url) : "amqp";
-
- /* Initial broker_data value copied to each accepted connection */
- broker_data_t bd = { false };
- b.listener = pn_proactor_listen(b.proactor, host, port, 16,
- pn_bytes(sizeof(bd), (char*)&bd));
+ pn_proactor_listen(b.proactor, pn_listener(), host, port, 16);
printf("listening on '%s:%s' %zd threads\n", host, port, b.threads);
if (url) pn_url_free(url);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aadfcbbb/examples/c/proactor/libuv_proactor.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/libuv_proactor.c b/examples/c/proactor/libuv_proactor.c
index a26c311..9770166 100644
--- a/examples/c/proactor/libuv_proactor.c
+++ b/examples/c/proactor/libuv_proactor.c
@@ -24,7 +24,6 @@
#include <proton/condition.h>
#include <proton/connection_driver.h>
#include <proton/engine.h>
-#include <proton/extra.h>
#include <proton/message.h>
#include <proton/object.h>
#include <proton/proactor.h>
@@ -142,13 +141,15 @@ struct pn_listener_t {
psocket_t psocket;
/* Only used by owner thread */
+ pconnection_t *accepting; /* accept in progress */
pn_condition_t *condition;
pn_collector_t *collector;
pn_event_batch_t batch;
+ pn_record_t *attachments;
+ void *context;
size_t backlog;
};
-PN_EXTRA_DECLARE(pn_listener_t);
typedef struct queue { psocket_t *front, *back; } queue;
@@ -222,24 +223,26 @@ static void to_leader(psocket_t *ps) {
/* Detach from IO and put ps on the worker queue */
static void leader_to_worker(psocket_t *ps) {
- pconnection_t *pc = as_pconnection_t(ps);
- /* Don't detach if there are no events yet. */
- if (pc && pn_connection_driver_has_event(&pc->driver)) {
- if (pc->writing) {
- pc->writing = 0;
- uv_cancel((uv_req_t*)&pc->write);
- }
- if (pc->reading) {
- pc->reading = false;
- uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
- }
- if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
- uv_timer_stop(&pc->timer);
+ if (ps->is_conn) {
+ pconnection_t *pc = as_pconnection_t(ps);
+ /* Don't detach if there are no events yet. */
+ if (pn_connection_driver_has_event(&pc->driver)) {
+ if (pc->writing) {
+ pc->writing = 0;
+ uv_cancel((uv_req_t*)&pc->write);
+ }
+ if (pc->reading) {
+ pc->reading = false;
+ uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
+ }
+ if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
+ uv_timer_stop(&pc->timer);
+ }
}
+ } else {
+ pn_listener_t *l = as_listener(ps);
+ uv_read_stop((uv_stream_t*)&l->psocket.tcp);
}
-
- /* Nothing to do for a listener, on_accept doesn't touch worker state. */
-
uv_mutex_lock(&ps->proactor->lock);
push_lh(&ps->proactor->worker_q, ps);
uv_mutex_unlock(&ps->proactor->lock);
@@ -275,15 +278,12 @@ static void worker_requeue(psocket_t* ps) {
uv_mutex_unlock(&ps->proactor->lock);
}
-static pconnection_t *new_pconnection_t(pn_proactor_t *p, bool server, const char *host, const char *port, pn_bytes_t extra) {
+static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bool server, const char *host, const char *port) {
pconnection_t *pc = (pconnection_t*)calloc(1, sizeof(*pc));
if (!pc) return NULL;
- if (pn_connection_driver_init(&pc->driver, pn_connection_with_extra(extra.size), NULL) != 0) {
+ if (pn_connection_driver_init(&pc->driver, c, NULL) != 0) {
return NULL;
}
- if (extra.start && extra.size) {
- memcpy(pn_connection_get_extra(pc->driver.connection).start, extra.start, extra.size);
- }
psocket_init(&pc->psocket, p, true, host, port);
if (server) {
pn_transport_set_server(pc->driver.transport);
@@ -312,26 +312,6 @@ static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
return d ? (pconnection_t*)((char*)d - offsetof(pconnection_t, driver)) : NULL;
}
-pn_listener_t *new_listener(pn_proactor_t *p, const char *host, const char *port, int backlog, pn_bytes_t extra) {
- pn_listener_t *l = (pn_listener_t*)calloc(1, PN_EXTRA_SIZEOF(pn_listener_t, extra.size));
- if (!l) {
- return NULL;
- }
- l->collector = pn_collector();
- if (!l->collector) {
- free(l);
- return NULL;
- }
- if (extra.start && extra.size) {
- memcpy(pn_listener_get_extra(l).start, extra.start, extra.size);
- }
- psocket_init(&l->psocket, p, false, host, port);
- l->condition = pn_condition();
- l->batch.next_event = listener_batch_next;
- l->backlog = backlog;
- return l;
-}
-
static void leader_count(pn_proactor_t *p, int change) {
uv_mutex_lock(&p->lock);
p->count += change;
@@ -456,24 +436,23 @@ static void on_connect(uv_connect_t *connect, int err) {
}
static void on_accept(uv_stream_t* server, int err) {
- pn_listener_t* l = (pn_listener_t*)server->data;
- if (!err) {
- pn_rwbytes_t v = pn_listener_get_extra(l);
- pconnection_t *pc = new_pconnection_t(l->psocket.proactor, true,
- fixstr(l->psocket.host),
- fixstr(l->psocket.port),
- pn_bytes(v.size, v.start));
- if (pc) {
- int err2 = leader_init(&pc->psocket);
- if (!err2) err2 = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp);
- leader_connect_accept(pc, err2, "on accept");
- } else {
- err = UV_ENOMEM;
- }
- }
+ pn_listener_t *l = (pn_listener_t*) server->data;
if (err) {
leader_error(&l->psocket, err, "on accept");
}
+ pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
+ leader_to_worker(&l->psocket); /* Let user call pn_listener_accept */
+}
+
+static void leader_accept(psocket_t *ps) {
+ pn_listener_t * l = as_listener(ps);
+ pconnection_t *pc = l->accepting;
+ l->accepting = NULL;
+ if (pc) {
+ int err = leader_init(&pc->psocket);
+ if (!err) err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp);
+ leader_connect_accept(pc, err, "on accept");
+ }
}
static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) {
@@ -570,31 +549,39 @@ static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) {
}
static void leader_rewatch(psocket_t *ps) {
- pconnection_t *pc = as_pconnection_t(ps);
-
- if (pc->timer.data) { /* uv-initialized */
- on_tick(&pc->timer); /* Re-enable ticks if required */
- }
- pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
- pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
-
- /* Ticks and checking buffers can generate events, process before proceeding */
- if (pn_connection_driver_has_event(&pc->driver)) {
- leader_to_worker(ps);
- } else { /* Re-watch for IO */
- if (wbuf.size > 0 && !pc->writing) {
- pc->writing = wbuf.size;
- uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
- pc->write.data = ps;
- uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write);
- } else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) {
- pc->shutdown.data = ps;
- uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, on_shutdown);
+ int err = 0;
+ if (ps->is_conn) {
+ pconnection_t *pc = as_pconnection_t(ps);
+ if (pc->timer.data) { /* uv-initialized */
+ on_tick(&pc->timer); /* Re-enable ticks if required */
}
- if (rbuf.size > 0 && !pc->reading) {
- pc->reading = true;
- uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read);
+ pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
+ pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
+
+ /* Ticks and checking buffers can generate events, process before proceeding */
+ if (pn_connection_driver_has_event(&pc->driver)) {
+ leader_to_worker(ps);
+ } else { /* Re-watch for IO */
+ if (wbuf.size > 0 && !pc->writing) {
+ pc->writing = wbuf.size;
+ uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
+ pc->write.data = ps;
+ uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write);
+ } else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) {
+ pc->shutdown.data = ps;
+ uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, on_shutdown);
+ }
+ if (rbuf.size > 0 && !pc->reading) {
+ pc->reading = true;
+ err = uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read);
+ }
}
+ } else {
+ pn_listener_t *l = as_listener(ps);
+ err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_accept);
+ }
+ if (err) {
+ leader_error(ps, err, "rewatch");
}
}
@@ -668,6 +655,11 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
}
return;
}
+ pn_listener_t *l = batch_listener(batch);
+ if (l) {
+ owner_to_leader(&l->psocket, leader_rewatch);
+ return;
+ }
pn_proactor_t *bp = batch_proactor(batch);
if (bp == p) {
uv_mutex_lock(&p->lock);
@@ -676,7 +668,6 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
uv_mutex_unlock(&p->lock);
return;
}
- /* Nothing extra to do for listener, it is always in the UV loop. */
}
/* Run follower/leader loop till we can return an event and be a worker */
@@ -742,8 +733,8 @@ void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
uv_mutex_unlock(&p->lock);
}
-int pn_proactor_connect(pn_proactor_t *p, const char *host, const char *port, pn_bytes_t extra) {
- pconnection_t *pc = new_pconnection_t(p, false, host, port, extra);
+int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host, const char *port) {
+ pconnection_t *pc = new_pconnection_t(p, c, false, host, port);
if (!pc) {
return PN_OUT_OF_MEMORY;
}
@@ -752,12 +743,12 @@ int pn_proactor_connect(pn_proactor_t *p, const char *host, const char *port, pn
return 0;
}
-pn_rwbytes_t pn_listener_get_extra(pn_listener_t *l) { return PN_EXTRA_GET(pn_listener_t, l); }
-
-pn_listener_t *pn_proactor_listen(pn_proactor_t *p, const char *host, const char *port, int backlog, pn_bytes_t extra) {
- pn_listener_t *l = new_listener(p, host, port, backlog, extra);
- if (l) owner_to_leader(&l->psocket, leader_listen);
- return l;
+int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *host, const char *port, int backlog)
+{
+ psocket_init(&l->psocket, p, false, host, port);
+ l->backlog = backlog;
+ owner_to_leader(&l->psocket, leader_listen);
+ return 0;
}
pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
@@ -765,10 +756,6 @@ pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
return pc ? pc->psocket.proactor : NULL;
}
-pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
- return l ? l->psocket.proactor : NULL;
-}
-
void leader_wake_connection(psocket_t *ps) {
pconnection_t *pc = as_pconnection_t(ps);
pn_connection_t *c = pc->driver.connection;
@@ -780,15 +767,6 @@ void pn_connection_wake(pn_connection_t* c) {
wakeup(&get_pconnection_t(c)->psocket, leader_wake_connection);
}
-void pn_listener_close(pn_listener_t* l) {
- wakeup(&l->psocket, leader_close);
-}
-
-/* Only called when condition is closed by error. */
-pn_condition_t* pn_listener_condition(pn_listener_t* l) {
- return l->condition;
-}
-
pn_proactor_t *pn_proactor() {
pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p));
p->collector = pn_collector();
@@ -831,3 +809,65 @@ static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
return pn_collector_next(batch_proactor(batch)->collector);
}
+
+pn_listener_t *pn_listener() {
+ pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t));
+ if (l) {
+ l->batch.next_event = listener_batch_next;
+ l->collector = pn_collector();
+ l->condition = pn_condition();
+ l->attachments = pn_record();
+ if (!l->condition || !l->collector || !l->attachments) {
+ pn_listener_free(l);
+ return NULL;
+ }
+ }
+ return l;
+}
+
+void pn_listener_free(pn_listener_t *l) {
+ if (l) {
+ if (!l->collector) pn_collector_free(l->collector);
+ if (!l->condition) pn_condition_free(l->condition);
+ if (!l->attachments) pn_free(l->attachments);
+ free(l);
+ }
+}
+
+void pn_listener_close(pn_listener_t* l) {
+ wakeup(&l->psocket, leader_close);
+}
+
+pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
+ return l ? l->psocket.proactor : NULL;
+}
+
+pn_condition_t* pn_listener_condition(pn_listener_t* l) {
+ return l->condition;
+}
+
+void *pn_listener_get_context(pn_listener_t *l) {
+ return l->context;
+}
+
+void pn_listener_set_context(pn_listener_t *l, void *context) {
+ l->context = context;
+}
+
+pn_record_t *pn_listener_attachments(pn_listener_t *l) {
+ return l->attachments;
+}
+
+int pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
+ if (l->accepting) {
+ return PN_STATE_ERR; /* Only one at a time */
+ }
+ l->accepting = new_pconnection_t(
+ l->psocket.proactor, c, true, l->psocket.host, l->psocket.port);
+ if (!l->accepting) {
+ return UV_ENOMEM;
+ }
+ owner_to_leader(&l->psocket, leader_accept);
+ return 0;
+}
+
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aadfcbbb/examples/c/proactor/receive.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/receive.c b/examples/c/proactor/receive.c
index 88e3456..b8edcd6 100644
--- a/examples/c/proactor/receive.c
+++ b/examples/c/proactor/receive.c
@@ -187,7 +187,7 @@ int main(int argc, char **argv) {
/* Create the proactor and connect */
app.proactor = pn_proactor();
- pn_proactor_connect(app.proactor, host, port, pn_rwbytes_null);
+ pn_proactor_connect(app.proactor, pn_connection(), host, port);
if (url) pn_url_free(url);
do {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aadfcbbb/examples/c/proactor/send.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/send.c b/examples/c/proactor/send.c
index 42facb0..d611b3d 100644
--- a/examples/c/proactor/send.c
+++ b/examples/c/proactor/send.c
@@ -216,7 +216,7 @@ int main(int argc, char **argv) {
/* Create the proactor and connect */
app.proactor = pn_proactor();
- pn_proactor_connect(app.proactor, host, port, pn_rwbytes_null);
+ pn_proactor_connect(app.proactor, pn_connection(), host, port);
if (url) pn_url_free(url);
do {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aadfcbbb/proton-c/include/proton/connection.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/connection.h b/proton-c/include/proton/connection.h
index 70fad73..5b966cd 100644
--- a/proton-c/include/proton/connection.h
+++ b/proton-c/include/proton/connection.h
@@ -156,7 +156,6 @@ PN_EXTERN pn_collector_t* pn_connection_collector(pn_connection_t *connection);
/**
- * @deprecated
* Get the application context that is associated with a connection
* object.
*
@@ -169,7 +168,6 @@ PN_EXTERN pn_collector_t* pn_connection_collector(pn_connection_t *connection);
PN_EXTERN void *pn_connection_get_context(pn_connection_t *connection);
/**
- * @deprecated
* Set a new application context for a connection object.
*
* The application context for a connection object may be retrieved
@@ -485,16 +483,6 @@ PN_EXTERN pn_data_t *pn_connection_remote_properties(pn_connection_t *connection
*/
PN_EXTERN pn_transport_t *pn_connection_transport(pn_connection_t *connection);
-/**
- * Create a connection with `size` bytes of extra aligned storage in the same heap block.
- */
-PN_EXTERN pn_connection_t* pn_connection_with_extra(size_t size);
-
-/**
- * Get the start and size of extra storage allocated by pn_connection_extra()
- */
-PN_EXTERN pn_rwbytes_t pn_connection_get_extra(pn_connection_t *connection);
-
/** @}
*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aadfcbbb/proton-c/include/proton/event.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/event.h b/proton-c/include/proton/event.h
index 31d4bdd..7793f1c 100644
--- a/proton-c/include/proton/event.h
+++ b/proton-c/include/proton/event.h
@@ -323,19 +323,25 @@ typedef enum {
PN_CONNECTION_WAKE,
/**
- * pn_listener_close() was called or an error occurred, see pn_listener_condition()
+ * Indicates the listener is ready to call pn_listener_accept()
+ * Events of this type point to the @ref pn_listener_t.
+ */
+ PN_LISTENER_ACCEPT,
+
+ /**
+ * Indicates the listener has closed. pn_listener_condition() provides error information.
* Events of this type point to the @ref pn_listener_t.
*/
PN_LISTENER_CLOSE,
/**
- * pn_proactor_interrupt() was called to interrupt a proactor thread
+ * Indicates pn_proactor_interrupt() was called to interrupt a proactor thread
* Events of this type point to the @ref pn_proactor_t.
*/
PN_PROACTOR_INTERRUPT,
/**
- * pn_proactor_set_timeout() time limit expired.
+ * Timeout set by pn_proactor_set_timeout() time limit expired.
* Events of this type point to the @ref pn_proactor_t.
*/
PN_PROACTOR_TIMEOUT,
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aadfcbbb/proton-c/include/proton/extra.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/extra.h b/proton-c/include/proton/extra.h
deleted file mode 100644
index ea2e1ef..0000000
--- a/proton-c/include/proton/extra.h
+++ /dev/null
@@ -1,69 +0,0 @@
-#ifndef EXTRA_H
-#define EXTRA_H
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <proton/type_compat.h>
-#include <proton/types.h>
-#include <stddef.h>
-#include <stdlib.h>
-
-/**
- * @cond INTERNAL
- * Support for allocating extra aligned memory after a type.
- */
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-/**
- * extra_t contains a size and is maximally aligned so the memory immediately
- * after it can store any type of value.
- */
-typedef union pn_extra_t {
- size_t size;
-#if __STDC_VERSION__ >= 201112
- max_align_t max;
-#else
-/* Not standard but fairly safe */
- uint64_t i;
- long double d;
- void *v;
- void (*fp)(void);
-#endif
-} pn_extra_t;
-
-static inline pn_rwbytes_t pn_extra_rwbytes(pn_extra_t *x) {
- return pn_rwbytes(x->size, (char*)(x+1));
-}
-
-/* Declare private helper struct for T */
-#define PN_EXTRA_DECLARE(T) typedef struct T##__extra { T base; pn_extra_t extra; } T##__extra
-#define PN_EXTRA_SIZEOF(T, N) (sizeof(T##__extra)+(N))
-#define PN_EXTRA_GET(T, P) pn_extra_rwbytes(&((T##__extra*)(P))->extra)
-
-#ifdef __cplusplus
-}
-#endif
-
-/** @endcond */
-
-#endif // EXTRA_H
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aadfcbbb/proton-c/include/proton/listener.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/listener.h b/proton-c/include/proton/listener.h
index f55479b..5e60649 100644
--- a/proton-c/include/proton/listener.h
+++ b/proton-c/include/proton/listener.h
@@ -40,30 +40,57 @@ typedef struct pn_proactor_t pn_proactor_t;
typedef struct pn_condition_t pn_condition_t;
/**
- * Listener accepts connections, see pn_proactor_listen()
+ * A listener accepts connections.
*/
typedef struct pn_listener_t pn_listener_t;
/**
- * The proactor that created the listener.
+ * Create a listener.
*/
-pn_proactor_t *pn_listener_proactor(pn_listener_t *c);
+PN_EXTERN pn_listener_t *pn_listener(void);
+
+/**
+ * Free a listener
+ */
+PN_EXTERN void pn_listener_free(pn_listener_t*);
+
+/**
+ * Asynchronously accept a connection using the listener.
+ *
+ * @param[in] connection the listener takes ownership, do not free.
+ */
+PN_EXTERN int pn_listener_accept(pn_listener_t*, pn_connection_t *connection);
/**
* Get the error condition for a listener.
*/
-pn_condition_t *pn_listener_condition(pn_listener_t *l);
+PN_EXTERN pn_condition_t *pn_listener_condition(pn_listener_t *l);
/**
- * Get the user-provided value associated with the listener in pn_proactor_listen()
- * The start address is aligned so you can cast it to any type.
+ * Get the application context that is associated with a listener.
*/
-pn_rwbytes_t pn_listener_get_extra(pn_listener_t*);
+PN_EXTERN void *pn_listener_get_context(pn_listener_t *listener);
+
+/**
+ * Set a new application context for a listener.
+ */
+PN_EXTERN void pn_listener_set_context(pn_listener_t *listener, void *context);
+
+/**
+ * Get the attachments that are associated with a listener object.
+ */
+PN_EXTERN pn_record_t *pn_listener_attachments(pn_listener_t *listener);
/**
* Close the listener (thread safe).
*/
-void pn_listener_close(pn_listener_t *l);
+PN_EXTERN void pn_listener_close(pn_listener_t *l);
+
+/**
+ * The proactor associated with a listener.
+ */
+PN_EXTERN pn_proactor_t *pn_listener_proactor(pn_listener_t *c);
+
/**
*@}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aadfcbbb/proton-c/include/proton/proactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
index e23a24f..9d39c9c 100644
--- a/proton-c/include/proton/proactor.h
+++ b/proton-c/include/proton/proactor.h
@@ -68,30 +68,30 @@ pn_proactor_t *pn_proactor(void);
void pn_proactor_free(pn_proactor_t*);
/**
- * Asynchronous connect: a connection and transport will be created, the
- * relevant events will be returned by pn_proactor_wait()
+ * Connect connection to host/port. Connection and transport events will be
+ * returned by pn_proactor_wait()
*
- * Errors are indicated by PN_TRANSPORT_ERROR/PN_TRANSPORT_CLOSE events.
+ * @param[in] connection the proactor takes ownership do not free.
+ * @param[in] host the address to listen on
+ * @param[in] port the port to connect to
*
- * @param extra bytes to copy to pn_connection_get_extra() on the new connection, @ref
- * pn_rwbytes_null for nothing.
- *
- * @return error if the connect cannot be initiated e.g. an allocation failure.
- * IO errors will be returned as transport events via pn_proactor_wait()
+ * @return error on immediate error, e.g. an allocation failure.
+ * Other errors are indicated by connection or transport events via pn_proactor_wait()
*/
-int pn_proactor_connect(pn_proactor_t*, const char *host, const char *port, pn_bytes_t extra);
+int pn_proactor_connect(pn_proactor_t*, pn_connection_t *connection, const char *host, const char *port);
/**
- * Asynchronous listen: start listening, connections will be returned by pn_proactor_wait()
- * An error are indicated by PN_LISTENER_ERROR event.
+ * Start listening with listener.
+ * pn_proactor_wait() will return a PN_LISTENER_ACCEPT event when a connection can be accepted.
*
- * @param extra bytes to copy to pn_connection_get_extra() on the new connection, @ref
- * pn_rwbytes_null for nothing.
+ * @param[in] listener proactor takes ownership of listener, do not free.
+ * @param[in] host the address to listen on
+ * @param[in] port the port to listen on
*
- * @return error if the connect cannot be initiated e.g. an allocation failure.
- * IO errors will be returned as transport events via pn_proactor_wait()
+ * @return error on immediate error, e.g. an allocation failure.
+ * Other errors are indicated by pn_listener_condition() on the PN_LISTENER_CLOSE event.
*/
-pn_listener_t *pn_proactor_listen(pn_proactor_t *, const char *host, const char *port, int backlog, pn_bytes_t extra);
+int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *listener, const char *host, const char *port, int backlog);
/**
* Wait for events to handle. Call pn_proactor_done() after handling events.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aadfcbbb/proton-c/src/core/engine.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/engine.c b/proton-c/src/core/engine.c
index 2836a43..99d311b 100644
--- a/proton-c/src/core/engine.c
+++ b/proton-c/src/core/engine.c
@@ -32,9 +32,6 @@
#include "platform/platform_fmt.h"
#include "transport.h"
-#include <proton/extra.h>
-
-
static void pni_session_bound(pn_session_t *ssn);
static void pni_link_bound(pn_link_t *link);
@@ -511,15 +508,10 @@ static void pn_connection_finalize(void *object)
#define pn_connection_compare NULL
#define pn_connection_inspect NULL
-PN_EXTRA_DECLARE(pn_connection_t);
-
-pn_rwbytes_t pn_connection_get_extra(pn_connection_t *c) { return PN_EXTRA_GET(pn_connection_t, c); }
-
-pn_connection_t *pn_connection_with_extra(size_t extra)
+pn_connection_t *pn_connection()
{
static const pn_class_t clazz = PN_CLASS(pn_connection);
- size_t size = PN_EXTRA_SIZEOF(pn_connection_t, extra);
- pn_connection_t *conn = (pn_connection_t *) pn_class_new(&clazz, size);
+ pn_connection_t *conn = (pn_connection_t *) pn_class_new(&clazz, sizeof(pn_connection_t));
if (!conn) return NULL;
conn->endpoint_head = NULL;
@@ -548,10 +540,6 @@ pn_connection_t *pn_connection_with_extra(size_t extra)
return conn;
}
-pn_connection_t *pn_connection(void) {
- return pn_connection_with_extra(0);
-}
-
static const pn_event_type_t endpoint_init_event_map[] = {
PN_CONNECTION_INIT, /* CONNECTION */
PN_SESSION_INIT, /* SESSION */
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org