You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/05/05 21:52:07 UTC

[1/2] qpid-proton git commit: PROTON-1463: Fix incorrect address handling.

Repository: qpid-proton
Updated Branches:
  refs/heads/master ed9532e2a -> e1d1f2f40


PROTON-1463: Fix incorrect address handling.

Fix to previous commit
a61f50e * PROTON-1463: Golang AMQP URL incompatable with Qpid Cpp Broker.had

Use of string.TrimPrefix was incorrect.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/fa5b36dc
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/fa5b36dc
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/fa5b36dc

Branch: refs/heads/master
Commit: fa5b36dc739d9cad595b9c50099305ffdd50a884
Parents: ed9532e
Author: Alan Conway <ac...@redhat.com>
Authored: Fri May 5 14:19:40 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri May 5 14:19:40 2017 -0400

----------------------------------------------------------------------
 examples/go/electron/receive.go | 2 +-
 examples/go/electron/send.go    | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fa5b36dc/examples/go/electron/receive.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/receive.go b/examples/go/electron/receive.go
index c2ba1d1..7229b07 100644
--- a/examples/go/electron/receive.go
+++ b/examples/go/electron/receive.go
@@ -76,7 +76,7 @@ func main() {
 			c, err := container.Dial("tcp", url.Host)
 			fatalIf(err)
 			connections <- c // Save connection so we can Close() when main() ends
-			addr := strings.TrimPrefix("/", url.Path)
+			addr := strings.TrimPrefix(url.Path, "/")
 			r, err := c.Receiver(electron.Source(addr))
 			fatalIf(err)
 			// Loop receiving messages and sending them to the main() goroutine

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fa5b36dc/examples/go/electron/send.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/send.go b/examples/go/electron/send.go
index 033abab..2ba99c8 100644
--- a/examples/go/electron/send.go
+++ b/examples/go/electron/send.go
@@ -75,7 +75,7 @@ func main() {
 			c, err := container.Dial("tcp", url.Host)
 			fatalIf(err)
 			connections <- c // Save connection so we can Close() when main() ends
-			addr := strings.TrimPrefix("/", url.Path)
+			addr := strings.TrimPrefix(url.Path, "/")
 			s, err := c.Sender(electron.Target(addr))
 			fatalIf(err)
 			// Loop sending messages.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-proton git commit: PROTON-1460: epoll multi-address listen and connect.

Posted by ac...@apache.org.
PROTON-1460: epoll multi-address listen and connect.

epoll proactor:

pn_listener_t listens to every working address in the getaddrinfo list.
It holds one psocket_t per listening socket but has a single pcontext_t.
Returns one accept event at a time.

pn_proactor_connect tries each address in the getaddrinfo list until one works.
No further attempts are made after the first successful connection.

Modified ipv4/v6 flags and address parsing to follow the behavior documented for
pn_proactor_listen and pn_proactor_connect.

Modified proactor to deal directly with contexts not sockets, moved "closing"
flag from socket to context. Previously there was a 1-1 relationship, but now a
listener context has >1 socket.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e1d1f2f4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e1d1f2f4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e1d1f2f4

Branch: refs/heads/master
Commit: e1d1f2f4041dcf2b56be0aab0529f21e0dbf8c19
Parents: fa5b36d
Author: Alan Conway <ac...@redhat.com>
Authored: Fri May 5 17:40:30 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri May 5 17:50:38 2017 -0400

----------------------------------------------------------------------
 proton-c/include/proton/proactor.h |   4 +-
 proton-c/src/proactor/epoll.c      | 470 ++++++++++++++++++--------------
 2 files changed, 273 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e1d1f2f4/proton-c/include/proton/proactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
index 4a3b3c7..861afbe 100644
--- a/proton-c/include/proton/proactor.h
+++ b/proton-c/include/proton/proactor.h
@@ -107,7 +107,7 @@ PNP_EXTERN void pn_proactor_free(pn_proactor_t *proactor);
  * pn_proactor_release_connection()
  *
  * @param[in] addr the "host:port" network address, constructed by pn_proactor_addr()
- * An empty host will connect to the local host via the default protocol.
+ * An empty host will connect to the local host via the default protocol (IPV6 or IPV4).
  * An empty port will connect to the standard AMQP port (5672).
  *
  * @param[in] connection @ref connection to be connected to @p addr.
@@ -128,7 +128,7 @@ PNP_EXTERN void pn_proactor_connect(pn_proactor_t *proactor, pn_connection_t *co
  * is handled, or when pn_proactor_free() is called.
  *
  * @param[in] addr the "host:port" network address, constructed by pn_proactor_addr()
- * An empty host will listen for all protocols on all local interfaces.
+ * An empty host will listen for all protocols (IPV6 and IPV4) on all local interfaces.
  * An empty port will listen on the standard AMQP port (5672).
 
  * @param[in] backlog of un-handled connection requests to allow before refusing

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e1d1f2f4/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 7930eb6..22a6d92 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -265,17 +265,21 @@ typedef struct pcontext_t {
   pcontext_type_t type;
   bool working;
   int wake_ops;             // unprocessed eventfd wake callback (convert to bool?)
-  struct pcontext_t *next;         // wake list, guarded by proactor eventfd_mutex
+  struct pcontext_t *wake_next; // wake list, guarded by proactor eventfd_mutex
+  bool closing;
+  // Next 4 are protected by the proactor mutex
+  struct pcontext_t* next;  /* Protected by proactor.mutex */
+  struct pcontext_t* prev;  /* Protected by proactor.mutex */
+  int disconnect_ops;           /* ops remaining before disconnect complete */
+  bool disconnecting;           /* pn_proactor_disconnect */
 } pcontext_t;
 
 static void pcontext_init(pcontext_t *ctx, pcontext_type_t t, pn_proactor_t *p, void *o) {
+  memset(ctx, 0, sizeof(*ctx));
   pmutex_init(&ctx->mutex);
   ctx->proactor = p;
   ctx->owner = o;
   ctx->type = t;
-  ctx->working = false;
-  ctx->wake_ops = 0;
-  ctx->next = NULL;
 }
 
 static void pcontext_finalize(pcontext_t* ctx) {
@@ -285,16 +289,10 @@ static void pcontext_finalize(pcontext_t* ctx) {
 /* common to connection and listener */
 typedef struct psocket_t {
   pn_proactor_t *proactor;
-  // Next 4 are protected by the proactor mutex
-  struct psocket_t* next;   /* Protected by proactor.mutex */
-  struct psocket_t* prev;   /* Protected by proactor.mutex */
-  bool disconnecting;       /* pn_proactor_disconnect */
-  int disconnect_ops;       /* ops remaining before disconnect complete */
   // Remaining protected by the pconnection/listener mutex
   int sockfd;
   epoll_extended_t epoll_io;
-  bool is_conn;
-  bool closing;
+  pn_listener_t *listener;      /* NULL for a connection socket */
   char addr_buf[PN_MAX_ADDR];
   const char *host, *port;
 } psocket_t;
@@ -304,7 +302,7 @@ struct pn_proactor_t {
   int epollfd;
   ptimer_t timer;
   pn_collector_t *collector;
-  psocket_t *psockets;          /* track in-use psockets for PN_PROACTOR_INACTIVE and final cleanup */
+  pcontext_t *contexts;         /* in-use contexts for PN_PROACTOR_INACTIVE and cleanup */
   epoll_extended_t epoll_wake;
   pn_event_t *cached_event;
   pn_event_batch_t batch;
@@ -349,7 +347,7 @@ static bool wake(pcontext_t *ctx) {
       if (!p->wake_list_first) {
         p->wake_list_first = p->wake_list_last = ctx;
       } else {
-        p->wake_list_last->next = ctx;
+        p->wake_list_last->wake_next = ctx;
         p->wake_list_last = ctx;
       }
       if (!p->wakes_in_progress) {
@@ -376,9 +374,9 @@ static pcontext_t *wake_pop_front(pn_proactor_t *p) {
   assert(p->wakes_in_progress);
   if (p->wake_list_first) {
     ctx = p->wake_list_first;
-    p->wake_list_first = ctx->next;
+    p->wake_list_first = ctx->wake_next;
     if (!p->wake_list_first) p->wake_list_last = NULL;
-    ctx->next = NULL;
+    ctx->wake_next = NULL;
 
     if (!p->wake_list_first) {
       /* Reset the eventfd until a future write.
@@ -402,19 +400,14 @@ static inline void wake_done(pcontext_t *ctx) {
 }
 
 
-static void psocket_init(psocket_t* ps, pn_proactor_t* p, bool is_conn, const char *addr)
+static void psocket_init(psocket_t* ps, pn_proactor_t* p, pn_listener_t *listener, const char *addr)
 {
   ps->epoll_io.psocket = ps;
   ps->epoll_io.fd = -1;
-  ps->epoll_io.type = is_conn ? PCONNECTION_IO : LISTENER_IO;
+  ps->epoll_io.type = listener ? LISTENER_IO : PCONNECTION_IO;
   ps->epoll_io.wanted = 0;
   ps->proactor = p;
-  ps->next = NULL;
-  ps->prev = NULL;
-  ps->disconnecting = false;
-  ps->disconnect_ops = 0;
-  ps->is_conn = is_conn;
-  ps->closing = false;
+  ps->listener = listener;
   ps->sockfd = -1;
   pni_parse_addr(addr, ps->addr_buf, sizeof(ps->addr_buf), &ps->host, &ps->port);
 }
@@ -434,6 +427,7 @@ typedef struct pconnection_t {
   ptimer_t timer;  // TODO: review one timerfd per connectoin
   // Following values only changed by (sole) working context:
   uint32_t current_arm;  // active epoll io events
+  bool connected;
   bool read_blocked;
   bool write_blocked;
   bool read_closed;
@@ -444,10 +438,13 @@ typedef struct pconnection_t {
   pn_event_batch_t batch;
   pn_connection_driver_t driver;
   struct pn_netaddr_t local, remote; /* Actual addresses */
+  struct addrinfo *addrinfo;         /* Resolved address list */
+  struct addrinfo *ai;               /* Current connect address */
 } pconnection_t;
 
 struct pn_listener_t {
-  psocket_t psocket;
+  psocket_t *psockets;          /* Array of listening sockets */
+  size_t psockets_size;
   pcontext_t context;
   pn_condition_t *condition;
   pn_collector_t *collector;
@@ -456,8 +453,7 @@ struct pn_listener_t {
   pn_record_t *attachments;
   void *listener_context;
   size_t backlog;
-  int available_accepts;
-  int pending_accepts;
+  psocket_t *acceptable, *accepted;
   bool close_dispatched;
   bool armed;
 };
@@ -465,15 +461,25 @@ struct pn_listener_t {
 
 static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool topup);
 static void listener_begin_close(pn_listener_t* l);
-static void proactor_add(psocket_t *ps);
-static bool proactor_remove(psocket_t *ps);
+static void proactor_add(pcontext_t *ctx);
+static bool proactor_remove(pcontext_t *ctx);
+
+static inline pconnection_t *psocket_pconnection(psocket_t* ps) {
+  return ps->listener ? NULL : (pconnection_t*)ps;
+}
 
-static inline pconnection_t *as_pconnection(psocket_t* ps) {
-  return ps->is_conn ? (pconnection_t*)ps : NULL;
+static inline pn_listener_t *psocket_listener(psocket_t* ps) {
+  return ps->listener;
 }
 
-static inline pn_listener_t *as_listener(psocket_t* ps) {
-  return ps->is_conn ? NULL: (pn_listener_t*)ps;
+static inline pconnection_t *pcontext_pconnection(pcontext_t *c) {
+  return c->type == PCONNECTION ?
+    (pconnection_t*)((char*)c - offsetof(pconnection_t, context)) : NULL;
+}
+
+static inline pn_listener_t *pcontext_listener(pcontext_t *c) {
+  return c->type == LISTENER ?
+    (pn_listener_t*)((char*)c - offsetof(pn_listener_t, context)) : NULL;
 }
 
 static pn_event_t *listener_batch_next(pn_event_batch_t *batch);
@@ -515,15 +521,15 @@ static pn_event_t *log_event(void* p, pn_event_t *e) {
 }
 
 static void psocket_error(psocket_t *ps, int err, const char* what) {
-  if (ps->is_conn) {
-    pn_connection_driver_t *driver = &as_pconnection(ps)->driver;
+  if (!ps->listener) {
+    pn_connection_driver_t *driver = &psocket_pconnection(ps)->driver;
     pn_connection_driver_bind(driver); /* Bind so errors will be reported */
     pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s",
                                 what, ps->host, ps->port,
                                 strerror(err));
     pn_connection_driver_close(driver);
   } else {
-    pn_listener_t *l = as_listener(ps);
+    pn_listener_t *l = psocket_listener(ps);
     pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s",
                         what, ps->host, ps->port,
                         strerror(err));
@@ -572,13 +578,14 @@ static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bo
     abort();
   }
   pcontext_init(&pc->context, PCONNECTION, p, pc);
-  psocket_init(&pc->psocket, p,  true, addr);
+  psocket_init(&pc->psocket, p, NULL, addr);
   pc->new_events = 0;
   pc->wake_count = 0;
   pc->tick_pending = false;
   pc->timer_armed = false;
 
   pc->current_arm = 0;
+  pc->connected = false;
   pc->read_blocked = true;
   pc->write_blocked = true;
   pc->read_closed = true;
@@ -618,7 +625,7 @@ static void pconnection_cleanup(pconnection_t *pc) {
   stop_polling(&pc->timer.epoll_io, pc->psocket.proactor->epollfd);
   ptimer_finalize(&pc->timer);
   lock(&pc->context.mutex);
-  bool can_free = proactor_remove(&pc->psocket);
+  bool can_free = proactor_remove(&pc->context);
   unlock(&pc->context.mutex);
   if (can_free)
     pconnection_final_free(pc);
@@ -627,8 +634,8 @@ static void pconnection_cleanup(pconnection_t *pc) {
 
 // Call with lock held or from forced_shutdown
 void pconnection_begin_close(pconnection_t *pc) {
-  if (!pc->psocket.closing) {
-    pc->psocket.closing = true;
+  if (!pc->context.closing) {
+    pc->context.closing = true;
     pc->read_closed = pc->write_closed = true;
     stop_polling(&pc->psocket.epoll_io, pc->psocket.proactor->epollfd);
     pc->current_arm = 0;
@@ -753,6 +760,9 @@ static bool pconnection_write(pconnection_t *pc, pn_bytes_t wbuf) {
   return true;
 }
 
+static void pconnection_connected_lh(pconnection_t *pc);
+static void pconnection_maybe_connect_lh(pconnection_t *pc);
+
 /*
  * May be called concurrently from multiple threads:
  *   pn_event_batch_t loop (topup is true)
@@ -808,7 +818,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
 
   // Confirmed as working thread.  Review state and unlock ASAP.
 
-  if (pc->psocket.closing && pconnection_is_final(pc)) {
+  if (pc->context.closing && pconnection_is_final(pc)) {
     unlock(&pc->context.mutex);
     pconnection_cleanup(pc);
     return NULL;
@@ -833,7 +843,9 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
 
   if (pc->new_events) {
     if ((pc->new_events & (EPOLLHUP | EPOLLERR)) && !pc->read_closed && !pc->write_closed)
-      pc->disconnected = true;
+      pconnection_maybe_connect_lh(pc);
+    else
+      pconnection_connected_lh(pc); /* Non error event means we are connected */
     if (pc->new_events & EPOLLOUT)
       pc->write_blocked = false;
     if (pc->new_events & EPOLLIN)
@@ -934,7 +946,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
   }
 
   lock(&pc->context.mutex);
-  if (pc->psocket.closing && pconnection_is_final(pc)) {
+  if (pc->context.closing && pconnection_is_final(pc)) {
     unlock(&pc->context.mutex);
     pconnection_cleanup(pc);
     return NULL;
@@ -961,7 +973,6 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
   return NULL;
 }
 
-
 static void configure_socket(int sock) {
   int flags = fcntl(sock, F_GETFL);
   flags |= O_NONBLOCK;
@@ -971,7 +982,19 @@ static void configure_socket(int sock) {
   setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void*) &tcp_nodelay, sizeof(tcp_nodelay));
 }
 
-void pconnection_start(pconnection_t *pc) {
+/* Called with context.lock held */
+void pconnection_connected_lh(pconnection_t *pc) {
+  if (!pc->connected) {
+    pc->connected = true;
+    if (pc->addrinfo) {
+      freeaddrinfo(pc->addrinfo);
+    }
+    pc->addrinfo = NULL;
+    pc->ai = NULL;
+  }
+}
+
+static void pconnection_start(pconnection_t *pc) {
   int efd = pc->psocket.proactor->epollfd;
   start_polling(&pc->timer.epoll_io, efd);  // TODO: check for error
 
@@ -990,40 +1013,63 @@ void pconnection_start(pconnection_t *pc) {
   start_polling(ee, efd);  // TODO: check for error
 }
 
+/* Called on initial connect, and if connection fails to try another address */
+static void pconnection_maybe_connect_lh(pconnection_t *pc) {
+  errno = 0;
+  if (!pc->connected) {         /* Not yet connected */
+    while (pc->ai) {            /* Have an address */
+      struct addrinfo *ai = pc->ai;
+      pc->ai = pc->ai->ai_next; /* Move to next address in case this fails */
+      int fd = socket(ai->ai_family, SOCK_STREAM, 0);
+      if (fd >= 0) {
+        configure_socket(fd);
+        if (!connect(fd, ai->ai_addr, ai->ai_addrlen) || errno == EINPROGRESS) {
+          pc->psocket.sockfd = fd;
+          pconnection_start(pc);
+          return;               /* Async connection started */
+        }
+      }
+      /* connect failed immediately, go round the loop to try the next addr */
+    }
+    /* If there was a previous attempted connection, let the poller discover the
+       errno from its socket, otherwise set the current error. */
+    if (pc->psocket.sockfd < 1) {
+      psocket_error(&pc->psocket, errno ? errno : ENOTCONN, "on connect");
+    }
+  }
+  pc->disconnected = true;
+}
+
+static int pgetaddrinfo(const char *host, const char *port, int flags, struct addrinfo **res)
+{
+  struct addrinfo hints = { 0 };
+  hints.ai_family = AF_UNSPEC;
+  hints.ai_socktype = SOCK_STREAM;
+  hints.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG | flags;
+  return getaddrinfo(host, port, &hints, res);
+}
+
 void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr) {
   pconnection_t *pc = new_pconnection_t(p, c, false, addr);
   assert(pc); // TODO: memory safety
   // TODO: check case of proactor shutting down
   lock(&pc->context.mutex);
-  proactor_add(&pc->psocket);
+  proactor_add(&pc->context);
   pn_connection_open(pc->driver.connection); /* Auto-open */
 
-  struct addrinfo *ai = NULL;
-  int fd = -1;
-  if (!getaddrinfo(pc->psocket.host, pc->psocket.port, 0, &ai)) {
-    fd = socket(ai->ai_family, SOCK_STREAM, ai->ai_protocol);
-    if (fd >= 0) {
-      configure_socket(fd);
-      if (!connect(fd, ai->ai_addr, ai->ai_addrlen) || errno == EINPROGRESS) {
-        pc->psocket.sockfd = fd;
-        pconnection_start(pc);
-        unlock(&pc->context.mutex);
-        freeaddrinfo(ai);
-        return;
-      }
-    }
+  bool notify = false;
+  if (!pgetaddrinfo(pc->psocket.host, pc->psocket.port, 0, &pc->addrinfo)) {
+    pc->ai = pc->addrinfo;
+    pconnection_maybe_connect_lh(pc); /* Start connection attempts */
+    notify = pc->disconnected;
+  } else {
+    psocket_error(&pc->psocket, errno, "connect to ");
+    notify = wake(&pc->context);
   }
-
-  psocket_error(&pc->psocket, errno, "connect to ");
-  bool notify = wake(&pc->context);
-  if (ai) freeaddrinfo(ai);
-  if (fd != -1) close (fd);
   unlock(&pc->context.mutex);
   if (notify) wake_notify(&pc->context);
-  return;
 }
 
-
 static void pconnection_tick(pconnection_t *pc) {
   pn_transport_t *t = pc->driver.transport;
   if (pn_transport_get_idle_timeout(t) || pn_transport_get_remote_idle_timeout(t)) {
@@ -1041,7 +1087,7 @@ void pn_connection_wake(pn_connection_t* c) {
   pconnection_t *pc = get_pconnection(c);
   if (pc) {
     lock(&pc->context.mutex);
-    if (!pc->psocket.closing) {
+    if (!pc->context.closing) {
       pc->wake_count++;
       notify = wake(&pc->context);
     }
@@ -1093,53 +1139,76 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
   // TODO: check listener not already listening for this or another proactor
   lock(&l->context.mutex);
   l->context.proactor = p;;
-  psocket_init(&l->psocket, p, false, addr);
   l->backlog = backlog;
-  proactor_add(&l->psocket);
+
+  char addr_buf[PN_MAX_ADDR];
+  const char *host, *port;
+  pni_parse_addr(addr, addr_buf, PN_MAX_ADDR, &host, &port);
+
+  struct addrinfo *addrinfo = NULL;
+  if (!pgetaddrinfo(host, port, AI_PASSIVE | AI_ALL, &addrinfo)) {
+    /* Count addresses, allocate enough space for sockets */
+    size_t len = 0;
+    for (struct addrinfo *ai = addrinfo; ai; ai = ai->ai_next) {
+      ++len;
+    }
+    assert(len > 0);            /* guaranteed by getaddrinfo */
+    l->psockets = (psocket_t*)calloc(len, sizeof(psocket_t));
+    assert(l->psockets);      /* FIXME aconway 2017-05-05: memory safety */
+    l->psockets_size = 0;
+    /* Find working listen addresses */
+    for (struct addrinfo *ai = addrinfo; ai; ai = ai->ai_next) {
+      int fd = socket(ai->ai_family, SOCK_STREAM, ai->ai_protocol);
+      static int on = 1;
+      if ((fd >= 0) &&
+          !setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) &&
+          /* We listen to v4/v6 on separate sockets, don't let v6 listen for v4 */
+          (ai->ai_family != AF_INET6 ||
+           !setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on))) &&
+          !bind(fd, ai->ai_addr, ai->ai_addrlen) &&
+          !listen(fd, backlog))
+      {
+        psocket_t *ps = &l->psockets[l->psockets_size++];
+        psocket_init(ps, p, l, addr);
+        ps->sockfd = fd;
+        ps->epoll_io.fd = fd;
+        ps->epoll_io.wanted = EPOLLIN;
+        start_polling(&ps->epoll_io, ps->proactor->epollfd);  // TODO: check for error
+      }
+    }
+  }
+  if (addrinfo) freeaddrinfo(addrinfo);
   /* Always put an OPEN event for symmetry, even if we immediately close with err */
   pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_OPEN);
   bool notify = wake(&l->context);
 
-  struct addrinfo *ai = NULL;
-  int fd = -1;
-  if (!getaddrinfo(l->psocket.host, l->psocket.port, 0, &ai)) {
-    fd = socket(ai->ai_family, SOCK_STREAM, ai->ai_protocol);
-    if (fd >= 0) {
-      int yes = 1;
-      if (!setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)))
-        if (!bind(fd, ai->ai_addr, ai->ai_addrlen))
-          if (!listen(fd, backlog)) {
-            l->psocket.sockfd = fd;
-            l->psocket.epoll_io.fd = fd;
-            l->psocket.epoll_io.wanted = EPOLLIN;
-            start_polling(&l->psocket.epoll_io, l->psocket.proactor->epollfd);  // TODO: check for error
-            unlock(&l->context.mutex);
-            if (notify) wake_notify(&l->context);
-            freeaddrinfo(ai);
-            return;
-          }
-    }
+  if (l->psockets_size == 0) { /* All failed, create dummy socket with an error */
+    int err = errno;
+    l->psockets = (psocket_t*)calloc(sizeof(psocket_t), 1);
+    psocket_init(l->psockets, p, l, addr);
+    psocket_error(l->psockets, err, "listen on");
   }
-
-  psocket_error(&l->psocket, errno, "listen on");
+  proactor_add(&l->context);
   unlock(&l->context.mutex);
   if (notify) wake_notify(&l->context);
-  if (ai) freeaddrinfo(ai);
   return;
 }
 
 // call with lock held
 static inline bool listener_can_free(pn_listener_t *l) {
-  return l->psocket.closing && l->close_dispatched &&
-    !l->context.wake_ops;
+  return l->context.closing && l->close_dispatched && !l->context.wake_ops;
 }
 
 static inline void listener_final_free(pn_listener_t *l) {
   pcontext_finalize(&l->context);
+  free(l->psockets);
   free(l);
 }
 
 void pn_listener_free(pn_listener_t *l) {
+  /* Note at this point either the listener has never been used (freed by user)
+     or it has been closed, so all its sockets are closed.
+  */
   // TODO: do we need a QPID DeletionManager equivalent to be safe from inbound connection (accept) epoll events?
   if (l) {
     bool can_free = true;
@@ -1147,8 +1216,9 @@ void pn_listener_free(pn_listener_t *l) {
     if (l->condition) pn_condition_free(l->condition);
     if (l->attachments) pn_free(l->attachments);
     lock(&l->context.mutex);
-    if (l->context.proactor)
-      can_free = proactor_remove(&l->psocket);
+    if (l->context.proactor) {
+      can_free = proactor_remove(&l->context);
+    }
     unlock(&l->context.mutex);
     if (can_free)
       listener_final_free(l);
@@ -1156,21 +1226,25 @@ void pn_listener_free(pn_listener_t *l) {
 }
 
 static void listener_begin_close(pn_listener_t* l) {
-  if (!l->psocket.closing) {
-    l->psocket.closing = true;
-    if (l->psocket.sockfd >= 0) {
-      stop_polling(&l->psocket.epoll_io, l->psocket.proactor->epollfd);
-      close(l->psocket.sockfd);
+  if (!l->context.closing) {
+    l->context.closing = true;
+    /* Close all listening sockets */
+    for (size_t i = 0; i < l->psockets_size; ++i) {
+      psocket_t *ps = &l->psockets[i];
+      if (ps->sockfd >= 0) {
+        stop_polling(&ps->epoll_io, ps->proactor->epollfd);
+        close(ps->sockfd);
+      }
     }
     pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
-    l->pending_accepts = l->available_accepts = 0;
+    l->acceptable = l->accepted = NULL;
   }
 }
 
 void pn_listener_close(pn_listener_t* l) {
   bool notify = false;
   lock(&l->context.mutex);
-  if (!l->psocket.closing) {
+  if (!l->context.closing) {
     listener_begin_close(l);
     notify = wake(&l->context);
   }
@@ -1188,17 +1262,19 @@ static void listener_forced_shutdown(pn_listener_t *l) {
   pn_listener_free(l);
 }
 
-
-static pn_event_batch_t *listener_process(pn_listener_t *l, uint32_t events) {
+/* Process a listening socket */
+static pn_event_batch_t *listener_process(psocket_t *ps, uint32_t events) {
   // TODO: some parallelization of the accept mechanism.
+  pn_listener_t *l = psocket_listener(ps);
   lock(&l->context.mutex);
   if (events) {
     l->armed = false;
-    if (events & EPOLLRDHUP)
-      psocket_error(&l->psocket, errno, "listener epoll");  // includes listener_begin_close
-    else if (!l->psocket.closing && events & EPOLLIN) {
-      l->available_accepts++;
-      l->pending_accepts++;
+    if (events & EPOLLRDHUP) {
+      /* Calls listener_begin_close which closes all the listener's sockets */
+      psocket_error(ps, errno, "listener epoll");
+    } else if (!l->context.closing && events & EPOLLIN) {
+      l->acceptable = ps;
+      pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
     }
   } else {
     wake_done(&l->context); // callback accounting
@@ -1206,7 +1282,7 @@ static pn_event_batch_t *listener_process(pn_listener_t *l, uint32_t events) {
   pn_event_batch_t *lb = NULL;
   if (!l->context.working) {
     l->context.working = true;
-    if (listener_has_event(l) || l->available_accepts)
+    if (listener_has_event(l))
       lb = &l->batch;
     else
       l->context.working = false;
@@ -1219,10 +1295,6 @@ static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
   pn_listener_t *l = batch_listener(batch);
   lock(&l->context.mutex);
   pn_event_t *e = NULL;
-  if (!listener_has_event(l) && l->available_accepts && !l->psocket.closing) {
-    l->available_accepts--;
-    pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
-  }
   if (listener_has_event(l))
     e = l->cached_event;
   l->cached_event = NULL;
@@ -1244,12 +1316,14 @@ static void listener_done(pn_listener_t *l) {
       return;
     }
   } else {
-    if (listener_has_event(l) || l->available_accepts)
+    if (listener_has_event(l))
       notify = wake(&l->context);
     else {
-      if (!l->psocket.closing && !l->armed) {
-        rearm(l->psocket.proactor, &l->psocket.epoll_io);
+      /* Don't rearm until the current socket is accepted */
+      if (!l->context.closing && !l->armed && !l->acceptable && l->accepted) {
+        rearm(l->accepted->proactor, &l->accepted->epoll_io);
         l->armed = true;
+        l->accepted = NULL;
       }
     }
   }
@@ -1258,7 +1332,7 @@ static void listener_done(pn_listener_t *l) {
 }
 
 pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
-  return l ? l->psocket.proactor : NULL;
+  return l ? l->psockets[0].proactor : NULL;
 }
 
 pn_condition_t* pn_listener_condition(pn_listener_t* l) {
@@ -1279,31 +1353,32 @@ pn_record_t *pn_listener_attachments(pn_listener_t *l) {
 
 void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
   // TODO: fuller sanity check on input args
-  pconnection_t *pc = new_pconnection_t(l->psocket.proactor, c, true, "");
+  pconnection_t *pc = new_pconnection_t(l->psockets[0].proactor, c, true, "");
   assert(pc);  // TODO: memory safety
   int err = 0;
 
   lock(&l->context.mutex);
-  proactor_add(&pc->psocket);
-  if (l->psocket.closing)
+  proactor_add(&pc->context);
+  if (l->context.closing)
     err = EBADF;
-  else {
-    if (l->pending_accepts == 0)
+  else if (l->acceptable == 0) {
       err = EAGAIN;
   }
 
   if (err) {
-    psocket_error(&l->psocket, errno, "listener state on accept");
+    /* Error on one socket closes the entire listener */
+    psocket_error(&l->psockets[0], errno, "listener state on accept");
     unlock(&l->context.mutex);
     return;
   }
-  l->pending_accepts--;
+  psocket_t *ps = l->accepted = l->acceptable;
+  l->acceptable = 0;
 
-  int newfd = accept(l->psocket.sockfd, NULL, 0);
+  int newfd = accept(ps->sockfd, NULL, 0);
   if (newfd < 0) {
     err = errno;
     psocket_error(&pc->psocket, err, "failed initialization on accept");
-    psocket_error(&l->psocket, err, "accept");
+    psocket_error(ps, err, "accept");
   } else {
     lock(&pc->context.mutex);
     configure_socket(newfd);
@@ -1358,16 +1433,18 @@ void pn_proactor_free(pn_proactor_t *p) {
   close(p->epollfd);
   close(p->eventfd);
   ptimer_finalize(&p->timer);
-  while (p->psockets) {
-    psocket_t *ps = p->psockets;
-    p->psockets = ps->next;
-    pconnection_t *pc = as_pconnection(ps);
-    if (pc) {
-      pconnection_forced_shutdown(pc);
-    } else {
-      pn_listener_t *l = as_listener(ps);
-      if (l)
-        listener_forced_shutdown(l);
+  while (p->contexts) {
+    pcontext_t *ctx = p->contexts;
+    p->contexts = ctx->next;
+    switch (ctx->type) {
+     case PCONNECTION:
+      pconnection_forced_shutdown(pcontext_pconnection(ctx));
+      break;
+     case LISTENER:
+      listener_forced_shutdown(pcontext_listener(ctx));
+      break;
+     default:
+      break;
     }
   }
 
@@ -1380,7 +1457,7 @@ void pn_proactor_free(pn_proactor_t *p) {
 pn_proactor_t *pn_event_proactor(pn_event_t *e) {
   if (pn_event_class(e) == pn_proactor__class()) return (pn_proactor_t*)pn_event_context(e);
   pn_listener_t *l = pn_event_listener(e);
-  if (l) return l->psocket.proactor;
+  if (l) return l->psockets[0].proactor;
   pn_connection_t *c = pn_event_connection(e);
   if (c) return pn_connection_proactor(pn_event_connection(e));
   return NULL;
@@ -1462,29 +1539,28 @@ static pn_event_batch_t *proactor_process(pn_proactor_t *p, bool timeout) {
   return NULL;
 }
 
-static void proactor_add(psocket_t *ps) {
-  pn_proactor_t *p = ps->proactor;
+static void proactor_add(pcontext_t *ctx) {
+  pn_proactor_t *p = ctx->proactor;
   lock(&p->context.mutex);
-  if (p->psockets) {
-    p->psockets->prev = ps;
-    ps->next = p->psockets;
-    p->psockets = ps;
+  if (p->contexts) {
+    p->contexts->prev = ctx;
+    ctx->next = p->contexts;
   }
-  else p->psockets = ps;
+  p->contexts = ctx;
   unlock(&p->context.mutex);
 }
 
 // call with psocket's mutex held
 // return true if safe for caller to free psocket
-static bool proactor_remove(psocket_t *ps) {
-  pn_proactor_t *p = ps->proactor;
+static bool proactor_remove(pcontext_t *ctx) {
+  pn_proactor_t *p = ctx->proactor;
   lock(&p->context.mutex);
   bool notify = false;
   bool can_free = true;
-  if (ps->disconnecting) {
-    // No longer on psockets list
-    if (--ps->disconnect_ops == 0) {
-      if (--p->disconnects_pending == 0 && !p->psockets) {
+  if (ctx->disconnecting) {
+    // No longer on contexts list
+    if (--ctx->disconnect_ops == 0) {
+      if (--p->disconnects_pending == 0 && !p->contexts) {
         p->inactive = true;
         notify = wake(&p->context);
       }
@@ -1494,18 +1570,18 @@ static bool proactor_remove(psocket_t *ps) {
   }
   else {
     // normal case
-    if (ps->prev)
-      ps->prev->next = ps->next;
+    if (ctx->prev)
+      ctx->prev->next = ctx->next;
     else {
-      p->psockets = ps->next;
-      ps->next = NULL;
-      if (p->psockets)
-        p->psockets->prev = NULL;
+      p->contexts = ctx->next;
+      ctx->next = NULL;
+      if (p->contexts)
+        p->contexts->prev = NULL;
     }
-    if (ps->next)
-      ps->next->prev = ps->prev;
+    if (ctx->next)
+      ctx->next->prev = ctx->prev;
 
-    if (!p->psockets && !p->disconnects_pending && !p->shutting_down) {
+    if (!p->contexts && !p->disconnects_pending && !p->shutting_down) {
       p->inactive = true;
       notify = wake(&p->context);
     }
@@ -1524,7 +1600,7 @@ static pn_event_batch_t *process_inbound_wake(pn_proactor_t *p) {
     case PCONNECTION:
       return pconnection_process((pconnection_t *) ctx->owner, 0, false, false);
     case LISTENER:
-      return listener_process((pn_listener_t *) ctx->owner, 0);
+     return listener_process(&((pn_listener_t *) ctx->owner)->psockets[0], 0);
     default:
       assert(ctx->type == WAKEABLE); // TODO: implement or remove
     }
@@ -1562,7 +1638,7 @@ static pn_event_batch_t *proactor_do_epoll(struct pn_proactor_t* p, bool can_blo
     } else if (ee->type == PROACTOR_TIMER) {
       batch = proactor_process(p, true);
     } else {
-      pconnection_t *pc = as_pconnection(ee->psocket);
+      pconnection_t *pc = psocket_pconnection(ee->psocket);
       if (pc) {
         if (ee->type == PCONNECTION_IO) {
           batch = pconnection_process(pc, ev.events, false, false);
@@ -1572,9 +1648,8 @@ static pn_event_batch_t *proactor_do_epoll(struct pn_proactor_t* p, bool can_blo
         }
       }
       else {
-        pn_listener_t *l = as_listener(ee->psocket);
         // TODO: can any of the listener processing be parallelized like IOCP?
-        batch = listener_process(l, ev.events);
+        batch = listener_process(ee->psocket, ev.events);
       }
     }
 
@@ -1657,76 +1732,73 @@ pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
 
 void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
   lock(&p->context.mutex);
-  // Move the whole psockets list into a disconnecting state
-  psocket_t *disconnecting_psockets = p->psockets;
-  p->psockets = NULL;
-  // First pass: mark each psocket as disconnecting and update global pending count.
-  psocket_t *ps = disconnecting_psockets;
-  while (ps) {
-    ps->disconnecting = true;
-    ps->disconnect_ops = 2;   // Second pass below and proactor_remove(), in any order.
+  // Move the whole contexts list into a disconnecting state
+  pcontext_t *disconnecting_pcontexts = p->contexts;
+  p->contexts = NULL;
+  // First pass: mark each pcontext as disconnecting and update global pending count.
+  pcontext_t *ctx = disconnecting_pcontexts;
+  while (ctx) {
+    ctx->disconnecting = true;
+    ctx->disconnect_ops = 2;   // Second pass below and proactor_remove(), in any order.
     p->disconnects_pending++;
-    ps = ps->next;
+    ctx = ctx->next;
   }
   unlock(&p->context.mutex);
-  if (!disconnecting_psockets)
+  if (!disconnecting_pcontexts)
     return;
 
-  // Second pass: different locking, close the psockets, free them if !disconnect_ops
+  // Second pass: different locking, close the pcontexts, free them if !disconnect_ops
   bool notify = false;
-  for (ps = disconnecting_psockets; ps; ps = ps->next) {
+  for (ctx = disconnecting_pcontexts; ctx; ctx = ctx ? ctx->next : NULL) {
     bool do_free = false;
-    pcontext_t *psocket_context = NULL;
-    pmutex *ps_mutex = NULL;
-    pconnection_t *pc = as_pconnection(ps);
+    pmutex *ctx_mutex = NULL;
+    pconnection_t *pc = pcontext_pconnection(ctx);
     if (pc) {
-      ps_mutex = &pc->context.mutex;
-      lock(ps_mutex);
-      if (!ps->closing) {
+      ctx_mutex = &pc->context.mutex;
+      lock(ctx_mutex);
+      if (!ctx->closing) {
         if (cond) {
           pn_condition_copy(pn_transport_condition(pc->driver.transport), cond);
         }
         pn_connection_driver_close(&pc->driver);
         pc->read_closed = true;
-        psocket_context = &pc->context;
       }
     } else {
-      pn_listener_t *l = as_listener(ps);
+      pn_listener_t *l = pcontext_listener(ctx);
       assert(l);
-      ps_mutex = &l->context.mutex;
-      lock(ps_mutex);
-      if (!ps->closing) {
+      ctx_mutex = &l->context.mutex;
+      lock(ctx_mutex);
+      if (!ctx->closing) {
         if (cond) {
           pn_condition_copy(pn_listener_condition(l), cond);
         }
         listener_begin_close(l);
-        psocket_context = &l->context;
       }
     }
 
     lock(&p->context.mutex);
-    if (--ps->disconnect_ops == 0) {
+    if (--ctx->disconnect_ops == 0) {
       do_free = true;
-      psocket_context = NULL;
-      if (--p->disconnects_pending == 0 && !p->psockets) {
+      ctx = NULL;
+      if (--p->disconnects_pending == 0 && !p->contexts) {
         p->inactive = true;
         notify = wake(&p->context);
       }
     } else {
-      // If initiating the close, wake the psocket to do the free.
-      if (psocket_context)
-        if (!wake(psocket_context))
-          psocket_context = NULL;  // Wake already pending.
+      // If initiating the close, wake the pcontext to do the free.
+      if (ctx)
+        if (!wake(ctx))
+          ctx = NULL;  // Wake already pending.
     }
     unlock(&p->context.mutex);
-    unlock(ps_mutex);
+    unlock(ctx_mutex);
 
     if (do_free) {
       if (pc) pconnection_final_free(pc);
-      else listener_final_free(as_listener(ps));
+      else listener_final_free(pcontext_listener(ctx));
     } else {
-      if (psocket_context)
-        wake_notify(psocket_context);
+      if (ctx)
+        wake_notify(ctx);
     }
   }
   if (notify)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org