You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by jr...@apache.org on 2018/04/05 19:34:06 UTC
[31/51] [partial] qpid-proton git commit: PROTON-1728: Reorganize the
source tree
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/protocol.h.py
----------------------------------------------------------------------
diff --git a/c/src/protocol.h.py b/c/src/protocol.h.py
new file mode 100644
index 0000000..321cf64
--- /dev/null
+++ b/c/src/protocol.h.py
@@ -0,0 +1,157 @@
+#!/usr/bin/python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import print_function
+from protocol import *
+
+# Open the generated header: banner, include guard, and the one include it needs.
+print("/* generated */")
+print("#ifndef _PROTON_PROTOCOL_H")
+print("#define _PROTON_PROTOCOL_H 1")
+print()
+print("#include \"proton/type_compat.h\"")
+
+# Filled in further down: descriptor code -> (type name, [field names]).
+fields = {}
+
+# For every type, emit one index macro per field (numbered from 0 in query
+# order) and, where the default is usable, a <TYPE>_<FIELD>_DEFAULT macro.
+for type in TYPES:
+ fidx = 0
+ for f in type.query["field"]:
+ print("#define %s_%s (%s)" % (field_kw(type), field_kw(f), fidx))
+ fidx += 1
+ d = f["@default"]
+ if d:
+ ft = ftype(f)
+ # Don't bother to emit a boolean default that is False
+ if ft=="boolean" and d=="false": continue
+ # Don't output non numerics unless symbol
+ # We should really fully resolve to actual restricted value
+ # this is really true for symbols too which accidentally work
+ # Symbol defaults are emitted as quoted C string literals.
+ if ft=="symbol": d = '"' + d + '"'
+ elif d[0] not in '0123456789': continue
+ print("#define %s_%s_DEFAULT (%s) /* %s */" % (field_kw(type), field_kw(f), d, ft))
+
+# idx counts the types processed; nothing in the visible script reads it.
+idx = 0
+
+# Per type: emit <NAME>_SYM (the descriptor's name) and <NAME> (the descriptor
+# code as a uint64_t), and record the type's field names under that code.
+for type in TYPES:
+ desc = type["descriptor"]
+ name = type["@name"].upper().replace("-", "_")
+ print("#define %s_SYM (\"%s\")" % (name, desc["@name"]))
+ # @code is split on ":" into two integers; int(x, 0) honours each
+ # literal's own base prefix. The first half becomes the upper 32 bits.
+ hi, lo = [int(x, 0) for x in desc["@code"].split(":")]
+ code = (hi << 32) + lo
+ print("#define %s ((uint64_t) %s)" % (name, code))
+ fields[code] = (type["@name"], [f["@name"] for f in type.query["field"]])
+ idx += 1
+
+# Declarations visible to every includer: the row type of the FIELDS table
+# and the extern lookup arrays that are defined under DEFINE_FIELDS.
+print("""
+#include <stddef.h>
+
+typedef struct {
+ const unsigned char name_index;
+ const unsigned char first_field_index;
+ const unsigned char field_count;
+} pn_fields_t;
+
+extern const pn_fields_t FIELDS[];
+extern const uint16_t FIELD_NAME[];
+extern const uint16_t FIELD_FIELDS[];
+""")
+
+# struct FIELD_STRINGS gets one char-array member per distinct type or field
+# name, each sized to hold exactly that string; STRING0 holds "".
+print('struct FIELD_STRINGS {')
+print(' const char STRING0[sizeof("")];')
+# Collect every type name and field name once. This is a set, so the
+# emission order below is whatever order the set iterates in (not sorted).
+strings = set()
+for name, fnames in fields.values():
+ strings.add(name)
+ strings.update(fnames)
+for str in strings:
+ # '-' is not valid in a C identifier, so member names use '_' instead.
+ istr = str.replace("-", "_");
+ print(' const char FIELD_STRINGS_%s[sizeof("%s")];' % (istr, str))
+print("};")
+print()
+
+print("extern const struct FIELD_STRINGS FIELD_STRINGPOOL;")
+# Everything from here to the matching #endif is only compiled by the
+# translation unit that defines DEFINE_FIELDS.
+print("#ifdef DEFINE_FIELDS")
+print()
+
+# Initializer for the string pool. It iterates the same, unmodified `strings`
+# set as the struct declaration did, so initializers line up with members.
+print('const struct FIELD_STRINGS FIELD_STRINGPOOL = {')
+print(' "",')
+for str in strings:
+ print(' "%s",'% str)
+print("};")
+print()
+# FIELD_NAME: entry 0 is the empty string, then one offset per type whose
+# descriptor code is in 0..255, in ascending code order.
+print("/* This is an array of offsets into FIELD_STRINGPOOL */")
+print("const uint16_t FIELD_NAME[] = {")
+print(" offsetof(struct FIELD_STRINGS, STRING0),")
+index = 1
+for i in range(256):
+ if i in fields:
+ name, fnames = fields[i]
+ iname = name.replace("-", "_");
+ print(' offsetof(struct FIELD_STRINGS, FIELD_STRINGS_%s), /* %d */' % (iname, index))
+ index += 1
+print("};")
+
+# FIELD_FIELDS: entry 0 is the empty string, then the field-name offsets of
+# each type (codes 0..255, ascending), all fields of one type together.
+print("/* This is an array of offsets into FIELD_STRINGPOOL */")
+print("const uint16_t FIELD_FIELDS[] = {")
+print(" offsetof(struct FIELD_STRINGS, STRING0),")
+index = 1
+for i in range(256):
+ if i in fields:
+ name, fnames = fields[i]
+ if fnames:
+ for f in fnames:
+ ifname = f.replace("-", "_");
+ print(' offsetof(struct FIELD_STRINGS, FIELD_STRINGS_%s), /* %d (%s) */' % (ifname, index, name))
+ index += 1
+print("};")
+
+# FIELDS: one {name_index, first_field_index, field_count} row for every code
+# from the smallest to the largest descriptor code found in 0..255.
+print("const pn_fields_t FIELDS[] = {")
+
+# Both counters start at 1 because slot 0 of FIELD_NAME / FIELD_FIELDS is the
+# empty string.
+name_count = 1
+field_count = 1
+field_min = 256
+field_max = 0
+# First pass: find the range of descriptor codes present in 0..255.
+for i in range(256):
+ if i in fields:
+ if i>field_max: field_max = i
+ if i<field_min: field_min = i
+
+# Second pass: emit a row per code in that range; codes with no type get an
+# all-zero row.
+for i in range(field_min, field_max+1):
+ if i in fields:
+ name, fnames = fields[i]
+ if fnames:
+ print(' {%d, %d, %d}, /* %d (%s) */' % (name_count, field_count, len(fnames), i, name))
+ field_count += len(fnames)
+ else:
+ print(' {%d, 0, 0}, /* %d (%s) */' % (name_count, i, name))
+ name_count += 1
+ # These two updates cannot change anything: i already lies within
+ # [field_min, field_max] by construction of the loop range.
+ if i>field_max: field_max = i
+ if i<field_min: field_min = i
+ else:
+ print(' {0, 0, 0}, /* %d */' % i)
+
+print("};")
+print()
+# Closes #ifdef DEFINE_FIELDS.
+print("#endif")
+print()
+# FIELD_MIN / FIELD_MAX are emitted outside the DEFINE_FIELDS section.
+print('enum {')
+print(' FIELD_MIN = %d,' % field_min)
+print(' FIELD_MAX = %d' % field_max)
+print('};')
+print()
+print("#endif /* protocol.h */")
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/protocol.py
----------------------------------------------------------------------
diff --git a/c/src/protocol.py b/c/src/protocol.py
new file mode 100644
index 0000000..3f04973
--- /dev/null
+++ b/c/src/protocol.py
@@ -0,0 +1,121 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import mllib, os, sys
+
+# Parse the four spec XML files that sit in the same directory as this module.
+doc = mllib.xml_parse(os.path.join(os.path.dirname(__file__), "transport.xml"))
+mdoc = mllib.xml_parse(os.path.join(os.path.dirname(__file__), "messaging.xml"))
+tdoc = mllib.xml_parse(os.path.join(os.path.dirname(__file__), "transactions.xml"))
+sdoc = mllib.xml_parse(os.path.join(os.path.dirname(__file__), "security.xml"))
+
+# Build a predicate that is true for a node `nd` when nd[attr] == value.
+# Used below as the filter argument of the document queries.
+def eq(attr, value):
+ return lambda nd: nd[attr] == value
+
+# Candidate types: every amqp/section/type with class "composite" from the
+# four documents, plus messaging types that provide "section".
+TYPEStmp = doc.query["amqp/section/type", eq("@class", "composite")] + \
+ mdoc.query["amqp/section/type", eq("@class", "composite")] + \
+ tdoc.query["amqp/section/type", eq("@class", "composite")] + \
+ sdoc.query["amqp/section/type", eq("@class", "composite")] + \
+ mdoc.query["amqp/section/type", eq("@provides", "section")]
+# De-duplicate while keeping first-seen order (a type can match both the
+# "composite" and the "section" query).
+TYPES = []
+for ty in TYPEStmp:
+ if not ty in TYPES:
+ TYPES.append(ty)
+# Filled by the loop that follows: type name -> @source, and
+# type name -> type node for composites.
+RESTRICTIONS = {}
+COMPOSITES = {}
+
+# Walk every type in all four documents: a type that has a @source is recorded
+# as a restriction of that source; a composite is recorded by name.
+for type in doc.query["amqp/section/type"] + mdoc.query["amqp/section/type"] + \
+ sdoc.query["amqp/section/type"] + tdoc.query["amqp/section/type"]:
+
+ source = type["@source"]
+ if source:
+ RESTRICTIONS[type["@name"]] = source
+ if type["@class"] == "composite":
+ COMPOSITES[type["@name"]] = type
+
+# Follow the RESTRICTIONS chain from `name` until reaching a name that is not
+# itself a restricted type, and return that name.
+# NOTE(review): recursion has no cycle guard; assumes the spec has no cycles.
+def resolve(name):
+ if name in RESTRICTIONS:
+ return resolve(RESTRICTIONS[name])
+ else:
+ return name
+
+# Type name -> (C type, declarator prefix, declarator suffix); unpacked as
+# (t, pre, post) by declaration().
+TYPEMAP = {
+ "boolean": ("bool", "", ""),
+ "binary": ("pn_binary_t", "*", ""),
+ "string": ("wchar_t", "*", ""),
+ "symbol": ("char", "*", ""),
+ "ubyte": ("uint8_t", "", ""),
+ "ushort": ("uint16_t", "", ""),
+ "uint": ("uint32_t", "", ""),
+ "ulong": ("uint64_t", "", ""),
+ "timestamp": ("uint64_t", "", ""),
+ "list": ("pn_list_t", "*", ""),
+ "map": ("pn_map_t", "*", ""),
+ "box": ("pn_box_t", "*", ""),
+ "*": ("pn_object_t", "*", "")
+ }
+
+# Type name -> suffix of the pn_<suffix>(mem, expr) call that fconstruct()
+# generates for that type.
+CONSTRUCTORS = {
+ "boolean": "boolean",
+ "string": "string",
+ "symbol": "symbol",
+ "ubyte": "ubyte",
+ "ushort": "ushort",
+ "uint": "uint",
+ "ulong": "ulong",
+ "timestamp": "ulong"
+ }
+
+# Types for which fconstruct() emits a "expr ? ... : NULL" guard.
+NULLABLE = set(["string", "symbol"])
+
+# Return the field's @name with '-' replaced by '_'.
+def fname(field):
+ return field["@name"].replace("-", "_")
+
+# Return the type's @name with '-' replaced by '_'.
+def tname(t):
+ return t["@name"].replace("-", "_")
+
+# True when the field's @multiple attribute is the string "true".
+def multi(f):
+ return f["@multiple"] == "true"
+
+# Classify a field: "list" if it is multiple, "box" if its @type names a
+# composite, otherwise its @type resolved through RESTRICTIONS with '-'
+# replaced by '_'.
+def ftype(field):
+ if multi(field):
+ return "list"
+ elif field["@type"] in COMPOSITES:
+ return "box"
+ else:
+ return resolve(field["@type"]).replace("-", "_")
+
+# Build the text of a C expression that constructs a value for `field` from
+# the C expression `expr`.
+# - types listed in CONSTRUCTORS become "pn_<ctor>(mem, expr)";
+# - NULLABLE types are additionally guarded with "expr ? ... : NULL";
+# - any other type passes `expr` through unchanged;
+# - multiple fields are finally wrapped in pn_box(mem, pn_boolean(mem, true), ...).
+def fconstruct(field, expr):
+ type = ftype(field)
+ if type in CONSTRUCTORS:
+ result = "pn_%s(mem, %s)" % (CONSTRUCTORS[type], expr)
+ if type in NULLABLE:
+ result = "%s ? %s : NULL" % (expr, result)
+ else:
+ result = expr
+ if multi(field):
+ result = "pn_box(mem, pn_boolean(mem, true), %s)" % result
+ return result
+
+# Return (C type, declarator) for a field. Types missing from TYPEMAP fall
+# back to using the type name itself with no prefix or suffix.
+def declaration(field):
+ name = fname(field)
+ type = ftype(field)
+ t, pre, post = TYPEMAP.get(type, (type, "", ""))
+ return t, "%s%s%s" % (pre, name, post)
+
+# Upper-cased fname(); protocol.h.py uses it to build macro names for both
+# types and fields.
+def field_kw(field):
+ return fname(field).upper()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/reactor/acceptor.c
----------------------------------------------------------------------
diff --git a/c/src/reactor/acceptor.c b/c/src/reactor/acceptor.c
new file mode 100644
index 0000000..a044748
--- /dev/null
+++ b/c/src/reactor/acceptor.c
@@ -0,0 +1,117 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/sasl.h>
+#include <proton/transport.h>
+#include <proton/connection.h>
+
+#include "io.h"
+#include "reactor.h"
+#include "selectable.h"
+#include "selector.h"
+
+#include <string.h>
+
+pn_selectable_t *pn_reactor_selectable_transport(pn_reactor_t *reactor, pn_socket_t sock, pn_transport_t *transport);
+
+// Record keys used in this file with pn_record_def/pn_record_get/pn_record_set:
+//  - PNI_ACCEPTOR_HANDLER and PNI_ACCEPTOR_SSL_DOMAIN live on the acceptor's
+//    selectable attachments;
+//  - PNI_ACCEPTOR_CONNECTION lives on an accepted connection's attachments
+//    and holds the acceptor selectable that created it.
+PN_HANDLE(PNI_ACCEPTOR_HANDLER)
+PN_HANDLE(PNI_ACCEPTOR_SSL_DOMAIN)
+PN_HANDLE(PNI_ACCEPTOR_CONNECTION)
+
+// Readable callback of a listening selectable: accept one pending socket,
+// create a server-side connection + transport for it and register the new
+// socket with the reactor.
+//
+// The connection's handler is the one attached to the acceptor, falling back
+// to the reactor's handler. If an SSL domain was attached to the acceptor the
+// new transport gets an SSL layer initialised from it. The accepted
+// connection records the acceptor under PNI_ACCEPTOR_CONNECTION.
+void pni_acceptor_readable(pn_selectable_t *sel) {
+  pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel);
+  char name[1024];
+  name[0] = '\0';  // never inspect an unwritten buffer below
+  pn_socket_t sock = pn_accept(pni_reactor_io(reactor), pn_selectable_get_fd(sel), name, sizeof(name));
+  if (sock == PN_INVALID_SOCKET) {
+    // Nothing was accepted. As with pn_listen()/pn_connect() elsewhere in the
+    // reactor, an invalid socket must not be wrapped in a selectable; do not
+    // create a connection that could never be serviced.
+    return;
+  }
+  pn_handler_t *handler = (pn_handler_t *) pn_record_get(pn_selectable_attachments(sel), PNI_ACCEPTOR_HANDLER);
+  if (!handler) { handler = pn_reactor_get_handler(reactor); }
+  pn_record_t *record = pn_selectable_attachments(sel);
+  pn_ssl_domain_t *ssl_domain = (pn_ssl_domain_t *) pn_record_get(record, PNI_ACCEPTOR_SSL_DOMAIN);
+  pn_connection_t *conn = pn_reactor_connection(reactor, handler);
+  if (name[0]) { // store the peer address of connection in <host>:<port> format
+    char *port = strrchr(name, ':');    // last : separates the port #
+    if (port) {
+      // Only split when a separator is actually present; strrchr() returns
+      // NULL otherwise and must not be dereferenced.
+      *port++ = '\0';
+      pni_reactor_set_connection_peer_address(conn, name, port);
+    }
+  }
+  pn_transport_t *trans = pn_transport();
+  pn_transport_set_server(trans);
+  if (ssl_domain) {
+    pn_ssl_t *ssl = pn_ssl(trans);
+    pn_ssl_init(ssl, ssl_domain, 0);
+  }
+  pn_transport_bind(trans, conn);
+  // NOTE(review): trans is still used after this decref, so the bind above is
+  // presumed to keep the transport alive -- confirm against pn_transport_bind.
+  pn_decref(trans);
+  pn_reactor_selectable_transport(reactor, sock, trans);
+  record = pn_connection_attachments(conn);
+  pn_record_def(record, PNI_ACCEPTOR_CONNECTION, PN_OBJECT);
+  pn_record_set(record, PNI_ACCEPTOR_CONNECTION, sel);
+
+}
+
+// Finalizer of an acceptor selectable: close the listening socket unless the
+// selectable no longer has one (pn_acceptor_close() resets the fd to
+// PN_INVALID_SOCKET after closing it).
+void pni_acceptor_finalize(pn_selectable_t *sel) {
+  pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel);
+  pn_socket_t listener = pn_selectable_get_fd(sel);
+  if (listener == PN_INVALID_SOCKET) {
+    return;
+  }
+  pn_close(pni_reactor_io(reactor), listener);
+}
+
+// Start listening on host:port and register the listening socket with the
+// reactor. `handler` is attached to the acceptor and later used for accepted
+// connections. Returns NULL when pn_listen() yields PN_INVALID_SOCKET,
+// otherwise the new selectable cast to pn_acceptor_t *.
+pn_acceptor_t *pn_reactor_acceptor(pn_reactor_t *reactor, const char *host, const char *port, pn_handler_t *handler) {
+  pn_socket_t listener = pn_listen(pni_reactor_io(reactor), host, port);
+  if (listener == PN_INVALID_SOCKET) return NULL;
+
+  pn_selectable_t *sel = pn_reactor_selectable(reactor);
+  pn_selectable_set_fd(sel, listener);
+  pn_selectable_on_readable(sel, pni_acceptor_readable);
+  pn_selectable_on_finalize(sel, pni_acceptor_finalize);
+
+  // Link the selectable to its reactor and remember the handler on it.
+  pn_record_t *attachments = pn_selectable_attachments(sel);
+  pni_record_init_reactor(attachments, reactor);
+  pn_record_def(attachments, PNI_ACCEPTOR_HANDLER, PN_OBJECT);
+  pn_record_set(attachments, PNI_ACCEPTOR_HANDLER, handler);
+
+  pn_selectable_set_reading(sel, true);
+  pn_reactor_update(reactor, sel);
+  return (pn_acceptor_t *) sel;
+}
+
+// Stop accepting: close the listening socket, clear the selectable's fd,
+// mark it terminal and tell the reactor. Does nothing if the selectable is
+// already terminal.
+void pn_acceptor_close(pn_acceptor_t *acceptor) {
+  pn_selectable_t *sel = (pn_selectable_t *) acceptor;
+  if (pn_selectable_is_terminal(sel)) {
+    return;
+  }
+  pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel);
+  pn_socket_t listener = pn_selectable_get_fd(sel);
+  pn_close(pni_reactor_io(reactor), listener);
+  // Reset the fd so the finalizer does not close the socket a second time.
+  pn_selectable_set_fd(sel, PN_INVALID_SOCKET);
+  pn_selectable_terminate(sel);
+  pn_reactor_update(reactor, sel);
+}
+
+// Attach an SSL domain to the acceptor; pni_acceptor_readable() uses it to
+// initialise SSL on every transport it creates.
+// NOTE(review): the slot is defined as PN_VOID, so the record presumably does
+// not manage the domain's lifetime -- confirm the caller must keep it alive.
+void pn_acceptor_set_ssl_domain(pn_acceptor_t *acceptor, pn_ssl_domain_t *domain)
+{
+  pn_record_t *attachments = pn_selectable_attachments((pn_selectable_t *) acceptor);
+  pn_record_def(attachments, PNI_ACCEPTOR_SSL_DOMAIN, PN_VOID);
+  pn_record_set(attachments, PNI_ACCEPTOR_SSL_DOMAIN, domain);
+}
+
+// Look up the acceptor that created `conn`. The value is whatever is stored
+// under PNI_ACCEPTOR_CONNECTION in the connection's attachments; callers in
+// the reactor treat NULL as "outbound connection".
+pn_acceptor_t *pn_connection_acceptor(pn_connection_t *conn) {
+  pn_record_t *attachments = pn_connection_attachments(conn);
+  void *creator = pn_record_get(attachments, PNI_ACCEPTOR_CONNECTION);
+  return (pn_acceptor_t *) creator;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/reactor/connection.c
----------------------------------------------------------------------
diff --git a/c/src/reactor/connection.c b/c/src/reactor/connection.c
new file mode 100644
index 0000000..090947c
--- /dev/null
+++ b/c/src/reactor/connection.c
@@ -0,0 +1,370 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/connection.h>
+#include <proton/object.h>
+#include <proton/sasl.h>
+#include <proton/ssl.h>
+#include <proton/transport.h>
+#include <proton/url.h>
+#include <assert.h>
+#include <stdio.h>
+#include <string.h>
+#include "io.h"
+#include "selectable.h"
+#include "reactor.h"
+
+// XXX: overloaded for both directions
+// PN_TRANCTX keys the transport stored on a selectable's attachments and also
+// the selectable stored on a transport's attachments.
+PN_HANDLE(PN_TRANCTX)
+// Key of the pn_url_t peer address kept on a connection's attachments.
+PN_HANDLE(PNI_CONN_PEER_ADDRESS)
+
+// Store host/port as a pn_url_t under PNI_CONN_PEER_ADDRESS on the
+// connection's attachments, defining the slot on first use. The local
+// reference to the url is dropped once it has been stored.
+void pni_reactor_set_connection_peer_address(pn_connection_t *connection,
+                                             const char *host,
+                                             const char *port)
+{
+  pn_url_t *peer = pn_url();
+  pn_url_set_host(peer, host);
+  pn_url_set_port(peer, port);
+
+  pn_record_t *attachments = pn_connection_attachments(connection);
+  const int already_defined = pn_record_has(attachments, PNI_CONN_PEER_ADDRESS);
+  if (!already_defined) {
+    pn_record_def(attachments, PNI_CONN_PEER_ADDRESS, PN_OBJECT);
+  }
+  pn_record_set(attachments, PNI_CONN_PEER_ADDRESS, peer);
+  pn_decref(peer);
+}
+
+// Fetch the transport stored under PN_TRANCTX on a selectable's attachments.
+static pn_transport_t *pni_transport(pn_selectable_t *sel) {
+  void *stored = pn_record_get(pn_selectable_attachments(sel), PN_TRANCTX);
+  return (pn_transport_t *) stored;
+}
+
+// Return the transport's input capacity. A negative capacity on a transport
+// that reports closed also terminates the selectable.
+static ssize_t pni_connection_capacity(pn_selectable_t *sel)
+{
+  pn_transport_t *t = pni_transport(sel);
+  ssize_t cap = pn_transport_capacity(t);
+  if (cap < 0 && pn_transport_closed(t)) {
+    pn_selectable_terminate(sel);
+  }
+  return cap;
+}
+
+// Return the transport's pending output byte count. A negative count on a
+// transport that reports closed also terminates the selectable.
+static ssize_t pni_connection_pending(pn_selectable_t *sel)
+{
+  pn_transport_t *t = pni_transport(sel);
+  ssize_t out = pn_transport_pending(t);
+  if (out < 0 && pn_transport_closed(t)) {
+    pn_selectable_terminate(sel);
+  }
+  return out;
+}
+
+// Tick the transport at the reactor's current time and return the value
+// pn_transport_tick() hands back, which callers use as the next deadline.
+static pn_timestamp_t pni_connection_deadline(pn_selectable_t *sel)
+{
+  pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel);
+  pn_transport_t *t = pni_transport(sel);
+  return pn_transport_tick(t, pn_reactor_now(reactor));
+}
+
+// Refresh the selectable's interest flags and deadline from its transport:
+// read when there is input capacity, write when output is pending.
+static void pni_connection_update(pn_selectable_t *sel) {
+  // Both queries run before any flag is set; either may terminate sel.
+  ssize_t capacity = pni_connection_capacity(sel);
+  ssize_t pending = pni_connection_pending(sel);
+
+  pn_selectable_set_reading(sel, capacity > 0);
+  pn_selectable_set_writing(sel, pending > 0);
+
+  pn_timestamp_t next = pni_connection_deadline(sel);
+  pn_selectable_set_deadline(sel, next);
+}
+
+// Transport event hook: if the event's transport has a live (non-terminal)
+// selectable attached under PN_TRANCTX, refresh it and notify the reactor.
+void pni_handle_transport(pn_reactor_t *reactor, pn_event_t *event) {
+  assert(reactor);
+  pn_transport_t *transport = pn_event_transport(event);
+  pn_record_t *attachments = pn_transport_attachments(transport);
+  pn_selectable_t *sel = (pn_selectable_t *) pn_record_get(attachments, PN_TRANCTX);
+  if (!sel || pn_selectable_is_terminal(sel)) {
+    return;
+  }
+  pni_connection_update(sel);
+  pn_reactor_update(reactor, sel);
+}
+
+pn_selectable_t *pn_reactor_selectable_transport(pn_reactor_t *reactor, pn_socket_t sock, pn_transport_t *transport);
+
+// Open hook: when the connection's state still has PN_REMOTE_UNINIT set,
+// create a fresh transport and bind it to the connection; otherwise do
+// nothing. The local transport reference is released right after binding.
+void pni_handle_open(pn_reactor_t *reactor, pn_event_t *event) {
+  assert(reactor);
+  assert(event);
+
+  pn_connection_t *conn = pn_event_connection(event);
+  const int remote_uninit = (pn_connection_state(conn) & PN_REMOTE_UNINIT) != 0;
+  if (remote_uninit) {
+    pn_transport_t *fresh = pn_transport();
+    pn_transport_bind(fresh, conn);
+    pn_decref(fresh);
+  }
+}
+
+// Bound hook: runs when a transport has been bound to a connection.
+// Links the transport to the reactor and, for connections not created by an
+// acceptor, works out the remote host/port and opens the outgoing socket.
+// Address resolution order:
+//   1. the peer address url stored under PNI_CONN_PEER_ADDRESS;
+//   2. otherwise the connection's hostname, optionally with ":port".
+// The port defaults to "5672" ("5671" when the url scheme is "amqps").
+// On failure the transport condition is set to "proton:io" and both ends of
+// the transport are closed.
+void pni_handle_bound(pn_reactor_t *reactor, pn_event_t *event) {
+ assert(reactor);
+ assert(event);
+
+ pn_connection_t *conn = pn_event_connection(event);
+ pn_transport_t *transport = pn_event_transport(event);
+ pn_record_t *record = pn_connection_attachments(conn);
+ pn_url_t *url = (pn_url_t *)pn_record_get(record, PNI_CONN_PEER_ADDRESS);
+ const char *host = NULL;
+ const char *port = "5672";
+ // Scratch copy of the hostname; only allocated on the fallback path below.
+ pn_string_t *str = NULL;
+
+ // link the new transport to its reactor:
+ pni_record_init_reactor(pn_transport_attachments(transport), reactor);
+
+ if (pn_connection_acceptor(conn) != NULL) {
+ // this connection was created by the acceptor. There is already a
+ // socket assigned to this connection. Nothing needs to be done.
+ return;
+ }
+
+ if (url) {
+ host = pn_url_get_host(url);
+ const char *uport = pn_url_get_port(url);
+ if (uport) {
+ port = uport;
+ } else {
+ const char *scheme = pn_url_get_scheme(url);
+ if (scheme && strcmp(scheme, "amqps") == 0) {
+ port = "5671";
+ }
+ }
+ if (!pn_connection_get_user(conn)) {
+ // user did not manually set auth info
+ const char *user = pn_url_get_username(url);
+ if (user) pn_connection_set_user(conn, user);
+ const char *passwd = pn_url_get_password(url);
+ if (passwd) pn_connection_set_password(conn, passwd);
+ }
+ } else {
+ // for backward compatibility, see if the connection's hostname can be
+ // used for the remote address. See JIRA PROTON-1133
+ const char *hostname = pn_connection_get_hostname(conn);
+ if (hostname) {
+ str = pn_string(hostname);
+ char *h = pn_string_buffer(str);
+ // see if a port has been included in the hostname. This is not
+ // allowed by the spec, but the old reactor interface allowed it.
+ char *colon = strrchr(h, ':');
+ if (colon) {
+ // Split in place: host and port now both point into str's buffer.
+ *colon = '\0';
+ port = colon + 1;
+ }
+ host = h;
+ }
+ }
+
+ if (!host) {
+ // error: no address configured
+ pn_condition_t *cond = pn_transport_condition(transport);
+ pn_condition_set_name(cond, "proton:io");
+ pn_condition_set_description(cond, "Connection failed: no address configured");
+ pn_transport_close_tail(transport);
+ pn_transport_close_head(transport);
+ } else {
+ pn_socket_t sock = pn_connect(pni_reactor_io(reactor), host, port);
+ // invalid sockets are ignored by poll, so we need to do this manually
+ if (sock == PN_INVALID_SOCKET) {
+ pn_condition_t *cond = pn_transport_condition(transport);
+ pn_condition_set_name(cond, "proton:io");
+ pn_condition_set_description(cond, pn_error_text(pn_reactor_error(reactor)));
+ pn_transport_close_tail(transport);
+ pn_transport_close_head(transport);
+ } else {
+ pn_reactor_selectable_transport(reactor, sock, transport);
+ }
+ }
+ // Must stay last: host/port may still point into str until pn_connect returns.
+ pn_free(str);
+}
+
+// Final hook: drop the event's connection from the reactor's children list
+// (pn_reactor_connection() added it there).
+void pni_handle_final(pn_reactor_t *reactor, pn_event_t *event) {
+  assert(reactor);
+  assert(event);
+  pn_connection_t *finished = pn_event_connection(event);
+  pn_list_t *children = pn_reactor_children(reactor);
+  pn_list_remove(children, finished);
+}
+
+// Readable callback of a connection selectable: read as many bytes as the
+// transport has capacity for, straight into the transport's tail buffer, and
+// hand them to the transport. End of stream (n == 0) or a real error closes
+// the transport tail; a would-block result is ignored.
+static void pni_connection_readable(pn_selectable_t *sel)
+{
+ pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel);
+ pn_transport_t *transport = pni_transport(sel);
+ ssize_t capacity = pn_transport_capacity(transport);
+ if (capacity > 0) {
+ ssize_t n = pn_recv(pni_reactor_io(reactor), pn_selectable_get_fd(sel),
+ pn_transport_tail(transport), capacity);
+ if (n <= 0) {
+ if (n == 0 || !pn_wouldblock(pni_reactor_io(reactor))) {
+ if (n < 0) {
+ // Unlike pni_connection_writable(), this overwrites the condition
+ // without first checking pn_condition_is_set().
+ pn_condition_t *cond = pn_transport_condition(transport);
+ pn_condition_set_name(cond, "proton:io");
+ pn_condition_set_description(cond, pn_error_text(pn_reactor_error(reactor)));
+ }
+ pn_transport_close_tail(transport);
+ }
+ } else {
+ pn_transport_process(transport, (size_t)n);
+ }
+ }
+
+ ssize_t newcap = pn_transport_capacity(transport);
+ //occasionally transport events aren't generated when expected, so
+ //the following hack ensures we always update the selector
+ // The "1 ||" makes the condition always true; newcap is kept only to
+ // document the comparison that was originally intended.
+ if (1 || newcap != capacity) {
+ pni_connection_update(sel);
+ pn_reactor_update(reactor, sel);
+ }
+}
+
+// Writable callback of a connection selectable: send the transport's pending
+// output from its head buffer and pop however many bytes were written. A
+// real send error (not would-block) closes the transport head and records a
+// "proton:io" condition unless a condition is already set.
+static void pni_connection_writable(pn_selectable_t *sel)
+{
+ pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel);
+ pn_transport_t *transport = pni_transport(sel);
+ ssize_t pending = pn_transport_pending(transport);
+ if (pending > 0) {
+ ssize_t n = pn_send(pni_reactor_io(reactor), pn_selectable_get_fd(sel),
+ pn_transport_head(transport), pending);
+ if (n < 0) {
+ if (!pn_wouldblock(pni_reactor_io(reactor))) {
+ pn_condition_t *cond = pn_transport_condition(transport);
+ // Keep an earlier, more specific condition if there is one.
+ if (!pn_condition_is_set(cond)) {
+ pn_condition_set_name(cond, "proton:io");
+ pn_condition_set_description(cond, pn_error_text(pn_reactor_error(reactor)));
+ }
+ pn_transport_close_head(transport);
+ }
+ } else {
+ pn_transport_pop(transport, n);
+ }
+ }
+
+ // Only bother the reactor when the amount of pending output changed.
+ ssize_t newpending = pn_transport_pending(transport);
+ if (newpending != pending) {
+ pni_connection_update(sel);
+ pn_reactor_update(reactor, sel);
+ }
+}
+
+static void pni_connection_error(pn_selectable_t *sel) {
+ pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel);
+ pn_transport_t *transport = pni_transport(sel);
+ pn_transport_close_head(transport);
+ pn_transport_close_tail(transport);
+ pn_selectable_terminate(sel);
+ pn_reactor_update(reactor, sel);
+}
+
+// Deadline-expired callback: tick the transport first and store the new
+// deadline, then refresh the read/write interest flags and notify the
+// reactor.
+static void pni_connection_expired(pn_selectable_t *sel) {
+  pn_reactor_t *owner = (pn_reactor_t *) pni_selectable_get_context(sel);
+  pn_transport_t *t = pni_transport(sel);
+
+  pn_timestamp_t next = pn_transport_tick(t, pn_reactor_now(owner));
+  pn_selectable_set_deadline(sel, next);
+
+  ssize_t capacity = pni_connection_capacity(sel);
+  ssize_t pending = pni_connection_pending(sel);
+  pn_selectable_set_reading(sel, capacity > 0);
+  pn_selectable_set_writing(sel, pending > 0);
+
+  pn_reactor_update(owner, sel);
+}
+
+// Finalizer of a connection selectable: clear the transport's PN_TRANCTX
+// back-reference to this selectable, then close the socket.
+static void pni_connection_finalize(pn_selectable_t *sel) {
+  pn_reactor_t *owner = (pn_reactor_t *) pni_selectable_get_context(sel);
+  pn_transport_t *t = pni_transport(sel);
+
+  pn_record_set(pn_transport_attachments(t), PN_TRANCTX, NULL);
+
+  pn_socket_t sock = pn_selectable_get_fd(sel);
+  pn_close(pni_reactor_io(owner), sock);
+}
+
+// Create a reactor selectable that drives `transport` over `sock`: install
+// the connection callbacks, cross-link selectable and transport under
+// PN_TRANCTX (PN_OBJECT slot on the selectable, PN_WEAKREF slot on the
+// transport), then compute the initial interest set and register it.
+pn_selectable_t *pn_reactor_selectable_transport(pn_reactor_t *reactor, pn_socket_t sock, pn_transport_t *transport) {
+  pn_selectable_t *sel = pn_reactor_selectable(reactor);
+  pn_selectable_set_fd(sel, sock);
+
+  pn_selectable_on_readable(sel, pni_connection_readable);
+  pn_selectable_on_writable(sel, pni_connection_writable);
+  pn_selectable_on_error(sel, pni_connection_error);
+  pn_selectable_on_expired(sel, pni_connection_expired);
+  pn_selectable_on_finalize(sel, pni_connection_finalize);
+
+  // selectable -> transport
+  pn_record_t *sel_attachments = pn_selectable_attachments(sel);
+  pn_record_def(sel_attachments, PN_TRANCTX, PN_OBJECT);
+  pn_record_set(sel_attachments, PN_TRANCTX, transport);
+
+  // transport -> selectable
+  pn_record_t *transport_attachments = pn_transport_attachments(transport);
+  pn_record_def(transport_attachments, PN_TRANCTX, PN_WEAKREF);
+  pn_record_set(transport_attachments, PN_TRANCTX, sel);
+
+  pni_connection_update(sel);
+  pn_reactor_update(reactor, sel);
+  return sel;
+}
+
+// Create a connection owned by the reactor: attach `handler`, route its
+// events to the reactor's collector, add it to the reactor's children and
+// link it to the reactor. The creation reference is released before
+// returning, after the connection has been added to the children list.
+pn_connection_t *pn_reactor_connection(pn_reactor_t *reactor, pn_handler_t *handler) {
+  assert(reactor);
+  pn_connection_t *conn = pn_connection();
+  pn_record_t *attachments = pn_connection_attachments(conn);
+
+  pn_record_set_handler(attachments, handler);
+  pn_connection_collect(conn, pn_reactor_collector(reactor));
+  pn_list_add(pn_reactor_children(reactor), conn);
+  pni_record_init_reactor(attachments, reactor);
+
+  pn_decref(conn);
+  return conn;
+}
+
+// Convenience wrapper: create a reactor connection with `handler` and set
+// its peer address to host:port.
+pn_connection_t *pn_reactor_connection_to_host(pn_reactor_t *reactor,
+                                               const char *host,
+                                               const char *port,
+                                               pn_handler_t *handler) {
+  pn_connection_t *conn = pn_reactor_connection(reactor, handler);
+  pn_reactor_set_connection_host(reactor, conn, host, port);
+  return conn;
+}
+
+
+// Set the peer address of an outbound connection. Connections created by an
+// acceptor are left untouched: their peer address cannot be modified.
+// `reactor` is unused.
+void pn_reactor_set_connection_host(pn_reactor_t *reactor,
+                                    pn_connection_t *connection,
+                                    const char *host,
+                                    const char *port)
+{
+  (void)reactor;  // ignored
+  const int is_outbound = (pn_connection_acceptor(connection) == NULL);
+  if (is_outbound) {
+    pni_reactor_set_connection_peer_address(connection, host, port);
+  }
+}
+
+
+// Return the string form of the connection's stored peer address, or NULL
+// when `connection` is NULL or no address has been stored. `reactor` is
+// unused.
+const char *pn_reactor_get_connection_address(pn_reactor_t *reactor,
+                                              pn_connection_t *connection)
+{
+  (void)reactor;  // ignored
+  if (!connection) return NULL;
+  pn_record_t *attachments = pn_connection_attachments(connection);
+  pn_url_t *peer = (pn_url_t *) pn_record_get(attachments, PNI_CONN_PEER_ADDRESS);
+  return peer ? pn_url_str(peer) : NULL;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/reactor/handler.c
----------------------------------------------------------------------
diff --git a/c/src/reactor/handler.c b/c/src/reactor/handler.c
new file mode 100644
index 0000000..2f86afd
--- /dev/null
+++ b/c/src/reactor/handler.c
@@ -0,0 +1,111 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/object.h>
+#include <proton/reactor.h>
+#include <proton/event.h>
+#include <string.h>
+#include <assert.h>
+
+// An event handler node. pn_handler_new() allocates `size` extra bytes
+// directly after this struct; pn_handler_mem() returns their address.
+struct pn_handler_t {
+ // Called by pn_handler_dispatch() for every event, if non-NULL.
+ void (*dispatch) (pn_handler_t *, pn_event_t *, pn_event_type_t);
+ // Called by pn_handler_finalize() before the children list is freed, if non-NULL.
+ void (*finalize) (pn_handler_t *);
+ // Child handlers, created lazily by pn_handler_add(); NULL until then.
+ pn_list_t *children;
+};
+
+// Class initializer for pn_handler_t: put every member into a known state.
+// `finalize` is cleared too, because pn_handler_finalize() reads it and must
+// never see an indeterminate pointer, even for an object that did not go
+// through pn_handler_new().
+void pn_handler_initialize(void *object) {
+  pn_handler_t *handler = (pn_handler_t *) object;
+  handler->dispatch = NULL;
+  handler->finalize = NULL;
+  handler->children = NULL;
+}
+
+// Class finalizer for pn_handler_t: run the user finalize callback when one
+// is set, then free the children list.
+void pn_handler_finalize(void *object) {
+  pn_handler_t *self = (pn_handler_t *) object;
+  void (*user_finalize)(pn_handler_t *) = self->finalize;
+  if (user_finalize) {
+    user_finalize(self);
+  }
+  pn_free(self->children);
+}
+
+// No hashcode/compare/inspect callbacks for handlers.
+// NOTE(review): presumably picked up by name through PN_CLASS(pn_handler) in
+// pn_handler_new() -- confirm against the PN_CLASS macro in proton/object.h.
+#define pn_handler_hashcode NULL
+#define pn_handler_compare NULL
+#define pn_handler_inspect NULL
+
+// Create a handler with the given dispatch callback, no extra memory and no
+// finalize callback.
+pn_handler_t *pn_handler(void (*dispatch)(pn_handler_t *, pn_event_t *, pn_event_type_t)) {
+  const size_t no_extra_mem = 0;
+  return pn_handler_new(dispatch, no_extra_mem, NULL);
+}
+
+// Allocate a handler with `size` extra bytes of zeroed user memory placed
+// directly after the struct (see pn_handler_mem()).
+//
+//   dispatch  event callback, may be NULL
+//   size      number of extra bytes to reserve for the caller
+//   finalize  callback run when the handler is finalized, may be NULL
+//
+// Returns the new handler, or NULL if the allocation failed.
+pn_handler_t *pn_handler_new(void (*dispatch)(pn_handler_t *, pn_event_t *, pn_event_type_t), size_t size,
+                             void (*finalize)(pn_handler_t *)) {
+  static const pn_class_t clazz = PN_CLASS(pn_handler);
+  pn_handler_t *handler = (pn_handler_t *) pn_class_new(&clazz, sizeof(pn_handler_t) + size);
+  if (!handler) {
+    // Do not write through a failed allocation.
+    return NULL;
+  }
+  handler->dispatch = dispatch;
+  handler->finalize = finalize;
+  memset(pn_handler_mem(handler), 0, size);
+  return handler;
+}
+
+// Release a handler: drop one reference on every child currently in its
+// children list, then drop one reference on the handler itself. NULL is
+// accepted and ignored.
+void pn_handler_free(pn_handler_t *handler) {
+  if (!handler) {
+    return;
+  }
+
+  pn_list_t *kids = handler->children;
+  if (kids) {
+    size_t count = pn_list_size(kids);
+    size_t idx = 0;
+    while (idx < count) {
+      pn_decref(pn_list_get(kids, idx));
+      idx++;
+    }
+  }
+
+  pn_decref(handler);
+}
+
+// Address of the caller's extra memory: the first byte after the
+// pn_handler_t struct itself.
+void *pn_handler_mem(pn_handler_t *handler) {
+  char *base = (char *) handler;
+  return (void *) (base + sizeof(pn_handler_t));
+}
+
+// Append `child` to the handler's children, creating the PN_OBJECT list on
+// first use.
+void pn_handler_add(pn_handler_t *handler, pn_handler_t *child) {
+  assert(handler);
+  if (handler->children == NULL) {
+    handler->children = pn_list(PN_OBJECT, 0);
+  }
+  pn_list_t *kids = handler->children;
+  pn_list_add(kids, child);
+}
+
+// Remove all children from the handler; a handler that never had children
+// is left untouched.
+void pn_handler_clear(pn_handler_t *handler) {
+  assert(handler);
+  pn_list_t *kids = handler->children;
+  if (kids == NULL) {
+    return;
+  }
+  pn_list_clear(kids);
+}
+
+// Deliver an event to this handler's dispatch callback (if any) and then,
+// recursively, to each child in list order. The child count is sampled once,
+// before the first child is visited.
+void pn_handler_dispatch(pn_handler_t *handler, pn_event_t *event, pn_event_type_t type) {
+  assert(handler);
+  if (handler->dispatch) {
+    handler->dispatch(handler, event, type);
+  }
+  if (!handler->children) {
+    return;
+  }
+  size_t count = pn_list_size(handler->children);
+  size_t idx = 0;
+  while (idx < count) {
+    pn_handler_t *kid = (pn_handler_t *) pn_list_get(handler->children, idx);
+    pn_handler_dispatch(kid, event, type);
+    idx++;
+  }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/reactor/io.h
----------------------------------------------------------------------
diff --git a/c/src/reactor/io.h b/c/src/reactor/io.h
new file mode 100644
index 0000000..24596ec
--- /dev/null
+++ b/c/src/reactor/io.h
@@ -0,0 +1,70 @@
+#ifndef PROTON_IO_H
+#define PROTON_IO_H 1
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "selector.h"
+
+#include <proton/import_export.h>
+#include <proton/error.h>
+#include <proton/type_compat.h>
+#include <stddef.h>
+
+/**
+ * A ::pn_io_t manages IO for a group of pn_socket_t handles. A
+ * pn_io_t object may have zero or one pn_selector_t selectors
+ * associated with it (see ::pn_io_selector()). If one is associated,
+ * all the pn_socket_t handles managed by a pn_io_t must use that
+ * pn_selector_t instance.
+ *
+ * The pn_io_t interface is single-threaded. All methods are intended
+ * to be used by one thread at a time, except that multiple threads
+ * may use:
+ *
+ * ::pn_write()
+ * ::pn_send()
+ * ::pn_recv()
+ * ::pn_close()
+ * ::pn_selector_select()
+ *
+ * provided at most one thread is calling ::pn_selector_select() and
+ * the other threads are operating on separate pn_socket_t handles.
+ */
+typedef struct pn_io_t pn_io_t;
+
+/* Create an IO context; release it with pn_io_free(). */
+pn_io_t *pn_io(void);
+void pn_io_free(pn_io_t *io);
+/* Error object owned by the pn_io_t; the implementations record failures in it. */
+pn_error_t *pn_io_error(pn_io_t *io);
+/*
+ * Resolve host/port and return a connecting (pn_connect) or listening
+ * (pn_listen) socket. The implementations in this tree return an
+ * invalid-socket value on failure.
+ */
+pn_socket_t pn_connect(pn_io_t *io, const char *host, const char *port);
+pn_socket_t pn_listen(pn_io_t *io, const char *host, const char *port);
+
+/*
+ * Accept a connection on a listening socket. `name` is first set to the
+ * empty string and, on success, receives the peer formatted as "host:serv"
+ * using `size` as the buffer size.
+ */
+pn_socket_t pn_accept(pn_io_t *io, pn_socket_t socket, char *name, size_t size);
+void pn_close(pn_io_t *io, pn_socket_t socket);
+/* Socket send/receive; both update the flag reported by pn_wouldblock(). */
+ssize_t pn_send(pn_io_t *io, pn_socket_t socket, const void *buf, size_t size);
+ssize_t pn_recv(pn_io_t *io, pn_socket_t socket, void *buf, size_t size);
+/*
+ * Fill `dest` (two elements) with a connected pair of handles: pipe() on
+ * POSIX, a loopback socket pair on Windows. Returns 0 on success.
+ */
+int pn_pipe(pn_io_t *io, pn_socket_t *dest);
+/* Read/write for handles obtained from pn_pipe(). */
+ssize_t pn_read(pn_io_t *io, pn_socket_t socket, void *buf, size_t size);
+ssize_t pn_write(pn_io_t *io, pn_socket_t socket, const void *buf, size_t size);
+/* Value of the would-block flag stored by the last operation that set it. */
+bool pn_wouldblock(pn_io_t *io);
+/* Selector associated with this pn_io_t, created on first use. */
+pn_selector_t *pn_io_selector(pn_io_t *io);
+
+#endif /* io.h */
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/reactor/io/posix/io.c
----------------------------------------------------------------------
diff --git a/c/src/reactor/io/posix/io.c b/c/src/reactor/io/posix/io.c
new file mode 100644
index 0000000..5a0de3b
--- /dev/null
+++ b/c/src/reactor/io/posix/io.c
@@ -0,0 +1,342 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "reactor/io.h"
+#include "reactor/selector.h"
+#include "platform/platform.h" // pn_i_error_from_errno
+
+#include <proton/object.h>
+
+#include <ctype.h>
+#include <errno.h>
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <netdb.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <assert.h>
+
+// Sizes of the scratch buffers that getnameinfo() fills in pn_accept().
+#define MAX_HOST (1024)
+#define MAX_SERV (64)
+
+// State of the POSIX pn_io_t implementation.
+struct pn_io_t {
+ char host[MAX_HOST];     // peer host name written by pn_accept()
+ char serv[MAX_SERV];     // peer service name written by pn_accept()
+ pn_error_t *error;       // created in pn_io_initialize(), freed in pn_io_finalize()
+ pn_selector_t *selector; // NULL until pn_io_selector() is first called
+ bool wouldblock;         // set by pn_send()/pn_recv(), read by pn_wouldblock()
+};
+
+// Class initializer: start with no selector, a cleared would-block flag and
+// a fresh error object.
+void pn_io_initialize(void *obj)
+{
+  pn_io_t *self = (pn_io_t *) obj;
+  self->selector = NULL;
+  self->wouldblock = false;
+  self->error = pn_error();
+}
+
+void pn_io_finalize(void *obj)
+{
+ pn_io_t *io = (pn_io_t *) obj;
+ pn_error_free(io->error);
+}
+
+// pn_io_t supplies no hashcode, compare or inspect callbacks to PN_CLASS.
+#define pn_io_hashcode NULL
+#define pn_io_compare NULL
+#define pn_io_inspect NULL
+
+// Allocate a pn_io_t through the object system.
+pn_io_t *pn_io(void)
+{
+  static const pn_class_t clazz = PN_CLASS(pn_io);
+  return (pn_io_t *) pn_class_new(&clazz, sizeof(pn_io_t));
+}
+
+// Release a pn_io_t obtained from pn_io().
+void pn_io_free(pn_io_t *io)
+{
+  pn_free((void *) io);
+}
+
+pn_error_t *pn_io_error(pn_io_t *io)
+{
+ assert(io);
+ return io->error;
+}
+
+// Create a pipe with pipe(2), storing its two descriptors in `dest`.
+// Returns pipe()'s result; a non-zero result is also recorded in io->error.
+int pn_pipe(pn_io_t *io, pn_socket_t *dest)
+{
+  int rc = pipe(dest);
+  if (rc != 0) {
+    pn_i_error_from_errno(io->error, "pipe");
+  }
+  return rc;
+}
+
+// Put `sock` into non-blocking mode and disable the Nagle algorithm on it.
+// Failures are recorded in io->error; the function itself returns nothing.
+static void pn_configure_sock(pn_io_t *io, pn_socket_t sock) {
+  // Setting SO_NOSIGPIPE here would be nice, but it doesn't appear to exist
+  // on Linux.
+
+  // Only add O_NONBLOCK when the current flags could actually be read;
+  // otherwise the -1 error value would be OR-ed into the new flag set.
+  int flags = fcntl(sock, F_GETFL);
+  if (flags < 0) {
+    pn_i_error_from_errno(io->error, "fcntl");
+  } else if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) < 0) {
+    pn_i_error_from_errno(io->error, "fcntl");
+  }
+
+  //
+  // Disable the Nagle algorithm on TCP connections.
+  //
+  // Note: It would be more correct for the "level" argument to be SOL_TCP. However, there
+  // are portability issues with this macro so we use IPPROTO_TCP instead.
+  //
+  int tcp_nodelay = 1;
+  if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void*) &tcp_nodelay, sizeof(tcp_nodelay)) < 0) {
+    pn_i_error_from_errno(io->error, "setsockopt");
+  }
+}
+
+static inline int pn_create_socket(int af, int protocol);
+
+// Resolve host/port, then create, bind and listen on a stream socket with
+// SO_REUSEADDR set and a backlog of 50.
+//
+// Returns the listening socket, or PN_INVALID_SOCKET after recording the
+// failure in io->error.
+pn_socket_t pn_listen(pn_io_t *io, const char *host, const char *port)
+{
+  struct addrinfo *addr;
+  struct addrinfo hints = {0, AF_UNSPEC, SOCK_STREAM};
+  int code = getaddrinfo(host, port, &hints, &addr);
+  if (code) {
+    pn_error_format(io->error, PN_ERR, "getaddrinfo(%s, %s): %s\n", host, port, gai_strerror(code));
+    return PN_INVALID_SOCKET;
+  }
+
+  pn_socket_t sock = pn_create_socket(addr->ai_family, addr->ai_protocol);
+  if (sock == PN_INVALID_SOCKET) {
+    // Record errno before freeaddrinfo() gets a chance to disturb it.
+    pn_i_error_from_errno(io->error, "pn_create_socket");
+    freeaddrinfo(addr);
+    return PN_INVALID_SOCKET;
+  }
+
+  int optval = 1;
+  if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) == -1) {
+    pn_i_error_from_errno(io->error, "setsockopt");
+    freeaddrinfo(addr);
+    close(sock);
+    return PN_INVALID_SOCKET;
+  }
+
+  if (bind(sock, addr->ai_addr, addr->ai_addrlen) == -1) {
+    pn_i_error_from_errno(io->error, "bind");
+    freeaddrinfo(addr);
+    close(sock);
+    return PN_INVALID_SOCKET;
+  }
+
+  // The resolved address is not needed once the socket is bound.
+  freeaddrinfo(addr);
+
+  if (listen(sock, 50) == -1) {
+    pn_i_error_from_errno(io->error, "listen");
+    close(sock);
+    return PN_INVALID_SOCKET;
+  }
+
+  return sock;
+}
+
+// Resolve host/port and start a non-blocking connect to the first result.
+//
+// A connect() that fails with EINPROGRESS is treated as success.
+// Returns the socket, or PN_INVALID_SOCKET after recording the failure in
+// io->error.
+pn_socket_t pn_connect(pn_io_t *io, const char *host, const char *port)
+{
+  struct addrinfo *resolved;
+  struct addrinfo hints = {0, AF_UNSPEC, SOCK_STREAM};
+  int gai = getaddrinfo(host, port, &hints, &resolved);
+  if (gai) {
+    pn_error_format(io->error, PN_ERR, "getaddrinfo(%s, %s): %s", host, port, gai_strerror(gai));
+    return PN_INVALID_SOCKET;
+  }
+
+  pn_socket_t fd = pn_create_socket(resolved->ai_family, resolved->ai_protocol);
+  if (fd == PN_INVALID_SOCKET) {
+    pn_i_error_from_errno(io->error, "pn_create_socket");
+    freeaddrinfo(resolved);
+    return PN_INVALID_SOCKET;
+  }
+
+  pn_configure_sock(io, fd);
+
+  if (connect(fd, resolved->ai_addr, resolved->ai_addrlen) == -1 && errno != EINPROGRESS) {
+    pn_i_error_from_errno(io->error, "connect");
+    freeaddrinfo(resolved);
+    close(fd);
+    return PN_INVALID_SOCKET;
+  }
+
+  freeaddrinfo(resolved);
+
+  return fd;
+}
+
+// Accept a connection on `socket`, configure the new socket and write the
+// peer as "host:serv" into `name`.
+//
+// `name` is set to the empty string up front. Returns the accepted socket,
+// or PN_INVALID_SOCKET after recording the failure in io->error.
+pn_socket_t pn_accept(pn_io_t *io, pn_socket_t socket, char *name, size_t size)
+{
+  struct sockaddr_storage peer;
+  socklen_t peerlen = sizeof(peer);
+  *name = '\0';
+
+  pn_socket_t accepted = accept(socket, (struct sockaddr *) &peer, &peerlen);
+  if (accepted == PN_INVALID_SOCKET) {
+    pn_i_error_from_errno(io->error, "accept");
+    return accepted;
+  }
+
+  int code = getnameinfo((struct sockaddr *) &peer, peerlen, io->host, MAX_HOST, io->serv, MAX_SERV, 0);
+  if (code) {
+    pn_error_format(io->error, PN_ERR, "getnameinfo: %s\n", gai_strerror(code));
+    if (close(accepted) == -1)
+      pn_i_error_from_errno(io->error, "close");
+    return PN_INVALID_SOCKET;
+  }
+
+  pn_configure_sock(io, accepted);
+  pni_snprintf(name, size, "%s:%s", io->host, io->serv);
+  return accepted;
+}
+
+/* Abstract away turning off SIGPIPE */
+#ifdef MSG_NOSIGNAL
+// send() with MSG_NOSIGNAL so that a closed peer does not raise SIGPIPE.
+// Returns send()'s result; a failure is also recorded in io->error.
+ssize_t pn_send(pn_io_t *io, pn_socket_t socket, const void *buf, size_t len) {
+  ssize_t count = send(socket, buf, len, MSG_NOSIGNAL);
+  // errno is only meaningful after a failed call; without the count check a
+  // stale EAGAIN from an earlier call would be reported as "would block".
+  io->wouldblock = count < 0 && (errno == EAGAIN || errno == EWOULDBLOCK);
+  if (count < 0) { pn_i_error_from_errno(io->error, "send"); }
+  return count;
+}
+
+// Plain stream socket; SIGPIPE is suppressed per call by pn_send().
+static inline int pn_create_socket(int af, int protocol) {
+  int fd = socket(af, SOCK_STREAM, protocol);
+  return fd;
+}
+#elif defined(SO_NOSIGPIPE)
+// send() for platforms where SIGPIPE is suppressed per socket with
+// SO_NOSIGPIPE (see pn_create_socket()).
+// Returns send()'s result; a failure is also recorded in io->error.
+ssize_t pn_send(pn_io_t *io, pn_socket_t socket, const void *buf, size_t size) {
+  ssize_t count = send(socket, buf, size, 0);
+  // errno is only meaningful after a failed call; without the count check a
+  // stale EAGAIN from an earlier call would be reported as "would block".
+  io->wouldblock = count < 0 && (errno == EAGAIN || errno == EWOULDBLOCK);
+  if (count < 0) { pn_i_error_from_errno(io->error, "send"); }
+  return count;
+}
+
+// Create a stream socket with SO_NOSIGPIPE enabled.
+// Returns -1 if either the socket or the option cannot be set up; in the
+// latter case the socket is closed again.
+static inline int pn_create_socket(int af, int protocol) {
+  int fd = socket(af, SOCK_STREAM, protocol);
+  if (fd != -1) {
+    int enable = 1;
+    if (setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &enable, sizeof(enable)) == -1) {
+      close(fd);
+      fd = -1;
+    }
+  }
+  return fd;
+}
+#else
+
+#include <signal.h>
+
+// Plain stream socket; SIGPIPE is dealt with in nosigpipe_send().
+static inline int pn_create_socket(int af, int protocol) {
+  int fd = socket(af, SOCK_STREAM, protocol);
+  return fd;
+}
+
+// send() on `fd` while keeping a SIGPIPE caused by this call from being
+// delivered: SIGPIPE is blocked for the calling thread around the send and,
+// if the send failed with EPIPE, consumed with sigtimedwait().
+//
+// Returns send()'s result with send()'s errno, or -1 with errno set to the
+// pthread_sigmask() error code if the signal mask could not be changed.
+static ssize_t nosigpipe_send(int fd, const void *buffer, size_t size) {
+ sigset_t pendingSignals, oldSignals, newSignals;
+ ssize_t count;
+ int sendErrno, sigmaskErr;
+
+ // If SIGPIPE is already pending, the mask is left alone and the send below
+ // runs without any SIGPIPE handling.
+ sigpending(&pendingSignals);
+ int sigpipeIsPending = sigismember(&pendingSignals, SIGPIPE);
+ if (!sigpipeIsPending) {
+ sigemptyset(&newSignals);
+ sigaddset(&newSignals, SIGPIPE);
+ if (sigmaskErr = pthread_sigmask(SIG_BLOCK, (const sigset_t *)&newSignals, (sigset_t *)&oldSignals))
+ {
+ errno = sigmaskErr;
+ return -1;
+ }
+ }
+
+ count = send(fd, buffer, size, 0);
+ if (!sigpipeIsPending) {
+ // Keep send()'s errno; the calls below may overwrite it.
+ sendErrno = errno;
+ if (count == -1 && errno == EPIPE) {
+ // Zero-timeout wait that removes the pending SIGPIPE; retried on EINTR.
+ while (-1 == sigtimedwait(&newSignals, NULL, &(struct timespec){ 0, 0 }) && errno == EINTR)
+ ; //do nothing
+ }
+ // Put back the signal mask saved before the send.
+ // NOTE(review): a failure here returns -1 even when send() succeeded.
+ if (sigmaskErr = pthread_sigmask(SIG_SETMASK, (const sigset_t *)&oldSignals, (sigset_t *)NULL))
+ {
+ errno = sigmaskErr;
+ return -1;
+ }
+ errno = sendErrno;
+ }
+ return count;
+}
+
+// send() through nosigpipe_send(), for platforms with neither MSG_NOSIGNAL
+// nor SO_NOSIGPIPE.
+// Returns the send result; a failure is also recorded in io->error.
+ssize_t pn_send(pn_io_t *io, pn_socket_t socket, const void *buf, size_t size) {
+  ssize_t count = nosigpipe_send(socket, buf, size);
+  // errno is only meaningful after a failed call; without the count check a
+  // stale EAGAIN from an earlier call would be reported as "would block".
+  io->wouldblock = count < 0 && (errno == EAGAIN || errno == EWOULDBLOCK);
+  if (count < 0) { pn_i_error_from_errno(io->error, "send"); }
+  return count;
+}
+#endif
+
+// recv() on `socket`. Updates the would-block flag and records a failure in
+// io->error. Returns recv()'s result.
+ssize_t pn_recv(pn_io_t *io, pn_socket_t socket, void *buf, size_t size)
+{
+  ssize_t count = recv(socket, buf, size, 0);
+  if (count < 0) {
+    io->wouldblock = (errno == EAGAIN || errno == EWOULDBLOCK);
+    pn_i_error_from_errno(io->error, "recv");
+  } else {
+    io->wouldblock = false;
+  }
+  return count;
+}
+
+ssize_t pn_write(pn_io_t *io, pn_socket_t socket, const void *buf, size_t size)
+{
+ return write(socket, buf, size);
+}
+
+ssize_t pn_read(pn_io_t *io, pn_socket_t socket, void *buf, size_t size)
+{
+ return read(socket, buf, size);
+}
+
+// Close `socket` with close(2); the result is ignored and `io` is not used.
+void pn_close(pn_io_t *io, pn_socket_t socket)
+{
+  (void) close(socket);
+}
+
+// Would-block flag as left by the most recent pn_send()/pn_recv() on `io`.
+bool pn_wouldblock(pn_io_t *io)
+{
+  bool blocked = io->wouldblock;
+  return blocked;
+}
+
+// Selector belonging to `io`; created with pni_selector() on the first call
+// and returned unchanged afterwards.
+pn_selector_t *pn_io_selector(pn_io_t *io)
+{
+  if (!io->selector) {
+    io->selector = pni_selector();
+  }
+  return io->selector;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/reactor/io/posix/selector.c
----------------------------------------------------------------------
diff --git a/c/src/reactor/io/posix/selector.c b/c/src/reactor/io/posix/selector.c
new file mode 100644
index 0000000..bf6882a
--- /dev/null
+++ b/c/src/reactor/io/posix/selector.c
@@ -0,0 +1,214 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "core/util.h"
+#include "platform/platform.h" // pn_i_now, pn_i_error_from_errno
+#include "reactor/io.h"
+#include "reactor/selector.h"
+#include "reactor/selectable.h"
+
+#include <proton/error.h>
+
+#include <assert.h>
+#include <limits.h>
+#include <poll.h>
+#include <stdlib.h>
+
+// State of the poll()-based selector. fds[] and deadlines[] are indexed by
+// the index stored on each selectable.
+struct pn_selector_t {
+ struct pollfd *fds;        // array handed to poll()
+ pn_timestamp_t *deadlines; // per-selectable deadline; 0 is treated as "none"
+ size_t capacity;           // number of elements allocated in fds/deadlines
+ pn_list_t *selectables;    // registered selectables (PN_WEAKREF list)
+ size_t current;            // cursor used by pn_selector_next()
+ pn_timestamp_t awoken;     // pn_i_now() taken after the last successful poll()
+ pn_error_t *error;         // receives poll() failures
+};
+
+// Class initializer: no arrays allocated yet, an empty weak-reference list
+// of selectables and a fresh error object.
+void pn_selector_initialize(void *obj)
+{
+  pn_selector_t *self = (pn_selector_t *) obj;
+  self->capacity = 0;
+  self->fds = NULL;
+  self->deadlines = NULL;
+  self->current = 0;
+  self->awoken = 0;
+  self->selectables = pn_list(PN_WEAKREF, 0);
+  self->error = pn_error();
+}
+
+void pn_selector_finalize(void *obj)
+{
+ pn_selector_t *selector = (pn_selector_t *) obj;
+ free(selector->fds);
+ free(selector->deadlines);
+ pn_free(selector->selectables);
+ pn_error_free(selector->error);
+}
+
+// pn_selector_t supplies no hashcode, compare or inspect callbacks to PN_CLASS.
+#define pn_selector_hashcode NULL
+#define pn_selector_compare NULL
+#define pn_selector_inspect NULL
+
+// Allocate a selector through the object system.
+pn_selector_t *pni_selector(void)
+{
+  static const pn_class_t clazz = PN_CLASS(pn_selector);
+  return (pn_selector_t *) pn_class_new(&clazz, sizeof(pn_selector_t));
+}
+
+// Register `selectable` with the selector and load its current state.
+//
+// A selectable without an index (index < 0) is appended to the list, the
+// fds/deadlines arrays are grown if needed, and its index becomes its list
+// position. Its poll state is then refreshed with pn_selector_update().
+void pn_selector_add(pn_selector_t *selector, pn_selectable_t *selectable)
+{
+  assert(selector);
+  assert(selectable);
+  assert(pni_selectable_get_index(selectable) < 0);
+
+  if (pni_selectable_get_index(selectable) < 0) {
+    pn_list_add(selector->selectables, selectable);
+    size_t count = pn_list_size(selector->selectables);
+
+    if (count > selector->capacity) {
+      // Grow both arrays together to exactly the number of selectables.
+      selector->fds = (struct pollfd *) realloc(selector->fds, count*sizeof(struct pollfd));
+      selector->deadlines = (pn_timestamp_t *) realloc(selector->deadlines, count*sizeof(pn_timestamp_t));
+      selector->capacity = count;
+    }
+
+    pni_selectable_set_index(selectable, count - 1);
+  }
+
+  pn_selector_update(selector, selectable);
+}
+
+// Copy the selectable's descriptor, read/write interest and deadline into
+// the selector's arrays at the selectable's index; revents is cleared.
+void pn_selector_update(pn_selector_t *selector, pn_selectable_t *selectable)
+{
+  int idx = pni_selectable_get_index(selectable);
+  assert(idx >= 0);
+
+  struct pollfd *slot = &selector->fds[idx];
+  slot->fd = pn_selectable_get_fd(selectable);
+  slot->events = 0;
+  slot->revents = 0;
+  if (pn_selectable_is_reading(selectable)) {
+    slot->events |= POLLIN;
+  }
+  if (pn_selectable_is_writing(selectable)) {
+    slot->events |= POLLOUT;
+  }
+
+  selector->deadlines[idx] = pn_selectable_get_deadline(selectable);
+}
+
+// Unregister `selectable` from the selector.
+//
+// The selectable is deleted from the list, every later entry moves down one
+// slot (list index, pollfd and deadline together), and the removed
+// selectable's index is reset to -1. The pn_selector_next() cursor is
+// adjusted so that iteration continues with the same next element.
+void pn_selector_remove(pn_selector_t *selector, pn_selectable_t *selectable)
+{
+  assert(selector);
+  assert(selectable);
+
+  int idx = pni_selectable_get_index(selectable);
+  assert(idx >= 0);
+  pn_list_del(selector->selectables, idx, 1);
+  size_t size = pn_list_size(selector->selectables);
+  for (size_t i = idx; i < size; i++) {
+    pn_selectable_t *sel = (pn_selectable_t *) pn_list_get(selector->selectables, i);
+    pni_selectable_set_index(sel, i);
+    selector->fds[i] = selector->fds[i + 1];
+    // Deadlines are indexed like fds and must move with them, otherwise
+    // each shifted selectable keeps its old neighbour's deadline.
+    selector->deadlines[i] = selector->deadlines[i + 1];
+  }
+
+  pni_selectable_set_index(selectable, -1);
+
+  // Only entries before the cursor shift it. When the removed entry is the
+  // one at the cursor, its successor slides into that slot, so the cursor
+  // stays put (and cannot wrap below zero).
+  if (selector->current > (size_t) idx) {
+    selector->current--;
+  }
+}
+
+// Number of selectables currently registered with the selector.
+size_t pn_selector_size(pn_selector_t *selector) {
+  assert(selector);
+  size_t count = pn_list_size(selector->selectables);
+  return count;
+}
+
+// Wait with poll() for activity on the registered selectables.
+//
+// timeout: milliseconds to wait; 0 polls without waiting and a negative
+//          value waits indefinitely. A non-zero timeout is shortened so that
+//          the call returns no later than the earliest non-zero deadline.
+//
+// Returns 0 on success, after resetting the pn_selector_next() cursor and
+// recording the wake-up time, or the error code recorded in
+// selector->error when poll() fails.
+int pn_selector_select(pn_selector_t *selector, int timeout)
+{
+  assert(selector);
+
+  size_t size = pn_list_size(selector->selectables);
+
+  if (timeout) {
+    // Earliest deadline among the selectables; 0 means "no deadline".
+    pn_timestamp_t deadline = 0;
+    for (size_t i = 0; i < size; i++) {
+      pn_timestamp_t d = selector->deadlines[i];
+      if (d)
+        deadline = (deadline == 0) ? d : pn_min(deadline, d);
+    }
+
+    if (deadline) {
+      pn_timestamp_t now = pn_i_now();
+      int64_t delta = deadline - now;
+      if (delta < 0) {
+        timeout = 0;
+      } else if (timeout < 0 || delta < timeout) {
+        // A negative timeout is an unbounded wait, so any deadline is
+        // sooner. Clamp because delta is 64-bit and poll() takes an int.
+        timeout = (delta > INT_MAX) ? INT_MAX : (int) delta;
+      }
+    }
+  }
+
+  int error = 0;
+  int result = poll(selector->fds, size, timeout);
+  if (result == -1) {
+    error = pn_i_error_from_errno(selector->error, "poll");
+  } else {
+    selector->current = 0;
+    selector->awoken = pn_i_now();
+  }
+
+  return error;
+}
+
+// Advance the cursor to the next selectable that has something to report
+// after the last pn_selector_select().
+//
+// On a hit, *events receives a mask of PN_READABLE, PN_WRITABLE, PN_ERROR
+// (POLLERR, POLLHUP or POLLNVAL) and PN_EXPIRED (deadline reached by the
+// recorded wake-up time) and the selectable is returned. Returns NULL, with
+// *events untouched, once the list is exhausted.
+pn_selectable_t *pn_selector_next(pn_selector_t *selector, int *events)
+{
+  pn_list_t *list = selector->selectables;
+  size_t total = pn_list_size(list);
+
+  while (selector->current < total) {
+    // The cursor moves on whether or not this entry has events.
+    size_t at = selector->current++;
+    pn_selectable_t *candidate = (pn_selectable_t *) pn_list_get(list, at);
+    short got = selector->fds[at].revents;
+    pn_timestamp_t deadline = selector->deadlines[at];
+
+    int mask = 0;
+    if (got & POLLIN) {
+      mask |= PN_READABLE;
+    }
+    if (got & (POLLERR | POLLHUP | POLLNVAL)) {
+      mask |= PN_ERROR;
+    }
+    if (got & POLLOUT) {
+      mask |= PN_WRITABLE;
+    }
+    if (deadline && selector->awoken >= deadline) {
+      mask |= PN_EXPIRED;
+    }
+
+    if (mask) {
+      *events = mask;
+      return candidate;
+    }
+  }
+
+  return NULL;
+}
+
+// Release a selector; the pointer must not be NULL.
+void pn_selector_free(pn_selector_t *selector)
+{
+  assert(selector);
+  pn_free((void *) selector);
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/reactor/io/windows/io.c
----------------------------------------------------------------------
diff --git a/c/src/reactor/io/windows/io.c b/c/src/reactor/io/windows/io.c
new file mode 100644
index 0000000..07692d1
--- /dev/null
+++ b/c/src/reactor/io/windows/io.c
@@ -0,0 +1,464 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#define FD_SETSIZE 2048
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
+#if _WIN32_WINNT < 0x0501
+#error "Proton requires Windows API support for XP or later."
+#endif
+#include <winsock2.h>
+#include <mswsock.h>
+#include <Ws2tcpip.h>
+
+#include "reactor/io.h"
+#include "reactor/selector.h"
+
+#include "platform/platform.h"
+#include "iocp.h"
+#include "core/util.h"
+
+#include <proton/object.h>
+
+#include <ctype.h>
+#include <errno.h>
+#include <stdio.h>
+#include <assert.h>
+
+int pni_win32_error(pn_error_t *error, const char *msg, HRESULT code)
+{
+ // Error code can be from GetLastError or WSAGetLastError,
+ char err[1024] = {0};
+ FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS |
+ FORMAT_MESSAGE_MAX_WIDTH_MASK, NULL, code, 0, (LPSTR)&err, sizeof(err), NULL);
+ return pn_error_format(error, PN_ERR, "%s: %s", msg, err);
+}
+
+// printf-style trace output: writes to stderr and flushes immediately.
+static void io_log(const char *fmt, ...)
+{
+  va_list args;
+  va_start(args, fmt);
+  vfprintf(stderr, fmt, args);
+  va_end(args);
+  fflush(stderr);
+}
+
+// State of the Windows (IOCP-based) pn_io_t implementation.
+struct pn_io_t {
+ char host[NI_MAXHOST]; // peer host name written by pn_accept()
+ char serv[NI_MAXSERV]; // peer service name written by pn_accept()
+ pn_error_t *error;     // created in pn_io_initialize(), freed in pn_io_finalize()
+ bool trace;            // from the PN_TRACE_DRV environment setting; gates io_log() output
+ bool wouldblock;       // set by the send/recv/accept paths, read by pn_wouldblock()
+ iocp_t *iocp;          // created in pn_io_initialize(); also holds the selector pointer
+};
+
+void pn_io_initialize(void *obj)
+{
+ pn_io_t *io = (pn_io_t *) obj;
+ io->error = pn_error();
+ io->wouldblock = false;
+ io->trace = pn_env_bool("PN_TRACE_DRV");
+
+ /* Request WinSock 2.2 */
+ WORD wsa_ver = MAKEWORD(2, 2);
+ WSADATA unused;
+ int err = WSAStartup(wsa_ver, &unused);
+ if (err) {
+ pni_win32_error(io->error, "WSAStartup", WSAGetLastError());
+ fprintf(stderr, "Can't load WinSock: %s\n", pn_error_text(io->error));
+ }
+ io->iocp = pni_iocp();
+}
+
+// Class finalizer: frees the IOCP state, drops the reference on the selector
+// (if one was created), frees the error object and shuts WinSock down.
+void pn_io_finalize(void *obj)
+{
+  pn_io_t *io = (pn_io_t *) obj;
+  // Read the selector pointer before the iocp that holds it is freed.
+  pn_selector_t *selector = io->iocp->selector;
+  pn_free(io->iocp);
+  if (selector != NULL) {
+    pn_decref(selector);
+  }
+  pn_error_free(io->error);
+  WSACleanup();
+}
+
+// pn_io_t supplies no hashcode, compare or inspect callbacks to PN_CLASS.
+// All three are spelled NULL, as in the POSIX implementation, so that no
+// slot of the class initializer is left textually empty.
+#define pn_io_hashcode NULL
+#define pn_io_compare NULL
+#define pn_io_inspect NULL
+
+// Allocate a pn_io_t through the object system.
+pn_io_t *pn_io(void)
+{
+  static const pn_class_t clazz = PN_CLASS(pn_io);
+  pn_io_t *io = (pn_io_t *) pn_class_new(&clazz, sizeof(pn_io_t));
+  return io;
+}
+
+// Release a pn_io_t obtained from pn_io().
+void pn_io_free(pn_io_t *io)
+{
+  pn_free((void *) io);
+}
+
+pn_error_t *pn_io_error(pn_io_t *io)
+{
+ assert(io);
+ return io->error;
+}
+
+// Discard any descriptor still registered under the value of a newly
+// created socket.
+//
+// A brand new socket can have the same HANDLE value as an earlier one that
+// has been closed. If the application closed that one itself (i.e. without
+// pn_close()), this is the first point at which it is noticed.
+static void ensure_unique(pn_io_t *io, pn_socket_t new_socket)
+{
+  iocpdesc_t *stale = pni_iocpdesc_map_get(io->iocp, new_socket);
+  if (!stale) {
+    return;
+  }
+
+  if (io->trace)
+    io_log("Stale external socket reference discarded\n");
+  // Re-use of the value means the former socket instance was closed.
+  assert(stale->ops_in_progress == 0);
+  assert(stale->external);
+  // Clean up the straggler as best we can.
+  pn_socket_t old_socket = stale->socket;
+  stale->socket = INVALID_SOCKET;
+  pni_iocpdesc_map_del(io->iocp, old_socket); // may free the iocpdesc_t depending on refcount
+}
+
+
+/*
+ * This heavyweight surrogate pipe could be replaced with a normal Windows pipe
+ * now that select() is no longer used. If interrupt semantics are all that is
+ * needed, a simple user space counter and reserved completion status would
+ * probably suffice.
+ */
+static int pni_socket_pair(pn_io_t *io, SOCKET sv[2]);
+
+// Emulate pipe() with a connected socket pair stored in `dest`.
+// Returns pni_socket_pair()'s result; a non-zero result is also recorded in
+// io->error using WSAGetLastError().
+int pn_pipe(pn_io_t *io, pn_socket_t *dest)
+{
+  int rc = pni_socket_pair(io, dest);
+  if (rc != 0) {
+    pni_win32_error(io->error, "pipe", WSAGetLastError());
+  }
+  return rc;
+}
+
+// Disable the Nagle algorithm on `sock` and switch it to non-blocking mode.
+// Failures are only reported through perror(); `io` is not used.
+static void pn_configure_sock(pn_io_t *io, pn_socket_t sock) {
+  // Disable the Nagle algorithm on TCP connections.
+  int nodelay = 1;
+  int rc = setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&nodelay, sizeof(nodelay));
+  if (rc != 0) {
+    perror("setsockopt");
+  }
+
+  // A non-zero FIONBIO argument requests non-blocking mode.
+  u_long enable = 1;
+  if (ioctlsocket(sock, FIONBIO, &enable) != 0) {
+    perror("ioctlsocket");
+  }
+}
+
+static inline pn_socket_t pni_create_socket(int domain, int protocol);
+
+// Map the service names "amqp" and "amqps" to their port numbers so that
+// older Windows versions, which do not know these names, can resolve them.
+// Any other value, including NULL, is returned unchanged.
+static const char *amqp_service(const char *port) {
+  if (port == NULL) {
+    return port;
+  }
+  if (strcmp("amqp", port) == 0) {
+    return "5672";
+  }
+  if (strcmp("amqps", port) == 0) {
+    return "5671";
+  }
+  return port;
+}
+
+// Resolve host/port, then create, bind and listen on a socket with
+// SO_EXCLUSIVEADDRUSE set and a backlog of 50. When a selector exists the
+// socket is also registered with the IOCP layer.
+//
+// Returns the listening socket, or INVALID_SOCKET after recording the
+// failure in io->error. The resolved address list is released on every
+// path.
+pn_socket_t pn_listen(pn_io_t *io, const char *host, const char *port)
+{
+  struct addrinfo *addr;
+  int code = getaddrinfo(host, amqp_service(port), NULL, &addr);
+  if (code) {
+    pn_error_format(io->error, PN_ERR, "getaddrinfo(%s, %s): %s\n", host, port, gai_strerror(code));
+    return INVALID_SOCKET;
+  }
+
+  pn_socket_t sock = pni_create_socket(addr->ai_family, addr->ai_protocol);
+  if (sock == INVALID_SOCKET) {
+    pni_win32_error(io->error, "pni_create_socket", WSAGetLastError());
+    freeaddrinfo(addr);  // was leaked on this path
+    return INVALID_SOCKET;
+  }
+  ensure_unique(io, sock);
+
+  bool optval = 1;
+  if (setsockopt(sock, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (const char *) &optval,
+                 sizeof(optval)) == -1) {
+    pni_win32_error(io->error, "setsockopt", WSAGetLastError());
+    freeaddrinfo(addr);  // was leaked on this path
+    closesocket(sock);
+    return INVALID_SOCKET;
+  }
+
+  if (bind(sock, addr->ai_addr, addr->ai_addrlen) == -1) {
+    pni_win32_error(io->error, "bind", WSAGetLastError());
+    freeaddrinfo(addr);
+    closesocket(sock);
+    return INVALID_SOCKET;
+  }
+  // The resolved address is not needed once the socket is bound.
+  freeaddrinfo(addr);
+
+  if (listen(sock, 50) == -1) {
+    pni_win32_error(io->error, "listen", WSAGetLastError());
+    closesocket(sock);
+    return INVALID_SOCKET;
+  }
+
+  if (io->iocp->selector) {
+    iocpdesc_t *iocpd = pni_iocpdesc_create(io->iocp, sock, false);
+    if (!iocpd) {
+      pn_i_error_from_errno(io->error, "register");
+      closesocket(sock);
+      return INVALID_SOCKET;
+    }
+    pni_iocpdesc_start(iocpd);
+  }
+
+  return sock;
+}
+
+// Resolve host/port and start connecting a new non-blocking socket.
+//
+// With a selector present the connect is handed to
+// pni_iocp_begin_connect(), which receives the resolved address; otherwise
+// connect() is called directly and WSAEWOULDBLOCK counts as success.
+// Returns the socket, or INVALID_SOCKET on failure.
+pn_socket_t pn_connect(pn_io_t *io, const char *hostarg, const char *port)
+{
+  // convert "0.0.0.0" to "127.0.0.1" on Windows for outgoing sockets
+  const char *host = hostarg;
+  if (strcmp("0.0.0.0", hostarg) == 0) {
+    host = "127.0.0.1";
+  }
+
+  struct addrinfo *resolved;
+  int gai = getaddrinfo(host, amqp_service(port), NULL, &resolved);
+  if (gai) {
+    pn_error_format(io->error, PN_ERR, "getaddrinfo(%s, %s): %s", host, port, gai_strerror(gai));
+    return INVALID_SOCKET;
+  }
+
+  pn_socket_t sock = pni_create_socket(resolved->ai_family, resolved->ai_protocol);
+  if (sock == INVALID_SOCKET) {
+    pni_win32_error(io->error, "proton pni_create_socket", WSAGetLastError());
+    freeaddrinfo(resolved);
+    return INVALID_SOCKET;
+  }
+
+  ensure_unique(io, sock);
+  pn_configure_sock(io, sock);
+
+  if (io->iocp->selector) {
+    return pni_iocp_begin_connect(io->iocp, sock, resolved, io->error);
+  }
+
+  if (connect(sock, resolved->ai_addr, resolved->ai_addrlen) != 0 &&
+      WSAGetLastError() != WSAEWOULDBLOCK) {
+    pni_win32_error(io->error, "connect", WSAGetLastError());
+    freeaddrinfo(resolved);
+    closesocket(sock);
+    return INVALID_SOCKET;
+  }
+
+  freeaddrinfo(resolved);
+  return sock;
+}
+
+// Accept a connection on `listen_sock` and write the peer as "host:serv"
+// into `name`, which is set to the empty string up front.
+//
+// A listener known to the IOCP layer is completed through
+// pni_iocp_end_accept(); any other socket is accepted synchronously. Name
+// lookup is retried in numeric form before giving up.
+// Returns the accepted socket, or INVALID_SOCKET on failure.
+pn_socket_t pn_accept(pn_io_t *io, pn_socket_t listen_sock, char *name, size_t size)
+{
+  struct sockaddr_storage peer;
+  socklen_t peerlen = sizeof(peer);
+  iocpdesc_t *listend = pni_iocpdesc_map_get(io->iocp, listen_sock);
+  pn_socket_t accepted;
+
+  *name = '\0';
+  if (listend) {
+    accepted = pni_iocp_end_accept(listend, (struct sockaddr *) &peer, &peerlen, &io->wouldblock, io->error);
+  } else {
+    // User supplied socket
+    accepted = accept(listen_sock, (struct sockaddr *) &peer, &peerlen);
+    if (accepted == INVALID_SOCKET)
+      pni_win32_error(io->error, "sync accept", WSAGetLastError());
+  }
+
+  if (accepted == INVALID_SOCKET) {
+    return accepted;
+  }
+
+  int code = getnameinfo((struct sockaddr *) &peer, peerlen, io->host, NI_MAXHOST,
+                         io->serv, NI_MAXSERV, 0);
+  if (code) {
+    // Second attempt, asking for numeric host and service strings.
+    code = getnameinfo((struct sockaddr *) &peer, peerlen, io->host, NI_MAXHOST,
+                       io->serv, NI_MAXSERV, NI_NUMERICHOST | NI_NUMERICSERV);
+  }
+  if (code) {
+    pn_error_format(io->error, PN_ERR, "getnameinfo: %s\n", gai_strerror(code));
+    pn_close(io, accepted);
+    return INVALID_SOCKET;
+  }
+
+  pn_configure_sock(io, accepted);
+  pni_snprintf(name, size, "%s:%s", io->host, io->serv);
+  if (listend) {
+    pni_iocpdesc_start(pni_iocpdesc_map_get(io->iocp, accepted));
+  }
+  return accepted;
+}
+
+// Create a stream socket for the given address family and protocol.
+static inline pn_socket_t pni_create_socket(int domain, int protocol) {
+  pn_socket_t created = socket(domain, SOCK_STREAM, protocol);
+  return created;
+}
+
+// Send on `sockfd`: through pni_iocp_begin_write() when the socket is known
+// to the IOCP layer, otherwise with a direct send() that also updates the
+// would-block flag.
+ssize_t pn_send(pn_io_t *io, pn_socket_t sockfd, const void *buf, size_t len) {
+  iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, sockfd);
+  ssize_t count;
+  if (!iocpd) {
+    count = send(sockfd, (const char *) buf, len, 0);
+    io->wouldblock = count < 0 && WSAGetLastError() == WSAEWOULDBLOCK;
+  } else {
+    count = pni_iocp_begin_write(iocpd, buf, len, &io->wouldblock, io->error);
+  }
+  return count;
+}
+
+// Receive on `socket`: through pni_iocp_recv() when the socket is known to
+// the IOCP layer, otherwise with a direct recv() that also updates the
+// would-block flag.
+ssize_t pn_recv(pn_io_t *io, pn_socket_t socket, void *buf, size_t size)
+{
+  iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, socket);
+  ssize_t count;
+  if (!iocpd) {
+    count = recv(socket, (char *) buf, size, 0);
+    io->wouldblock = count < 0 && WSAGetLastError() == WSAEWOULDBLOCK;
+  } else {
+    count = pni_iocp_recv(iocpd, buf, size, &io->wouldblock, io->error);
+  }
+  return count;
+}
+
+// Non-socket IO is mapped onto socket IO for now (see pn_pipe()), so a
+// write is simply a pn_send().
+ssize_t pn_write(pn_io_t *io, pn_socket_t socket, const void *buf, size_t size)
+{
+  ssize_t written = pn_send(io, socket, buf, size);
+  return written;
+}
+
+// Counterpart of pn_write(): a read is simply a pn_recv().
+ssize_t pn_read(pn_io_t *io, pn_socket_t socket, void *buf, size_t size)
+{
+  ssize_t got = pn_recv(io, socket, buf, size);
+  return got;
+}
+
+// Close `socket`: via pni_iocp_begin_close() when the socket is known to
+// the IOCP layer, otherwise with closesocket().
+void pn_close(pn_io_t *io, pn_socket_t socket)
+{
+  iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, socket);
+  if (!iocpd) {
+    closesocket(socket);
+    return;
+  }
+  pni_iocp_begin_close(iocpd);
+}
+
+// Would-block flag as left by the most recent operation that set it.
+bool pn_wouldblock(pn_io_t *io)
+{
+  bool blocked = io->wouldblock;
+  return blocked;
+}
+
+// Selector belonging to `io`. On the first call it is created from the IOCP
+// state and one extra reference is taken on it (dropped again in
+// pn_io_finalize()).
+pn_selector_t *pn_io_selector(pn_io_t *io)
+{
+  if (!io->iocp->selector) {
+    io->iocp->selector = pni_selector_create(io->iocp);
+    pn_incref(io->iocp->selector);
+  }
+  return io->iocp->selector;
+}
+
+// Prepare one end of the surrogate pipe: make it non-blocking, discard any
+// stale registration under the same socket value, then register it with the
+// IOCP layer and start it.
+static void configure_pipe_socket(pn_io_t *io, pn_socket_t sock)
+{
+  u_long nonblocking = 1;
+  ioctlsocket(sock, FIONBIO, &nonblocking);
+  ensure_unique(io, sock);
+  pni_iocpdesc_start(pni_iocpdesc_create(io->iocp, sock, false));
+}
+
+
+// Provide pipe() semantics with sockets, since Windows has no socketpair().
+//
+// A temporary listener is bound to an ephemeral loopback port; sv[1]
+// connects to it and sv[0] is the accepted end. Both ends are then set up
+// with configure_pipe_socket() and the listener is closed.
+//
+// Returns 0 on success and -1 on failure, in which case every socket
+// created here has been closed again.
+static int pni_socket_pair (pn_io_t *io, SOCKET sv[2]) {
+  // no socketpair on windows. provide pipe() semantics using sockets
+  struct protoent * pe_tcp = getprotobyname("tcp");
+  if (pe_tcp == NULL) {
+    perror("getprotobyname");
+    return -1;
+  }
+
+  SOCKET sock = socket(AF_INET, SOCK_STREAM, pe_tcp->p_proto);
+  if (sock == INVALID_SOCKET) {
+    perror("socket");
+    return -1;
+  }
+
+  BOOL b = 1;
+  if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char *) &b, sizeof(b)) == -1) {
+    perror("setsockopt");
+    closesocket(sock);
+    return -1;
+  }
+  else {
+    // Port 0: let the system choose a free loopback port.
+    struct sockaddr_in addr = {0};
+    addr.sin_family = AF_INET;
+    addr.sin_port = 0;
+    addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
+
+    if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
+      perror("bind");
+      closesocket(sock);
+      return -1;
+    }
+  }
+
+  if (listen(sock, 50) == -1) {
+    perror("listen");
+    closesocket(sock);
+    return -1;
+  }
+
+  if ((sv[1] = socket(AF_INET, SOCK_STREAM, pe_tcp->p_proto)) == INVALID_SOCKET) {
+    perror("sock1");
+    closesocket(sock);
+    return -1;
+  }
+  else {
+    // Find out which port the listener was given.
+    struct sockaddr addr = {0};
+    int l = sizeof(addr);
+    if (getsockname(sock, &addr, &l) == -1) {
+      perror("getsockname");
+      closesocket(sock);
+      closesocket(sv[1]);  // sv[1] already exists here and was leaked before
+      return -1;
+    }
+
+    if (connect(sv[1], &addr, sizeof(addr)) == -1) {
+      int err = WSAGetLastError();
+      fprintf(stderr, "connect wsaerrr %d\n", err);
+      closesocket(sock);
+      closesocket(sv[1]);
+      return -1;
+    }
+
+    if ((sv[0] = accept(sock, &addr, &l)) == INVALID_SOCKET) {
+      perror("accept");
+      closesocket(sock);
+      closesocket(sv[1]);
+      return -1;
+    }
+  }
+
+  configure_pipe_socket(io, sv[0]);
+  configure_pipe_socket(io, sv[1]);
+  closesocket(sock);
+  return 0;
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org