You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2016/11/25 21:01:31 UTC

[35/48] qpid-proton git commit: PROTON-1344: proactor listener/conneciton configuration

PROTON-1344: proactor listener/conneciton configuration

Dropped extra bytes mechanism, may be re-introduced later.

Added context and attachments to pn_listener_t, consistent with pn_connection_t
Configure connection/listener before calling proactor connect/listen.
Added PN_LISTENER_ACCEPT event so accepted connections can be configured.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/aadfcbbb
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/aadfcbbb
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/aadfcbbb

Branch: refs/heads/go1
Commit: aadfcbbb7a7b75eb442df4d4de8ef97eb2e7a754
Parents: f2c8a3a
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Nov 17 00:14:12 2016 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Nov 17 11:22:50 2016 -0500

----------------------------------------------------------------------
 examples/c/proactor/broker.c         |  33 ++--
 examples/c/proactor/libuv_proactor.c | 246 +++++++++++++++++-------------
 examples/c/proactor/receive.c        |   2 +-
 examples/c/proactor/send.c           |   2 +-
 proton-c/include/proton/connection.h |  12 --
 proton-c/include/proton/event.h      |  12 +-
 proton-c/include/proton/extra.h      |  69 ---------
 proton-c/include/proton/listener.h   |  43 +++++-
 proton-c/include/proton/proactor.h   |  32 ++--
 proton-c/src/core/engine.c           |  16 +-
 10 files changed, 226 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aadfcbbb/examples/c/proactor/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
index 66381fc..ca52336 100644
--- a/examples/c/proactor/broker.c
+++ b/examples/c/proactor/broker.c
@@ -158,6 +158,15 @@ typedef struct broker_data_t {
   bool check_queues;          /* Check senders on the connection for available data in queues. */
 } broker_data_t;
 
+/* Use the context pointer as a boolean flag to indicate we need to check queues */
+void pn_connection_set_check_queues(pn_connection_t *c, bool check) {
+  pn_connection_set_context(c, (void*)check);
+}
+
+bool pn_connection_get_check_queues(pn_connection_t *c) {
+  return (bool)pn_connection_get_context(c);
+}
+
 /* Put a message on the queue, called in receiver dispatch loop.
    If the queue was previously empty, notify waiting senders.
 */
@@ -168,8 +177,7 @@ static void queue_receive(pn_proactor_t *d, queue_t *q, pn_rwbytes_t m) {
   if (q->messages.len == 1) { /* Was empty, notify waiting connections */
     for (size_t i = 0; i < q->waiting.len; ++i) {
       pn_connection_t *c = q->waiting.data[i];
-      broker_data_t *bd = (broker_data_t*)pn_connection_get_extra(c).start;
-      bd->check_queues = true;
+      pn_connection_set_check_queues(c, true);
       pn_connection_wake(c); /* Wake the connection */
     }
     q->waiting.len = 0;
@@ -215,7 +223,6 @@ queue_t* queues_get(queues_t *qs, const char* name) {
 /* The broker implementation */
 typedef struct broker_t {
   pn_proactor_t *proactor;
-  pn_listener_t *listener;
   queues_t queues;
   const char *container_id;     /* AMQP container-id */
   size_t threads;
@@ -226,7 +233,6 @@ typedef struct broker_t {
 void broker_init(broker_t *b, const char *container_id, size_t threads, pn_millis_t heartbeat) {
   memset(b, 0, sizeof(*b));
   b->proactor = pn_proactor();
-  b->listener = NULL;
   queues_init(&b->queues);
   b->container_id = container_id;
   b->threads = threads;
@@ -300,10 +306,14 @@ static void handle(broker_t* b, pn_event_t* e) {
 
   switch (pn_event_type(e)) {
 
-   case PN_CONNECTION_INIT: {
+   case PN_LISTENER_ACCEPT:
+    pn_listener_accept(pn_event_listener(e), pn_connection());
+    break;
+
+   case PN_CONNECTION_INIT: 
      pn_connection_set_container(c, b->container_id);
      break;
-   }
+
    case PN_CONNECTION_BOUND: {
      /* Turn off security */
      pn_transport_t *t = pn_connection_transport(c);
@@ -316,9 +326,8 @@ static void handle(broker_t* b, pn_event_t* e) {
      break;
    }
    case PN_CONNECTION_WAKE: {
-     broker_data_t *bd = (broker_data_t*)pn_connection_get_extra(c).start;
-     if (bd->check_queues) {
-       bd->check_queues = false;
+     if (pn_connection_get_check_queues(c)) {
+       pn_connection_set_check_queues(c, false);
        int flags = PN_LOCAL_ACTIVE&PN_REMOTE_ACTIVE;
        for (pn_link_t *l = pn_link_head(c, flags); l != NULL; l = pn_link_next(l, flags))
          link_send(b, l);
@@ -456,11 +465,7 @@ int main(int argc, char **argv) {
   */
   const char *host = url ? pn_url_get_host(url) : "::";
   const char *port = url ? pn_url_get_port(url) : "amqp";
-
-  /* Initial broker_data value copied to each accepted connection */
-  broker_data_t bd = { false };
-  b.listener = pn_proactor_listen(b.proactor, host, port, 16,
-                                  pn_bytes(sizeof(bd), (char*)&bd));
+  pn_proactor_listen(b.proactor, pn_listener(), host, port, 16);
   printf("listening on '%s:%s' %zd threads\n", host, port, b.threads);
 
   if (url) pn_url_free(url);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aadfcbbb/examples/c/proactor/libuv_proactor.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/libuv_proactor.c b/examples/c/proactor/libuv_proactor.c
index a26c311..9770166 100644
--- a/examples/c/proactor/libuv_proactor.c
+++ b/examples/c/proactor/libuv_proactor.c
@@ -24,7 +24,6 @@
 #include <proton/condition.h>
 #include <proton/connection_driver.h>
 #include <proton/engine.h>
-#include <proton/extra.h>
 #include <proton/message.h>
 #include <proton/object.h>
 #include <proton/proactor.h>
@@ -142,13 +141,15 @@ struct pn_listener_t {
   psocket_t psocket;
 
   /* Only used by owner thread */
+  pconnection_t *accepting;     /* accept in progress */
   pn_condition_t *condition;
   pn_collector_t *collector;
   pn_event_batch_t batch;
+  pn_record_t *attachments;
+  void *context;
   size_t backlog;
 };
 
-PN_EXTRA_DECLARE(pn_listener_t);
 
 typedef struct queue { psocket_t *front, *back; } queue;
 
@@ -222,24 +223,26 @@ static void to_leader(psocket_t *ps) {
 
 /* Detach from IO and put ps on the worker queue */
 static void leader_to_worker(psocket_t *ps) {
-  pconnection_t *pc = as_pconnection_t(ps);
-  /* Don't detach if there are no events yet. */
-  if (pc && pn_connection_driver_has_event(&pc->driver)) {
-    if (pc->writing) {
-      pc->writing  = 0;
-      uv_cancel((uv_req_t*)&pc->write);
-    }
-    if (pc->reading) {
-      pc->reading = false;
-      uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
-    }
-    if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
-      uv_timer_stop(&pc->timer);
+  if (ps->is_conn) {
+    pconnection_t *pc = as_pconnection_t(ps);
+    /* Don't detach if there are no events yet. */
+    if (pn_connection_driver_has_event(&pc->driver)) {
+      if (pc->writing) {
+        pc->writing  = 0;
+        uv_cancel((uv_req_t*)&pc->write);
+      }
+      if (pc->reading) {
+        pc->reading = false;
+        uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
+      }
+      if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
+        uv_timer_stop(&pc->timer);
+      }
     }
+  } else {
+    pn_listener_t *l = as_listener(ps);
+    uv_read_stop((uv_stream_t*)&l->psocket.tcp);
   }
-
-  /* Nothing to do for a listener, on_accept doesn't touch worker state. */
-
   uv_mutex_lock(&ps->proactor->lock);
   push_lh(&ps->proactor->worker_q, ps);
   uv_mutex_unlock(&ps->proactor->lock);
@@ -275,15 +278,12 @@ static void worker_requeue(psocket_t* ps) {
   uv_mutex_unlock(&ps->proactor->lock);
 }
 
-static pconnection_t *new_pconnection_t(pn_proactor_t *p, bool server, const char *host, const char *port, pn_bytes_t extra) {
+static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bool server, const char *host, const char *port) {
   pconnection_t *pc = (pconnection_t*)calloc(1, sizeof(*pc));
   if (!pc) return NULL;
-  if (pn_connection_driver_init(&pc->driver, pn_connection_with_extra(extra.size), NULL) != 0) {
+  if (pn_connection_driver_init(&pc->driver, c, NULL) != 0) {
     return NULL;
   }
-  if (extra.start && extra.size) {
-    memcpy(pn_connection_get_extra(pc->driver.connection).start, extra.start, extra.size);
-  }
   psocket_init(&pc->psocket, p,  true, host, port);
   if (server) {
     pn_transport_set_server(pc->driver.transport);
@@ -312,26 +312,6 @@ static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
   return d ? (pconnection_t*)((char*)d - offsetof(pconnection_t, driver)) : NULL;
 }
 
-pn_listener_t *new_listener(pn_proactor_t *p, const char *host, const char *port, int backlog, pn_bytes_t extra) {
-  pn_listener_t *l = (pn_listener_t*)calloc(1, PN_EXTRA_SIZEOF(pn_listener_t, extra.size));
-  if (!l) {
-    return NULL;
-  }
-  l->collector = pn_collector();
-  if (!l->collector) {
-    free(l);
-    return NULL;
-  }
-  if (extra.start && extra.size) {
-    memcpy(pn_listener_get_extra(l).start, extra.start, extra.size);
-  }
-  psocket_init(&l->psocket, p, false, host, port);
-  l->condition = pn_condition();
-  l->batch.next_event = listener_batch_next;
-  l->backlog = backlog;
-  return l;
-}
-
 static void leader_count(pn_proactor_t *p, int change) {
   uv_mutex_lock(&p->lock);
   p->count += change;
@@ -456,24 +436,23 @@ static void on_connect(uv_connect_t *connect, int err) {
 }
 
 static void on_accept(uv_stream_t* server, int err) {
-  pn_listener_t* l = (pn_listener_t*)server->data;
-  if (!err) {
-    pn_rwbytes_t v =  pn_listener_get_extra(l);
-    pconnection_t *pc = new_pconnection_t(l->psocket.proactor, true,
-                          fixstr(l->psocket.host),
-                          fixstr(l->psocket.port),
-                          pn_bytes(v.size, v.start));
-    if (pc) {
-      int err2 = leader_init(&pc->psocket);
-      if (!err2) err2 = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp);
-      leader_connect_accept(pc, err2, "on accept");
-    } else {
-      err = UV_ENOMEM;
-    }
-  }
+  pn_listener_t *l = (pn_listener_t*) server->data;
   if (err) {
     leader_error(&l->psocket, err, "on accept");
   }
+  pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
+  leader_to_worker(&l->psocket); /* Let user call pn_listener_accept */
+}
+
+static void leader_accept(psocket_t *ps) {
+  pn_listener_t * l = as_listener(ps);
+  pconnection_t *pc = l->accepting;
+  l->accepting = NULL;
+  if (pc) {
+    int err = leader_init(&pc->psocket);
+    if (!err) err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp);
+    leader_connect_accept(pc, err, "on accept");
+  }
 }
 
 static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) {
@@ -570,31 +549,39 @@ static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) {
 }
 
 static void leader_rewatch(psocket_t *ps) {
-  pconnection_t *pc = as_pconnection_t(ps);
-
-  if (pc->timer.data) {         /* uv-initialized */
-    on_tick(&pc->timer);        /* Re-enable ticks if required */
-  }
-  pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
-  pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
-
-  /* Ticks and checking buffers can generate events, process before proceeding */
-  if (pn_connection_driver_has_event(&pc->driver)) {
-    leader_to_worker(ps);
-  } else {                      /* Re-watch for IO */
-    if (wbuf.size > 0 && !pc->writing) {
-      pc->writing = wbuf.size;
-      uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
-      pc->write.data = ps;
-      uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write);
-    } else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) {
-      pc->shutdown.data = ps;
-      uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, on_shutdown);
+  int err = 0;
+  if (ps->is_conn) {
+    pconnection_t *pc = as_pconnection_t(ps);
+    if (pc->timer.data) {         /* uv-initialized */
+      on_tick(&pc->timer);        /* Re-enable ticks if required */
     }
-    if (rbuf.size > 0 && !pc->reading) {
-      pc->reading = true;
-      uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read);
+    pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
+    pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
+
+    /* Ticks and checking buffers can generate events, process before proceeding */
+    if (pn_connection_driver_has_event(&pc->driver)) {
+      leader_to_worker(ps);
+    } else {                      /* Re-watch for IO */
+      if (wbuf.size > 0 && !pc->writing) {
+        pc->writing = wbuf.size;
+        uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
+        pc->write.data = ps;
+        uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write);
+      } else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) {
+        pc->shutdown.data = ps;
+        uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, on_shutdown);
+      }
+      if (rbuf.size > 0 && !pc->reading) {
+        pc->reading = true;
+        err = uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read);
+      }
     }
+  } else {
+    pn_listener_t *l = as_listener(ps);
+    err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_accept);
+  }
+  if (err) {
+    leader_error(ps, err, "rewatch");
   }
 }
 
@@ -668,6 +655,11 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
     }
     return;
   }
+  pn_listener_t *l = batch_listener(batch);
+  if (l) {
+    owner_to_leader(&l->psocket, leader_rewatch);
+    return;
+  }
   pn_proactor_t *bp = batch_proactor(batch);
   if (bp == p) {
     uv_mutex_lock(&p->lock);
@@ -676,7 +668,6 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
     uv_mutex_unlock(&p->lock);
     return;
   }
-  /* Nothing extra to do for listener, it is always in the UV loop. */
 }
 
 /* Run follower/leader loop till we can return an event and be a worker */
@@ -742,8 +733,8 @@ void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
   uv_mutex_unlock(&p->lock);
 }
 
-int pn_proactor_connect(pn_proactor_t *p, const char *host, const char *port, pn_bytes_t extra) {
-  pconnection_t *pc = new_pconnection_t(p, false, host, port, extra);
+int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host, const char *port) {
+  pconnection_t *pc = new_pconnection_t(p, c, false, host, port);
   if (!pc) {
     return PN_OUT_OF_MEMORY;
   }
@@ -752,12 +743,12 @@ int pn_proactor_connect(pn_proactor_t *p, const char *host, const char *port, pn
   return 0;
 }
 
-pn_rwbytes_t pn_listener_get_extra(pn_listener_t *l) { return PN_EXTRA_GET(pn_listener_t, l); }
-
-pn_listener_t *pn_proactor_listen(pn_proactor_t *p, const char *host, const char *port, int backlog, pn_bytes_t extra) {
-  pn_listener_t *l = new_listener(p, host, port, backlog, extra);
-  if (l)  owner_to_leader(&l->psocket, leader_listen);
-  return l;
+int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *host, const char *port, int backlog)
+{
+  psocket_init(&l->psocket, p, false, host, port);
+  l->backlog = backlog;
+  owner_to_leader(&l->psocket, leader_listen);
+  return 0;
 }
 
 pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
@@ -765,10 +756,6 @@ pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
   return pc ? pc->psocket.proactor : NULL;
 }
 
-pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
-  return l ? l->psocket.proactor : NULL;
-}
-
 void leader_wake_connection(psocket_t *ps) {
   pconnection_t *pc = as_pconnection_t(ps);
   pn_connection_t *c = pc->driver.connection;
@@ -780,15 +767,6 @@ void pn_connection_wake(pn_connection_t* c) {
   wakeup(&get_pconnection_t(c)->psocket, leader_wake_connection);
 }
 
-void pn_listener_close(pn_listener_t* l) {
-  wakeup(&l->psocket, leader_close);
-}
-
-/* Only called when condition is closed by error. */
-pn_condition_t* pn_listener_condition(pn_listener_t* l) {
-  return l->condition;
-}
-
 pn_proactor_t *pn_proactor() {
   pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p));
   p->collector = pn_collector();
@@ -831,3 +809,65 @@ static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
 static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
   return pn_collector_next(batch_proactor(batch)->collector);
 }
+
+pn_listener_t *pn_listener() {
+  pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t));
+  if (l) {
+    l->batch.next_event = listener_batch_next;
+    l->collector = pn_collector();
+    l->condition = pn_condition();
+    l->attachments = pn_record();
+    if (!l->condition || !l->collector || !l->attachments) {
+      pn_listener_free(l);
+      return NULL;
+    }
+  }
+  return l;
+}
+
+void pn_listener_free(pn_listener_t *l) {
+  if (l) {
+    if (!l->collector) pn_collector_free(l->collector);
+    if (!l->condition) pn_condition_free(l->condition);
+    if (!l->attachments) pn_free(l->attachments);
+    free(l);
+  }
+}
+
+void pn_listener_close(pn_listener_t* l) {
+  wakeup(&l->psocket, leader_close);
+}
+
+pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
+  return l ? l->psocket.proactor : NULL;
+}
+
+pn_condition_t* pn_listener_condition(pn_listener_t* l) {
+  return l->condition;
+}
+
+void *pn_listener_get_context(pn_listener_t *l) {
+  return l->context;
+}
+
+void pn_listener_set_context(pn_listener_t *l, void *context) {
+  l->context = context;
+}
+
+pn_record_t *pn_listener_attachments(pn_listener_t *l) {
+  return l->attachments;
+}
+
+int pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
+  if (l->accepting) {
+    return PN_STATE_ERR;        /* Only one at a time */
+  }
+  l->accepting = new_pconnection_t(
+      l->psocket.proactor, c, true, l->psocket.host, l->psocket.port);
+  if (!l->accepting) {
+    return UV_ENOMEM;
+  }
+  owner_to_leader(&l->psocket, leader_accept);
+  return 0;
+}
+

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aadfcbbb/examples/c/proactor/receive.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/receive.c b/examples/c/proactor/receive.c
index 88e3456..b8edcd6 100644
--- a/examples/c/proactor/receive.c
+++ b/examples/c/proactor/receive.c
@@ -187,7 +187,7 @@ int main(int argc, char **argv) {
 
   /* Create the proactor and connect */
   app.proactor = pn_proactor();
-  pn_proactor_connect(app.proactor, host, port, pn_rwbytes_null);
+  pn_proactor_connect(app.proactor, pn_connection(), host, port);
   if (url) pn_url_free(url);
 
   do {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aadfcbbb/examples/c/proactor/send.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/send.c b/examples/c/proactor/send.c
index 42facb0..d611b3d 100644
--- a/examples/c/proactor/send.c
+++ b/examples/c/proactor/send.c
@@ -216,7 +216,7 @@ int main(int argc, char **argv) {
 
   /* Create the proactor and connect */
   app.proactor = pn_proactor();
-  pn_proactor_connect(app.proactor, host, port, pn_rwbytes_null);
+  pn_proactor_connect(app.proactor, pn_connection(), host, port);
   if (url) pn_url_free(url);
 
   do {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aadfcbbb/proton-c/include/proton/connection.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/connection.h b/proton-c/include/proton/connection.h
index 70fad73..5b966cd 100644
--- a/proton-c/include/proton/connection.h
+++ b/proton-c/include/proton/connection.h
@@ -156,7 +156,6 @@ PN_EXTERN pn_collector_t* pn_connection_collector(pn_connection_t *connection);
 
 
 /**
- * @deprecated
  * Get the application context that is associated with a connection
  * object.
  *
@@ -169,7 +168,6 @@ PN_EXTERN pn_collector_t* pn_connection_collector(pn_connection_t *connection);
 PN_EXTERN void *pn_connection_get_context(pn_connection_t *connection);
 
 /**
- * @deprecated
  * Set a new application context for a connection object.
  *
  * The application context for a connection object may be retrieved
@@ -485,16 +483,6 @@ PN_EXTERN pn_data_t *pn_connection_remote_properties(pn_connection_t *connection
  */
 PN_EXTERN pn_transport_t *pn_connection_transport(pn_connection_t *connection);
 
-/**
- * Create a connection with `size` bytes of extra aligned storage in the same heap block.
- */
-PN_EXTERN pn_connection_t* pn_connection_with_extra(size_t size);
-
-/**
- * Get the start and size of extra storage allocated by pn_connection_extra()
- */
-PN_EXTERN pn_rwbytes_t pn_connection_get_extra(pn_connection_t *connection);
-
 /** @}
  */
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aadfcbbb/proton-c/include/proton/event.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/event.h b/proton-c/include/proton/event.h
index 31d4bdd..7793f1c 100644
--- a/proton-c/include/proton/event.h
+++ b/proton-c/include/proton/event.h
@@ -323,19 +323,25 @@ typedef enum {
   PN_CONNECTION_WAKE,
 
   /**
-   * pn_listener_close() was called or an error occurred, see pn_listener_condition()
+   * Indicates the listener is ready to call pn_listener_accept() 
+   * Events of this type point to the @ref pn_listener_t.
+   */
+  PN_LISTENER_ACCEPT,
+
+  /**
+   * Indicates the listener has closed. pn_listener_condition() provides error information.
    * Events of this type point to the @ref pn_listener_t.
    */
   PN_LISTENER_CLOSE,
 
   /**
-   * pn_proactor_interrupt() was called to interrupt a proactor thread
+   * Indicates pn_proactor_interrupt() was called to interrupt a proactor thread
    * Events of this type point to the @ref pn_proactor_t.
    */
   PN_PROACTOR_INTERRUPT,
 
   /**
-   * pn_proactor_set_timeout() time limit expired.
+   * Timeout set by pn_proactor_set_timeout() time limit expired.
    * Events of this type point to the @ref pn_proactor_t.
    */
   PN_PROACTOR_TIMEOUT,

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aadfcbbb/proton-c/include/proton/extra.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/extra.h b/proton-c/include/proton/extra.h
deleted file mode 100644
index ea2e1ef..0000000
--- a/proton-c/include/proton/extra.h
+++ /dev/null
@@ -1,69 +0,0 @@
-#ifndef EXTRA_H
-#define EXTRA_H
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <proton/type_compat.h>
-#include <proton/types.h>
-#include <stddef.h>
-#include <stdlib.h>
-
-/**
- * @cond INTERNAL
- * Support for allocating extra aligned memory after a type.
- */
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-/**
- * extra_t contains a size and is maximally aligned so the memory immediately
- * after it can store any type of value.
- */
-typedef union pn_extra_t {
-  size_t size;
-#if __STDC_VERSION__ >= 201112
-  max_align_t max;
-#else
-/* Not standard but fairly safe */
-  uint64_t i;
-  long double d;
-  void *v;
-  void (*fp)(void);
-#endif
-} pn_extra_t;
-
-static inline pn_rwbytes_t pn_extra_rwbytes(pn_extra_t *x) {
-    return pn_rwbytes(x->size, (char*)(x+1));
-}
-
-/* Declare private helper struct for T */
-#define PN_EXTRA_DECLARE(T) typedef struct T##__extra { T base; pn_extra_t extra; } T##__extra
-#define PN_EXTRA_SIZEOF(T, N) (sizeof(T##__extra)+(N))
-#define PN_EXTRA_GET(T, P) pn_extra_rwbytes(&((T##__extra*)(P))->extra)
-
-#ifdef __cplusplus
-}
-#endif
-
-/** @endcond */
-
-#endif // EXTRA_H

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aadfcbbb/proton-c/include/proton/listener.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/listener.h b/proton-c/include/proton/listener.h
index f55479b..5e60649 100644
--- a/proton-c/include/proton/listener.h
+++ b/proton-c/include/proton/listener.h
@@ -40,30 +40,57 @@ typedef struct pn_proactor_t pn_proactor_t;
 typedef struct pn_condition_t pn_condition_t;
 
 /**
- * Listener accepts connections, see pn_proactor_listen()
+ * A listener accepts connections.
  */
 typedef struct pn_listener_t pn_listener_t;
 
 /**
- * The proactor that created the listener.
+ * Create a listener.
  */
-pn_proactor_t *pn_listener_proactor(pn_listener_t *c);
+PN_EXTERN pn_listener_t *pn_listener(void);
+
+/**
+ * Free a listener
+ */
+PN_EXTERN void pn_listener_free(pn_listener_t*);
+
+/**
+ * Asynchronously accept a connection using the listener.
+ *
+ * @param[in] connection the listener takes ownership, do not free.
+ */
+PN_EXTERN int pn_listener_accept(pn_listener_t*, pn_connection_t *connection);
 
 /**
  * Get the error condition for a listener.
  */
-pn_condition_t *pn_listener_condition(pn_listener_t *l);
+PN_EXTERN pn_condition_t *pn_listener_condition(pn_listener_t *l);
 
 /**
- * Get the user-provided value associated with the listener in pn_proactor_listen()
- * The start address is aligned so you can cast it to any type.
+ * Get the application context that is associated with a listener.
  */
-pn_rwbytes_t pn_listener_get_extra(pn_listener_t*);
+PN_EXTERN void *pn_listener_get_context(pn_listener_t *listener);
+
+/**
+ * Set a new application context for a listener.
+ */
+PN_EXTERN void pn_listener_set_context(pn_listener_t *listener, void *context);
+
+/**
+ * Get the attachments that are associated with a listener object.
+ */
+PN_EXTERN pn_record_t *pn_listener_attachments(pn_listener_t *listener);
 
 /**
  * Close the listener (thread safe).
  */
-void pn_listener_close(pn_listener_t *l);
+PN_EXTERN void pn_listener_close(pn_listener_t *l);
+
+/**
+ * The proactor associated with a listener.
+ */
+PN_EXTERN pn_proactor_t *pn_listener_proactor(pn_listener_t *c);
+
 
 /**
  *@}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aadfcbbb/proton-c/include/proton/proactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
index e23a24f..9d39c9c 100644
--- a/proton-c/include/proton/proactor.h
+++ b/proton-c/include/proton/proactor.h
@@ -68,30 +68,30 @@ pn_proactor_t *pn_proactor(void);
 void pn_proactor_free(pn_proactor_t*);
 
 /**
- * Asynchronous connect: a connection and transport will be created, the
- * relevant events will be returned by pn_proactor_wait()
+ * Connect connection to host/port. Connection and transport events will be
+ * returned by pn_proactor_wait()
  *
- * Errors are indicated by PN_TRANSPORT_ERROR/PN_TRANSPORT_CLOSE events.
+ * @param[in] connection the proactor takes ownership do not free.
+ * @param[in] host the address to listen on
+ * @param[in] port the port to connect to
  *
- * @param extra bytes to copy to pn_connection_get_extra() on the new connection, @ref
- * pn_rwbytes_null for nothing.
- *
- * @return error if the connect cannot be initiated e.g. an allocation failure.
- * IO errors will be returned as transport events via pn_proactor_wait()
+ * @return error on immediate error, e.g. an allocation failure.
+ * Other errors are indicated by connection or transport events via pn_proactor_wait()
  */
-int pn_proactor_connect(pn_proactor_t*, const char *host, const char *port, pn_bytes_t extra);
+int pn_proactor_connect(pn_proactor_t*, pn_connection_t *connection, const char *host, const char *port);
 
 /**
- * Asynchronous listen: start listening, connections will be returned by pn_proactor_wait()
- * An error are indicated by PN_LISTENER_ERROR event.
+ * Start listening with listener.
+ * pn_proactor_wait() will return a PN_LISTENER_ACCEPT event when a connection can be accepted.
  *
- * @param extra bytes to copy to pn_connection_get_extra() on the new connection, @ref
- * pn_rwbytes_null for nothing.
+ * @param[in] listener proactor takes ownership of listener, do not free.
+ * @param[in] host the address to listen on
+ * @param[in] port the port to listen on
  *
- * @return error if the connect cannot be initiated e.g. an allocation failure.
- * IO errors will be returned as transport events via pn_proactor_wait()
+ * @return error on immediate error, e.g. an allocation failure.
+ * Other errors are indicated by pn_listener_condition() on the PN_LISTENER_CLOSE event.
  */
-pn_listener_t *pn_proactor_listen(pn_proactor_t *, const char *host, const char *port, int backlog, pn_bytes_t extra);
+int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *listener, const char *host, const char *port, int backlog);
 
 /**
  * Wait for events to handle. Call pn_proactor_done() after handling events.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aadfcbbb/proton-c/src/core/engine.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/engine.c b/proton-c/src/core/engine.c
index 2836a43..99d311b 100644
--- a/proton-c/src/core/engine.c
+++ b/proton-c/src/core/engine.c
@@ -32,9 +32,6 @@
 #include "platform/platform_fmt.h"
 #include "transport.h"
 
-#include <proton/extra.h>
-
-
 static void pni_session_bound(pn_session_t *ssn);
 static void pni_link_bound(pn_link_t *link);
 
@@ -511,15 +508,10 @@ static void pn_connection_finalize(void *object)
 #define pn_connection_compare NULL
 #define pn_connection_inspect NULL
 
-PN_EXTRA_DECLARE(pn_connection_t);
-
-pn_rwbytes_t pn_connection_get_extra(pn_connection_t *c) { return PN_EXTRA_GET(pn_connection_t, c); }
-
-pn_connection_t *pn_connection_with_extra(size_t extra)
+pn_connection_t *pn_connection()
 {
   static const pn_class_t clazz = PN_CLASS(pn_connection);
-  size_t size = PN_EXTRA_SIZEOF(pn_connection_t, extra);
-  pn_connection_t *conn = (pn_connection_t *) pn_class_new(&clazz, size);
+  pn_connection_t *conn = (pn_connection_t *) pn_class_new(&clazz, sizeof(pn_connection_t));
   if (!conn) return NULL;
 
   conn->endpoint_head = NULL;
@@ -548,10 +540,6 @@ pn_connection_t *pn_connection_with_extra(size_t extra)
   return conn;
 }
 
-pn_connection_t *pn_connection(void) {
-  return pn_connection_with_extra(0);
-}
-
 static const pn_event_type_t endpoint_init_event_map[] = {
   PN_CONNECTION_INIT,  /* CONNECTION */
   PN_SESSION_INIT,     /* SESSION */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org