You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2017/04/07 21:23:50 UTC
qpid-proton git commit: PROTON-1460: epoll proactor,
initial implementation, can run some C proactor examples
Repository: qpid-proton
Updated Branches:
refs/heads/master 6aefc4d23 -> 2d3a9de8f
PROTON-1460: epoll proactor, initial implementation, can run some C proactor examples
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/2d3a9de8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/2d3a9de8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/2d3a9de8
Branch: refs/heads/master
Commit: 2d3a9de8f36c684a0806242e901417f40c5b2846
Parents: 6aefc4d
Author: Clifford Jansen <cl...@apache.org>
Authored: Fri Apr 7 14:20:21 2017 -0700
Committer: Clifford Jansen <cl...@apache.org>
Committed: Fri Apr 7 14:21:49 2017 -0700
----------------------------------------------------------------------
proton-c/CMakeLists.txt | 27 +-
proton-c/src/proactor/epoll.c | 1628 ++++++++++++++++++++++++++++++++++++
2 files changed, 1646 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d3a9de8/proton-c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt
index 16da920..086c5a2 100644
--- a/proton-c/CMakeLists.txt
+++ b/proton-c/CMakeLists.txt
@@ -221,6 +221,7 @@ option(ENABLE_UNDEFINED_ERROR "Check for unresolved library symbols" ${DEFAULT_U
option(ENABLE_LINKTIME_OPTIMIZATION "Perform link time optimization" ${DEFAULT_LINKTIME_OPTIMIZATION})
option(ENABLE_HIDE_UNEXPORTED_SYMBOLS "Only export library symbols that are explicitly requested" ${DEFAULT_HIDE_UNEXPORTED_SYMBOLS})
+option(ENABLE_EPOLL "Use the Linux epoll proactor implementation" OFF)
# Set any additional compiler specific flags
if (CMAKE_COMPILER_IS_GNUCC)
if (ENABLE_WARNING_ERROR)
@@ -322,6 +323,7 @@ set (qpid-proton-platform-all
src/reactor/io/posix/io.c
src/reactor/io/posix/selector.c
src/proactor/libuv.c
+ src/proactor/epoll.c
)
# platform specific library build:
@@ -473,16 +475,23 @@ set (qpid-proton-include-extra
)
# Select proactor sources and build flags
-find_package(Libuv)
-if (Libuv_FOUND)
- set (qpid-proton-proactor src/proactor/libuv.c)
- set (PROACTOR_LIBS ${Libuv_LIBRARIES})
+if (ENABLE_EPOLL)
+ set (qpid-proton-proactor src/proactor/epoll.c)
+ set (PROACTOR_LIBS "")
set_source_files_properties (${qpid-proton-proactor} PROPERTIES
- # Skip COMPILE_LANGUAGE_FLAGS, libuv.h won't compile with --std=c99
- COMPILE_FLAGS "${COMPILE_WARNING_FLAGS} ${LTO} "
- )
-endif()
-
+ COMPILE_FLAGS "${COMPILE_WARNING_FLAGS} ${COMPILE_LANGUAGE_FLAGS} ${LTO}"
+ )
+else (ENABLE_EPOLL)
+ find_package(Libuv)
+ if (Libuv_FOUND)
+ set (qpid-proton-proactor src/proactor/libuv.c)
+ set (PROACTOR_LIBS ${Libuv_LIBRARIES})
+ set_source_files_properties (${qpid-proton-proactor} PROPERTIES
+ # Skip COMPILE_LANGUAGE_FLAGS, libuv.h won't compile with --std=c99
+ COMPILE_FLAGS "${COMPILE_WARNING_FLAGS} ${LTO} "
+ )
+ endif(Libuv_FOUND)
+endif (ENABLE_EPOLL)
# note: process bindings after the source lists have been defined so
# the bindings can reference them
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d3a9de8/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
new file mode 100644
index 0000000..511b95d
--- /dev/null
+++ b/proton-c/src/proactor/epoll.c
@@ -0,0 +1,1628 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "../core/log_private.h"
+#include "../core/url-internal.h"
+#include <proton/condition.h>
+#include <proton/connection_driver.h>
+#include <proton/engine.h>
+#include <proton/object.h>
+#include <proton/proactor.h>
+#include <proton/transport.h>
+#include <proton/listener.h>
+
+#include <assert.h>
+#include <stddef.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <errno.h>
+#include <pthread.h>
+#include <sys/timerfd.h>
+#include <sys/epoll.h>
+#include <unistd.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <fcntl.h>
+#include <netinet/tcp.h>
+#include <sys/eventfd.h>
+#include <limits.h>
+
+// TODO: replace timerfd per connection with global lightweight timer mechanism.
+// logging in general, listener events in particular
+// SIGPIPE?
+// Can some of the mutexes be spinlocks (any benefit over adaptive pthread mutex)?
+// Maybe futex is even better?
+// See other "TODO" in code.
+//
+// Consider case of large number of wakes: proactor_do_epoll() could start by
+// looking for pending wakes before a kernel call to epoll_wait(), or there
+// could be several eventfds with random assignment of wakeables.
+
+
+// ========================================================================
+// First define a proactor mutex (pmutex) and timer mechanism (ptimer) to taste.
+// ========================================================================
+
+typedef pthread_mutex_t pmutex;
+/*
+ * Initialize *pm as a mutex of type PTHREAD_MUTEX_ADAPTIVE_NP.
+ * Aborts the process if the mutex cannot be created.
+ */
+static void pmutex_init(pthread_mutex_t *pm){
+  pthread_mutexattr_t attr;
+
+  int rc = pthread_mutexattr_init(&attr);
+  if (rc == 0) {
+    /* Best effort: if the adaptive type is rejected the default type is used. */
+    (void) pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ADAPTIVE_NP);
+    rc = pthread_mutex_init(pm, &attr);
+    /* The attribute object is only needed during initialization. */
+    pthread_mutexattr_destroy(&attr);
+  }
+  if (rc) {
+    /* pthread calls return the error code instead of setting errno. */
+    errno = rc;
+    perror("pthread failure");
+    abort();
+  }
+}
+
+/* Destroy a mutex that was set up by pmutex_init(). */
+static void pmutex_finalize(pthread_mutex_t *m) {
+  pthread_mutex_destroy(m);
+}
+
+/* Acquire m. */
+static inline void lock(pmutex *m) {
+  pthread_mutex_lock(m);
+}
+
+/* Release m. */
+static inline void unlock(pmutex *m) {
+  pthread_mutex_unlock(m);
+}
+
+typedef struct psocket_t psocket_t;
+/* Tag stored in epoll_extended_t.type: which kind of event source the entry describes. */
+typedef enum {
+ WAKE, /* see if any work to do in proactor/psocket context */
+ // socket of a connection (psocket_init() with is_conn == true)
+ PCONNECTION_IO,
+ // timerfd of a ptimer_t that was initialized with a psocket
+ PCONNECTION_TIMER,
+ // socket of a listener (psocket_init() with is_conn == false)
+ LISTENER_IO,
+ // timerfd of a ptimer_t that was initialized without a psocket
+ PROACTOR_TIMER } epoll_type_t;
+
+// Context to use with epoll.
+// A pointer to this struct is what start_polling()/rearm() store in
+// epoll_event.data.ptr when registering the descriptor.
+typedef struct epoll_extended_t {
+ psocket_t *psocket; // pconnection, listener, or NULL -> proactor
+ int fd; // descriptor being polled; -1 means "not polling" (see polling())
+ epoll_type_t type; // io/timer/wakeup
+ uint32_t wanted; // events to poll for
+} epoll_extended_t;
+
+/*
+ * This timerfd logic assumes EPOLLONESHOT and there never being two
+ * active timeout callbacks. There can be multiple unclaimed expiries
+ * processed in a single callback.
+ */
+
+/* One-shot timer built on a timerfd, plus the bookkeeping needed to tell
+ * real expiries from ones that were cancelled or replaced. */
+typedef struct ptimer_t {
+ pmutex mutex; // held by ptimer_set() and ptimer_callback() around the counters
+ int timerfd; // from timerfd_create(); negative if creation failed
+ epoll_extended_t epoll_io; // epoll context for timerfd
+ int pending_count; // timeouts set and neither cancelled nor yet counted by ptimer_callback()
+ int skip_count; // expiries the next ptimer_callback() must discard
+} ptimer_t;
+
+/*
+ * Create the timerfd for *pt and fill in its epoll context.  A non-NULL ps
+ * tags the timer as a connection timer, NULL as the proactor timer.
+ * Returns false, with only pt->timerfd written, if the timerfd cannot be created.
+ */
+static bool ptimer_init(ptimer_t *pt, psocket_t *ps) {
+  int tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
+  pt->timerfd = tfd;
+  if (tfd < 0)
+    return false;
+
+  pmutex_init(&pt->mutex);
+  pt->skip_count = 0;
+  pt->pending_count = 0;
+
+  epoll_extended_t *ee = &pt->epoll_io;
+  ee->psocket = ps;
+  ee->fd = tfd;
+  ee->type = (ps != NULL) ? PCONNECTION_TIMER : PROACTOR_TIMER;
+  ee->wanted = EPOLLIN;
+  return true;
+}
+
+// Arm the timer to expire once, t_millis milliseconds from now, or cancel it
+// when t_millis is 0.  Adjusts pending_count/skip_count for a previous setting
+// that is replaced, or that has already expired but not yet been processed.
+static void ptimer_set(ptimer_t *pt, uint64_t t_millis) {
+ // t_millis == 0 -> cancel
+ lock(&pt->mutex);
+ if (t_millis == 0 && pt->pending_count == 0) {
+ unlock(&pt->mutex);
+ return; // nothing to cancel
+ }
+ struct itimerspec newt, oldt;
+ memset(&newt, 0, sizeof(newt)); // it_interval stays zero, so the timer does not repeat
+ newt.it_value.tv_sec = t_millis / 1000;
+ newt.it_value.tv_nsec = (t_millis % 1000) * 1000000;
+
+ // oldt receives the time that was still remaining on the previous setting.
+ // NOTE(review): the return value is not checked; oldt is read uninitialized if the call fails.
+ timerfd_settime(pt->timerfd, 0, &newt, &oldt);
+ if (oldt.it_value.tv_sec || oldt.it_value.tv_nsec) {
+ // old value cancelled
+ assert (pt->pending_count > 0);
+ pt->pending_count--;
+ } else if (pt->pending_count) {
+ // cancel instance waiting on this lock
+ pt->skip_count++;
+ }
+ if (t_millis)
+ pt->pending_count++; // account for the timeout that was just armed
+ assert(pt->pending_count >= 0);
+ unlock(&pt->mutex);
+}
+
+// Callback bookkeeping. Return number of uncancelled expiry events.
+// Reads the expiry count from the timerfd, drops the skip_count expiries that
+// ptimer_set() marked as cancelled, and reduces pending_count by the remainder.
+// NOTE(review): the read() result and the count bounds are validated only by
+// assert(), so nothing checks them when NDEBUG is defined.
+static int ptimer_callback(ptimer_t *pt) {
+ lock(&pt->mutex);
+ uint64_t u_exp_count;
+ ssize_t l = read(pt->timerfd, &u_exp_count, sizeof(uint64_t));
+ assert(l == sizeof(uint64_t));
+ assert(u_exp_count < INT_MAX); // or test and log it?
+ int exp_count = (int) u_exp_count;
+ assert(exp_count >= pt->skip_count);
+ assert(exp_count <= pt->pending_count);
+ exp_count -= pt->skip_count; // discard expiries that were cancelled after firing
+ pt->skip_count = 0;
+ pt->pending_count -= exp_count;
+ unlock(&pt->mutex);
+ return (int) exp_count;
+}
+
+/* Close the timerfd, if one was opened, and destroy the timer's mutex. */
+static void ptimer_finalize(ptimer_t *pt) {
+  const int tfd = pt->timerfd;
+  if (tfd >= 0) {
+    close(tfd);
+  }
+  pmutex_finalize(&pt->mutex);
+}
+
+/* Current CLOCK_REALTIME reading, converted to milliseconds. */
+pn_timestamp_t pn_i_now2(void)
+{
+  struct timespec ts;
+  clock_gettime(CLOCK_REALTIME, &ts);
+  pn_timestamp_t whole_ms = ((pn_timestamp_t)ts.tv_sec) * 1000;
+  return whole_ms + (ts.tv_nsec / 1000000);
+}
+
+// ========================================================================
+// Proactor common code
+// ========================================================================
+
+// Condition name used by psocket_error() when reporting socket errors.
+const char *COND_NAME = "proactor";
+// Numeric ports that psocket_init() substitutes for the "amqp"/"amqps" service names.
+const char *AMQP_PORT = "5672";
+const char *AMQP_PORT_NAME = "amqp";
+const char *AMQPS_PORT = "5671";
+const char *AMQPS_PORT_NAME = "amqps";
+
+PN_HANDLE(PN_PROACTOR)
+
+// The number of times a connection event batch may be replenished for
+// a thread between calls to wait().
+// TODO: consider some instrumentation to determine an optimal number
+// or perhaps switch to cpu time based limit.
+#define HOG_MAX 3
+
+/* pn_proactor_t and pn_listener_t are plain C structs with normal memory management.
+ Class definitions are for identification as pn_event_t context only.
+*/
+PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor)
+PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
+
+/* Add ee->fd to epollfd as a one-shot source for ee->wanted.  True on success. */
+static bool start_polling(epoll_extended_t *ee, int epollfd) {
+  struct epoll_event request;
+  request.events = ee->wanted | EPOLLONESHOT;
+  request.data.ptr = ee;
+  int rc = epoll_ctl(epollfd, EPOLL_CTL_ADD, ee->fd, &request);
+  return rc == 0;
+}
+
+/*
+ * Remove ee->fd from epollfd and mark ee as no longer polled by resetting
+ * ee->fd to -1.  Does nothing when ee is already marked that way.
+ */
+static void stop_polling(epoll_extended_t *ee, int epollfd) {
+  // TODO: check for error, return bool or just log?
+  if (ee->fd != -1) {
+    struct epoll_event unused;
+    unused.events = 0;
+    unused.data.ptr = ee;
+    epoll_ctl(epollfd, EPOLL_CTL_DEL, ee->fd, &unused); // TODO: check for error
+    ee->fd = -1;
+  }
+}
+
+/* True while ee still holds a descriptor, i.e. ee->fd has not been reset to -1. */
+static inline bool polling(epoll_extended_t *ee) {
+  const bool has_fd = (ee->fd != -1);
+  return has_fd;
+}
+
+/*
+ * The proactor maintains a number of serialization contexts: each
+ * connection, each listener, the proactor itself. The serialization
+ * is presented to the application via each associated event batch.
+ *
+ * Currently internal wakeups (via wake()/wake_notify()) are used to
+ * force a context to check if it has work to do. To minimize trips
+ * through the kernel, wake() is a no-op if the context is working.
+ * Conversely, a context must never stop working without checking if
+ * it has newly arrived work.
+ *
+ * External wake operations, like pn_connection_wake() and
+ * pn_proactor_interrupt(), are built on top of the internal wake
+ * mechanism. The former coalesces multiple wakes until event
+ * delivery, the latter does not. The WAKEABLE implementation can be
+ * modeled on whichever is more suited.
+ */
+/* Kind of object that owns a pcontext_t; stored in pcontext_t.type. */
+typedef enum {
+ PROACTOR,
+ PCONNECTION,
+ LISTENER,
+ WAKEABLE } pcontext_type_t;
+
+/* Serialization context shared by the proactor, connections and listeners. */
+typedef struct pcontext_t {
+  pmutex mutex;
+  pn_proactor_t *proactor;  /* Immutable */
+  void *owner;              /* Instance governed by the context */
+  pcontext_type_t type;
+  bool working;             // wake() does nothing while this is set
+  int wake_ops;             // unprocessed eventfd wake callback (convert to bool?)
+  // The typedef name is not declared until the end of this definition, so
+  // the self-reference must use the struct tag.
+  struct pcontext_t *next;  // wake list, guarded by proactor eventfd_mutex
+} pcontext_t;
+
+/*
+ * Prepare ctx for use: record its type t, its proactor p and its owner o,
+ * and start it idle (not working, no wakes queued, not on the wake list).
+ */
+static void pcontext_init(pcontext_t *ctx, pcontext_type_t t, pn_proactor_t *p, void *o) {
+  pmutex_init(&ctx->mutex);
+
+  /* identity */
+  ctx->type = t;
+  ctx->proactor = p;
+  ctx->owner = o;
+
+  /* idle state */
+  ctx->next = NULL;
+  ctx->wake_ops = 0;
+  ctx->working = false;
+}
+
+/* Release what pcontext_init() acquired, which is only the mutex. */
+static void pcontext_finalize(pcontext_t* ctx) {
+  pmutex *m = &ctx->mutex;
+  pmutex_finalize(m);
+}
+
+/* common to connection and listener */
+/* as_pconnection()/as_listener() cast a psocket_t* directly to the enclosing
+ * type, which relies on psocket being the first member of both structs. */
+typedef struct psocket_t {
+ pn_proactor_t *proactor;
+ struct psocket_t* next; /* Protected by proactor.mutex */
+ struct psocket_t* prev; /* Protected by proactor.mutex */
+ int sockfd; /* -1 until a socket is assigned */
+ epoll_extended_t epoll_io;
+ bool is_conn; /* true: embedded in a pconnection_t, false: in a pn_listener_t */
+ bool closing;
+ char host[NI_MAXHOST]; /* "\001" stands for a NULL host, see fixstr() */
+ char port[NI_MAXSERV]; /* "\001" stands for a NULL port, see fixstr() */
+} psocket_t;
+
+/* The proactor: owns the epoll instance, its own timer and event batch, and
+ * the eventfd-based wake list shared by all contexts. */
+struct pn_proactor_t {
+ pcontext_t context;
+ int epollfd; /* epoll instance used by rearm() */
+ ptimer_t timer;
+ pn_collector_t *collector;
+ psocket_t *psockets; /* track in-use psockets for PN_PROACTOR_INACTIVE and final cleanup */
+ epoll_extended_t epoll_wake; /* epoll context re-armed by wake_pop_front() */
+ pn_event_t *cached_event; /* event fetched by proactor_has_event() and not yet handed out */
+ pn_event_batch_t batch;
+ size_t interrupts; /* total pending interrupts */
+ size_t deferred_interrupts; /* interrupts for current batch */
+ bool inactive;
+ bool timer_expired;
+ bool timer_cancelled;
+ bool timer_armed;
+ bool shutting_down;
+ // wake subsystem
+ int eventfd; // written by wake_notify(), read by wake_pop_front()
+ pmutex eventfd_mutex; // guards wakes_in_progress and the wake list
+ bool wakes_in_progress; // true from the first wake() until the wake list drains
+ pcontext_t *wake_list_first;
+ pcontext_t *wake_list_last;
+};
+
+static void rearm(pn_proactor_t *p, epoll_extended_t *ee);
+
+/*
+ * Wake strategy with eventfd.
+ * - wakees can be in the list only once
+ * - wakers only write() if wakes_in_progress is false
+ * - wakees only read() if about to set wakes_in_progress to false
+ * When multiple wakes are pending, the kernel cost is a single rearm().
+ * Otherwise it is the trio of write/read/rearm.
+ * Only the writes and reads need to be carefully ordered.
+ *
+ * Multiple eventfds could be used and shared amongst the pcontext_t's.
+ */
+
+// part1: call with ctx->owner lock held, return true if notify required by caller
+// Appends ctx to the proactor's wake list unless it is already queued or is
+// currently working.
+static bool wake(pcontext_t *ctx) {
+  if (ctx->wake_ops || ctx->working)
+    return false;
+
+  ctx->wake_ops++;
+  pn_proactor_t *p = ctx->proactor;
+  bool notify = false;
+
+  lock(&p->eventfd_mutex);
+  if (p->wake_list_first)
+    p->wake_list_last->next = ctx;
+  else
+    p->wake_list_first = ctx;
+  p->wake_list_last = ctx;
+
+  if (!p->wakes_in_progress) {
+    // force a wakeup via the eventfd
+    p->wakes_in_progress = true;
+    notify = true;
+  }
+  unlock(&p->eventfd_mutex);
+  return notify;
+}
+
+// part2: make OS call without lock held
+// Adds 1 to the proactor's eventfd.
+static inline void wake_notify(pcontext_t *ctx) {
+  uint64_t one = 1;
+  int efd = ctx->proactor->eventfd;
+  write(efd, &one, sizeof(uint64_t)); // TODO: check for error
+}
+
+// call with no locks
+// Detach and return the first context on the wake list, or NULL if the list
+// is empty.  When the list becomes empty the eventfd is drained and
+// wakes_in_progress is cleared.  The wake epoll entry is always re-armed.
+static pcontext_t *wake_pop_front(pn_proactor_t *p) {
+ pcontext_t *ctx = NULL;
+ lock(&p->eventfd_mutex);
+ assert(p->wakes_in_progress);
+ if (p->wake_list_first) {
+ ctx = p->wake_list_first;
+ p->wake_list_first = ctx->next;
+ if (!p->wake_list_first) p->wake_list_last = NULL;
+ ctx->next = NULL;
+
+ if (!p->wake_list_first) {
+ /* Reset the eventfd until a future write.
+ * Can the read system call be made without holding the lock?
+ * Note that if the reads/writes happen out of order, the wake
+ * mechanism will hang. */
+ uint64_t ignored;
+ read(p->eventfd, &ignored, sizeof(uint64_t)); // TODO: check for error
+ p->wakes_in_progress = false;
+ }
+ }
+ unlock(&p->eventfd_mutex);
+ // re-enable the one-shot registration of the wake descriptor
+ rearm(p, &p->epoll_wake);
+ return ctx;
+}
+
+// call with owner lock held, once for each pop from the wake list
+// Marks one queued wake of ctx as processed.
+static inline void wake_done(pcontext_t *ctx) {
+  assert(ctx->wake_ops > 0);
+  ctx->wake_ops -= 1;
+}
+
+
+/*
+ * Initialize the socket state shared by connections and listeners.
+ *
+ * ps       socket record to initialize
+ * p        owning proactor
+ * is_conn  true when ps is embedded in a pconnection_t, false for a listener
+ * host     host name, may be NULL
+ * port     port or service name, may be NULL
+ *
+ * No descriptor is assigned yet (sockfd and epoll_io.fd are -1).  host and
+ * port are copied into fixed-size buffers and truncated if they do not fit.
+ */
+static void psocket_init(psocket_t* ps, pn_proactor_t* p, bool is_conn, const char *host, const char *port) {
+  ps->epoll_io.psocket = ps;
+  ps->epoll_io.fd = -1;
+  ps->epoll_io.type = is_conn ? PCONNECTION_IO : LISTENER_IO;
+  ps->epoll_io.wanted = 0;
+  ps->proactor = p;
+  ps->next = NULL;
+  ps->prev = NULL;
+  ps->is_conn = is_conn;
+  ps->closing = false;
+  ps->sockfd = -1;
+
+  /* For platforms that don't know about "amqp" and "amqps" service names. */
+  if (port && strcmp(port, AMQP_PORT_NAME) == 0)
+    port = AMQP_PORT;
+  else if (port && strcmp(port, AMQPS_PORT_NAME) == 0)
+    port = AMQPS_PORT;
+  /* Set to "\001" to indicate a NULL as opposed to an empty string "" */
+  /* strncpy() does not terminate the destination when the source fills it,
+     so copy one byte less and terminate explicitly. */
+  strncpy(ps->host, host ? host : "\001", sizeof(ps->host) - 1);
+  ps->host[sizeof(ps->host) - 1] = '\0';
+  strncpy(ps->port, port ? port : "\001", sizeof(ps->port) - 1);
+  ps->port[sizeof(ps->port) - 1] = '\0';
+}
+
+/* Turn "\001" back to NULL */
+/* Any other string, including the empty string, is returned unchanged. */
+static inline const char* fixstr(const char* str) {
+  if (str[0] == '\001')
+    return NULL;
+  return str;
+}
+
+/* Per-connection state.  psocket must stay the first member: as_pconnection()
+ * casts a psocket_t* directly to pconnection_t*. */
+typedef struct pconnection_t {
+ psocket_t psocket;
+ pcontext_t context;
+ uint32_t new_events; // epoll events recorded by pconnection_process() for the working thread
+ int wake_count;
+ bool server; /* accept, not connect */
+ bool tick_pending; // set when the timer fired; consumed by the working thread
+ bool timer_armed;
+ ptimer_t timer; // TODO: review one timerfd per connection
+ // Following values only changed by (sole) working context:
+ uint32_t current_arm; // active epoll io events
+ bool read_blocked;
+ bool write_blocked;
+ bool read_closed;
+ bool write_closed;
+ bool disconnected; // set on EPOLLHUP/EPOLLERR while both directions were still open
+ int hog_count; // thread hogging limiter
+ pn_event_t *cached_event; // event fetched by pconnection_has_event() and not yet handed out
+ pn_event_batch_t batch;
+ pn_connection_driver_t driver;
+} pconnection_t;
+
+/* Listener state.  psocket must stay the first member: as_listener() casts a
+ * psocket_t* directly to pn_listener_t*. */
+struct pn_listener_t {
+ psocket_t psocket;
+ pcontext_t context;
+ pn_condition_t *condition; // filled in by psocket_error() on a socket error
+ pn_collector_t *collector; // event source for listener_has_event()
+ pn_event_t *cached_event; // event fetched by listener_has_event() and not yet handed out
+ pn_event_batch_t batch;
+ pn_record_t *attachments;
+ void *listener_context;
+ size_t backlog;
+ int available_accepts;
+ int pending_accepts;
+ bool close_dispatched;
+ bool armed;
+};
+
+
+static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool topup);
+static void listener_begin_close(pn_listener_t* l);
+static void proactor_add(psocket_t *ps);
+static void proactor_remove(psocket_t *ps);
+
+/* View ps as its enclosing pconnection_t, or NULL when ps belongs to a listener. */
+static inline pconnection_t *as_pconnection(psocket_t* ps) {
+  if (!ps->is_conn)
+    return NULL;
+  return (pconnection_t*)ps;
+}
+
+/* View ps as its enclosing pn_listener_t, or NULL when ps belongs to a connection. */
+static inline pn_listener_t *as_listener(psocket_t* ps) {
+  if (ps->is_conn)
+    return NULL;
+  return (pn_listener_t*)ps;
+}
+
+static pn_event_t *listener_batch_next(pn_event_batch_t *batch);
+static pn_event_t *proactor_batch_next(pn_event_batch_t *batch);
+static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch);
+
+/* Recover the proactor that embeds batch; NULL if batch is not a proactor batch. */
+static inline pn_proactor_t *batch_proactor(pn_event_batch_t *batch) {
+  if (batch->next_event != proactor_batch_next)
+    return NULL;
+  char *base = (char*)batch - offsetof(pn_proactor_t, batch);
+  return (pn_proactor_t*)base;
+}
+
+/* Recover the listener that embeds batch; NULL if batch is not a listener batch. */
+static inline pn_listener_t *batch_listener(pn_event_batch_t *batch) {
+  if (batch->next_event != listener_batch_next)
+    return NULL;
+  char *base = (char*)batch - offsetof(pn_listener_t, batch);
+  return (pn_listener_t*)base;
+}
+
+/* Recover the pconnection that embeds batch; NULL if batch is not a connection batch. */
+static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
+  if (batch->next_event != pconnection_batch_next)
+    return NULL;
+  char *base = (char*)batch - offsetof(pconnection_t, batch);
+  return (pconnection_t*)base;
+}
+
+/* True if an event is cached for pc, fetching one from the driver when the cache is empty. */
+static inline bool pconnection_has_event(pconnection_t *pc) {
+  if (!pc->cached_event)
+    pc->cached_event = pn_connection_driver_next_event(&pc->driver);
+  return pc->cached_event != NULL;
+}
+
+/* True if an event is cached for l, fetching one from its collector when the cache is empty. */
+static inline bool listener_has_event(pn_listener_t *l) {
+  if (!l->cached_event)
+    l->cached_event = pn_collector_next(l->collector);
+  return l->cached_event != NULL;
+}
+
+/* True if an event is cached for p, fetching one from its collector when the cache is empty. */
+static inline bool proactor_has_event(pn_proactor_t *p) {
+  if (!p->cached_event)
+    p->cached_event = pn_collector_next(p->collector);
+  return p->cached_event != NULL;
+}
+
+/*
+ * Report socket error err on ps, prefixed by what, and begin closing the owner.
+ * A connection records the error on its driver and closes the driver; a
+ * listener records it in its condition and starts closing the listener.
+ */
+static void psocket_error(psocket_t *ps, int err, const char* what) {
+  const char *host = fixstr(ps->host);
+  const char *port = fixstr(ps->port);
+
+  if (!ps->is_conn) {
+    pn_listener_t *l = as_listener(ps);
+    pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s",
+                        what, host, port, strerror(err));
+    listener_begin_close(l);
+    return;
+  }
+
+  pn_connection_driver_t *driver = &as_pconnection(ps)->driver;
+  pn_connection_driver_bind(driver); /* Bind so errors will be reported */
+  pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s",
+                              what, host, port, strerror(err));
+  pn_connection_driver_close(driver);
+}
+
+/* Re-enable the one-shot registration of ee->fd on p's epoll instance for ee->wanted. */
+static void rearm(pn_proactor_t *p, epoll_extended_t *ee) {
+  struct epoll_event request;
+  request.events = ee->wanted | EPOLLONESHOT;
+  request.data.ptr = ee;
+  epoll_ctl(p->epollfd, EPOLL_CTL_MOD, ee->fd, &request); // TODO: check for error
+}
+
+// ========================================================================
+// pconnection
+// ========================================================================
+
+/* Make a pn_class for pconnection_t since it is attached to a pn_connection_t record */
+#define CID_pconnection CID_pn_object
+#define pconnection_inspect NULL
+#define pconnection_initialize NULL
+#define pconnection_hashcode NULL
+#define pconnection_compare NULL
+
+/* pn_class finalizer for pconnection_t: releases the embedded context. */
+static void pconnection_finalize(void *vp_pconnection) {
+  pconnection_t *self = (pconnection_t*)vp_pconnection;
+  pcontext_t *ctx = &self->context;
+  pcontext_finalize(ctx);
+}
+
+
+static const pn_class_t pconnection_class = PN_CLASS(pconnection);
+
+
+static void pconnection_tick(pconnection_t *pc);
+
+/*
+ * Allocate a pconnection_t for connection c and attach it to c's record
+ * under PN_PROACTOR.
+ *
+ * p       owning proactor
+ * c       connection the driver is initialized with
+ * server  true for an accepted connection (the transport is put in server mode)
+ * host    host string, may be NULL
+ * port    port or service string, may be NULL
+ *
+ * Returns NULL if allocation or driver initialization fails.  Aborts if the
+ * connection timer cannot be created.  On success the only reference to the
+ * result is held by c's attachments record.
+ */
+static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bool server, const char *host, const char *port) {
+  pconnection_t *pc = (pconnection_t*) pn_class_new(&pconnection_class, sizeof(pconnection_t));
+  if (!pc) return NULL;
+  /* Initialize the context first: pconnection_finalize() destroys its mutex
+     whenever pc is released, including on the failure path just below. */
+  pcontext_init(&pc->context, PCONNECTION, p, pc);
+  if (pn_connection_driver_init(&pc->driver, c, NULL) != 0) {
+    pn_decref(pc); /* drop the only reference so pc is not leaked */
+    return NULL;
+  }
+  if (!ptimer_init(&pc->timer, &pc->psocket)) {
+    perror("timer setup failure");
+    abort();
+  }
+  psocket_init(&pc->psocket, p, true, host, port);
+  pc->new_events = 0;
+  pc->wake_count = 0;
+  pc->server = server;
+  pc->tick_pending = false;
+  pc->timer_armed = false;
+
+  pc->current_arm = 0;
+  pc->read_blocked = true;
+  pc->write_blocked = true;
+  pc->read_closed = true;
+  pc->write_closed = true;
+  pc->disconnected = false;
+  pc->hog_count = 0;
+  pc->cached_event = NULL;
+  pc->batch.next_event = pconnection_batch_next;
+
+  if (server) {
+    pn_transport_set_server(pc->driver.transport);
+  }
+  pn_record_t *r = pn_connection_attachments(pc->driver.connection);
+  pn_record_def(r, PN_PROACTOR, &pconnection_class);
+  pn_record_set(r, PN_PROACTOR, pc);
+  pn_decref(pc); /* Will be deleted when the connection is */
+  return pc;
+}
+
+// Call with lock held and closing == true (i.e. pn_connection_driver_finished() == true), timer cancelled.
+// Return true when all possible outstanding epoll events associated with this pconnection have been processed.
+// That is: no io events armed, no timer expiry pending, no wake queued.
+static inline bool pconnection_is_final(pconnection_t *pc) {
+  if (pc->current_arm)
+    return false;
+  if (pc->timer.pending_count)
+    return false;
+  return pc->context.wake_ops == 0;
+}
+
+// call without lock, but only if pconnection_is_final() is true
+// Releases everything the pconnection holds: the socket, its place in the
+// proactor's psocket list, the timer and the connection driver.
+static void pconnection_cleanup(pconnection_t *pc) {
+ if (pc->psocket.sockfd != -1)
+ close(pc->psocket.sockfd);
+ proactor_remove(&pc->psocket);
+ // take the timerfd out of epoll before ptimer_finalize() closes it
+ stop_polling(&pc->timer.epoll_io, pc->psocket.proactor->epollfd);
+ ptimer_finalize(&pc->timer);
+ pn_incref(pc); /* Make sure we don't do a circular free */
+ pn_connection_driver_destroy(&pc->driver);
+ pn_decref(pc);
+ /* Now pc is freed iff the connection is, otherwise remains till the pn_connection_t is freed. */
+}
+
+// Call with lock held or from forced_shutdown
+// Starts closing pc once: marks both directions closed, stops polling the
+// socket, closes the driver and cancels the timer.  Later calls do nothing.
+void pconnection_begin_close(pconnection_t *pc) {
+  psocket_t *ps = &pc->psocket;
+  if (ps->closing)
+    return;
+
+  ps->closing = true;
+  pc->write_closed = true;
+  pc->read_closed = true;
+  stop_polling(&ps->epoll_io, ps->proactor->epollfd);
+  pn_connection_driver_close(&pc->driver);
+  ptimer_set(&pc->timer, 0);
+}
+
+// Tear down pc immediately: begin the close, discard any outstanding timer
+// and wake accounting plus the cached event, then run the final cleanup.
+static void pconnection_forced_shutdown(pconnection_t *pc) {
+ // Called by proactor_free, no competing threads, no epoll activity.
+ pconnection_begin_close(pc);
+ // pconnection_process will never be called again. Zero everything.
+ pc->timer.pending_count = 0;
+ pc->context.wake_ops = 0;
+ pn_connection_t *c = pc->driver.connection;
+ pn_collector_t *col = pn_connection_collector(c);
+ if (pc->cached_event != NULL) {
+ pn_collector_pop(col);
+ pc->cached_event = NULL;
+ }
+ pn_collector_release(col);
+ // the counters zeroed above are among those pconnection_is_final() tests
+ assert(pconnection_is_final(pc));
+ pconnection_cleanup(pc);
+}
+
+/*
+ * next_event callback for a connection batch.  Returns the next event of the
+ * connection, or NULL when none is available.  If none is ready and the
+ * hog limit has not been reached, asks pconnection_process() for one top-up
+ * before giving up.
+ */
+static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) {
+  pconnection_t *pc = batch_pconnection(batch);
+  bool ready = pconnection_has_event(pc);
+  if (!ready && pc->hog_count < HOG_MAX) {
+    pconnection_process(pc, 0, false, true); // top up
+    ready = pconnection_has_event(pc);
+  }
+  pn_event_t *e = ready ? pc->cached_event : NULL;
+  pc->cached_event = NULL;
+  return e;
+}
+
+/* Call only from working context (no competitor for pc->current_arm or
+   connection driver). If true returned, caller must do
+   pconnection_rearm().
+
+   Never rearm(0 | EPOLLONESHOT), since this really means
+   rearm(EPOLLHUP | EPOLLERR | EPOLLONESHOT) and leaves doubt that the
+   EPOLL_CTL_DEL can prevent a parallel HUP/ERR error notification during
+   close/shutdown. Let read()/write() return 0 or -1 to trigger cleanup logic.
+
+   On a true return the new event mask has been stored in
+   pc->psocket.epoll_io.wanted and in pc->current_arm.
+ */
+static bool pconnection_rearm_check(pconnection_t *pc) {
+  if (pc->read_closed && pc->write_closed)
+    return false;
+
+  uint32_t want = 0;
+  if (pc->read_blocked && !pc->read_closed)
+    want |= EPOLLIN;
+
+  if (!pc->write_closed) {
+    /* Interested in EPOLLOUT if writes are blocked or output is waiting. */
+    bool need_out = pc->write_blocked;
+    if (!need_out) {
+      pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
+      need_out = (wbuf.size > 0);
+    }
+    if (need_out)
+      want |= EPOLLOUT;
+  }
+
+  if (want == 0 || want == pc->current_arm)
+    return false;
+
+  pc->current_arm = want;
+  pc->psocket.epoll_io.wanted = want;
+  return true;
+}
+
+/* Call without lock */
+/* Re-arms the connection's socket for the events chosen by pconnection_rearm_check(). */
+static inline void pconnection_rearm(pconnection_t *pc) {
+  psocket_t *ps = &pc->psocket;
+  rearm(ps->proactor, &ps->epoll_io);
+}
+
+/*
+ * True if pc still has something to do: unprocessed epoll events, wakes or
+ * a timer tick, a readable socket, or buffered output on a writable socket.
+ */
+static inline bool pconnection_work_pending(pconnection_t *pc) {
+  bool flagged = pc->new_events || pc->wake_count || pc->tick_pending;
+  if (flagged)
+    return true;
+
+  bool readable = !(pc->read_blocked || pc->read_closed);
+  if (readable)
+    return true;
+
+  pn_bytes_t out = pn_connection_driver_write_buffer(&pc->driver);
+  if (pc->write_blocked)
+    return false;
+  return out.size > 0;
+}
+
+// Give up the working role for pc.  If events or other work remain, the
+// context wakes itself so that it is processed again; otherwise, if the
+// driver has finished, the close sequence is started and, when nothing is
+// outstanding, the connection is cleaned up right away.
+static void pconnection_done(pconnection_t *pc) {
+ bool notify = false;
+ lock(&pc->context.mutex);
+ // Is this a good time to write if not write_blocked? For now, rely on topup mechanism.
+ pc->context.working = false; // So we can wake() ourself if necessary. We remain the defacto
+ // working context while the lock is held.
+ pc->hog_count = 0;
+ if (pconnection_has_event(pc) || pconnection_work_pending(pc)) {
+ notify = wake(&pc->context);
+ } else if (!pc->cached_event && pn_connection_driver_finished(&pc->driver)) {
+ pconnection_begin_close(pc);
+ if (pconnection_is_final(pc)) {
+ unlock(&pc->context.mutex);
+ pconnection_cleanup(pc);
+ return;
+ }
+ }
+ // note: this local shadows the rearm() function for the rest of the block
+ bool rearm = pconnection_rearm_check(pc);
+ unlock(&pc->context.mutex);
+ // the system calls are made only after the lock has been released
+ if (rearm) pconnection_rearm(pc);
+ if (notify) wake_notify(&pc->context);
+}
+
+/* Look up the pconnection_t stored under PN_PROACTOR in c's attachments; NULL if c is NULL. */
+static pconnection_t *get_pconnection(pn_connection_t* c) {
+  if (c == NULL)
+    return NULL;
+  pn_record_t *attachments = pn_connection_attachments(c);
+  void *found = pn_record_get(attachments, PN_PROACTOR);
+  return (pconnection_t*) found;
+}
+
+// Return true unless error
+// Writes wbuf to the connection's socket once.  Bytes accepted are reported
+// to the driver; a short write or EWOULDBLOCK marks the connection as
+// write_blocked.  EAGAIN and EINTR are not treated as errors.
+static bool pconnection_write(pconnection_t *pc, pn_bytes_t wbuf) {
+  ssize_t sent = write(pc->psocket.sockfd, wbuf.start, wbuf.size);
+  if (sent > 0) {
+    pn_connection_driver_write_done(&pc->driver, sent);
+    if ((size_t) sent < wbuf.size)
+      pc->write_blocked = true;
+    return true;
+  }
+  if (errno == EWOULDBLOCK) {
+    pc->write_blocked = true;
+    return true;
+  }
+  return (errno == EAGAIN || errno == EINTR);
+}
+
+/*
+ * May be called concurrently from multiple threads:
+ * pn_event_batch_t loop (topup is true)
+ * timer (timeout is true)
+ * socket io (events != 0)
+ * one or more wake()
+ * Only one thread becomes (or always was) the working thread.
+ *
+ * Returns &pc->batch when the calling thread has become the working thread
+ * and an event is available; NULL otherwise (another thread is working,
+ * this was a top-up, no events were produced, or the connection was cleaned up).
+ */
+static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool topup) {
+ // none of the three call reasons given: this call comes from the wake list
+ bool inbound_wake = !(events | timeout | topup);
+ bool timer_unarmed = false;
+ bool timer_fired = false;
+ bool waking = false;
+ bool tick_required = false;
+
+ // Don't touch data exclusive to working thread (yet).
+
+ if (timeout) {
+ timer_unarmed = true;
+ timer_fired = (ptimer_callback(&pc->timer) != 0);
+ }
+ lock(&pc->context.mutex);
+
+ // Record why we were called, so whichever thread is working can act on it.
+ if (events) {
+ pc->new_events = events;
+ events = 0;
+ }
+ else if (timer_fired) {
+ pc->tick_pending = true;
+ timer_fired = false;
+ }
+ else if (inbound_wake) {
+ wake_done(&pc->context);
+ inbound_wake = false;
+ }
+
+ if (timer_unarmed)
+ pc->timer_armed = false;
+
+ if (topup) {
+ // Only called by the batch owner. Does not loop, just "tops up"
+ // once. May be back depending on hog_count.
+ assert(pc->context.working);
+ }
+ else {
+ if (pc->context.working) {
+ // Another thread is the working context.
+ unlock(&pc->context.mutex);
+ return NULL;
+ }
+ pc->context.working = true;
+ }
+
+ // Confirmed as working thread. Review state and unlock ASAP.
+
+ if (pc->psocket.closing && pconnection_is_final(pc)) {
+ unlock(&pc->context.mutex);
+ pconnection_cleanup(pc);
+ return NULL;
+ }
+
+ retry:
+ // Reached with the context mutex held, both on first entry and via goto.
+
+ if (pconnection_has_event(pc)) {
+ unlock(&pc->context.mutex);
+ return &pc->batch;
+ }
+
+ if (pc->wake_count) {
+ waking = true;
+ pc->wake_count = 0;
+ }
+ if (pc->tick_pending) {
+ pc->tick_pending = false;
+ if (!(pc->read_closed && pc->write_closed))
+ tick_required = true;
+ }
+
+ if (pc->new_events) {
+ if ((pc->new_events & (EPOLLHUP | EPOLLERR)) && !pc->read_closed && !pc->write_closed)
+ pc->disconnected = true;
+ if (pc->new_events & EPOLLOUT)
+ pc->write_blocked = false;
+ if (pc->new_events & EPOLLIN)
+ pc->read_blocked = false;
+ // the one-shot registration has fired, so nothing is armed now
+ pc->current_arm = 0;
+ pc->new_events = 0;
+ }
+ bool unarmed = (pc->current_arm == 0);
+ if (!pc->timer_armed) {
+ pc->timer_armed = true; // about to rearm outside the lock
+ timer_unarmed = true; // so we remember
+ }
+
+ unlock(&pc->context.mutex);
+ pc->hog_count++; // working context doing work
+
+ if (timer_unarmed) {
+ rearm(pc->psocket.proactor, &pc->timer.epoll_io);
+ timer_unarmed = false;
+ }
+ if (waking) {
+ pn_connection_t *c = pc->driver.connection;
+ pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
+ waking = false;
+ }
+
+ // read... tick... write
+ // perhaps should be: write_if_recent_EPOLLOUT... read... tick... write
+
+ if (!pc->read_closed) {
+ pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
+ if (rbuf.size > 0 && (!pc->read_blocked)) {
+ ssize_t n = read(pc->psocket.sockfd, rbuf.start, rbuf.size);
+
+ if (n > 0) {
+ pn_connection_driver_read_done(&pc->driver, n);
+ pconnection_tick(pc); /* check for tick changes. */
+ tick_required = false;
+ if (pn_connection_driver_read_closed(&pc->driver)) {
+ // No more blocks on read in case peer doesn't send shutdown.
+ pc->read_closed = true;
+ }
+ else if ((size_t) n < rbuf.size)
+ pc->read_blocked = true;
+ }
+ else if (n == 0) {
+ // read() returned 0: close the read side of the driver
+ pn_connection_driver_read_close(&pc->driver);
+ pc->read_closed = true;
+ }
+ else if (errno == EWOULDBLOCK)
+ pc->read_blocked = true;
+ else if (!(errno == EAGAIN || errno == EINTR)) {
+ psocket_error(&pc->psocket, errno, pc->disconnected ? "Disconnected" : "on read from");
+ pc->read_closed = pc->write_closed = true;
+ }
+ }
+ }
+
+ if (tick_required) {
+ pconnection_tick(pc); /* check for tick changes. */
+ tick_required = false;
+ }
+
+ // Write until the socket blocks, the write side closes or the buffer is empty.
+ while (!pc->write_blocked && !pc->write_closed) {
+ pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
+ if (wbuf.size > 0) {
+ if (!pconnection_write(pc, wbuf)) {
+ psocket_error(&pc->psocket, errno, pc->disconnected ? "disconnected" : "on write to");
+ pc->read_closed = pc->write_closed = true;
+ }
+ }
+ else {
+ if (pn_connection_driver_write_closed(&pc->driver)) {
+ shutdown(pc->psocket.sockfd, SHUT_WR);
+ pc->write_closed = true;
+ pc->write_blocked = true;
+ }
+ else
+ break; /* nothing to write until next read/wake/timeout */
+ }
+ }
+
+ if (topup) {
+ // If there was anything new to topup, we have it by now.
+ if (unarmed && pconnection_rearm_check(pc))
+ pconnection_rearm(pc);
+ return NULL; // caller already owns the batch
+ }
+
+ if (pconnection_has_event(pc)) {
+ if (unarmed && pconnection_rearm_check(pc))
+ pconnection_rearm(pc);
+ return &pc->batch;
+ }
+
+ lock(&pc->context.mutex);
+ if (pc->psocket.closing && pconnection_is_final(pc)) {
+ unlock(&pc->context.mutex);
+ pconnection_cleanup(pc);
+ return NULL;
+ }
+
+ // Never stop working while work remains. hog_count exception to this rule is elsewhere.
+ if (pconnection_work_pending(pc))
+ goto retry; // TODO: get rid of goto without adding more locking
+
+ pc->context.working = false;
+ // note: this local shadows the rearm() function until the end of the block
+ bool rearm = pconnection_rearm_check(pc);
+
+ unlock(&pc->context.mutex);
+ if (rearm) pconnection_rearm(pc);
+ return NULL;
+}
+
+
+/* Switch a socket to non-blocking mode and turn on TCP_NODELAY.
+ * The results of fcntl() and setsockopt() are not checked. */
+static void configure_socket(int sock) {
+  int nonblocking = fcntl(sock, F_GETFL) | O_NONBLOCK;
+  fcntl(sock, F_SETFL, nonblocking);
+
+  int enable = 1;
+  setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void*) &enable, sizeof(enable));
+}
+
+/* Register a connection's timer and socket with the proactor's epoll set.
+ * pc->psocket.sockfd must already hold the socket; the read/write closed
+ * flags are reset and the socket is polled for both input and output. */
+void pconnection_start(pconnection_t *pc) {
+  const int epollfd = pc->psocket.proactor->epollfd;
+  epoll_extended_t *io = &pc->psocket.epoll_io;
+
+  start_polling(&pc->timer.epoll_io, epollfd);  // TODO: check for error
+
+  pc->read_closed = false;
+  pc->write_closed = false;
+  io->fd = pc->psocket.sockfd;
+  io->wanted = EPOLLIN | EPOLLOUT;
+  start_polling(io, epollfd);  // TODO: check for error
+}
+
+/* Begin an outbound connection for c to the URL in addr.
+ *
+ * A pconnection is created and added to the proactor, then the host/port are
+ * resolved and a non-blocking connect() is issued. On success (or
+ * EINPROGRESS) polling is started. Otherwise the error is recorded on the
+ * psocket and the connection context is woken. */
+void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr) {
+  char *url = strdup(addr);
+  assert(url); // TODO: memory safety
+  char *scheme, *user, *pass, *host, *port, *path;
+  pni_parse_url(url, &scheme, &user, &pass, &host, &port, &path);
+  pconnection_t *pc = new_pconnection_t(p, c, false, host, port);
+  assert(pc); // TODO: memory safety
+  // TODO: check case of proactor shutting down
+  lock(&pc->context.mutex);
+  proactor_add(&pc->psocket);
+
+  struct addrinfo *res = NULL;
+  int sock = -1;
+  bool started = false;
+  if (getaddrinfo(host, port, 0, &res) == 0) {
+    sock = socket(res->ai_family, SOCK_STREAM, res->ai_protocol);
+    if (sock >= 0) {
+      configure_socket(sock);
+      started = connect(sock, res->ai_addr, res->ai_addrlen) == 0 || errno == EINPROGRESS;
+    }
+  }
+
+  if (started) {
+    pc->psocket.sockfd = sock;
+    pconnection_start(pc);
+    unlock(&pc->context.mutex);
+    freeaddrinfo(res);
+    free(url);
+    return;
+  }
+
+  /* Resolution, socket creation or connect failed. */
+  psocket_error(&pc->psocket, errno, "connect to ");
+  bool notify = wake(&pc->context);
+  if (res) freeaddrinfo(res);
+  if (sock != -1) close (sock);
+  unlock(&pc->context.mutex);
+  if (notify) wake_notify(&pc->context);
+  free(url);
+}
+
+
+/* Run the transport tick and reprogram the connection timer.
+ * Does nothing unless a local or remote idle timeout is set. */
+static void pconnection_tick(pconnection_t *pc) {
+  pn_transport_t *transport = pc->driver.transport;
+  if (!pn_transport_get_idle_timeout(transport) && !pn_transport_get_remote_idle_timeout(transport))
+    return;
+
+  ptimer_set(&pc->timer, 0);
+  uint64_t now = pn_i_now2();
+  uint64_t deadline = pn_transport_tick(transport, now);
+  if (deadline)
+    ptimer_set(&pc->timer, deadline - now);
+}
+
+/* Request a wake for connection c. Ignored when c has no pconnection or the
+ * connection's socket is already closing. */
+void pn_connection_wake(pn_connection_t* c) {
+  pconnection_t *pc = get_pconnection(c);
+  if (!pc)
+    return;
+
+  bool notify = false;
+  lock(&pc->context.mutex);
+  if (!pc->psocket.closing) {
+    pc->wake_count++;
+    notify = wake(&pc->context);
+  }
+  unlock(&pc->context.mutex);
+
+  if (notify)
+    wake_notify(&pc->context);
+}
+
+
+// ========================================================================
+// listener
+// ========================================================================
+
+/* Return the listener an event belongs to, or NULL when the event's class is
+ * not the listener class. */
+pn_listener_t *pn_event_listener(pn_event_t *e) {
+  if (pn_event_class(e) != pn_listener__class())
+    return NULL;
+  return (pn_listener_t*)pn_event_context(e);
+}
+
+/* Allocate a zeroed listener with its collector, condition and attachments.
+ * Returns NULL if the listener or any of those members cannot be created. */
+pn_listener_t *pn_listener() {
+  pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t));
+  if (!l)
+    return NULL;
+
+  l->batch.next_event = listener_batch_next;
+  l->collector = pn_collector();
+  l->condition = pn_condition();
+  l->attachments = pn_record();
+
+  bool complete = l->condition && l->collector && l->attachments;
+  if (!complete) {
+    pn_listener_free(l);
+    return NULL;
+  }
+  return l;
+}
+
+/* Start listening on the URL in addr on behalf of listener l.
+ *
+ * The listener's context and psocket are initialised, it is added to the
+ * proactor, and a PN_LISTENER_OPEN event is queued. The host/port are then
+ * resolved and a socket is created, bound and put into listening state with
+ * the given backlog. On success the socket is polled for EPOLLIN. On any
+ * failure the error is recorded on the psocket and the listener context is
+ * woken.
+ *
+ * The addrinfo list is released on every path, the socket is closed when
+ * setup fails, and the wake request is followed by wake_notify() when wake()
+ * asks for it.
+ */
+void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, int backlog)
+{
+  char *buf = strdup(addr);
+  assert(buf); // TODO: memory safety
+  char *scheme, *user, *pass, *host, *port, *path;
+  pni_parse_url(buf, &scheme, &user, &pass, &host, &port, &path);
+  pcontext_init(&l->context, LISTENER, p, l);
+  psocket_init(&l->psocket, p, false, host, port);
+  l->backlog = backlog;
+  proactor_add(&l->psocket);
+  /* Always put an OPEN event for symmetry, even if we immediately close with err */
+  pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_OPEN);
+
+  struct addrinfo *ai = NULL;
+  int fd = -1;
+  bool listening = false;
+  if (!getaddrinfo(host, port, 0, &ai)) {
+    fd = socket(ai->ai_family, SOCK_STREAM, ai->ai_protocol);
+    if (fd >= 0) {
+      int yes = 1;
+      listening = !setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) &&
+                  !bind(fd, ai->ai_addr, ai->ai_addrlen) &&
+                  !listen(fd, backlog);
+    }
+  }
+
+  bool notify = false;
+  if (listening) {
+    l->psocket.sockfd = fd;
+    l->psocket.epoll_io.fd = fd;
+    l->psocket.epoll_io.wanted = EPOLLIN;
+    start_polling(&l->psocket.epoll_io, l->psocket.proactor->epollfd);  // TODO: check for error
+  } else {
+    int err = errno;  /* capture before any later call can overwrite it */
+    /* Same locking pattern as the other psocket_error()/wake() call sites. */
+    lock(&l->context.mutex);
+    psocket_error(&l->psocket, err, "listen on");
+    notify = wake(&l->context);
+    unlock(&l->context.mutex);
+    if (fd >= 0) close(fd);  /* never handed to the listener */
+  }
+
+  if (ai) freeaddrinfo(ai);
+  free(buf);
+  if (notify) wake_notify(&l->context);
+}
+
+// Call with lock held.
+// True once the listener is closing, its close has been dispatched and no
+// wake operations remain outstanding.
+static inline bool listener_can_free(pn_listener_t *l) {
+  if (!l->psocket.closing)
+    return false;
+  if (!l->close_dispatched)
+    return false;
+  return !l->context.wake_ops;
+}
+
+/* Release a listener and everything it holds. Passing NULL is a no-op.
+ *
+ * The listener is removed from its proactor's list only when it has a
+ * proactor: pn_listener() calls this on a partially constructed listener
+ * whose psocket has not been initialised yet. */
+void pn_listener_free(pn_listener_t *l) {
+  // TODO: do we need a QPID DeletionManager equivalent to be safe from inbound connection (accept) epoll events?
+  // TODO: handle external call by user
+  if (!l)
+    return;
+  if (l->psocket.proactor)
+    proactor_remove(&l->psocket);
+  if (l->collector) pn_collector_free(l->collector);
+  if (l->condition) pn_condition_free(l->condition);
+  if (l->attachments) pn_free(l->attachments);
+  pcontext_finalize(&l->context);
+  free(l);
+}
+
+/* Start closing a listener; does nothing if it is already closing.
+ * Stops polling and closes the socket (when there is one), queues
+ * PN_LISTENER_CLOSE and clears the accept counters. */
+static void listener_begin_close(pn_listener_t* l) {
+  if (l->psocket.closing)
+    return;
+
+  l->psocket.closing = true;
+  const int fd = l->psocket.sockfd;
+  if (fd >= 0) {
+    stop_polling(&l->psocket.epoll_io, l->psocket.proactor->epollfd);
+    close(fd);
+  }
+  pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
+  l->pending_accepts = l->available_accepts = 0;
+}
+
+/* Close a listener. The first call begins the close and wakes the listener
+ * context; calls made while it is already closing do nothing. */
+void pn_listener_close(pn_listener_t* l) {
+  lock(&l->context.mutex);
+  bool notify = false;
+  const bool first_close = !l->psocket.closing;
+  if (first_close) {
+    listener_begin_close(l);
+    notify = wake(&l->context);
+  }
+  unlock(&l->context.mutex);
+
+  if (notify)
+    wake_notify(&l->context);
+}
+
+/* Tear a listener down immediately.
+ * Called by proactor_free, no competing threads, no epoll activity. */
+static void listener_forced_shutdown(pn_listener_t *l) {
+  listener_begin_close(l);
+
+  // The listener will not be processed again, so clear the state that
+  // listener_can_free() checks before freeing.
+  l->context.wake_ops = 0;
+  l->close_dispatched = true;
+
+  assert(listener_can_free(l));
+  pn_listener_free(l);
+}
+
+
+/* Handle an epoll notification (events != 0) or a wake (events == 0) for a
+ * listener. Returns the listener's batch when this call claims the working
+ * role and there is an event or an available accept; otherwise NULL. */
+static pn_event_batch_t *listener_process(pn_listener_t *l, uint32_t events) {
+  // TODO: some parallelization of the accept mechanism.
+  lock(&l->context.mutex);
+
+  if (!events) {
+    wake_done(&l->context);  // callback accounting
+  } else {
+    l->armed = false;
+    if (events & EPOLLRDHUP) {
+      psocket_error(&l->psocket, errno, "listener epoll");  // includes listener_begin_close
+    } else if (!l->psocket.closing && (events & EPOLLIN)) {
+      l->available_accepts++;
+      l->pending_accepts++;
+    }
+  }
+
+  pn_event_batch_t *result = NULL;
+  if (!l->context.working) {
+    l->context.working = true;
+    if (listener_has_event(l) || l->available_accepts)
+      result = &l->batch;
+    else
+      l->context.working = false;  // nothing to hand out after all
+  }
+
+  unlock(&l->context.mutex);
+  return result;
+}
+
+/* Batch callback: return the listener's next event, or NULL when none is left.
+ * If no event is pending, an accept is available and the listener is not
+ * closing, one PN_LISTENER_ACCEPT event is generated first. Handing out
+ * PN_LISTENER_CLOSE marks the close as dispatched. */
+static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
+  pn_listener_t *l = batch_listener(batch);
+  lock(&l->context.mutex);
+
+  bool make_accept = !listener_has_event(l) && l->available_accepts && !l->psocket.closing;
+  if (make_accept) {
+    l->available_accepts--;
+    pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
+  }
+
+  pn_event_t *next = listener_has_event(l) ? l->cached_event : NULL;
+  l->cached_event = NULL;
+  if (next && pn_event_type(next) == PN_LISTENER_CLOSE)
+    l->close_dispatched = true;
+
+  unlock(&l->context.mutex);
+  return next;
+}
+
+/* Called when the application is done with a listener batch.
+ * Frees the listener once its close has been dispatched and it can be freed.
+ * Otherwise it wakes the listener again if work remains, or re-arms its epoll
+ * registration if it is idle, open and not already armed. */
+static void listener_done(pn_listener_t *l) {
+  lock(&l->context.mutex);
+  l->context.working = false;
+
+  bool notify = false;
+  if (l->close_dispatched) {
+    if (listener_can_free(l)) {
+      unlock(&l->context.mutex);
+      pn_listener_free(l);
+      return;
+    }
+  } else if (listener_has_event(l) || l->available_accepts) {
+    notify = wake(&l->context);
+  } else if (!l->psocket.closing && !l->armed) {
+    rearm(l->psocket.proactor, &l->psocket.epoll_io);
+    l->armed = true;
+  }
+
+  unlock(&l->context.mutex);
+  if (notify)
+    wake_notify(&l->context);
+}
+
+/* Return the proactor recorded in the listener's psocket; NULL for a NULL listener. */
+pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
+  if (!l)
+    return NULL;
+  return l->psocket.proactor;
+}
+
+/* Return the condition object stored in the listener. l must not be NULL. */
+pn_condition_t* pn_listener_condition(pn_listener_t* l) {
+  pn_condition_t *cond = l->condition;
+  return cond;
+}
+
+/* Return the context pointer stored with pn_listener_set_context(). l must not be NULL. */
+void *pn_listener_get_context(pn_listener_t *l) {
+  void *ctx = l->listener_context;
+  return ctx;
+}
+
+/* Store a caller-supplied context pointer on the listener. l must not be NULL. */
+void pn_listener_set_context(pn_listener_t *l, void *context)
+{
+  l->listener_context = context;
+}
+
+/* Return the attachments record stored in the listener. l must not be NULL. */
+pn_record_t *pn_listener_attachments(pn_listener_t *l) {
+  pn_record_t *record = l->attachments;
+  return record;
+}
+
+/* Accept one pending inbound connection on listener l and bind it to c.
+ *
+ * A pconnection is created for c and added to the proactor. If the listener
+ * is closing (EBADF) or has no pending accept (EAGAIN), that error code is
+ * recorded on the listener and nothing is accepted. Otherwise accept() is
+ * called; on failure the error is recorded on both the new connection and the
+ * listener, on success the new socket is configured and polling starts.
+ */
+void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
+  // TODO: fuller sanity check on input args
+  pconnection_t *pc = new_pconnection_t(l->psocket.proactor, c, true, l->psocket.host, l->psocket.port);
+  assert(pc); // TODO: memory safety
+  int err = 0;
+
+  lock(&l->context.mutex);
+  proactor_add(&pc->psocket);
+  if (l->psocket.closing)
+    err = EBADF;
+  else if (l->pending_accepts == 0)
+    err = EAGAIN;
+
+  if (err) {
+    /* Report the code chosen above; errno is unrelated to this check. */
+    // NOTE(review): pc stays on the proactor list here without being started - confirm it is cleaned up elsewhere.
+    psocket_error(&l->psocket, err, "listener state on accept");
+    unlock(&l->context.mutex);
+    return;
+  }
+  l->pending_accepts--;
+
+  int newfd = accept(l->psocket.sockfd, NULL, 0);
+  if (newfd < 0) {
+    err = errno;
+    psocket_error(&pc->psocket, err, "failed initialization on accept");
+    psocket_error(&l->psocket, err, "accept");
+  } else {
+    lock(&pc->context.mutex);
+    configure_socket(newfd);
+    pc->psocket.sockfd = newfd;
+    pconnection_start(pc);
+    unlock(&pc->context.mutex);
+  }
+
+  unlock(&l->context.mutex);
+}
+
+
+// ========================================================================
+// proactor
+// ========================================================================
+
+/* Create a proactor: context, mutex, timer, epoll fd, eventfd and collector.
+ * On success the timer and the wake eventfd are registered with epoll and the
+ * proactor is returned. If any resource is unavailable, whatever was acquired
+ * is released and NULL is returned. */
+pn_proactor_t *pn_proactor() {
+  pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p));
+  if (!p) return NULL;
+  p->epollfd = p->eventfd = p->timer.timerfd = -1;
+  pcontext_init(&p->context, PROACTOR, p, p);
+  pmutex_init(&p->eventfd_mutex);
+  ptimer_init(&p->timer, 0);
+
+  /* Acquire the remaining resources in order, stopping at the first failure. */
+  bool ready = (p->epollfd = epoll_create(1)) >= 0 &&
+               (p->eventfd = eventfd(0, EFD_NONBLOCK)) >= 0 &&
+               p->timer.timerfd >= 0 &&
+               (p->collector = pn_collector()) != NULL;
+  if (ready) {
+    p->batch.next_event = &proactor_batch_next;
+    start_polling(&p->timer.epoll_io, p->epollfd);  // TODO: check for error
+    p->timer_armed = true;
+
+    epoll_extended_t *wake_io = &p->epoll_wake;
+    wake_io->psocket = NULL;
+    wake_io->fd = p->eventfd;
+    wake_io->type = WAKE;
+    wake_io->wanted = EPOLLIN;
+    start_polling(wake_io, p->epollfd);  // TODO: check for error
+    return p;
+  }
+
+  if (p->epollfd >= 0) close(p->epollfd);
+  if (p->eventfd >= 0) close(p->eventfd);
+  ptimer_finalize(&p->timer);
+  if (p->collector) pn_free(p->collector);
+  free (p);
+  return NULL;
+}
+
+/* Destroy a proactor: close its descriptors, force every remaining
+ * connection and listener to shut down, then release its own resources. */
+void pn_proactor_free(pn_proactor_t *p) {
+  // No competing threads, not even a pending timer
+  close(p->epollfd);
+  close(p->eventfd);
+  ptimer_finalize(&p->timer);
+
+  psocket_t *ps;
+  while ((ps = p->psockets) != NULL) {
+    p->psockets = ps->next;  // unlink the head before shutting it down
+    pconnection_t *pc = as_pconnection(ps);
+    if (pc) {
+      pconnection_forced_shutdown(pc);
+      continue;
+    }
+    pn_listener_t *l = as_listener(ps);
+    if (l)
+      listener_forced_shutdown(l);
+  }
+
+  pn_collector_free(p->collector);
+  pmutex_finalize(&p->eventfd_mutex);
+  pcontext_finalize(&p->context);
+  free(p);
+}
+
+/* Find the proactor behind an event: the event context itself for proactor
+ * events, otherwise the proactor of the event's listener or connection.
+ * Returns NULL when none of these applies. */
+pn_proactor_t *pn_event_proactor(pn_event_t *e) {
+  if (pn_event_class(e) == pn_proactor__class())
+    return (pn_proactor_t*)pn_event_context(e);
+
+  pn_listener_t *listener = pn_event_listener(e);
+  if (listener)
+    return listener->psocket.proactor;
+
+  pn_connection_t *conn = pn_event_connection(e);
+  if (!conn)
+    return NULL;
+  return pn_connection_proactor(pn_event_connection(e));
+}
+
+/* Queue an event of type t, with the proactor as its context, on the
+ * proactor's collector. */
+static void proactor_add_event(pn_proactor_t *p, pn_event_type_t t)
+{
+  pn_collector_put(p->collector, pn_proactor__class(), p, t);
+}
+
+// Call with lock held. Leave unchanged if events pending.
+// There can be multiple interrupts but only one inside the collector to avoid coalescing.
+// Return true if there is an event in the collector. At most one event is added per call, in this priority order: deferred interrupt, timer expiry, new interrupt, inactive.
+static bool proactor_update_batch(pn_proactor_t *p) {
+ if (proactor_has_event(p))
+ return true;
+ if (p->deferred_interrupts > 0) {
+ // drain these first
+ --p->deferred_interrupts;
+ --p->interrupts; // both counters drop together for each drained interrupt
+ proactor_add_event(p, PN_PROACTOR_INTERRUPT);
+ return true;
+ }
+
+ if (p->timer_expired) {
+ p->timer_expired = false; // consume the expiry so it is reported once
+ proactor_add_event(p, PN_PROACTOR_TIMEOUT);
+ return true;
+ }
+
+ int ec = 0; // number of events added by the remaining checks
+ if (p->interrupts > 0) {
+ --p->interrupts;
+ proactor_add_event(p, PN_PROACTOR_INTERRUPT);
+ ec++;
+ if (p->interrupts > 0)
+ p->deferred_interrupts = p->interrupts; // the rest are delivered one per later call
+ }
+ if (p->inactive && ec == 0) { // only when no interrupt was just queued
+ p->inactive = false;
+ ec++;
+ proactor_add_event(p, PN_PROACTOR_INACTIVE);
+ }
+ return ec > 0;
+}
+
+/* Batch callback: return the proactor's next event, or NULL when none is left.
+ * The collector is topped up first. The cached event is taken and cleared
+ * while the context mutex is still held, matching listener_batch_next, so the
+ * shared cached_event field is never written outside the lock. */
+static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
+  pn_proactor_t *p = batch_proactor(batch);
+  pn_event_t *e = NULL;
+  lock(&p->context.mutex);
+  proactor_update_batch(p);
+  if (proactor_has_event(p))
+    e = p->cached_event;
+  p->cached_event = NULL;
+  unlock(&p->context.mutex);
+  return e;
+}
+
+/* Handle a proactor timer event (timeout == true) or a proactor wake
+ * (timeout == false). Returns the proactor batch when an event is ready and
+ * no batch is already being worked on. Otherwise re-arms the timer if it is
+ * not armed and returns NULL. */
+static pn_event_batch_t *proactor_process(pn_proactor_t *p, bool timeout) {
+  bool timer_fired = timeout && ptimer_callback(&p->timer) != 0;
+
+  lock(&p->context.mutex);
+  if (!timeout) {
+    wake_done(&p->context);
+  } else {
+    p->timer_armed = false;
+    if (timer_fired && !p->timer_cancelled)
+      p->timer_expired = true;
+  }
+
+  /* Can generate proactor events only when no batch is outstanding. */
+  if (!p->context.working && proactor_update_batch(p)) {
+    p->context.working = true;
+    unlock(&p->context.mutex);
+    return &p->batch;
+  }
+
+  bool rearm_timer = !p->timer_armed;
+  unlock(&p->context.mutex);
+  if (rearm_timer)
+    rearm(p, &p->timer.epoll_io);
+  return NULL;
+}
+
+/* Push a psocket onto the front of its proactor's psocket list, under the
+ * proactor's context mutex. */
+static void proactor_add(psocket_t *ps) {
+  pn_proactor_t *p = ps->proactor;
+  lock(&p->context.mutex);
+  psocket_t *old_head = p->psockets;
+  if (old_head) {
+    old_head->prev = ps;
+    ps->next = old_head;
+  }
+  p->psockets = ps;
+  unlock(&p->context.mutex);
+}
+
+/* Unlink a psocket from its proactor's list. When the list becomes empty and
+ * the proactor is not shutting down, the proactor is marked inactive and its
+ * context is woken. */
+static void proactor_remove(psocket_t *ps) {
+  pn_proactor_t *p = ps->proactor;
+  lock(&p->context.mutex);
+
+  if (!ps->prev) {
+    /* ps is treated as the list head. */
+    p->psockets = ps->next;
+    ps->next = NULL;
+    if (p->psockets)
+      p->psockets->prev = NULL;
+  } else {
+    ps->prev->next = ps->next;
+    if (ps->next)
+      ps->next->prev = ps->prev;
+  }
+
+  bool notify = false;
+  if (!p->psockets && !p->shutting_down) {
+    p->inactive = true;
+    notify = wake(&p->context);
+  }
+  unlock(&p->context.mutex);
+
+  if (notify)
+    wake_notify(&p->context);
+}
+
+/* Pop the next woken context from the proactor and dispatch it to the
+ * processing routine for its type. Returns the batch that routine produces,
+ * or NULL when there is no context or no batch. */
+static pn_event_batch_t *process_inbound_wake(pn_proactor_t *p) {
+  pcontext_t *ctx = wake_pop_front(p);
+  if (!ctx)
+    return NULL;
+
+  switch (ctx->type) {
+   case PROACTOR:
+    return proactor_process(p, false);
+   case PCONNECTION:
+    return pconnection_process((pconnection_t *) ctx->owner, 0, false, false);
+   case LISTENER:
+    return listener_process((pn_listener_t *) ctx->owner, 0);
+   default:
+    assert(ctx->type == WAKEABLE); // TODO: implement or remove
+    break;
+  }
+  return NULL;
+}
+
+/* Core event loop step: wait on epoll for one event at a time and dispatch it
+ * until some handler yields a batch. With can_block false the wait uses a
+ * zero timeout and NULL is returned when nothing is ready or epoll_wait
+ * fails; with can_block true the loop keeps waiting. */
+static pn_event_batch_t *proactor_do_epoll(struct pn_proactor_t* p, bool can_block) {
+  const int timeout = can_block ? -1 : 0;
+
+  for (;;) {
+    struct epoll_event ev;
+    int n = epoll_wait(p->epollfd, &ev, 1, timeout);
+
+    if (n < 0) {
+      if (errno != EINTR)
+        perror("epoll_wait"); // TODO: proper log
+      if (!can_block)
+        return NULL;
+      continue;
+    }
+    if (n == 0) {
+      if (!can_block)
+        return NULL;
+      perror("epoll_wait unexpected timeout"); // TODO: proper log
+      continue;
+    }
+    assert(n == 1);
+
+    epoll_extended_t *ee = (epoll_extended_t *) ev.data.ptr;
+    pn_event_batch_t *batch = NULL;
+
+    if (ee->type == WAKE) {
+      batch = process_inbound_wake(p);
+    } else if (ee->type == PROACTOR_TIMER) {
+      batch = proactor_process(p, true);
+    } else {
+      pconnection_t *pc = as_pconnection(ee->psocket);
+      if (!pc) {
+        pn_listener_t *l = as_listener(ee->psocket);
+        // TODO: can any of the listener processing be parallelized like IOCP?
+        batch = listener_process(l, ev.events);
+      } else if (ee->type == PCONNECTION_IO) {
+        batch = pconnection_process(pc, ev.events, false, false);
+      } else {
+        assert(ee->type == PCONNECTION_TIMER);
+        batch = pconnection_process(pc, 0, true, false);
+      }
+    }
+
+    if (batch)
+      return batch;
+    // No Proton event generated. epoll_wait() again.
+  }
+}
+
+/* Return the next event batch, running the epoll loop in blocking mode. */
+pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
+  const bool can_block = true;
+  return proactor_do_epoll(p, can_block);
+}
+
+/* Return an event batch if one is ready, running the epoll loop in
+ * non-blocking mode; NULL when nothing is ready. */
+pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* p) {
+  const bool can_block = false;
+  return proactor_do_epoll(p, can_block);
+}
+
+/* Signal that the application has finished with a batch. The batch is routed
+ * to its owner: a connection, a listener, or the proactor itself. For the
+ * proactor's own batch the working flag is cleared, a wake is issued if more
+ * proactor events are ready, and the timer is re-armed if it was not armed. */
+void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
+  pconnection_t *pc = batch_pconnection(batch);
+  if (pc) {
+    pconnection_done(pc);
+    return;
+  }
+
+  pn_listener_t *l = batch_listener(batch);
+  if (l) {
+    listener_done(l);
+    return;
+  }
+
+  if (batch_proactor(batch) != p)
+    return;
+
+  lock(&p->context.mutex);
+  bool rearm_timer = !p->timer_armed;
+  p->context.working = false;
+  proactor_update_batch(p);
+  bool notify = false;
+  if (proactor_has_event(p))
+    notify = wake(&p->context);
+  unlock(&p->context.mutex);
+
+  if (notify)
+    wake_notify(&p->context);
+  if (rearm_timer)
+    rearm(p, &p->timer.epoll_io);
+}
+
+/* Record one more pending interrupt and wake the proactor context. */
+void pn_proactor_interrupt(pn_proactor_t *p) {
+  lock(&p->context.mutex);
+  p->interrupts++;
+  const bool notify = wake(&p->context);
+  unlock(&p->context.mutex);
+
+  if (notify)
+    wake_notify(&p->context);
+}
+
+/* Set the proactor timeout to t and clear any earlier cancellation.
+ * A timeout of 0 is flagged as expired at once and the proactor context is
+ * woken. */
+void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
+  lock(&p->context.mutex);
+  p->timer_cancelled = false;
+
+  ptimer_set(&p->timer, t);
+  bool notify = false;
+  if (t == 0) {
+    p->timer_expired = true;
+    notify = wake(&p->context);
+  }
+
+  unlock(&p->context.mutex);
+  if (notify)
+    wake_notify(&p->context);
+}
+
+/* Cancel the proactor timeout: mark it cancelled, drop any expiry that has
+ * not been reported yet, and set the timer to 0. */
+void pn_proactor_cancel_timeout(pn_proactor_t *p)
+{
+  lock(&p->context.mutex);
+  // The cancelled flag stays set until the next set_timeout().
+  p->timer_cancelled = true;
+  p->timer_expired = false;
+  ptimer_set(&p->timer, 0);
+  unlock(&p->context.mutex);
+}
+
+/* Return the proactor of connection c, or NULL if c has no pconnection. */
+pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
+  pconnection_t *pc = get_pconnection(c);
+  if (!pc)
+    return NULL;
+  return pc->psocket.proactor;
+}
+
+/* Close every connection and listener registered with the proactor.
+ * When cond is non-NULL it is copied onto each transport's or listener's
+ * condition before closing.
+ *
+ * The list cursor now advances on each iteration; without that the loop never
+ * terminated for a non-empty list. The successor is read before the element
+ * is closed. */
+void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
+  // TODO: make this boilerplate actually work without deadlocking
+  lock(&p->context.mutex);
+  psocket_t *ps = p->psockets;
+  while (ps) {
+    psocket_t *next = ps->next;
+    pconnection_t *pc = as_pconnection(ps);
+    if (pc) {
+      if (cond) {
+        pn_condition_copy(pn_transport_condition(pc->driver.transport), cond);
+      }
+      pn_connection_driver_close(&pc->driver);
+    } else {
+      pn_listener_t *l = as_listener(ps);
+      if (l) {
+        if (cond) {
+          pn_condition_copy(pn_listener_condition(l), cond);
+        }
+        pn_listener_close(l);
+      }
+    }
+    ps = next;
+  }
+  unlock(&p->context.mutex);
+}
+
+
+const struct sockaddr_storage *pn_proactor_addr_sockaddr(const pn_proactor_addr_t *addr) { // stub: addr is never read
+ assert(false); // always fails when assertions are enabled
+ return NULL; // reached only when assertions are compiled out
+}
+
+const struct pn_proactor_addr_t *pn_proactor_addr_local(pn_transport_t *t) { // stub: t is never read
+ assert(false); // always fails when assertions are enabled
+ return NULL; // reached only when assertions are compiled out
+}
+
+const struct pn_proactor_addr_t *pn_proactor_addr_remote(pn_transport_t *t) { // stub: t is never read
+ assert(false); // always fails when assertions are enabled
+ return NULL; // reached only when assertions are compiled out
+}
+
+/* Format addr as "host:port" (numeric host and service) into buf.
+ *
+ * addr is treated as a struct sockaddr_storage. Returns the value reported by
+ * snprintf, i.e. the length the full string needs, which may be >= len when
+ * the output was truncated. Returns 0 if the address cannot be converted or
+ * formatting fails; in the conversion-failure case buf is set to the empty
+ * string when it has room for at least the terminator.
+ */
+size_t pn_proactor_addr_str(const struct pn_proactor_addr_t* addr, char *buf, size_t len) {
+  struct sockaddr_storage *sa = (struct sockaddr_storage*)addr;
+  char host[NI_MAXHOST];
+  char port[NI_MAXSERV];
+  int err = getnameinfo((struct sockaddr *)sa, sizeof(*sa), host, sizeof(host), port, sizeof(port),
+                        NI_NUMERICHOST | NI_NUMERICSERV);
+  if (err) {
+    /* Never write into a zero-length buffer. */
+    if (buf && len > 0) *buf = '\0';
+    return 0;
+  }
+  int n = snprintf(buf, len, "%s:%s", host, port); /* FIXME aconway 2017-03-29: ipv6 format? */
+  /* A negative result signals an output error; do not let it wrap to a huge size_t. */
+  return n < 0 ? 0 : (size_t)n;
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org