You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by jr...@apache.org on 2018/04/05 19:34:06 UTC

[31/51] [partial] qpid-proton git commit: PROTON-1728: Reorganize the source tree

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/protocol.h.py
----------------------------------------------------------------------
diff --git a/c/src/protocol.h.py b/c/src/protocol.h.py
new file mode 100644
index 0000000..321cf64
--- /dev/null
+++ b/c/src/protocol.h.py
@@ -0,0 +1,157 @@
+#!/usr/bin/python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import print_function
+from protocol import *
+
+print("/* generated */")
+print("#ifndef _PROTON_PROTOCOL_H")
+print("#define _PROTON_PROTOCOL_H 1")
+print()
+print("#include \"proton/type_compat.h\"")
+
+fields = {}
+
+for type in TYPES:
+  fidx = 0
+  for f in type.query["field"]:
+    print("#define %s_%s (%s)" % (field_kw(type), field_kw(f), fidx))
+    fidx += 1
+    d = f["@default"]
+    if d:
+        ft = ftype(f)
+        # Don't bother to emit a boolean default that is False
+        if ft=="boolean" and d=="false": continue
+        # Don't output non numerics unless symbol
+        # We should really fully resolve to actual restricted value
+        # this is really true for symbols too which accidentally work
+        if ft=="symbol": d = '"' + d + '"'
+        elif d[0] not in '0123456789': continue
+        print("#define %s_%s_DEFAULT (%s) /* %s */" % (field_kw(type), field_kw(f), d, ft))
+
+idx = 0
+
+for type in TYPES:
+  desc = type["descriptor"]
+  name = type["@name"].upper().replace("-", "_")
+  print("#define %s_SYM (\"%s\")" % (name, desc["@name"]))
+  hi, lo = [int(x, 0) for x in desc["@code"].split(":")]
+  code = (hi << 32) + lo
+  print("#define %s ((uint64_t) %s)" % (name, code))
+  fields[code] = (type["@name"], [f["@name"] for f in type.query["field"]])
+  idx += 1
+
+print("""
+#include <stddef.h>
+
+typedef struct {
+  const unsigned char name_index;
+  const unsigned char first_field_index;
+  const unsigned char field_count;
+} pn_fields_t;
+
+extern const pn_fields_t FIELDS[];
+extern const uint16_t FIELD_NAME[];
+extern const uint16_t FIELD_FIELDS[];
+""")
+
+print('struct FIELD_STRINGS {')
+print('  const char STRING0[sizeof("")];')
+strings = set()
+for name, fnames in fields.values():
+    strings.add(name)
+    strings.update(fnames)
+for str in strings:
+    istr = str.replace("-", "_");
+    print('  const char FIELD_STRINGS_%s[sizeof("%s")];' % (istr, str))
+print("};")
+print()
+
+print("extern const struct FIELD_STRINGS FIELD_STRINGPOOL;")
+print("#ifdef DEFINE_FIELDS")
+print()
+
+print('const struct FIELD_STRINGS FIELD_STRINGPOOL = {')
+print('  "",')
+for str in strings:
+    print('  "%s",'% str)
+print("};")
+print()
+print("/* This is an array of offsets into FIELD_STRINGPOOL */")
+print("const uint16_t FIELD_NAME[] = {")
+print("  offsetof(struct FIELD_STRINGS, STRING0),")
+index = 1
+for i in range(256):
+  if i in fields:
+    name, fnames = fields[i]
+    iname = name.replace("-", "_");
+    print('  offsetof(struct FIELD_STRINGS, FIELD_STRINGS_%s), /* %d */' % (iname, index))
+    index += 1
+print("};")
+
+print("/* This is an array of offsets into FIELD_STRINGPOOL */")
+print("const uint16_t FIELD_FIELDS[] = {")
+print("  offsetof(struct FIELD_STRINGS, STRING0),")
+index = 1
+for i in range(256):
+  if i in fields:
+    name, fnames = fields[i]
+    if fnames:
+      for f in fnames:
+        ifname = f.replace("-", "_");
+        print('  offsetof(struct FIELD_STRINGS, FIELD_STRINGS_%s), /* %d (%s) */' % (ifname, index, name))
+        index += 1
+print("};")
+
+print("const pn_fields_t FIELDS[] = {")
+
+name_count = 1
+field_count = 1
+field_min = 256
+field_max = 0
+for i in range(256):
+  if i in fields:
+    if i>field_max: field_max = i
+    if i<field_min: field_min = i
+
+for i in range(field_min, field_max+1):
+  if i in fields:
+    name, fnames = fields[i]
+    if fnames:
+      print('  {%d, %d, %d}, /* %d (%s) */' % (name_count, field_count, len(fnames), i, name))
+      field_count += len(fnames)
+    else:
+      print('  {%d, 0, 0}, /* %d (%s) */' % (name_count, i, name))
+    name_count += 1
+    if i>field_max: field_max = i
+    if i<field_min: field_min = i
+  else:
+    print('  {0, 0, 0}, /* %d */' % i)
+
+print("};")
+print()
+print("#endif")
+print()
+print('enum {')
+print('  FIELD_MIN = %d,' % field_min)
+print('  FIELD_MAX = %d' % field_max)
+print('};')
+print()
+print("#endif /* protocol.h */")

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/protocol.py
----------------------------------------------------------------------
diff --git a/c/src/protocol.py b/c/src/protocol.py
new file mode 100644
index 0000000..3f04973
--- /dev/null
+++ b/c/src/protocol.py
@@ -0,0 +1,121 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import mllib, os, sys
+
+doc = mllib.xml_parse(os.path.join(os.path.dirname(__file__), "transport.xml"))
+mdoc = mllib.xml_parse(os.path.join(os.path.dirname(__file__), "messaging.xml"))
+tdoc = mllib.xml_parse(os.path.join(os.path.dirname(__file__), "transactions.xml"))
+sdoc = mllib.xml_parse(os.path.join(os.path.dirname(__file__), "security.xml"))
+
+def eq(attr, value):
+  return lambda nd: nd[attr] == value
+
+TYPEStmp = doc.query["amqp/section/type", eq("@class", "composite")] + \
+    mdoc.query["amqp/section/type", eq("@class", "composite")] + \
+    tdoc.query["amqp/section/type", eq("@class", "composite")] + \
+    sdoc.query["amqp/section/type", eq("@class", "composite")] + \
+    mdoc.query["amqp/section/type", eq("@provides", "section")]
+TYPES = []
+for ty in TYPEStmp:
+  if not ty in TYPES:
+    TYPES.append(ty)
+RESTRICTIONS = {}
+COMPOSITES = {}
+
+for type in doc.query["amqp/section/type"] + mdoc.query["amqp/section/type"] + \
+    sdoc.query["amqp/section/type"] + tdoc.query["amqp/section/type"]:
+
+  source = type["@source"]
+  if source:
+    RESTRICTIONS[type["@name"]] = source
+  if type["@class"] == "composite":
+    COMPOSITES[type["@name"]] = type
+
+def resolve(name):
+  if name in RESTRICTIONS:
+    return resolve(RESTRICTIONS[name])
+  else:
+    return name
+
+# Maps a resolved field type to a (C type, prefix, suffix) triple; declaration()
+# returns the C type and wraps the field name in the prefix and suffix.
+TYPEMAP = {
+  "boolean": ("bool", "", ""),
+  "binary": ("pn_binary_t", "*", ""),
+  "string": ("wchar_t", "*", ""),
+  "symbol": ("char", "*", ""),
+  "ubyte": ("uint8_t", "", ""),
+  "ushort": ("uint16_t", "", ""),
+  "uint": ("uint32_t", "", ""),
+  "ulong": ("uint64_t", "", ""),
+  "timestamp": ("uint64_t", "", ""),
+  "list": ("pn_list_t", "*", ""),
+  "map": ("pn_map_t", "*", ""),
+  "box": ("pn_box_t", "*", ""),
+  "*": ("pn_object_t", "*", "")
+  }
+
+# Maps a field type to the suffix of the "pn_<suffix>(mem, ...)" call that
+# fconstruct() generates for it.
+CONSTRUCTORS = {
+  "boolean": "boolean",
+  "string": "string",
+  "symbol": "symbol",
+  "ubyte": "ubyte",
+  "ushort": "ushort",
+  "uint": "uint",
+  "ulong": "ulong",
+  "timestamp": "ulong"
+  }
+
+# Types whose generated constructor call fconstruct() guards with
+# "<expr> ? ... : NULL".
+NULLABLE = set(["string", "symbol"])
+
+def fname(field):
+  return field["@name"].replace("-", "_")
+
+def tname(t):
+  return t["@name"].replace("-", "_")
+
+def multi(f):
+  return f["@multiple"] == "true"
+
+def ftype(field):
+  if multi(field):
+    return "list"
+  elif field["@type"] in COMPOSITES:
+    return "box"
+  else:
+    return resolve(field["@type"]).replace("-", "_")
+
+def fconstruct(field, expr):
+  type = ftype(field)
+  if type in CONSTRUCTORS:
+    result = "pn_%s(mem, %s)" % (CONSTRUCTORS[type], expr)
+    if type in NULLABLE:
+      result = "%s ? %s : NULL" % (expr, result)
+  else:
+    result = expr
+  if multi(field):
+    result = "pn_box(mem, pn_boolean(mem, true), %s)" % result
+  return result
+
+def declaration(field):
+  name = fname(field)
+  type = ftype(field)
+  t, pre, post = TYPEMAP.get(type, (type, "", ""))
+  return t, "%s%s%s" % (pre, name, post)
+
+def field_kw(field):
+  return fname(field).upper()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/reactor/acceptor.c
----------------------------------------------------------------------
diff --git a/c/src/reactor/acceptor.c b/c/src/reactor/acceptor.c
new file mode 100644
index 0000000..a044748
--- /dev/null
+++ b/c/src/reactor/acceptor.c
@@ -0,0 +1,117 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/sasl.h>
+#include <proton/transport.h>
+#include <proton/connection.h>
+
+#include "io.h"
+#include "reactor.h"
+#include "selectable.h"
+#include "selector.h"
+
+#include <string.h>
+
+pn_selectable_t *pn_reactor_selectable_transport(pn_reactor_t *reactor, pn_socket_t sock, pn_transport_t *transport);
+
+PN_HANDLE(PNI_ACCEPTOR_HANDLER)
+PN_HANDLE(PNI_ACCEPTOR_SSL_DOMAIN)
+PN_HANDLE(PNI_ACCEPTOR_CONNECTION)
+
+// Readable callback for a listening selectable: accepts a socket, creates a
+// reactor connection for it (using the acceptor's handler, or the reactor's
+// handler when none is attached), binds a server-mode transport -- with SSL
+// when a domain is attached -- and remembers the acceptor on the connection.
+void pni_acceptor_readable(pn_selectable_t *sel) {
+  pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel);
+  char name[1024];
+  // Start from an empty string so the name[0] test below never reads
+  // uninitialized memory if pn_accept() leaves the buffer untouched.
+  name[0] = '\0';
+  // NOTE(review): the returned socket is not compared with PN_INVALID_SOCKET,
+  // so a failed accept still builds a connection -- confirm this is intended.
+  pn_socket_t sock = pn_accept(pni_reactor_io(reactor), pn_selectable_get_fd(sel), name, sizeof(name));
+  pn_handler_t *handler = (pn_handler_t *) pn_record_get(pn_selectable_attachments(sel), PNI_ACCEPTOR_HANDLER);
+  if (!handler) { handler = pn_reactor_get_handler(reactor); }
+  pn_record_t *record = pn_selectable_attachments(sel);
+  pn_ssl_domain_t *ssl_domain = (pn_ssl_domain_t *) pn_record_get(record, PNI_ACCEPTOR_SSL_DOMAIN);
+  pn_connection_t *conn = pn_reactor_connection(reactor, handler);
+  if (name[0]) { // store the peer address of connection in <host>:<port> format
+    char *port = strrchr(name, ':');   // last : separates the port #
+    // Without a ':' there is nothing to split; skip rather than write
+    // through a NULL pointer.
+    if (port) {
+      *port++ = '\0';
+      pni_reactor_set_connection_peer_address(conn, name, port);
+    }
+  }
+  pn_transport_t *trans = pn_transport();
+  pn_transport_set_server(trans);
+  if (ssl_domain) {
+    pn_ssl_t *ssl = pn_ssl(trans);
+    pn_ssl_init(ssl, ssl_domain, 0);
+  }
+  pn_transport_bind(trans, conn);
+  // Release this function's reference; trans is still used below, so it is
+  // presumably kept alive by the bound connection -- TODO confirm.
+  pn_decref(trans);
+  pn_reactor_selectable_transport(reactor, sock, trans);
+  // Record which acceptor produced this connection (read back by
+  // pn_connection_acceptor()).
+  record = pn_connection_attachments(conn);
+  pn_record_def(record, PNI_ACCEPTOR_CONNECTION, PN_OBJECT);
+  pn_record_set(record, PNI_ACCEPTOR_CONNECTION, sel);
+
+}
+
+// Finalize callback for an acceptor selectable: closes the listening socket
+// unless the descriptor has already been set to PN_INVALID_SOCKET.
+void pni_acceptor_finalize(pn_selectable_t *sel) {
+  pn_socket_t fd = pn_selectable_get_fd(sel);
+  if (fd == PN_INVALID_SOCKET) return;
+  pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel);
+  pn_close(pni_reactor_io(reactor), fd);
+}
+
+// Starts listening on host:port and registers a selectable for the listening
+// socket with the reactor.  Returns the acceptor (the selectable, cast), or
+// NULL when pn_listen() yields PN_INVALID_SOCKET.
+pn_acceptor_t *pn_reactor_acceptor(pn_reactor_t *reactor, const char *host, const char *port, pn_handler_t *handler) {
+  pn_socket_t listener = pn_listen(pni_reactor_io(reactor), host, port);
+  if (listener == PN_INVALID_SOCKET) return NULL;
+
+  pn_selectable_t *sel = pn_reactor_selectable(reactor);
+  pn_record_t *attachments = pn_selectable_attachments(sel);
+
+  pn_selectable_set_fd(sel, listener);
+  pn_selectable_on_readable(sel, pni_acceptor_readable);
+  pn_selectable_on_finalize(sel, pni_acceptor_finalize);
+  pni_record_init_reactor(attachments, reactor);
+
+  // Remember the handler that accepted connections should be created with.
+  pn_record_def(attachments, PNI_ACCEPTOR_HANDLER, PN_OBJECT);
+  pn_record_set(attachments, PNI_ACCEPTOR_HANDLER, handler);
+
+  pn_selectable_set_reading(sel, true);
+  pn_reactor_update(reactor, sel);
+  return (pn_acceptor_t *) sel;
+}
+
+// Closes the acceptor's socket and terminates its selectable.  Does nothing
+// when the selectable is already terminal.
+void pn_acceptor_close(pn_acceptor_t *acceptor) {
+  pn_selectable_t *sel = (pn_selectable_t *) acceptor;
+  if (pn_selectable_is_terminal(sel)) return;
+
+  pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel);
+  pn_close(pni_reactor_io(reactor), pn_selectable_get_fd(sel));
+  // Mark the descriptor invalid so pni_acceptor_finalize() does not close it
+  // a second time.
+  pn_selectable_set_fd(sel, PN_INVALID_SOCKET);
+  pn_selectable_terminate(sel);
+  pn_reactor_update(reactor, sel);
+}
+
+// Attaches an SSL domain to the acceptor; pni_acceptor_readable() reads it
+// back and, when present, initializes SSL on each accepted transport.
+void pn_acceptor_set_ssl_domain(pn_acceptor_t *acceptor, pn_ssl_domain_t *domain)
+{
+  pn_record_t *attachments = pn_selectable_attachments((pn_selectable_t *) acceptor);
+  pn_record_def(attachments, PNI_ACCEPTOR_SSL_DOMAIN, PN_VOID);
+  pn_record_set(attachments, PNI_ACCEPTOR_SSL_DOMAIN, domain);
+}
+
+// Return the acceptor that created the connection or NULL if an outbound connection
+pn_acceptor_t *pn_connection_acceptor(pn_connection_t *conn) {
+  void *creator = pn_record_get(pn_connection_attachments(conn), PNI_ACCEPTOR_CONNECTION);
+  return (pn_acceptor_t *) creator;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/reactor/connection.c
----------------------------------------------------------------------
diff --git a/c/src/reactor/connection.c b/c/src/reactor/connection.c
new file mode 100644
index 0000000..090947c
--- /dev/null
+++ b/c/src/reactor/connection.c
@@ -0,0 +1,370 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/connection.h>
+#include <proton/object.h>
+#include <proton/sasl.h>
+#include <proton/ssl.h>
+#include <proton/transport.h>
+#include <proton/url.h>
+#include <assert.h>
+#include <stdio.h>
+#include <string.h>
+#include "io.h"
+#include "selectable.h"
+#include "reactor.h"
+
+// XXX: overloaded for both directions
+PN_HANDLE(PN_TRANCTX)
+PN_HANDLE(PNI_CONN_PEER_ADDRESS)
+
+// Stores host and port, wrapped in a pn_url_t, in the connection's
+// PNI_CONN_PEER_ADDRESS attachment.
+void pni_reactor_set_connection_peer_address(pn_connection_t *connection,
+                                             const char *host,
+                                             const char *port)
+{
+    pn_url_t *peer = pn_url();
+    pn_url_set_host(peer, host);
+    pn_url_set_port(peer, port);
+
+    pn_record_t *attachments = pn_connection_attachments(connection);
+    // The slot is defined only the first time an address is stored.
+    if (!pn_record_has(attachments, PNI_CONN_PEER_ADDRESS))
+        pn_record_def(attachments, PNI_CONN_PEER_ADDRESS, PN_OBJECT);
+    pn_record_set(attachments, PNI_CONN_PEER_ADDRESS, peer);
+
+    // Release the reference obtained from pn_url() above.
+    pn_decref(peer);
+}
+
+// Returns the transport stored under PN_TRANCTX in the selectable's attachments.
+static pn_transport_t *pni_transport(pn_selectable_t *sel) {
+  void *ctx = pn_record_get(pn_selectable_attachments(sel), PN_TRANCTX);
+  return (pn_transport_t *) ctx;
+}
+
+// Returns the transport's input capacity.  When the capacity is negative and
+// the transport reports closed, the selectable is terminated as well.
+static ssize_t pni_connection_capacity(pn_selectable_t *sel)
+{
+  pn_transport_t *trans = pni_transport(sel);
+  ssize_t room = pn_transport_capacity(trans);
+  if (room < 0 && pn_transport_closed(trans)) {
+    pn_selectable_terminate(sel);
+  }
+  return room;
+}
+
+// Returns the transport's pending output count.  When it is negative and the
+// transport reports closed, the selectable is terminated as well.
+static ssize_t pni_connection_pending(pn_selectable_t *sel)
+{
+  pn_transport_t *trans = pni_transport(sel);
+  ssize_t queued = pn_transport_pending(trans);
+  if (queued < 0 && pn_transport_closed(trans)) {
+    pn_selectable_terminate(sel);
+  }
+  return queued;
+}
+
+// Ticks the transport at the reactor's current time and returns the value
+// pn_transport_tick() reports, which callers install as the next deadline.
+static pn_timestamp_t pni_connection_deadline(pn_selectable_t *sel)
+{
+  pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel);
+  pn_transport_t *trans = pni_transport(sel);
+  return pn_transport_tick(trans, pn_reactor_now(reactor));
+}
+
+// Refreshes the selectable's interest flags and deadline from the transport:
+// read while there is capacity, write while output is pending.
+static void pni_connection_update(pn_selectable_t *sel) {
+  ssize_t capacity = pni_connection_capacity(sel);
+  ssize_t pending = pni_connection_pending(sel);
+  pn_selectable_set_reading(sel, capacity > 0);
+  pn_selectable_set_writing(sel, pending > 0);
+  pn_timestamp_t next = pni_connection_deadline(sel);
+  pn_selectable_set_deadline(sel, next);
+}
+
+// Transport event hook: looks up the selectable attached to the event's
+// transport and, if there is one that is not terminal, refreshes it and
+// notifies the reactor.
+void pni_handle_transport(pn_reactor_t *reactor, pn_event_t *event) {
+  assert(reactor);
+  pn_record_t *attachments = pn_transport_attachments(pn_event_transport(event));
+  pn_selectable_t *sel = (pn_selectable_t *) pn_record_get(attachments, PN_TRANCTX);
+  if (!sel || pn_selectable_is_terminal(sel)) return;
+
+  pni_connection_update(sel);
+  pn_reactor_update(reactor, sel);
+}
+
+pn_selectable_t *pn_reactor_selectable_transport(pn_reactor_t *reactor, pn_socket_t sock, pn_transport_t *transport);
+
+// Open event hook: when the connection's state has PN_REMOTE_UNINIT set,
+// creates a fresh transport, binds it to the connection and releases the
+// local reference.  Otherwise does nothing.
+void pni_handle_open(pn_reactor_t *reactor, pn_event_t *event) {
+  assert(reactor);
+  assert(event);
+
+  pn_connection_t *conn = pn_event_connection(event);
+  if (pn_connection_state(conn) & PN_REMOTE_UNINIT) {
+    pn_transport_t *fresh = pn_transport();
+    pn_transport_bind(fresh, conn);
+    pn_decref(fresh);
+  }
+}
+
+// Bound event hook: links the transport to the reactor and, for connections
+// not created by an acceptor, works out the remote host/port and opens the
+// socket.  The address comes from the PNI_CONN_PEER_ADDRESS attachment when
+// present, otherwise from the connection's hostname.  On failure (no address,
+// or pn_connect() returning PN_INVALID_SOCKET) a "proton:io" condition is set
+// on the transport and both its tail and head are closed.
+void pni_handle_bound(pn_reactor_t *reactor, pn_event_t *event) {
+  assert(reactor);
+  assert(event);
+
+  pn_connection_t *conn = pn_event_connection(event);
+  pn_transport_t *transport = pn_event_transport(event);
+  pn_record_t *record = pn_connection_attachments(conn);
+  pn_url_t *url = (pn_url_t *)pn_record_get(record, PNI_CONN_PEER_ADDRESS);
+  const char *host = NULL;
+  // Default port, overridden below by the URL, its scheme, or the hostname.
+  const char *port = "5672";
+  // Scratch copy of the hostname; host/port may point into it, so it is
+  // freed only at the very end.
+  pn_string_t *str = NULL;
+
+  // link the new transport to its reactor:
+  pni_record_init_reactor(pn_transport_attachments(transport), reactor);
+
+  if (pn_connection_acceptor(conn) != NULL) {
+      // this connection was created by the acceptor.  There is already a
+      // socket assigned to this connection.  Nothing needs to be done.
+      return;
+  }
+
+  if (url) {
+      host = pn_url_get_host(url);
+      const char *uport = pn_url_get_port(url);
+      if (uport) {
+          port = uport;
+      } else {
+          // No explicit port: the "amqps" scheme selects 5671 instead.
+          const char *scheme = pn_url_get_scheme(url);
+          if (scheme && strcmp(scheme, "amqps") == 0) {
+              port = "5671";
+          }
+      }
+      if (!pn_connection_get_user(conn)) {
+          // user did not manually set auth info
+          const char *user = pn_url_get_username(url);
+          if (user) pn_connection_set_user(conn, user);
+          const char *passwd = pn_url_get_password(url);
+          if (passwd) pn_connection_set_password(conn, passwd);
+      }
+  } else {
+      // for backward compatibility, see if the connection's hostname can be
+      // used for the remote address.  See JIRA PROTON-1133
+      const char *hostname = pn_connection_get_hostname(conn);
+      if (hostname) {
+          str = pn_string(hostname);
+          char *h = pn_string_buffer(str);
+          // see if a port has been included in the hostname.  This is not
+          // allowed by the spec, but the old reactor interface allowed it.
+          char *colon = strrchr(h, ':');
+          if (colon) {
+              // Split the copy in place: host before the colon, port after.
+              *colon = '\0';
+              port = colon + 1;
+          }
+          host = h;
+      }
+  }
+
+  if (!host) {
+      // error: no address configured
+      pn_condition_t *cond = pn_transport_condition(transport);
+      pn_condition_set_name(cond, "proton:io");
+      pn_condition_set_description(cond, "Connection failed: no address configured");
+      pn_transport_close_tail(transport);
+      pn_transport_close_head(transport);
+  } else {
+      pn_socket_t sock = pn_connect(pni_reactor_io(reactor), host, port);
+      // invalid sockets are ignored by poll, so we need to do this manually
+      if (sock == PN_INVALID_SOCKET) {
+          pn_condition_t *cond = pn_transport_condition(transport);
+          pn_condition_set_name(cond, "proton:io");
+          pn_condition_set_description(cond, pn_error_text(pn_reactor_error(reactor)));
+          pn_transport_close_tail(transport);
+          pn_transport_close_head(transport);
+      } else {
+          pn_reactor_selectable_transport(reactor, sock, transport);
+      }
+  }
+  // str is still NULL on the URL path -- presumably pn_free() accepts NULL.
+  pn_free(str);
+}
+
+// Final event hook: removes the event's connection from the reactor's list
+// of children.
+void pni_handle_final(pn_reactor_t *reactor, pn_event_t *event) {
+  assert(reactor);
+  assert(event);
+  pn_list_t *children = pn_reactor_children(reactor);
+  pn_list_remove(children, pn_event_connection(event));
+}
+
+// Readable callback for a connection selectable: receives up to the
+// transport's capacity into its tail buffer and processes what arrived.
+// A zero-byte read, or an error other than would-block, closes the
+// transport's tail; real errors also set a "proton:io" condition.
+static void pni_connection_readable(pn_selectable_t *sel)
+{
+  pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel);
+  pn_transport_t *transport = pni_transport(sel);
+  ssize_t capacity = pn_transport_capacity(transport);
+  if (capacity > 0) {
+    ssize_t n = pn_recv(pni_reactor_io(reactor), pn_selectable_get_fd(sel),
+                        pn_transport_tail(transport), capacity);
+    if (n <= 0) {
+      // n == 0 is treated like a hard error except that no condition is set.
+      if (n == 0 || !pn_wouldblock(pni_reactor_io(reactor))) {
+        if (n < 0) {
+          pn_condition_t *cond = pn_transport_condition(transport);
+          pn_condition_set_name(cond, "proton:io");
+          pn_condition_set_description(cond, pn_error_text(pn_reactor_error(reactor)));
+        }
+        pn_transport_close_tail(transport);
+      }
+    } else {
+      pn_transport_process(transport, (size_t)n);
+    }
+  }
+
+  ssize_t newcap = pn_transport_capacity(transport);
+  //occasionally transport events aren't generated when expected, so
+  //the following hack ensures we always update the selector
+  // (the "1 ||" makes the capacity comparison irrelevant)
+  if (1 || newcap != capacity) {
+    pni_connection_update(sel);
+    pn_reactor_update(reactor, sel);
+  }
+}
+
+// Writable callback for a connection selectable: sends the transport's
+// pending output and pops what was written.  A send error other than
+// would-block sets a "proton:io" condition (unless one is already set) and
+// closes the transport's head.  The selectable is refreshed only when the
+// pending count changed.
+static void pni_connection_writable(pn_selectable_t *sel)
+{
+  pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel);
+  pn_transport_t *trans = pni_transport(sel);
+  ssize_t before = pn_transport_pending(trans);
+
+  if (before > 0) {
+    ssize_t sent = pn_send(pni_reactor_io(reactor), pn_selectable_get_fd(sel),
+                           pn_transport_head(trans), before);
+    if (sent >= 0) {
+      pn_transport_pop(trans, sent);
+    } else if (!pn_wouldblock(pni_reactor_io(reactor))) {
+      pn_condition_t *cond = pn_transport_condition(trans);
+      if (!pn_condition_is_set(cond)) {
+        pn_condition_set_name(cond, "proton:io");
+        pn_condition_set_description(cond, pn_error_text(pn_reactor_error(reactor)));
+      }
+      pn_transport_close_head(trans);
+    }
+  }
+
+  ssize_t after = pn_transport_pending(trans);
+  if (after != before) {
+    pni_connection_update(sel);
+    pn_reactor_update(reactor, sel);
+  }
+}
+
+static void pni_connection_error(pn_selectable_t *sel) {
+  pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel);
+  pn_transport_t *transport = pni_transport(sel);
+  pn_transport_close_head(transport);
+  pn_transport_close_tail(transport);
+  pn_selectable_terminate(sel);
+  pn_reactor_update(reactor, sel);
+}
+
+// Deadline callback for a connection selectable: ticks the transport first,
+// installs the returned deadline, then refreshes the read/write interest
+// flags and notifies the reactor.
+static void pni_connection_expired(pn_selectable_t *sel) {
+  pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel);
+  pn_transport_t *trans = pni_transport(sel);
+
+  pn_timestamp_t next = pn_transport_tick(trans, pn_reactor_now(reactor));
+  pn_selectable_set_deadline(sel, next);
+
+  ssize_t capacity = pni_connection_capacity(sel);
+  ssize_t pending = pni_connection_pending(sel);
+  pn_selectable_set_reading(sel, capacity > 0);
+  pn_selectable_set_writing(sel, pending > 0);
+
+  pn_reactor_update(reactor, sel);
+}
+
+// Finalize callback for a connection selectable: clears the transport's
+// PN_TRANCTX back-pointer to this selectable and closes the socket.
+static void pni_connection_finalize(pn_selectable_t *sel) {
+  pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel);
+  pn_record_t *attachments = pn_transport_attachments(pni_transport(sel));
+  pn_record_set(attachments, PN_TRANCTX, NULL);
+  pn_close(pni_reactor_io(reactor), pn_selectable_get_fd(sel));
+}
+
+// Creates a selectable for sock, installs the connection callbacks on it and
+// cross-links it with the transport under PN_TRANCTX: the selectable records
+// the transport as PN_OBJECT, the transport records the selectable as
+// PN_WEAKREF.  Returns the new selectable after an initial update.
+pn_selectable_t *pn_reactor_selectable_transport(pn_reactor_t *reactor, pn_socket_t sock, pn_transport_t *transport) {
+  pn_selectable_t *sel = pn_reactor_selectable(reactor);
+  pn_selectable_set_fd(sel, sock);
+
+  pn_selectable_on_readable(sel, pni_connection_readable);
+  pn_selectable_on_writable(sel, pni_connection_writable);
+  pn_selectable_on_error(sel, pni_connection_error);
+  pn_selectable_on_expired(sel, pni_connection_expired);
+  pn_selectable_on_finalize(sel, pni_connection_finalize);
+
+  // selectable -> transport
+  pn_record_t *sel_rec = pn_selectable_attachments(sel);
+  pn_record_def(sel_rec, PN_TRANCTX, PN_OBJECT);
+  pn_record_set(sel_rec, PN_TRANCTX, transport);
+
+  // transport -> selectable
+  pn_record_t *trans_rec = pn_transport_attachments(transport);
+  pn_record_def(trans_rec, PN_TRANCTX, PN_WEAKREF);
+  pn_record_set(trans_rec, PN_TRANCTX, sel);
+
+  pni_connection_update(sel);
+  pn_reactor_update(reactor, sel);
+  return sel;
+}
+
+// Creates a connection, attaches the handler, routes its events to the
+// reactor's collector and adds it to the reactor's children.  The local
+// reference is released before the pointer is returned.
+pn_connection_t *pn_reactor_connection(pn_reactor_t *reactor, pn_handler_t *handler) {
+  assert(reactor);
+  pn_connection_t *conn = pn_connection();
+  pn_record_t *attachments = pn_connection_attachments(conn);
+
+  pn_record_set_handler(attachments, handler);
+  pn_connection_collect(conn, pn_reactor_collector(reactor));
+  pn_list_add(pn_reactor_children(reactor), conn);
+  pni_record_init_reactor(attachments, reactor);
+
+  pn_decref(conn);
+  return conn;
+}
+
+// Creates a reactor connection and records host/port as its peer address.
+pn_connection_t *pn_reactor_connection_to_host(pn_reactor_t *reactor,
+                                               const char *host,
+                                               const char *port,
+                                               pn_handler_t *handler) {
+    pn_connection_t *conn = pn_reactor_connection(reactor, handler);
+    pn_reactor_set_connection_host(reactor, conn, host, port);
+    return conn;
+}
+
+
+// Sets the peer address of an outbound connection.  Connections created by
+// an acceptor are left untouched; the reactor argument is unused.
+void pn_reactor_set_connection_host(pn_reactor_t *reactor,
+                                    pn_connection_t *connection,
+                                    const char *host,
+                                    const char *port)
+{
+    (void)reactor;  // ignored
+    // Only outbound connections (no acceptor recorded) may have their peer
+    // address changed.
+    if (pn_connection_acceptor(connection) == NULL) {
+        pni_reactor_set_connection_peer_address(connection, host, port);
+    }
+}
+
+
+// Returns the connection's stored peer address as a string, or NULL when the
+// connection is NULL or has no address recorded.  The reactor is unused.
+const char *pn_reactor_get_connection_address(pn_reactor_t *reactor,
+                                              pn_connection_t *connection)
+{
+    (void)reactor;  // ignored
+    if (!connection) return NULL;
+    pn_url_t *peer = (pn_url_t *)pn_record_get(pn_connection_attachments(connection),
+                                               PNI_CONN_PEER_ADDRESS);
+    return peer ? pn_url_str(peer) : NULL;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/reactor/handler.c
----------------------------------------------------------------------
diff --git a/c/src/reactor/handler.c b/c/src/reactor/handler.c
new file mode 100644
index 0000000..2f86afd
--- /dev/null
+++ b/c/src/reactor/handler.c
@@ -0,0 +1,111 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/object.h>
+#include <proton/reactor.h>
+#include <proton/event.h>
+#include <string.h>
+#include <assert.h>
+
+// A handler object.  pn_handler_new() allocates extra bytes directly after
+// this struct; pn_handler_mem() returns a pointer to them.
+struct pn_handler_t {
+  // Optional callback run by pn_handler_dispatch() before the children.
+  void (*dispatch) (pn_handler_t *, pn_event_t *, pn_event_type_t);
+  // Optional callback run by pn_handler_finalize().
+  void (*finalize) (pn_handler_t *);
+  // Child handlers; created on first pn_handler_add(), NULL until then.
+  pn_list_t *children;
+};
+
+// Class initializer: puts every member into a known empty state.  finalize
+// is cleared too, because pn_handler_finalize() tests it before calling it.
+void pn_handler_initialize(void *object) {
+  pn_handler_t *handler = (pn_handler_t *) object;
+  handler->dispatch = NULL;
+  handler->finalize = NULL;
+  handler->children = NULL;
+}
+
+// Class finalizer: runs the handler's own finalize callback, if any, and
+// then frees the list of children.
+void pn_handler_finalize(void *object) {
+  pn_handler_t *self = (pn_handler_t *) object;
+  void (*hook)(pn_handler_t *) = self->finalize;
+  if (hook) hook(self);
+  pn_free(self->children);
+}
+
+// No hashcode, compare or inspect hooks for handlers; presumably these names
+// are what PN_CLASS(pn_handler) in pn_handler_new() expands to -- TODO confirm.
+#define pn_handler_hashcode NULL
+#define pn_handler_compare NULL
+#define pn_handler_inspect NULL
+
+// Creates a handler with the given dispatch callback, no extra memory and no
+// finalize callback.
+pn_handler_t *pn_handler(void (*dispatch)(pn_handler_t *, pn_event_t *, pn_event_type_t)) {
+  pn_handler_t *created = pn_handler_new(dispatch, 0, NULL);
+  return created;
+}
+
+// Creates a handler with `size` extra bytes of zeroed memory placed right
+// after the struct (see pn_handler_mem()).
+//   dispatch - callback invoked by pn_handler_dispatch(); may be NULL
+//   size     - number of extra bytes to allocate and zero
+//   finalize - callback invoked when the handler is finalized; may be NULL
+// Returns the new handler, or NULL if pn_class_new() returns NULL.
+pn_handler_t *pn_handler_new(void (*dispatch)(pn_handler_t *, pn_event_t *, pn_event_type_t), size_t size,
+                             void (*finalize)(pn_handler_t *)) {
+  static const pn_class_t clazz = PN_CLASS(pn_handler);
+  pn_handler_t *handler = (pn_handler_t *) pn_class_new(&clazz, sizeof(pn_handler_t) + size);
+  // Guard against a failed allocation before touching the object.
+  if (!handler) return NULL;
+  handler->dispatch = dispatch;
+  handler->finalize = finalize;
+  memset(pn_handler_mem(handler), 0, size);
+  return handler;
+}
+
+// Drops one reference on every child of the handler and then one on the
+// handler itself.  A NULL handler is ignored.
+void pn_handler_free(pn_handler_t *handler) {
+  if (!handler) return;
+
+  pn_list_t *kids = handler->children;
+  if (kids) {
+    size_t count = pn_list_size(kids);
+    for (size_t idx = 0; idx < count; idx++) {
+      pn_decref(pn_list_get(kids, idx));
+    }
+  }
+
+  pn_decref(handler);
+}
+
+// Returns the address immediately after the handler struct, where
+// pn_handler_new() placed the caller's extra bytes.
+void *pn_handler_mem(pn_handler_t *handler) {
+  return (void *) &handler[1];
+}
+
+// Appends child to the handler's children, creating the list on first use.
+void pn_handler_add(pn_handler_t *handler, pn_handler_t *child) {
+  assert(handler);
+  if (handler->children == NULL) {
+    handler->children = pn_list(PN_OBJECT, 0);
+  }
+  pn_list_add(handler->children, child);
+}
+
+// Empties the handler's list of children, if it has one.
+void pn_handler_clear(pn_handler_t *handler) {
+  assert(handler);
+  pn_list_t *kids = handler->children;
+  if (kids) pn_list_clear(kids);
+}
+
+// Delivers the event to the handler's own dispatch callback (if set) and
+// then, recursively, to each child in list order.
+void pn_handler_dispatch(pn_handler_t *handler, pn_event_t *event, pn_event_type_t type) {
+  assert(handler);
+  if (handler->dispatch) {
+    handler->dispatch(handler, event, type);
+  }
+  // Children are looked up only after the handler's own callback has run.
+  if (!handler->children) return;
+
+  // The child count is sampled once, before the loop starts.
+  size_t count = pn_list_size(handler->children);
+  for (size_t idx = 0; idx < count; idx++) {
+    pn_handler_dispatch((pn_handler_t *) pn_list_get(handler->children, idx), event, type);
+  }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/reactor/io.h
----------------------------------------------------------------------
diff --git a/c/src/reactor/io.h b/c/src/reactor/io.h
new file mode 100644
index 0000000..24596ec
--- /dev/null
+++ b/c/src/reactor/io.h
@@ -0,0 +1,70 @@
+#ifndef PROTON_IO_H
+#define PROTON_IO_H 1
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "selector.h"
+
+#include <proton/import_export.h>
+#include <proton/error.h>
+#include <proton/type_compat.h>
+#include <stddef.h>
+
+/**
+ * A ::pn_io_t manages IO for a group of pn_socket_t handles.  A
+ * pn_io_t object may have zero or one pn_selector_t selectors
+ * associated with it (see ::pn_io_selector()).  If one is associated,
+ * all the pn_socket_t handles managed by a pn_io_t must use that
+ * pn_selector_t instance.
+ *
+ * The pn_io_t interface is single-threaded. All methods are intended
+ * to be used by one thread at a time, except that multiple threads
+ * may use:
+ *
+ *   ::pn_write()
+ *   ::pn_send()
+ *   ::pn_recv()
+ *   ::pn_close()
+ *   ::pn_selector_select()
+ *
+ * provided at most one thread is calling ::pn_selector_select() and
+ * the other threads are operating on separate pn_socket_t handles.
+ */
+typedef struct pn_io_t pn_io_t;
+
+pn_io_t *pn_io(void);
+void pn_io_free(pn_io_t *io);
+pn_error_t *pn_io_error(pn_io_t *io);
+pn_socket_t pn_connect(pn_io_t *io, const char *host, const char *port);
+pn_socket_t pn_listen(pn_io_t *io, const char *host, const char *port);
+
+pn_socket_t pn_accept(pn_io_t *io, pn_socket_t socket, char *name, size_t size);
+void pn_close(pn_io_t *io, pn_socket_t socket);
+ssize_t pn_send(pn_io_t *io, pn_socket_t socket, const void *buf, size_t size);
+ssize_t pn_recv(pn_io_t *io, pn_socket_t socket, void *buf, size_t size);
+int pn_pipe(pn_io_t *io, pn_socket_t *dest);
+ssize_t pn_read(pn_io_t *io, pn_socket_t socket, void *buf, size_t size);
+ssize_t pn_write(pn_io_t *io, pn_socket_t socket, const void *buf, size_t size);
+bool pn_wouldblock(pn_io_t *io);
+pn_selector_t *pn_io_selector(pn_io_t *io);
+
+#endif /* io.h */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/reactor/io/posix/io.c
----------------------------------------------------------------------
diff --git a/c/src/reactor/io/posix/io.c b/c/src/reactor/io/posix/io.c
new file mode 100644
index 0000000..5a0de3b
--- /dev/null
+++ b/c/src/reactor/io/posix/io.c
@@ -0,0 +1,342 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "reactor/io.h"
+#include "reactor/selector.h"
+#include "platform/platform.h" // pn_i_error_from_errno
+
+#include <proton/object.h>
+
+#include <ctype.h>
+#include <errno.h>
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <netdb.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <assert.h>
+
+#define MAX_HOST (1024)
+#define MAX_SERV (64)
+
+// State behind a pn_io_t handle (POSIX implementation).
+struct pn_io_t {
+  char host[MAX_HOST];      // scratch buffer: host name written by getnameinfo() in pn_accept()
+  char serv[MAX_SERV];      // scratch buffer: service name written by getnameinfo() in pn_accept()
+  pn_error_t *error;        // created in pn_io_initialize(), released in pn_io_finalize()
+  pn_selector_t *selector;  // NULL until pn_io_selector() creates it
+  bool wouldblock;          // would-block state from the last pn_send()/pn_recv()
+};
+
+void pn_io_initialize(void *obj)
+{
+  // Start with no selector, a cleared would-block flag and a fresh error
+  // object.  The host/serv scratch buffers are not touched here.
+  pn_io_t *self = (pn_io_t *) obj;
+  self->selector = NULL;
+  self->wouldblock = false;
+  self->error = pn_error();
+}
+
+void pn_io_finalize(void *obj)
+{
+  // Only the error object is released here; the selector pointer is left alone.
+  pn_io_t *self = (pn_io_t *) obj;
+  pn_error_t *err = self->error;
+  pn_error_free(err);
+}
+
+#define pn_io_hashcode NULL
+#define pn_io_compare NULL
+#define pn_io_inspect NULL
+
+pn_io_t *pn_io(void)
+{
+  // Instances are created through the object system using the pn_io class
+  // descriptor built from the pn_io_* hooks defined above.
+  static const pn_class_t clazz = PN_CLASS(pn_io);
+  return (pn_io_t *) pn_class_new(&clazz, sizeof(pn_io_t));
+}
+
+// Release an IO object by handing it to pn_free().
+void pn_io_free(pn_io_t *io)
+{
+  pn_free(io);
+}
+
+// Return the error object that the pn_* IO calls in this file record failures on.
+// `io` must not be NULL (asserted).
+pn_error_t *pn_io_error(pn_io_t *io)
+{
+  assert(io);
+  return io->error;
+}
+
+int pn_pipe(pn_io_t *io, pn_socket_t *dest)
+{
+  // Thin wrapper over pipe(): `dest` receives the two descriptors and the
+  // return value of pipe() is passed back unchanged.  A failure is also
+  // recorded on io->error.
+  const int rc = pipe(dest);
+  if (rc != 0) {
+    pn_i_error_from_errno(io->error, "pipe");
+  }
+  return rc;
+}
+
+// Put `sock` into non-blocking mode and disable the Nagle algorithm on it.
+// Failures are recorded on io->error; the function itself reports nothing to
+// the caller.  SIGPIPE suppression is not done here: it is handled by the
+// pn_send()/pn_create_socket() variants later in this file.
+static void pn_configure_sock(pn_io_t *io, pn_socket_t sock) {
+  int flags = fcntl(sock, F_GETFL);
+  if (flags < 0) {
+    // Do not feed the -1 error value back into F_SETFL as if it were a flag set.
+    pn_i_error_from_errno(io->error, "fcntl");
+  } else if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) < 0) {
+    pn_i_error_from_errno(io->error, "fcntl");
+  }
+
+  //
+  // Disable the Nagle algorithm on TCP connections.
+  //
+  // Note:  It would be more correct for the "level" argument to be SOL_TCP.  However, there
+  //        are portability issues with this macro so we use IPPROTO_TCP instead.
+  //
+  int tcp_nodelay = 1;
+  if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void*) &tcp_nodelay, sizeof(tcp_nodelay)) < 0) {
+    pn_i_error_from_errno(io->error, "setsockopt");
+  }
+}
+
+static inline int pn_create_socket(int af, int protocol);
+
+// Create a listening stream socket bound to host:port.
+//
+// The address is resolved with getaddrinfo() and only the first result is
+// used.  SO_REUSEADDR is set before bind(), and the listen backlog is 50.
+// Returns the listening socket, or PN_INVALID_SOCKET with the failure
+// recorded on io->error.
+pn_socket_t pn_listen(pn_io_t *io, const char *host, const char *port)
+{
+  struct addrinfo *addr;
+  struct addrinfo hints = {0, AF_UNSPEC, SOCK_STREAM};
+  int code = getaddrinfo(host, port, &hints, &addr);
+  if (code) {
+    pn_error_format(io->error, PN_ERR, "getaddrinfo(%s, %s): %s\n", host, port, gai_strerror(code));
+    return PN_INVALID_SOCKET;
+  }
+
+  pn_socket_t sock = pn_create_socket(addr->ai_family, addr->ai_protocol);
+  if (sock == PN_INVALID_SOCKET) {
+    // Record errno before any cleanup call gets a chance to overwrite it
+    // (same order as pn_connect()).
+    pn_i_error_from_errno(io->error, "pn_create_socket");
+    freeaddrinfo(addr);
+    return PN_INVALID_SOCKET;
+  }
+
+  int optval = 1;
+  if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) == -1) {
+    pn_i_error_from_errno(io->error, "setsockopt");
+    freeaddrinfo(addr);
+    close(sock);
+    return PN_INVALID_SOCKET;
+  }
+
+  if (bind(sock, addr->ai_addr, addr->ai_addrlen) == -1) {
+    pn_i_error_from_errno(io->error, "bind");
+    freeaddrinfo(addr);
+    close(sock);
+    return PN_INVALID_SOCKET;
+  }
+
+  // The resolved address is no longer needed once the socket is bound.
+  freeaddrinfo(addr);
+
+  if (listen(sock, 50) == -1) {
+    pn_i_error_from_errno(io->error, "listen");
+    close(sock);
+    return PN_INVALID_SOCKET;
+  }
+
+  return sock;
+}
+
+pn_socket_t pn_connect(pn_io_t *io, const char *host, const char *port)
+{
+  // Resolve host/port to a stream address; only the first result is used.
+  struct addrinfo *resolved;
+  struct addrinfo hints = {0, AF_UNSPEC, SOCK_STREAM};
+  int gai = getaddrinfo(host, port, &hints, &resolved);
+  if (gai != 0) {
+    pn_error_format(io->error, PN_ERR, "getaddrinfo(%s, %s): %s", host, port, gai_strerror(gai));
+    return PN_INVALID_SOCKET;
+  }
+
+  pn_socket_t fd = pn_create_socket(resolved->ai_family, resolved->ai_protocol);
+  if (fd == PN_INVALID_SOCKET) {
+    pn_i_error_from_errno(io->error, "pn_create_socket");
+    freeaddrinfo(resolved);
+    return PN_INVALID_SOCKET;
+  }
+
+  // The socket is configured before connect(); EINPROGRESS from connect()
+  // is therefore treated as "connection under way", not as a failure.
+  pn_configure_sock(io, fd);
+
+  if (connect(fd, resolved->ai_addr, resolved->ai_addrlen) == -1 && errno != EINPROGRESS) {
+    pn_i_error_from_errno(io->error, "connect");
+    freeaddrinfo(resolved);
+    close(fd);
+    return PN_INVALID_SOCKET;
+  }
+
+  freeaddrinfo(resolved);
+  return fd;
+}
+
+pn_socket_t pn_accept(pn_io_t *io, pn_socket_t socket, char *name, size_t size)
+{
+  struct sockaddr_storage peer;
+  socklen_t peer_len = sizeof(peer);
+
+  // The caller's buffer holds an empty string unless the accept fully succeeds.
+  *name = '\0';
+
+  pn_socket_t accepted = accept(socket, (struct sockaddr *) &peer, &peer_len);
+  if (accepted == PN_INVALID_SOCKET) {
+    pn_i_error_from_errno(io->error, "accept");
+    return accepted;
+  }
+
+  // Look the peer up into io->host / io->serv; a connection whose peer
+  // cannot be named is closed and reported as a failure.
+  int code = getnameinfo((struct sockaddr *) &peer, peer_len, io->host, MAX_HOST, io->serv, MAX_SERV, 0);
+  if (code) {
+    pn_error_format(io->error, PN_ERR, "getnameinfo: %s\n", gai_strerror(code));
+    if (close(accepted) == -1)
+      pn_i_error_from_errno(io->error, "close");
+    return PN_INVALID_SOCKET;
+  }
+
+  pn_configure_sock(io, accepted);
+  pni_snprintf(name, size, "%s:%s", io->host, io->serv);
+  return accepted;
+}
+
+/* Abstract away turning off SIGPIPE */
+#ifdef MSG_NOSIGNAL
+// Send up to `len` bytes on `socket`, using MSG_NOSIGNAL so that a closed peer
+// does not raise SIGPIPE.  Returns the result of send(); on failure the errno
+// is recorded on io->error.  pn_wouldblock() reflects whether this call
+// failed with EAGAIN/EWOULDBLOCK.
+ssize_t pn_send(pn_io_t *io, pn_socket_t socket, const void *buf, size_t len) {
+  ssize_t count = send(socket, buf, len, MSG_NOSIGNAL);
+  // errno is only meaningful when send() failed; a stale EAGAIN left over from
+  // an earlier call must not be reported as "would block" after a success.
+  io->wouldblock = count < 0 && (errno == EAGAIN || errno == EWOULDBLOCK);
+  if (count < 0) { pn_i_error_from_errno(io->error, "send"); }
+  return count;
+}
+
+// MSG_NOSIGNAL variant: SIGPIPE is suppressed per send() call, so the socket
+// needs no special setup beyond being a stream socket.
+static inline int pn_create_socket(int af, int protocol) {
+  return socket(af, SOCK_STREAM, protocol);
+}
+#elif defined(SO_NOSIGPIPE)
+// Send up to `size` bytes on `socket`.  In this variant SIGPIPE suppression
+// comes from the SO_NOSIGPIPE option set in pn_create_socket(), so send() is
+// called with no flags.  Returns the result of send(); on failure the errno
+// is recorded on io->error.
+ssize_t pn_send(pn_io_t *io, pn_socket_t socket, const void *buf, size_t size) {
+  ssize_t count = send(socket, buf, size, 0);
+  // errno is only meaningful when send() failed; a stale EAGAIN left over from
+  // an earlier call must not be reported as "would block" after a success.
+  io->wouldblock = count < 0 && (errno == EAGAIN || errno == EWOULDBLOCK);
+  if (count < 0) { pn_i_error_from_errno(io->error, "send"); }
+  return count;
+}
+
+static inline int pn_create_socket(int af, int protocol) {
+  // Create a stream socket and switch off SIGPIPE generation on it.  If the
+  // option cannot be set the socket is closed again and -1 is returned.
+  int fd = socket(af, SOCK_STREAM, protocol);
+  if (fd != -1) {
+    int enable = 1;
+    if (setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &enable, sizeof(enable)) == -1) {
+      close(fd);
+      fd = -1;
+    }
+  }
+  return fd;
+}
+#else
+
+#include <signal.h>
+
+// Fallback variant (neither MSG_NOSIGNAL nor SO_NOSIGPIPE is defined): the
+// socket is created plainly and SIGPIPE is dealt with in nosigpipe_send().
+static inline int pn_create_socket(int af, int protocol) {
+  return socket(af, SOCK_STREAM, protocol);
+}
+
+// send() wrapper for platforms with neither MSG_NOSIGNAL nor SO_NOSIGPIPE.
+//
+// Unless a SIGPIPE is already pending, SIGPIPE is blocked for the calling
+// thread around the send(); if the send() fails with EPIPE the pending
+// SIGPIPE is consumed with a zero-timeout sigtimedwait(), and the previous
+// signal mask is then restored.
+//
+// Returns the result of send().  If pthread_sigmask() fails, returns -1 with
+// errno set to the error code it returned.
+static ssize_t nosigpipe_send(int fd, const void *buffer, size_t size) {
+  sigset_t pendingSignals, oldSignals, newSignals;
+  ssize_t count;
+  int sendErrno, sigmaskErr;
+
+  // If a SIGPIPE is already pending, the mask is left alone and nothing is
+  // consumed afterwards.
+  sigpending(&pendingSignals);
+  int sigpipeIsPending = sigismember(&pendingSignals, SIGPIPE);
+  if (!sigpipeIsPending) {
+    sigemptyset(&newSignals);
+    sigaddset(&newSignals, SIGPIPE);
+    // Intentional assignment inside the condition: non-zero is the error code.
+    if (sigmaskErr = pthread_sigmask(SIG_BLOCK, (const sigset_t *)&newSignals, (sigset_t *)&oldSignals))
+    {
+      errno = sigmaskErr;
+      return -1;
+    }
+  }
+
+  count = send(fd, buffer, size, 0);
+  if (!sigpipeIsPending) {
+    // Save errno from send(): the calls below may overwrite it.
+    sendErrno = errno;
+    if (count == -1 && errno == EPIPE) {
+      // Consume the SIGPIPE without waiting, retrying only when interrupted.
+      while (-1 == sigtimedwait(&newSignals, NULL, &(struct timespec){ 0, 0 }) && errno == EINTR)
+        ; //do nothing
+    }
+    // Restore the mask that was in effect on entry.
+    if (sigmaskErr = pthread_sigmask(SIG_SETMASK, (const sigset_t *)&oldSignals, (sigset_t *)NULL))
+    {
+      errno = sigmaskErr;
+      return -1;
+    }
+    errno = sendErrno;
+  }
+  return count;
+}
+
+// Send up to `size` bytes on `socket` through nosigpipe_send(), which keeps a
+// closed peer from delivering SIGPIPE.  Returns the send result; on failure
+// the errno is recorded on io->error.
+ssize_t pn_send(pn_io_t *io, pn_socket_t socket, const void *buf, size_t size) {
+  ssize_t count = nosigpipe_send(socket, buf, size);
+  // errno is only meaningful when the send failed; a stale EAGAIN left over
+  // from an earlier call must not be reported as "would block" after a success.
+  io->wouldblock = count < 0 && (errno == EAGAIN || errno == EWOULDBLOCK);
+  if (count < 0) { pn_i_error_from_errno(io->error, "send"); }
+  return count;
+}
+#endif
+
+ssize_t pn_recv(pn_io_t *io, pn_socket_t socket, void *buf, size_t size)
+{
+  // Receive up to `size` bytes.  The would-block flag is set only when the
+  // call failed with EAGAIN/EWOULDBLOCK; every failure is recorded on io->error.
+  ssize_t received = recv(socket, buf, size, 0);
+  if (received < 0) {
+    io->wouldblock = (errno == EAGAIN || errno == EWOULDBLOCK);
+    pn_i_error_from_errno(io->error, "recv");
+  } else {
+    io->wouldblock = false;
+  }
+  return received;
+}
+
+// Plain write() on the descriptor.  `io` is unused: no error is recorded and
+// the would-block flag is not updated.
+ssize_t pn_write(pn_io_t *io, pn_socket_t socket, const void *buf, size_t size)
+{
+  return write(socket, buf, size);
+}
+
+// Plain read() on the descriptor.  `io` is unused: no error is recorded and
+// the would-block flag is not updated.
+ssize_t pn_read(pn_io_t *io, pn_socket_t socket, void *buf, size_t size)
+{
+  return read(socket, buf, size);
+}
+
+// Close the descriptor.  `io` is unused and the result of close() is ignored.
+void pn_close(pn_io_t *io, pn_socket_t socket)
+{
+  close(socket);
+}
+
+// Report the would-block flag last stored by pn_send() or pn_recv().
+bool pn_wouldblock(pn_io_t *io)
+{
+  return io->wouldblock;
+}
+
+pn_selector_t *pn_io_selector(pn_io_t *io)
+{
+  // The selector is created on the first request; later calls return the
+  // same instance.
+  if (!io->selector) {
+    io->selector = pni_selector();
+  }
+  return io->selector;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/reactor/io/posix/selector.c
----------------------------------------------------------------------
diff --git a/c/src/reactor/io/posix/selector.c b/c/src/reactor/io/posix/selector.c
new file mode 100644
index 0000000..bf6882a
--- /dev/null
+++ b/c/src/reactor/io/posix/selector.c
@@ -0,0 +1,214 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "core/util.h"
+#include "platform/platform.h" // pn_i_now, pn_i_error_from_errno
+#include "reactor/io.h"
+#include "reactor/selector.h"
+#include "reactor/selectable.h"
+
+#include <proton/error.h>
+
+#include <poll.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <limits.h>
+
+// poll()-based selector state.  fds[] and deadlines[] are parallel arrays
+// indexed by the position of a selectable in `selectables`.
+struct pn_selector_t {
+  struct pollfd *fds;         // poll() array; entry i belongs to selectable i
+  pn_timestamp_t *deadlines;  // deadline of selectable i; 0 is treated as "none"
+  size_t capacity;            // number of entries allocated in fds and deadlines
+  pn_list_t *selectables;     // registered selectables (created as a PN_WEAKREF list)
+  size_t current;             // iteration cursor used by pn_selector_next()
+  pn_timestamp_t awoken;      // time recorded after the last successful poll()
+  pn_error_t *error;          // receives poll() failures
+};
+
+void pn_selector_initialize(void *obj)
+{
+  pn_selector_t *self = (pn_selector_t *) obj;
+
+  // No poll arrays yet: they are grown when selectables are added.
+  self->capacity = 0;
+  self->fds = NULL;
+  self->deadlines = NULL;
+
+  // Iteration state.
+  self->current = 0;
+  self->awoken = 0;
+
+  // Owned helper objects.
+  self->selectables = pn_list(PN_WEAKREF, 0);
+  self->error = pn_error();
+}
+
+void pn_selector_finalize(void *obj)
+{
+  pn_selector_t *self = (pn_selector_t *) obj;
+
+  // Release the two parallel arrays, then the list and the error object.
+  free(self->deadlines);
+  free(self->fds);
+  pn_free(self->selectables);
+  pn_error_free(self->error);
+}
+
+#define pn_selector_hashcode NULL
+#define pn_selector_compare NULL
+#define pn_selector_inspect NULL
+
+pn_selector_t *pni_selector(void)
+{
+  // Instances are created through the object system using the pn_selector
+  // class descriptor built from the pn_selector_* hooks defined above.
+  static const pn_class_t clazz = PN_CLASS(pn_selector);
+  return (pn_selector_t *) pn_class_new(&clazz, sizeof(pn_selector_t));
+}
+
+void pn_selector_add(pn_selector_t *selector, pn_selectable_t *selectable)
+{
+  assert(selector);
+  assert(selectable);
+  assert(pni_selectable_get_index(selectable) < 0);
+
+  // With assertions compiled out, a selectable that already has an index is
+  // not registered a second time; it is only refreshed below.
+  const bool unregistered = pni_selectable_get_index(selectable) < 0;
+  if (unregistered) {
+    pn_list_add(selector->selectables, selectable);
+    const size_t count = pn_list_size(selector->selectables);
+
+    // Grow both parallel arrays to exactly the number of selectables.
+    if (count > selector->capacity) {
+      selector->fds = (struct pollfd *) realloc(selector->fds, count*sizeof(struct pollfd));
+      selector->deadlines = (pn_timestamp_t *) realloc(selector->deadlines, count*sizeof(pn_timestamp_t));
+      selector->capacity = count;
+    }
+
+    // The new selectable occupies the last slot.
+    pni_selectable_set_index(selectable, count - 1);
+  }
+
+  pn_selector_update(selector, selectable);
+}
+
+void pn_selector_update(pn_selector_t *selector, pn_selectable_t *selectable)
+{
+  // Refresh the poll entry and deadline of an already-registered selectable
+  // from its current fd, read/write interest and deadline.
+  const int slot = pni_selectable_get_index(selectable);
+  assert(slot >= 0);
+
+  struct pollfd *entry = &selector->fds[slot];
+  entry->fd = pn_selectable_get_fd(selectable);
+  entry->events = 0;
+  entry->revents = 0;   // results of a previous poll() are discarded
+
+  if (pn_selectable_is_reading(selectable))
+    entry->events |= POLLIN;
+  if (pn_selectable_is_writing(selectable))
+    entry->events |= POLLOUT;
+
+  selector->deadlines[slot] = pn_selectable_get_deadline(selectable);
+}
+
+// Unregister `selectable` from `selector`.
+//
+// The selectable must currently be registered (its index is asserted to be
+// non-negative).  Every entry behind it moves down one slot in the list and
+// in both parallel arrays, and the selectable's index is reset to -1.
+void pn_selector_remove(pn_selector_t *selector, pn_selectable_t *selectable)
+{
+  assert(selector);
+  assert(selectable);
+
+  int idx = pni_selectable_get_index(selectable);
+  assert(idx >= 0);
+  pn_list_del(selector->selectables, idx, 1);
+  size_t size = pn_list_size(selector->selectables);
+  for (size_t i = idx; i < size; i++) {
+    pn_selectable_t *sel = (pn_selectable_t *) pn_list_get(selector->selectables, i);
+    pni_selectable_set_index(sel, i);
+    // Shift BOTH parallel arrays so that slot i keeps describing selectable i.
+    selector->fds[i] = selector->fds[i + 1];
+    selector->deadlines[i] = selector->deadlines[i + 1];
+  }
+
+  pni_selectable_set_index(selectable, -1);
+
+  // Keep the pn_selector_next() cursor on the same logical element.  Only a
+  // removal strictly before the cursor moves it; when the removed entry is
+  // the one at the cursor, its successor has just slid into that slot.  This
+  // also prevents the unsigned cursor from wrapping when it is 0.
+  if (selector->current > (size_t) idx) {
+    selector->current--;
+  }
+}
+
+// Number of selectables currently registered with the selector.
+size_t pn_selector_size(pn_selector_t *selector) {
+  assert(selector);
+  return pn_list_size(selector->selectables);
+}
+
+// Wait for activity on the registered selectables.
+//
+// `timeout` is handed to poll(): 0 returns immediately and a negative value
+// waits without limit.  Unless it is 0, the wait is additionally capped by the
+// earliest non-zero deadline among the selectables (deadlines and timeout are
+// compared directly, so they are assumed to share poll()'s unit).
+//
+// Returns 0 on success; on poll() failure returns the code produced by
+// pn_i_error_from_errno(), with the error recorded on selector->error.
+int pn_selector_select(pn_selector_t *selector, int timeout)
+{
+  assert(selector);
+
+  size_t size = pn_list_size(selector->selectables);
+
+  // A zero timeout is already the shortest possible wait.
+  if (timeout) {
+    // Earliest non-zero deadline; 0 means no selectable has one.
+    pn_timestamp_t deadline = 0;
+    for (size_t i = 0; i < size; i++) {
+      pn_timestamp_t d = selector->deadlines[i];
+      if (d)
+        deadline = (deadline == 0) ? d : pn_min(deadline, d);
+    }
+
+    if (deadline) {
+      pn_timestamp_t now = pn_i_now();
+      int64_t delta = deadline - now;
+      if (delta < 0) {
+        // Deadline already passed: do not block at all.
+        timeout = 0;
+      } else if (timeout < 0 || delta < timeout) {
+        // A negative timeout is an unbounded wait, so the deadline must still
+        // cap it; clamp because poll() takes an int.
+        timeout = (delta > INT_MAX) ? INT_MAX : (int) delta;
+      }
+    }
+  }
+
+  int error = 0;
+  int result = poll(selector->fds, size, timeout);
+  if (result == -1) {
+    error = pn_i_error_from_errno(selector->error, "poll");
+  } else {
+    // Restart iteration and remember when we woke for the expiry checks in
+    // pn_selector_next().
+    selector->current = 0;
+    selector->awoken = pn_i_now();
+  }
+
+  return error;
+}
+
+pn_selectable_t *pn_selector_next(pn_selector_t *selector, int *events)
+{
+  // Advance the cursor over the selectables and return the next one that has
+  // something to report, storing its PN_* event mask in *events.  Returns
+  // NULL (leaving *events untouched) once the cursor reaches the end.
+  pn_list_t *items = selector->selectables;
+  const size_t count = pn_list_size(items);
+
+  while (selector->current < count) {
+    const size_t at = selector->current;
+    pn_selectable_t *candidate = (pn_selectable_t *) pn_list_get(items, at);
+    const short revents = selector->fds[at].revents;
+    const pn_timestamp_t deadline = selector->deadlines[at];
+
+    int ready = 0;
+    if (revents & POLLIN)
+      ready |= PN_READABLE;
+    if (revents & (POLLERR | POLLHUP | POLLNVAL))
+      ready |= PN_ERROR;
+    if (revents & POLLOUT)
+      ready |= PN_WRITABLE;
+    // Expiry is judged against the time recorded by the last select call.
+    if (deadline && selector->awoken >= deadline)
+      ready |= PN_EXPIRED;
+
+    // The cursor moves on before returning, so the next call resumes after
+    // this element.
+    selector->current = at + 1;
+    if (ready) {
+      *events = ready;
+      return candidate;
+    }
+  }
+  return NULL;
+}
+
+// Release a selector by handing it to pn_free().  `selector` must not be
+// NULL (asserted).
+void pn_selector_free(pn_selector_t *selector)
+{
+  assert(selector);
+  pn_free(selector);
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/reactor/io/windows/io.c
----------------------------------------------------------------------
diff --git a/c/src/reactor/io/windows/io.c b/c/src/reactor/io/windows/io.c
new file mode 100644
index 0000000..07692d1
--- /dev/null
+++ b/c/src/reactor/io/windows/io.c
@@ -0,0 +1,464 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#define FD_SETSIZE 2048
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
+#if _WIN32_WINNT < 0x0501
+#error "Proton requires Windows API support for XP or later."
+#endif
+#include <winsock2.h>
+#include <mswsock.h>
+#include <Ws2tcpip.h>
+
+#include "reactor/io.h"
+#include "reactor/selector.h"
+
+#include "platform/platform.h"
+#include "iocp.h"
+#include "core/util.h"
+
+#include <proton/object.h>
+
+#include <ctype.h>
+#include <errno.h>
+#include <stdio.h>
+#include <assert.h>
+
+int pni_win32_error(pn_error_t *error, const char *msg, HRESULT code)
+{
+  // `code` can be from GetLastError or WSAGetLastError.  The buffer starts
+  // zeroed, so the description is simply empty if FormatMessage() writes
+  // nothing into it.
+  char text[1024] = {0};
+  const DWORD flags = FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS |
+                      FORMAT_MESSAGE_MAX_WIDTH_MASK;
+  FormatMessage(flags, NULL, code, 0, (LPSTR) text, sizeof(text), NULL);
+  return pn_error_format(error, PN_ERR, "%s: %s", msg, text);
+}
+
+// printf-style trace output: formats to stderr and flushes immediately.
+static void io_log(const char *fmt, ...)
+{
+  va_list ap;
+  va_start(ap, fmt);
+  vfprintf(stderr, fmt, ap);
+  va_end(ap);
+  fflush(stderr);
+}
+
+// State behind a pn_io_t handle (Windows implementation).
+struct pn_io_t {
+  char host[NI_MAXHOST];  // scratch buffer filled by getnameinfo() in pn_accept()
+  char serv[NI_MAXSERV];  // scratch buffer filled by getnameinfo() in pn_accept()
+  pn_error_t *error;      // created in pn_io_initialize(), released in pn_io_finalize()
+  bool trace;             // set from pn_env_bool("PN_TRACE_DRV") at initialization
+  bool wouldblock;        // would-block state reported by pn_wouldblock()
+  iocp_t *iocp;           // IOCP state; also holds the selector pointer
+};
+
+void pn_io_initialize(void *obj)
+{
+  pn_io_t *io = (pn_io_t *) obj;
+  io->error = pn_error();
+  io->wouldblock = false;
+  io->trace = pn_env_bool("PN_TRACE_DRV");
+
+  /* Request WinSock 2.2 */
+  WORD wsa_ver = MAKEWORD(2, 2);
+  WSADATA unused;
+  int err = WSAStartup(wsa_ver, &unused);
+  if (err) {
+    pni_win32_error(io->error, "WSAStartup", WSAGetLastError());
+    fprintf(stderr, "Can't load WinSock: %s\n", pn_error_text(io->error));
+  }
+  io->iocp = pni_iocp();
+}
+
+void pn_io_finalize(void *obj)
+{
+  pn_io_t *self = (pn_io_t *) obj;
+
+  // Read the selector pointer first: it is stored inside the iocp object,
+  // which is released on the next line.
+  pn_selector_t *selector = self->iocp->selector;
+  pn_free(self->iocp);
+  if (selector != NULL) {
+    // Balances the pn_incref() taken in pn_io_selector().
+    pn_decref(selector);
+  }
+
+  pn_error_free(self->error);
+  WSACleanup();
+}
+
+// Class hooks that pn_io does not implement.  Each one is given an explicit
+// NULL (presumably consumed by the PN_CLASS(pn_io) initializer below) rather
+// than an empty expansion.
+#define pn_io_hashcode NULL
+#define pn_io_compare NULL
+#define pn_io_inspect NULL
+
+pn_io_t *pn_io(void)
+{
+  // Instances are created through the object system using the pn_io class
+  // descriptor built from the pn_io_* hooks defined above.
+  static const pn_class_t clazz = PN_CLASS(pn_io);
+  return (pn_io_t *) pn_class_new(&clazz, sizeof(pn_io_t));
+}
+
+// Release an IO object by handing it to pn_free().
+void pn_io_free(pn_io_t *io)
+{
+  pn_free(io);
+}
+
+// Return the error object that the pn_* IO calls in this file record failures on.
+// `io` must not be NULL (asserted).
+pn_error_t *pn_io_error(pn_io_t *io)
+{
+  assert(io);
+  return io->error;
+}
+
+static void ensure_unique(pn_io_t *io, pn_socket_t new_socket)
+{
+  // A brand new socket can have the same HANDLE value as a previous
+  // one after a socketclose.  If the application closes one itself
+  // (i.e. not using pn_close), we don't find out about it until here.
+  iocpdesc_t *stale = pni_iocpdesc_map_get(io->iocp, new_socket);
+  if (!stale)
+    return;   // no earlier descriptor is mapped to this handle value
+
+  if (io->trace)
+    io_log("Stale external socket reference discarded\n");
+
+  // Re-use means former socket instance was closed
+  assert(stale->ops_in_progress == 0);
+  assert(stale->external);
+
+  // Clean up the straggler as best we can: detach the handle value from the
+  // descriptor, then drop its map entry.
+  pn_socket_t old_handle = stale->socket;
+  stale->socket = INVALID_SOCKET;
+  pni_iocpdesc_map_del(io->iocp, old_handle);  // may free the iocpdesc_t depending on refcount
+}
+
+
+/*
+ * This heavyweight surrogate pipe could be replaced with a normal Windows pipe
+ * now that select() is no longer used.  If interrupt semantics are all that is
+ * needed, a simple user space counter and reserved completion status would
+ * probably suffice.
+ */
+static int pni_socket_pair(pn_io_t *io, SOCKET sv[2]);
+
+int pn_pipe(pn_io_t *io, pn_socket_t *dest)
+{
+  // The "pipe" is a connected socket pair built by pni_socket_pair(); its
+  // result is returned unchanged and a failure is recorded on io->error.
+  const int rc = pni_socket_pair(io, dest);
+  if (rc != 0) {
+    pni_win32_error(io->error, "pipe", WSAGetLastError());
+  }
+  return rc;
+}
+
+// Disable the Nagle algorithm on `sock` and put it into non-blocking mode.
+// Failures are recorded on io->error using the WinSock error code; the
+// function itself reports nothing to the caller.
+static void pn_configure_sock(pn_io_t *io, pn_socket_t sock) {
+  //
+  // Disable the Nagle algorithm on TCP connections.
+  //
+  int flag = 1;
+  if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)) != 0) {
+    // WinSock reports failures through WSAGetLastError(), not errno, so
+    // perror() would print an unrelated message.
+    pni_win32_error(io->error, "setsockopt", WSAGetLastError());
+  }
+
+  u_long nonblock = 1;
+  if (ioctlsocket(sock, FIONBIO, &nonblock)) {
+    pni_win32_error(io->error, "ioctlsocket", WSAGetLastError());
+  }
+}
+
+static inline pn_socket_t pni_create_socket(int domain, int protocol);
+
+static const char *amqp_service(const char *port) {
+  // Help older Windows to know about amqp[s] ports: the two service names are
+  // mapped to their port numbers, anything else (including NULL) is returned
+  // unchanged.
+  if (port == NULL) return port;
+  if (strcmp("amqp", port) == 0) return "5672";
+  if (strcmp("amqps", port) == 0) return "5671";
+  return port;
+}
+
+// Create a listening stream socket bound to host:port.
+//
+// The address is resolved with getaddrinfo() (after mapping the amqp/amqps
+// service names) and only the first result is used.  SO_EXCLUSIVEADDRUSE is
+// set before bind(), and the listen backlog is 50.  When a selector exists
+// the socket is also registered with the IOCP layer.
+// Returns the listening socket, or INVALID_SOCKET with the failure recorded
+// on io->error.
+pn_socket_t pn_listen(pn_io_t *io, const char *host, const char *port)
+{
+  struct addrinfo *addr;
+  int code = getaddrinfo(host, amqp_service(port), NULL, &addr);
+  if (code) {
+    pn_error_format(io->error, PN_ERR, "getaddrinfo(%s, %s): %s\n", host, port, gai_strerror(code));
+    return INVALID_SOCKET;
+  }
+
+  pn_socket_t sock = pni_create_socket(addr->ai_family, addr->ai_protocol);
+  if (sock == INVALID_SOCKET) {
+    pni_win32_error(io->error, "pni_create_socket", WSAGetLastError());
+    freeaddrinfo(addr);   // do not leak the resolved address on failure
+    return INVALID_SOCKET;
+  }
+  ensure_unique(io, sock);
+
+  // WinSock boolean options take a BOOL, not a one-byte C bool.
+  BOOL optval = 1;
+  if (setsockopt(sock, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (const char *) &optval,
+                 sizeof(optval)) == -1) {
+    pni_win32_error(io->error, "setsockopt", WSAGetLastError());
+    freeaddrinfo(addr);   // do not leak the resolved address on failure
+    closesocket(sock);
+    return INVALID_SOCKET;
+  }
+
+  if (bind(sock, addr->ai_addr, addr->ai_addrlen) == -1) {
+    pni_win32_error(io->error, "bind", WSAGetLastError());
+    freeaddrinfo(addr);
+    closesocket(sock);
+    return INVALID_SOCKET;
+  }
+  // The resolved address is no longer needed once the socket is bound.
+  freeaddrinfo(addr);
+
+  if (listen(sock, 50) == -1) {
+    pni_win32_error(io->error, "listen", WSAGetLastError());
+    closesocket(sock);
+    return INVALID_SOCKET;
+  }
+
+  if (io->iocp->selector) {
+    iocpdesc_t *iocpd = pni_iocpdesc_create(io->iocp, sock, false);
+    if (!iocpd) {
+      pn_i_error_from_errno(io->error, "register");
+      closesocket(sock);
+      return INVALID_SOCKET;
+    }
+    pni_iocpdesc_start(iocpd);
+  }
+
+  return sock;
+}
+
+// Start a connection to hostarg:port.
+//
+// "0.0.0.0" is rewritten to "127.0.0.1"; a NULL host is passed through to
+// getaddrinfo() unchanged.  When a selector exists the connect is started
+// through pni_iocp_begin_connect(), otherwise connect() is called directly on
+// the non-blocking socket and WSAEWOULDBLOCK is not treated as a failure.
+// Returns the socket, or INVALID_SOCKET with the failure recorded on io->error.
+pn_socket_t pn_connect(pn_io_t *io, const char *hostarg, const char *port)
+{
+  // convert "0.0.0.0" to "127.0.0.1" on Windows for outgoing sockets
+  // (guarding against a NULL host, which strcmp() cannot take)
+  const char *host = (hostarg && !strcmp("0.0.0.0", hostarg)) ? "127.0.0.1" : hostarg;
+
+  struct addrinfo *addr;
+  int code = getaddrinfo(host, amqp_service(port), NULL, &addr);
+  if (code) {
+    pn_error_format(io->error, PN_ERR, "getaddrinfo(%s, %s): %s", host, port, gai_strerror(code));
+    return INVALID_SOCKET;
+  }
+
+  pn_socket_t sock = pni_create_socket(addr->ai_family, addr->ai_protocol);
+  if (sock == INVALID_SOCKET) {
+    pni_win32_error(io->error, "proton pni_create_socket", WSAGetLastError());
+    freeaddrinfo(addr);
+    return INVALID_SOCKET;
+  }
+
+  ensure_unique(io, sock);
+  pn_configure_sock(io, sock);
+
+  if (io->iocp->selector) {
+    // NOTE(review): addr is not freed on this path; presumably
+    // pni_iocp_begin_connect() takes ownership of it -- confirm in iocp.c.
+    return pni_iocp_begin_connect(io->iocp, sock, addr, io->error);
+  } else {
+    if (connect(sock, addr->ai_addr, addr->ai_addrlen) != 0) {
+      if (WSAGetLastError() != WSAEWOULDBLOCK) {
+        pni_win32_error(io->error, "connect", WSAGetLastError());
+        freeaddrinfo(addr);
+        closesocket(sock);
+        return INVALID_SOCKET;
+      }
+    }
+
+    freeaddrinfo(addr);
+    return sock;
+  }
+}
+
+pn_socket_t pn_accept(pn_io_t *io, pn_socket_t listen_sock, char *name, size_t size)
+{
+  struct sockaddr_storage peer;
+  socklen_t peer_len = sizeof(peer);
+  iocpdesc_t *listend = pni_iocpdesc_map_get(io->iocp, listen_sock);
+  pn_socket_t accepted;
+
+  // The caller's buffer holds an empty string unless the accept fully succeeds.
+  *name = '\0';
+  if (!listend) {
+    // User supplied socket
+    accepted = accept(listen_sock, (struct sockaddr *) &peer, &peer_len);
+    if (accepted == INVALID_SOCKET)
+      pni_win32_error(io->error, "sync accept", WSAGetLastError());
+  } else {
+    // Listener known to the IOCP layer: collect the accept from there.
+    accepted = pni_iocp_end_accept(listend, (struct sockaddr *) &peer, &peer_len, &io->wouldblock, io->error);
+  }
+
+  if (accepted == INVALID_SOCKET)
+    return accepted;
+
+  // Try a regular name lookup first and fall back to the numeric form.
+  int code = getnameinfo((struct sockaddr *) &peer, peer_len, io->host, NI_MAXHOST,
+                         io->serv, NI_MAXSERV, 0);
+  if (code != 0) {
+    code = getnameinfo((struct sockaddr *) &peer, peer_len, io->host, NI_MAXHOST,
+                       io->serv, NI_MAXSERV, NI_NUMERICHOST | NI_NUMERICSERV);
+  }
+  if (code != 0) {
+    pn_error_format(io->error, PN_ERR, "getnameinfo: %s\n", gai_strerror(code));
+    pn_close(io, accepted);
+    return INVALID_SOCKET;
+  }
+
+  pn_configure_sock(io, accepted);
+  pni_snprintf(name, size, "%s:%s", io->host, io->serv);
+  if (listend) {
+    pni_iocpdesc_start(pni_iocpdesc_map_get(io->iocp, accepted));
+  }
+  return accepted;
+}
+
+// Create a stream socket for the given address family and protocol.
+static inline pn_socket_t pni_create_socket(int domain, int protocol) {
+  return socket(domain, SOCK_STREAM, protocol);
+}
+
+ssize_t pn_send(pn_io_t *io, pn_socket_t sockfd, const void *buf, size_t len) {
+  // Sockets known to the IOCP layer are written through it; it maintains the
+  // would-block flag and the error object itself.
+  iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, sockfd);
+  if (iocpd) {
+    return pni_iocp_begin_write(iocpd, buf, len, &io->wouldblock, io->error);
+  }
+
+  // Any other socket gets a plain send().
+  ssize_t sent = send(sockfd, (const char *) buf, len, 0);
+  io->wouldblock = sent < 0 && WSAGetLastError() == WSAEWOULDBLOCK;
+  return sent;
+}
+
+ssize_t pn_recv(pn_io_t *io, pn_socket_t socket, void *buf, size_t size)
+{
+  // Sockets known to the IOCP layer are read through it; it maintains the
+  // would-block flag and the error object itself.
+  iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, socket);
+  if (iocpd) {
+    return pni_iocp_recv(iocpd, buf, size, &io->wouldblock, io->error);
+  }
+
+  // Any other socket gets a plain recv().
+  ssize_t received = recv(socket, (char *) buf, size, 0);
+  io->wouldblock = received < 0 && WSAGetLastError() == WSAEWOULDBLOCK;
+  return received;
+}
+
+// Write to a descriptor obtained from pn_pipe(); forwards to pn_send().
+ssize_t pn_write(pn_io_t *io, pn_socket_t socket, const void *buf, size_t size)
+{
+  // non-socket io is mapped to socket io for now.  See pn_pipe()
+  return pn_send(io, socket, buf, size);
+}
+
+// Read counterpart of pn_write(): forwards to pn_recv().
+ssize_t pn_read(pn_io_t *io, pn_socket_t socket, void *buf, size_t size)
+{
+  return pn_recv(io, socket, buf, size);
+}
+
+void pn_close(pn_io_t *io, pn_socket_t socket)
+{
+  // Sockets known to the IOCP layer are closed through it; anything else is
+  // closed directly.
+  iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, socket);
+  if (!iocpd) {
+    closesocket(socket);
+    return;
+  }
+  pni_iocp_begin_close(iocpd);
+}
+
+// Report the would-block flag stored by the most recent send/recv/accept path.
+bool pn_wouldblock(pn_io_t *io)
+{
+  return io->wouldblock;
+}
+
+pn_selector_t *pn_io_selector(pn_io_t *io)
+{
+  // The selector lives on the iocp object and is created on first request.
+  // An extra reference is taken on creation; pn_io_finalize() drops it.
+  iocp_t *iocp = io->iocp;
+  if (!iocp->selector) {
+    iocp->selector = pni_selector_create(iocp);
+    pn_incref(iocp->selector);
+  }
+  return iocp->selector;
+}
+
+// Prepare one end of the surrogate pipe: make it non-blocking, discard any
+// stale descriptor mapped to the same handle value, then register it with
+// the IOCP layer and start it.
+static void configure_pipe_socket(pn_io_t *io, pn_socket_t sock)
+{
+  u_long v = 1;
+  ioctlsocket (sock, FIONBIO, &v);
+  ensure_unique(io, sock);
+  iocpdesc_t *iocpd = pni_iocpdesc_create(io->iocp, sock, false);
+  // pni_iocpdesc_create() can return NULL (pn_listen() checks for it); do not
+  // hand a NULL descriptor to pni_iocpdesc_start().
+  if (iocpd)
+    pni_iocpdesc_start(iocpd);
+}
+
+
+// Build a connected pair of loopback TCP sockets in sv[0]/sv[1] as a stand-in
+// for pipe().
+//
+// A temporary listener is bound to an ephemeral port on INADDR_LOOPBACK;
+// sv[1] connects to it and sv[0] is the accepted end.  Both ends are then
+// passed to configure_pipe_socket() and the listener is closed.
+// Returns 0 on success and -1 on failure, in which case every socket created
+// here has been closed again.
+static int pni_socket_pair (pn_io_t *io, SOCKET sv[2]) {
+  // no socketpair on windows.  provide pipe() semantics using sockets
+  struct protoent * pe_tcp = getprotobyname("tcp");
+  if (pe_tcp == NULL) {
+    perror("getprotobyname");
+    return -1;
+  }
+
+  // Temporary listening socket; closed before returning on every path.
+  SOCKET sock = socket(AF_INET, SOCK_STREAM, pe_tcp->p_proto);
+  if (sock == INVALID_SOCKET) {
+    perror("socket");
+    return -1;
+  }
+
+  BOOL b = 1;
+  if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char *) &b, sizeof(b)) == -1) {
+    perror("setsockopt");
+    closesocket(sock);
+    return -1;
+  }
+  else {
+    // Port 0: the system picks a free port, read back below with getsockname().
+    struct sockaddr_in addr = {0};
+    addr.sin_family = AF_INET;
+    addr.sin_port = 0;
+    addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
+
+    if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
+      perror("bind");
+      closesocket(sock);
+      return -1;
+    }
+  }
+
+  if (listen(sock, 50) == -1) {
+    perror("listen");
+    closesocket(sock);
+    return -1;
+  }
+
+  if ((sv[1] = socket(AF_INET, SOCK_STREAM, pe_tcp->p_proto)) == INVALID_SOCKET) {
+    perror("sock1");
+    closesocket(sock);
+    return -1;
+  }
+  else {
+    struct sockaddr addr = {0};
+    int l = sizeof(addr);
+    if (getsockname(sock, &addr, &l) == -1) {
+      perror("getsockname");
+      closesocket(sock);
+      closesocket(sv[1]);   // sv[1] already exists here; do not leak it
+      return -1;
+    }
+
+    if (connect(sv[1], &addr, sizeof(addr)) == -1) {
+      int err = WSAGetLastError();
+      fprintf(stderr, "connect wsaerrr %d\n", err);
+      closesocket(sock);
+      closesocket(sv[1]);
+      return -1;
+    }
+
+    if ((sv[0] = accept(sock, &addr, &l)) == INVALID_SOCKET) {
+      perror("accept");
+      closesocket(sock);
+      closesocket(sv[1]);
+      return -1;
+    }
+  }
+
+  configure_pipe_socket(io, sv[0]);
+  configure_pipe_socket(io, sv[1]);
+  closesocket(sock);
+  return 0;
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org