You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2020/08/12 15:34:14 UTC

[qpid-proton] 04/05: PROTON-2247: Epoll implementation of raw connection API

This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 7dc9240bd52a74225bd1c7a24a84ba37a08b30e4
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Fri Jun 19 14:00:16 2020 -0400

    PROTON-2247: Epoll implementation of raw connection API
---
 c/CMakeLists.txt                         |   2 +-
 c/src/proactor/epoll-internal.h          |  84 ++++++-
 c/src/proactor/epoll.c                   | 141 +++++-------
 c/src/proactor/epoll_raw_connection.c    | 376 +++++++++++++++++++++++++++++++
 c/src/proactor/raw_connection-internal.h |   1 +
 c/src/proactor/raw_connection.c          |   7 +-
 6 files changed, 523 insertions(+), 88 deletions(-)

diff --git a/c/CMakeLists.txt b/c/CMakeLists.txt
index 6b2c167..9e224de 100644
--- a/c/CMakeLists.txt
+++ b/c/CMakeLists.txt
@@ -340,7 +340,7 @@ if (PROACTOR STREQUAL "epoll" OR (NOT PROACTOR AND NOT BUILD_PROACTOR))
   check_symbol_exists(epoll_wait "sys/epoll.h" HAVE_EPOLL)
   if (HAVE_EPOLL)
     set (PROACTOR_OK epoll)
-    set (qpid-proton-proactor src/proactor/epoll.c ${qpid-proton-proactor-common})
+    set (qpid-proton-proactor src/proactor/epoll.c src/proactor/epoll_raw_connection.c ${qpid-proton-proactor-common})
     set (PROACTOR_LIBS Threads::Threads ${TIME_LIB})
   endif()
 endif()
diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index fd02817..78cad14 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -22,8 +22,18 @@
  *
  */
 
+/* Enable POSIX features beyond c99 for modern pthread and standard strerror_r() */
+#ifndef _POSIX_C_SOURCE
+#define _POSIX_C_SOURCE 200809L
+#endif
+/* Avoid GNU extensions, in particular the incompatible alternative strerror_r() */
+#undef _GNU_SOURCE
+
 #include <stdbool.h>
 #include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+
 #include <pthread.h>
 
 #include <netdb.h>
@@ -50,7 +60,8 @@ typedef enum {
   PCONNECTION_IO,
   PCONNECTION_TIMER,
   LISTENER_IO,
-  PROACTOR_TIMER
+  PROACTOR_TIMER,
+  RAW_CONNECTION_IO
 } epoll_type_t;
 
 // Data to use with epoll.
@@ -73,7 +84,8 @@ typedef struct ptimer_t {
 typedef enum {
   PROACTOR,
   PCONNECTION,
-  LISTENER
+  LISTENER,
+  RAW_CONNECTION
 } pcontext_type_t;
 
 typedef struct pcontext_t {
@@ -82,7 +94,7 @@ typedef struct pcontext_t {
   pcontext_type_t type;
   bool working;
   bool on_wake_list;
-  bool wake_pending;             // unprocessed eventfd wake callback (convert to bool?)
+  bool wake_pending;             // unprocessed eventfd wake callback
   struct pcontext_t *wake_next; // wake list, guarded by proactor eventfd_mutex
   bool closing;
   // Next 4 are protected by the proactor mutex
@@ -209,7 +221,7 @@ typedef struct pconnection_t {
   ptimer_t timer;  // TODO: review one timerfd per connection
   const char *host, *port;
   uint32_t new_events;
-  int wake_count;
+  int wake_count; // TODO: protected by context.mutex so should be moved in there (also really bool)
   bool server;                /* accept, not connect */
   bool tick_pending;
   bool timer_armed;
@@ -279,6 +291,70 @@ struct pn_listener_t {
   uint32_t sched_io_events;
 };
 
+/* Fixed-size message buffer filled in by pstrerror() */
+typedef char strerrorbuf[1024];      /* used for pstrerror message buffer */
+/* Like strerror_r(), but writes "unknown error <err>" into msg if strerror_r() fails.
+   Defined in epoll.c; shared with epoll_raw_connection.c. */
+void pstrerror(int err, strerrorbuf msg);
+
+// In general all locks to be held singly and shortly (possibly as spin locks).
+// See above about lock ordering.
+
+/* Initialise pm as a PTHREAD_MUTEX_ADAPTIVE_NP mutex; aborts the process on failure. */
+static inline void pmutex_init(pthread_mutex_t *pm){
+  pthread_mutexattr_t attr;
+
+  pthread_mutexattr_init(&attr);
+  /* Adaptive (glibc) mutexes suit the short critical sections described above */
+  pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ADAPTIVE_NP);
+  int err = pthread_mutex_init(pm, &attr);
+  /* The attribute object is no longer needed once the mutex is initialised */
+  pthread_mutexattr_destroy(&attr);
+  if (err) {
+    /* pthread functions return the error number rather than setting errno,
+       so report err itself instead of relying on perror() */
+    fprintf(stderr, "pthread failure: error %d\n", err);
+    abort();
+  }
+}
+
+/* Release a mutex previously set up with pmutex_init(). */
+static inline void pmutex_finalize(pthread_mutex_t *mtx) {
+  pthread_mutex_destroy(mtx);
+}
+
+/* Acquire / release a proactor mutex (pthread return codes are ignored). */
+static inline void lock(pmutex *mtx) {
+  pthread_mutex_lock(mtx);
+}
+
+static inline void unlock(pmutex *mtx) {
+  pthread_mutex_unlock(mtx);
+}
+
+/* True if the connection driver has an event ready to be dispatched. */
+static inline bool pconnection_has_event(pconnection_t *pc) {
+  return pn_connection_driver_has_event(&pc->driver);
+}
+
+/* True if the listener's collector holds an event or accepted connections are pending. */
+static inline bool listener_has_event(pn_listener_t *l) {
+  if (pn_collector_peek(l->collector))
+    return true;
+  return l->pending_count != 0;
+}
+
+/* True if the proactor's own collector holds an event. */
+static inline bool proactor_has_event(pn_proactor_t *p) {
+  return pn_collector_peek(p->collector) != NULL;
+}
+
+// ---- Defined in epoll.c, shared with epoll_raw_connection.c ----
+
+// If the proactor is inactive, set need_inactive; returns true if the proactor needs a wakeup.
+bool wake_if_inactive(pn_proactor_t *p);
+// Close fd and rearm overflow listeners. Call with no listener locks held.
+int pclosefd(pn_proactor_t *p, int fd);
+
+// Register / unregister a context with the proactor.
+// proactor_remove(): call with the context's mutex held; returns true if the caller may free it.
+void proactor_add(pcontext_t *ctx);
+bool proactor_remove(pcontext_t *ctx);
+
+// Call with the proactor's sched_mutex held; a true result means the caller should
+// wake_notify() the proactor context.
+bool unassign_thread(tslot_t *ts, tslot_state new_state);
+
+// Two-part wake protocol: wake() is called with the context's lock held and returns true
+// if the caller must call wake_notify() after releasing the lock. wake_done() clears the
+// pending wake once it has been consumed (call with the lock held).
+void pcontext_init(pcontext_t *ctx, pcontext_type_t t, pn_proactor_t *p);
+bool wake(pcontext_t *ctx);
+void wake_notify(pcontext_t *ctx);
+void wake_done(pcontext_t *ctx);
+
+// epoll registration helpers; rearm_polling() re-enables a one-shot registration for ee->wanted.
+void psocket_init(psocket_t* ps, pn_proactor_t* p, epoll_type_t type);
+bool start_polling(epoll_extended_t *ee, int epollfd);
+void stop_polling(epoll_extended_t *ee, int epollfd);
+void rearm_polling(epoll_extended_t *ee, int epollfd);
+
+// Resolve host/port into *res; non-zero on failure (callers pass it to gai_strerror()).
+int pgetaddrinfo(const char *host, const char *port, int flags, struct addrinfo **res);
+// Configure a socket for proactor use (at least sets O_NONBLOCK).
+void configure_socket(int sock);
+
+// Take the next queued accepted socket from the listener, or NULL if none
+// (callers hold the listener's context mutex).
+accepted_t *listener_accepted_next(pn_listener_t *listener);
+
+// ---- Raw connection hooks defined in epoll_raw_connection.c, called from epoll.c ----
+pcontext_t *pni_psocket_raw_context(psocket_t *ps);
+pn_event_batch_t *pni_raw_connection_process(pcontext_t *c, bool sched_wake);
+
+typedef struct praw_connection_t praw_connection_t;
+pcontext_t *pni_raw_connection_context(praw_connection_t *rc);
+praw_connection_t *pni_batch_raw_connection(pn_event_batch_t* batch);
+void pni_raw_connection_done(praw_connection_t *rc);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 3b5f32e..1e693fe 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -106,10 +106,8 @@
 // could be several eventfds with random assignment of wakeables.
 
 
-typedef char strerrorbuf[1024];      /* used for pstrerror message buffer */
-
 /* Like strerror_r but provide a default message if strerror_r fails */
-static void pstrerror(int err, strerrorbuf msg) {
+/* Non-static: declared in epoll-internal.h so epoll_raw_connection.c can use it.
+   Expects the XSI strerror_r(), which returns 0 on success (epoll-internal.h
+   undefines _GNU_SOURCE to get it). */
+void pstrerror(int err, strerrorbuf msg) {
   int e = strerror_r(err, msg, sizeof(strerrorbuf));
   if (e) snprintf(msg, sizeof(strerrorbuf), "unknown error %d", err);
 }
@@ -128,24 +126,6 @@ static void pstrerror(int err, strerrorbuf msg) {
 // First define a proactor mutex (pmutex) and timer mechanism (ptimer) to taste.
 // ========================================================================
 
-// In general all locks to be held singly and shortly (possibly as spin locks).
-// See above about lock ordering.
-
-static void pmutex_init(pthread_mutex_t *pm){
-  pthread_mutexattr_t attr;
-
-  pthread_mutexattr_init(&attr);
-  pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ADAPTIVE_NP);
-  if (pthread_mutex_init(pm, &attr)) {
-    perror("pthread failure");
-    abort();
-  }
-}
-
-static void pmutex_finalize(pthread_mutex_t *m) { pthread_mutex_destroy(m); }
-static inline void lock(pmutex *m) { pthread_mutex_lock(m); }
-static inline void unlock(pmutex *m) { pthread_mutex_unlock(m); }
-
 /* epoll_ctl()/epoll_wait() do not form a memory barrier, so cached memory
    writes to struct epoll_extended_t in the EPOLL_ADD thread might not be
    visible to epoll_wait() thread. This function creates a memory barrier,
@@ -286,7 +266,7 @@ const char *AMQP_PORT_NAME = "amqp";
 PN_STRUCT_CLASSDEF(pn_proactor)
 PN_STRUCT_CLASSDEF(pn_listener)
 
-static bool start_polling(epoll_extended_t *ee, int epollfd) {
+bool start_polling(epoll_extended_t *ee, int epollfd) {
   if (ee->polling)
     return false;
   ee->polling = true;
@@ -297,7 +277,7 @@ static bool start_polling(epoll_extended_t *ee, int epollfd) {
   return (epoll_ctl(epollfd, EPOLL_CTL_ADD, ee->fd, &ev) == 0);
 }
 
-static void stop_polling(epoll_extended_t *ee, int epollfd) {
+void stop_polling(epoll_extended_t *ee, int epollfd) {
   // TODO: check for error, return bool or just log?
   // TODO: is EPOLL_CTL_DEL ever needed beyond auto de-register when ee->fd is closed?
   if (ee->fd == -1 || !ee->polling || epollfd == -1)
@@ -312,6 +292,19 @@ static void stop_polling(epoll_extended_t *ee, int epollfd) {
   ee->polling = false;
 }
 
+/* Re-enable the one-shot epoll registration of ee for ee->wanted; fatal on failure. */
+void rearm_polling(epoll_extended_t *ee, int epollfd) {
+  struct epoll_event ev = {0};
+  ev.events = ee->wanted | EPOLLONESHOT;
+  ev.data.ptr = ee;
+  /* Make earlier writes to *ee visible to the thread whose epoll_wait() reports it */
+  memory_barrier(ee);
+  int rc = epoll_ctl(epollfd, EPOLL_CTL_MOD, ee->fd, &ev);
+  if (rc == -1)
+    EPOLL_FATAL("arming polled file descriptor", errno);
+}
+
+/* Shorthand: rearm ee on the proactor's own epoll instance. */
+static void rearm(pn_proactor_t *p, epoll_extended_t *ee) {
+  rearm_polling(ee, p->epollfd);
+}
+
 /*
  * The proactor maintains a number of serialization contexts: each
  * connection, each listener, the proactor itself.  The serialization
@@ -339,7 +332,7 @@ static void stop_polling(epoll_extended_t *ee, int epollfd) {
 // Fake thread for temporarily disabling the scheduling of a context.
 static struct tslot_t *REWAKE_PLACEHOLDER = (struct tslot_t*) -1;
 
-static void pcontext_init(pcontext_t *ctx, pcontext_type_t t, pn_proactor_t *p) {
+void pcontext_init(pcontext_t *ctx, pcontext_type_t t, pn_proactor_t *p) {
   memset(ctx, 0, sizeof(*ctx));
   pmutex_init(&ctx->mutex);
   ctx->proactor = p;
@@ -350,8 +343,6 @@ static void pcontext_finalize(pcontext_t* ctx) {
   pmutex_finalize(&ctx->mutex);
 }
 
-static void rearm(pn_proactor_t *p, epoll_extended_t *ee);
-
 /*
  * Wake strategy with eventfd.
  *  - wakees can be in the list only once
@@ -411,7 +402,9 @@ static void pop_wake(pcontext_t *ctx) {
 }
 
 // part1: call with ctx->owner lock held, return true if notify required by caller
-static bool wake(pcontext_t *ctx) {
+// Note that this will return false if either there is a pending wake OR if we are already
+// in the connection context that is to be woken (as we don't have to wake it up)
+bool wake(pcontext_t *ctx) {
   bool notify = false;
 
   if (!ctx->wake_pending) {
@@ -440,7 +433,7 @@ static bool wake(pcontext_t *ctx) {
 }
 
 // part2: make OS call without lock held
-static inline void wake_notify(pcontext_t *ctx) {
+void wake_notify(pcontext_t *ctx) {
   pn_proactor_t *p = ctx->proactor;
   if (p->eventfd == -1)
     return;
@@ -448,7 +441,7 @@ static inline void wake_notify(pcontext_t *ctx) {
 }
 
 // call with owner lock held, once for each pop from the wake list
-static inline void wake_done(pcontext_t *ctx) {
+// Clears ctx->wake_pending so that a later wake() can queue ctx again.
+// Non-static: also used by epoll_raw_connection.c.
+void wake_done(pcontext_t *ctx) {
+// NOTE(review): wake_pending is now a bool, so the disabled "> 0" assert below is stale.
 //  assert(ctx->wake_pending > 0);
   ctx->wake_pending = false;
 }
@@ -572,7 +565,7 @@ static bool rewake(pcontext_t *ctx) {
 }
 
 // Call with sched lock
-static bool unassign_thread(tslot_t *ts, tslot_state new_state) {
+bool unassign_thread(tslot_t *ts, tslot_state new_state) {
   pcontext_t *ctx = ts->context;
   bool notify = false;
   bool deleting = (ts->state == DELETING);
@@ -660,7 +653,7 @@ static void make_runnable(pcontext_t *ctx) {
 
 
 
-static void psocket_init(psocket_t* ps, pn_proactor_t* p, epoll_type_t type)
+void psocket_init(psocket_t* ps, pn_proactor_t* p, epoll_type_t type)
 {
   ps->epoll_io.fd = -1;
   ps->epoll_io.type = type;
@@ -698,8 +691,6 @@ static void set_pconnection(pn_connection_t* c, pconnection_t *pc) {
 static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool wake, bool topup);
 static void write_flush(pconnection_t *pc);
 static void listener_begin_close(pn_listener_t* l);
-static void proactor_add(pcontext_t *ctx);
-static bool proactor_remove(pcontext_t *ctx);
 static void poller_done(struct pn_proactor_t* p, tslot_t *ts);
 
 static inline pconnection_t *psocket_pconnection(psocket_t* ps) {
@@ -741,18 +732,6 @@ static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
     containerof(batch, pconnection_t, batch) : NULL;
 }
 
-static inline bool pconnection_has_event(pconnection_t *pc) {
-  return pn_connection_driver_has_event(&pc->driver);
-}
-
-static inline bool listener_has_event(pn_listener_t *l) {
-  return pn_collector_peek(l->collector) || (l->pending_count);
-}
-
-static inline bool proactor_has_event(pn_proactor_t *p) {
-  return pn_collector_peek(p->collector);
-}
-
 static void psocket_error_str(psocket_t *ps, const char *msg, const char* what) {
   pconnection_t *pc = psocket_pconnection(ps);
   if (pc) {
@@ -780,15 +759,6 @@ static void psocket_gai_error(psocket_t *ps, int gai_err, const char* what) {
   psocket_error_str(ps, gai_strerror(gai_err), what);
 }
 
-static void rearm(pn_proactor_t *p, epoll_extended_t *ee) {
-  struct epoll_event ev = {0};
-  ev.data.ptr = ee;
-  ev.events = ee->wanted | EPOLLONESHOT;
-  memory_barrier(ee);
-  if (epoll_ctl(p->epollfd, EPOLL_CTL_MOD, ee->fd, &ev) == -1)
-    EPOLL_FATAL("arming polled file descriptor", errno);
-}
-
 static void listener_accepted_append(pn_listener_t *listener, accepted_t item) {
   if (listener->pending_first+listener->pending_count >= listener->backlog) return;
 
@@ -796,7 +766,7 @@ static void listener_accepted_append(pn_listener_t *listener, accepted_t item) {
   listener->pending_count++;
 }
 
-static accepted_t *listener_accepted_next(pn_listener_t *listener) {
+accepted_t *listener_accepted_next(pn_listener_t *listener) {
   if (!listener->pending_count) return NULL;
 
   listener->pending_count--;
@@ -861,7 +831,7 @@ static void proactor_rearm_overflow(pn_proactor_t *p) {
 }
 
 // Close an FD and rearm overflow listeners.  Call with no listener locks held.
-static int pclosefd(pn_proactor_t *p, int fd) {
+int pclosefd(pn_proactor_t *p, int fd) {
   int err = close(fd);
   if (!err) proactor_rearm_overflow(p);
   return err;
@@ -1258,7 +1228,6 @@ static void pconnection_connected_lh(pconnection_t *pc);
 static void pconnection_maybe_connect_lh(pconnection_t *pc);
 
 static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool sched_wake, bool topup) {
-  bool inbound_wake = sched_wake;
   bool rearm_timer = false;
   bool timer_fired = false;
   bool waking = false;
@@ -1281,10 +1250,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
     pc->tick_pending = true;
     timer_fired = false;
   }
-  if (inbound_wake) {
-    wake_done(&pc->context);
-    inbound_wake = false;
-  }
+  if (sched_wake) wake_done(&pc->context);
 
   if (rearm_timer)
     pc->timer_armed = false;
@@ -1462,7 +1428,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
   return NULL;
 }
 
-static void configure_socket(int sock) {
+void configure_socket(int sock) {
   int flags = fcntl(sock, F_GETFL);
   flags |= O_NONBLOCK;
   (void)fcntl(sock, F_SETFL, flags); // TODO: check for error
@@ -1536,7 +1502,7 @@ static void pconnection_maybe_connect_lh(pconnection_t *pc) {
   pc->disconnected = true;
 }
 
-static int pgetaddrinfo(const char *host, const char *port, int flags, struct addrinfo **res)
+int pgetaddrinfo(const char *host, const char *port, int flags, struct addrinfo **res)
 {
   struct addrinfo hints = { 0 };
   hints.ai_family = AF_UNSPEC;
@@ -1550,7 +1516,7 @@ static inline bool is_inactive(pn_proactor_t *p) {
 }
 
 /* If inactive set need_inactive and return true if the proactor needs a wakeup */
-static bool wake_if_inactive(pn_proactor_t *p) {
+bool wake_if_inactive(pn_proactor_t *p) {
   if (is_inactive(p)) {
     p->need_inactive = true;
     return wake(&p->context);
@@ -2280,7 +2246,7 @@ static pn_event_batch_t *proactor_process(pn_proactor_t *p, bool timeout, bool i
   return NULL;
 }
 
-static void proactor_add(pcontext_t *ctx) {
+void proactor_add(pcontext_t *ctx) {
   pn_proactor_t *p = ctx->proactor;
   lock(&p->context.mutex);
   if (p->contexts) {
@@ -2294,7 +2260,7 @@ static void proactor_add(pcontext_t *ctx) {
 
 // call with psocket's mutex held
 // return true if safe for caller to free psocket
-static bool proactor_remove(pcontext_t *ctx) {
+bool proactor_remove(pcontext_t *ctx) {
   pn_proactor_t *p = ctx->proactor;
   // Disassociate this context from scheduler
   if (!p->shutting_down) {
@@ -2420,6 +2386,11 @@ static pn_event_batch_t *process(pcontext_t *ctx) {
     batch = listener_process(l, n_events, ctx_wake);
     break;
   }
+  case RAW_CONNECTION: {
+    unlock(&p->sched_mutex);
+    batch = pni_raw_connection_process(ctx, ctx_wake);
+    break;
+  }
   default:
     assert(NULL);
   }
@@ -2469,20 +2440,20 @@ static pcontext_t *post_event(pn_proactor_t *p, struct epoll_event *evp) {
     ctx->sched_pending = true;
     break;
 
-  case PCONNECTION_IO: {
-    psocket_t *ps = containerof(ee, psocket_t, epoll_io);
-    pconnection_t *pc = psocket_pconnection(ps);
+  case PCONNECTION_TIMER: {
+    pconnection_t *pc = containerof(containerof(ee, ptimer_t, epoll_io), pconnection_t, timer);
     assert(pc);
     ctx = &pc->context;
-    ps->sched_io_events = evp->events;
+    pc->sched_timeout = true;;
     ctx->sched_pending = true;
     break;
   }
-  case PCONNECTION_TIMER: {
-    pconnection_t *pc = containerof(containerof(ee, ptimer_t, epoll_io), pconnection_t, timer);
+  case PCONNECTION_IO: {
+    psocket_t *ps = containerof(ee, psocket_t, epoll_io);
+    pconnection_t *pc = psocket_pconnection(ps);
     assert(pc);
     ctx = &pc->context;
-    pc->sched_timeout = true;;
+    ps->sched_io_events = evp->events;
     ctx->sched_pending = true;
     break;
   }
@@ -2495,6 +2466,13 @@ static pcontext_t *post_event(pn_proactor_t *p, struct epoll_event *evp) {
     ctx->sched_pending = true;
     break;
   }
+  case RAW_CONNECTION_IO: {
+    psocket_t *ps = containerof(ee, psocket_t, epoll_io);
+    ctx = pni_psocket_raw_context(ps);
+    ps->sched_io_events = evp->events;
+    ctx->sched_pending = true;
+    break;
+  }
   }
   if (ctx && !ctx->runnable && !ctx->runner)
     return ctx;
@@ -2860,6 +2838,14 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
     check_earmark_override(p, ts);
     return;
   }
+  praw_connection_t *rc = pni_batch_raw_connection(batch);
+  if (rc) {
+    tslot_t *ts = pni_raw_connection_context(rc)->runner;
+    pni_raw_connection_done(rc);
+    // rc possibly freed/invalid
+    check_earmark_override(p, ts);
+    return;
+  }
   pn_proactor_t *bp = batch_proactor(batch);
   if (bp == p) {
     bool notify = false;
@@ -2962,6 +2948,7 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
     bool do_free = false;
     bool ctx_notify = false;
     pmutex *ctx_mutex = NULL;
+    // TODO: Need to extend this for raw connections too
     pconnection_t *pc = pcontext_pconnection(ctx);
     if (pc) {
       ctx_mutex = &pc->context.mutex;
@@ -3047,11 +3034,3 @@ int64_t pn_proactor_now_64(void) {
   clock_gettime(CLOCK_MONOTONIC, &t);
   return t.tv_sec * 1000 + t.tv_nsec / 1000000;
 }
-
-// Empty stubs for raw connection code
-pn_raw_connection_t *pn_raw_connection(void) { return NULL; }
-void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const char *addr) {}
-void pn_listener_raw_accept(pn_listener_t *l, pn_raw_connection_t *rc) {}
-void pn_raw_connection_wake(pn_raw_connection_t *conn) {}
-const struct pn_netaddr_t *pn_raw_connection_local_addr(pn_raw_connection_t *connection) { return NULL; }
-const struct pn_netaddr_t *pn_raw_connection_remote_addr(pn_raw_connection_t *connection) { return NULL; }
diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c
new file mode 100644
index 0000000..2b9c4d4
--- /dev/null
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -0,0 +1,376 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/* This is currently epoll implementation specific - and will need changing for the other proactors */
+
+#include "epoll-internal.h"
+#include "proactor-internal.h"
+#include "raw_connection-internal.h"
+
+#include <proton/proactor.h>
+#include <proton/listener.h>
+#include <proton/netaddr.h>
+#include <proton/raw_connection.h>
+
+#include <alloca.h>
+#include <assert.h>
+#include <errno.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <sys/epoll.h>
+
+/* epoll specific raw connection struct.
+   The generic pn_raw_connection_t handed to the application is embedded here;
+   containerof() recovers this struct from it (and from context/psocket/batch). */
+struct praw_connection_t {
+  pcontext_t context;                /* Proactor scheduling context (type RAW_CONNECTION) */
+  struct pn_raw_connection_t raw_connection; /* Generic state returned by pn_raw_connection() */
+  psocket_t psocket;                 /* Socket fd + epoll registration (RAW_CONNECTION_IO) */
+  struct pn_netaddr_t local, remote; /* Actual addresses */
+  pmutex rearm_mutex;                /* protects pconnection_rearm from out of order arming*/
+                                     /* NOTE(review): initialised in praw_connection_init but never
+                                        locked in this file - confirm rearm_polling calls should take it */
+  pn_event_batch_t batch;            /* Batch handed to the scheduler; next_event == pni_raw_batch_next */
+  struct addrinfo *addrinfo;         /* Resolved address list */
+  struct addrinfo *ai;               /* Next address to try (advanced before each connect attempt) */
+  bool connected;                    /* Set once praw_connection_connected_lh() has run */
+  bool disconnected;                 /* Set when no address could be connected */
+  bool waking; // TODO: This is actually protected by context.mutex so should be moved into context (pconnection too)
+};
+
+/* Record err (with msg and the peer address) as the raw connection's condition,
+   unless an earlier error has already been recorded there. */
+static void psocket_error(praw_connection_t *rc, int err, const char* msg) {
+  pn_condition_t *cond = rc->raw_connection.condition;
+  if (pn_condition_is_set(cond))
+    return;                          /* Preserve older error information */
+
+  strerrorbuf errtext;
+  char peer[PN_MAX_ADDR];
+  pstrerror(err, errtext);
+  pn_netaddr_str(&rc->remote, peer, sizeof(peer));
+  pn_condition_format(cond, PNI_IO_CONDITION, "%s - %s %s", errtext, msg, peer);
+}
+
+/* Record a name-resolution failure (gai_err from getaddrinfo) as the raw
+   connection's condition, overwriting any earlier condition. */
+static void psocket_gai_error(praw_connection_t *rc, int gai_err, const char* what, const char *addr) {
+  const char *reason = gai_strerror(gai_err);
+  pn_condition_t *cond = rc->raw_connection.condition;
+  pn_condition_format(cond, PNI_IO_CONDITION, "%s - %s %s", reason, what, addr);
+}
+
+/* Mark prc connected: drop the resolved address list, record the peer address
+   and tell the generic raw connection layer that the connection is up. */
+static void praw_connection_connected_lh(praw_connection_t *prc) {
+  // TODO: Need to check socket for connection error
+  prc->connected = true;
+  if (prc->addrinfo != NULL) {
+    freeaddrinfo(prc->addrinfo);
+    prc->addrinfo = NULL;
+  }
+  prc->ai = NULL;
+
+  struct sockaddr *peer = (struct sockaddr*)&prc->remote.ss;
+  socklen_t peerlen = sizeof(prc->remote.ss);
+  (void)getpeername(prc->psocket.epoll_io.fd, peer, &peerlen);
+
+  pni_raw_connected(&prc->raw_connection);
+}
+
+/* Start polling fd for prc. Multi-address connects may call this several times
+   with different FDs; each call stops polling and closes the previous FD. */
+static void praw_connection_start(praw_connection_t *prc, int fd) {
+  pn_proactor_t *p = prc->psocket.proactor;
+  int efd = p->epollfd;
+
+  /* Get the local socket name now; the peer name is fetched in praw_connection_connected_lh */
+  socklen_t len = sizeof(prc->local.ss);
+  (void)getsockname(fd, (struct sockaddr*)&prc->local.ss, &len);
+
+  epoll_extended_t *ee = &prc->psocket.epoll_io;
+  if (ee->polling) {       /* Not the first attempt: stop polling and close the old FD */
+    int old_fd = ee->fd;   /* Save it: stop_polling() may reset ee->fd to -1 */
+    stop_polling(ee, efd);
+    pclosefd(p, old_fd);
+  }
+  ee->fd = fd;
+  ee->wanted = EPOLLIN | EPOLLOUT;
+  start_polling(ee, efd);  // TODO: check for error
+}
+
+/* Called on initial connect, and if connection fails to try another address.
+ * Starts an asynchronous connect to the next usable address in prc->ai. If no
+ * address is left, records an error condition, releases the address list and
+ * sets prc->disconnected.
+ */
+static void praw_connection_maybe_connect_lh(praw_connection_t *prc) {
+  int err = 0;                  /* errno of the most recent immediate failure */
+  while (prc->ai) {             /* Have an address */
+    struct addrinfo *ai = prc->ai;
+    prc->ai = prc->ai->ai_next; /* Move to next address in case this fails */
+    int fd = socket(ai->ai_family, SOCK_STREAM, 0);
+    if (fd < 0) {
+      err = errno;
+      continue;
+    }
+    configure_socket(fd);
+    if (!connect(fd, ai->ai_addr, ai->ai_addrlen) || errno == EINPROGRESS) {
+      /* Until we finish connecting save away the address we're trying to connect to */
+      memcpy((struct sockaddr *) &prc->remote.ss, ai->ai_addr, ai->ai_addrlen);
+
+      praw_connection_start(prc, fd);
+      return;                   /* Async connection started */
+    }
+    err = errno;                /* Save before close() can overwrite it */
+    close(fd);
+    /* connect failed immediately, go round the loop to try the next addr */
+  }
+
+  /* If an asynchronous attempt was started, its socket holds the pending error.
+     If no socket was ever started (fd is -1) fall back to the last immediate
+     failure rather than reading an unset error value. */
+  int fd = prc->psocket.epoll_io.fd;
+  if (fd != -1) {
+    int sockerr = 0;
+    socklen_t errlen = sizeof(sockerr);
+    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (void *)&sockerr, &errlen) == 0 && sockerr)
+      err = sockerr;
+  }
+  psocket_error(prc, err, "on connect");
+
+  if (prc->addrinfo) {          /* freeaddrinfo(NULL) is not guaranteed to be safe */
+    freeaddrinfo(prc->addrinfo);
+    prc->addrinfo = NULL;
+  }
+  prc->disconnected = true;
+}
+
+//
+// Raw socket API
+//
+static pn_event_t * pni_raw_batch_next(pn_event_batch_t *batch);
+
+/* Prepare prc for use with proactor p: scheduling context, socket state,
+   connection flags, event batch and rearm mutex. rc is currently unused. */
+static void praw_connection_init(praw_connection_t *prc, pn_proactor_t *p, pn_raw_connection_t *rc) {
+  (void)rc;
+  pcontext_init(&prc->context, RAW_CONNECTION, p);
+  psocket_init(&prc->psocket, p, RAW_CONNECTION_IO);
+
+  prc->batch.next_event = pni_raw_batch_next;
+  prc->connected = prc->disconnected = prc->waking = false;
+
+  pmutex_init(&prc->rearm_mutex);
+}
+
+/* Stop polling and close prc's socket, then detach prc from the proactor.
+   If proactor_remove() reports it is safe, destroy prc's mutexes and free it. */
+static void praw_connection_cleanup(praw_connection_t *prc) {
+  pn_proactor_t *p = prc->psocket.proactor;
+  int fd = prc->psocket.epoll_io.fd;   /* Save fd before stop_polling() */
+  stop_polling(&prc->psocket.epoll_io, p->epollfd);
+  if (fd != -1)
+    pclosefd(p, fd);
+
+  lock(&prc->context.mutex);
+  bool can_free = proactor_remove(&prc->context);
+  unlock(&prc->context.mutex);
+  if (can_free) {
+    /* Release the mutexes set up by praw_connection_init() before freeing */
+    pmutex_finalize(&prc->rearm_mutex);
+    pmutex_finalize(&prc->context.mutex);
+    free(prc);
+  }
+  // else proactor_disconnect logic owns prc and its final free
+}
+
+/* Allocate a zeroed raw connection and initialise its generic state.
+   Returns NULL if allocation fails. */
+pn_raw_connection_t *pn_raw_connection(void) {
+  praw_connection_t *prc = (praw_connection_t*) calloc(1, sizeof(*prc));
+  if (prc == NULL)
+    return NULL;
+
+  pn_raw_connection_t *raw = &prc->raw_connection;
+  pni_raw_initialize(raw);
+  return raw;
+}
+
+/* Currently a no-op.
+   NOTE(review): in this file the praw_connection_t is only freed by
+   praw_connection_cleanup(), once both directions are closed and the disconnect
+   has been delivered; a connection that is never connected or accepted does not
+   appear to be released anywhere - confirm the intended ownership. */
+void pn_raw_connection_free(pn_raw_connection_t *conn) {
+}
+
+/* Begin an asynchronous connection of rc to addr using proactor p.
+ *
+ * addr is split into host and port by pni_parse_addr() and resolved with
+ * pgetaddrinfo(). If resolution fails, or every resolved address fails
+ * immediately, prc->disconnected is set and the connection's context is woken,
+ * so pni_raw_connection_process() reports the disconnect. Otherwise the outcome
+ * arrives later as epoll events on the connecting socket.
+ */
+void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const char *addr) {
+  assert(rc);
+  praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
+  praw_connection_init(prc, p, rc);
+  // TODO: check case of proactor shutting down
+
+  /* prc's lock is held for the rest of setup; the proactor's context mutex is
+     only taken nested inside it (by proactor_add() and below) */
+  lock(&prc->context.mutex);
+  proactor_add(&prc->context);
+
+  bool notify = false;          /* wake() asked us to wake_notify() prc after unlocking */
+  bool notify_proactor = false; /* wake_if_inactive() asked us to wake the proactor */
+
+  const char *host;
+  const char *port;
+  /* Scratch copy of addr for pni_parse_addr().
+     NOTE(review): the alloca size is driven by the caller's string length. */
+  size_t addrlen = strlen(addr);
+  char *addr_buf = (char*) alloca(addrlen+1);
+  pni_parse_addr(addr, addr_buf, addrlen+1, &host, &port);
+
+  int gai_error = pgetaddrinfo(host, port, 0, &prc->addrinfo);
+  if (!gai_error) {
+    prc->ai = prc->addrinfo;
+    praw_connection_maybe_connect_lh(prc); /* Start connection attempts */
+    /* Every address failed immediately: schedule prc so the disconnect is reported */
+    if (prc->disconnected) notify = wake(&prc->context);
+  } else {
+    psocket_gai_error(prc, gai_error, "connect to ", addr);
+    prc->disconnected = true;
+    notify = wake(&prc->context);
+    /* Immediate failure may leave the proactor inactive; let it report that */
+    lock(&p->context.mutex);
+    notify_proactor = wake_if_inactive(p);
+    unlock(&p->context.mutex);
+  }
+
+  /* We need to issue INACTIVE on immediate failure */
+  /* The wake_notify() calls are made only after prc's lock is released */
+  unlock(&prc->context.mutex);
+  if (notify) wake_notify(&prc->context);
+  if (notify_proactor) wake_notify(&p->context);
+}
+
+/* Attach rc to the next connection accepted by listener l.
+ *
+ * On success the accepted socket is configured and polled, and the raw
+ * connection is marked connected immediately. If l is closing (EBADF) or has no
+ * pending accepted connection (EWOULDBLOCK), the error is recorded in rc's
+ * condition instead.
+ * NOTE(review): on that error path prc->disconnected is not set and prc is not
+ * woken here - confirm how the application learns of the failure.
+ */
+void pn_listener_raw_accept(pn_listener_t *l, pn_raw_connection_t *rc) {
+  assert(rc);
+  praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
+  praw_connection_init(prc, pn_listener_proactor(l), rc);
+  // TODO: fuller sanity check on input args
+
+  int err = 0;
+  int fd = -1;
+  bool notify = false;
+  /* Lock order here: l->context.mutex, then the proactor's context mutex
+     (inside proactor_add()), then prc->context.mutex */
+  lock(&l->context.mutex);
+  if (l->context.closing)
+    err = EBADF;
+  else {
+    accepted_t *a = listener_accepted_next(l);
+    if (a) {
+      /* Take ownership of the accepted fd so the listener won't reuse it */
+      fd = a->accepted_fd;
+      a->accepted_fd = -1;
+    }
+    else err = EWOULDBLOCK;
+  }
+
+  proactor_add(&prc->context);
+
+  lock(&prc->context.mutex);
+  if (fd >= 0) {
+    configure_socket(fd);
+    praw_connection_start(prc, fd);
+    praw_connection_connected_lh(prc);
+  } else {
+    psocket_error(prc, err, "pn_listener_accept");
+  }
+
+  /* The listener may still have events (e.g. more pending accepts); wake it
+     unless it is already being processed */
+  if (!l->context.working && listener_has_event(l)) {
+    notify = wake(&l->context);
+  }
+  unlock(&prc->context.mutex);
+  unlock(&l->context.mutex);
+  if (notify) wake_notify(&l->context);
+}
+
+/* Local address of rc's socket (from getsockname() when the socket was started),
+   or NULL if rc is NULL. */
+const pn_netaddr_t *pn_raw_connection_local_addr(pn_raw_connection_t *rc) {
+  /* Test rc itself: containerof() of a NULL pointer is never NULL */
+  if (!rc) return NULL;
+  praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
+  return &prc->local;
+}
+
+/* Peer address of rc, or NULL if rc is NULL. While connecting this is the address
+   being tried; getpeername() refreshes it once the connection is established. */
+const pn_netaddr_t *pn_raw_connection_remote_addr(pn_raw_connection_t *rc) {
+  if (!rc) return NULL;
+  praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
+  return &prc->remote;
+}
+
+/* Ask for a PN_RAW_CONNECTION_WAKE event on rc. Does nothing if rc is NULL or its
+   context is closing. waking is set and wake() is called under prc->context.mutex;
+   wake_notify() is called only after that lock is released. */
+void pn_raw_connection_wake(pn_raw_connection_t *rc) {
+  /* Test rc itself: containerof() of a NULL pointer is never NULL, and the old
+     code dereferenced prc in wake_notify() outside its NULL guard */
+  if (!rc) return;
+  praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
+  bool notify = false;
+  lock(&prc->context.mutex);
+  if (!prc->context.closing) {
+    prc->waking = true;
+    notify = wake(&prc->context);
+  }
+  unlock(&prc->context.mutex);
+  if (notify) wake_notify(&prc->context);
+}
+
+/* next_event hook for raw connection batches. Its address also acts as the
+   type tag tested by pni_batch_raw_connection(). */
+static pn_event_t *pni_raw_batch_next(pn_event_batch_t *batch) {
+  praw_connection_t *prc = containerof(batch, praw_connection_t, batch);
+  return pni_raw_event_next(&prc->raw_connection);
+}
+
+/* Scheduling context of the raw connection that owns socket ps. */
+pcontext_t *pni_psocket_raw_context(psocket_t* ps) {
+  praw_connection_t *prc = containerof(ps, praw_connection_t, psocket);
+  return &prc->context;
+}
+
+/* The raw connection owning batch, or NULL if batch belongs to something else. */
+praw_connection_t *pni_batch_raw_connection(pn_event_batch_t *batch) {
+  if (batch->next_event != pni_raw_batch_next)
+    return NULL;
+  return containerof(batch, praw_connection_t, batch);
+}
+
+/* Scheduling context embedded in rc. */
+pcontext_t *pni_raw_connection_context(praw_connection_t *rc) {
+  return &rc->context;
+}
+
+/* Non-blocking send() that suppresses SIGPIPE; the writer passed to pni_raw_write(). */
+static long snd(int fd, const void* b, size_t s) {
+  const int flags = MSG_NOSIGNAL | MSG_DONTWAIT;
+  return send(fd, b, s, flags);
+}
+
+/* Non-blocking recv(); the reader passed to pni_raw_read(). */
+static long rcv(int fd, void* b, size_t s) {
+  const int flags = MSG_DONTWAIT;
+  return recv(fd, b, s, flags);
+}
+
+/* Error callback for pni_raw_read()/pni_raw_write(): record err on the owning connection. */
+static void  set_error(pn_raw_connection_t *conn, const char *msg, int err) {
+  praw_connection_t *prc = containerof(conn, praw_connection_t, raw_connection);
+  psocket_error(prc, err, msg);
+}
+
+/* Scheduler entry point for a RAW_CONNECTION context (called from process() in
+ * epoll.c). sched_wake is true when the context was scheduled by a wake.
+ * Returns the batch of events to deliver, or NULL if there is nothing to deliver yet.
+ */
+pn_event_batch_t *pni_raw_connection_process(pcontext_t *c, bool sched_wake) {
+  praw_connection_t *rc = containerof(c, praw_connection_t, context);
+  int events = rc->psocket.sched_io_events;  /* epoll events recorded by post_event() */
+  int fd = rc->psocket.epoll_io.fd;
+  /* NOTE(review): the *_lh helpers in this branch run without c->mutex held
+     (it is only taken further down) - confirm this is safe. */
+  if (!rc->connected) {
+    /* Still connecting: HUP/ERR means the current attempt failed, try the next address */
+    if (events & (EPOLLHUP | EPOLLERR)) {
+      praw_connection_maybe_connect_lh(rc);
+    }
+    /* No address left (or immediate failure at connect time): report the disconnect */
+    if (rc->disconnected) {
+      pni_raw_disconnect(&rc->raw_connection);
+      return &rc->batch;
+    }
+    /* A new attempt is in progress; nothing to report yet.
+       NOTE(review): this return and the one above skip wake_done() even when
+       sched_wake is true, leaving wake_pending set - confirm. */
+    if (events & (EPOLLHUP | EPOLLERR)) {
+      return NULL;
+    }
+    /* Socket reported ready without error: the connect has completed */
+    praw_connection_connected_lh(rc);
+  }
+
+  /* Consume any scheduler wake or pn_raw_connection_wake() request under the lock.
+     (This local shadows the wake() function within this scope.) */
+  bool wake = false;
+  lock(&c->mutex);
+  c->working = true;
+  if (sched_wake) wake_done(c);
+  wake = sched_wake || rc->waking;
+  rc->waking = false;
+  unlock(&c->mutex);
+
+  /* Generate events outside the lock; I/O errors are recorded via set_error() */
+  if (wake) pni_raw_wake(&rc->raw_connection);
+  if (events & EPOLLIN) pni_raw_read(&rc->raw_connection, fd, rcv, set_error);
+  if (events & EPOLLOUT) pni_raw_write(&rc->raw_connection, fd, snd, set_error);
+  return &rc->batch;
+}
+
+/* Called from pn_proactor_done() when the application has finished with rc's batch.
+ * Clears working, re-schedules rc if a pn_raw_connection_wake() arrived meanwhile,
+ * then either rearms epoll for the I/O the raw connection can still do, or tears
+ * the connection down once both directions are closed and no disconnect is
+ * pending. Finally hands the thread slot back to the scheduler.
+ */
+void pni_raw_connection_done(praw_connection_t *rc) {
+  bool self_notify = false;
+  lock(&rc->context.mutex);
+  /* Save p and ts now: rc may be freed below by praw_connection_cleanup() */
+  pn_proactor_t *p = rc->context.proactor;
+  tslot_t *ts = rc->context.runner;
+  rc->context.working = false;
+  self_notify = rc->waking && wake(&rc->context);
+  unlock(&rc->context.mutex);
+  if (self_notify) wake_notify(&rc->context);
+
+  /* NOTE(review): epoll_io is updated below without the context lock or rearm_mutex */
+  pn_raw_connection_t *raw = &rc->raw_connection;
+  int wanted =
+    (pni_raw_can_read(raw)  ? EPOLLIN : 0) |
+    (pni_raw_can_write(raw) ? EPOLLOUT : 0);
+  if (wanted) {
+    rc->psocket.epoll_io.wanted = wanted;
+    rearm_polling(&rc->psocket.epoll_io, p->epollfd);  // TODO: check for error
+  } else {
+    /* NOTE(review): if the disconnect is not yet finished, the socket is neither
+       rearmed nor cleaned up on this path */
+    bool finished_disconnect = raw->rclosed && raw->wclosed && !raw->disconnectpending;
+    if (finished_disconnect) {
+      // If we're closed and we've sent the disconnect then close
+      pni_raw_finalize(raw);
+      praw_connection_cleanup(rc);
+      /* rc must not be touched after this point */
+    }
+  }
+
+  /* Release the thread slot; notify the proactor if unassign_thread() asks for it */
+  lock(&p->sched_mutex);
+  bool notify = unassign_thread(ts, UNUSED);
+  unlock(&p->sched_mutex);
+  if (notify) wake_notify(&p->context);
+}
diff --git a/c/src/proactor/raw_connection-internal.h b/c/src/proactor/raw_connection-internal.h
index 79633e4..02c3af2 100644
--- a/c/src/proactor/raw_connection-internal.h
+++ b/c/src/proactor/raw_connection-internal.h
@@ -83,6 +83,7 @@ struct pn_raw_connection_t {
   bool rdrainpending;
   bool wdrainpending;
   bool disconnectpending;
+  bool wakepending;
 };
 
 /*
diff --git a/c/src/proactor/raw_connection.c b/c/src/proactor/raw_connection.c
index 3e4a74c..a225df9 100644
--- a/c/src/proactor/raw_connection.c
+++ b/c/src/proactor/raw_connection.c
@@ -56,7 +56,6 @@ void pni_raw_initialize(pn_raw_connection_t *conn) {
     conn->wbuffers[i-1].type = buff_wempty;
   }
 
-  //conn->batch.next_event = pni_raw_batch_next;
   conn->condition = pn_condition();
   conn->collector = pn_collector();
   conn->attachments = pn_record();
@@ -323,7 +322,7 @@ void pni_raw_connected(pn_raw_connection_t *conn) {
 }
 
 void pni_raw_wake(pn_raw_connection_t *conn) {
-  pni_raw_put_event(conn, PN_RAW_CONNECTION_WAKE);
+  /* Only flag the wake here; pni_raw_event_next() turns wakepending into a
+     PN_RAW_CONNECTION_WAKE event when it next generates events */
+  conn->wakepending = true;
 }
 
 void pni_raw_read(pn_raw_connection_t *conn, int sock, long (*recv)(int, void*, size_t), void(*set_error)(pn_raw_connection_t *, const char *, int)) {
@@ -482,6 +481,10 @@ pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn) {
       switch (type) {
         default: break;
       }
+    } else if (conn->wakepending) {
+      pni_raw_put_event(conn, PN_RAW_CONNECTION_WAKE);
+      conn->wakepending = false;
+      continue;
     } else if (conn->rpending) {
       pni_raw_put_event(conn, PN_RAW_CONNECTION_READ);
       conn->rpending = false;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org