You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by jr...@apache.org on 2018/04/05 19:34:07 UTC
[32/51] [partial] qpid-proton git commit: PROTON-1728: Reorganize the
source tree
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/proactor/win_iocp.c
----------------------------------------------------------------------
diff --git a/c/src/proactor/win_iocp.c b/c/src/proactor/win_iocp.c
new file mode 100644
index 0000000..e41ac5b
--- /dev/null
+++ b/c/src/proactor/win_iocp.c
@@ -0,0 +1,3429 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/condition.h>
+#include <proton/connection_driver.h>
+#include <proton/engine.h>
+#include <proton/message.h>
+#include <proton/object.h>
+#include <proton/proactor.h>
+#include <proton/transport.h>
+#include <proton/listener.h>
+#include <proton/proactor.h>
+
+#include <assert.h>
+#include <stddef.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <queue>
+#include <list>
+
+#include <winsock2.h>
+#include <mswsock.h>
+#include <Ws2tcpip.h>
+
+#include <iostream>
+#include <sstream>
+
+#include "./netaddr-internal.h" /* Include after socket/inet headers */
+
+/*
+ * Proactor for Windows using IO completion ports.
+ *
+ * There is no specific threading code other than locking and indirect (and
+ * brief) calls to the default threadpool via timerqueue timers.
+ *
+ * The IOCP system remembers what threads have called pn_proactor_wait and
+ * assumes they are doing proactor work until they call wait again. Using a
+ * dedicated threadpool to drive the proactor, as in the examples, is
+ * recommended. The use of other long-lived threads to occasionally drive the
+ * proactor will hurt the completion port scheduler and cause fewer threads to
+ * run.
+ *
+ * Each proactor connection maintains its own queue of pending completions and
+ * its work is serialized on that. The proactor listener runs parallel where
+ * possible, but its event batch is serialized.
+ */
+
+// TODO: make all code C++ or all C90-ish
+// change INACTIVE to be from begin_close instead of zombie reap, to be more like Posix
+// make the global write lock window much smaller
+// 2 exclusive write buffers per connection
+// make the zombie processing thread safe
+// configurable completion port concurrency
+// proton objects and ref counting: just say no.
+
+
+
+// From selector.h
+#define PN_READABLE (1)
+#define PN_WRITABLE (2)
+#define PN_EXPIRED (4)
+#define PN_ERROR (8)
+
+typedef SOCKET pn_socket_t;
+
+
+namespace pn_experimental {
+// Code borrowed from old messenger/reactor io.c iocp.c write_pipeline.c select.c
+// Mostly unchanged except for some thread safety (i.e. dropping the old descriptor map)
+
+typedef struct pni_acceptor_t pni_acceptor_t;
+typedef struct write_result_t write_result_t;
+typedef struct read_result_t read_result_t;
+typedef struct write_pipeline_t write_pipeline_t;
+typedef struct iocpdesc_t iocpdesc_t;
+typedef struct iocp_t iocp_t;
+
+
+// One per completion port.
+
+struct iocp_t {
+  HANDLE completion_port;              // Windows IO completion port handle shared by all sockets
+  pn_list_t *zombie_list;              // closed sockets kept alive until their pending ops drain
+  unsigned shared_pool_size;           // number of shared write buffers (0 = pool disabled)
+  char *shared_pool_memory;            // single VirtualAlloc'd slab backing the shared buffers
+  write_result_t **shared_results;     // every pool write_result_t, for the leak check on free
+  write_result_t **available_results;  // stack of currently unused pool buffers
+  size_t shared_available_count;       // depth of the available_results stack
+  size_t writer_count;                 // pipelines currently competing for the shared pool
+  int loopback_bufsize;                // optional larger primary buffer size for loopback sockets
+  bool iocp_trace;                     // verbose diagnostics to stderr
+};
+
+// One for each socket.
+// It should remain ref counted in the
+// zombie_list until ops_in_progress == 0 or the completion port is closed.
+
+struct iocpdesc_t {
+  pn_socket_t socket;
+  iocp_t *iocp;                  // owning completion port wrapper
+  void *active_completer;
+  pni_acceptor_t *acceptor;      // non-NULL only for listening sockets (see is_listener)
+  pn_error_t *error;             // sticky error state set by iocpdesc_fail
+  int ops_in_progress;           // outstanding overlapped operations on this socket
+  bool read_in_progress;         // a zero-byte read has been posted and not yet completed
+  write_pipeline_t *pipeline;    // overlapped write buffer manager
+  read_result_t *read_result;    // reusable overlapped result for reads
+  bool bound; // associated with the completion port
+  bool closing; // close called by application
+  bool read_closed; // EOF or read error
+  bool write_closed; // shutdown sent or write error
+  bool poll_error; // flag posix-like POLLERR/POLLHUP/POLLNVAL
+  bool is_mp; // Special multi thread care required
+  bool write_blocked;
+  int events;                    // current PN_READABLE/PN_WRITABLE/... bits (pni_events_update)
+  pn_timestamp_t reap_time;      // presumably the zombie reap deadline — see pni_zombie_check; confirm
+};
+
+
+// Max number of overlapped accepts per listener. TODO: configurable.
+#define IOCP_MAX_ACCEPTS 4
+
+// AcceptEx squishes the local and remote addresses and optional data
+// all together when accepting the connection. Reserve enough for
+// IPv6 addresses, even if the socket is IPv4. The 16 bytes padding
+// per address is required by AcceptEx.
+#define IOCP_SOCKADDRMAXLEN (sizeof(sockaddr_in6) + 16)
+#define IOCP_SOCKADDRBUFLEN (2 * IOCP_SOCKADDRMAXLEN)
+
+// Discriminator for the overlapped result structures below; each overlapped
+// operation carries one of these as its OVERLAPPED payload.
+typedef enum { IOCP_ACCEPT, IOCP_CONNECT, IOCP_READ, IOCP_WRITE } iocp_type_t;
+
+// Common header for every overlapped result.  The OVERLAPPED member must be
+// first so a LPOVERLAPPED from GetQueuedCompletionStatus can be cast back.
+typedef struct {
+  OVERLAPPED overlapped;
+  iocp_type_t type;
+  iocpdesc_t *iocpd;       // socket the operation belongs to
+  HRESULT status;
+  DWORD num_transferred;
+} iocp_result_t;
+
+// Result for the single outstanding zero-byte read per socket.
+struct read_result_t {
+  iocp_result_t base;
+  size_t drain_count;      // bytes discarded so far by drain_until_closed
+  char unused_buf[1];      // dummy target for the zero-length WSARecv
+};
+
+// Result for one overlapped write; owns (or borrows) a write buffer.
+struct write_result_t {
+  iocp_result_t base;
+  size_t requested;        // bytes submitted; completion must match exactly
+  bool in_use;
+  pn_bytes_t buffer;
+};
+
+// Result for an outstanding ConnectEx.
+typedef struct {
+  iocp_result_t base;
+  char address_buffer[IOCP_SOCKADDRBUFLEN];
+  struct addrinfo *addrinfo;   // owned; freed when the result is finalized
+} connect_result_t;
+
+// Result for an outstanding AcceptEx on a listener.
+typedef struct {
+  iocp_result_t base;
+  iocpdesc_t *new_sock;        // pre-created socket the connection lands on
+  char address_buffer[IOCP_SOCKADDRBUFLEN];
+  DWORD unused;
+} accept_result_t;
+
+
+void complete_connect(connect_result_t *result, HRESULT status);
+void complete_write(write_result_t *result, DWORD xfer_count, HRESULT status);
+void complete_read(read_result_t *result, DWORD xfer_count, HRESULT status);
+void start_reading(iocpdesc_t *iocpd);
+void begin_accept(pni_acceptor_t *acceptor, accept_result_t *result);
+void reset_accept_result(accept_result_t *result);
+iocpdesc_t *create_same_type_socket(iocpdesc_t *iocpd);
+void pni_iocp_reap_check(iocpdesc_t *iocpd);
+connect_result_t *connect_result(iocpdesc_t *iocpd, struct addrinfo *addr);
+iocpdesc_t *pni_iocpdesc_create(iocp_t *, pn_socket_t s);
+iocpdesc_t *pni_deadline_desc(iocp_t *);
+void pni_iocpdesc_start(iocpdesc_t *iocpd);
+void pni_iocp_drain_completions(iocp_t *);
+int pni_iocp_wait_one(iocp_t *, int timeout, pn_error_t *);
+void pni_iocp_start_accepting(iocpdesc_t *iocpd);
+pn_socket_t pni_iocp_end_accept(iocpdesc_t *ld, sockaddr *addr, socklen_t *addrlen, bool *would_block, pn_error_t *error);
+pn_socket_t pni_iocp_begin_connect(iocp_t *, pn_socket_t sock, struct addrinfo *addr, pn_error_t *error);
+ssize_t pni_iocp_begin_write(iocpdesc_t *, const void *, size_t, bool *, pn_error_t *);
+ssize_t pni_iocp_recv(iocpdesc_t *iocpd, void *buf, size_t size, bool *would_block, pn_error_t *error);
+void pni_iocp_begin_close(iocpdesc_t *iocpd);
+iocp_t *pni_iocp();
+
+void pni_events_update(iocpdesc_t *iocpd, int events);
+write_result_t *pni_write_result(iocpdesc_t *iocpd, const char *buf, size_t buflen);
+void pni_zombie_check(iocp_t *, pn_timestamp_t);
+pn_timestamp_t pni_zombie_deadline(iocp_t *);
+
+int pni_win32_error(pn_error_t *error, const char *msg, HRESULT code);
+
+pn_timestamp_t pn_i_now2(void);
+}
+
+// ======================================================================
+// Write pipeline.
+// ======================================================================
+
+/*
+ * A simple write buffer pool. Each socket has a dedicated "primary"
+ * buffer and can borrow from a shared pool with limited size tuning.
+ * Could enhance e.g. with separate pools per network interface and fancier
+ * memory tuning based on interface speed, system resources, and
+ * number of connections, etc.
+ */
+
+namespace pn_experimental {
+
+// Max overlapped writes per socket
+#define IOCP_MAX_OWRITES 16
+// Write buffer size
+#define IOCP_WBUFSIZE 16384
+
+// Diagnostic logging for the write pipeline (stderr, flushed immediately).
+// NOTE(review): uses va_list but <stdarg.h> is not included directly in this
+// file — presumably pulled in transitively by another header; confirm.
+static void pipeline_log(const char *fmt, ...)
+{
+  va_list ap;
+  va_start(ap, fmt);
+  vfprintf(stderr, fmt, ap);
+  va_end(ap);
+  fflush(stderr);
+}
+
+// Allocate the pool of shared write buffers (one page-aligned slab via
+// VirtualAlloc) plus the bookkeeping arrays.  Pool size and the optional
+// loopback buffer size can be overridden via environment variables for
+// debugging.  On any allocation failure the pool is fully disabled
+// (size/count zeroed) and writers fall back to their primary buffers only.
+void pni_shared_pool_create(iocp_t *iocp)
+{
+  // TODO: more pools (or larger one) when using multiple non-loopback interfaces
+  iocp->shared_pool_size = 16;
+  char *env = getenv("PNI_WRITE_BUFFERS"); // Internal: for debugging
+  if (env) {
+    int sz = atoi(env);
+    if (sz >= 0 && sz < 256) {
+      iocp->shared_pool_size = sz;
+    }
+  }
+  iocp->loopback_bufsize = 0;
+  env = getenv("PNI_LB_BUFSIZE"); // Internal: for debugging
+  if (env) {
+    int sz = atoi(env);
+    if (sz >= 0 && sz <= 128 * 1024) {
+      iocp->loopback_bufsize = sz;
+    }
+  }
+
+  if (iocp->shared_pool_size) {
+    iocp->shared_pool_memory = (char *) VirtualAlloc(NULL, IOCP_WBUFSIZE * iocp->shared_pool_size, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
+    if (!iocp->shared_pool_memory) {
+      perror("Proton write buffer pool allocation failure\n");
+      iocp->shared_pool_size = 0;
+      iocp->shared_available_count = 0;
+      return;
+    }
+
+    iocp->shared_results = (write_result_t **) malloc(iocp->shared_pool_size * sizeof(write_result_t *));
+    iocp->available_results = (write_result_t **) malloc(iocp->shared_pool_size * sizeof(write_result_t *));
+    if (!iocp->shared_results || !iocp->available_results) {
+      // Previously unchecked: an OOM here left the pool half-constructed.
+      // Disable the shared pool entirely rather than run with partial state.
+      free(iocp->shared_results);
+      free(iocp->available_results);
+      iocp->shared_results = NULL;
+      iocp->available_results = NULL;
+      VirtualFree(iocp->shared_pool_memory, 0, MEM_RELEASE);
+      iocp->shared_pool_memory = NULL;
+      iocp->shared_pool_size = 0;
+      iocp->shared_available_count = 0;
+      return;
+    }
+    iocp->shared_available_count = iocp->shared_pool_size;
+    char *mem = iocp->shared_pool_memory;
+    for (unsigned i = 0; i < iocp->shared_pool_size; i++) {
+      iocp->shared_results[i] = iocp->available_results[i] = pni_write_result(NULL, mem, IOCP_WBUFSIZE);
+      mem += IOCP_WBUFSIZE;
+    }
+  }
+}
+
+// Release the shared buffer pool.  Results still marked in_use are owned by
+// an in-flight overlapped write, so they are deliberately leaked (with a log
+// message) rather than freed out from under the kernel.
+void pni_shared_pool_free(iocp_t *iocp)
+{
+  for (unsigned i = 0; i < iocp->shared_pool_size; i++) {
+    write_result_t *result = iocp->shared_results[i];
+    if (result->in_use)
+      pipeline_log("Proton buffer pool leak\n");
+    else
+      free(result);
+  }
+  if (iocp->shared_pool_size) {
+    free(iocp->shared_results);
+    free(iocp->available_results);
+    if (iocp->shared_pool_memory) {
+      if (!VirtualFree(iocp->shared_pool_memory, 0, MEM_RELEASE)) {
+        perror("write buffers release failed");
+      }
+      iocp->shared_pool_memory = NULL;
+    }
+  }
+}
+
+// Return a borrowed buffer to the shared free stack.
+static void shared_pool_push(write_result_t *result)
+{
+  iocp_t *iocp = result->base.iocpd->iocp;
+  assert(iocp->shared_available_count < iocp->shared_pool_size);
+  iocp->available_results[iocp->shared_available_count++] = result;
+}
+
+// Borrow a buffer from the shared free stack; NULL when the pool is exhausted.
+static write_result_t *shared_pool_pop(iocp_t *iocp)
+{
+  return iocp->shared_available_count ? iocp->available_results[--iocp->shared_available_count] : NULL;
+}
+
+// Per-socket manager of overlapped write buffers: one dedicated "primary"
+// buffer plus temporary loans from the iocp-wide shared pool.
+struct write_pipeline_t {
+  iocpdesc_t *iocpd;
+  size_t pending_count;        // writes currently in flight
+  write_result_t *primary;     // this socket's dedicated buffer
+  size_t reserved_count;       // in-flight + buffers reserved by the current write
+  size_t next_primary_index;   // position in the reservation at which to use the primary
+  size_t depth;                // max concurrent overlapped writes (set lazily by set_depth)
+  bool is_writer;              // counted in iocp->writer_count for pool sharing
+};
+
+#define write_pipeline_compare NULL
+#define write_pipeline_inspect NULL
+#define write_pipeline_hashcode NULL
+
+// pn_class initializer: allocate the dedicated primary buffer.
+// depth == 0 means "not yet probed"; set_depth runs on first reserve.
+static void write_pipeline_initialize(void *object)
+{
+  write_pipeline_t *pl = (write_pipeline_t *) object;
+  pl->pending_count = 0;
+  // NOTE(review): malloc result is unchecked; on OOM primary->buffer.start is NULL.
+  const char *pribuf = (const char *) malloc(IOCP_WBUFSIZE);
+  pl->primary = pni_write_result(NULL, pribuf, IOCP_WBUFSIZE);
+  pl->depth = 0;
+  pl->is_writer = false;
+}
+
+// pn_class finalizer: release the primary buffer and its result wrapper.
+static void write_pipeline_finalize(void *object)
+{
+  write_pipeline_t *pl = (write_pipeline_t *) object;
+  free((void *)pl->primary->buffer.start);
+  free(pl->primary);
+}
+
+// Construct a pipeline for iocpd via the proton class machinery.
+write_pipeline_t *pni_write_pipeline(iocpdesc_t *iocpd)
+{
+  static const pn_cid_t CID_write_pipeline = CID_pn_void;
+  static const pn_class_t clazz = PN_CLASS(write_pipeline);
+  write_pipeline_t *pipeline = (write_pipeline_t *) pn_class_new(&clazz, sizeof(write_pipeline_t));
+  pipeline->iocpd = iocpd;
+  pipeline->primary->base.iocpd = iocpd;
+  return pipeline;
+}
+
+// Register this pipeline as an active writer so the shared pool arithmetic
+// in pni_write_pipeline_reserve divides buffers among current writers only.
+static void confirm_as_writer(write_pipeline_t *pl)
+{
+  if (!pl->is_writer) {
+    iocp_t *iocp = pl->iocpd->iocp;
+    iocp->writer_count++;
+    pl->is_writer = true;
+  }
+}
+
+// Inverse of confirm_as_writer; called once all pending writes complete.
+static void remove_as_writer(write_pipeline_t *pl)
+{
+  if (!pl->is_writer)
+    return;
+  iocp_t *iocp = pl->iocpd->iocp;
+  assert(iocp->writer_count);
+  pl->is_writer = false;
+  iocp->writer_count--;
+}
+
+/*
+ * Optimal depth will depend on properties of the NIC, server, and driver. For now,
+ * just distinguish between loopback interfaces and the rest. Optimizations in the
+ * loopback stack allow decent performance with depth 1 and actually cause major
+ * performance hiccups if set to large values.
+ */
+static void set_depth(write_pipeline_t *pl)
+{
+  pl->depth = 1;
+  sockaddr_storage sa;
+  socklen_t salen = sizeof(sa);
+  char buf[INET6_ADDRSTRLEN];
+  DWORD buflen = sizeof(buf);
+
+  // Resolve our local address as text; a nonzero strcmp means "not loopback".
+  if (getsockname(pl->iocpd->socket,(sockaddr*) &sa, &salen) == 0 &&
+      getnameinfo((sockaddr*) &sa, salen, buf, buflen, NULL, 0, NI_NUMERICHOST) == 0) {
+    if ((sa.ss_family == AF_INET6 && strcmp(buf, "::1")) ||
+        (sa.ss_family == AF_INET && strncmp(buf, "127.", 4))) {
+      // not loopback
+      pl->depth = IOCP_MAX_OWRITES;
+    } else {
+      // Loopback: keep depth 1 but optionally enlarge the primary buffer.
+      iocp_t *iocp = pl->iocpd->iocp;
+      if (iocp->loopback_bufsize) {
+        const char *p = (const char *) realloc((void *) pl->primary->buffer.start, iocp->loopback_bufsize);
+        if (p) {
+          pl->primary->buffer.start = p;
+          pl->primary->buffer.size = iocp->loopback_bufsize;
+        }
+      }
+    }
+  }
+}
+
+// Smaller of two sizes.
+static size_t pni_min(size_t a, size_t b) { return a < b ? a : b; }
+
+// Reserve as many buffers as possible for count bytes.
+// Returns the number of buffers the caller may obtain via
+// pni_write_pipeline_next, or 0 if the write must block (primary busy).
+size_t pni_write_pipeline_reserve(write_pipeline_t *pl, size_t count)
+{
+  if (pl->primary->in_use)
+    return 0; // I.e. io->wouldblock
+  if (!pl->depth)
+    set_depth(pl);
+  if (pl->depth == 1) {
+    // always use the primary
+    pl->reserved_count = 1;
+    pl->next_primary_index = 0;
+    return 1;
+  }
+
+  iocp_t *iocp = pl->iocpd->iocp;
+  confirm_as_writer(pl);
+  // wanted = ceil(count / IOCP_WBUFSIZE)
+  size_t wanted = (count / IOCP_WBUFSIZE);
+  if (count % IOCP_WBUFSIZE)
+    wanted++;
+  size_t pending = pl->pending_count;
+  assert(pending < pl->depth);
+  size_t bufs = pni_min(wanted, pl->depth - pending);
+  // Can draw from shared pool or the primary... but share with others.
+  // shared_count = ceil(available / writers): this writer's fair share.
+  size_t writers = iocp->writer_count;
+  size_t shared_count = (iocp->shared_available_count + writers - 1) / writers;
+  bufs = pni_min(bufs, shared_count + 1);
+  pl->reserved_count = pending + bufs;
+
+  // Decide at which slot of the reservation the primary buffer is handed out.
+  if (bufs == wanted &&
+      pl->reserved_count < (pl->depth / 2) &&
+      iocp->shared_available_count > (2 * writers + bufs)) {
+    // No shortage: keep the primary as spare for future use
+    pl->next_primary_index = pl->reserved_count;
+  } else if (bufs == 1) {
+    pl->next_primary_index = pending;
+  } else {
+    // let approx 1/3 drain before replenishing
+    pl->next_primary_index = ((pl->reserved_count + 2) / 3) - 1;
+    if (pl->next_primary_index < pending)
+      pl->next_primary_index = pending;
+  }
+  return bufs;
+}
+
+// Hand out the next reserved buffer (the primary at its designated slot,
+// otherwise a shared-pool loan).  NULL once the reservation is exhausted.
+write_result_t *pni_write_pipeline_next(write_pipeline_t *pl)
+{
+  size_t sz = pl->pending_count;
+  if (sz >= pl->reserved_count)
+    return NULL;
+  write_result_t *result;
+  if (sz == pl->next_primary_index) {
+    result = pl->primary;
+  } else {
+    assert(pl->iocpd->iocp->shared_available_count > 0);
+    result = shared_pool_pop(pl->iocpd->iocp);
+  }
+
+  result->in_use = true;
+  pl->pending_count++;
+  return result;
+}
+
+// Return a completed (or failed) write buffer; any unused remainder of the
+// reservation is forfeited (reserved_count reset to 0).
+void pni_write_pipeline_return(write_pipeline_t *pl, write_result_t *result)
+{
+  result->in_use = false;
+  pl->pending_count--;
+  pl->reserved_count = 0;
+  if (result != pl->primary)
+    shared_pool_push(result);
+  if (pl->pending_count == 0)
+    remove_as_writer(pl);
+}
+
+// True when another write could start immediately.
+bool pni_write_pipeline_writable(write_pipeline_t *pl)
+{
+  // Only writable if not full and we can guarantee a buffer:
+  return pl->pending_count < pl->depth && !pl->primary->in_use;
+}
+
+// Number of writes currently in flight.
+size_t pni_write_pipeline_size(write_pipeline_t *pl)
+{
+  return pl->pending_count;
+}
+
+}
+
+
+// ======================================================================
+// IOCP infrastructure code
+// ======================================================================
+
+/*
+ * Socket IO including graceful or forced zombie teardown.
+ *
+ * Overlapped writes are used to avoid lengthy stalls between write
+ * completion and starting a new write. Non-overlapped reads are used
+ * since Windows accumulates inbound traffic without stalling and
+ * managing read buffers would not avoid a memory copy at the pn_read
+ * boundary.
+ *
+ * A socket must not get a Windows closesocket() unless the
+ * application has called pn_connection_close on the connection or a
+ * global single threaded shutdown is in progress. On error, the
+ * internal accounting for write_closed or read_closed may be updated
+ * along with the external event notification. A socket may be
+ * directly closed if it is never gets started (accept/listener_close
+ * collision) or otherwise turned into a zombie.
+ */
+
+namespace pn_experimental {
+
+// Current wall-clock time as a proton timestamp (ms since the Unix epoch).
+pn_timestamp_t pn_i_now2(void)
+{
+  FILETIME now;
+  GetSystemTimeAsFileTime(&now);
+  ULARGE_INTEGER t;
+  t.u.HighPart = now.dwHighDateTime;
+  t.u.LowPart = now.dwLowDateTime;
+  // Convert to milliseconds and adjust base epoch
+  // (FILETIME is 100ns ticks since 1601; 11644473600000 ms = 1601..1970 offset)
+  return t.QuadPart / 10000 - 11644473600000;
+}
+
+// Diagnostic logging (stderr, flushed immediately); same shape as pipeline_log.
+static void iocp_log(const char *fmt, ...)
+{
+  va_list ap;
+  va_start(ap, fmt);
+  vfprintf(stderr, fmt, ap);
+  va_end(ap);
+  fflush(stderr);
+}
+
+// Set a pn_error from a Windows error code using its system message text.
+// NOTE(review): if FormatMessage itself fails, `error` is left unmodified
+// and only a line is printed to stderr — callers still see success; confirm
+// that is intended.
+static void set_iocp_error_status(pn_error_t *error, int code, HRESULT status)
+{
+  char buf[512];
+  if (FormatMessage(FORMAT_MESSAGE_MAX_WIDTH_MASK | FORMAT_MESSAGE_FROM_SYSTEM,
+                    0, status, 0, buf, sizeof(buf), 0))
+    pn_error_set(error, code, buf);
+  else {
+    fprintf(stderr, "pn internal Windows error: %lu\n", GetLastError());
+  }
+}
+
+static void reap_check(iocpdesc_t *);
+static void bind_to_completion_port(iocpdesc_t *iocpd);
+static void iocp_shutdown(iocpdesc_t *iocpd);
+static bool is_listener(iocpdesc_t *iocpd);
+static void release_sys_sendbuf(SOCKET s);
+
+// Format "msg: <system text for code>" into error with PN_ERR.
+int pni_win32_error(pn_error_t *error, const char *msg, HRESULT code)
+{
+  // Error code can be from GetLastError or WSAGetLastError,
+  char err[1024] = {0};
+  FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS |
+                FORMAT_MESSAGE_MAX_WIDTH_MASK, NULL, code, 0, (LPSTR)&err, sizeof(err), NULL);
+  return pn_error_format(error, PN_ERR, "%s: %s", msg, err);
+}
+
+// Mark the socket failed: record the error, close both directions logically
+// (no closesocket here) and clear readable/writable event bits.
+static void iocpdesc_fail(iocpdesc_t *iocpd, HRESULT status, const char* text)
+{
+  pni_win32_error(iocpd->error, text, status);
+  if (iocpd->iocp->iocp_trace) {
+    iocp_log("connection terminated: %s\n", pn_error_text(iocpd->error));
+  }
+  iocpd->write_closed = true;
+  iocpd->read_closed = true;
+  iocpd->poll_error = true;
+  pni_events_update(iocpd, iocpd->events & ~(PN_READABLE | PN_WRITABLE));
+}
+
+// Case-insensitive string comparison (ASCII semantics via tolower()).
+// Returns 0 when equal; otherwise the difference at the first mismatch, or
+// the leftover character of `a` when `b` is a prefix of `a`.
+// Fixed: operands are converted through unsigned char before tolower() —
+// passing a possibly-negative plain char is undefined behavior (CERT STR37-C).
+static int pni_strcasecmp(const char *a, const char *b)
+{
+  int diff;
+  while (*b) {
+    char aa = *a++, bb = *b++;
+    diff = tolower((unsigned char)aa)-tolower((unsigned char)bb);
+    if ( diff!=0 ) return diff;
+  }
+  return *a;
+}
+
+// Replace the socket's event bits.  A poll error is sticky: once set,
+// PN_ERROR is forced into every subsequent update.
+static void pni_events_update(iocpdesc_t *iocpd, int events)
+{
+  // If set, a poll error is permanent
+  if (iocpd->poll_error)
+    events |= PN_ERROR;
+  if (iocpd->events == events)
+    return;
+  // ditch old code to update list of ready selectables
+  iocpd->events = events;
+}
+
+// Helper functions to use specialized IOCP AcceptEx() and ConnectEx()
+// All three extension entry points are resolved identically: ask the
+// winsock provider for the function pointer via WSAIoctl with
+// SIO_GET_EXTENSION_FUNCTION_POINTER.  Deduplicated into one helper.
+static void *lookup_extension_function(SOCKET s, GUID guid)
+{
+  DWORD bytes = 0;
+  void *fn = NULL;
+  WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
+           &fn, sizeof(fn), &bytes, NULL, NULL);
+  assert(fn);
+  return fn;
+}
+
+static LPFN_ACCEPTEX lookup_accept_ex(SOCKET s)
+{
+  GUID guid = WSAID_ACCEPTEX;
+  return (LPFN_ACCEPTEX) lookup_extension_function(s, guid);
+}
+
+static LPFN_CONNECTEX lookup_connect_ex(SOCKET s)
+{
+  GUID guid = WSAID_CONNECTEX;
+  return (LPFN_CONNECTEX) lookup_extension_function(s, guid);
+}
+
+static LPFN_GETACCEPTEXSOCKADDRS lookup_get_accept_ex_sockaddrs(SOCKET s)
+{
+  GUID guid = WSAID_GETACCEPTEXSOCKADDRS;
+  return (LPFN_GETACCEPTEXSOCKADDRS) lookup_extension_function(s, guid);
+}
+
+// match accept socket to listener socket
+// Creates a fresh SOCK_STREAM socket of the same address family as iocpd's,
+// wrapped in a new iocpdesc_t; NULL on any failure.
+iocpdesc_t *create_same_type_socket(iocpdesc_t *iocpd)
+{
+  sockaddr_storage sa;
+  socklen_t salen = sizeof(sa);
+  if (getsockname(iocpd->socket, (sockaddr*)&sa, &salen) == -1)
+    return NULL;
+  SOCKET s = socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM
+  if (s == INVALID_SOCKET)
+    return NULL;
+  return pni_iocpdesc_create(iocpd->iocp, s);
+}
+
+// A descriptor is a listener exactly when it carries an acceptor.
+static bool is_listener(iocpdesc_t *iocpd)
+{
+  return iocpd && iocpd->acceptor;
+}
+
+// === Async accept processing
+
+// Allocate a zeroed accept result bound to the listening socket.
+static accept_result_t *accept_result(iocpdesc_t *listen_sock) {
+  accept_result_t *result = (accept_result_t *)calloc(1, sizeof(accept_result_t));
+  if (result) {
+    result->base.type = IOCP_ACCEPT;
+    result->base.iocpd = listen_sock;
+  }
+  return result;
+}
+
+// Prepare an accept result for reuse with a fresh AcceptEx call.
+void reset_accept_result(accept_result_t *result) {
+  memset(&result->base.overlapped, 0, sizeof (OVERLAPPED));
+  memset(&result->address_buffer, 0, IOCP_SOCKADDRBUFLEN);
+  result->new_sock = NULL;
+}
+
+// Per-listener state: up to IOCP_MAX_ACCEPTS concurrent AcceptEx calls plus
+// the cached extension function pointers for this socket's provider.
+struct pni_acceptor_t {
+  int accept_queue_size;       // accept results currently allocated/outstanding
+  pn_list_t *accepts;          // completed accepts awaiting pni_iocp_end_accept
+  iocpdesc_t *listen_sock;
+  bool signalled;              // set when closing and the last accept drained
+  LPFN_ACCEPTEX fn_accept_ex;
+  LPFN_GETACCEPTEXSOCKADDRS fn_get_accept_ex_sockaddrs;
+};
+
+#define pni_acceptor_compare NULL
+#define pni_acceptor_inspect NULL
+#define pni_acceptor_hashcode NULL
+
+// pn_class initializer: create the list of completed accepts.
+static void pni_acceptor_initialize(void *object)
+{
+  pni_acceptor_t *acceptor = (pni_acceptor_t *) object;
+  acceptor->accepts = pn_list(PN_VOID, IOCP_MAX_ACCEPTS);
+}
+
+// pn_class finalizer: free any accepts never handed to the application.
+static void pni_acceptor_finalize(void *object)
+{
+  pni_acceptor_t *acceptor = (pni_acceptor_t *) object;
+  size_t len = pn_list_size(acceptor->accepts);
+  for (size_t i = 0; i < len; i++)
+    free(pn_list_get(acceptor->accepts, i));
+  pn_free(acceptor->accepts);
+}
+
+// Construct an acceptor for a listening descriptor and resolve the
+// AcceptEx/GetAcceptExSockaddrs extension pointers for its socket.
+static pni_acceptor_t *pni_acceptor(iocpdesc_t *iocpd)
+{
+  static const pn_cid_t CID_pni_acceptor = CID_pn_void;
+  static const pn_class_t clazz = PN_CLASS(pni_acceptor);
+  pni_acceptor_t *acceptor = (pni_acceptor_t *) pn_class_new(&clazz, sizeof(pni_acceptor_t));
+  acceptor->listen_sock = iocpd;
+  acceptor->accept_queue_size = 0;
+  acceptor->signalled = false;
+  pn_socket_t sock = acceptor->listen_sock->socket;
+  acceptor->fn_accept_ex = lookup_accept_ex(sock);
+  acceptor->fn_get_accept_ex_sockaddrs = lookup_get_accept_ex_sockaddrs(sock);
+  return acceptor;
+}
+
+// Post (or re-post) an overlapped AcceptEx on the listener.
+// result == NULL asks for a brand-new accept slot (up to IOCP_MAX_ACCEPTS);
+// a non-NULL result recycles a completed one.  On WSAECONNRESET the peer
+// gave up before we accepted, so the call retries itself recursively.
+void begin_accept(pni_acceptor_t *acceptor, accept_result_t *result)
+{
+  // flag to divide this routine's logic into locked/unlocked mp portions
+  bool mp = acceptor->listen_sock->is_mp;
+  bool created = false;
+
+  if (acceptor->listen_sock->closing) {
+    // Listener shutting down: discard the slot; signal when the last drains.
+    if (result) {
+      if (mp && result->new_sock && result->new_sock->socket != INVALID_SOCKET)
+        closesocket(result->new_sock->socket);
+      free(result);
+      acceptor->accept_queue_size--;
+    }
+    if (acceptor->accept_queue_size == 0)
+      acceptor->signalled = true;
+    return;
+  }
+
+  if (result) {
+    if (!mp)
+      reset_accept_result(result);
+  } else {
+    if (acceptor->accept_queue_size < IOCP_MAX_ACCEPTS && (mp ||
+        pn_list_size(acceptor->accepts) == acceptor->accept_queue_size )) {
+      result = accept_result(acceptor->listen_sock);
+      acceptor->accept_queue_size++;
+      created = true;
+    } else {
+      // an async accept is still pending or max concurrent accepts already hit
+      return;
+    }
+  }
+
+  if (created || !mp)
+    result->new_sock = create_same_type_socket(acceptor->listen_sock);
+  if (result->new_sock) {
+    // Not yet connected.
+    result->new_sock->read_closed = true;
+    result->new_sock->write_closed = true;
+
+    bool success = acceptor->fn_accept_ex(acceptor->listen_sock->socket, result->new_sock->socket,
+                     result->address_buffer, 0, IOCP_SOCKADDRMAXLEN, IOCP_SOCKADDRMAXLEN,
+                     &result->unused, (LPOVERLAPPED) result);
+    if (!success) {
+      DWORD err = WSAGetLastError();
+      if (err != ERROR_IO_PENDING) {
+        if (err == WSAECONNRESET) {
+          // other side gave up. Ignore and try again.
+          begin_accept(acceptor, result);
+          return;
+        }
+        else {
+          iocpdesc_fail(acceptor->listen_sock, err, "AcceptEX call failure");
+          return;
+        }
+      }
+      acceptor->listen_sock->ops_in_progress++;
+      // This socket is equally involved in the async operation.
+      result->new_sock->ops_in_progress++;
+    }
+  } else {
+    iocpdesc_fail(acceptor->listen_sock, WSAGetLastError(), "create accept socket");
+  }
+}
+
+// Completion handler for an AcceptEx.  If the listener has already stopped
+// reading, the newly accepted socket is closed and the result discarded;
+// otherwise the result is queued for the application and the listener is
+// marked readable.
+static void complete_accept(accept_result_t *result, HRESULT status)
+{
+  result->new_sock->ops_in_progress--;
+  iocpdesc_t *ld = result->base.iocpd;
+  if (ld->read_closed) {
+    if (!result->new_sock->closing)
+      pni_iocp_begin_close(result->new_sock);
+    pn_decref(result->new_sock);
+    free(result);    // discard
+    reap_check(ld);
+  } else {
+    assert(!ld->is_mp);  // Non mp only
+    result->base.status = status;
+    pn_list_add(ld->acceptor->accepts, result);
+    pni_events_update(ld, ld->events | PN_READABLE);
+  }
+}
+
+
+// === Async connect processing
+
+
+#define connect_result_initialize NULL
+#define connect_result_compare NULL
+#define connect_result_inspect NULL
+#define connect_result_hashcode NULL
+
+// pn_class finalizer: the addrinfo ownership passed in at creation is
+// released only here, after ConnectEx can no longer reference it.
+static void connect_result_finalize(void *object)
+{
+  connect_result_t *result = (connect_result_t *) object;
+  // Do not release addrinfo until ConnectEx completes
+  if (result->addrinfo)
+    freeaddrinfo(result->addrinfo);
+}
+
+// Allocate a zeroed connect result; takes ownership of addr.
+connect_result_t *connect_result(iocpdesc_t *iocpd, struct addrinfo *addr) {
+  static const pn_cid_t CID_connect_result = CID_pn_void;
+  static const pn_class_t clazz = PN_CLASS(connect_result);
+  connect_result_t *result = (connect_result_t *) pn_class_new(&clazz, sizeof(connect_result_t));
+  if (result) {
+    memset(result, 0, sizeof(connect_result_t));
+    result->base.type = IOCP_CONNECT;
+    result->base.iocpd = iocpd;
+    result->addrinfo = addr;
+  }
+  return result;
+}
+
+// Start an overlapped ConnectEx on sock toward addr.
+// Returns sock on (pending) success, INVALID_SOCKET if the pre-bind fails.
+// NOTE(review): on an immediate ConnectEx failure the socket is still
+// returned, only flagged read/write_closed — presumably the caller notices
+// and cleans up; confirm.
+pn_socket_t pni_iocp_begin_connect(iocp_t *iocp, pn_socket_t sock, struct addrinfo *addr, pn_error_t *error)
+{
+  // addr lives for the duration of the async connect. Caller has passed ownership here.
+  // See connect_result_finalize().
+  // Use of Windows-specific ConnectEx() requires our socket to be "loosely" pre-bound:
+  sockaddr_storage sa;
+  memset(&sa, 0, sizeof(sa));
+  sa.ss_family = addr->ai_family;
+  if (bind(sock, (SOCKADDR *) &sa, addr->ai_addrlen)) {
+    pni_win32_error(error, "begin async connection", WSAGetLastError());
+    if (iocp->iocp_trace)
+      iocp_log("%s\n", pn_error_text(error));
+    closesocket(sock);
+    freeaddrinfo(addr);
+    return INVALID_SOCKET;
+  }
+
+  iocpdesc_t *iocpd = pni_iocpdesc_create(iocp, sock);
+  bind_to_completion_port(iocpd);
+  LPFN_CONNECTEX fn_connect_ex = lookup_connect_ex(iocpd->socket);
+  connect_result_t *result = connect_result(iocpd, addr);
+  DWORD unused;
+  bool success = fn_connect_ex(iocpd->socket, result->addrinfo->ai_addr, result->addrinfo->ai_addrlen,
+                               NULL, 0, &unused, (LPOVERLAPPED) result);
+  if (!success && WSAGetLastError() != ERROR_IO_PENDING) {
+    pni_win32_error(error, "ConnectEx failure", WSAGetLastError());
+    pn_free(result);
+    iocpd->write_closed = true;
+    iocpd->read_closed = true;
+    if (iocp->iocp_trace)
+      iocp_log("%s\n", pn_error_text(error));
+  } else {
+    iocpd->ops_in_progress++;
+  }
+  return sock;
+}
+
+// Completion handler for ConnectEx.  On success the socket is switched to
+// connected mode (SO_UPDATE_CONNECT_CONTEXT) and reading starts; on failure
+// posix-compatible event bits are reported.  The result is always freed.
+void complete_connect(connect_result_t *result, HRESULT status)
+{
+  iocpdesc_t *iocpd = result->base.iocpd;
+  if (iocpd->closing) {
+    // Application already closed the connection; just account and reap.
+    pn_free(result);
+    reap_check(iocpd);
+    return;
+  }
+
+  if (status) {
+    iocpdesc_fail(iocpd, status, "Connect failure");
+    // Posix sets selectable events as follows:
+    pni_events_update(iocpd, PN_READABLE | PN_EXPIRED);
+  } else {
+    release_sys_sendbuf(iocpd->socket);
+    if (setsockopt(iocpd->socket, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0)) {
+      iocpdesc_fail(iocpd, WSAGetLastError(), "Internal connect failure (update context)");
+    } else {
+      pni_events_update(iocpd, PN_WRITABLE);
+      start_reading(iocpd);
+    }
+  }
+  pn_free(result);
+  return;
+}
+
+
+// === Async writes
+
+// True while any overlapped write is outstanding on the socket.
+static bool write_in_progress(iocpdesc_t *iocpd)
+{
+  return pni_write_pipeline_size(iocpd->pipeline) != 0;
+}
+
+// Allocate a write result wrapping (not copying) the given buffer.
+// iocpd may be NULL for pool buffers; it is set when the write is submitted.
+write_result_t *pni_write_result(iocpdesc_t *iocpd, const char *buf, size_t buflen)
+{
+  write_result_t *result = (write_result_t *) calloc(sizeof(write_result_t), 1);
+  if (result) {
+    result->base.type = IOCP_WRITE;
+    result->base.iocpd = iocpd;
+    result->buffer.start = buf;
+    result->buffer.size = buflen;
+  }
+  return result;
+}
+
+// Issue one overlapped WSASend for len bytes of buf; returns WSASend's result.
+static int submit_write(write_result_t *result, const void *buf, size_t len)
+{
+  WSABUF wsabuf;
+  wsabuf.buf = (char *) buf;
+  wsabuf.len = len;
+  memset(&result->base.overlapped, 0, sizeof (OVERLAPPED));
+  return WSASend(result->base.iocpd->socket, &wsabuf, 1, NULL, 0,
+                 (LPOVERLAPPED) result, 0);
+}
+
+// Copy up to len bytes from buf into reserved pipeline buffers and start one
+// overlapped WSASend per buffer.  Returns the number of bytes queued, 0 for
+// an empty write, or SOCKET_ERROR with *would_block / error set.
+// Fixes: the listener path returned INVALID_SOCKET (a SOCKET constant) from
+// a ssize_t function — now SOCKET_ERROR like every other error path; removed
+// a duplicate "len == 0" check and the unused local `requested`.
+ssize_t pni_iocp_begin_write(iocpdesc_t *iocpd, const void *buf, size_t len, bool *would_block, pn_error_t *error)
+{
+  if (len == 0) return 0;
+  *would_block = false;
+  if (is_listener(iocpd)) {
+    set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP);
+    return SOCKET_ERROR;
+  }
+  if (iocpd->closing) {
+    set_iocp_error_status(error, PN_ERR, WSAESHUTDOWN);
+    return SOCKET_ERROR;
+  }
+  if (iocpd->write_closed) {
+    assert(pn_error_code(iocpd->error));
+    pn_error_copy(error, iocpd->error);
+    if (iocpd->iocp->iocp_trace)
+      iocp_log("write error: %s\n", pn_error_text(error));
+    return SOCKET_ERROR;
+  }
+  if (!(iocpd->events & PN_WRITABLE)) {
+    *would_block = true;
+    return SOCKET_ERROR;
+  }
+
+  size_t written = 0;
+  const char *outgoing = (const char *) buf;
+  size_t available = pni_write_pipeline_reserve(iocpd->pipeline, len);
+  if (!available) {
+    *would_block = true;
+    return SOCKET_ERROR;
+  }
+
+  // Fill each reserved buffer in order and submit it.
+  for (size_t wr_count = 0; wr_count < available; wr_count++) {
+    write_result_t *result = pni_write_pipeline_next(iocpd->pipeline);
+    assert(result);
+    result->base.iocpd = iocpd;
+    ssize_t actual_len = len;
+    if (len > result->buffer.size) actual_len = result->buffer.size;
+    result->requested = actual_len;
+    memmove((void *)result->buffer.start, outgoing, actual_len);
+    outgoing += actual_len;
+    written += actual_len;
+    len -= actual_len;
+
+    int werror = submit_write(result, result->buffer.start, actual_len);
+    if (werror && WSAGetLastError() != ERROR_IO_PENDING) {
+      pni_write_pipeline_return(iocpd->pipeline, result);
+      iocpdesc_fail(iocpd, WSAGetLastError(), "overlapped send");
+      return SOCKET_ERROR;
+    }
+    iocpd->ops_in_progress++;
+  }
+
+  if (!pni_write_pipeline_writable(iocpd->pipeline))
+    pni_events_update(iocpd, iocpd->events & ~PN_WRITABLE);
+  return written;
+}
+
+/*
+ * Note: iocp write completion is not "bytes on the wire", it is "peer
+ * acked the sent bytes". Completion can be seconds on a slow
+ * consuming peer.
+ */
+// Completion handler for one overlapped WSASend.  Always returns the buffer
+// to the pipeline; a partial transfer is treated as fatal because ordering
+// across multiple overlapped writes could not be preserved.
+void complete_write(write_result_t *result, DWORD xfer_count, HRESULT status)
+{
+  iocpdesc_t *iocpd = result->base.iocpd;
+  if (iocpd->closing) {
+    pni_write_pipeline_return(iocpd->pipeline, result);
+    if (!iocpd->write_closed && !write_in_progress(iocpd))
+      iocp_shutdown(iocpd);
+    reap_check(iocpd);
+    return;
+  }
+  if (status == 0 && xfer_count > 0) {
+    if (xfer_count != result->requested) {
+      // Is this recoverable?  How to preserve order if multiple overlapped writes?
+      pni_write_pipeline_return(iocpd->pipeline, result);
+      iocpdesc_fail(iocpd, WSA_OPERATION_ABORTED, "Partial overlapped write on socket");
+      return;
+    } else {
+      // Success.
+      pni_write_pipeline_return(iocpd->pipeline, result);
+      if (pni_write_pipeline_writable(iocpd->pipeline))
+        pni_events_update(iocpd, iocpd->events | PN_WRITABLE);
+      return;
+    }
+  }
+  // Other error
+  pni_write_pipeline_return(iocpd->pipeline, result);
+  if (status == WSAECONNABORTED || status == WSAECONNRESET || status == WSAENOTCONN
+      || status == ERROR_NETNAME_DELETED) {
+    // Peer-initiated close: only the write side fails here.
+    iocpd->write_closed = true;
+    iocpd->poll_error = true;
+    pni_events_update(iocpd, iocpd->events & ~PN_WRITABLE);
+    pni_win32_error(iocpd->error, "Remote close or timeout", status);
+  } else {
+    iocpdesc_fail(iocpd, status, "IOCP async write error");
+  }
+}
+
+
+// === Async reads
+
+
+static read_result_t *read_result(iocpdesc_t *iocpd)
+{
+ read_result_t *result = (read_result_t *) calloc(sizeof(read_result_t), 1);
+ if (result) {
+ result->base.type = IOCP_READ;
+ result->base.iocpd = iocpd;
+ }
+ return result;
+}
+
// Post an overlapped zero-byte WSARecv.  Its completion signals "data is
// available" without committing a buffer; the data itself is then pulled
// with a non-blocking recv() in pni_iocp_recv().
static void begin_zero_byte_read(iocpdesc_t *iocpd)
{
  if (iocpd->read_in_progress) return;
  if (iocpd->read_closed) {
    // Keep the socket flagged readable so the caller sees EOF/error promptly.
    pni_events_update(iocpd, iocpd->events | PN_READABLE);
    return;
  }

  read_result_t *result = iocpd->read_result;
  memset(&result->base.overlapped, 0, sizeof (OVERLAPPED));
  DWORD flags = 0;
  WSABUF wsabuf;
  wsabuf.buf = result->unused_buf;
  wsabuf.len = 0;  // zero-length buffer: completion carries no data
  int rc = WSARecv(iocpd->socket, &wsabuf, 1, NULL, &flags,
                   &result->base.overlapped, 0);
  if (rc && WSAGetLastError() != ERROR_IO_PENDING) {
    iocpdesc_fail(iocpd, WSAGetLastError(), "IOCP read error");
    return;
  }
  iocpd->ops_in_progress++;
  iocpd->read_in_progress = true;
}
+
+static void drain_until_closed(iocpdesc_t *iocpd) {
+ size_t max_drain = 16 * 1024;
+ char buf[512];
+ read_result_t *result = iocpd->read_result;
+ while (result->drain_count < max_drain) {
+ int rv = recv(iocpd->socket, buf, 512, 0);
+ if (rv > 0)
+ result->drain_count += rv;
+ else if (rv == 0) {
+ iocpd->read_closed = true;
+ return;
+ } else if (WSAGetLastError() == WSAEWOULDBLOCK) {
+ // wait a little longer
+ start_reading(iocpd);
+ return;
+ }
+ else
+ break;
+ }
+ // Graceful close indication unlikely, force the issue
+ if (iocpd->iocp->iocp_trace)
+ if (result->drain_count >= max_drain)
+ iocp_log("graceful close on reader abandoned (too many chars)\n");
+ else
+ iocp_log("graceful close on reader abandoned: %d\n", WSAGetLastError());
+ iocpd->read_closed = true;
+}
+
+
+void complete_read(read_result_t *result, DWORD xfer_count, HRESULT status)
+{
+ iocpdesc_t *iocpd = result->base.iocpd;
+ iocpd->read_in_progress = false;
+
+ if (iocpd->closing) {
+ // Application no longer reading, but we are looking for a zero length read
+ if (!iocpd->read_closed)
+ drain_until_closed(iocpd);
+ reap_check(iocpd);
+ return;
+ }
+
+ if (status == 0 && xfer_count == 0) {
+ // Success.
+ pni_events_update(iocpd, iocpd->events | PN_READABLE);
+ } else {
+ iocpdesc_fail(iocpd, status, "IOCP read complete error");
+ }
+}
+
// Non-blocking receive.  Returns bytes read, 0 at EOF, or SOCKET_ERROR with
// *would_block / error set.  On success PN_READABLE is cleared and the
// caller must re-arm the async zero-byte read.
ssize_t pni_iocp_recv(iocpdesc_t *iocpd, void *buf, size_t size, bool *would_block, pn_error_t *error)
{
  if (size == 0) return 0;
  *would_block = false;
  if (is_listener(iocpd)) {
    set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP);
    return SOCKET_ERROR;
  }
  if (iocpd->closing) {
    // Previous call to pn_close()
    set_iocp_error_status(error, PN_ERR, WSAESHUTDOWN);
    return SOCKET_ERROR;
  }
  if (iocpd->read_closed) {
    // Prefer the originally recorded failure over a generic "not connected".
    if (pn_error_code(iocpd->error))
      pn_error_copy(error, iocpd->error);
    else
      set_iocp_error_status(error, PN_ERR, WSAENOTCONN);
    return SOCKET_ERROR;
  }

  int count = recv(iocpd->socket, (char *) buf, size, 0);
  if (count > 0) {
    pni_events_update(iocpd, iocpd->events & ~PN_READABLE);
    // caller must initiate begin_zero_byte_read(iocpd);
    return (ssize_t) count;
  } else if (count == 0) {
    iocpd->read_closed = true;
    return 0;
  }
  // recv failed: WSAGetLastError() must be consulted before any other call
  // that could overwrite the thread's last-error value.
  if (WSAGetLastError() == WSAEWOULDBLOCK)
    *would_block = true;
  else {
    set_iocp_error_status(error, PN_ERR, WSAGetLastError());
    iocpd->read_closed = true;
  }
  return SOCKET_ERROR;
}
+
// Arm (or re-arm) the async zero-byte read that signals readability.
void start_reading(iocpdesc_t *iocpd)
{
  begin_zero_byte_read(iocpd);
}
+
+
+// === The iocp descriptor
+
+static void pni_iocpdesc_initialize(void *object)
+{
+ iocpdesc_t *iocpd = (iocpdesc_t *) object;
+ memset(iocpd, 0, sizeof(iocpdesc_t));
+ iocpd->socket = INVALID_SOCKET;
+}
+
+static void pni_iocpdesc_finalize(void *object)
+{
+ iocpdesc_t *iocpd = (iocpdesc_t *) object;
+ pn_free(iocpd->acceptor);
+ pn_error_free(iocpd->error);
+ if (iocpd->pipeline)
+ if (write_in_progress(iocpd))
+ iocp_log("iocp descriptor write leak\n");
+ else
+ pn_free(iocpd->pipeline);
+ if (iocpd->read_in_progress)
+ iocp_log("iocp descriptor read leak\n");
+ else
+ free(iocpd->read_result);
+}
+
+static uintptr_t pni_iocpdesc_hashcode(void *object)
+{
+ iocpdesc_t *iocpd = (iocpdesc_t *) object;
+ return iocpd->socket;
+}
+
+#define pni_iocpdesc_compare NULL
+#define pni_iocpdesc_inspect NULL
+
+// Reference counted in the iocpdesc map, zombie_list, selector.
// Reference counted in the iocpdesc map, zombie_list, selector.
// Allocate via the pn_class machinery.  PN_CLASS token-pastes "pni_iocpdesc"
// to pick up the _initialize/_finalize/_hashcode functions and the CID
// constant, so these identifiers must not be renamed.
static iocpdesc_t *pni_iocpdesc(pn_socket_t s)
{
  static const pn_cid_t CID_pni_iocpdesc = CID_pn_void;
  static pn_class_t clazz = PN_CLASS(pni_iocpdesc);
  iocpdesc_t *iocpd = (iocpdesc_t *) pn_class_new(&clazz, sizeof(iocpdesc_t));
  assert(iocpd);
  iocpd->socket = s;
  return iocpd;
}
+
+static bool is_listener_socket(pn_socket_t s)
+{
+ BOOL tval = false;
+ int tvalsz = sizeof(tval);
+ int code = getsockopt(s, SOL_SOCKET, SO_ACCEPTCONN, (char *)&tval, &tvalsz);
+ return code == 0 && tval;
+}
+
+iocpdesc_t *pni_iocpdesc_create(iocp_t *iocp, pn_socket_t s) {
+ assert (s != INVALID_SOCKET);
+ bool listening = is_listener_socket(s);
+ iocpdesc_t *iocpd = pni_iocpdesc(s);
+ iocpd->iocp = iocp;
+ if (iocpd) {
+ iocpd->error = pn_error();
+ if (listening) {
+ iocpd->acceptor = pni_acceptor(iocpd);
+ } else {
+ iocpd->pipeline = pni_write_pipeline(iocpd);
+ iocpd->read_result = read_result(iocpd);
+ }
+ }
+ return iocpd;
+}
+
+
+static void bind_to_completion_port(iocpdesc_t *iocpd)
+{
+ if (iocpd->bound) return;
+ if (!iocpd->iocp->completion_port) {
+ iocpdesc_fail(iocpd, WSAEINVAL, "Incomplete setup, no completion port.");
+ return;
+ }
+
+ if (CreateIoCompletionPort ((HANDLE) iocpd->socket, iocpd->iocp->completion_port, 0, 0))
+ iocpd->bound = true;
+ else {
+ iocpdesc_fail(iocpd, GetLastError(), "IOCP socket setup.");
+ }
+}
+
+static void release_sys_sendbuf(SOCKET s)
+{
+ // Set the socket's send buffer size to zero.
+ int sz = 0;
+ int status = setsockopt(s, SOL_SOCKET, SO_SNDBUF, (const char *)&sz, sizeof(int));
+ assert(status == 0);
+}
+
+void pni_iocpdesc_start(iocpdesc_t *iocpd)
+{
+ if (iocpd->bound) return;
+ bind_to_completion_port(iocpd);
+ if (is_listener(iocpd)) {
+ begin_accept(iocpd->acceptor, NULL);
+ }
+ else {
+ release_sys_sendbuf(iocpd->socket);
+ pni_events_update(iocpd, PN_WRITABLE);
+ start_reading(iocpd);
+ }
+}
+
// Dispatch one dequeued completion to the handler for its op type.
static void complete(iocp_result_t *result, bool success, DWORD num_transferred) {
  result->iocpd->ops_in_progress--;  // balances the increment in each begin_xxx
  DWORD status = success ? 0 : GetLastError();

  switch (result->type) {
  case IOCP_ACCEPT:
    complete_accept((accept_result_t *) result, status);
    break;
  case IOCP_CONNECT:
    complete_connect((connect_result_t *) result, status);
    break;
  case IOCP_WRITE:
    complete_write((write_result_t *) result, num_transferred, status);
    break;
  case IOCP_READ:
    complete_read((read_result_t *) result, num_transferred, status);
    break;
  default:
    assert(false);
  }
}
+
+void pni_iocp_drain_completions(iocp_t *iocp)
+{
+ while (true) {
+ DWORD timeout_ms = 0;
+ DWORD num_transferred = 0;
+ ULONG_PTR completion_key = 0;
+ OVERLAPPED *overlapped = 0;
+
+ bool good_op = GetQueuedCompletionStatus (iocp->completion_port, &num_transferred,
+ &completion_key, &overlapped, timeout_ms);
+ if (!overlapped)
+ return; // timed out
+ iocp_result_t *result = (iocp_result_t *) overlapped;
+ if (!completion_key)
+ complete(result, good_op, num_transferred);
+ }
+}
+
+// returns: -1 on error, 0 on timeout, 1 successful completion
+// proactor layer uses completion_key, but we don't, so ignore those (shutdown in progress)
+int pni_iocp_wait_one(iocp_t *iocp, int timeout, pn_error_t *error) {
+ DWORD win_timeout = (timeout < 0) ? INFINITE : (DWORD) timeout;
+ DWORD num_transferred = 0;
+ ULONG_PTR completion_key = 0;
+ OVERLAPPED *overlapped = 0;
+
+ bool good_op = GetQueuedCompletionStatus (iocp->completion_port, &num_transferred,
+ &completion_key, &overlapped, win_timeout);
+ if (!overlapped)
+ if (GetLastError() == WAIT_TIMEOUT)
+ return 0;
+ else {
+ if (error)
+ pni_win32_error(error, "GetQueuedCompletionStatus", GetLastError());
+ return -1;
+ }
+
+ if (completion_key)
+ return pni_iocp_wait_one(iocp, timeout, error);
+ iocp_result_t *result = (iocp_result_t *) overlapped;
+ complete(result, good_op, num_transferred);
+ return 1;
+}
+
+// === Close (graceful and otherwise)
+
+// zombie_list is for sockets transitioning out of iocp on their way to zero ops_in_progress
+// and fully closed.
+
+static void zombie_list_add(iocpdesc_t *iocpd)
+{
+ assert(iocpd->closing);
+ if (!iocpd->ops_in_progress) {
+ // No need to make a zombie.
+ if (iocpd->socket != INVALID_SOCKET) {
+ closesocket(iocpd->socket);
+ iocpd->socket = INVALID_SOCKET;
+ iocpd->read_closed = true;
+ }
+ return;
+ }
+ // Allow 2 seconds for graceful shutdown before releasing socket resource.
+ iocpd->reap_time = pn_i_now2() + 2000;
+ pn_list_add(iocpd->iocp->zombie_list, iocpd);
+}
+
+static void reap_check(iocpdesc_t *iocpd)
+{
+ if (iocpd->closing && !iocpd->ops_in_progress) {
+ if (iocpd->socket != INVALID_SOCKET) {
+ closesocket(iocpd->socket);
+ iocpd->socket = INVALID_SOCKET;
+ }
+ pn_list_remove(iocpd->iocp->zombie_list, iocpd);
+ // iocpd is decref'ed and possibly released
+ }
+}
+
// Public wrapper so the proactor layer can trigger a reap check.
void pni_iocp_reap_check(iocpdesc_t *iocpd) {
  reap_check(iocpd);
}
+
+pn_timestamp_t pni_zombie_deadline(iocp_t *iocp)
+{
+ if (pn_list_size(iocp->zombie_list)) {
+ iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, 0);
+ return iocpd->reap_time;
+ }
+ return 0;
+}
+
+void pni_zombie_check(iocp_t *iocp, pn_timestamp_t now)
+{
+ pn_list_t *zl = iocp->zombie_list;
+ // Look for stale zombies that should have been reaped by "now"
+ for (size_t idx = 0; idx < pn_list_size(zl); idx++) {
+ iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(zl, idx);
+ if (iocpd->reap_time > now)
+ return;
+ if (iocpd->socket == INVALID_SOCKET)
+ continue;
+ if (iocp->iocp_trace)
+ iocp_log("async close: graceful close timeout exceeded\n");
+ closesocket(iocpd->socket);
+ iocpd->socket = INVALID_SOCKET;
+ iocpd->read_closed = true;
+ // outstanding ops should complete immediately now
+ }
+}
+
// Final-shutdown helper: give zombies a bounded window to finish their
// outstanding async IO before the completion port is destroyed.
static void drain_zombie_completions(iocp_t *iocp)
{
  // No more pn_selector_select() from App, but zombies still need care and feeding
  // until their outstanding async actions complete.
  pni_iocp_drain_completions(iocp);

  // Discard any that have no pending async IO
  size_t sz = pn_list_size(iocp->zombie_list);
  for (size_t idx = 0; idx < sz;) {
    iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, idx);
    if (!iocpd->ops_in_progress) {
      pn_list_del(iocp->zombie_list, idx, 1);
      sz--;
    } else {
      idx++;
    }
  }

  // Grace period defaults to 2s; PN_SHUTDOWN_GRACE (ms, 1..59999) overrides.
  unsigned shutdown_grace = 2000;
  char *override = getenv("PN_SHUTDOWN_GRACE");
  if (override) {
    int grace = atoi(override);
    if (grace > 0 && grace < 60000)
      shutdown_grace = (unsigned) grace;
  }
  pn_timestamp_t now = pn_i_now2();
  pn_timestamp_t deadline = now + shutdown_grace;

  // Block (with timeout) on remaining completions until done or deadline.
  while (pn_list_size(iocp->zombie_list)) {
    if (now >= deadline)
      break;
    int rv = pni_iocp_wait_one(iocp, deadline - now, NULL);
    if (rv < 0) {
      iocp_log("unexpected IOCP failure on Proton IO shutdown %d\n", GetLastError());
      break;
    }
    now = pn_i_now2();
  }
  if (now >= deadline && pn_list_size(iocp->zombie_list) && iocp->iocp_trace)
    // Should only happen if really slow TCP handshakes, i.e. total network failure
    iocp_log("network failure on Proton shutdown\n");
}
+
+static void zombie_list_hard_close_all(iocp_t *iocp)
+{
+ pni_iocp_drain_completions(iocp);
+ size_t zs = pn_list_size(iocp->zombie_list);
+ for (size_t i = 0; i < zs; i++) {
+ iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, i);
+ if (iocpd->socket != INVALID_SOCKET) {
+ closesocket(iocpd->socket);
+ iocpd->socket = INVALID_SOCKET;
+ iocpd->read_closed = true;
+ iocpd->write_closed = true;
+ }
+ }
+ pni_iocp_drain_completions(iocp);
+
+ // Zombies should be all gone. Do a sanity check.
+ zs = pn_list_size(iocp->zombie_list);
+ int remaining = 0;
+ int ops = 0;
+ for (size_t i = 0; i < zs; i++) {
+ iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, i);
+ remaining++;
+ ops += iocpd->ops_in_progress;
+ }
+ if (remaining)
+ iocp_log("Proton: %d unfinished close operations (ops count = %d)\n", remaining, ops);
+}
+
+static void iocp_shutdown(iocpdesc_t *iocpd)
+{
+ if (iocpd->socket == INVALID_SOCKET)
+ return; // Hard close in progress
+ if (shutdown(iocpd->socket, SD_SEND)) {
+ int err = WSAGetLastError();
+ if (err != WSAECONNABORTED && err != WSAECONNRESET && err != WSAENOTCONN)
+ if (iocpd->iocp->iocp_trace)
+ iocp_log("socket shutdown failed %d\n", err);
+ }
+ iocpd->write_closed = true;
+}
+
+void pni_iocp_begin_close(iocpdesc_t *iocpd)
+{
+ assert (!iocpd->closing);
+ if (is_listener(iocpd)) {
+ // Listening socket is easy. Close the socket which will cancel async ops.
+ pn_socket_t old_sock = iocpd->socket;
+ iocpd->socket = INVALID_SOCKET;
+ iocpd->closing = true;
+ iocpd->read_closed = true;
+ iocpd->write_closed = true;
+ closesocket(old_sock);
+ // Pending accepts will now complete. Zombie can die when all consumed.
+ zombie_list_add(iocpd);
+ } else {
+ // Continue async operation looking for graceful close confirmation or timeout.
+ pn_socket_t old_sock = iocpd->socket;
+ iocpd->closing = true;
+ if (!iocpd->write_closed && !write_in_progress(iocpd))
+ iocp_shutdown(iocpd);
+ zombie_list_add(iocpd);
+ }
+}
+
+
+// === iocp_t
+
+#define pni_iocp_hashcode NULL
+#define pni_iocp_compare NULL
+#define pni_iocp_inspect NULL
+
+void pni_iocp_initialize(void *obj)
+{
+ iocp_t *iocp = (iocp_t *) obj;
+ memset(iocp, 0, sizeof(iocp_t));
+ pni_shared_pool_create(iocp);
+ iocp->completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
+ assert(iocp->completion_port != NULL);
+ iocp->zombie_list = pn_list(PN_OBJECT, 0);
+ iocp->iocp_trace = false;
+}
+
// pn_class finalizer.  The teardown order matters: graceful drain, then hard
// close, then destroy the port (cancelling anything left), then free memory
// that async operations might have touched.
void pni_iocp_finalize(void *obj)
{
  iocp_t *iocp = (iocp_t *) obj;
  // Move sockets to closed state
  drain_zombie_completions(iocp);    // Last chance for graceful close
  zombie_list_hard_close_all(iocp);
  CloseHandle(iocp->completion_port);  // This cancels all our async ops
  iocp->completion_port = NULL;
  // Now safe to free everything that might be touched by a former async operation.
  pn_free(iocp->zombie_list);
  pni_shared_pool_free(iocp);
}
+
// Allocate the iocp_t via the pn_class machinery.  PN_CLASS token-pastes
// "pni_iocp" to pick up pni_iocp_initialize/finalize and the CID constant,
// so none of these identifiers can be renamed.
iocp_t *pni_iocp()
{
  static const pn_cid_t CID_pni_iocp = CID_pn_void;
  static const pn_class_t clazz = PN_CLASS(pni_iocp);
  iocp_t *iocp = (iocp_t *) pn_class_new(&clazz, sizeof(iocp_t));
  return iocp;
}
+
+}
+
+
+// ======================================================================
+// Proton Proactor support
+// ======================================================================
+
+#include "../core/log_private.h"
+#include "./proactor-internal.h"
+
+class csguard {
+ public:
+ csguard(CRITICAL_SECTION *cs) : cs_(cs), set_(true) { EnterCriticalSection(cs_); }
+ ~csguard() { if (set_) LeaveCriticalSection(cs_); }
+ void release() {
+ if (set_) {
+ set_ = false;
+ LeaveCriticalSection(cs_);
+ }
+ }
+ private:
+ LPCRITICAL_SECTION cs_;
+ bool set_;
+};
+
+
+// Get string from error status
+std::string errno_str2(DWORD status) {
+ char buf[512];
+ if (FormatMessage(FORMAT_MESSAGE_MAX_WIDTH_MASK | FORMAT_MESSAGE_FROM_SYSTEM,
+ 0, status, 0, buf, sizeof(buf), 0))
+ return std::string(buf);
+ return std::string("internal proactor error");
+}
+
+
+std::string errno_str(const std::string& msg, bool is_wsa) {
+ DWORD e = is_wsa ? WSAGetLastError() : GetLastError();
+ return msg + ": " + errno_str2(e);
+}
+
+
+
+using namespace pn_experimental;
+
// Sentinel completion keys.  The address of a unique stub function is used
// as the key value posted with PostQueuedCompletionStatus, so the dequeue
// side can distinguish proactor-level posts from real socket IO.
static void proactor_wake_stub() {}
ULONG_PTR proactor_wake_key = (ULONG_PTR) &proactor_wake_stub;

static void psocket_wakeup_stub() {}
ULONG_PTR psocket_wakeup_key = (ULONG_PTR) &psocket_wakeup_stub;

static void recycle_accept_stub() {}
ULONG_PTR recycle_accept_key = (ULONG_PTR) &recycle_accept_stub;
+
// getaddrinfo with proactor defaults (TCP stream, v4-mapped, configured
// families).  Returns 0 on success or the WSA error code.
static int pgetaddrinfo(const char *host, const char *port, int flags, struct addrinfo **res)
{
  struct addrinfo hints;
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = AF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG | flags;
  if (getaddrinfo(host, port, &hints, res) != 0)
    return WSAGetLastError();
  return 0;
}
+
const char *COND_NAME = "proactor";
// Attachment key used to link a pn_connection_t back to its pconnection_t.
PN_HANDLE(PN_PROACTOR)

// The number of times a connection event batch may be replenished for
// a thread between calls to wait().
// TODO: consider some instrumentation to determine an optimal number
// or switch to cpu time based limit.
#define HOG_MAX 3

/* pn_proactor_t and pn_listener_t are plain C structs with normal memory management.
   Class definitions are for identification as pn_event_t context only.
*/
PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor)
PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
+
+/* Completion serialization context common to connection and listener. */
+/* And also the reaper singleton (which has no socket */
+
// Discriminates which owner type embeds a given pcontext_t.
typedef enum {
  PROACTOR,
  PCONNECTION,
  LISTENER,
  WAKEABLE } pcontext_type_t;

typedef struct pcontext_t {
  CRITICAL_SECTION cslock;   // serializes all access to the owner's state
  pn_proactor_t *proactor;  /* Immutable */
  void *owner;              /* Instance governed by the context */
  pcontext_type_t type;
  bool working;             // while set, wake posts are suppressed (see wakeup())
  bool wake_pending;        // a wake has been posted but not yet consumed
  int completion_ops;       // uncompleted ops that are not socket IO related
  struct pcontext_t *wake_next; // wake list, guarded by proactor eventfd_mutex
  bool closing;
  // Next 4 are protected by the proactor mutex
  struct pcontext_t* next;  /* Protected by proactor.mutex */
  struct pcontext_t* prev;  /* Protected by proactor.mutex */
  int disconnect_ops;       /* ops remaining before disconnect complete */
  bool disconnecting;       /* pn_proactor_disconnect */
} pcontext_t;
+
+static void pcontext_init(pcontext_t *ctx, pcontext_type_t t, pn_proactor_t *p, void *o) {
+ memset(ctx, 0, sizeof(*ctx));
+ InitializeCriticalSectionAndSpinCount(&ctx->cslock, 4000);
+ ctx->proactor = p;
+ ctx->owner = o;
+ ctx->type = t;
+}
+
// Release the context's critical section (counterpart of pcontext_init).
static void pcontext_finalize(pcontext_t* ctx) {
  DeleteCriticalSection(&ctx->cslock);
}
+
// Common socket state shared by connections, listeners and the reaper.
typedef struct psocket_t {
  iocpdesc_t *iocpd;       /* NULL if reaper, or socket open failure. */
  pn_listener_t *listener; /* NULL for a connection socket */
  pn_netaddr_t listen_addr; /* Not filled in for connection sockets */
  char addr_buf[PN_MAX_ADDR];   // backing storage for host/port below
  const char *host, *port;      // parsed out of addr_buf by psocket_init
  bool is_reaper;
} psocket_t;
+
+static void psocket_init(psocket_t* ps, pn_listener_t *listener, bool is_reaper, const char *addr) {
+ ps->is_reaper = is_reaper;
+ if (is_reaper) return;
+ ps->listener = listener;
+ pni_parse_addr(addr, ps->addr_buf, sizeof(ps->addr_buf), &ps->host, &ps->port);
+}
+
struct pn_proactor_t {
  pcontext_t context;           // serialization context for proactor-level events
  CRITICAL_SECTION write_lock;  // taken around write completions (see reaper::process)
  CRITICAL_SECTION timer_lock;
  CRITICAL_SECTION bind_lock;   // guards pn_connection_t attachment records (see pconnection_setup)
  HANDLE timer_queue;           // shared Win32 timer queue
  HANDLE timeout_timer;
  iocp_t *iocp;
  class reaper *reaper;
  pn_collector_t *collector;
  pcontext_t *contexts;    /* in-use contexts for PN_PROACTOR_INACTIVE and cleanup */
  pn_event_batch_t batch;
  size_t disconnects_pending;   /* unfinished proactor disconnects*/

  // need_xxx flags indicate we should generate PN_PROACTOR_XXX on the next update_batch()
  bool need_interrupt;
  bool need_inactive;
  bool need_timeout;
  bool timeout_set; /* timeout has been set by user and not yet cancelled or generated event */
  bool timeout_processed;  /* timeout event dispatched in the most recent event batch */
  bool delayed_interrupt;
  bool shutting_down;
};
+
typedef struct pconnection_t {
  psocket_t psocket;            // must be first: psocket_t* is downcast to pconnection_t*
  pcontext_t context;
  pn_connection_driver_t driver;
  // NOTE(review): two queues appear to hand IOCP results from completion
  // threads to the thread working the batch -- confirm against the
  // processing code outside this view.
  std::queue<iocp_result_t *> *completion_queue;
  std::queue<iocp_result_t *> *work_queue;
  pn_condition_t *disconnect_condition;
  pn_event_batch_t batch;
  int wake_count;
  int hog_count; // thread hogging limiter
  bool server;                /* accept, not connect */
  bool started;
  bool connecting;
  bool tick_pending;          // set by tick_timer_cb; transport tick due
  bool queued_disconnect;     /* deferred from pn_proactor_disconnect() */
  bool bound;                 // attached to the pn_connection_t record
  bool stop_timer_required;
  bool can_wake;
  HANDLE tick_timer;          // one-shot timer-queue timer for idle-timeout ticks
  struct pn_netaddr_t local, remote; /* Actual addresses */
  struct addrinfo *addrinfo;         /* Resolved address list */
  struct addrinfo *ai;               /* Current connect address */
} pconnection_t;
+
struct pn_listener_t {
  psocket_t *psockets;     /* Array of listening sockets */
  size_t psockets_size;
  pcontext_t context;
  std::queue<accept_result_t *> *pending_accepts; // sockets awaiting a pn_listener_accept
  int pending_events;      // number of PN_LISTENER_ACCEPT events to be delivered
  pn_condition_t *condition;
  pn_collector_t *collector;
  pn_event_batch_t batch;
  pn_record_t *attachments;
  void *listener_context;
  size_t backlog;          // listen() backlog requested by the application
  bool close_dispatched;   // PN_LISTENER_CLOSE has been handed to a batch
};
+
+
+static bool proactor_remove(pcontext_t *ctx);
+static void listener_done(pn_listener_t *l);
+static bool proactor_update_batch(pn_proactor_t *p);
+static pn_event_batch_t *listener_process(pn_listener_t *l, iocp_result_t *result);
+static void recycle_result(accept_result_t *accept_result);
+static void proactor_add(pcontext_t *p);
+static void release_pending_accepts(pn_listener_t *l);
+static void proactor_shutdown(pn_proactor_t *p);
+
// Post a proactor-level (non-IO) completion keyed by one of the sentinel
// keys above.
static void post_completion(iocp_t *iocp, ULONG_PTR arg1, void *arg2) {
  // Despite the vagueness of the official documentation, the
  // following args to Post are passed undisturbed and unvalidated
  // to GetQueuedCompletionStatus(). In particular, arg2 does not
  // have to be an OVERLAPPED struct and may be NULL.

  DWORD nxfer = 0; // We could pass this through too if needed.
  PostQueuedCompletionStatus(iocp->completion_port, nxfer, arg1, (LPOVERLAPPED) arg2);
}
+
+static inline bool is_write_result(iocp_result_t *r) {
+ return r && r->type == IOCP_WRITE;
+}
+
+static inline bool is_read_result(iocp_result_t *r) {
+ return r && r->type == IOCP_READ;
+}
+
+static inline bool is_connect_result(iocp_result_t *r) {
+ return r && r->type == IOCP_CONNECT;
+}
+
+// From old io.c
+static void pni_configure_sock_2(pn_socket_t sock) {
+ //
+ // Disable the Nagle algorithm on TCP connections.
+ //
+
+ int flag = 1;
+ if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)) != 0) {
+ //TODO: log/err
+ }
+
+ u_long nonblock = 1;
+ if (ioctlsocket(sock, FIONBIO, &nonblock)) {
+ // TODO: log/err
+ }
+}
+
+static LPFN_CONNECTEX lookup_connect_ex2(SOCKET s)
+{
+ GUID guid = WSAID_CONNECTEX;
+ DWORD bytes = 0;
+ LPFN_CONNECTEX fn;
+ WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
+ &fn, sizeof(fn), &bytes, NULL, NULL);
+ assert(fn);
+ return fn;
+}
+
+
+// File descriptor wrapper that calls ::close in destructor.
+class unique_socket {
+ public:
+ unique_socket(pn_socket_t socket) : socket_(socket) {}
+ ~unique_socket() { if (socket_ != INVALID_SOCKET) ::closesocket(socket_); }
+ operator pn_socket_t() const { return socket_; }
+ pn_socket_t release() { pn_socket_t ret = socket_; socket_ = INVALID_SOCKET; return ret; }
+
+ protected:
+ pn_socket_t socket_;
+};
+
// Run the type-specific handler for a completed async result, then balance
// the op count.  The descriptor pointer is cached first because the
// handlers free or recycle the result object.
void do_complete(iocp_result_t *result) {
  iocpdesc_t *iocpd = result->iocpd;  // connect result gets deleted
  switch (result->type) {

  case IOCP_ACCEPT:
    /* accept is now processed inline to do in parallel, except on teardown */
    assert(iocpd->closing);
    complete_accept((accept_result_t *) result, result->status);  // free's result and retires new_sock
    break;
  case IOCP_CONNECT:
    complete_connect((connect_result_t *) result, result->status);
    break;
  case IOCP_WRITE:
    complete_write((write_result_t *) result, result->num_transferred, result->status);
    break;
  case IOCP_READ:
    complete_read((read_result_t *) result, result->num_transferred, result->status);
    break;
  default:
    assert(false);
  }

  iocpd->ops_in_progress--;  // Set in each begin_xxx call
}
+
+
+static inline pconnection_t *as_pconnection_t(psocket_t* ps) {
+ return !(ps->is_reaper || ps->listener) ? (pconnection_t*)ps : NULL;
+}
+
+static inline pn_listener_t *as_listener(psocket_t* ps) {
+ return ps->listener;
+}
+
+static inline pconnection_t *pcontext_pconnection(pcontext_t *c) {
+ return c->type == PCONNECTION ?
+ (pconnection_t*)((char*)c - offsetof(pconnection_t, context)) : NULL;
+}
+static inline pn_listener_t *pcontext_listener(pcontext_t *c) {
+ return c->type == LISTENER ?
+ (pn_listener_t*)((char*)c - offsetof(pn_listener_t, context)) : NULL;
+}
+
// Map a psocket to its serialization context.  Precondition: ps is a
// listener or connection socket -- for the reaper psocket as_pconnection_t()
// returns NULL and the dereference below would crash.
static pcontext_t *psocket_context(psocket_t *ps) {
  if (ps->listener)
    return &ps->listener->context;
  pconnection_t *pc = as_pconnection_t(ps);
  return &pc->context;
}
+
+
// Call with lock held.  The posted wake has been consumed: clear the flag
// and balance the completion_ops increment made in wakeup().
static inline void wake_complete(pcontext_t *ctx) {
  ctx->wake_pending = false;
  ctx->completion_ops--;
}
+
+// Call wih lock held
+static void wakeup(psocket_t *ps) {
+ pcontext_t *ctx = psocket_context(ps);
+ if (!ctx->working && !ctx->wake_pending) {
+ ctx->wake_pending = true;
+ ctx->completion_ops++;
+ post_completion(ctx->proactor->iocp, psocket_wakeup_key, ps);
+ }
+}
+
// Call with lock held.  Proactor-level counterpart of wake_complete().
static inline void proactor_wake_complete(pn_proactor_t *p) {
  wake_complete(&p->context);
}
+
+// Call wih lock held
+static void proactor_wake(pn_proactor_t *p) {
+ if (!p->context.working && !p->context.wake_pending) {
+ p->context.wake_pending = true;
+ p->context.completion_ops++;
+ post_completion(p->iocp, proactor_wake_key, p);
+ }
+}
+
+VOID CALLBACK reap_check_cb(PVOID arg, BOOLEAN /* ignored*/ );
+
+// Serialize handling listener and connection closing IO completions
+// after the engine has lost interest. A temporary convenience while
+// using the old single threaded iocp/io/select driver code.
// Serialize handling listener and connection closing IO completions
// after the engine has lost interest. A temporary convenience while
// using the old single threaded iocp/io/select driver code.
class reaper {
  public:
    reaper(pn_proactor_t *p, CRITICAL_SECTION *wlock, iocp_t *iocp)
      : iocp_(iocp), global_wlock_(wlock), timer_(NULL), running(true) {
        InitializeCriticalSectionAndSpinCount(&lock_, 4000);
        timer_queue_ = CreateTimerQueue();
        if (!timer_queue_) {
            perror("CreateTimerQueue");
            abort();
        }
    }

    // Connection or listener lock must also be held by caller.
    // Returns true if the descriptor had no outstanding ops (i.e. it can be
    // considered fully closed right away).
    bool add(iocpdesc_t *iocpd) {
        if (!iocpd) return false;
        csguard g(&lock_);
        if (iocpd->closing) return false;
        bool rval = !iocpd->ops_in_progress;
        pni_iocp_begin_close(iocpd);  // sets iocpd->closing
        pn_decref(iocpd);             // may still be ref counted on zombie list
        reap_timer();
        return rval;
    }

    // For cases where the close will be immediate. I.E. after a failed
    // connection attempt where there is no follow-on IO.
    void fast_reap(iocpdesc_t *iocpd) {
        assert(iocpd && iocpd->ops_in_progress == 0 && !iocpd->closing);
        csguard g(&lock_);
        pni_iocp_begin_close(iocpd);
        pn_decref(iocpd);
    }

    // Run one closing-socket completion.  Write completions additionally
    // take the global write lock to serialize with active writers.
    bool process(iocp_result_t *result) {
        // No queue of completions for the reaper.  Just process
        // serialized by the lock assuming all actions are "short".
        // That may be wrong, and if so the real fix is not a
        // consumer/producer setup but just replace the reaper with a
        // multi threaded alternative.
        csguard g(&lock_);
        iocpdesc_t *iocpd = result->iocpd;
        if (is_write_result(result)) {
            csguard wg(global_wlock_);
            do_complete(result);
        }
        else do_complete(result);
        // result may now be NULL
        bool rval = (iocpd->ops_in_progress == 0);
        pni_iocp_reap_check(iocpd);
        return rval;
    }

    // Called when all competing threads have terminated except our own reap_check timer.
    void final_shutdown() {
        running = false;
        // INVALID_HANDLE_VALUE: wait for any in-flight timer callbacks.
        DeleteTimerQueueEx(timer_queue_, INVALID_HANDLE_VALUE);
        // No pending or active timers from thread pool remain.  Truly single threaded now.
        pn_free((void *) iocp_);  // calls pni_iocp_finalize(); cleans up all sockets, completions, completion port.
        DeleteCriticalSection(&lock_);
    }

    // Timer callback body: retire the fired timer and re-arm if needed.
    void reap_check() {
        csguard g(&lock_);
        DeleteTimerQueueTimer(timer_queue_, timer_, NULL);
        timer_ = NULL;
        reap_timer();
    }


  private:
    // Call with lock.  Scan zombies now and schedule a one-shot timer for
    // the next zombie deadline, if any.
    void reap_timer() {
        if (timer_ || !running)
            return;
        pn_timestamp_t now = pn_i_now2();
        pni_zombie_check(iocp_, now);
        pn_timestamp_t zd = pni_zombie_deadline(iocp_);
        if (zd) {
            DWORD tm = (zd > now) ? zd - now : 1;
            if (!CreateTimerQueueTimer(&timer_, timer_queue_, reap_check_cb, this, tm,
                                       0, WT_EXECUTEONLYONCE)) {
                perror("CreateTimerQueueTimer");
                abort();
            }
        }
    }

    iocp_t *iocp_;
    CRITICAL_SECTION lock_;
    CRITICAL_SECTION *global_wlock_;
    HANDLE timer_queue_;
    HANDLE timer_;
    bool running;
};
+
+VOID CALLBACK reap_check_cb(PVOID arg, BOOLEAN /* ignored*/ ) {
+ // queue timer callback
+ reaper *r = static_cast<reaper *>(arg);
+ r->reap_check();
+}
+
+static void listener_begin_close(pn_listener_t* l);
+static void connect_step_done(pconnection_t *pc, connect_result_t *result);
+static pn_event_t *listener_batch_next(pn_event_batch_t *batch);
+static pn_event_t *proactor_batch_next(pn_event_batch_t *batch);
+static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch);
+
+
+static inline pn_proactor_t *batch_proactor(pn_event_batch_t *batch) {
+ return (batch->next_event == proactor_batch_next) ?
+ (pn_proactor_t*)((char*)batch - offsetof(pn_proactor_t, batch)) : NULL;
+}
+
+static inline pn_listener_t *batch_listener(pn_event_batch_t *batch) {
+ return (batch->next_event == listener_batch_next) ?
+ (pn_listener_t*)((char*)batch - offsetof(pn_listener_t, batch)) : NULL;
+}
+
+static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
+ return (batch->next_event == pconnection_batch_next) ?
+ (pconnection_t*)((char*)batch - offsetof(pconnection_t, batch)) : NULL;
+}
+
+static inline bool pconnection_has_event(pconnection_t *pc) {
+ return pn_connection_driver_has_event(&pc->driver);
+}
+
+static inline bool listener_has_event(pn_listener_t *l) {
+ return pn_collector_peek(l->collector);
+}
+
+static inline bool proactor_has_event(pn_proactor_t *p) {
+ return pn_collector_peek(p->collector);
+}
+
+static pn_event_t *log_event(void* p, pn_event_t *e) {
+ if (e) {
+ pn_logf("[%p]:(%s)", (void*)p, pn_event_type_name(pn_event_type(e)));
+ }
+ return e;
+}
+
+static void psocket_error_str(psocket_t *ps, const char *msg, const char* what) {
+ if (ps->is_reaper)
+ return;
+ if (!ps->listener) {
+ pn_connection_driver_t *driver = &as_pconnection_t(ps)->driver;
+ pn_connection_driver_bind(driver); /* Bind so errors will be reported */
+ pni_proactor_set_cond(pn_transport_condition(driver->transport), what, ps->host, ps->port, msg);
+ pn_connection_driver_close(driver);
+ } else {
+ pn_listener_t *l = as_listener(ps);
+ pni_proactor_set_cond(l->condition, what, ps->host, ps->port, msg);
+ listener_begin_close(l);
+ }
+}
+
+static void psocket_error(psocket_t *ps, int err, const char* what) {
+ psocket_error_str(ps, errno_str2(err).c_str(), what);
+}
+
+
+
+// ========================================================================
+// pconnection
+// ========================================================================
+
+/* Make a pn_class for pconnection_t since it is attached to a pn_connection_t record */
/* Make a pn_class for pconnection_t since it is attached to a pn_connection_t record */
// Only finalize is non-trivial; the rest of the class slots are stubbed out.
#define CID_pconnection CID_pn_object
#define pconnection_inspect NULL
#define pconnection_initialize NULL
#define pconnection_hashcode NULL
#define pconnection_compare NULL

// pn_class finalizer: release the embedded context's critical section.
static void pconnection_finalize(void *vp_pconnection) {
  pconnection_t *pc = (pconnection_t*)vp_pconnection;
  pcontext_finalize(&pc->context);
}

static const pn_class_t pconnection_class = PN_CLASS(pconnection);
+
// Bind a freshly allocated pconnection_t to its connection/transport and
// register it on the pn_connection_t attachment record.  Returns NULL on
// success or a static error string (in which case pc has been freed).
static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, bool server, const char *addr)
{
  if (pn_connection_driver_init(&pc->driver, c, t) != 0) {
    free(pc);
    return "pn_connection_driver_init failure";
  }
  {
    // bind_lock guards the attachment record against concurrent setup.
    csguard g(&p->bind_lock);
    pn_record_t *r = pn_connection_attachments(pc->driver.connection);
    if (pn_record_get(r, PN_PROACTOR)) {
      pn_connection_driver_destroy(&pc->driver);
      free(pc);
      return "pn_connection_t already in use";
    }
    pn_record_def(r, PN_PROACTOR, &pconnection_class);
    pn_record_set(r, PN_PROACTOR, pc);
    pc->bound = true;
    pc->can_wake = true;
  }

  pc->completion_queue = new std::queue<iocp_result_t *>();
  pc->work_queue = new std::queue<iocp_result_t *>();
  pcontext_init(&pc->context, PCONNECTION, p, pc);

  psocket_init(&pc->psocket, NULL, false, addr);
  pc->batch.next_event = pconnection_batch_next;
  if (server) {
    pn_transport_set_server(pc->driver.transport);
  }

  pn_decref(pc); /* Will be deleted when the connection is */
  return NULL;
}
+
+// Either stops a timer before firing or returns after the callback has
+// completed (in the threadpool thread). Never "in doubt".
+static bool stop_timer(HANDLE tqueue, HANDLE *timer) {
+ if (!*timer) return true;
+ if (DeleteTimerQueueTimer(tqueue, *timer, INVALID_HANDLE_VALUE)) {
+ *timer = NULL;
+ return true;
+ }
+ return false; // error
+}
+
+static bool start_timer(HANDLE tqueue, HANDLE *timer, WAITORTIMERCALLBACK cb, void *cb_arg, DWORD time) {
+ if (*timer) {
+ // TODO: log err
+ return false;
+ }
+ return CreateTimerQueueTimer(timer, tqueue, cb, cb_arg, time, 0, WT_EXECUTEONLYONCE);
+}
+
/* Timer-queue callback (runs on a Windows threadpool thread): note that a
 * transport tick is due and wake the connection's worker thread.  Skipped
 * once the socket is gone or closing. */
VOID CALLBACK tick_timer_cb(PVOID arg, BOOLEAN /* ignored*/ ) {
  pconnection_t *pc = (pconnection_t *) arg;
  csguard g(&pc->context.cslock);
  if (pc->psocket.iocpd && !pc->psocket.iocpd->closing) {
    pc->tick_pending = true;
    wakeup(&pc->psocket);
  }
}
+
+
+// Call with no lock held or stop_timer and callback may deadlock
+static void pconnection_tick(pconnection_t *pc) {
+ pn_transport_t *t = pc->driver.transport;
+ if (pn_transport_get_idle_timeout(t) || pn_transport_get_remote_idle_timeout(t)) {
+ if(!stop_timer(pc->context.proactor->timer_queue, &pc->tick_timer)) {
+ // TODO: handle error
+ }
+ uint64_t now = pn_i_now2();
+ uint64_t next = pn_transport_tick(t, now);
+ if (next) {
+ if (!start_timer(pc->context.proactor->timer_queue, &pc->tick_timer, tick_timer_cb, pc, next - now)) {
+ // TODO: handle error
+ }
+ }
+ }
+}
+
+static pconnection_t *get_pconnection(pn_connection_t* c) {
+ if (!c) return NULL;
+ pn_record_t *r = pn_connection_attachments(c);
+ return (pconnection_t*) pn_record_get(r, PN_PROACTOR);
+}
+
+
+pn_listener_t *pn_event_listener(pn_event_t *e) {
+ return (pn_event_class(e) == pn_listener__class()) ? (pn_listener_t*)pn_event_context(e) : NULL;
+}
+
+pn_proactor_t *pn_event_proactor(pn_event_t *e) {
+ if (pn_event_class(e) == pn_proactor__class()) return (pn_proactor_t*)pn_event_context(e);
+ pn_listener_t *l = pn_event_listener(e);
+ if (l) return l->context.proactor;
+ pn_connection_t *c = pn_event_connection(e);
+ if (c) return pn_connection_proactor(pn_event_connection(e));
+ return NULL;
+}
+
+// Call after successful accept
+static void set_sock_names(pconnection_t *pc) {
+ // This works. Note possible use of GetAcceptExSockaddrs()
+ pn_socket_t sock = pc->psocket.iocpd->socket;
+ socklen_t len = sizeof(pc->local.ss);
+ getsockname(sock, (struct sockaddr*)&pc->local.ss, &len);
+ len = sizeof(pc->remote.ss);
+ getpeername(sock, (struct sockaddr*)&pc->remote.ss, &len);
+}
+
+
+// Call with lock held when closing and transitioning away from working context
+static inline bool pconnection_can_free(pconnection_t *pc) {
+ return pc->psocket.iocpd == NULL && pc->context.completion_ops == 0
+ && !pc->stop_timer_required && !pconnection_has_event(pc) && !pc->queued_disconnect;
+}
+
/* Last-stage teardown of a pconnection.  The pconnection's own memory is
 * released when the attached pn_connection_t drops its final reference
 * (see pconnection_finalize); the incref/decref pair below prevents a
 * circular free while the driver is being destroyed. */
static void pconnection_final_free(pconnection_t *pc) {
  if (pc->addrinfo) {
    freeaddrinfo(pc->addrinfo);   /* Leftover connect address list. */
  }
  pn_condition_free(pc->disconnect_condition);
  pn_incref(pc); /* Make sure we don't do a circular free */
  pn_connection_driver_destroy(&pc->driver);
  pn_decref(pc);
  /* Now pc is freed iff the connection is, otherwise remains till the pn_connection_t is freed. */
}
+
+// Call with lock held or from forced shutdown
+static void pconnection_begin_close(pconnection_t *pc) {
+ if (!pc->context.closing) {
+ pc->context.closing = true;
+ pn_connection_driver_close(&pc->driver);
+ pc->stop_timer_required = true;
+ if (pc->context.proactor->reaper->add(pc->psocket.iocpd))
+ pc->psocket.iocpd = NULL;
+ wakeup(&pc->psocket);
+ }
+}
+
+// call with lock held. return true if caller must call pconnection_final_free()
+static bool pconnection_cleanup(pconnection_t *pc) {
+ delete pc->completion_queue;
+ delete pc->work_queue;
+ return proactor_remove(&pc->context);
+}
+
+static inline bool pconnection_work_pending(pconnection_t *pc) {
+ if (pc->completion_queue->size() || pc->wake_count || pc->tick_pending || pc->queued_disconnect)
+ return true;
+ if (!pc->started)
+ return false;
+ pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
+ return (wbuf.size > 0 && (pc->psocket.iocpd->events & PN_WRITABLE));
+}
+
+// Return true unless reaped
+static bool pconnection_write(pconnection_t *pc, pn_bytes_t wbuf) {
+ ssize_t n;
+ bool wouldblock;
+ {
+ csguard g(&pc->context.proactor->write_lock);
+ n = pni_iocp_begin_write(pc->psocket.iocpd, wbuf.start, wbuf.size, &wouldblock, pc->psocket.iocpd->error);
+ }
+ if (n > 0) {
+ pn_connection_driver_write_done(&pc->driver, n);
+ } else if (n < 0 && !wouldblock) {
+ psocket_error(&pc->psocket, WSAGetLastError(), "on write to");
+ }
+ else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) {
+ if (pc->context.proactor->reaper->add(pc->psocket.iocpd)) {
+ pc->psocket.iocpd = NULL;
+ return false;
+ }
+ }
+ return true;
+}
+
+// Queue the result (if any) for the one worker thread. Become the worker if possible.
+// NULL result is a wakeup or a topup.
+// topup signifies that the working thread is the caller looking for additional events.
+static pn_event_batch_t *pconnection_process(pconnection_t *pc, iocp_result_t *result, bool topup) {
+ bool first = true;
+ bool waking = false;
+ bool tick_required = false;
+ bool open = false;
+
+ while (true) {
+ {
+ csguard g(&pc->context.cslock);
+ if (first) {
+ first = false;
+ if (result)
+ pc->completion_queue->push(result);
+ else if (!topup)
+ wake_complete(&pc->context);
+ if (!topup) {
+ if (pc->context.working)
+ return NULL;
+ pc->context.working = true;
+ }
+ open = pc->started && !pc->connecting && !pc->context.closing;
+ }
+ else {
+ // Just re-acquired lock after processing IO and engine work
+ if (pconnection_has_event(pc))
+ return &pc->batch;
+
+ if (!pconnection_work_pending(pc)) {
+ pc->context.working = false;
+ if (pn_connection_driver_finished(&pc->driver)) {
+ pconnection_begin_close(pc);
+ }
+ if (pc->context.closing && pconnection_can_free(pc)) {
+ if (pconnection_cleanup(pc)) {
+ g.release();
+ pconnection_final_free(pc);
+ return NULL;
+ } // else disconnect logic has the free obligation
+ }
+ return NULL;
+ }
+ }
+
+ if (pc->queued_disconnect) { // From pn_proactor_disconnect()
+ pc->queued_disconnect = false;
+ if (!pc->context.closing) {
+ if (pc->disconnect_condition) {
+ pn_condition_copy(pn_transport_condition(pc->driver.transport), pc->disconnect_condition);
+ }
+ pn_connection_driver_close(&pc->driver);
+ }
+ }
+
+ assert(pc->work_queue->empty());
+ if (pc->completion_queue->size())
+ std::swap(pc->work_queue, pc->completion_queue);
+
+ if (pc->wake_count) {
+ waking = open && pc->can_wake && !pn_connection_driver_finished(&pc->driver);
+ pc->wake_count = 0;
+ }
+ if (pc->tick_pending) {
+ pc->tick_pending = false;
+ if (open)
+ tick_required = true;
+ }
+ }
+
+ // No lock
+
+ // Drain completions prior to engine work
+ while (pc->work_queue->size()) {
+ result = (iocp_result_t *) pc->work_queue->front();
+ pc->work_queue->pop();
+ if (result->iocpd->closing) {
+ if (pc->context.proactor->reaper->process(result)) {
+ pc->psocket.iocpd = NULL; // reaped
+ open = false;
+ }
+ }
+ else {
+ if (is_write_result(result)) {
+ csguard g(&pc->context.proactor->write_lock);
+ do_complete(result);
+ }
+ else if (is_connect_result(result)) {
+ connect_step_done(pc, (connect_result_t *) result);
+ if (pc->psocket.iocpd && (pc->psocket.iocpd->events & PN_WRITABLE)) {
+ pc->connecting = false;
+ if (pc->started)
+ open = true;
+ }
+ }
+ else do_complete(result);
+ }
+ }
+
+ if (!open) {
+ if (pc->stop_timer_required) {
+ pc->stop_timer_required = false;
+ // Do without context lock to avoid possible deadlock
+ stop_timer(pc->context.proactor->timer_queue, &pc->tick_timer);
+ }
+ } else {
+
+ pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
+ pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
+
+ /* Ticks and checking buffers can generate events, process before proceeding */
+ bool ready = pconnection_has_event(pc);
+ if (waking) {
+ pn_connection_t *c = pc->driver.connection;
+ pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
+ waking = false;
+ }
+ if (ready) {
+ continue;
+ }
+
+ if (wbuf.size >= 16384 && (pc->psocket.iocpd->events & PN_WRITABLE)) {
+ if (!pconnection_write(pc, wbuf))
+ continue;
+ }
+ if (rbuf.size > 0 && !pc->psocket.iocpd->read_in_progress) {
+ bool wouldblock;
+ ssize_t n = pni_iocp_recv(pc->psocket.iocpd, rbuf.start, rbuf.size, &wouldblock, pc->psocket.iocpd->error);
+
+ if (n > 0) {
+ pn_connection_driver_read_done(&pc->driver, n);
+ pconnection_tick(pc); /* check for tick changes. */
+ tick_required = false;
+ }
+ else if (n == 0)
+ pn_connection_driver_read_close(&pc->driver);
+ else if (!wouldblock)
+ psocket_error(&pc->psocket, WSAGetLastError(), "on read from");
+ }
+ if (!pc->psocket.iocpd->read_in_progress)
+ start_reading(pc->psocket.iocpd);
+
+
+ if (tick_required) {
+ pconnection_tick(pc); /* check for tick changes. */
+ tick_required = false;
+ }
+ wbuf = pn_connection_driver_write_buffer(&pc->driver);
+ if (wbuf.size > 0 && (pc->psocket.iocpd->events & PN_WRITABLE)) {
+ if (!pconnection_write(pc, wbuf))
+ open = false;
+ }
+ }
+
+ if (topup)
+ return NULL; // regardless if new events made available
+ }
+}
+
+
+static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) {
+ pconnection_t *pc = batch_pconnection(batch);
+ pn_event_t *e = pn_connection_driver_next_event(&pc->driver);
+ if (!e && ++pc->hog_count < HOG_MAX) {
+ pconnection_process(pc, NULL, true); // top up
+ e = pn_connection_driver_next_event(&pc->driver);
+ }
+ if (e && !pc->started && pn_event_type(e) == PN_CONNECTION_BOUND)
+ pc->started = true; // SSL will be set up on return and safe to do IO with correct transport layers
+ return e;
+}
+
/* Called (via pn_proactor_done) when the application has finished handling
 * a connection batch: relinquish the working flag and re-wake the
 * connection if more work or final close processing is pending. */
static void pconnection_done(pconnection_t *pc) {
  {
    csguard g(&pc->context.cslock);
    pc->context.working = false;
    pc->hog_count = 0;   /* Reset the batch "top up" budget. */
    if (pconnection_has_event(pc) || pconnection_work_pending(pc)) {
      wakeup(&pc->psocket);
    } else if (pn_connection_driver_finished(&pc->driver)) {
      pconnection_begin_close(pc);
      wakeup(&pc->psocket);
    }
  }
}
+
+static inline bool is_inactive(pn_proactor_t *p) {
+ return (!p->contexts && !p->disconnects_pending && !p->timeout_set && !p->need_timeout && !p->shutting_down);
+}
+
+// Call whenever transitioning from "definitely active" to "maybe inactive"
+static void wake_if_inactive(pn_proactor_t *p) {
+ if (is_inactive(p)) {
+ p->need_inactive = true;
+ proactor_wake(p);
+ }
+}
+
/* Public API: return a finished event batch.  Dispatches on the batch's
 * owner — connection, listener, or the proactor itself.  For the proactor
 * batch, promote any interrupt that was deferred while this thread was
 * working and re-wake if more proactor events are now due. */
void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
  pconnection_t *pc = batch_pconnection(batch);
  if (pc) {
    pconnection_done(pc);
    return;
  }
  pn_listener_t *l = batch_listener(batch);
  if (l) {
    listener_done(l);
    return;
  }
  pn_proactor_t *bp = batch_proactor(batch);
  if (bp == p) {
    csguard g(&p->context.cslock);
    p->context.working = false;
    if (p->delayed_interrupt) {
      /* Interrupt arrived while we were delivering this batch. */
      p->delayed_interrupt = false;
      p->need_interrupt = true;
    }
    if (p->timeout_processed) {
      p->timeout_processed = false;
      wake_if_inactive(p);
    }
    if (proactor_update_batch(p))
      proactor_wake(p);
    return;
  }
}
+
/* Queue a proactor-level event (e.g. INTERRUPT/TIMEOUT/INACTIVE) on the
 * proactor's collector. */
static void proactor_add_event(pn_proactor_t *p, pn_event_type_t t) {
  pn_collector_put(p->collector, pn_proactor__class(), p, t);
}
+
+static pn_event_batch_t *proactor_process(pn_proactor_t *p) {
+ csguard g(&p->context.cslock);
+ proactor_wake_complete(p);
+ if (!p->context.working) { /* Can generate proactor events */
+ if (proactor_update_batch(p)) {
+ p->context.working = true;
+ return &p->batch;
+ }
+ }
+ return NULL;
+}
+
+static pn_event_batch_t *psocket_process(psocket_t *ps, iocp_result_t *result, reaper *rpr) {
+ if (ps) {
+ pconnection_t *pc = as_pconnection_t(ps);
+ if (pc) {
+ return pconnection_process(pc, result, false);
+ } else {
+ pn_listener_t *l = as_listener(ps);
+ if (l)
+ return listener_process(l, result);
+ }
+ }
+ rpr->process(result);
+ return NULL;
+}
+
+static pn_event_batch_t *proactor_completion_loop(struct pn_proactor_t* p, bool can_block) {
+ // Proact! Process inbound completions of async activity until one
+ // of them provides a batch of events.
+ while(true) {
+ pn_event_batch_t *batch = NULL;
+
+ DWORD win_timeout = can_block ? INFINITE : 0;
+ DWORD num_xfer = 0;
+ ULONG_PTR completion_key = 0;
+ OVERLAPPED *overlapped = 0;
+
+ bool good_op = GetQueuedCompletionStatus (p->iocp->completion_port, &num_xfer,
+ &completion_key, &overlapped, win_timeout);
+ if (!overlapped && !can_block && GetLastError() == WAIT_TIMEOUT)
+ return NULL; // valid timeout
+
+ if (!good_op && !overlapped) {
+ // Should never happen. shutdown?
+ // We aren't expecting a timeout, closed completion port, or other error here.
+ pn_logf("%s", errno_str("Windows Proton proactor internal failure\n", false).c_str());
+ abort();
+ }
+
+ if (completion_key == NULL) {
+ // Normal IO case for connections and listeners
+ iocp_result_t *result = (iocp_result_t *) overlapped;
+ result->status = good_op ? 0 : GetLastError();
+ result->num_transferred = num_xfer;
+ psocket_t *ps = (psocket_t *) result->iocpd->active_completer;
+ batch = psocket_process(ps, result, p->reaper);
+ }
+ else {
+ // completion_key on our completion port is always null unless set by us
+ // in PostQueuedCompletionStatus. In which case, we hijack the overlapped
+ // data structure for our own use.
+ if (completion_key == psocket_wakeup_key)
+ batch = psocket_process((psocket_t *) overlapped, NULL, p->reaper);
+ else if (completion_key == proactor_wake_key)
+ batch = proactor_process((pn_proactor_t *) overlapped);
+ else if (completion_key == recycle_accept_key)
+ recycle_result((accept_result_t *) overlapped);
+ }
+ if (batch) return batch;
+ // No event generated. Try again with next completion.
+ }
+}
+
/* Public API: block until a batch of events is available. */
pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
  return proactor_completion_loop(p, true);
}
+
/* Public API: non-blocking poll — return a batch if one is immediately
 * available, otherwise NULL. */
pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* p) {
  return proactor_completion_loop(p, false);
}
+
+void pn_proactor_interrupt(pn_proactor_t *p) {
+ csguard g(&p->context.cslock);
+ if (p->context.working)
+ p->delayed_interrupt = true;
+ else
+ p->need_interrupt = true;
+ proactor_wake(p);
+}
+
+// runs on a threadpool thread. Must not hold timer_lock.
+VOID CALLBACK timeout_cb(PVOID arg, BOOLEAN /* ignored*/ ) {
+ pn_proactor_t *p = (pn_proactor_t *) arg;
+ csguard gtimer(&p->timer_lock);
+ csguard g(&p->context.cslock);
+ if (p->timeout_set)
+ p->need_timeout = true; // else cancelled
+ p->timeout_set = false;
+ if (p->need_timeout)
+ proactor_wake(p);
+}
+
/* Public API: arm (or, for t == 0, immediately fire) the proactor timeout.
 * Any previously armed timer is cancelled first.  timer_lock is held across
 * the stop/start so concurrent set/cancel calls serialize with the timer
 * callback; cslock is taken only briefly for the flag updates. */
void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
  bool ticking = false;
  csguard gtimer(&p->timer_lock);
  {
    csguard g(&p->context.cslock);
    ticking = (p->timeout_timer != NULL);
    if (t == 0) {
      /* Zero means "time out now": bypass the timer queue entirely. */
      p->need_timeout = true;
      p->timeout_set = false;
      proactor_wake(p);
    }
    else
      p->timeout_set = true;
  }
  // Just timer_lock held
  if (ticking) {
    stop_timer(p->timer_queue, &p->timeout_timer);
  }
  if (t) {
    start_timer(p->timer_queue, &p->timeout_timer, timeout_cb, p, t);
  }
}
+
/* Public API: cancel a pending proactor timeout.  If a timer was armed,
 * stop it and then re-check for inactivity — cancelling the timeout may
 * have removed the proactor's last pending event source. */
void pn_proactor_cancel_timeout(pn_proactor_t *p) {
  bool ticking = false;
  csguard gtimer(&p->timer_lock);
  {
    csguard g(&p->context.cslock);
    p->timeout_set = false;   /* A racing timeout_cb now becomes a no-op. */
    ticking = (p->timeout_timer != NULL);
  }
  if (ticking) {
    stop_timer(p->timer_queue, &p->timeout_timer);
    csguard g(&p->context.cslock);
    wake_if_inactive(p);
  }
}
+
+// Return true if connect_step_done()will handle connection status
+static bool connect_step(pconnection_t *pc) {
+ pn_proactor_t *p = pc->context.proactor;
+ while (pc->ai) { /* Have an address */
+ struct addrinfo *ai = pc->ai;
+ pc->ai = pc->ai->ai_next; /* Move to next address in case this fails */
+ unique_socket fd(::socket(ai->ai_family, SOCK_STREAM, ai->ai_protocol));
+ if (fd != INVALID_SOCKET) {
+ // Windows ConnectEx requires loosely bound socket.
+ sockaddr_storage sa;
+ memset(&sa, 0, sizeof(sa));
+ sa.ss_family = ai->ai_family;
+ if (!bind(fd, (SOCKADDR *) &sa, ai->ai_addrlen)) {
+ pni_configure_sock_2(fd);
+ pc->psocket.iocpd = pni_iocpdesc_create(p->iocp, fd);
+ assert(pc->psocket.iocpd);
+ pc->psocket.iocpd->write_closed = true;
+ pc->psocket.iocpd->read_closed = true;
+ fd.release();
+ iocpdesc_t *iocpd = pc->psocket.iocpd;
+ if (CreateIoCompletionPort ((HANDLE) iocpd->socket, iocpd->iocp->completion_port, 0, 0)) {
+ LPFN_CONNECTEX fn_connect_ex = lookup_connect_ex2(iocpd->socket);
+ // addrinfo is owned by the pconnection so pass NULL to the connect result
+ connect_result_t *result = connect_result(iocpd, NULL);
+ DWORD unused;
+ bool success = fn_connect_ex(iocpd->socket, ai->ai_addr, ai->ai_addrlen,
+ NULL, 0, &unused, (LPOVERLAPPED) result);
+ if (success || WSAGetLastError() == ERROR_IO_PENDING) {
+ iocpd->ops_in_progress++;
+ iocpd->active_completer = &pc->psocket;
+ // getpeername unreliable for outgoing connections, but we know it at this point
+ memcpy(&pc->remote.ss, ai->ai_addr, ai->ai_addrlen);
+ return true; // logic resumes at connect_step_done()
+ }
+ pn_free(result);
+ }
+ }
+ if (pc->psocket.iocpd) {
+ pc->context.proactor->reaper->fast_reap(pc->psocket.iocpd);
+ pc->psocket.iocpd = NULL;
+ }
+ }
+ }
+ pc->context.closing = true;
+ return false;
+}
+
/* IO completion handler for an async ConnectEx issued by connect_step().
 * On success, open the descriptor for IO and record the local address; on
 * failure, reap the descriptor and either try the next candidate address
 * or report the error and begin closing. */
static void connect_step_done(pconnection_t *pc, connect_result_t *result) {
  csguard g(&pc->context.cslock);
  DWORD saved_status = result->base.status;   /* result is freed below. */
  iocpdesc_t *iocpd = result->base.iocpd;
  iocpd->ops_in_progress--;
  assert(pc->psocket.iocpd == iocpd);
  complete_connect(result, result->base.status); // frees result, starts regular IO if connected

  if (!saved_status) {
    // Success
    pc->psocket.iocpd->write_closed = false;
    pc->psocket.iocpd->read_closed = false;
    if (pc->addrinfo) {
      socklen_t len = sizeof(pc->local.ss);
      getsockname(pc->psocket.iocpd->socket, (struct sockaddr*)&pc->local.ss, &len);
      freeaddrinfo(pc->addrinfo);   /* No further candidates needed. */
      pc->addrinfo = NULL;
    }
    pc->ai = NULL;
    return;
  }
  else {
    // Descriptor will never be used. Dispose.
    // Connect failed, no IO started, i.e. no pending iocpd based events
    pc->context.proactor->reaper->fast_reap(iocpd);
    pc->psocket.iocpd = NULL;
    memset(&pc->remote.ss, 0, sizeof(pc->remote.ss));
    // Is there a next connection target in the addrinfo to try?
    if (pc->ai && connect_step(pc)) {
      // Trying the next addrinfo possibility. Will return here.
      return;
    }
    // Give up
    psocket_error(&pc->psocket, saved_status, "connect to ");
    pc->context.closing = true;
    wakeup(&pc->psocket);
  }
}
+
/* Public API: start an outgoing connection to addr using the supplied
 * connection/transport.  Name resolution and the first connect attempt run
 * on the caller's thread; completion is delivered via the IOCP loop
 * (connect_step_done). */
void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, const char *addr) {
  pconnection_t *pc = (pconnection_t*) pn_class_new(&pconnection_class, sizeof(pconnection_t));
  assert(pc); // TODO: memory safety
  const char *err = pconnection_setup(pc, p, c, t, false, addr);
  if (err) {
    /* pc already freed by pconnection_setup on failure. */
    pn_logf("pn_proactor_connect failure: %s", err);
    return;
  }
  // TODO: check case of proactor shutting down
  csguard g(&pc->context.cslock);
  pc->connecting = true;
  proactor_add(&pc->context);
  pn_connection_open(pc->driver.connection); /* Auto-open */

  if (!pgetaddrinfo(pc->psocket.host, pc->psocket.port, 0, &pc->addrinfo)) {
    pc->ai = pc->addrinfo;
    if (connect_step(pc)) {
      return;   /* Async connect pending; connect_step_done() continues. */
    }
  }
  /* Resolution failed or all candidate addresses exhausted. */
  // NOTE(review): psocket.iocpd may be NULL here (e.g. getaddrinfo failure);
  // presumably reaper->add(NULL) is a harmless no-op — confirm.
  psocket_error(&pc->psocket, WSAGetLastError(), "connect to ");
  wakeup(&pc->psocket);
  if (p->reaper->add(pc->psocket.iocpd)) {
    pc->psocket.iocpd = NULL;
  }
}
+
+void pn_proactor_release_connection(pn_connection_t *c) {
+ bool notify = false;
+ pconnection_t *pc = g
<TRUNCATED>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org