You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2016/11/25 21:01:05 UTC
[09/48] qpid-proton git commit: PROTON-1350 PROTON-1351: Introduce
proton-c core library - Created new core proton library qpid-proton-core
which only contains protocol processing and no IO. - Rearranged source tree
to separate core protocol code and
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/reactor/io/windows/io.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/io/windows/io.c b/proton-c/src/reactor/io/windows/io.c
new file mode 100644
index 0000000..3ae6722
--- /dev/null
+++ b/proton-c/src/reactor/io/windows/io.c
@@ -0,0 +1,459 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#define FD_SETSIZE 2048
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
+#if _WIN32_WINNT < 0x0501
+#error "Proton requires Windows API support for XP or later."
+#endif
+#include <winsock2.h>
+#include <mswsock.h>
+#include <Ws2tcpip.h>
+
+#include "reactor/io.h"
+#include "reactor/selector.h"
+
+#include "platform/platform.h"
+#include "iocp.h"
+#include "core/util.h"
+
+#include <proton/object.h>
+
+#include <ctype.h>
+#include <errno.h>
+#include <stdio.h>
+#include <assert.h>
+
+int pni_win32_error(pn_error_t *error, const char *msg, HRESULT code)
+{
+ // Error code can be from GetLastError or WSAGetLastError,
+ char err[1024] = {0};
+ FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS |
+ FORMAT_MESSAGE_MAX_WIDTH_MASK, NULL, code, 0, (LPSTR)&err, sizeof(err), NULL);
+ return pn_error_format(error, PN_ERR, "%s: %s", msg, err);
+}
+
// Trace logging helper: format the message to stderr and flush immediately.
static void io_log(const char *fmt, ...)
{
  va_list args;
  va_start(args, fmt);
  vfprintf(stderr, fmt, args);
  va_end(args);
  fflush(stderr);
}
+
+// Per-instance IO state for the Windows reactor backend.
+struct pn_io_t {
+ char host[NI_MAXHOST]; // scratch buffer for getnameinfo() results in pn_accept
+ char serv[NI_MAXSERV]; // scratch buffer for the peer's service/port string
+ pn_error_t *error; // most recent error recorded on this IO object
+ bool trace; // set from PN_TRACE_DRV; enables io_log() output
+ bool wouldblock; // true when the last send/recv would have blocked
+ iocp_t *iocp; // completion-port state shared by this object's sockets
+};
+
+// pn_class initializer for pn_io_t: set up error/trace state, start WinSock
+// (version 2.2) and create the IO completion port bookkeeping.
+void pn_io_initialize(void *obj)
+{
+ pn_io_t *io = (pn_io_t *) obj;
+ io->error = pn_error();
+ io->wouldblock = false;
+ io->trace = pn_env_bool("PN_TRACE_DRV");
+
+ /* Request WinSock 2.2 */
+ WORD wsa_ver = MAKEWORD(2, 2);
+ WSADATA unused;
+ int err = WSAStartup(wsa_ver, &unused);
+ if (err) {
+ // Not fatal here: the failure is recorded and later socket calls will
+ // surface it to the application.
+ pni_win32_error(io->error, "WSAStartup", WSAGetLastError());
+ fprintf(stderr, "Can't load WinSock: %s\n", pn_error_text(io->error));
+ }
+ io->iocp = pni_iocp();
+}
+
+// pn_class finalizer: release the error object, the completion-port state,
+// and the WinSock reference taken in pn_io_initialize().
+void pn_io_finalize(void *obj)
+{
+ pn_io_t *io = (pn_io_t *) obj;
+ pn_error_free(io->error);
+ pn_free(io->iocp);
+ WSACleanup();
+}
+
+// pn_class vtable stubs: pn_io_t needs no hashcode/compare.
+// NOTE(review): pn_io_inspect is defined empty rather than NULL -- presumably
+// so PN_CLASS expands to an empty inspect slot; confirm against object.h.
+#define pn_io_hashcode NULL
+#define pn_io_compare NULL
+#define pn_io_inspect
+
+// Allocate a reference-counted pn_io_t; initialization is performed by
+// pn_io_initialize() through the class initializer.
+pn_io_t *pn_io(void)
+{
+ static const pn_class_t clazz = PN_CLASS(pn_io);
+ pn_io_t *io = (pn_io_t *) pn_class_new(&clazz, sizeof(pn_io_t));
+ return io;
+}
+
+void pn_io_free(pn_io_t *io)
+{
+ pn_free(io);
+}
+
+pn_error_t *pn_io_error(pn_io_t *io)
+{
+ assert(io);
+ return io->error;
+}
+
+// Detect and retire a stale iocpdesc whose socket HANDLE value has been
+// recycled by Windows after an application-level closesocket().
+static void ensure_unique(pn_io_t *io, pn_socket_t new_socket)
+{
+ // A brand new socket can have the same HANDLE value as a previous
+ // one after a socketclose. If the application closes one itself
+ // (i.e. not using pn_close), we don't find out about it until here.
+ iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, new_socket);
+ if (iocpd) {
+ if (io->trace)
+ io_log("Stale external socket reference discarded\n");
+ // Re-use means former socket instance was closed
+ assert(iocpd->ops_in_progress == 0);
+ assert(iocpd->external);
+ // Clean up the straggler as best we can
+ pn_socket_t sock = iocpd->socket;
+ iocpd->socket = INVALID_SOCKET; // keep teardown from touching the recycled handle
+ pni_iocpdesc_map_del(io->iocp, sock); // may free the iocpdesc_t depending on refcount
+ }
+}
+
+
+/*
+ * This heavyweight surrogate pipe could be replaced with a normal Windows pipe
+ * now that select() is no longer used. If interrupt semantics are all that is
+ * needed, a simple user space counter and reserved completion status would
+ * probably suffice.
+ */
+static int pni_socket_pair(pn_io_t *io, SOCKET sv[2]);
+
+int pn_pipe(pn_io_t *io, pn_socket_t *dest)
+{
+ int n = pni_socket_pair(io, dest);
+ if (n) {
+ pni_win32_error(io->error, "pipe", WSAGetLastError());
+ }
+ return n;
+}
+
+static void pn_configure_sock(pn_io_t *io, pn_socket_t sock) {
+ //
+ // Disable the Nagle algorithm on TCP connections.
+ //
+ int flag = 1;
+ if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)) != 0) {
+ perror("setsockopt");
+ }
+
+ u_long nonblock = 1;
+ if (ioctlsocket(sock, FIONBIO, &nonblock)) {
+ perror("ioctlsocket");
+ }
+}
+
+static inline pn_socket_t pni_create_socket(int domain, int protocol);
+
// Map the symbolic service names "amqp"/"amqps" to their well-known port
// numbers for older Windows resolvers; any other value (including NULL)
// passes through unchanged.
static const char *amqp_service(const char *port) {
  if (!port) return port;
  if (strcmp(port, "amqp") == 0) return "5672";
  if (strcmp(port, "amqps") == 0) return "5671";
  return port;
}
+
+pn_socket_t pn_listen(pn_io_t *io, const char *host, const char *port)
+{
+ struct addrinfo *addr;
+ int code = getaddrinfo(host, amqp_service(port), NULL, &addr);
+ if (code) {
+ pn_error_format(io->error, PN_ERR, "getaddrinfo(%s, %s): %s\n", host, port, gai_strerror(code));
+ return INVALID_SOCKET;
+ }
+
+ pn_socket_t sock = pni_create_socket(addr->ai_family, addr->ai_protocol);
+ if (sock == INVALID_SOCKET) {
+ pni_win32_error(io->error, "pni_create_socket", WSAGetLastError());
+ return INVALID_SOCKET;
+ }
+ ensure_unique(io, sock);
+
+ bool optval = 1;
+ if (setsockopt(sock, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (const char *) &optval,
+ sizeof(optval)) == -1) {
+ pni_win32_error(io->error, "setsockopt", WSAGetLastError());
+ closesocket(sock);
+ return INVALID_SOCKET;
+ }
+
+ if (bind(sock, addr->ai_addr, addr->ai_addrlen) == -1) {
+ pni_win32_error(io->error, "bind", WSAGetLastError());
+ freeaddrinfo(addr);
+ closesocket(sock);
+ return INVALID_SOCKET;
+ }
+ freeaddrinfo(addr);
+
+ if (listen(sock, 50) == -1) {
+ pni_win32_error(io->error, "listen", WSAGetLastError());
+ closesocket(sock);
+ return INVALID_SOCKET;
+ }
+
+ if (io->iocp->selector) {
+ iocpdesc_t *iocpd = pni_iocpdesc_create(io->iocp, sock, false);
+ if (!iocpd) {
+ pn_i_error_from_errno(io->error, "register");
+ closesocket(sock);
+ return INVALID_SOCKET;
+ }
+ pni_iocpdesc_start(iocpd);
+ }
+
+ return sock;
+}
+
+// Begin a connection to host:port.  With a selector the connect proceeds
+// asynchronously through ConnectEx and ownership of the resolved address
+// passes to pni_iocp_begin_connect; otherwise a plain non-blocking connect
+// is issued and the caller polls for completion.
+pn_socket_t pn_connect(pn_io_t *io, const char *hostarg, const char *port)
+{
+ // convert "0.0.0.0" to "127.0.0.1" on Windows for outgoing sockets
+ const char *host = strcmp("0.0.0.0", hostarg) ? hostarg : "127.0.0.1";
+
+ struct addrinfo *addr;
+ int code = getaddrinfo(host, amqp_service(port), NULL, &addr);
+ if (code) {
+ pn_error_format(io->error, PN_ERR, "getaddrinfo(%s, %s): %s", host, port, gai_strerror(code));
+ return INVALID_SOCKET;
+ }
+
+ pn_socket_t sock = pni_create_socket(addr->ai_family, addr->ai_protocol);
+ if (sock == INVALID_SOCKET) {
+ pni_win32_error(io->error, "proton pni_create_socket", WSAGetLastError());
+ freeaddrinfo(addr);
+ return INVALID_SOCKET;
+ }
+
+ ensure_unique(io, sock);
+ pn_configure_sock(io, sock);
+
+ if (io->iocp->selector) {
+ // Async path: addr is released later by connect_result_finalize().
+ return pni_iocp_begin_connect(io->iocp, sock, addr, io->error);
+ } else {
+ if (connect(sock, addr->ai_addr, addr->ai_addrlen) != 0) {
+ // WSAEWOULDBLOCK just means the non-blocking connect is in progress.
+ if (WSAGetLastError() != WSAEWOULDBLOCK) {
+ pni_win32_error(io->error, "connect", WSAGetLastError());
+ freeaddrinfo(addr);
+ closesocket(sock);
+ return INVALID_SOCKET;
+ }
+ }
+
+ freeaddrinfo(addr);
+ return sock;
+ }
+}
+
+pn_socket_t pn_accept(pn_io_t *io, pn_socket_t listen_sock, char *name, size_t size)
+{
+ struct sockaddr_storage addr;
+ socklen_t addrlen = sizeof(addr);
+ iocpdesc_t *listend = pni_iocpdesc_map_get(io->iocp, listen_sock);
+ pn_socket_t accept_sock;
+
+ *name = '\0';
+ if (listend)
+ accept_sock = pni_iocp_end_accept(listend, (struct sockaddr *) &addr, &addrlen, &io->wouldblock, io->error);
+ else {
+ // User supplied socket
+ accept_sock = accept(listen_sock, (struct sockaddr *) &addr, &addrlen);
+ if (accept_sock == INVALID_SOCKET)
+ pni_win32_error(io->error, "sync accept", WSAGetLastError());
+ }
+
+ if (accept_sock == INVALID_SOCKET)
+ return accept_sock;
+
+ int code = getnameinfo((struct sockaddr *) &addr, addrlen, io->host, NI_MAXHOST,
+ io->serv, NI_MAXSERV, 0);
+ if (code)
+ code = getnameinfo((struct sockaddr *) &addr, addrlen, io->host, NI_MAXHOST,
+ io->serv, NI_MAXSERV, NI_NUMERICHOST | NI_NUMERICSERV);
+ if (code) {
+ pn_error_format(io->error, PN_ERR, "getnameinfo: %s\n", gai_strerror(code));
+ pn_close(io, accept_sock);
+ return INVALID_SOCKET;
+ } else {
+ pn_configure_sock(io, accept_sock);
+ pni_snprintf(name, size, "%s:%s", io->host, io->serv);
+ if (listend) {
+ pni_iocpdesc_start(pni_iocpdesc_map_get(io->iocp, accept_sock));
+ }
+ return accept_sock;
+ }
+}
+
+static inline pn_socket_t pni_create_socket(int domain, int protocol) {
+ return socket(domain, SOCK_STREAM, protocol);
+}
+
+ssize_t pn_send(pn_io_t *io, pn_socket_t sockfd, const void *buf, size_t len) {
+ ssize_t count;
+ iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, sockfd);
+ if (iocpd) {
+ count = pni_iocp_begin_write(iocpd, buf, len, &io->wouldblock, io->error);
+ } else {
+ count = send(sockfd, (const char *) buf, len, 0);
+ io->wouldblock = count < 0 && WSAGetLastError() == WSAEWOULDBLOCK;
+ }
+ return count;
+}
+
+ssize_t pn_recv(pn_io_t *io, pn_socket_t socket, void *buf, size_t size)
+{
+ ssize_t count;
+ iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, socket);
+ if (iocpd) {
+ count = pni_iocp_recv(iocpd, buf, size, &io->wouldblock, io->error);
+ } else {
+ count = recv(socket, (char *) buf, size, 0);
+ io->wouldblock = count < 0 && WSAGetLastError() == WSAEWOULDBLOCK;
+ }
+ return count;
+}
+
+ssize_t pn_write(pn_io_t *io, pn_socket_t socket, const void *buf, size_t size)
+{
+ // non-socket io is mapped to socket io for now. See pn_pipe()
+ return pn_send(io, socket, buf, size);
+}
+
+ssize_t pn_read(pn_io_t *io, pn_socket_t socket, void *buf, size_t size)
+{
+ return pn_recv(io, socket, buf, size);
+}
+
+void pn_close(pn_io_t *io, pn_socket_t socket)
+{
+ iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, socket);
+ if (iocpd)
+ pni_iocp_begin_close(iocpd);
+ else {
+ closesocket(socket);
+ }
+}
+
+bool pn_wouldblock(pn_io_t *io)
+{
+ return io->wouldblock;
+}
+
+pn_selector_t *pn_io_selector(pn_io_t *io)
+{
+ if (io->iocp->selector == NULL)
+ io->iocp->selector = pni_selector_create(io->iocp);
+ return io->iocp->selector;
+}
+
+// Register one end of the surrogate pipe with the completion port in
+// non-blocking mode.
+static void configure_pipe_socket(pn_io_t *io, pn_socket_t sock)
+{
+ u_long v = 1;
+ ioctlsocket (sock, FIONBIO, &v);
+ ensure_unique(io, sock);
+ // NOTE(review): pni_iocpdesc_create's result is passed to
+ // pni_iocpdesc_start without a NULL check -- presumably it only fails on
+ // OOM; confirm before relying on this.
+ iocpdesc_t *iocpd = pni_iocpdesc_create(io->iocp, sock, false);
+ pni_iocpdesc_start(iocpd);
+}
+
+
+// Emulate socketpair() (absent on Windows) with a loopback TCP connection:
+// listen on an ephemeral loopback port, connect to it, accept, then discard
+// the listener.  On success sv[0]/sv[1] hold the two connected ends,
+// registered with the completion port.  Returns 0 on success, -1 on error.
+static int pni_socket_pair (pn_io_t *io, SOCKET sv[2]) {
+ // no socketpair on windows. provide pipe() semantics using sockets
+ struct protoent * pe_tcp = getprotobyname("tcp");
+ if (pe_tcp == NULL) {
+ perror("getprotobyname");
+ return -1;
+ }
+
+ SOCKET sock = socket(AF_INET, SOCK_STREAM, pe_tcp->p_proto);
+ if (sock == INVALID_SOCKET) {
+ perror("socket");
+ return -1;
+ }
+
+ BOOL b = 1;
+ if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char *) &b, sizeof(b)) == -1) {
+ perror("setsockopt");
+ closesocket(sock);
+ return -1;
+ }
+ else {
+ // Bind to an ephemeral port (sin_port == 0) on the loopback interface.
+ struct sockaddr_in addr = {0};
+ addr.sin_family = AF_INET;
+ addr.sin_port = 0;
+ addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
+
+ if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
+ perror("bind");
+ closesocket(sock);
+ return -1;
+ }
+ }
+
+ if (listen(sock, 50) == -1) {
+ perror("listen");
+ closesocket(sock);
+ return -1;
+ }
+
+ if ((sv[1] = socket(AF_INET, SOCK_STREAM, pe_tcp->p_proto)) == INVALID_SOCKET) {
+ perror("sock1");
+ closesocket(sock);
+ return -1;
+ }
+ else {
+ // Discover which ephemeral port was assigned, then connect to it.
+ struct sockaddr addr = {0};
+ int l = sizeof(addr);
+ if (getsockname(sock, &addr, &l) == -1) {
+ perror("getsockname");
+ closesocket(sock);
+ return -1;
+ }
+
+ if (connect(sv[1], &addr, sizeof(addr)) == -1) {
+ int err = WSAGetLastError();
+ fprintf(stderr, "connect wsaerrr %d\n", err);
+ closesocket(sock);
+ closesocket(sv[1]);
+ return -1;
+ }
+
+ if ((sv[0] = accept(sock, &addr, &l)) == INVALID_SOCKET) {
+ perror("accept");
+ closesocket(sock);
+ closesocket(sv[1]);
+ return -1;
+ }
+ }
+
+ // Both ends connected: make them non-blocking, register with the
+ // completion port, and drop the temporary listener.
+ configure_pipe_socket(io, sv[0]);
+ configure_pipe_socket(io, sv[1]);
+ closesocket(sock);
+ return 0;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/reactor/io/windows/iocp.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/io/windows/iocp.c b/proton-c/src/reactor/io/windows/iocp.c
new file mode 100644
index 0000000..8a1a64a
--- /dev/null
+++ b/proton-c/src/reactor/io/windows/iocp.c
@@ -0,0 +1,1179 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
+#if _WIN32_WINNT < 0x0501
+#error "Proton requires Windows API support for XP or later."
+#endif
+#include <winsock2.h>
+#include <mswsock.h>
+#include <Ws2tcpip.h>
+
+#include "reactor/io.h"
+#include "reactor/selector.h"
+
+#include "iocp.h"
+#include "platform/platform.h"
+#include "core/util.h"
+
+#include <proton/object.h>
+#include <proton/error.h>
+#include <proton/transport.h>
+
+#include <assert.h>
+
+/*
+ * Windows IO Completion Port support for Proton.
+ *
+ * Overlapped writes are used to avoid lengthy stalls between write
+ * completion and starting a new write. Non-overlapped reads are used
+ * since Windows accumulates inbound traffic without stalling and
+ * managing read buffers would not avoid a memory copy at the pn_read
+ * boundary.
+ *
+ * A socket must not get a Windows closesocket() unless the
+ * application has called pn_close on the socket or a global
+ * pn_io_finalize(). On error, the internal accounting for
+ * write_closed or read_closed may be updated along with the external
+ * event notification. A socket may be closed if it is never added to
+ * the iocpdesc_map or is on its way out of the map.
+ */
+
+// Max number of overlapped accepts per listener
+#define IOCP_MAX_ACCEPTS 10
+
+// AcceptEx squishes the local and remote addresses and optional data
+// all together when accepting the connection. Reserve enough for
+// IPv6 addresses, even if the socket is IPv4. The 16 bytes padding
+// per address is required by AcceptEx.
+#define IOCP_SOCKADDRMAXLEN (sizeof(sockaddr_in6) + 16)
+#define IOCP_SOCKADDRBUFLEN (2 * IOCP_SOCKADDRMAXLEN)
+
// Trace logging helper: format the message to stderr and flush immediately.
static void iocp_log(const char *fmt, ...)
{
  va_list args;
  va_start(args, fmt);
  vfprintf(stderr, fmt, args);
  va_end(args);
  fflush(stderr);
}
+
+static void set_iocp_error_status(pn_error_t *error, int code, HRESULT status)
+{
+ char buf[512];
+ if (FormatMessage(FORMAT_MESSAGE_MAX_WIDTH_MASK | FORMAT_MESSAGE_FROM_SYSTEM,
+ 0, status, 0, buf, sizeof(buf), 0))
+ pn_error_set(error, code, buf);
+ else {
+ fprintf(stderr, "pn internal Windows error: %lu\n", GetLastError());
+ }
+}
+
+static void reap_check(iocpdesc_t *);
+static void bind_to_completion_port(iocpdesc_t *iocpd);
+static void iocp_shutdown(iocpdesc_t *iocpd);
+static void start_reading(iocpdesc_t *iocpd);
+static bool is_listener(iocpdesc_t *iocpd);
+static void release_sys_sendbuf(SOCKET s);
+
+// Record an error on the descriptor and take it out of service: both
+// directions are marked closed and readable/writable events are cleared.
+static void iocpdesc_fail(iocpdesc_t *iocpd, HRESULT status, const char* text)
+{
+ pni_win32_error(iocpd->error, text, status);
+ if (iocpd->iocp->iocp_trace) {
+ iocp_log("connection terminated: %s\n", pn_error_text(iocpd->error));
+ }
+ iocpd->write_closed = true;
+ iocpd->read_closed = true;
+ iocpd->poll_error = true;
+ pni_events_update(iocpd, iocpd->events & ~(PN_READABLE | PN_WRITABLE));
+}
+
+// Helper functions to use specialized IOCP AcceptEx() and ConnectEx()
+static LPFN_ACCEPTEX lookup_accept_ex(SOCKET s)
+{
+ GUID guid = WSAID_ACCEPTEX;
+ DWORD bytes = 0;
+ LPFN_ACCEPTEX fn;
+ WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
+ &fn, sizeof(fn), &bytes, NULL, NULL);
+ assert(fn);
+ return fn;
+}
+
+static LPFN_CONNECTEX lookup_connect_ex(SOCKET s)
+{
+ GUID guid = WSAID_CONNECTEX;
+ DWORD bytes = 0;
+ LPFN_CONNECTEX fn;
+ WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
+ &fn, sizeof(fn), &bytes, NULL, NULL);
+ assert(fn);
+ return fn;
+}
+
+static LPFN_GETACCEPTEXSOCKADDRS lookup_get_accept_ex_sockaddrs(SOCKET s)
+{
+ GUID guid = WSAID_GETACCEPTEXSOCKADDRS;
+ DWORD bytes = 0;
+ LPFN_GETACCEPTEXSOCKADDRS fn;
+ WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
+ &fn, sizeof(fn), &bytes, NULL, NULL);
+ assert(fn);
+ return fn;
+}
+
+// match accept socket to listener socket
+// match accept socket to listener socket
+// Create a fresh descriptor whose socket shares the listener's address
+// family, so AcceptEx receives a compatible accept socket.  Returns NULL on
+// any failure.
+static iocpdesc_t *create_same_type_socket(iocpdesc_t *iocpd)
+{
+ sockaddr_storage sa;
+ socklen_t salen = sizeof(sa);
+ if (getsockname(iocpd->socket, (sockaddr*)&sa, &salen) == -1)
+ return NULL;
+ SOCKET s = socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM
+ if (s == INVALID_SOCKET)
+ return NULL;
+ return pni_iocpdesc_create(iocpd->iocp, s, false);
+}
+
+static bool is_listener(iocpdesc_t *iocpd)
+{
+ return iocpd && iocpd->acceptor;
+}
+
+// === Async accept processing
+
+// Overlapped completion record for one AcceptEx call.
+typedef struct {
+ iocp_result_t base; // common header: OVERLAPPED + result type + owning iocpd
+ iocpdesc_t *new_sock; // pre-created socket the connection lands on
+ char address_buffer[IOCP_SOCKADDRBUFLEN]; // AcceptEx local+remote address area
+ DWORD unused; // byte-count out-param required by AcceptEx
+} accept_result_t;
+
+static accept_result_t *accept_result(iocpdesc_t *listen_sock) {
+ accept_result_t *result = (accept_result_t *)calloc(1, sizeof(accept_result_t));
+ if (result) {
+ result->base.type = IOCP_ACCEPT;
+ result->base.iocpd = listen_sock;
+ }
+ return result;
+}
+
+static void reset_accept_result(accept_result_t *result) {
+ memset(&result->base.overlapped, 0, sizeof (OVERLAPPED));
+ memset(&result->address_buffer, 0, IOCP_SOCKADDRBUFLEN);
+}
+
+// Bookkeeping for the pool of outstanding AcceptEx operations on a listener.
+struct pni_acceptor_t {
+ int accept_queue_size; // number of accept_result_t records created so far
+ pn_list_t *accepts; // completed accepts awaiting pni_iocp_end_accept
+ iocpdesc_t *listen_sock; // owning listener descriptor
+ bool signalled; // set when the last pending accept drains during close
+ LPFN_ACCEPTEX fn_accept_ex; // cached extension function pointer
+ LPFN_GETACCEPTEXSOCKADDRS fn_get_accept_ex_sockaddrs; // cached extension function pointer
+};
+
+#define pni_acceptor_compare NULL
+#define pni_acceptor_inspect NULL
+#define pni_acceptor_hashcode NULL
+
+static void pni_acceptor_initialize(void *object)
+{
+ pni_acceptor_t *acceptor = (pni_acceptor_t *) object;
+ acceptor->accepts = pn_list(PN_VOID, IOCP_MAX_ACCEPTS);
+}
+
+static void pni_acceptor_finalize(void *object)
+{
+ pni_acceptor_t *acceptor = (pni_acceptor_t *) object;
+ size_t len = pn_list_size(acceptor->accepts);
+ for (size_t i = 0; i < len; i++)
+ free(pn_list_get(acceptor->accepts, i));
+ pn_free(acceptor->accepts);
+}
+
+// Construct the acceptor for a listening descriptor and cache the
+// AcceptEx-related extension function pointers for its socket.
+static pni_acceptor_t *pni_acceptor(iocpdesc_t *iocpd)
+{
+ static const pn_cid_t CID_pni_acceptor = CID_pn_void; // presumably referenced by the PN_CLASS expansion -- confirm against object.h
+ static const pn_class_t clazz = PN_CLASS(pni_acceptor);
+ pni_acceptor_t *acceptor = (pni_acceptor_t *) pn_class_new(&clazz, sizeof(pni_acceptor_t));
+ acceptor->listen_sock = iocpd;
+ acceptor->accept_queue_size = 0;
+ acceptor->signalled = false;
+ pn_socket_t sock = acceptor->listen_sock->socket;
+ acceptor->fn_accept_ex = lookup_accept_ex(sock);
+ acceptor->fn_get_accept_ex_sockaddrs = lookup_get_accept_ex_sockaddrs(sock);
+ return acceptor;
+}
+
+// Post a new overlapped AcceptEx on the listener, reusing 'result' when
+// given, otherwise growing the pool up to IOCP_MAX_ACCEPTS.  During close it
+// instead retires results, signalling once the last one has drained.
+static void begin_accept(pni_acceptor_t *acceptor, accept_result_t *result)
+{
+ if (acceptor->listen_sock->closing) {
+ if (result) {
+ free(result);
+ acceptor->accept_queue_size--;
+ }
+ if (acceptor->accept_queue_size == 0)
+ acceptor->signalled = true; // all outstanding accepts retired
+ return;
+ }
+
+ if (result) {
+ reset_accept_result(result);
+ } else {
+ if (acceptor->accept_queue_size < IOCP_MAX_ACCEPTS &&
+ pn_list_size(acceptor->accepts) == acceptor->accept_queue_size ) {
+ result = accept_result(acceptor->listen_sock);
+ acceptor->accept_queue_size++;
+ } else {
+ // an async accept is still pending or max concurrent accepts already hit
+ return;
+ }
+ }
+
+ result->new_sock = create_same_type_socket(acceptor->listen_sock);
+ if (result->new_sock) {
+ // Not yet connected.
+ result->new_sock->read_closed = true;
+ result->new_sock->write_closed = true;
+
+ bool success = acceptor->fn_accept_ex(acceptor->listen_sock->socket, result->new_sock->socket,
+ result->address_buffer, 0, IOCP_SOCKADDRMAXLEN, IOCP_SOCKADDRMAXLEN,
+ &result->unused, (LPOVERLAPPED) result);
+ if (!success && WSAGetLastError() != ERROR_IO_PENDING) {
+ // Immediate failure: queue the errored result so the application sees
+ // it on the next accept attempt.
+ result->base.status = WSAGetLastError();
+ pn_list_add(acceptor->accepts, result);
+ pni_events_update(acceptor->listen_sock, acceptor->listen_sock->events | PN_READABLE);
+ } else {
+ acceptor->listen_sock->ops_in_progress++;
+ // This socket is equally involved in the async operation.
+ result->new_sock->ops_in_progress++;
+ }
+ } else {
+ iocpdesc_fail(acceptor->listen_sock, WSAGetLastError(), "create accept socket");
+ }
+}
+
+// Completion-port callback for a finished AcceptEx: queue the result for
+// pni_iocp_end_accept, or discard it if the listener closed while the accept
+// was in flight.
+static void complete_accept(accept_result_t *result, HRESULT status)
+{
+ result->new_sock->ops_in_progress--;
+ iocpdesc_t *ld = result->base.iocpd;
+ if (ld->read_closed) {
+ // Listener already shut down; close the orphaned accept socket.
+ if (!result->new_sock->closing)
+ pni_iocp_begin_close(result->new_sock);
+ free(result); // discard
+ reap_check(ld);
+ } else {
+ result->base.status = status;
+ pn_list_add(ld->acceptor->accepts, result);
+ pni_events_update(ld, ld->events | PN_READABLE);
+ }
+}
+
+// Consume one completed AcceptEx result from the listener's queue.  On
+// success returns the connected accept socket and copies the peer address
+// into addr/addrlen; sets *would_block when no completion is available.
+// The consumed record is recycled into a new AcceptEx via begin_accept().
+pn_socket_t pni_iocp_end_accept(iocpdesc_t *ld, sockaddr *addr, socklen_t *addrlen, bool *would_block, pn_error_t *error)
+{
+ if (!is_listener(ld)) {
+ set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP);
+ return INVALID_SOCKET;
+ }
+ if (ld->read_closed) {
+ set_iocp_error_status(error, PN_ERR, WSAENOTSOCK);
+ return INVALID_SOCKET;
+ }
+ if (pn_list_size(ld->acceptor->accepts) == 0) {
+ if (ld->events & PN_READABLE && ld->iocp->iocp_trace)
+ iocp_log("listen socket readable with no available accept completions\n");
+ *would_block = true;
+ return INVALID_SOCKET;
+ }
+
+ // Pop the oldest completed accept from the queue.
+ accept_result_t *result = (accept_result_t *) pn_list_get(ld->acceptor->accepts, 0);
+ pn_list_del(ld->acceptor->accepts, 0, 1);
+ if (!pn_list_size(ld->acceptor->accepts))
+ pni_events_update(ld, ld->events & ~PN_READABLE); // No pending accepts
+
+ pn_socket_t accept_sock;
+ if (result->base.status) {
+ accept_sock = INVALID_SOCKET;
+ pni_win32_error(ld->error, "accept failure", result->base.status);
+ if (ld->iocp->iocp_trace)
+ iocp_log("%s\n", pn_error_text(ld->error));
+ // App never sees this socket so close it here.
+ pni_iocp_begin_close(result->new_sock);
+ } else {
+ accept_sock = result->new_sock->socket;
+ // AcceptEx special setsockopt:
+ setsockopt(accept_sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&ld->socket,
+ sizeof (SOCKET));
+ if (addr && addrlen && *addrlen > 0) {
+ // Unpack the remote address from the AcceptEx address buffer.
+ sockaddr_storage *local_addr = NULL;
+ sockaddr_storage *remote_addr = NULL;
+ int local_addrlen, remote_addrlen;
+ LPFN_GETACCEPTEXSOCKADDRS fn = ld->acceptor->fn_get_accept_ex_sockaddrs;
+ fn(result->address_buffer, 0, IOCP_SOCKADDRMAXLEN, IOCP_SOCKADDRMAXLEN,
+ (SOCKADDR **) &local_addr, &local_addrlen, (SOCKADDR **) &remote_addr,
+ &remote_addrlen);
+ *addrlen = pn_min(*addrlen, remote_addrlen);
+ memmove(addr, remote_addr, *addrlen);
+ }
+ }
+
+ if (accept_sock != INVALID_SOCKET) {
+ // Connected.
+ result->new_sock->read_closed = false;
+ result->new_sock->write_closed = false;
+ }
+
+ // Done with the completion result, so reuse it
+ result->new_sock = NULL;
+ begin_accept(ld->acceptor, result);
+ return accept_sock;
+}
+
+
+// === Async connect processing
+
+// Overlapped completion record for one ConnectEx call.
+typedef struct {
+ iocp_result_t base; // common header: OVERLAPPED + result type + owning iocpd
+ char address_buffer[IOCP_SOCKADDRBUFLEN]; // reserved address area
+ struct addrinfo *addrinfo; // owned; released by connect_result_finalize
+} connect_result_t;
+
+#define connect_result_initialize NULL
+#define connect_result_compare NULL
+#define connect_result_inspect NULL
+#define connect_result_hashcode NULL
+
+static void connect_result_finalize(void *object)
+{
+ connect_result_t *result = (connect_result_t *) object;
+ // Do not release addrinfo until ConnectEx completes
+ if (result->addrinfo)
+ freeaddrinfo(result->addrinfo);
+}
+
+// Allocate a zeroed, reference-counted connect record; takes ownership of
+// addr (freed later by connect_result_finalize).
+static connect_result_t *connect_result(iocpdesc_t *iocpd, struct addrinfo *addr) {
+ static const pn_cid_t CID_connect_result = CID_pn_void; // presumably referenced by the PN_CLASS expansion -- confirm
+ static const pn_class_t clazz = PN_CLASS(connect_result);
+ connect_result_t *result = (connect_result_t *) pn_class_new(&clazz, sizeof(connect_result_t));
+ if (result) {
+ memset(result, 0, sizeof(connect_result_t));
+ result->base.type = IOCP_CONNECT;
+ result->base.iocpd = iocpd;
+ result->addrinfo = addr;
+ }
+ return result;
+}
+
+// Start an overlapped ConnectEx on sock toward addr.  Returns the socket
+// immediately; success or failure is later reported through completion
+// events.  Only the early bind failure returns INVALID_SOCKET directly.
+pn_socket_t pni_iocp_begin_connect(iocp_t *iocp, pn_socket_t sock, struct addrinfo *addr, pn_error_t *error)
+{
+ // addr lives for the duration of the async connect. Caller has passed ownership here.
+ // See connect_result_finalize().
+ // Use of Windows-specific ConnectEx() requires our socket to be "loosely" pre-bound:
+ sockaddr_storage sa;
+ memset(&sa, 0, sizeof(sa));
+ sa.ss_family = addr->ai_family;
+ if (bind(sock, (SOCKADDR *) &sa, addr->ai_addrlen)) {
+ pni_win32_error(error, "begin async connection", WSAGetLastError());
+ if (iocp->iocp_trace)
+ iocp_log("%s\n", pn_error_text(error));
+ closesocket(sock);
+ freeaddrinfo(addr);
+ return INVALID_SOCKET;
+ }
+
+ // NOTE(review): pni_iocpdesc_create's result is not NULL-checked before
+ // use -- presumably it only fails on OOM; confirm.
+ iocpdesc_t *iocpd = pni_iocpdesc_create(iocp, sock, false);
+ bind_to_completion_port(iocpd);
+ LPFN_CONNECTEX fn_connect_ex = lookup_connect_ex(iocpd->socket);
+ connect_result_t *result = connect_result(iocpd, addr);
+ DWORD unused;
+ bool success = fn_connect_ex(iocpd->socket, result->addrinfo->ai_addr, result->addrinfo->ai_addrlen,
+ NULL, 0, &unused, (LPOVERLAPPED) result);
+ if (!success && WSAGetLastError() != ERROR_IO_PENDING) {
+ // Immediate failure: mark the descriptor dead; the socket is still
+ // returned and the error surfaces through iocpd/error state.
+ pni_win32_error(error, "ConnectEx failure", WSAGetLastError());
+ pn_free(result);
+ iocpd->write_closed = true;
+ iocpd->read_closed = true;
+ if (iocp->iocp_trace)
+ iocp_log("%s\n", pn_error_text(error));
+ } else {
+ iocpd->ops_in_progress++;
+ }
+ return sock;
+}
+
+// Completion-port callback for a finished ConnectEx: on success mark the
+// socket writable and start the zero-byte read loop; on failure record the
+// error and raise POSIX-style selectable events.
+static void complete_connect(connect_result_t *result, HRESULT status)
+{
+ iocpdesc_t *iocpd = result->base.iocpd;
+ if (iocpd->closing) {
+ // Connection abandoned while in flight.
+ pn_free(result);
+ reap_check(iocpd);
+ return;
+ }
+
+ if (status) {
+ iocpdesc_fail(iocpd, status, "Connect failure");
+ // Posix sets selectable events as follows:
+ pni_events_update(iocpd, PN_READABLE | PN_EXPIRED);
+ } else {
+ release_sys_sendbuf(iocpd->socket);
+ if (setsockopt(iocpd->socket, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0)) {
+ iocpdesc_fail(iocpd, WSAGetLastError(), "Internal connect failure (update context)");
+ } else {
+ pni_events_update(iocpd, PN_WRITABLE);
+ start_reading(iocpd);
+ }
+ }
+ pn_free(result);
+ return;
+}
+
+
+// === Async writes
+
+static bool write_in_progress(iocpdesc_t *iocpd)
+{
+ return pni_write_pipeline_size(iocpd->pipeline) != 0;
+}
+
+write_result_t *pni_write_result(iocpdesc_t *iocpd, const char *buf, size_t buflen)
+{
+ write_result_t *result = (write_result_t *) calloc(sizeof(write_result_t), 1);
+ if (result) {
+ result->base.type = IOCP_WRITE;
+ result->base.iocpd = iocpd;
+ result->buffer.start = buf;
+ result->buffer.size = buflen;
+ }
+ return result;
+}
+
+// Post one overlapped WSASend covering [buf, buf+len) for this result.
+// Returns the WSASend return value (0 or SOCKET_ERROR).
+static int submit_write(write_result_t *result, const void *buf, size_t len)
+{
+ WSABUF wsabuf;
+ wsabuf.buf = (char *) buf;
+ wsabuf.len = len;
+ memset(&result->base.overlapped, 0, sizeof (OVERLAPPED));
+ return WSASend(result->base.iocpd->socket, &wsabuf, 1, NULL, 0,
+ (LPOVERLAPPED) result, 0);
+}
+
+ssize_t pni_iocp_begin_write(iocpdesc_t *iocpd, const void *buf, size_t len, bool *would_block, pn_error_t *error)
+{
+ if (len == 0) return 0;
+ *would_block = false;
+ if (is_listener(iocpd)) {
+ set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP);
+ return INVALID_SOCKET;
+ }
+ if (iocpd->closing) {
+ set_iocp_error_status(error, PN_ERR, WSAESHUTDOWN);
+ return SOCKET_ERROR;
+ }
+ if (iocpd->write_closed) {
+ assert(pn_error_code(iocpd->error));
+ pn_error_copy(error, iocpd->error);
+ if (iocpd->iocp->iocp_trace)
+ iocp_log("write error: %s\n", pn_error_text(error));
+ return SOCKET_ERROR;
+ }
+ if (len == 0) return 0;
+ if (!(iocpd->events & PN_WRITABLE)) {
+ *would_block = true;
+ return SOCKET_ERROR;
+ }
+
+ size_t written = 0;
+ size_t requested = len;
+ const char *outgoing = (const char *) buf;
+ size_t available = pni_write_pipeline_reserve(iocpd->pipeline, len);
+ if (!available) {
+ *would_block = true;
+ return SOCKET_ERROR;
+ }
+
+ for (size_t wr_count = 0; wr_count < available; wr_count++) {
+ write_result_t *result = pni_write_pipeline_next(iocpd->pipeline);
+ assert(result);
+ result->base.iocpd = iocpd;
+ ssize_t actual_len = pn_min(len, result->buffer.size);
+ result->requested = actual_len;
+ memmove((void *)result->buffer.start, outgoing, actual_len);
+ outgoing += actual_len;
+ written += actual_len;
+ len -= actual_len;
+
+ int werror = submit_write(result, result->buffer.start, actual_len);
+ if (werror && WSAGetLastError() != ERROR_IO_PENDING) {
+ pni_write_pipeline_return(iocpd->pipeline, result);
+ iocpdesc_fail(iocpd, WSAGetLastError(), "overlapped send");
+ return SOCKET_ERROR;
+ }
+ iocpd->ops_in_progress++;
+ }
+
+ if (!pni_write_pipeline_writable(iocpd->pipeline))
+ pni_events_update(iocpd, iocpd->events & ~PN_WRITABLE);
+ return written;
+}
+
+/*
+ * Note: iocp write completion is not "bytes on the wire", it is "peer
+ * acked the sent bytes". Completion can be seconds on a slow
+ * consuming peer.
+ */
+/*
+ * Note: iocp write completion is not "bytes on the wire", it is "peer
+ * acked the sent bytes". Completion can be seconds on a slow
+ * consuming peer.
+ */
+// Completion-port callback for a finished WSASend: return the pipeline
+// buffer and either restore writability, finish a pending shutdown, or
+// record the failure on the descriptor.
+static void complete_write(write_result_t *result, DWORD xfer_count, HRESULT status)
+{
+ iocpdesc_t *iocpd = result->base.iocpd;
+ if (iocpd->closing) {
+ pni_write_pipeline_return(iocpd->pipeline, result);
+ // Last outstanding write during close: now shut down the send side.
+ if (!iocpd->write_closed && !write_in_progress(iocpd))
+ iocp_shutdown(iocpd);
+ reap_check(iocpd);
+ return;
+ }
+ if (status == 0 && xfer_count > 0) {
+ if (xfer_count != result->requested) {
+ // Is this recoverable? How to preserve order if multiple overlapped writes?
+ pni_write_pipeline_return(iocpd->pipeline, result);
+ iocpdesc_fail(iocpd, WSA_OPERATION_ABORTED, "Partial overlapped write on socket");
+ return;
+ } else {
+ // Success.
+ pni_write_pipeline_return(iocpd->pipeline, result);
+ if (pni_write_pipeline_writable(iocpd->pipeline))
+ pni_events_update(iocpd, iocpd->events | PN_WRITABLE);
+ return;
+ }
+ }
+ // Other error
+ pni_write_pipeline_return(iocpd->pipeline, result);
+ if (status == WSAECONNABORTED || status == WSAECONNRESET || status == WSAENOTCONN
+ || status == ERROR_NETNAME_DELETED) {
+ // Peer-initiated close/reset: only the write side is marked closed here.
+ iocpd->write_closed = true;
+ iocpd->poll_error = true;
+ pni_events_update(iocpd, iocpd->events & ~PN_WRITABLE);
+ pni_win32_error(iocpd->error, "Remote close or timeout", status);
+ } else {
+ iocpdesc_fail(iocpd, status, "IOCP async write error");
+ }
+}
+
+
+// === Async reads
+
+// Completion record for the single outstanding zero-byte read per socket.
+struct read_result_t {
+ iocp_result_t base; // common header: OVERLAPPED + result type + owning iocpd
+ size_t drain_count; // bytes discarded so far by drain_until_closed
+ char unused_buf[1]; // target for the zero-length WSABUF
+};
+
+// Allocate the zero-initialized read-result record for a socket.
+// Returns NULL on allocation failure (caller tolerates a NULL read_result).
+static read_result_t *read_result(iocpdesc_t *iocpd)
+{
+ // calloc(nmemb, size): conventional argument order is (1, sizeof ...)
+ read_result_t *result = (read_result_t *) calloc(1, sizeof(read_result_t));
+ if (result) {
+ result->base.type = IOCP_READ;
+ result->base.iocpd = iocpd;
+ }
+ return result;
+}
+
+// Post a zero-byte overlapped WSARecv.  Completion signals "data (or EOF)
+// available" without consuming it, letting recv() stay non-blocking.
+static void begin_zero_byte_read(iocpdesc_t *iocpd)
+{
+ if (iocpd->read_in_progress) return;
+ if (iocpd->read_closed) {
+ // Already at EOF: report readable so the app can observe the close.
+ pni_events_update(iocpd, iocpd->events | PN_READABLE);
+ return;
+ }
+
+ read_result_t *result = iocpd->read_result;
+ memset(&result->base.overlapped, 0, sizeof (OVERLAPPED));
+ DWORD flags = 0;
+ WSABUF wsabuf;
+ wsabuf.buf = result->unused_buf;
+ wsabuf.len = 0;
+ int rc = WSARecv(iocpd->socket, &wsabuf, 1, NULL, &flags,
+ &result->base.overlapped, 0);
+ if (rc && WSAGetLastError() != ERROR_IO_PENDING) {
+ iocpdesc_fail(iocpd, WSAGetLastError(), "IOCP read error");
+ return;
+ }
+ // Even on immediate success a completion packet is still queued, so the
+ // op counts as in progress either way.
+ iocpd->ops_in_progress++;
+ iocpd->read_in_progress = true;
+}
+
+// After pn_close(), consume and discard inbound data looking for the peer's
+// FIN (recv == 0).  Gives up after 16K discarded bytes or a hard error.
+static void drain_until_closed(iocpdesc_t *iocpd) {
+ size_t max_drain = 16 * 1024;
+ char buf[512];
+ read_result_t *result = iocpd->read_result;
+ while (result->drain_count < max_drain) {
+ int rv = recv(iocpd->socket, buf, 512, 0);
+ if (rv > 0)
+ result->drain_count += rv;
+ else if (rv == 0) {
+ // Peer sent FIN: graceful close achieved.
+ iocpd->read_closed = true;
+ return;
+ } else if (WSAGetLastError() == WSAEWOULDBLOCK) {
+ // wait a little longer: re-arm the zero-byte read and return
+ start_reading(iocpd);
+ return;
+ }
+ else
+ break;
+ }
+ // Graceful close indication unlikely, force the issue
+ // (the else below binds to the inner if: one of two trace messages)
+ if (iocpd->iocp->iocp_trace)
+ if (result->drain_count >= max_drain)
+ iocp_log("graceful close on reader abandoned (too many chars)\n");
+ else
+ iocp_log("graceful close on reader abandoned: %d\n", WSAGetLastError());
+ iocpd->read_closed = true;
+}
+
+
+// IOCP completion callback for the zero-byte read posted by
+// begin_zero_byte_read().  A clean (0, 0) completion means data or EOF is
+// now available to a non-blocking recv().
+static void complete_read(read_result_t *result, DWORD xfer_count, HRESULT status)
+{
+ iocpdesc_t *iocpd = result->base.iocpd;
+ iocpd->read_in_progress = false;
+
+ if (iocpd->closing) {
+ // Application no longer reading, but we are looking for a zero length read
+ if (!iocpd->read_closed)
+ drain_until_closed(iocpd);
+ reap_check(iocpd);
+ return;
+ }
+
+ if (status == 0 && xfer_count == 0) {
+ // Success.
+ pni_events_update(iocpd, iocpd->events | PN_READABLE);
+ } else {
+ iocpdesc_fail(iocpd, status, "IOCP read complete error");
+ }
+}
+
+// Non-blocking read for the application.  Returns bytes read, 0 on orderly
+// peer close, or SOCKET_ERROR with *would_block / *error set.  On success a
+// new zero-byte read is armed so future readability is detected.
+ssize_t pni_iocp_recv(iocpdesc_t *iocpd, void *buf, size_t size, bool *would_block, pn_error_t *error)
+{
+ if (size == 0) return 0;
+ *would_block = false;
+ if (is_listener(iocpd)) {
+ set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP);
+ return SOCKET_ERROR;
+ }
+ if (iocpd->closing) {
+ // Previous call to pn_close()
+ set_iocp_error_status(error, PN_ERR, WSAESHUTDOWN);
+ return SOCKET_ERROR;
+ }
+ if (iocpd->read_closed) {
+ // Prefer the error recorded at close time over a generic one.
+ if (pn_error_code(iocpd->error))
+ pn_error_copy(error, iocpd->error);
+ else
+ set_iocp_error_status(error, PN_ERR, WSAENOTCONN);
+ return SOCKET_ERROR;
+ }
+
+ int count = recv(iocpd->socket, (char *) buf, size, 0);
+ if (count > 0) {
+ // Consume the readable event and re-arm detection.
+ pni_events_update(iocpd, iocpd->events & ~PN_READABLE);
+ begin_zero_byte_read(iocpd);
+ return (ssize_t) count;
+ } else if (count == 0) {
+ iocpd->read_closed = true;
+ return 0;
+ }
+ if (WSAGetLastError() == WSAEWOULDBLOCK)
+ *would_block = true;
+ else {
+ set_iocp_error_status(error, PN_ERR, WSAGetLastError());
+ iocpd->read_closed = true;
+ }
+ return SOCKET_ERROR;
+}
+
+// Arm (or re-arm) readability detection on the socket.
+static void start_reading(iocpdesc_t *iocpd)
+{
+ begin_zero_byte_read(iocpd);
+}
+
+
+// === The iocp descriptor
+
+// pn_class initializer: zero the descriptor and mark the socket invalid.
+static void pni_iocpdesc_initialize(void *object)
+{
+ iocpdesc_t *desc = (iocpdesc_t *) object;
+ memset(desc, 0, sizeof(*desc));
+ desc->socket = INVALID_SOCKET;
+}
+
+// pn_class finalizer.  Note both else clauses bind to the inner ifs: a
+// pipeline or read_result with IO still in flight is deliberately leaked
+// (logged) rather than freed under a pending overlapped operation.
+static void pni_iocpdesc_finalize(void *object)
+{
+ iocpdesc_t *iocpd = (iocpdesc_t *) object;
+ pn_free(iocpd->acceptor);
+ pn_error_free(iocpd->error);
+ if (iocpd->pipeline)
+ if (write_in_progress(iocpd))
+ iocp_log("iocp descriptor write leak\n");
+ else
+ pn_free(iocpd->pipeline);
+ if (iocpd->read_in_progress)
+ iocp_log("iocp descriptor read leak\n");
+ else
+ free(iocpd->read_result);
+}
+
+// Hash a descriptor by its socket handle (the iocpdesc_map key).
+static uintptr_t pni_iocpdesc_hashcode(void *object)
+{
+ return ((iocpdesc_t *) object)->socket;
+}
+
+#define pni_iocpdesc_compare NULL
+#define pni_iocpdesc_inspect NULL
+
+// Reference counted in the iocpdesc map, zombie_list, selector.
+static iocpdesc_t *pni_iocpdesc(pn_socket_t s)
+{
+ static const pn_cid_t CID_pni_iocpdesc = CID_pn_void;
+ static pn_class_t clazz = PN_CLASS(pni_iocpdesc);
+ iocpdesc_t *iocpd = (iocpdesc_t *) pn_class_new(&clazz, sizeof(iocpdesc_t));
+ assert(iocpd);
+ iocpd->socket = s;
+ return iocpd;
+}
+
+// True if the socket is already in the listening state (SO_ACCEPTCONN).
+static bool is_listener_socket(pn_socket_t s)
+{
+ BOOL listening = false;
+ int optlen = sizeof(listening);
+ if (getsockopt(s, SOL_SOCKET, SO_ACCEPTCONN, (char *) &listening, &optlen) != 0)
+ return false;
+ return listening != 0;
+}
+
+// Create and register an iocpdesc_t for socket s.  "external" marks sockets
+// created outside Proton.  Returns NULL only if allocation failed.
+iocpdesc_t *pni_iocpdesc_create(iocp_t *iocp, pn_socket_t s, bool external) {
+ assert (s != INVALID_SOCKET);
+ assert(!pni_iocpdesc_map_get(iocp, s));
+ bool listening = is_listener_socket(s);
+ iocpdesc_t *iocpd = pni_iocpdesc(s);
+ // Do not touch iocpd before the NULL check: the assert inside
+ // pni_iocpdesc() is compiled out in NDEBUG builds.
+ if (iocpd) {
+ iocpd->iocp = iocp;
+ iocpd->external = external;
+ iocpd->error = pn_error();
+ if (listening) {
+ iocpd->acceptor = pni_acceptor(iocpd);
+ } else {
+ iocpd->pipeline = pni_write_pipeline(iocpd);
+ iocpd->read_result = read_result(iocpd);
+ }
+ pni_iocpdesc_map_push(iocpd);
+ }
+ return iocpd;
+}
+
+iocpdesc_t *pni_deadline_desc(iocp_t *iocp) {
+ // Non IO descriptor for selector deadlines. Do not add to iocpdesc map or
+ // zombie list. Selector responsible to free/decref object.
+ iocpdesc_t *iocpd = pni_iocpdesc(PN_INVALID_SOCKET);
+ iocpd->iocp = iocp;
+ iocpd->deadline_desc = true;
+ return iocpd;
+}
+
+// === Fast lookup of a socket's iocpdesc_t
+
+// Look up the descriptor registered for socket s, or NULL.
+iocpdesc_t *pni_iocpdesc_map_get(iocp_t *iocp, pn_socket_t s) {
+ return (iocpdesc_t *) pn_hash_get(iocp->iocpdesc_map, s);
+}
+
+// Hand ownership of a freshly created descriptor to the map: the hash takes
+// a reference, then we drop the creation reference, leaving exactly one.
+void pni_iocpdesc_map_push(iocpdesc_t *iocpd) {
+ pn_hash_put(iocpd->iocp->iocpdesc_map, iocpd->socket, iocpd);
+ pn_decref(iocpd);
+ assert(pn_refcount(iocpd) == 1);
+}
+
+// Drop the map's reference for socket s (may free the descriptor).
+void pni_iocpdesc_map_del(iocp_t *iocp, pn_socket_t s) {
+ pn_hash_del(iocp->iocpdesc_map, (uintptr_t) s);
+}
+
+// Associate the socket with the shared IO completion port (idempotent).
+static void bind_to_completion_port(iocpdesc_t *iocpd)
+{
+ if (iocpd->bound) return;
+ if (!iocpd->iocp->completion_port) {
+ iocpdesc_fail(iocpd, WSAEINVAL, "Incomplete setup, no completion port.");
+ return;
+ }
+
+ if (CreateIoCompletionPort ((HANDLE) iocpd->socket, iocpd->iocp->completion_port, 0, 0))
+ iocpd->bound = true;
+ else {
+ iocpdesc_fail(iocpd, GetLastError(), "IOCP socket setup.");
+ }
+}
+
+// Set the socket's send buffer size to zero so overlapped sends go straight
+// from the caller's pinned buffers (avoids a kernel copy).
+static void release_sys_sendbuf(SOCKET s)
+{
+ int sz = 0;
+ int status = setsockopt(s, SOL_SOCKET, SO_SNDBUF, (const char *)&sz, sizeof(int));
+ assert(status == 0);
+ (void) status; // avoid unused-variable warning when NDEBUG removes the assert
+}
+
+// Bring a descriptor online: bind to the completion port, then either start
+// accepting (listener) or arm write/read readiness (connected socket).
+void pni_iocpdesc_start(iocpdesc_t *iocpd)
+{
+ if (iocpd->bound) return;
+ bind_to_completion_port(iocpd);
+ if (is_listener(iocpd)) {
+ begin_accept(iocpd->acceptor, NULL);
+ }
+ else {
+ release_sys_sendbuf(iocpd->socket);
+ pni_events_update(iocpd, PN_WRITABLE);
+ start_reading(iocpd);
+ }
+}
+
+// Dispatch one dequeued completion packet to its type-specific handler.
+// Decrements ops_in_progress first; the handlers may reap the descriptor.
+static void complete(iocp_result_t *result, bool success, DWORD num_transferred) {
+ result->iocpd->ops_in_progress--;
+ DWORD status = success ? 0 : GetLastError();
+
+ switch (result->type) {
+ case IOCP_ACCEPT:
+ complete_accept((accept_result_t *) result, status);
+ break;
+ case IOCP_CONNECT:
+ complete_connect((connect_result_t *) result, status);
+ break;
+ case IOCP_WRITE:
+ complete_write((write_result_t *) result, num_transferred, status);
+ break;
+ case IOCP_READ:
+ complete_read((read_result_t *) result, num_transferred, status);
+ break;
+ default:
+ assert(false);
+ }
+}
+
+// Process every completion packet already queued, without blocking
+// (zero-millisecond polls until the port is empty).
+void pni_iocp_drain_completions(iocp_t *iocp)
+{
+ while (true) {
+ DWORD timeout_ms = 0;
+ DWORD num_transferred = 0;
+ ULONG_PTR completion_key = 0;
+ OVERLAPPED *overlapped = 0;
+
+ bool good_op = GetQueuedCompletionStatus (iocp->completion_port, &num_transferred,
+ &completion_key, &overlapped, timeout_ms);
+ if (!overlapped)
+ return; // timed out
+ iocp_result_t *result = (iocp_result_t *) overlapped;
+ complete(result, good_op, num_transferred);
+ }
+}
+
+// returns: -1 on error, 0 on timeout, 1 successful completion
+int pni_iocp_wait_one(iocp_t *iocp, int timeout, pn_error_t *error) {
+ DWORD win_timeout = (timeout < 0) ? INFINITE : (DWORD) timeout;
+ DWORD num_transferred = 0;
+ ULONG_PTR completion_key = 0;
+ OVERLAPPED *overlapped = 0;
+
+ bool good_op = GetQueuedCompletionStatus (iocp->completion_port, &num_transferred,
+ &completion_key, &overlapped, win_timeout);
+ if (!overlapped)
+ if (GetLastError() == WAIT_TIMEOUT)
+ return 0;
+ else {
+ if (error)
+ pni_win32_error(error, "GetQueuedCompletionStatus", GetLastError());
+ return -1;
+ }
+
+ iocp_result_t *result = (iocp_result_t *) overlapped;
+ complete(result, good_op, num_transferred);
+ return 1;
+}
+
+// === Close (graceful and otherwise)
+
+// zombie_list is for sockets transitioning out of iocp on their way to zero ops_in_progress
+// and fully closed.
+
+// A closing descriptor with pending async ops becomes a "zombie" until they
+// drain; one with none is closed immediately.
+static void zombie_list_add(iocpdesc_t *iocpd)
+{
+ assert(iocpd->closing);
+ if (!iocpd->ops_in_progress) {
+ // No need to make a zombie.
+ if (iocpd->socket != INVALID_SOCKET) {
+ closesocket(iocpd->socket);
+ iocpd->socket = INVALID_SOCKET;
+ iocpd->read_closed = true;
+ }
+ return;
+ }
+ // Allow 2 seconds for graceful shutdown before releasing socket resource.
+ iocpd->reap_time = pn_i_now() + 2000;
+ pn_list_add(iocpd->iocp->zombie_list, iocpd);
+}
+
+// Free a closing descriptor once its last async op has completed.
+static void reap_check(iocpdesc_t *iocpd)
+{
+ if (iocpd->closing && !iocpd->ops_in_progress) {
+ if (iocpd->socket != INVALID_SOCKET) {
+ closesocket(iocpd->socket);
+ iocpd->socket = INVALID_SOCKET;
+ }
+ pn_list_remove(iocpd->iocp->zombie_list, iocpd);
+ // iocpd is decref'ed and possibly released
+ }
+}
+
+// Earliest zombie reap time, or 0 when no zombies exist.  The list is in
+// insertion order, so the head has the earliest deadline.
+pn_timestamp_t pni_zombie_deadline(iocp_t *iocp)
+{
+ if (pn_list_size(iocp->zombie_list) == 0)
+ return 0;
+ iocpdesc_t *oldest = (iocpdesc_t *) pn_list_get(iocp->zombie_list, 0);
+ return oldest->reap_time;
+}
+
+// Hard-close any zombie whose graceful-close grace period has expired.
+void pni_zombie_check(iocp_t *iocp, pn_timestamp_t now)
+{
+ pn_list_t *zl = iocp->zombie_list;
+ // Look for stale zombies that should have been reaped by "now"
+ for (size_t idx = 0; idx < pn_list_size(zl); idx++) {
+ iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(zl, idx);
+ if (iocpd->reap_time > now)
+ return; // list is ordered by reap_time: nothing later is due either
+ if (iocpd->socket == INVALID_SOCKET)
+ continue;
+ assert(iocpd->ops_in_progress > 0);
+ if (iocp->iocp_trace)
+ iocp_log("async close: graceful close timeout exceeded\n");
+ closesocket(iocpd->socket);
+ iocpd->socket = INVALID_SOCKET;
+ iocpd->read_closed = true;
+ // outstanding ops should complete immediately now
+ }
+}
+
+// Final-shutdown helper: give zombies (sockets closed by the app but with
+// async ops still pending) a bounded grace period to finish.  The grace can
+// be tuned via the PN_SHUTDOWN_GRACE env var (ms, 1..59999).
+static void drain_zombie_completions(iocp_t *iocp)
+{
+ // No more pn_selector_select() from App, but zombies still need care and feeding
+ // until their outstanding async actions complete.
+ pni_iocp_drain_completions(iocp);
+
+ // Discard any that have no pending async IO
+ size_t sz = pn_list_size(iocp->zombie_list);
+ for (size_t idx = 0; idx < sz;) {
+ iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, idx);
+ if (!iocpd->ops_in_progress) {
+ pn_list_del(iocp->zombie_list, idx, 1);
+ sz--;
+ } else {
+ idx++;
+ }
+ }
+
+ unsigned shutdown_grace = 2000;
+ char *override = getenv("PN_SHUTDOWN_GRACE");
+ if (override) {
+ int grace = atoi(override);
+ if (grace > 0 && grace < 60000)
+ shutdown_grace = (unsigned) grace;
+ }
+ pn_timestamp_t now = pn_i_now();
+ pn_timestamp_t deadline = now + shutdown_grace;
+
+ while (pn_list_size(iocp->zombie_list)) {
+ if (now >= deadline)
+ break;
+ int rv = pni_iocp_wait_one(iocp, deadline - now, NULL);
+ if (rv < 0) {
+ // GetLastError() is a DWORD (unsigned long): use %lu, not %d
+ iocp_log("unexpected IOCP failure on Proton IO shutdown %lu\n", (unsigned long) GetLastError());
+ break;
+ }
+ now = pn_i_now();
+ }
+ if (now >= deadline && pn_list_size(iocp->zombie_list) && iocp->iocp_trace)
+ // Should only happen if really slow TCP handshakes, i.e. total network failure
+ iocp_log("network failure on Proton shutdown\n");
+}
+
+// Final-shutdown helper: zombify every proton-owned descriptor still in the
+// map and collect externally-owned ones (which we must not close) into the
+// returned list so their pending iocp_result_t structs stay alive.
+// Listeners are handled first so pending accepts complete before their
+// connected sockets are processed.
+static pn_list_t *iocp_map_close_all(iocp_t *iocp)
+{
+ // Zombify stragglers, i.e. no pn_close() from the application.
+ pn_list_t *externals = pn_list(PN_OBJECT, 0);
+ for (pn_handle_t entry = pn_hash_head(iocp->iocpdesc_map); entry;
+ entry = pn_hash_next(iocp->iocpdesc_map, entry)) {
+ iocpdesc_t *iocpd = (iocpdesc_t *) pn_hash_value(iocp->iocpdesc_map, entry);
+ // Just listeners first.
+ if (is_listener(iocpd)) {
+ if (iocpd->external) {
+ // Owned by application, just keep a temporary reference to it.
+ // iocp_result_t structs must not be free'd until completed or
+ // the completion port is closed.
+ if (iocpd->ops_in_progress)
+ pn_list_add(externals, iocpd);
+ pni_iocpdesc_map_del(iocp, iocpd->socket);
+ } else {
+ // Make it a zombie.
+ pni_iocp_begin_close(iocpd);
+ }
+ }
+ }
+ pni_iocp_drain_completions(iocp);
+
+ for (pn_handle_t entry = pn_hash_head(iocp->iocpdesc_map); entry;
+ entry = pn_hash_next(iocp->iocpdesc_map, entry)) {
+ iocpdesc_t *iocpd = (iocpdesc_t *) pn_hash_value(iocp->iocpdesc_map, entry);
+ if (iocpd->external) {
+ iocpd->read_closed = true; // Do not consume from read side
+ iocpd->write_closed = true; // Do not shutdown write side
+ if (iocpd->ops_in_progress)
+ pn_list_add(externals, iocpd);
+ pni_iocpdesc_map_del(iocp, iocpd->socket);
+ } else {
+ // Make it a zombie.
+ pni_iocp_begin_close(iocpd);
+ }
+ }
+ return externals;
+}
+
+// Final-shutdown helper: forcibly close every remaining zombie socket, drain
+// the resulting completions, then log anything that still refuses to die.
+static void zombie_list_hard_close_all(iocp_t *iocp)
+{
+ pni_iocp_drain_completions(iocp);
+ size_t zs = pn_list_size(iocp->zombie_list);
+ for (size_t i = 0; i < zs; i++) {
+ iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, i);
+ if (iocpd->socket != INVALID_SOCKET) {
+ closesocket(iocpd->socket);
+ iocpd->socket = INVALID_SOCKET;
+ iocpd->read_closed = true;
+ iocpd->write_closed = true;
+ }
+ }
+ pni_iocp_drain_completions(iocp);
+
+ // Zombies should be all gone. Do a sanity check.
+ zs = pn_list_size(iocp->zombie_list);
+ int remaining = 0;
+ int ops = 0;
+ for (size_t i = 0; i < zs; i++) {
+ iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, i);
+ remaining++;
+ ops += iocpd->ops_in_progress;
+ }
+ if (remaining)
+ iocp_log("Proton: %d unfinished close operations (ops count = %d)\n", remaining, ops);
+}
+
+// Send the TCP FIN (shutdown write side) as part of graceful close.
+// NOTE(review): this checks PN_INVALID_SOCKET where sibling code checks
+// INVALID_SOCKET — presumably the same value on Windows; confirm.
+static void iocp_shutdown(iocpdesc_t *iocpd)
+{
+ if (iocpd->socket == PN_INVALID_SOCKET)
+ return; // Hard close in progress
+ if (shutdown(iocpd->socket, SD_SEND)) {
+ int err = WSAGetLastError();
+ if (err != WSAECONNABORTED && err != WSAECONNRESET && err != WSAENOTCONN)
+ if (iocpd->iocp->iocp_trace)
+ iocp_log("socket shutdown failed %d\n", err);
+ }
+ iocpd->write_closed = true;
+}
+
+// Start closing a descriptor (pn_close path).  Listeners are closed hard;
+// connected sockets attempt a graceful shutdown via the zombie list.
+void pni_iocp_begin_close(iocpdesc_t *iocpd)
+{
+ assert (!iocpd->closing);
+ if (is_listener(iocpd)) {
+ // Listening socket is easy. Close the socket which will cancel async ops.
+ pn_socket_t old_sock = iocpd->socket;
+ iocpd->socket = INVALID_SOCKET;
+ iocpd->closing = true;
+ iocpd->read_closed = true;
+ iocpd->write_closed = true;
+ closesocket(old_sock);
+ // Pending accepts will now complete. Zombie can die when all consumed.
+ zombie_list_add(iocpd);
+ pni_iocpdesc_map_del(iocpd->iocp, old_sock); // may pn_free *iocpd
+ } else {
+ // Continue async operation looking for graceful close confirmation or timeout.
+ pn_socket_t old_sock = iocpd->socket;
+ iocpd->closing = true;
+ if (!iocpd->write_closed && !write_in_progress(iocpd))
+ iocp_shutdown(iocpd);
+ zombie_list_add(iocpd);
+ pni_iocpdesc_map_del(iocpd->iocp, old_sock); // may pn_free *iocpd
+ }
+}
+
+
+// === iocp_t
+
+#define pni_iocp_hashcode NULL
+#define pni_iocp_compare NULL
+#define pni_iocp_inspect NULL
+
+// pn_class initializer for the per-pn_io_t IOCP context: create the shared
+// write-buffer pool, the completion port, and the bookkeeping containers.
+void pni_iocp_initialize(void *obj)
+{
+ iocp_t *iocp = (iocp_t *) obj;
+ memset(iocp, 0, sizeof(iocp_t));
+ pni_shared_pool_create(iocp);
+ iocp->completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
+ assert(iocp->completion_port != NULL);
+ iocp->iocpdesc_map = pn_hash(PN_OBJECT, 0, 0.75);
+ iocp->zombie_list = pn_list(PN_OBJECT, 0);
+ iocp->iocp_trace = pn_env_bool("PN_TRACE_DRV");
+ iocp->selector = NULL;
+}
+
+// pn_class finalizer: wind down every socket (graceful where possible),
+// close the completion port, then release all containers.  Ordering matters:
+// nothing touched by a pending overlapped op may be freed before the port
+// is closed.
+void pni_iocp_finalize(void *obj)
+{
+ iocp_t *iocp = (iocp_t *) obj;
+ // Move sockets to closed state, except external sockets.
+ pn_list_t *externals = iocp_map_close_all(iocp);
+ // Now everything with ops_in_progress is in the zombie_list or the externals list.
+ assert(!pn_hash_head(iocp->iocpdesc_map));
+ pn_free(iocp->iocpdesc_map);
+
+ drain_zombie_completions(iocp); // Last chance for graceful close
+ zombie_list_hard_close_all(iocp);
+ CloseHandle(iocp->completion_port); // This cancels all our async ops
+ iocp->completion_port = NULL;
+
+ if (pn_list_size(externals) && iocp->iocp_trace)
+ iocp_log("%d external sockets not closed and removed from Proton IOCP control\n", pn_list_size(externals));
+
+ // Now safe to free everything that might be touched by a former async operation.
+ pn_free(externals);
+ pn_free(iocp->zombie_list);
+ pni_shared_pool_free(iocp);
+}
+
+// Allocate an iocp_t through the pn_class machinery (runs
+// pni_iocp_initialize; pni_iocp_finalize runs at last decref).
+iocp_t *pni_iocp()
+{
+ static const pn_cid_t CID_pni_iocp = CID_pn_void;
+ static const pn_class_t clazz = PN_CLASS(pni_iocp);
+ iocp_t *iocp = (iocp_t *) pn_class_new(&clazz, sizeof(iocp_t));
+ return iocp;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/reactor/io/windows/iocp.h
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/io/windows/iocp.h b/proton-c/src/reactor/io/windows/iocp.h
new file mode 100644
index 0000000..07f47be
--- /dev/null
+++ b/proton-c/src/reactor/io/windows/iocp.h
@@ -0,0 +1,136 @@
+#ifndef PROTON_SRC_IOCP_H
+#define PROTON_SRC_IOCP_H 1
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/import_export.h>
+#include <proton/selectable.h>
+#include <proton/type_compat.h>
+
+typedef struct pni_acceptor_t pni_acceptor_t;
+typedef struct write_result_t write_result_t;
+typedef struct read_result_t read_result_t;
+typedef struct write_pipeline_t write_pipeline_t;
+typedef struct iocpdesc_t iocpdesc_t;
+
+
+// One per pn_io_t.
+
+struct iocp_t {
+ HANDLE completion_port; // shared IO completion port for all sockets
+ pn_hash_t *iocpdesc_map; // socket -> iocpdesc_t
+ pn_list_t *zombie_list; // closing descriptors with pending async ops
+ int shared_pool_size;
+ char *shared_pool_memory; // backing store for the shared write buffers
+ write_result_t **shared_results;
+ write_result_t **available_results;
+ size_t shared_available_count;
+ size_t writer_count;
+ int loopback_bufsize;
+ bool iocp_trace; // PN_TRACE_DRV logging enabled
+ pn_selector_t *selector;
+};
+
+
+// One for each socket.
+// This iocpdesc_t structure is ref counted by the iocpdesc_map, zombie_list,
+// selector->iocp_descriptors list. It should remain ref counted in the
+// zombie_list until ops_in_progress == 0 or the completion port is closed.
+
+struct iocpdesc_t {
+ pn_socket_t socket;
+ iocp_t *iocp; // owning IOCP context
+ pni_acceptor_t *acceptor; // non-NULL only for listeners
+ pn_error_t *error;
+ int ops_in_progress; // outstanding overlapped operations
+ bool read_in_progress; // zero-byte read currently posted
+ write_pipeline_t *pipeline; // non-NULL only for connected sockets
+ read_result_t *read_result;
+ bool external; // true if socket set up outside Proton
+ bool bound; // associated with the completion port
+ bool closing; // pn_close called by application
+ bool read_closed; // EOF or read error
+ bool write_closed; // shutdown sent or write error
+ bool poll_error; // flag posix-like POLLERR/POLLHUP/POLLNVAL
+ bool deadline_desc; // Socket-less deadline descriptor for selectors
+ pn_selector_t *selector;
+ pn_selectable_t *selectable;
+ int events; // currently signaled events
+ int interests; // events the selectable cares about
+ pn_timestamp_t deadline;
+ iocpdesc_t *triggered_list_next; // intrusive list links (selector-owned)
+ iocpdesc_t *triggered_list_prev;
+ iocpdesc_t *deadlines_next;
+ iocpdesc_t *deadlines_prev;
+ pn_timestamp_t reap_time; // was ";;" — stray empty declaration removed
+};
+
+typedef enum { IOCP_ACCEPT, IOCP_CONNECT, IOCP_READ, IOCP_WRITE } iocp_type_t;
+
+// Common header for all overlapped results.  OVERLAPPED must be the first
+// member: completion packets are cast from OVERLAPPED* back to this type.
+typedef struct {
+ OVERLAPPED overlapped;
+ iocp_type_t type;
+ iocpdesc_t *iocpd;
+ HRESULT status;
+} iocp_result_t;
+
+// One overlapped send: base header plus the buffer lent from the pipeline.
+struct write_result_t {
+ iocp_result_t base; // must be first (cast from OVERLAPPED*)
+ size_t requested; // bytes submitted; compared against bytes completed
+ bool in_use;
+ pn_bytes_t buffer;
+};
+
+iocpdesc_t *pni_iocpdesc_create(iocp_t *, pn_socket_t s, bool external);
+iocpdesc_t *pni_iocpdesc_map_get(iocp_t *, pn_socket_t s);
+iocpdesc_t *pni_deadline_desc(iocp_t *);
+void pni_iocpdesc_map_del(iocp_t *, pn_socket_t s);
+void pni_iocpdesc_map_push(iocpdesc_t *iocpd);
+void pni_iocpdesc_start(iocpdesc_t *iocpd);
+void pni_iocp_drain_completions(iocp_t *);
+int pni_iocp_wait_one(iocp_t *, int timeout, pn_error_t *);
+void pni_iocp_start_accepting(iocpdesc_t *iocpd);
+pn_socket_t pni_iocp_end_accept(iocpdesc_t *ld, sockaddr *addr, socklen_t *addrlen, bool *would_block, pn_error_t *error);
+pn_socket_t pni_iocp_begin_connect(iocp_t *, pn_socket_t sock, struct addrinfo *addr, pn_error_t *error);
+ssize_t pni_iocp_begin_write(iocpdesc_t *, const void *, size_t, bool *, pn_error_t *);
+ssize_t pni_iocp_recv(iocpdesc_t *iocpd, void *buf, size_t size, bool *would_block, pn_error_t *error);
+void pni_iocp_begin_close(iocpdesc_t *iocpd);
+iocp_t *pni_iocp();
+
+void pni_events_update(iocpdesc_t *iocpd, int events);
+write_result_t *pni_write_result(iocpdesc_t *iocpd, const char *buf, size_t buflen);
+write_pipeline_t *pni_write_pipeline(iocpdesc_t *iocpd);
+size_t pni_write_pipeline_size(write_pipeline_t *);
+bool pni_write_pipeline_writable(write_pipeline_t *);
+void pni_write_pipeline_return(write_pipeline_t *, write_result_t *);
+size_t pni_write_pipeline_reserve(write_pipeline_t *, size_t);
+write_result_t *pni_write_pipeline_next(write_pipeline_t *);
+void pni_shared_pool_create(iocp_t *);
+void pni_shared_pool_free(iocp_t *);
+void pni_zombie_check(iocp_t *, pn_timestamp_t);
+pn_timestamp_t pni_zombie_deadline(iocp_t *);
+
+pn_selector_t *pni_selector_create(iocp_t *iocp);
+
+int pni_win32_error(pn_error_t *error, const char *msg, HRESULT code);
+
+#endif /* iocp.h */
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/reactor/io/windows/selector.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/io/windows/selector.c b/proton-c/src/reactor/io/windows/selector.c
new file mode 100644
index 0000000..15da73b
--- /dev/null
+++ b/proton-c/src/reactor/io/windows/selector.c
@@ -0,0 +1,384 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
+#if _WIN32_WINNT < 0x0501
+#error "Proton requires Windows API support for XP or later."
+#endif
+#include <winsock2.h>
+#include <Ws2tcpip.h>
+
+#include "reactor/io.h"
+#include "reactor/selectable.h"
+#include "reactor/selector.h"
+
+#include "iocp.h"
+#include "platform/platform.h"
+#include "core/util.h"
+
+#include <proton/object.h>
+#include <proton/error.h>
+#include <assert.h>
+
+static void interests_update(iocpdesc_t *iocpd, int interests);
+static void deadlines_update(iocpdesc_t *iocpd, pn_timestamp_t t);
+
+// Selector state.  selectables and iocp_descriptors are parallel lists
+// indexed by the selectable's index; triggered/deadlines are intrusive
+// doubly-linked lists threaded through the iocpdesc_t nodes.
+struct pn_selector_t {
+ iocp_t *iocp;
+ pn_list_t *selectables;
+ pn_list_t *iocp_descriptors;
+ size_t current;
+ iocpdesc_t *current_triggered; // iteration cursor for pn_selector_next
+ pn_timestamp_t awoken;
+ pn_error_t *error;
+ iocpdesc_t *triggered_list_head;
+ iocpdesc_t *triggered_list_tail;
+ iocpdesc_t *deadlines_head; // kept sorted by ascending deadline
+ iocpdesc_t *deadlines_tail;
+};
+
+// pn_class initializer: empty lists, no iocp attached yet.
+void pn_selector_initialize(void *obj)
+{
+ pn_selector_t *selector = (pn_selector_t *) obj;
+ selector->iocp = NULL;
+ selector->selectables = pn_list(PN_WEAKREF, 0);
+ selector->iocp_descriptors = pn_list(PN_OBJECT, 0);
+ selector->current = 0;
+ selector->current_triggered = NULL;
+ selector->awoken = 0;
+ selector->error = pn_error();
+ selector->triggered_list_head = NULL;
+ selector->triggered_list_tail = NULL;
+ selector->deadlines_head = NULL;
+ selector->deadlines_tail = NULL;
+}
+
+// pn_class finalizer: release lists and detach from the iocp context.
+// NOTE(review): assumes selector->iocp is non-NULL, i.e. the selector was
+// made via pni_selector_create() — confirm no bare pni_selector() escapes.
+void pn_selector_finalize(void *obj)
+{
+ pn_selector_t *selector = (pn_selector_t *) obj;
+ pn_free(selector->selectables);
+ pn_free(selector->iocp_descriptors);
+ pn_error_free(selector->error);
+ selector->iocp->selector = NULL;
+}
+
+#define pn_selector_hashcode NULL
+#define pn_selector_compare NULL
+#define pn_selector_inspect NULL
+
+// Allocate a selector via the pn_class machinery (runs
+// pn_selector_initialize); iocp is attached by pni_selector_create().
+pn_selector_t *pni_selector()
+{
+ static const pn_class_t clazz = PN_CLASS(pn_selector);
+ pn_selector_t *selector = (pn_selector_t *) pn_class_new(&clazz, sizeof(pn_selector_t));
+ return selector;
+}
+
+// Create a selector bound to the given IOCP context.
+pn_selector_t *pni_selector_create(iocp_t *iocp)
+{
+ pn_selector_t *sel = pni_selector();
+ sel->iocp = iocp;
+ return sel;
+}
+
+// Register a selectable with the selector: append to the parallel lists,
+// record its index, then let pn_selector_update attach/refresh the iocpd.
+void pn_selector_add(pn_selector_t *selector, pn_selectable_t *selectable)
+{
+ assert(selector);
+ assert(selectable);
+ assert(pni_selectable_get_index(selectable) < 0);
+
+ // (removed unused locals: sock and iocpd were never read)
+ if (pni_selectable_get_index(selectable) < 0) {
+ pn_list_add(selector->selectables, selectable);
+ pn_list_add(selector->iocp_descriptors, NULL);
+ size_t size = pn_list_size(selector->selectables);
+ pni_selectable_set_index(selectable, size - 1);
+ }
+
+ pn_selector_update(selector, selectable);
+}
+
+// Refresh a registered selectable: attach/detach the backing iocpdesc_t and
+// recompute its interests and deadline.
+void pn_selector_update(pn_selector_t *selector, pn_selectable_t *selectable)
+{
+ // A selectable's fd may switch from PN_INVALID_SOCKET to a working socket between
+ // update calls. If a selectable without a valid socket has a deadline, we need
+ // a dummy iocpdesc_t to participate in the deadlines list.
+ int idx = pni_selectable_get_index(selectable);
+ assert(idx >= 0);
+ pn_timestamp_t deadline = pn_selectable_get_deadline(selectable);
+ pn_socket_t sock = pn_selectable_get_fd(selectable);
+ iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(selector->iocp_descriptors, idx);
+
+ if (!iocpd && deadline && sock == PN_INVALID_SOCKET) {
+ // Deadline-only selectable: use a socket-less placeholder descriptor.
+ iocpd = pni_deadline_desc(selector->iocp);
+ assert(iocpd);
+ pn_list_set(selector->iocp_descriptors, idx, iocpd);
+ pn_decref(iocpd); // life is solely tied to iocp_descriptors list
+ iocpd->selector = selector;
+ iocpd->selectable = selectable;
+ }
+ else if (iocpd && iocpd->deadline_desc && sock != PN_INVALID_SOCKET) {
+ // Switching to a real socket. Stop using a deadline descriptor.
+ deadlines_update(iocpd, 0);
+ // decref descriptor in list and pick up a real iocpd below
+ pn_list_set(selector->iocp_descriptors, idx, NULL);
+ iocpd = NULL;
+ }
+
+ // The selectables socket may be set long after it has been added
+ if (!iocpd && sock != PN_INVALID_SOCKET) {
+ iocpd = pni_iocpdesc_map_get(selector->iocp, sock);
+ if (!iocpd) {
+ // Socket created outside proton. Hook it up to iocp.
+ iocpd = pni_iocpdesc_create(selector->iocp, sock, true);
+ assert(iocpd);
+ if (iocpd)
+ pni_iocpdesc_start(iocpd);
+ }
+ if (iocpd) {
+ pn_list_set(selector->iocp_descriptors, idx, iocpd);
+ iocpd->selector = selector;
+ iocpd->selectable = selectable;
+ }
+ }
+
+ if (iocpd) {
+ assert(sock == iocpd->socket || iocpd->closing);
+ int interests = PN_ERROR; // Always
+ if (pn_selectable_is_reading(selectable)) {
+ interests |= PN_READABLE;
+ }
+ if (pn_selectable_is_writing(selectable)) {
+ interests |= PN_WRITABLE;
+ }
+ if (deadline) {
+ interests |= PN_EXPIRED;
+ }
+ interests_update(iocpd, interests);
+ deadlines_update(iocpd, deadline);
+ }
+}
+
+// Deregister a selectable: detach its iocpdesc_t from the triggered and
+// deadline lists, drop both parallel list slots, and re-index the rest.
+void pn_selector_remove(pn_selector_t *selector, pn_selectable_t *selectable)
+{
+ assert(selector);
+ assert(selectable);
+
+ int idx = pni_selectable_get_index(selectable);
+ assert(idx >= 0);
+ iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(selector->iocp_descriptors, idx);
+ if (iocpd) {
+ if (selector->current_triggered == iocpd)
+ selector->current_triggered = iocpd->triggered_list_next;
+ interests_update(iocpd, 0);
+ deadlines_update(iocpd, 0);
+ assert(selector->triggered_list_head != iocpd && !iocpd->triggered_list_prev);
+ assert(selector->deadlines_head != iocpd && !iocpd->deadlines_prev);
+ iocpd->selector = NULL;
+ iocpd->selectable = NULL;
+ }
+ pn_list_del(selector->selectables, idx, 1);
+ pn_list_del(selector->iocp_descriptors, idx, 1);
+ size_t size = pn_list_size(selector->selectables);
+ for (size_t i = idx; i < size; i++) {
+ pn_selectable_t *sel = (pn_selectable_t *) pn_list_get(selector->selectables, i);
+ pni_selectable_set_index(sel, i);
+ }
+
+ pni_selectable_set_index(selectable, -1);
+
+ // Guard against size_t wraparound: current == idx == 0 would otherwise
+ // decrement 0 to SIZE_MAX.
+ if (selector->current > 0 && selector->current >= (size_t) idx) {
+ selector->current--;
+ }
+}
+
+// Number of selectables currently registered.
+size_t pn_selector_size(pn_selector_t *selector) {
+ assert(selector);
+ return pn_list_size(selector->selectables);
+}
+
+// Wait (up to timeout ms, -1 = forever) until at least one selectable has a
+// matched event or deadline, pumping IOCP completions and reaping zombies
+// while waiting.  Results are then iterated with pn_selector_next().
+int pn_selector_select(pn_selector_t *selector, int timeout)
+{
+ assert(selector);
+ pn_error_clear(selector->error);
+ pn_timestamp_t deadline = 0;
+ pn_timestamp_t now = pn_i_now();
+
+ if (timeout) {
+ // Earliest selectable deadline (deadlines list is sorted ascending).
+ if (selector->deadlines_head)
+ deadline = selector->deadlines_head->deadline;
+ }
+ if (deadline) {
+ // Clamp the caller's timeout to the earliest deadline.
+ int64_t delta = deadline - now;
+ if (delta < 0) {
+ delta = 0;
+ }
+ if (timeout < 0)
+ timeout = delta;
+ else if (timeout > delta)
+ timeout = delta;
+ }
+ deadline = (timeout >= 0) ? now + timeout : 0;
+
+ // Process all currently available completions, even if matched events available
+ pni_iocp_drain_completions(selector->iocp);
+ pni_zombie_check(selector->iocp, now);
+ // Loop until an interested event is matched, or until deadline
+ while (true) {
+ if (selector->triggered_list_head)
+ break;
+ if (deadline && deadline <= now)
+ break;
+ // Wake up no later than the next zombie reap time as well.
+ pn_timestamp_t completion_deadline = deadline;
+ pn_timestamp_t zd = pni_zombie_deadline(selector->iocp);
+ if (zd)
+ completion_deadline = completion_deadline ? pn_min(zd, completion_deadline) : zd;
+
+ int completion_timeout = (!completion_deadline) ? -1 : completion_deadline - now;
+ int rv = pni_iocp_wait_one(selector->iocp, completion_timeout, selector->error);
+ if (rv < 0)
+ return pn_error_code(selector->error);
+
+ now = pn_i_now();
+ if (zd && zd <= now) {
+ pni_zombie_check(selector->iocp, now);
+ }
+ }
+
+ selector->current = 0;
+ selector->awoken = now;
+ // Mark every expired deadline (list sorted: stop at the first future one).
+ for (iocpdesc_t *iocpd = selector->deadlines_head; iocpd; iocpd = iocpd->deadlines_next) {
+ if (iocpd->deadline <= now)
+ pni_events_update(iocpd, iocpd->events | PN_EXPIRED);
+ else
+ break;
+ }
+ selector->current_triggered = selector->triggered_list_head;
+ return pn_error_code(selector->error);
+}
+
+// Return the next triggered selectable (events masked by its interests),
+// or NULL when the triggered list is exhausted.
+pn_selectable_t *pn_selector_next(pn_selector_t *selector, int *events)
+{
+ iocpdesc_t *iocpd = selector->current_triggered;
+ if (!iocpd)
+ return NULL;
+ *events = iocpd->interests & iocpd->events;
+ selector->current_triggered = iocpd->triggered_list_next;
+ return iocpd->selectable;
+}
+
+// Release the selector (runs pn_selector_finalize at last decref).
+void pn_selector_free(pn_selector_t *selector)
+{
+ assert(selector);
+ pn_free(selector);
+}
+
+
+// Append iocpd to the selector's triggered list if not already linked.
+static void triggered_list_add(pn_selector_t *selector, iocpdesc_t *iocpd)
+{
+ if (iocpd->triggered_list_prev || selector->triggered_list_head == iocpd)
+ return; // already in list
+ LL_ADD(selector, triggered_list, iocpd);
+}
+
+// Unlink iocpd from the triggered list (no-op if not linked).
+static void triggered_list_remove(pn_selector_t *selector, iocpdesc_t *iocpd)
+{
+ if (!iocpd->triggered_list_prev && selector->triggered_list_head != iocpd)
+ return; // not in list
+ LL_REMOVE(selector, triggered_list, iocpd);
+ iocpd->triggered_list_prev = NULL;
+ iocpd->triggered_list_next = NULL;
+}
+
+
+// Record the descriptor's current event set and (de)queue it on the
+// selector's triggered list according to whether any interest matches.
+void pni_events_update(iocpdesc_t *iocpd, int events)
+{
+ // If set, a poll error is permanent
+ if (iocpd->poll_error)
+ events |= PN_ERROR;
+ if (iocpd->events == events)
+ return;
+ iocpd->events = events;
+ if (iocpd->selector) {
+ if (iocpd->events & iocpd->interests)
+ triggered_list_add(iocpd->selector, iocpd);
+ else
+ triggered_list_remove(iocpd->selector, iocpd);
+ }
+}
+
+// Record the descriptor's interest set; mirror of pni_events_update for the
+// interests side of the (events & interests) trigger condition.
+static void interests_update(iocpdesc_t *iocpd, int interests)
+{
+ int old_interests = iocpd->interests;
+ if (old_interests == interests)
+ return;
+ iocpd->interests = interests;
+ if (iocpd->selector) {
+ if (iocpd->events & iocpd->interests)
+ triggered_list_add(iocpd->selector, iocpd);
+ else
+ triggered_list_remove(iocpd->selector, iocpd);
+ }
+}
+
+// Unlink iocpd from the sorted deadlines list (no-op if not linked).
+static void deadlines_remove(pn_selector_t *selector, iocpdesc_t *iocpd)
+{
+ if (!iocpd->deadlines_prev && selector->deadlines_head != iocpd)
+ return; // not in list
+ LL_REMOVE(selector, deadlines, iocpd);
+ iocpd->deadlines_prev = NULL;
+ iocpd->deadlines_next = NULL;
+}
+
+
+// Set/clear a descriptor's deadline, keeping the selector's deadlines list
+// sorted by ascending deadline (insert before the first later entry).
+static void deadlines_update(iocpdesc_t *iocpd, pn_timestamp_t deadline)
+{
+ if (deadline == iocpd->deadline)
+ return;
+
+ iocpd->deadline = deadline;
+ pn_selector_t *selector = iocpd->selector;
+ if (!deadline) {
+ deadlines_remove(selector, iocpd);
+ pni_events_update(iocpd, iocpd->events & ~PN_EXPIRED);
+ } else {
+ // Re-sort: take it out first if already linked.
+ if (iocpd->deadlines_prev || selector->deadlines_head == iocpd) {
+ deadlines_remove(selector, iocpd);
+ pni_events_update(iocpd, iocpd->events & ~PN_EXPIRED);
+ }
+ iocpdesc_t *dl_iocpd = LL_HEAD(selector, deadlines);
+ while (dl_iocpd && dl_iocpd->deadline <= deadline)
+ dl_iocpd = dl_iocpd->deadlines_next;
+ if (dl_iocpd) {
+ // insert
+ iocpd->deadlines_prev = dl_iocpd->deadlines_prev;
+ iocpd->deadlines_next = dl_iocpd;
+ dl_iocpd->deadlines_prev = iocpd;
+ if (selector->deadlines_head == dl_iocpd)
+ selector->deadlines_head = iocpd;
+ } else {
+ LL_ADD(selector, deadlines, iocpd); // append
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/reactor/io/windows/write_pipeline.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/io/windows/write_pipeline.c b/proton-c/src/reactor/io/windows/write_pipeline.c
new file mode 100644
index 0000000..905c7f6
--- /dev/null
+++ b/proton-c/src/reactor/io/windows/write_pipeline.c
@@ -0,0 +1,314 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * A simple write buffer pool. Each socket has a dedicated "primary"
+ * buffer and can borrow from a shared pool with limited size tuning.
+ * This could be enhanced, e.g., with separate pools per network interface
+ * and finer-grained memory tuning based on interface speed, system
+ * resources, number of connections, etc.
+ */
+
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
+#if _WIN32_WINNT < 0x0501
+#error "Proton requires Windows API support for XP or later."
+#endif
+#include <winsock2.h>
+#include <Ws2tcpip.h>
+
+#include "reactor/io.h"
+#include "reactor/selector.h"
+#include "reactor/selectable.h"
+
+#include "iocp.h"
+#include "core/util.h"
+
+#include <proton/error.h>
+#include <proton/object.h>
+
+#include <assert.h>
+
+// Max overlapped writes per socket
+#define IOCP_MAX_OWRITES 16
+// Write buffer size
+#define IOCP_WBUFSIZE 16384
+
+// Diagnostic logging for the write-buffer pool: printf-style output to
+// stderr, flushed immediately so messages survive a crash.
+// NOTE(review): relies on <stdarg.h>/<stdio.h> being pulled in transitively
+// by the headers above — confirm, or include them explicitly.
+static void pipeline_log(const char *fmt, ...)
+{
+ va_list ap;
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fflush(stderr);
+}
+
+/*
+ * Allocate the shared pool of write buffers plus the bookkeeping arrays that
+ * track which buffers are free.  Pool size and loopback buffer size can be
+ * overridden via internal debug environment variables.  On any allocation
+ * failure the pool is disabled (size 0) and callers fall back to per-socket
+ * primary buffers only.
+ *
+ * Fixes: the captured GetLastError() value was previously unused, and
+ * perror() was used after VirtualAlloc (Win32 APIs set GetLastError, not
+ * errno, so perror's text was misleading); the malloc results were also
+ * used unchecked in pni_shared_pool_free.
+ */
+void pni_shared_pool_create(iocp_t *iocp)
+{
+  // TODO: more pools (or larger one) when using multiple non-loopback interfaces
+  iocp->shared_pool_size = 16;
+  char *env = getenv("PNI_WRITE_BUFFERS"); // Internal: for debugging
+  if (env) {
+    int sz = atoi(env);
+    if (sz >= 0 && sz < 256) {
+      iocp->shared_pool_size = sz;
+    }
+  }
+  iocp->loopback_bufsize = 0;
+  env = getenv("PNI_LB_BUFSIZE"); // Internal: for debugging
+  if (env) {
+    int sz = atoi(env);
+    if (sz >= 0 && sz <= 128 * 1024) {
+      iocp->loopback_bufsize = sz;
+    }
+  }
+
+  if (iocp->shared_pool_size) {
+    iocp->shared_pool_memory = (char *) VirtualAlloc(NULL, IOCP_WBUFSIZE * iocp->shared_pool_size, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
+    if (!iocp->shared_pool_memory) {
+      // VirtualAlloc reports failure via GetLastError, not errno.
+      pipeline_log("Proton write buffer pool allocation failure (error %lu)\n",
+                   (unsigned long) GetLastError());
+      iocp->shared_pool_size = 0;
+      iocp->shared_available_count = 0;
+      return;
+    }
+
+    iocp->shared_results = (write_result_t **) malloc(iocp->shared_pool_size * sizeof(write_result_t *));
+    iocp->available_results = (write_result_t **) malloc(iocp->shared_pool_size * sizeof(write_result_t *));
+    if (!iocp->shared_results || !iocp->available_results) {
+      // Disable the pool entirely rather than leave it half-initialized.
+      free(iocp->shared_results);
+      free(iocp->available_results);
+      iocp->shared_results = NULL;
+      iocp->available_results = NULL;
+      VirtualFree(iocp->shared_pool_memory, 0, MEM_RELEASE);
+      iocp->shared_pool_memory = NULL;
+      iocp->shared_pool_size = 0;
+      iocp->shared_available_count = 0;
+      return;
+    }
+    iocp->shared_available_count = iocp->shared_pool_size;
+    char *mem = iocp->shared_pool_memory;
+    for (int i = 0; i < iocp->shared_pool_size; i++) {
+      iocp->shared_results[i] = iocp->available_results[i] = pni_write_result(NULL, mem, IOCP_WBUFSIZE);
+      mem += IOCP_WBUFSIZE;
+    }
+  }
+}
+
+// Release the shared pool.  Result records still marked in_use are leaked on
+// purpose (an outstanding overlapped write may still reference them) and the
+// leak is logged.
+void pni_shared_pool_free(iocp_t *iocp)
+{
+ for (int i = 0; i < iocp->shared_pool_size; i++) {
+ write_result_t *result = iocp->shared_results[i];
+ if (result->in_use)
+ pipeline_log("Proton buffer pool leak\n");
+ else
+ free(result);
+ }
+ if (iocp->shared_pool_size) {
+ free(iocp->shared_results);
+ free(iocp->available_results);
+ if (iocp->shared_pool_memory) {
+ // NOTE(review): VirtualFree reports errors via GetLastError, not errno,
+ // so perror's message text is misleading here — confirm intent.
+ if (!VirtualFree(iocp->shared_pool_memory, 0, MEM_RELEASE)) {
+ perror("write buffers release failed");
+ }
+ iocp->shared_pool_memory = NULL;
+ }
+ }
+}
+
+// Push a released shared write result back onto the free stack.
+static void shared_pool_push(write_result_t *result)
+{
+  iocp_t *owner = result->base.iocpd->iocp;
+  assert(owner->shared_available_count < owner->shared_pool_size);
+  owner->available_results[owner->shared_available_count] = result;
+  owner->shared_available_count++;
+}
+
+// Pop a free shared write result, or NULL when the pool is exhausted.
+static write_result_t *shared_pool_pop(iocp_t *iocp)
+{
+  if (iocp->shared_available_count == 0)
+    return NULL;
+  iocp->shared_available_count--;
+  return iocp->available_results[iocp->shared_available_count];
+}
+
+// Per-socket pipeline of in-flight overlapped writes.
+struct write_pipeline_t {
+ iocpdesc_t *iocpd; // owning socket descriptor
+ size_t pending_count; // overlapped writes currently outstanding
+ write_result_t *primary; // buffer dedicated to this socket
+ size_t reserved_count; // buffers promised by the last reserve call
+ size_t next_primary_index; // slot at which next() hands out the primary
+ size_t depth; // max overlapped writes; 0 until set_depth() runs
+ bool is_writer; // true while counted in iocp->writer_count
+};
+
+// No compare/inspect/hashcode behavior is needed by the pn_class machinery.
+#define write_pipeline_compare NULL
+#define write_pipeline_inspect NULL
+#define write_pipeline_hashcode NULL
+
+// pn_class initializer: put every field into a well-defined starting state.
+// Fix: reserved_count, next_primary_index and iocpd were previously left
+// uninitialized, so a pni_write_pipeline_next() call before any reserve()
+// could read garbage.
+// NOTE(review): the primary-buffer malloc result is stored unchecked, as
+// before — pni_write_result receives NULL on OOM; confirm downstream handling.
+static void write_pipeline_initialize(void *object)
+{
+  write_pipeline_t *pl = (write_pipeline_t *) object;
+  pl->iocpd = NULL;
+  pl->pending_count = 0;
+  pl->reserved_count = 0;
+  pl->next_primary_index = 0;
+  const char *pribuf = (const char *) malloc(IOCP_WBUFSIZE);
+  pl->primary = pni_write_result(NULL, pribuf, IOCP_WBUFSIZE);
+  pl->depth = 0;
+  pl->is_writer = false;
+}
+
+// pn_class finalizer: release the primary buffer and its result record.
+static void write_pipeline_finalize(void *object)
+{
+  write_pipeline_t *pl = (write_pipeline_t *) object;
+  void *pribuf = (void *) pl->primary->buffer.start;
+  free(pribuf);
+  free(pl->primary);
+}
+
+// Construct a write pipeline bound to the given descriptor.
+write_pipeline_t *pni_write_pipeline(iocpdesc_t *iocpd)
+{
+  static const pn_cid_t CID_write_pipeline = CID_pn_void;
+  static const pn_class_t clazz = PN_CLASS(write_pipeline);
+  write_pipeline_t *pl = (write_pipeline_t *) pn_class_new(&clazz, sizeof(write_pipeline_t));
+  pl->iocpd = iocpd;
+  pl->primary->base.iocpd = iocpd;
+  return pl;
+}
+
+// Count this pipeline as an active writer exactly once (idempotent).
+static void confirm_as_writer(write_pipeline_t *pl)
+{
+  if (pl->is_writer)
+    return;
+  pl->is_writer = true;
+  pl->iocpd->iocp->writer_count++;
+}
+
+// Stop counting this pipeline as an active writer (idempotent).
+static void remove_as_writer(write_pipeline_t *pl)
+{
+  if (pl->is_writer) {
+    iocp_t *iocp = pl->iocpd->iocp;
+    assert(iocp->writer_count);
+    iocp->writer_count--;
+    pl->is_writer = false;
+  }
+}
+
+/*
+ * Optimal depth will depend on properties of the NIC, server, and driver. For now,
+ * just distinguish between loopback interfaces and the rest. Optimizations in the
+ * loopback stack allow decent performance with depth 1 and actually cause major
+ * performance hiccups if set to large values.
+ */
+// NOTE(review): `sockaddr_storage`/`sockaddr` appear without the `struct`
+// keyword, which only compiles where those names exist as plain type names
+// (e.g. C++ compilation) — confirm against the build configuration.
+static void set_depth(write_pipeline_t *pl)
+{
+ pl->depth = 1;
+ sockaddr_storage sa;
+ socklen_t salen = sizeof(sa);
+ char buf[INET6_ADDRSTRLEN];
+ DWORD buflen = sizeof(buf);
+
+ if (getsockname(pl->iocpd->socket,(sockaddr*) &sa, &salen) == 0 &&
+ getnameinfo((sockaddr*) &sa, salen, buf, buflen, NULL, 0, NI_NUMERICHOST) == 0) {
+ // Nonzero strcmp/strncmp results mean "address is not loopback".
+ if ((sa.ss_family == AF_INET6 && strcmp(buf, "::1")) ||
+ (sa.ss_family == AF_INET && strncmp(buf, "127.", 4))) {
+ // not loopback
+ pl->depth = IOCP_MAX_OWRITES;
+ } else {
+ // Loopback: keep depth 1, optionally enlarging the primary buffer.
+ iocp_t *iocp = pl->iocpd->iocp;
+ if (iocp->loopback_bufsize) {
+ const char *p = (const char *) realloc((void *) pl->primary->buffer.start, iocp->loopback_bufsize);
+ if (p) {
+ // Keep old buffer on realloc failure; only commit on success.
+ pl->primary->buffer.start = p;
+ pl->primary->buffer.size = iocp->loopback_bufsize;
+ }
+ }
+ }
+ }
+}
+
+// Reserve as many buffers as possible for count bytes.
+// Returns the number of NEW buffers reserved (0 means would-block).  Sets
+// reserved_count (total allowed in flight after this call) and
+// next_primary_index (the next() slot that dispenses the primary buffer).
+size_t pni_write_pipeline_reserve(write_pipeline_t *pl, size_t count)
+{
+ if (pl->primary->in_use)
+ return 0; // I.e. io->wouldblock
+ if (!pl->depth)
+ set_depth(pl); // lazy: needs a bound socket to classify the interface
+ if (pl->depth == 1) {
+ // always use the primary
+ pl->reserved_count = 1;
+ pl->next_primary_index = 0;
+ return 1;
+ }
+
+ iocp_t *iocp = pl->iocpd->iocp;
+ confirm_as_writer(pl); // ensures writer_count >= 1 before the division below
+ size_t wanted = (count / IOCP_WBUFSIZE);
+ if (count % IOCP_WBUFSIZE)
+ wanted++;
+ size_t pending = pl->pending_count;
+ assert(pending < pl->depth);
+ size_t bufs = pn_min(wanted, pl->depth - pending);
+ // Can draw from shared pool or the primary... but share with others.
+ size_t writers = iocp->writer_count;
+ size_t shared_count = (iocp->shared_available_count + writers - 1) / writers;
+ bufs = pn_min(bufs, shared_count + 1); // +1 accounts for the primary
+ pl->reserved_count = pending + bufs;
+
+ if (bufs == wanted &&
+ pl->reserved_count < (pl->depth / 2) &&
+ iocp->shared_available_count > (2 * writers + bufs)) {
+ // No shortage: keep the primary as spare for future use
+ pl->next_primary_index = pl->reserved_count;
+ } else if (bufs == 1) {
+ pl->next_primary_index = pending;
+ } else {
+ // let approx 1/3 drain before replenishing
+ pl->next_primary_index = ((pl->reserved_count + 2) / 3) - 1;
+ if (pl->next_primary_index < pending)
+ pl->next_primary_index = pending;
+ }
+ return bufs;
+}
+
+// Hand out the next reserved buffer, or NULL when the current reservation is
+// exhausted.  The primary buffer is dispensed at next_primary_index; every
+// other slot draws from the shared pool.
+write_result_t *pni_write_pipeline_next(write_pipeline_t *pl)
+{
+ size_t sz = pl->pending_count;
+ if (sz >= pl->reserved_count)
+ return NULL;
+ write_result_t *result;
+ if (sz == pl->next_primary_index) {
+ result = pl->primary;
+ } else {
+ // reserve() guaranteed a pooled buffer for each non-primary slot
+ assert(pl->iocpd->iocp->shared_available_count > 0);
+ result = shared_pool_pop(pl->iocpd->iocp);
+ }
+
+ result->in_use = true;
+ pl->pending_count++;
+ return result;
+}
+
+// Return a completed write buffer to the pipeline.  Any unconsumed
+// reservation is cancelled (reserved_count reset), so the caller must call
+// reserve() again before the next write.  Shared buffers go back to the
+// pool; the last returned buffer drops the pipeline's writer status.
+void pni_write_pipeline_return(write_pipeline_t *pl, write_result_t *result)
+{
+ result->in_use = false;
+ pl->pending_count--;
+ pl->reserved_count = 0;
+ if (result != pl->primary)
+ shared_pool_push(result);
+ if (pl->pending_count == 0)
+ remove_as_writer(pl);
+}
+
+// A pipeline can accept more data only when it has spare depth and the
+// primary buffer is free, so a buffer can be guaranteed.
+bool pni_write_pipeline_writable(write_pipeline_t *pl)
+{
+  bool has_spare_depth = pl->pending_count < pl->depth;
+  return has_spare_depth && !pl->primary->in_use;
+}
+
+// Number of overlapped writes currently pending on this pipeline.
+size_t pni_write_pipeline_size(write_pipeline_t *pl)
+{
+  size_t outstanding = pl->pending_count;
+  return outstanding;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/reactor/reactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/reactor.c b/proton-c/src/reactor/reactor.c
index a83a881..abf5d1e 100644
--- a/proton-c/src/reactor/reactor.c
+++ b/proton-c/src/reactor/reactor.c
@@ -19,23 +19,24 @@
*
*/
+#include "io.h"
+#include "reactor.h"
+#include "selectable.h"
+#include "platform/platform.h" // pn_i_now
+
#include <proton/object.h>
#include <proton/handlers.h>
-#include <proton/io.h>
#include <proton/event.h>
#include <proton/transport.h>
#include <proton/connection.h>
#include <proton/session.h>
#include <proton/link.h>
#include <proton/delivery.h>
+
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
-#include "reactor.h"
-#include "selectable.h"
-#include "platform.h"
-
struct pn_reactor_t {
pn_record_t *attachments;
pn_io_t *io;
@@ -164,7 +165,7 @@ void pn_reactor_set_handler(pn_reactor_t *reactor, pn_handler_t *handler) {
pn_incref(reactor->handler);
}
-pn_io_t *pn_reactor_io(pn_reactor_t *reactor) {
+pn_io_t *pni_reactor_io(pn_reactor_t *reactor) {
assert(reactor);
return reactor->io;
}
@@ -389,6 +390,16 @@ bool pn_reactor_quiesced(pn_reactor_t *reactor) {
return pn_event_type(event) == PN_REACTOR_QUIESCED;
}
+// Return the root handler stored in the event's attachments record
+// (NULL when none has been set).
+pn_handler_t *pn_event_root(pn_event_t *event)
+{
+ pn_handler_t *h = pn_record_get_handler(pn_event_attachments(event));
+ return h;
+}
+
+// Store the root handler in the event's attachments record.  File-local:
+// this commit removed the declaration from reactor.h.
+static void pni_event_set_root(pn_event_t *event, pn_handler_t *handler) {
+ pn_record_set_handler(pn_event_attachments(event), handler);
+}
+
bool pn_reactor_process(pn_reactor_t *reactor) {
assert(reactor);
pn_reactor_mark(reactor);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/reactor/reactor.h
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/reactor.h b/proton-c/src/reactor/reactor.h
index 461e8b3..bfb397c 100644
--- a/proton-c/src/reactor/reactor.h
+++ b/proton-c/src/reactor/reactor.h
@@ -26,9 +26,9 @@
#include <proton/url.h>
void pni_record_init_reactor(pn_record_t *record, pn_reactor_t *reactor);
-void pni_event_set_root(pn_event_t *event, pn_handler_t *handler);
void pni_reactor_set_connection_peer_address(pn_connection_t *connection,
const char *host,
const char *port);
+pn_io_t *pni_reactor_io(pn_reactor_t *reactor);
#endif /* src/reactor.h */
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/reactor/selectable.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/selectable.c b/proton-c/src/reactor/selectable.c
new file mode 100644
index 0000000..b42ad1f
--- /dev/null
+++ b/proton-c/src/reactor/selectable.c
@@ -0,0 +1,300 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "selectable.h"
+
+#include <proton/error.h>
+
+#include "io.h"
+
+#include <assert.h>
+#include <stdlib.h>
+
+// A selectables collection is a plain proton iterator.
+pn_selectables_t *pn_selectables(void)
+{
+ return pn_iterator();
+}
+
+// Advance the iterator; returns the next selectable or NULL at the end.
+pn_selectable_t *pn_selectables_next(pn_selectables_t *selectables)
+{
+ return (pn_selectable_t *) pn_iterator_next(selectables);
+}
+
+// Release the iterator object.
+void pn_selectables_free(pn_selectables_t *selectables)
+{
+ pn_free(selectables);
+}
+
+// An I/O selectable: a socket plus event callbacks and selector state.
+struct pn_selectable_t {
+ pn_socket_t fd; // underlying socket (PN_INVALID_SOCKET if unset)
+ int index; // position assigned by the selector (-1 if none)
+ pn_record_t *attachments; // user/extension data
+ void (*readable)(pn_selectable_t *); // called when fd is readable
+ void (*writable)(pn_selectable_t *); // called when fd is writable
+ void (*error)(pn_selectable_t *); // called on error condition
+ void (*expired)(pn_selectable_t *); // called when deadline passes
+ void (*release) (pn_selectable_t *); // called on release
+ void (*finalize)(pn_selectable_t *); // called during finalization
+ pn_collector_t *collector; // optional event sink (see _collect)
+ pn_timestamp_t deadline; // 0 means no deadline
+ bool reading; // interested in read events
+ bool writing; // interested in write events
+ bool registered; // registered with a selector
+ bool terminal; // marked for teardown
+};
+
+// pn_class initializer: start with no socket, no callbacks, no collector,
+// no deadline, and all state flags cleared.
+void pn_selectable_initialize(pn_selectable_t *sel)
+{
+  sel->fd = PN_INVALID_SOCKET;
+  sel->index = -1;
+  sel->attachments = pn_record();
+  sel->collector = NULL;
+  sel->deadline = 0;
+  sel->readable = NULL;
+  sel->writable = NULL;
+  sel->error = NULL;
+  sel->expired = NULL;
+  sel->release = NULL;
+  sel->finalize = NULL;
+  sel->reading = false;
+  sel->writing = false;
+  sel->registered = false;
+  sel->terminal = false;
+}
+
+// pn_class finalizer: run the user hook first, then drop our references.
+void pn_selectable_finalize(pn_selectable_t *sel)
+{
+  if (sel->finalize)
+    sel->finalize(sel);
+  pn_decref(sel->attachments);
+  pn_decref(sel->collector);
+}
+
+// No hashcode/inspect/compare behavior is needed by the pn_class machinery.
+#define pn_selectable_hashcode NULL
+#define pn_selectable_inspect NULL
+#define pn_selectable_compare NULL
+
+// Expands to the pn_selectable class boilerplate (pn_selectable_new, etc.).
+PN_CLASSDEF(pn_selectable)
+
+// Allocate a fresh selectable via the class constructor.
+pn_selectable_t *pn_selectable(void)
+{
+ return pn_selectable_new();
+}
+
+// Is the selectable currently interested in read events?
+bool pn_selectable_is_reading(pn_selectable_t *sel) {
+ assert(sel);
+ return sel->reading;
+}
+
+// Set read-event interest.
+void pn_selectable_set_reading(pn_selectable_t *sel, bool reading) {
+ assert(sel);
+ sel->reading = reading;
+}
+
+// Is the selectable currently interested in write events?
+bool pn_selectable_is_writing(pn_selectable_t *sel) {
+ assert(sel);
+ return sel->writing;
+}
+
+// Set write-event interest.
+void pn_selectable_set_writing(pn_selectable_t *sel, bool writing) {
+ assert(sel);
+ sel->writing = writing;
+}
+
+// Current deadline timestamp (0 means none).
+pn_timestamp_t pn_selectable_get_deadline(pn_selectable_t *sel) {
+ assert(sel);
+ return sel->deadline;
+}
+
+// Set the deadline timestamp (0 clears it).
+void pn_selectable_set_deadline(pn_selectable_t *sel, pn_timestamp_t deadline) {
+ assert(sel);
+ sel->deadline = deadline;
+}
+
+// Install the readable callback.
+void pn_selectable_on_readable(pn_selectable_t *sel, void (*readable)(pn_selectable_t *)) {
+ assert(sel);
+ sel->readable = readable;
+}
+
+// Install the writable callback.
+void pn_selectable_on_writable(pn_selectable_t *sel, void (*writable)(pn_selectable_t *)) {
+ assert(sel);
+ sel->writable = writable;
+}
+
+// Install the error callback.
+void pn_selectable_on_error(pn_selectable_t *sel, void (*error)(pn_selectable_t *)) {
+ assert(sel);
+ sel->error = error;
+}
+
+// Install the deadline-expired callback.
+void pn_selectable_on_expired(pn_selectable_t *sel, void (*expired)(pn_selectable_t *)) {
+ assert(sel);
+ sel->expired = expired;
+}
+
+// Install the release callback.
+void pn_selectable_on_release(pn_selectable_t *sel, void (*release)(pn_selectable_t *)) {
+ assert(sel);
+ sel->release = release;
+}
+
+// Install the finalize callback (runs during pn_selectable_finalize).
+void pn_selectable_on_finalize(pn_selectable_t *sel, void (*finalize)(pn_selectable_t *)) {
+ assert(sel);
+ sel->finalize = finalize;
+}
+
+// Expose the attachments record for user/extension data.
+pn_record_t *pn_selectable_attachments(pn_selectable_t *sel) {
+ return sel->attachments;
+}
+
+// Legacy context accessor: reads the PN_LEGCTX slot of the attachments.
+void *pni_selectable_get_context(pn_selectable_t *selectable)
+{
+ assert(selectable);
+ return pn_record_get(selectable->attachments, PN_LEGCTX);
+}
+
+// Legacy context mutator: writes the PN_LEGCTX slot of the attachments.
+void pni_selectable_set_context(pn_selectable_t *selectable, void *context)
+{
+ assert(selectable);
+ pn_record_set(selectable->attachments, PN_LEGCTX, context);
+}
+
+// Selector-assigned index (-1 when not registered).
+int pni_selectable_get_index(pn_selectable_t *selectable)
+{
+ assert(selectable);
+ return selectable->index;
+}
+
+// Record the selector-assigned index.
+void pni_selectable_set_index(pn_selectable_t *selectable, int index)
+{
+ assert(selectable);
+ selectable->index = index;
+}
+
+// The underlying socket (PN_INVALID_SOCKET if unset).
+pn_socket_t pn_selectable_get_fd(pn_selectable_t *selectable)
+{
+ assert(selectable);
+ return selectable->fd;
+}
+
+// Bind the selectable to a socket.
+void pn_selectable_set_fd(pn_selectable_t *selectable, pn_socket_t fd)
+{
+ assert(selectable);
+ selectable->fd = fd;
+}
+
+// Invoke the readable callback, if one is installed.
+void pn_selectable_readable(pn_selectable_t *selectable)
+{
+ assert(selectable);
+ if (selectable->readable) {
+ selectable->readable(selectable);
+ }
+}
+
+// Invoke the writable callback, if one is installed.
+void pn_selectable_writable(pn_selectable_t *selectable)
+{
+ assert(selectable);
+ if (selectable->writable) {
+ selectable->writable(selectable);
+ }
+}
+
+// Invoke the error callback, if one is installed.
+void pn_selectable_error(pn_selectable_t *selectable)
+{
+ assert(selectable);
+ if (selectable->error) {
+ selectable->error(selectable);
+ }
+}
+
+// Invoke the deadline-expired callback, if one is installed.
+void pn_selectable_expired(pn_selectable_t *selectable)
+{
+ assert(selectable);
+ if (selectable->expired) {
+ selectable->expired(selectable);
+ }
+}
+
+// Has the selectable been registered with a selector?
+bool pn_selectable_is_registered(pn_selectable_t *selectable)
+{
+ assert(selectable);
+ return selectable->registered;
+}
+
+// Record registration state (set by the selector implementation).
+void pn_selectable_set_registered(pn_selectable_t *selectable, bool registered)
+{
+ assert(selectable);
+ selectable->registered = registered;
+}
+
+// Has the selectable been marked terminal (scheduled for teardown)?
+bool pn_selectable_is_terminal(pn_selectable_t *selectable)
+{
+ assert(selectable);
+ return selectable->terminal;
+}
+
+// Mark the selectable terminal; there is no way to un-terminate.
+void pn_selectable_terminate(pn_selectable_t *selectable)
+{
+ assert(selectable);
+ selectable->terminal = true;
+}
+
+// Invoke the release callback, if one is installed.
+void pn_selectable_release(pn_selectable_t *selectable)
+{
+ assert(selectable);
+ if (selectable->release) {
+ selectable->release(selectable);
+ }
+}
+
+// Drop a reference; the object is finalized when the count reaches zero.
+void pn_selectable_free(pn_selectable_t *selectable)
+{
+ pn_decref(selectable);
+}
+
+// Default callbacks installed by pn_selectable_collect: each forwards the
+// corresponding condition to the collector as an event.
+static void pni_readable(pn_selectable_t *selectable) {
+ pn_collector_put(selectable->collector, PN_OBJECT, selectable, PN_SELECTABLE_READABLE);
+}
+
+static void pni_writable(pn_selectable_t *selectable) {
+ pn_collector_put(selectable->collector, PN_OBJECT, selectable, PN_SELECTABLE_WRITABLE);
+}
+
+static void pni_error(pn_selectable_t *selectable) {
+ pn_collector_put(selectable->collector, PN_OBJECT, selectable, PN_SELECTABLE_ERROR);
+}
+
+static void pni_expired(pn_selectable_t *selectable) {
+ pn_collector_put(selectable->collector, PN_OBJECT, selectable, PN_SELECTABLE_EXPIRED);
+}
+
+// Route this selectable's conditions into a collector as events.  Passing a
+// non-NULL collector replaces the readable/writable/error/expired callbacks
+// with collector-forwarding defaults; passing NULL drops the old collector
+// but leaves any previously installed callbacks in place.
+// NOTE(review): decref-before-incref assumes the caller holds a reference
+// when collector == selectable->collector — confirm against callers.
+void pn_selectable_collect(pn_selectable_t *selectable, pn_collector_t *collector) {
+ assert(selectable);
+ pn_decref(selectable->collector);
+ selectable->collector = collector;
+ pn_incref(selectable->collector);
+
+ if (collector) {
+ pn_selectable_on_readable(selectable, pni_readable);
+ pn_selectable_on_writable(selectable, pni_writable);
+ pn_selectable_on_error(selectable, pni_error);
+ pn_selectable_on_expired(selectable, pni_expired);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org