You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2020/08/12 15:34:14 UTC
[qpid-proton] 04/05: PROTON-2247: Epoll implementation of raw
connection API
This is an automated email from the ASF dual-hosted git repository.
astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit 7dc9240bd52a74225bd1c7a24a84ba37a08b30e4
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Fri Jun 19 14:00:16 2020 -0400
PROTON-2247: Epoll implementation of raw connection API
---
c/CMakeLists.txt | 2 +-
c/src/proactor/epoll-internal.h | 84 ++++++-
c/src/proactor/epoll.c | 141 +++++-------
c/src/proactor/epoll_raw_connection.c | 376 +++++++++++++++++++++++++++++++
c/src/proactor/raw_connection-internal.h | 1 +
c/src/proactor/raw_connection.c | 7 +-
6 files changed, 523 insertions(+), 88 deletions(-)
diff --git a/c/CMakeLists.txt b/c/CMakeLists.txt
index 6b2c167..9e224de 100644
--- a/c/CMakeLists.txt
+++ b/c/CMakeLists.txt
@@ -340,7 +340,7 @@ if (PROACTOR STREQUAL "epoll" OR (NOT PROACTOR AND NOT BUILD_PROACTOR))
check_symbol_exists(epoll_wait "sys/epoll.h" HAVE_EPOLL)
if (HAVE_EPOLL)
set (PROACTOR_OK epoll)
- set (qpid-proton-proactor src/proactor/epoll.c ${qpid-proton-proactor-common})
+ set (qpid-proton-proactor src/proactor/epoll.c src/proactor/epoll_raw_connection.c ${qpid-proton-proactor-common})
set (PROACTOR_LIBS Threads::Threads ${TIME_LIB})
endif()
endif()
diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index fd02817..78cad14 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -22,8 +22,18 @@
*
*/
+/* Enable POSIX features beyond c99 for modern pthread and standard strerror_r() */
+#ifndef _POSIX_C_SOURCE
+#define _POSIX_C_SOURCE 200809L
+#endif
+/* Avoid GNU extensions, in particular the incompatible alternative strerror_r() */
+#undef _GNU_SOURCE
+
#include <stdbool.h>
#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+
#include <pthread.h>
#include <netdb.h>
@@ -50,7 +60,8 @@ typedef enum {
PCONNECTION_IO,
PCONNECTION_TIMER,
LISTENER_IO,
- PROACTOR_TIMER
+ PROACTOR_TIMER,
+ RAW_CONNECTION_IO
} epoll_type_t;
// Data to use with epoll.
@@ -73,7 +84,8 @@ typedef struct ptimer_t {
typedef enum {
PROACTOR,
PCONNECTION,
- LISTENER
+ LISTENER,
+ RAW_CONNECTION
} pcontext_type_t;
typedef struct pcontext_t {
@@ -82,7 +94,7 @@ typedef struct pcontext_t {
pcontext_type_t type;
bool working;
bool on_wake_list;
- bool wake_pending; // unprocessed eventfd wake callback (convert to bool?)
+ bool wake_pending; // unprocessed eventfd wake callback
struct pcontext_t *wake_next; // wake list, guarded by proactor eventfd_mutex
bool closing;
// Next 4 are protected by the proactor mutex
@@ -209,7 +221,7 @@ typedef struct pconnection_t {
ptimer_t timer; // TODO: review one timerfd per connection
const char *host, *port;
uint32_t new_events;
- int wake_count;
+ int wake_count; // TODO: protected by context.mutex so should be moved in there (also really bool)
bool server; /* accept, not connect */
bool tick_pending;
bool timer_armed;
@@ -279,6 +291,70 @@ struct pn_listener_t {
uint32_t sched_io_events;
};
+typedef char strerrorbuf[1024]; /* used for pstrerror message buffer */
+void pstrerror(int err, strerrorbuf msg);
+
+// In general, all locks are to be held singly and briefly (possibly as spin locks).
+// See above about lock ordering.
+
+static inline void pmutex_init(pthread_mutex_t *pm){
+ pthread_mutexattr_t attr;
+
+ pthread_mutexattr_init(&attr);
+ pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ADAPTIVE_NP);
+ if (pthread_mutex_init(pm, &attr)) {
+ perror("pthread failure");
+ abort();
+ }
+}
+
+static inline void pmutex_finalize(pthread_mutex_t *m) { pthread_mutex_destroy(m); }
+static inline void lock(pmutex *m) { pthread_mutex_lock(m); }
+static inline void unlock(pmutex *m) { pthread_mutex_unlock(m); }
+
+static inline bool pconnection_has_event(pconnection_t *pc) {
+ return pn_connection_driver_has_event(&pc->driver);
+}
+
+static inline bool listener_has_event(pn_listener_t *l) {
+ return pn_collector_peek(l->collector) || (l->pending_count);
+}
+
+static inline bool proactor_has_event(pn_proactor_t *p) {
+ return pn_collector_peek(p->collector);
+}
+
+bool wake_if_inactive(pn_proactor_t *p);
+int pclosefd(pn_proactor_t *p, int fd);
+
+void proactor_add(pcontext_t *ctx);
+bool proactor_remove(pcontext_t *ctx);
+
+bool unassign_thread(tslot_t *ts, tslot_state new_state);
+
+void pcontext_init(pcontext_t *ctx, pcontext_type_t t, pn_proactor_t *p);
+bool wake(pcontext_t *ctx);
+void wake_notify(pcontext_t *ctx);
+void wake_done(pcontext_t *ctx);
+
+void psocket_init(psocket_t* ps, pn_proactor_t* p, epoll_type_t type);
+bool start_polling(epoll_extended_t *ee, int epollfd);
+void stop_polling(epoll_extended_t *ee, int epollfd);
+void rearm_polling(epoll_extended_t *ee, int epollfd);
+
+int pgetaddrinfo(const char *host, const char *port, int flags, struct addrinfo **res);
+void configure_socket(int sock);
+
+accepted_t *listener_accepted_next(pn_listener_t *listener);
+
+pcontext_t *pni_psocket_raw_context(psocket_t *ps);
+pn_event_batch_t *pni_raw_connection_process(pcontext_t *c, bool sched_wake);
+
+typedef struct praw_connection_t praw_connection_t;
+pcontext_t *pni_raw_connection_context(praw_connection_t *rc);
+praw_connection_t *pni_batch_raw_connection(pn_event_batch_t* batch);
+void pni_raw_connection_done(praw_connection_t *rc);
+
#ifdef __cplusplus
}
#endif
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 3b5f32e..1e693fe 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -106,10 +106,8 @@
// could be several eventfds with random assignment of wakeables.
-typedef char strerrorbuf[1024]; /* used for pstrerror message buffer */
-
/* Like strerror_r but provide a default message if strerror_r fails */
-static void pstrerror(int err, strerrorbuf msg) {
+void pstrerror(int err, strerrorbuf msg) {
int e = strerror_r(err, msg, sizeof(strerrorbuf));
if (e) snprintf(msg, sizeof(strerrorbuf), "unknown error %d", err);
}
@@ -128,24 +126,6 @@ static void pstrerror(int err, strerrorbuf msg) {
// First define a proactor mutex (pmutex) and timer mechanism (ptimer) to taste.
// ========================================================================
-// In general all locks to be held singly and shortly (possibly as spin locks).
-// See above about lock ordering.
-
-static void pmutex_init(pthread_mutex_t *pm){
- pthread_mutexattr_t attr;
-
- pthread_mutexattr_init(&attr);
- pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ADAPTIVE_NP);
- if (pthread_mutex_init(pm, &attr)) {
- perror("pthread failure");
- abort();
- }
-}
-
-static void pmutex_finalize(pthread_mutex_t *m) { pthread_mutex_destroy(m); }
-static inline void lock(pmutex *m) { pthread_mutex_lock(m); }
-static inline void unlock(pmutex *m) { pthread_mutex_unlock(m); }
-
/* epoll_ctl()/epoll_wait() do not form a memory barrier, so cached memory
writes to struct epoll_extended_t in the EPOLL_ADD thread might not be
visible to epoll_wait() thread. This function creates a memory barrier,
@@ -286,7 +266,7 @@ const char *AMQP_PORT_NAME = "amqp";
PN_STRUCT_CLASSDEF(pn_proactor)
PN_STRUCT_CLASSDEF(pn_listener)
-static bool start_polling(epoll_extended_t *ee, int epollfd) {
+bool start_polling(epoll_extended_t *ee, int epollfd) {
if (ee->polling)
return false;
ee->polling = true;
@@ -297,7 +277,7 @@ static bool start_polling(epoll_extended_t *ee, int epollfd) {
return (epoll_ctl(epollfd, EPOLL_CTL_ADD, ee->fd, &ev) == 0);
}
-static void stop_polling(epoll_extended_t *ee, int epollfd) {
+void stop_polling(epoll_extended_t *ee, int epollfd) {
// TODO: check for error, return bool or just log?
// TODO: is EPOLL_CTL_DEL ever needed beyond auto de-register when ee->fd is closed?
if (ee->fd == -1 || !ee->polling || epollfd == -1)
@@ -312,6 +292,19 @@ static void stop_polling(epoll_extended_t *ee, int epollfd) {
ee->polling = false;
}
+void rearm_polling(epoll_extended_t *ee, int epollfd) {
+ struct epoll_event ev = {0};
+ ev.data.ptr = ee;
+ ev.events = ee->wanted | EPOLLONESHOT;
+ memory_barrier(ee);
+ if (epoll_ctl(epollfd, EPOLL_CTL_MOD, ee->fd, &ev) == -1)
+ EPOLL_FATAL("arming polled file descriptor", errno);
+}
+
+static void rearm(pn_proactor_t *p, epoll_extended_t *ee) {
+ rearm_polling(ee, p->epollfd);
+}
+
/*
* The proactor maintains a number of serialization contexts: each
* connection, each listener, the proactor itself. The serialization
@@ -339,7 +332,7 @@ static void stop_polling(epoll_extended_t *ee, int epollfd) {
// Fake thread for temporarily disabling the scheduling of a context.
static struct tslot_t *REWAKE_PLACEHOLDER = (struct tslot_t*) -1;
-static void pcontext_init(pcontext_t *ctx, pcontext_type_t t, pn_proactor_t *p) {
+void pcontext_init(pcontext_t *ctx, pcontext_type_t t, pn_proactor_t *p) {
memset(ctx, 0, sizeof(*ctx));
pmutex_init(&ctx->mutex);
ctx->proactor = p;
@@ -350,8 +343,6 @@ static void pcontext_finalize(pcontext_t* ctx) {
pmutex_finalize(&ctx->mutex);
}
-static void rearm(pn_proactor_t *p, epoll_extended_t *ee);
-
/*
* Wake strategy with eventfd.
* - wakees can be in the list only once
@@ -411,7 +402,9 @@ static void pop_wake(pcontext_t *ctx) {
}
// part1: call with ctx->owner lock held, return true if notify required by caller
-static bool wake(pcontext_t *ctx) {
+// Note that this will return false if either there is a pending wake OR if we are already
+// in the connection context that is to be woken (as we don't have to wake it up)
+bool wake(pcontext_t *ctx) {
bool notify = false;
if (!ctx->wake_pending) {
@@ -440,7 +433,7 @@ static bool wake(pcontext_t *ctx) {
}
// part2: make OS call without lock held
-static inline void wake_notify(pcontext_t *ctx) {
+void wake_notify(pcontext_t *ctx) {
pn_proactor_t *p = ctx->proactor;
if (p->eventfd == -1)
return;
@@ -448,7 +441,7 @@ static inline void wake_notify(pcontext_t *ctx) {
}
// call with owner lock held, once for each pop from the wake list
-static inline void wake_done(pcontext_t *ctx) {
+void wake_done(pcontext_t *ctx) {
// assert(ctx->wake_pending > 0);
ctx->wake_pending = false;
}
@@ -572,7 +565,7 @@ static bool rewake(pcontext_t *ctx) {
}
// Call with sched lock
-static bool unassign_thread(tslot_t *ts, tslot_state new_state) {
+bool unassign_thread(tslot_t *ts, tslot_state new_state) {
pcontext_t *ctx = ts->context;
bool notify = false;
bool deleting = (ts->state == DELETING);
@@ -660,7 +653,7 @@ static void make_runnable(pcontext_t *ctx) {
-static void psocket_init(psocket_t* ps, pn_proactor_t* p, epoll_type_t type)
+void psocket_init(psocket_t* ps, pn_proactor_t* p, epoll_type_t type)
{
ps->epoll_io.fd = -1;
ps->epoll_io.type = type;
@@ -698,8 +691,6 @@ static void set_pconnection(pn_connection_t* c, pconnection_t *pc) {
static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool wake, bool topup);
static void write_flush(pconnection_t *pc);
static void listener_begin_close(pn_listener_t* l);
-static void proactor_add(pcontext_t *ctx);
-static bool proactor_remove(pcontext_t *ctx);
static void poller_done(struct pn_proactor_t* p, tslot_t *ts);
static inline pconnection_t *psocket_pconnection(psocket_t* ps) {
@@ -741,18 +732,6 @@ static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
containerof(batch, pconnection_t, batch) : NULL;
}
-static inline bool pconnection_has_event(pconnection_t *pc) {
- return pn_connection_driver_has_event(&pc->driver);
-}
-
-static inline bool listener_has_event(pn_listener_t *l) {
- return pn_collector_peek(l->collector) || (l->pending_count);
-}
-
-static inline bool proactor_has_event(pn_proactor_t *p) {
- return pn_collector_peek(p->collector);
-}
-
static void psocket_error_str(psocket_t *ps, const char *msg, const char* what) {
pconnection_t *pc = psocket_pconnection(ps);
if (pc) {
@@ -780,15 +759,6 @@ static void psocket_gai_error(psocket_t *ps, int gai_err, const char* what) {
psocket_error_str(ps, gai_strerror(gai_err), what);
}
-static void rearm(pn_proactor_t *p, epoll_extended_t *ee) {
- struct epoll_event ev = {0};
- ev.data.ptr = ee;
- ev.events = ee->wanted | EPOLLONESHOT;
- memory_barrier(ee);
- if (epoll_ctl(p->epollfd, EPOLL_CTL_MOD, ee->fd, &ev) == -1)
- EPOLL_FATAL("arming polled file descriptor", errno);
-}
-
static void listener_accepted_append(pn_listener_t *listener, accepted_t item) {
if (listener->pending_first+listener->pending_count >= listener->backlog) return;
@@ -796,7 +766,7 @@ static void listener_accepted_append(pn_listener_t *listener, accepted_t item) {
listener->pending_count++;
}
-static accepted_t *listener_accepted_next(pn_listener_t *listener) {
+accepted_t *listener_accepted_next(pn_listener_t *listener) {
if (!listener->pending_count) return NULL;
listener->pending_count--;
@@ -861,7 +831,7 @@ static void proactor_rearm_overflow(pn_proactor_t *p) {
}
// Close an FD and rearm overflow listeners. Call with no listener locks held.
-static int pclosefd(pn_proactor_t *p, int fd) {
+int pclosefd(pn_proactor_t *p, int fd) {
int err = close(fd);
if (!err) proactor_rearm_overflow(p);
return err;
@@ -1258,7 +1228,6 @@ static void pconnection_connected_lh(pconnection_t *pc);
static void pconnection_maybe_connect_lh(pconnection_t *pc);
static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool sched_wake, bool topup) {
- bool inbound_wake = sched_wake;
bool rearm_timer = false;
bool timer_fired = false;
bool waking = false;
@@ -1281,10 +1250,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
pc->tick_pending = true;
timer_fired = false;
}
- if (inbound_wake) {
- wake_done(&pc->context);
- inbound_wake = false;
- }
+ if (sched_wake) wake_done(&pc->context);
if (rearm_timer)
pc->timer_armed = false;
@@ -1462,7 +1428,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
return NULL;
}
-static void configure_socket(int sock) {
+void configure_socket(int sock) {
int flags = fcntl(sock, F_GETFL);
flags |= O_NONBLOCK;
(void)fcntl(sock, F_SETFL, flags); // TODO: check for error
@@ -1536,7 +1502,7 @@ static void pconnection_maybe_connect_lh(pconnection_t *pc) {
pc->disconnected = true;
}
-static int pgetaddrinfo(const char *host, const char *port, int flags, struct addrinfo **res)
+int pgetaddrinfo(const char *host, const char *port, int flags, struct addrinfo **res)
{
struct addrinfo hints = { 0 };
hints.ai_family = AF_UNSPEC;
@@ -1550,7 +1516,7 @@ static inline bool is_inactive(pn_proactor_t *p) {
}
/* If inactive set need_inactive and return true if the proactor needs a wakeup */
-static bool wake_if_inactive(pn_proactor_t *p) {
+bool wake_if_inactive(pn_proactor_t *p) {
if (is_inactive(p)) {
p->need_inactive = true;
return wake(&p->context);
@@ -2280,7 +2246,7 @@ static pn_event_batch_t *proactor_process(pn_proactor_t *p, bool timeout, bool i
return NULL;
}
-static void proactor_add(pcontext_t *ctx) {
+void proactor_add(pcontext_t *ctx) {
pn_proactor_t *p = ctx->proactor;
lock(&p->context.mutex);
if (p->contexts) {
@@ -2294,7 +2260,7 @@ static void proactor_add(pcontext_t *ctx) {
// call with psocket's mutex held
// return true if safe for caller to free psocket
-static bool proactor_remove(pcontext_t *ctx) {
+bool proactor_remove(pcontext_t *ctx) {
pn_proactor_t *p = ctx->proactor;
// Disassociate this context from scheduler
if (!p->shutting_down) {
@@ -2420,6 +2386,11 @@ static pn_event_batch_t *process(pcontext_t *ctx) {
batch = listener_process(l, n_events, ctx_wake);
break;
}
+ case RAW_CONNECTION: {
+ unlock(&p->sched_mutex);
+ batch = pni_raw_connection_process(ctx, ctx_wake);
+ break;
+ }
default:
assert(NULL);
}
@@ -2469,20 +2440,20 @@ static pcontext_t *post_event(pn_proactor_t *p, struct epoll_event *evp) {
ctx->sched_pending = true;
break;
- case PCONNECTION_IO: {
- psocket_t *ps = containerof(ee, psocket_t, epoll_io);
- pconnection_t *pc = psocket_pconnection(ps);
+ case PCONNECTION_TIMER: {
+ pconnection_t *pc = containerof(containerof(ee, ptimer_t, epoll_io), pconnection_t, timer);
assert(pc);
ctx = &pc->context;
- ps->sched_io_events = evp->events;
+ pc->sched_timeout = true;;
ctx->sched_pending = true;
break;
}
- case PCONNECTION_TIMER: {
- pconnection_t *pc = containerof(containerof(ee, ptimer_t, epoll_io), pconnection_t, timer);
+ case PCONNECTION_IO: {
+ psocket_t *ps = containerof(ee, psocket_t, epoll_io);
+ pconnection_t *pc = psocket_pconnection(ps);
assert(pc);
ctx = &pc->context;
- pc->sched_timeout = true;;
+ ps->sched_io_events = evp->events;
ctx->sched_pending = true;
break;
}
@@ -2495,6 +2466,13 @@ static pcontext_t *post_event(pn_proactor_t *p, struct epoll_event *evp) {
ctx->sched_pending = true;
break;
}
+ case RAW_CONNECTION_IO: {
+ psocket_t *ps = containerof(ee, psocket_t, epoll_io);
+ ctx = pni_psocket_raw_context(ps);
+ ps->sched_io_events = evp->events;
+ ctx->sched_pending = true;
+ break;
+ }
}
if (ctx && !ctx->runnable && !ctx->runner)
return ctx;
@@ -2860,6 +2838,14 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
check_earmark_override(p, ts);
return;
}
+ praw_connection_t *rc = pni_batch_raw_connection(batch);
+ if (rc) {
+ tslot_t *ts = pni_raw_connection_context(rc)->runner;
+ pni_raw_connection_done(rc);
+ // rc possibly freed/invalid
+ check_earmark_override(p, ts);
+ return;
+ }
pn_proactor_t *bp = batch_proactor(batch);
if (bp == p) {
bool notify = false;
@@ -2962,6 +2948,7 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
bool do_free = false;
bool ctx_notify = false;
pmutex *ctx_mutex = NULL;
+ // TODO: Need to extend this for raw connections too
pconnection_t *pc = pcontext_pconnection(ctx);
if (pc) {
ctx_mutex = &pc->context.mutex;
@@ -3047,11 +3034,3 @@ int64_t pn_proactor_now_64(void) {
clock_gettime(CLOCK_MONOTONIC, &t);
return t.tv_sec * 1000 + t.tv_nsec / 1000000;
}
-
-// Empty stubs for raw connection code
-pn_raw_connection_t *pn_raw_connection(void) { return NULL; }
-void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const char *addr) {}
-void pn_listener_raw_accept(pn_listener_t *l, pn_raw_connection_t *rc) {}
-void pn_raw_connection_wake(pn_raw_connection_t *conn) {}
-const struct pn_netaddr_t *pn_raw_connection_local_addr(pn_raw_connection_t *connection) { return NULL; }
-const struct pn_netaddr_t *pn_raw_connection_remote_addr(pn_raw_connection_t *connection) { return NULL; }
diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c
new file mode 100644
index 0000000..2b9c4d4
--- /dev/null
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -0,0 +1,376 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/* This is currently specific to the epoll implementation and will need changing for the other proactors */
+
+#include "epoll-internal.h"
+#include "proactor-internal.h"
+#include "raw_connection-internal.h"
+
+#include <proton/proactor.h>
+#include <proton/listener.h>
+#include <proton/netaddr.h>
+#include <proton/raw_connection.h>
+
+#include <alloca.h>
+#include <assert.h>
+#include <errno.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <sys/epoll.h>
+
+/* epoll specific raw connection struct */
+struct praw_connection_t {
+ pcontext_t context;
+ struct pn_raw_connection_t raw_connection;
+ psocket_t psocket;
+ struct pn_netaddr_t local, remote; /* Actual addresses */
+ pmutex rearm_mutex; /* protects pconnection_rearm from out of order arming*/
+ pn_event_batch_t batch;
+ struct addrinfo *addrinfo; /* Resolved address list */
+ struct addrinfo *ai; /* Current connect address */
+ bool connected;
+ bool disconnected;
+ bool waking; // TODO: This is actually protected by context.mutex so should be moved into context (pconnection too)
+};
+
+static void psocket_error(praw_connection_t *rc, int err, const char* msg) {
+ pn_condition_t *cond = rc->raw_connection.condition;
+ if (!pn_condition_is_set(cond)) { /* Preserve older error information */
+ strerrorbuf what;
+ pstrerror(err, what);
+ char addr[PN_MAX_ADDR];
+ pn_netaddr_str(&rc->remote, addr, sizeof(addr));
+ pn_condition_format(cond, PNI_IO_CONDITION, "%s - %s %s", what, msg, addr);
+ }
+}
+
+static void psocket_gai_error(praw_connection_t *rc, int gai_err, const char* what, const char *addr) {
+ pn_condition_format(rc->raw_connection.condition, PNI_IO_CONDITION, "%s - %s %s",
+ gai_strerror(gai_err), what, addr);
+}
+
+static void praw_connection_connected_lh(praw_connection_t *prc) {
+ // Need to check socket for connection error
+ prc->connected = true;
+ if (prc->addrinfo) {
+ freeaddrinfo(prc->addrinfo);
+ prc->addrinfo = NULL;
+ }
+ prc->ai = NULL;
+ socklen_t len = sizeof(prc->remote.ss);
+ (void)getpeername(prc->psocket.epoll_io.fd, (struct sockaddr*)&prc->remote.ss, &len);
+
+ pni_raw_connected(&prc->raw_connection);
+}
+
+/* multi-address connections may call praw_connection_start multiple times with different FDs */
+static void praw_connection_start(praw_connection_t *prc, int fd) {
+ int efd = prc->psocket.proactor->epollfd;
+
+ /* Get the local socket name now, get the peer name in pconnection_connected */
+ socklen_t len = sizeof(prc->local.ss);
+ (void)getsockname(fd, (struct sockaddr*)&prc->local.ss, &len);
+
+ epoll_extended_t *ee = &prc->psocket.epoll_io;
+ if (ee->polling) { /* This is not the first attempt, stop polling and close the old FD */
+ int fd = ee->fd; /* Save fd, it will be set to -1 by stop_polling */
+ stop_polling(ee, efd);
+ pclosefd(prc->psocket.proactor, fd);
+ }
+ ee->fd = fd;
+ ee->wanted = EPOLLIN | EPOLLOUT;
+ start_polling(ee, efd); // TODO: check for error
+}
+
+/* Called on initial connect, and if connection fails to try another address */
+static void praw_connection_maybe_connect_lh(praw_connection_t *prc) {
+ while (prc->ai) { /* Have an address */
+ struct addrinfo *ai = prc->ai;
+ prc->ai = prc->ai->ai_next; /* Move to next address in case this fails */
+ int fd = socket(ai->ai_family, SOCK_STREAM, 0);
+ if (fd >= 0) {
+ configure_socket(fd);
+ if (!connect(fd, ai->ai_addr, ai->ai_addrlen) || errno == EINPROGRESS) {
+
+ /* Until we finish connecting save away the address we're trying to connect to */
+ memcpy((struct sockaddr *) &prc->remote.ss, ai->ai_addr, ai->ai_addrlen);
+
+ praw_connection_start(prc, fd);
+ return; /* Async connection started */
+ } else {
+ close(fd);
+ }
+ }
+ /* connect failed immediately, go round the loop to try the next addr */
+ }
+ int err;
+ socklen_t errlen = sizeof(err);
+ getsockopt(prc->psocket.epoll_io.fd, SOL_SOCKET, SO_ERROR, (void *)&err, &errlen);
+ psocket_error(prc, err, "on connect");
+
+ freeaddrinfo(prc->addrinfo);
+ prc->addrinfo = NULL;
+ prc->disconnected = true;
+}
+
+//
+// Raw socket API
+//
+static pn_event_t * pni_raw_batch_next(pn_event_batch_t *batch);
+
+static void praw_connection_init(praw_connection_t *prc, pn_proactor_t *p, pn_raw_connection_t *rc) {
+ pcontext_init(&prc->context, RAW_CONNECTION, p);
+ psocket_init(&prc->psocket, p, RAW_CONNECTION_IO);
+
+ prc->connected = false;
+ prc->disconnected = false;
+ prc->waking = false;
+ prc->batch.next_event = pni_raw_batch_next;
+
+ pmutex_init(&prc->rearm_mutex);
+}
+
+static void praw_connection_cleanup(praw_connection_t *prc) {
+ int fd = prc->psocket.epoll_io.fd;
+ stop_polling(&prc->psocket.epoll_io, prc->psocket.proactor->epollfd);
+ if (fd != -1)
+ pclosefd(prc->psocket.proactor, fd);
+
+ lock(&prc->context.mutex);
+ bool can_free = proactor_remove(&prc->context);
+ unlock(&prc->context.mutex);
+ if (can_free) {
+ free(prc);
+ }
+ // else proactor_disconnect logic owns prc and its final free
+}
+
+pn_raw_connection_t *pn_raw_connection(void) {
+ praw_connection_t *conn = (praw_connection_t*) calloc(1, sizeof(praw_connection_t));
+ if (!conn) return NULL;
+
+ pni_raw_initialize(&conn->raw_connection);
+
+ return &conn->raw_connection;
+}
+
+void pn_raw_connection_free(pn_raw_connection_t *conn) {
+}
+
+void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const char *addr) {
+ assert(rc);
+ praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
+ praw_connection_init(prc, p, rc);
+ // TODO: check case of proactor shutting down
+
+ lock(&prc->context.mutex);
+ proactor_add(&prc->context);
+
+ bool notify = false;
+ bool notify_proactor = false;
+
+ const char *host;
+ const char *port;
+ size_t addrlen = strlen(addr);
+ char *addr_buf = (char*) alloca(addrlen+1);
+ pni_parse_addr(addr, addr_buf, addrlen+1, &host, &port);
+
+ int gai_error = pgetaddrinfo(host, port, 0, &prc->addrinfo);
+ if (!gai_error) {
+ prc->ai = prc->addrinfo;
+ praw_connection_maybe_connect_lh(prc); /* Start connection attempts */
+ if (prc->disconnected) notify = wake(&prc->context);
+ } else {
+ psocket_gai_error(prc, gai_error, "connect to ", addr);
+ prc->disconnected = true;
+ notify = wake(&prc->context);
+ lock(&p->context.mutex);
+ notify_proactor = wake_if_inactive(p);
+ unlock(&p->context.mutex);
+ }
+
+ /* We need to issue INACTIVE on immediate failure */
+ unlock(&prc->context.mutex);
+ if (notify) wake_notify(&prc->context);
+ if (notify_proactor) wake_notify(&p->context);
+}
+
+void pn_listener_raw_accept(pn_listener_t *l, pn_raw_connection_t *rc) {
+ assert(rc);
+ praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
+ praw_connection_init(prc, pn_listener_proactor(l), rc);
+ // TODO: fuller sanity check on input args
+
+ int err = 0;
+ int fd = -1;
+ bool notify = false;
+ lock(&l->context.mutex);
+ if (l->context.closing)
+ err = EBADF;
+ else {
+ accepted_t *a = listener_accepted_next(l);
+ if (a) {
+ fd = a->accepted_fd;
+ a->accepted_fd = -1;
+ }
+ else err = EWOULDBLOCK;
+ }
+
+ proactor_add(&prc->context);
+
+ lock(&prc->context.mutex);
+ if (fd >= 0) {
+ configure_socket(fd);
+ praw_connection_start(prc, fd);
+ praw_connection_connected_lh(prc);
+ } else {
+ psocket_error(prc, err, "pn_listener_accept");
+ }
+
+ if (!l->context.working && listener_has_event(l)) {
+ notify = wake(&l->context);
+ }
+ unlock(&prc->context.mutex);
+ unlock(&l->context.mutex);
+ if (notify) wake_notify(&l->context);
+}
+
+const pn_netaddr_t *pn_raw_connection_local_addr(pn_raw_connection_t *rc) {
+ praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
+ if (!prc) return NULL;
+ return &prc->local;
+}
+
+const pn_netaddr_t *pn_raw_connection_remote_addr(pn_raw_connection_t *rc) {
+ praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
+ if (!prc) return NULL;
+ return &prc->remote;
+}
+
+void pn_raw_connection_wake(pn_raw_connection_t *rc) {
+ bool notify = false;
+ praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
+ if (prc) {
+ lock(&prc->context.mutex);
+ if (!prc->context.closing) {
+ prc->waking = true;
+ notify = wake(&prc->context);
+ }
+ unlock(&prc->context.mutex);
+ }
+ if (notify) wake_notify(&prc->context);
+}
+
+static pn_event_t *pni_raw_batch_next(pn_event_batch_t *batch) {
+ pn_raw_connection_t *raw = &containerof(batch, praw_connection_t, batch)->raw_connection;
+ return pni_raw_event_next(raw);
+}
+
+pcontext_t *pni_psocket_raw_context(psocket_t* ps) {
+ return &containerof(ps, praw_connection_t, psocket)->context;
+}
+
+praw_connection_t *pni_batch_raw_connection(pn_event_batch_t *batch) {
+ return (batch->next_event == pni_raw_batch_next) ?
+ containerof(batch, praw_connection_t, batch) : NULL;
+}
+
+pcontext_t *pni_raw_connection_context(praw_connection_t *rc) {
+ return &rc->context;
+}
+
+static long snd(int fd, const void* b, size_t s) {
+ return send(fd, b, s, MSG_NOSIGNAL | MSG_DONTWAIT);
+}
+
+static long rcv(int fd, void* b, size_t s) {
+ return recv(fd, b, s, MSG_DONTWAIT);
+}
+
+static void set_error(pn_raw_connection_t *conn, const char *msg, int err) {
+ psocket_error(containerof(conn, praw_connection_t, raw_connection), err, msg);
+}
+
+pn_event_batch_t *pni_raw_connection_process(pcontext_t *c, bool sched_wake) {
+ praw_connection_t *rc = containerof(c, praw_connection_t, context);
+ int events = rc->psocket.sched_io_events;
+ int fd = rc->psocket.epoll_io.fd;
+ if (!rc->connected) {
+ if (events & (EPOLLHUP | EPOLLERR)) {
+ praw_connection_maybe_connect_lh(rc);
+ }
+ if (rc->disconnected) {
+ pni_raw_disconnect(&rc->raw_connection);
+ return &rc->batch;
+ }
+ if (events & (EPOLLHUP | EPOLLERR)) {
+ return NULL;
+ }
+ praw_connection_connected_lh(rc);
+ }
+
+ bool wake = false;
+ lock(&c->mutex);
+ c->working = true;
+ if (sched_wake) wake_done(c);
+ wake = sched_wake || rc->waking;
+ rc->waking = false;
+ unlock(&c->mutex);
+
+ if (wake) pni_raw_wake(&rc->raw_connection);
+ if (events & EPOLLIN) pni_raw_read(&rc->raw_connection, fd, rcv, set_error);
+ if (events & EPOLLOUT) pni_raw_write(&rc->raw_connection, fd, snd, set_error);
+ return &rc->batch;
+}
+
+void pni_raw_connection_done(praw_connection_t *rc) {
+ bool self_notify = false;
+ lock(&rc->context.mutex);
+ pn_proactor_t *p = rc->context.proactor;
+ tslot_t *ts = rc->context.runner;
+ rc->context.working = false;
+ self_notify = rc->waking && wake(&rc->context);
+ unlock(&rc->context.mutex);
+ if (self_notify) wake_notify(&rc->context);
+
+ pn_raw_connection_t *raw = &rc->raw_connection;
+ int wanted =
+ (pni_raw_can_read(raw) ? EPOLLIN : 0) |
+ (pni_raw_can_write(raw) ? EPOLLOUT : 0);
+ if (wanted) {
+ rc->psocket.epoll_io.wanted = wanted;
+ rearm_polling(&rc->psocket.epoll_io, p->epollfd); // TODO: check for error
+ } else {
+ bool finished_disconnect = raw->rclosed && raw->wclosed && !raw->disconnectpending;
+ if (finished_disconnect) {
+ // If we're closed and we've sent the disconnect then close
+ pni_raw_finalize(raw);
+ praw_connection_cleanup(rc);
+ }
+ }
+
+ lock(&p->sched_mutex);
+ bool notify = unassign_thread(ts, UNUSED);
+ unlock(&p->sched_mutex);
+ if (notify) wake_notify(&p->context);
+}
diff --git a/c/src/proactor/raw_connection-internal.h b/c/src/proactor/raw_connection-internal.h
index 79633e4..02c3af2 100644
--- a/c/src/proactor/raw_connection-internal.h
+++ b/c/src/proactor/raw_connection-internal.h
@@ -83,6 +83,7 @@ struct pn_raw_connection_t {
bool rdrainpending;
bool wdrainpending;
bool disconnectpending;
+ bool wakepending;
};
/*
diff --git a/c/src/proactor/raw_connection.c b/c/src/proactor/raw_connection.c
index 3e4a74c..a225df9 100644
--- a/c/src/proactor/raw_connection.c
+++ b/c/src/proactor/raw_connection.c
@@ -56,7 +56,6 @@ void pni_raw_initialize(pn_raw_connection_t *conn) {
conn->wbuffers[i-1].type = buff_wempty;
}
- //conn->batch.next_event = pni_raw_batch_next;
conn->condition = pn_condition();
conn->collector = pn_collector();
conn->attachments = pn_record();
@@ -323,7 +322,7 @@ void pni_raw_connected(pn_raw_connection_t *conn) {
}
void pni_raw_wake(pn_raw_connection_t *conn) {
- pni_raw_put_event(conn, PN_RAW_CONNECTION_WAKE);
+ conn->wakepending = true;
}
void pni_raw_read(pn_raw_connection_t *conn, int sock, long (*recv)(int, void*, size_t), void(*set_error)(pn_raw_connection_t *, const char *, int)) {
@@ -482,6 +481,10 @@ pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn) {
switch (type) {
default: break;
}
+ } else if (conn->wakepending) {
+ pni_raw_put_event(conn, PN_RAW_CONNECTION_WAKE);
+ conn->wakepending = false;
+ continue;
} else if (conn->rpending) {
pni_raw_put_event(conn, PN_RAW_CONNECTION_READ);
conn->rpending = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org