You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2016/11/14 18:28:24 UTC

[03/20] qpid-proton git commit: PROTON-1350 PROTON-1351: Introduce proton-c core library - Created new core proton library qpid-proton-core which only contains protocol processsing and no IO. - Rearranged source tree to separate core protocol code and

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/transport/transport.h
----------------------------------------------------------------------
diff --git a/proton-c/src/transport/transport.h b/proton-c/src/transport/transport.h
deleted file mode 100644
index 66ebc51..0000000
--- a/proton-c/src/transport/transport.h
+++ /dev/null
@@ -1,31 +0,0 @@
-#ifndef _PROTON_TRANSPORT_INTERNAL_H
-#define _PROTON_TRANSPORT_INTERNAL_H 1
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-void pn_delivery_map_init(pn_delivery_map_t *db, pn_sequence_t next);
-void pn_delivery_map_del(pn_delivery_map_t *db, pn_delivery_t *delivery);
-void pn_delivery_map_free(pn_delivery_map_t *db);
-void pn_unmap_handle(pn_session_t *ssn, pn_link_t *link);
-void pn_unmap_channel(pn_transport_t *transport, pn_session_t *ssn);
-
-#endif /* transport.h */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/types.c
----------------------------------------------------------------------
diff --git a/proton-c/src/types.c b/proton-c/src/types.c
deleted file mode 100644
index 4f8048d..0000000
--- a/proton-c/src/types.c
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "platform.h"
-#include <proton/types.h>
-#include <stdlib.h>
-#include <string.h>
-
-pn_bytes_t pn_bytes(size_t size, const char *start)
-{
-  pn_bytes_t bytes = {size, start};
-  return bytes;
-}
-
-pn_rwbytes_t pn_rwbytes(size_t size, char *start)
-{
-  pn_rwbytes_t bytes = {size, start};
-  return bytes;
-}
-
-pn_timestamp_t pn_timestamp_now() {
-  return pn_i_now();
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/types.xml
----------------------------------------------------------------------
diff --git a/proton-c/src/types.xml b/proton-c/src/types.xml
new file mode 100644
index 0000000..4aa9c0f
--- /dev/null
+++ b/proton-c/src/types.xml
@@ -0,0 +1,125 @@
+<?xml version="1.0"?>
+
+<!--
+Copyright Bank of America, N.A., Barclays Bank PLC, Cisco Systems, Credit
+Suisse, Deutsche Boerse, Envoy Technologies Inc., Goldman Sachs, HCL
+Technologies Ltd, IIT Software GmbH, iMatix Corporation, INETCO Systems Limited,
+Informatica Corporation, JPMorgan Chase & Co., Kaazing Corporation, N.A,
+Microsoft Corporation, my-Channels, Novell, Progress Software, Red Hat Inc.,
+Software AG, Solace Systems Inc., StormMQ Ltd., Tervela Inc., TWIST Process
+Innovations Ltd, VMware, Inc., and WS02 Inc. 2006-2011. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+1. Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+2. Redistributions in binary form must reproduce the above copyright
+notice, this list of conditions and the following disclaimer in the
+documentation and/or other materials provided with the distribution.
+3. The name of the author may not be used to endorse or promote products
+derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
+IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+-->
+
+<amqp name="types" xmlns="http://www.amqp.org/schema/amqp.xsd">
+  <section name="encodings">
+    <type name="null" class="primitive">
+      <encoding code="0x40" category="fixed" width="0"/>
+    </type>
+    <type name="boolean" class="primitive">
+      <encoding code="0x56" category="fixed" width="1"/>
+      <encoding name="true" code="0x41" category="fixed" width="0"/>
+      <encoding name="false" code="0x42" category="fixed" width="0"/>
+    </type>
+    <type name="ubyte" class="primitive">
+      <encoding code="0x50" category="fixed" width="1"/>
+    </type>
+    <type name="ushort" class="primitive">
+      <encoding code="0x60" category="fixed" width="2"/>
+    </type>
+    <type name="uint" class="primitive">
+      <encoding code="0x70" category="fixed" width="4"/>
+      <encoding name="smalluint" code="0x52" category="fixed" width="1"/>
+      <encoding name="uint0" code="0x43" category="fixed" width="0"/>
+    </type>
+    <type name="ulong" class="primitive">
+      <encoding code="0x80" category="fixed" width="8"/>
+      <encoding name="smallulong" code="0x53" category="fixed" width="1"/>
+      <encoding name="ulong0" code="0x44" category="fixed" width="0"/>
+    </type>
+    <type name="byte" class="primitive">
+      <encoding code="0x51" category="fixed" width="1"/>
+    </type>
+    <type name="short" class="primitive">
+      <encoding code="0x61" category="fixed" width="2"/>
+    </type>
+    <type name="int" class="primitive">
+      <encoding code="0x71" category="fixed" width="4"/>
+      <encoding name="smallint" code="0x54" category="fixed" width="1"/>
+    </type>
+    <type name="long" class="primitive">
+      <encoding code="0x81" category="fixed" width="8"/>
+      <encoding name="smalllong" code="0x55" category="fixed" width="1"/>
+    </type>
+    <type name="float" class="primitive">
+      <encoding name="ieee-754" code="0x72" category="fixed" width="4"/>
+    </type>
+    <type name="double" class="primitive">
+      <encoding name="ieee-754" code="0x82" category="fixed" width="8"/>
+    </type>
+    <type name="decimal32" class="primitive">
+      <encoding name="ieee-754" code="0x74" category="fixed" width="4"/>
+    </type>
+    <type name="decimal64" class="primitive">
+      <encoding name="ieee-754" code="0x84" category="fixed" width="8"/>
+    </type>
+    <type name="decimal128" class="primitive">
+      <encoding name="ieee-754" code="0x94" category="fixed" width="16"/>
+    </type>
+    <type name="char" class="primitive">
+      <encoding name="utf32" code="0x73" category="fixed" width="4"/>
+    </type>
+    <type name="timestamp" class="primitive">
+      <encoding name="ms64" code="0x83" category="fixed" width="8"/>
+    </type>
+    <type name="uuid" class="primitive">
+      <encoding code="0x98" category="fixed" width="16"/>
+    </type>
+    <type name="binary" class="primitive">
+      <encoding name="vbin8" code="0xa0" category="variable" width="1"/>
+      <encoding name="vbin32" code="0xb0" category="variable" width="4"/>
+    </type>
+    <type name="string" class="primitive">
+      <encoding name="str8-utf8" code="0xa1" category="variable" width="1"/>
+      <encoding name="str32-utf8" code="0xb1" category="variable" width="4"/>
+    </type>
+    <type name="symbol" class="primitive">
+      <encoding name="sym8" code="0xa3" category="variable" width="1"/>
+      <encoding name="sym32" code="0xb3" category="variable" width="4"/>
+    </type>
+    <type name="list" class="primitive">
+      <encoding name="list0" code="0x45" category="fixed" width="0"/>
+      <encoding name="list8" code="0xc0" category="compound" width="1"/>
+      <encoding name="list32" code="0xd0" category="compound" width="4"/>
+    </type>
+    <type name="map" class="primitive">
+      <encoding name="map8" code="0xc1" category="compound" width="1"/>
+      <encoding name="map32" code="0xd1" category="compound" width="4"/>
+    </type>
+    <type name="array" class="primitive">
+      <encoding name="array8" code="0xe0" category="array" width="1"/>
+      <encoding name="array32" code="0xf0" category="array" width="4"/>
+    </type>
+  </section>
+</amqp>

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/url.c
----------------------------------------------------------------------
diff --git a/proton-c/src/url.c b/proton-c/src/url.c
deleted file mode 100644
index 566e91e..0000000
--- a/proton-c/src/url.c
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "proton/url.h"
-#include "proton/object.h"
-#include "util.h"
-#include "platform.h"
-
-#include <stdlib.h>
-#include <string.h>
-#include <stdio.h>
-
-static char* copy(const char* str) {
-    if (str ==  NULL) return NULL;
-    char *str2 = (char*)malloc(strlen(str)+1);
-    if (str2) strcpy(str2, str);
-    return str2;
-}
-
-struct pn_url_t {
-    char *scheme;
-    char *username;
-    char *password;
-    char *host;
-    char *port;
-    char *path;
-    pn_string_t *str;
-};
-
-/** Internal use only, returns the pn_string_t. Public function is pn_url_str() */
-static pn_string_t *pn_url_string(pn_url_t* url)
-{
-    pn_url_str(url);               /* Make sure str is up to date */
-    return url->str;
-}
-
-static void pn_url_finalize(void *object)
-{
-    pn_url_t *url = (pn_url_t *) object;
-    pn_url_clear(url);
-    pn_free(url->str);
-}
-
-static uintptr_t pn_url_hashcode(void *object)
-{
-    pn_url_t *url = (pn_url_t *) object;
-    return pn_hashcode(pn_url_string(url));
-}
-
-static intptr_t pn_url_compare(void *oa, void *ob)
-{
-    pn_url_t *a = (pn_url_t *) oa;
-    pn_url_t *b = (pn_url_t *) ob;
-    return pn_compare(pn_url_string(a), pn_url_string(b));
-}
-
-
-static int pn_url_inspect(void *obj, pn_string_t *dst)
-{
-    pn_url_t *url = (pn_url_t *) obj;
-    int err = 0;
-    err = pn_string_addf(dst, "Url("); if (err) return err;
-    err = pn_inspect(pn_url_string(url), dst); if (err) return err;
-    return pn_string_addf(dst, ")");
-}
-
-#define pn_url_initialize NULL
-
-
-pn_url_t *pn_url() {
-    static const pn_class_t clazz = PN_CLASS(pn_url);
-    pn_url_t *url = (pn_url_t*) pn_class_new(&clazz, sizeof(pn_url_t));
-    if (!url) return NULL;
-    memset(url, 0, sizeof(*url));
-    url->str = pn_string(NULL);
-    return url;
-}
-
-/** Parse a string URL as a pn_url_t.
- *@param[in] url A URL string.
- *@return The parsed pn_url_t or NULL if url is not a valid URL string.
- */
-pn_url_t *pn_url_parse(const char *str) {
-    if (!str || !*str)          /* Empty string or NULL is illegal. */
-        return NULL;
-
-    pn_url_t *url = pn_url();
-    char *str2 = copy(str);
-    pni_parse_url(str2, &url->scheme, &url->username, &url->password, &url->host, &url->port, &url->path);
-    url->scheme = copy(url->scheme);
-    url->username = copy(url->username);
-    url->password = copy(url->password);
-    url->host = (url->host && !*url->host) ? NULL : copy(url->host);
-    url->port = copy(url->port);
-    url->path = copy(url->path);
-
-    free(str2);
-    return url;
-}
-
-/** Free a URL */
-void pn_url_free(pn_url_t *url) { pn_free(url); }
-
-/** Clear the contents of the URL. */
-void pn_url_clear(pn_url_t *url) {
-    pn_url_set_scheme(url, NULL);
-    pn_url_set_username(url, NULL);
-    pn_url_set_password(url, NULL);
-    pn_url_set_host(url, NULL);
-    pn_url_set_port(url, NULL);
-    pn_url_set_path(url, NULL);
-    pn_string_clear(url->str);
-}
-
-/** URL-encode src and append to dst. */
-static void pni_urlencode(pn_string_t *dst, const char* src) {
-    static const char *bad = "@:/";
-
-    if (!src) return;
-    const char *i = src;
-    const char *j = strpbrk(i, bad);
-    while (j) {
-        pn_string_addf(dst, "%.*s", (int)(j-i), i);
-        pn_string_addf(dst, "%%%02X", (int)*j);
-        i = j + 1;
-        j = strpbrk(i, bad);
-    }
-    pn_string_addf(dst, "%s", i);
-}
-
-
-/** Return the string form of a URL. */
-const char *pn_url_str(pn_url_t *url) {
-    if (pn_string_get(url->str) == NULL) {
-        pn_string_set(url->str, "");
-        if (url->scheme) pn_string_addf(url->str, "%s://", url->scheme);
-        if (url->username) pni_urlencode(url->str, url->username);
-        if (url->password) {
-            pn_string_addf(url->str, ":");
-            pni_urlencode(url->str, url->password);
-        }
-        if (url->username || url->password) pn_string_addf(url->str, "@");
-        if (url->host) {
-            if (strchr(url->host, ':')) pn_string_addf(url->str, "[%s]", url->host);
-            else pn_string_addf(url->str, "%s", url->host);
-        }
-        if (url->port) pn_string_addf(url->str, ":%s", url->port);
-        if (url->path) pn_string_addf(url->str, "/%s", url->path);
-    }
-    return pn_string_get(url->str);
-}
-
-const char *pn_url_get_scheme(pn_url_t *url) { return url->scheme; }
-const char *pn_url_get_username(pn_url_t *url) { return url->username; }
-const char *pn_url_get_password(pn_url_t *url) { return url->password; }
-const char *pn_url_get_host(pn_url_t *url) { return url->host; }
-const char *pn_url_get_port(pn_url_t *url) { return url->port; }
-const char *pn_url_get_path(pn_url_t *url) { return url->path; }
-
-#define SET(part) free(url->part); url->part = copy(part); pn_string_clear(url->str)
-void pn_url_set_scheme(pn_url_t *url, const char *scheme) { SET(scheme); }
-void pn_url_set_username(pn_url_t *url, const char *username) { SET(username); }
-void pn_url_set_password(pn_url_t *url, const char *password) { SET(password); }
-void pn_url_set_host(pn_url_t *url, const char *host) { SET(host); }
-void pn_url_set_port(pn_url_t *url, const char *port) { SET(port); }
-void pn_url_set_path(pn_url_t *url, const char *path) { SET(path); }
-
-

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/util.c
----------------------------------------------------------------------
diff --git a/proton-c/src/util.c b/proton-c/src/util.c
deleted file mode 100644
index 47fbc34..0000000
--- a/proton-c/src/util.c
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <stdarg.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <proton/type_compat.h>
-#include <ctype.h>
-#include <string.h>
-#include <proton/error.h>
-#include <proton/types.h>
-#include "util.h"
-
-ssize_t pn_quote_data(char *dst, size_t capacity, const char *src, size_t size)
-{
-  int idx = 0;
-  for (unsigned i = 0; i < size; i++)
-  {
-    uint8_t c = src[i];
-    if (isprint(c)) {
-      if (idx < (int) (capacity - 1)) {
-        dst[idx++] = c;
-      } else {
-        if (idx > 0) {
-          dst[idx - 1] = '\0';
-        }
-        return PN_OVERFLOW;
-      }
-    } else {
-      if (idx < (int) (capacity - 4)) {
-        idx += sprintf(dst + idx, "\\x%.2x", c);
-      } else {
-        if (idx > 0) {
-          dst[idx - 1] = '\0';
-        }
-        return PN_OVERFLOW;
-      }
-    }
-  }
-
-  dst[idx] = '\0';
-  return idx;
-}
-
-int pn_quote(pn_string_t *dst, const char *src, size_t size)
-{
-  while (true) {
-    size_t str_size = pn_string_size(dst);
-    char *str = pn_string_buffer(dst) + str_size;
-    size_t capacity = pn_string_capacity(dst) - str_size;
-    ssize_t ssize = pn_quote_data(str, capacity, src, size);
-    if (ssize == PN_OVERFLOW) {
-      int err = pn_string_grow(dst, (str_size + capacity) ? 2*(str_size + capacity) : 16);
-      if (err) return err;
-    } else if (ssize >= 0) {
-      return pn_string_resize(dst, str_size + ssize);
-    } else {
-      return ssize;
-    }
-  }
-}
-
-void pn_fprint_data(FILE *stream, const char *bytes, size_t size)
-{
-  char buf[256];
-  ssize_t n = pn_quote_data(buf, 256, bytes, size);
-  if (n >= 0) {
-    fputs(buf, stream);
-  } else {
-    if (n == PN_OVERFLOW) {
-      fputs(buf, stream);
-      fputs("... (truncated)", stream);
-    }
-    else
-      fprintf(stderr, "pn_quote_data: %s\n", pn_code(n));
-  }
-}
-
-void pn_print_data(const char *bytes, size_t size)
-{
-  pn_fprint_data(stdout, bytes, size);
-}
-
-void pni_urldecode(const char *src, char *dst)
-{
-  const char *in = src;
-  char *out = dst;
-  while (*in != '\0')
-  {
-    if ('%' == *in)
-    {
-      if ((in[1] != '\0') && (in[2] != '\0'))
-      {
-        char esc[3];
-        esc[0] = in[1];
-        esc[1] = in[2];
-        esc[2] = '\0';
-        unsigned long d = strtoul(esc, NULL, 16);
-        *out = (char)d;
-        in += 3;
-        out++;
-      }
-      else
-      {
-        *out = *in;
-        in++;
-        out++;
-      }
-    }
-    else
-    {
-      *out = *in;
-      in++;
-      out++;
-    }
-  }
-  *out = '\0';
-}
-
-void pni_parse_url(char *url, char **scheme, char **user, char **pass, char **host, char **port, char **path)
-{
-  if (!url) return;
-
-  char *slash = strchr(url, '/');
-
-  if (slash && slash>url) {
-    char *scheme_end = strstr(slash-1, "://");
-
-    if (scheme_end && scheme_end<slash) {
-      *scheme_end = '\0';
-      *scheme = url;
-      url = scheme_end + 3;
-      slash = strchr(url, '/');
-    }
-  }
-
-  if (slash) {
-    *slash = '\0';
-    *path = slash + 1;
-  }
-
-  char *at = strchr(url, '@');
-  if (at) {
-    *at = '\0';
-    char *up = url;
-    *user = up;
-    url = at + 1;
-    char *colon = strchr(up, ':');
-    if (colon) {
-      *colon = '\0';
-      *pass = colon + 1;
-    }
-  }
-
-  *host = url;
-  char *open = (*url == '[') ? url : 0;
-  if (open) {
-    char *close = strchr(open, ']');
-    if (close) {
-        *host = open + 1;
-        *close = '\0';
-        url = close + 1;
-    }
-  }
-
-  char *colon = strchr(url, ':');
-  if (colon) {
-    *colon = '\0';
-    *port = colon + 1;
-  }
-
-  if (*user) pni_urldecode(*user, *user);
-  if (*pass) pni_urldecode(*pass, *pass);
-}
-
-void pni_vfatal(const char *fmt, va_list ap)
-{
-  vfprintf(stderr, fmt, ap);
-  abort();
-}
-
-void pni_fatal(const char *fmt, ...)
-{
-  va_list ap;
-  va_start(ap, fmt);
-  pni_vfatal(fmt, ap);
-  va_end(ap);
-}
-
-int pn_strcasecmp(const char *a, const char *b)
-{
-  int diff;
-  while (*b) {
-    char aa = *a++, bb = *b++;
-    diff = tolower(aa)-tolower(bb);
-    if ( diff!=0 ) return diff;
-  }
-  return *a;
-}
-
-int pn_strncasecmp(const char* a, const char* b, size_t len)
-{
-  int diff = 0;
-  while (*b && len > 0) {
-    char aa = *a++, bb = *b++;
-    diff = tolower(aa)-tolower(bb);
-    if ( diff!=0 ) return diff;
-    --len;
-  };
-  return len==0 ? diff : *a;
-}
-
-bool pn_env_bool(const char *name)
-{
-  char *v = getenv(name);
-  return v && (!pn_strcasecmp(v, "true") || !pn_strcasecmp(v, "1") ||
-               !pn_strcasecmp(v, "yes")  || !pn_strcasecmp(v, "on"));
-}
-
-char *pn_strdup(const char *src)
-{
-  if (src) {
-    char *dest = (char *) malloc((strlen(src)+1)*sizeof(char));
-    if (!dest) return NULL;
-    return strcpy(dest, src);
-  } else {
-    return NULL;
-  }
-}
-
-char *pn_strndup(const char *src, size_t n)
-{
-  if (src) {
-    unsigned size = 0;
-    for (const char *c = src; size < n && *c; c++) {
-      size++;
-    }
-
-    char *dest = (char *) malloc(size + 1);
-    if (!dest) return NULL;
-    strncpy(dest, src, n);
-    dest[size] = '\0';
-    return dest;
-  } else {
-    return NULL;
-  }
-}
-
-// which timestamp will expire next, or zero if none set
-pn_timestamp_t pn_timestamp_min( pn_timestamp_t a, pn_timestamp_t b )
-{
-  if (a && b) return pn_min(a, b);
-  if (a) return a;
-  return b;
-}
-

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/util.h
----------------------------------------------------------------------
diff --git a/proton-c/src/util.h b/proton-c/src/util.h
deleted file mode 100644
index ec59a07..0000000
--- a/proton-c/src/util.h
+++ /dev/null
@@ -1,126 +0,0 @@
-#ifndef _PROTON_SRC_UTIL_H
-#define _PROTON_SRC_UTIL_H 1
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <errno.h>
-#ifndef __cplusplus
-#include <stdbool.h>
-#endif
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <proton/types.h>
-#include <proton/object.h>
-
-void pni_parse_url(char *url, char **scheme, char **user, char **pass, char **host, char **port, char **path);
-void pni_fatal(const char *fmt, ...);
-void pni_vfatal(const char *fmt, va_list ap);
-ssize_t pn_quote_data(char *dst, size_t capacity, const char *src, size_t size);
-int pn_quote(pn_string_t *dst, const char *src, size_t size);
-void pn_fprint_data(FILE *stream, const char *bytes, size_t size);
-void pn_print_data(const char *bytes, size_t size);
-bool pn_env_bool(const char *name);
-pn_timestamp_t pn_timestamp_min(pn_timestamp_t a, pn_timestamp_t b);
-
-char *pn_strdup(const char *src);
-char *pn_strndup(const char *src, size_t n);
-int pn_strcasecmp(const char* a, const char* b);
-int pn_strncasecmp(const char* a, const char* b, size_t len);
-
-#define DIE_IFR(EXPR, STRERR)                                           \
-  do {                                                                  \
-    int __code__ = (EXPR);                                              \
-    if (__code__) {                                                     \
-      fprintf(stderr, "%s:%d: %s: %s (%d)\n", __FILE__, __LINE__,       \
-              #EXPR, STRERR(__code__), __code__);                       \
-      exit(-1);                                                         \
-    }                                                                   \
-  } while (0)
-
-#define DIE_IFE(EXPR)                                                   \
-  do {                                                                  \
-    if ((EXPR) == -1) {                                                 \
-      int __code__ = errno;                                             \
-      fprintf(stderr, "%s:%d: %s: %s (%d)\n", __FILE__, __LINE__,       \
-              #EXPR, strerror(__code__), __code__);                     \
-      exit(-1);                                                         \
-    }                                                                   \
-  } while (0)
-
-
-#define LL_HEAD(ROOT, LIST) ((ROOT)-> LIST ## _head)
-#define LL_TAIL(ROOT, LIST) ((ROOT)-> LIST ## _tail)
-#define LL_ADD(ROOT, LIST, NODE)                              \
-  {                                                           \
-    (NODE)-> LIST ## _next = NULL;                            \
-    (NODE)-> LIST ## _prev = (ROOT)-> LIST ## _tail;          \
-    if (LL_TAIL(ROOT, LIST))                                  \
-      LL_TAIL(ROOT, LIST)-> LIST ## _next = (NODE);           \
-    LL_TAIL(ROOT, LIST) = (NODE);                             \
-    if (!LL_HEAD(ROOT, LIST)) LL_HEAD(ROOT, LIST) = (NODE);   \
-  }
-
-#define LL_POP(ROOT, LIST, TYPE)                              \
-  {                                                           \
-    if (LL_HEAD(ROOT, LIST)) {                                \
-      TYPE *_old = LL_HEAD(ROOT, LIST);                       \
-      LL_HEAD(ROOT, LIST) = LL_HEAD(ROOT, LIST)-> LIST ## _next; \
-      _old-> LIST ## _next = NULL;                            \
-      if (_old == LL_TAIL(ROOT, LIST)) {                      \
-        LL_TAIL(ROOT, LIST) = NULL;                           \
-      } else {                                                \
-        LL_HEAD(ROOT, LIST)-> LIST ## _prev = NULL;           \
-      }                                                       \
-    }                                                         \
-  }
-
-#define LL_REMOVE(ROOT, LIST, NODE)                                    \
-  {                                                                    \
-    if ((NODE)-> LIST ## _prev)                                        \
-      (NODE)-> LIST ## _prev-> LIST ## _next = (NODE)-> LIST ## _next; \
-    if ((NODE)-> LIST ## _next)                                        \
-      (NODE)-> LIST ## _next-> LIST ## _prev = (NODE)-> LIST ## _prev; \
-    if ((NODE) == LL_HEAD(ROOT, LIST))                                 \
-      LL_HEAD(ROOT, LIST) = (NODE)-> LIST ## _next;                    \
-    if ((NODE) == LL_TAIL(ROOT, LIST))                                 \
-      LL_TAIL(ROOT, LIST) = (NODE)-> LIST ## _prev;                    \
-  }
-
-#define pn_min(X,Y) ((X) > (Y) ? (Y) : (X))
-#define pn_max(X,Y) ((X) < (Y) ? (Y) : (X))
-
-#define PN_ENSURE(ARRAY, CAPACITY, COUNT, TYPE)                 \
-  while ((CAPACITY) < (COUNT)) {                                \
-    (CAPACITY) = (CAPACITY) ? 2 * (CAPACITY) : 16;              \
-    (ARRAY) = (TYPE *) realloc((ARRAY), (CAPACITY) * sizeof (TYPE));    \
-  }                                                             \
-
-#define PN_ENSUREZ(ARRAY, CAPACITY, COUNT, TYPE)           \
-  {                                                        \
-    size_t _old_capacity = (CAPACITY);                     \
-    PN_ENSURE(ARRAY, CAPACITY, COUNT, TYPE);               \
-    memset((ARRAY) + _old_capacity, 0,                     \
-           sizeof(TYPE)*((CAPACITY) - _old_capacity));     \
-  }
-
-#endif /* util.h */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/windows/io.c
----------------------------------------------------------------------
diff --git a/proton-c/src/windows/io.c b/proton-c/src/windows/io.c
deleted file mode 100644
index 4a87fd2..0000000
--- a/proton-c/src/windows/io.c
+++ /dev/null
@@ -1,457 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#define FD_SETSIZE 2048
-#ifndef _WIN32_WINNT
-#define _WIN32_WINNT 0x0501
-#endif
-#if _WIN32_WINNT < 0x0501
-#error "Proton requires Windows API support for XP or later."
-#endif
-#include <winsock2.h>
-#include <mswsock.h>
-#include <Ws2tcpip.h>
-
-#include "platform.h"
-#include <proton/io.h>
-#include <proton/object.h>
-#include <proton/selector.h>
-#include "iocp.h"
-#include "util.h"
-
-#include <ctype.h>
-#include <errno.h>
-#include <stdio.h>
-#include <assert.h>
-
-int pni_win32_error(pn_error_t *error, const char *msg, HRESULT code)
-{
-  // Error code can be from GetLastError or WSAGetLastError,
-  char err[1024] = {0};
-  FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS |
-                FORMAT_MESSAGE_MAX_WIDTH_MASK, NULL, code, 0, (LPSTR)&err, sizeof(err), NULL);
-  return pn_error_format(error, PN_ERR, "%s: %s", msg, err);
-}
-
-static void io_log(const char *fmt, ...)
-{
-  va_list ap;
-  va_start(ap, fmt);
-  vfprintf(stderr, fmt, ap);
-  va_end(ap);
-  fflush(stderr);
-}
-
-struct pn_io_t {
-  char host[NI_MAXHOST];
-  char serv[NI_MAXSERV];
-  pn_error_t *error;
-  bool trace;
-  bool wouldblock;
-  iocp_t *iocp;
-};
-
-void pn_io_initialize(void *obj)
-{
-  pn_io_t *io = (pn_io_t *) obj;
-  io->error = pn_error();
-  io->wouldblock = false;
-  io->trace = pn_env_bool("PN_TRACE_DRV");
-
-  /* Request WinSock 2.2 */
-  WORD wsa_ver = MAKEWORD(2, 2);
-  WSADATA unused;
-  int err = WSAStartup(wsa_ver, &unused);
-  if (err) {
-    pni_win32_error(io->error, "WSAStartup", WSAGetLastError());
-    fprintf(stderr, "Can't load WinSock: %s\n", pn_error_text(io->error));
-  }
-  io->iocp = pni_iocp();
-}
-
-void pn_io_finalize(void *obj)
-{
-  pn_io_t *io = (pn_io_t *) obj;
-  pn_error_free(io->error);
-  pn_free(io->iocp);
-  WSACleanup();
-}
-
-#define pn_io_hashcode NULL
-#define pn_io_compare NULL
-#define pn_io_inspect
-
-pn_io_t *pn_io(void)
-{
-  static const pn_class_t clazz = PN_CLASS(pn_io);
-  pn_io_t *io = (pn_io_t *) pn_class_new(&clazz, sizeof(pn_io_t));
-  return io;
-}
-
-void pn_io_free(pn_io_t *io)
-{
-  pn_free(io);
-}
-
-pn_error_t *pn_io_error(pn_io_t *io)
-{
-  assert(io);
-  return io->error;
-}
-
-static void ensure_unique(pn_io_t *io, pn_socket_t new_socket)
-{
-  // A brand new socket can have the same HANDLE value as a previous
-  // one after a socketclose.  If the application closes one itself
-  // (i.e. not using pn_close), we don't find out about it until here.
-  iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, new_socket);
-  if (iocpd) {
-    if (io->trace)
-      io_log("Stale external socket reference discarded\n");
-    // Re-use means former socket instance was closed
-    assert(iocpd->ops_in_progress == 0);
-    assert(iocpd->external);
-    // Clean up the straggler as best we can
-    pn_socket_t sock = iocpd->socket;
-    iocpd->socket = INVALID_SOCKET;
-    pni_iocpdesc_map_del(io->iocp, sock);  // may free the iocpdesc_t depending on refcount
-  }
-}
-
-
-/*
- * This heavyweight surrogate pipe could be replaced with a normal Windows pipe
- * now that select() is no longer used.  If interrupt semantics are all that is
- * needed, a simple user space counter and reserved completion status would
- * probably suffice.
- */
-static int pni_socket_pair(pn_io_t *io, SOCKET sv[2]);
-
-int pn_pipe(pn_io_t *io, pn_socket_t *dest)
-{
-  int n = pni_socket_pair(io, dest);
-  if (n) {
-    pni_win32_error(io->error, "pipe", WSAGetLastError());
-  }
-  return n;
-}
-
-static void pn_configure_sock(pn_io_t *io, pn_socket_t sock) {
-  //
-  // Disable the Nagle algorithm on TCP connections.
-  //
-  int flag = 1;
-  if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)) != 0) {
-    perror("setsockopt");
-  }
-
-  u_long nonblock = 1;
-  if (ioctlsocket(sock, FIONBIO, &nonblock)) {
-    perror("ioctlsocket");
-  }
-}
-
-static inline pn_socket_t pni_create_socket(int domain, int protocol);
-
-static const char *amqp_service(const char *port) {
-  // Help older Windows to know about amqp[s] ports
-  if (port) {
-    if (!strcmp("amqp", port)) return "5672";
-    if (!strcmp("amqps", port)) return "5671";
-  }
-  return port;
-}
-
-pn_socket_t pn_listen(pn_io_t *io, const char *host, const char *port)
-{
-  struct addrinfo *addr;
-  int code = getaddrinfo(host, amqp_service(port), NULL, &addr);
-  if (code) {
-    pn_error_format(io->error, PN_ERR, "getaddrinfo(%s, %s): %s\n", host, port, gai_strerror(code));
-    return INVALID_SOCKET;
-  }
-
-  pn_socket_t sock = pni_create_socket(addr->ai_family, addr->ai_protocol);
-  if (sock == INVALID_SOCKET) {
-    pni_win32_error(io->error, "pni_create_socket", WSAGetLastError());
-    return INVALID_SOCKET;
-  }
-  ensure_unique(io, sock);
-
-  bool optval = 1;
-  if (setsockopt(sock, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (const char *) &optval,
-                 sizeof(optval)) == -1) {
-    pni_win32_error(io->error, "setsockopt", WSAGetLastError());
-    closesocket(sock);
-    return INVALID_SOCKET;
-  }
-
-  if (bind(sock, addr->ai_addr, addr->ai_addrlen) == -1) {
-    pni_win32_error(io->error, "bind", WSAGetLastError());
-    freeaddrinfo(addr);
-    closesocket(sock);
-    return INVALID_SOCKET;
-  }
-  freeaddrinfo(addr);
-
-  if (listen(sock, 50) == -1) {
-    pni_win32_error(io->error, "listen", WSAGetLastError());
-    closesocket(sock);
-    return INVALID_SOCKET;
-  }
-
-  if (io->iocp->selector) {
-    iocpdesc_t *iocpd = pni_iocpdesc_create(io->iocp, sock, false);
-    if (!iocpd) {
-      pn_i_error_from_errno(io->error, "register");
-      closesocket(sock);
-      return INVALID_SOCKET;
-    }
-    pni_iocpdesc_start(iocpd);
-  }
-
-  return sock;
-}
-
-pn_socket_t pn_connect(pn_io_t *io, const char *hostarg, const char *port)
-{
-  // convert "0.0.0.0" to "127.0.0.1" on Windows for outgoing sockets
-  const char *host = strcmp("0.0.0.0", hostarg) ? hostarg : "127.0.0.1";
-
-  struct addrinfo *addr;
-  int code = getaddrinfo(host, amqp_service(port), NULL, &addr);
-  if (code) {
-    pn_error_format(io->error, PN_ERR, "getaddrinfo(%s, %s): %s", host, port, gai_strerror(code));
-    return INVALID_SOCKET;
-  }
-
-  pn_socket_t sock = pni_create_socket(addr->ai_family, addr->ai_protocol);
-  if (sock == INVALID_SOCKET) {
-    pni_win32_error(io->error, "proton pni_create_socket", WSAGetLastError());
-    freeaddrinfo(addr);
-    return INVALID_SOCKET;
-  }
-
-  ensure_unique(io, sock);
-  pn_configure_sock(io, sock);
-
-  if (io->iocp->selector) {
-    return pni_iocp_begin_connect(io->iocp, sock, addr, io->error);
-  } else {
-    if (connect(sock, addr->ai_addr, addr->ai_addrlen) != 0) {
-      if (WSAGetLastError() != WSAEWOULDBLOCK) {
-	pni_win32_error(io->error, "connect", WSAGetLastError());
-	freeaddrinfo(addr);
-	closesocket(sock);
-	return INVALID_SOCKET;
-      }
-    }
-
-    freeaddrinfo(addr);
-    return sock;
-  }
-}
-
-pn_socket_t pn_accept(pn_io_t *io, pn_socket_t listen_sock, char *name, size_t size)
-{
-  struct sockaddr_storage addr;
-  socklen_t addrlen = sizeof(addr);
-  iocpdesc_t *listend = pni_iocpdesc_map_get(io->iocp, listen_sock);
-  pn_socket_t accept_sock;
-
-  *name = '\0';
-  if (listend)
-    accept_sock = pni_iocp_end_accept(listend, (struct sockaddr *) &addr, &addrlen, &io->wouldblock, io->error);
-  else {
-    // User supplied socket
-    accept_sock = accept(listen_sock, (struct sockaddr *) &addr, &addrlen);
-    if (accept_sock == INVALID_SOCKET)
-      pni_win32_error(io->error, "sync accept", WSAGetLastError());
-  }
-
-  if (accept_sock == INVALID_SOCKET)
-    return accept_sock;
-
-  int code = getnameinfo((struct sockaddr *) &addr, addrlen, io->host, NI_MAXHOST,
-                         io->serv, NI_MAXSERV, 0);
-  if (code)
-    code = getnameinfo((struct sockaddr *) &addr, addrlen, io->host, NI_MAXHOST,
-                       io->serv, NI_MAXSERV, NI_NUMERICHOST | NI_NUMERICSERV);
-  if (code) {
-    pn_error_format(io->error, PN_ERR, "getnameinfo: %s\n", gai_strerror(code));
-    pn_close(io, accept_sock);
-    return INVALID_SOCKET;
-  } else {
-    pn_configure_sock(io, accept_sock);
-    snprintf(name, size, "%s:%s", io->host, io->serv);
-    if (listend) {
-      pni_iocpdesc_start(pni_iocpdesc_map_get(io->iocp, accept_sock));
-    }
-    return accept_sock;
-  }
-}
-
-static inline pn_socket_t pni_create_socket(int domain, int protocol) {
-  return socket(domain, SOCK_STREAM, protocol);
-}
-
-ssize_t pn_send(pn_io_t *io, pn_socket_t sockfd, const void *buf, size_t len) {
-  ssize_t count;
-  iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, sockfd);
-  if (iocpd) {
-    count = pni_iocp_begin_write(iocpd, buf, len, &io->wouldblock, io->error);
-  } else {
-    count = send(sockfd, (const char *) buf, len, 0);
-    io->wouldblock = count < 0 && WSAGetLastError() == WSAEWOULDBLOCK;
-  }
-  return count;
-}
-
-ssize_t pn_recv(pn_io_t *io, pn_socket_t socket, void *buf, size_t size)
-{
-  ssize_t count;
-  iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, socket);
-  if (iocpd) {
-    count = pni_iocp_recv(iocpd, buf, size, &io->wouldblock, io->error);
-  } else {
-    count = recv(socket, (char *) buf, size, 0);
-    io->wouldblock = count < 0 && WSAGetLastError() == WSAEWOULDBLOCK;
-  }
-  return count;
-}
-
-ssize_t pn_write(pn_io_t *io, pn_socket_t socket, const void *buf, size_t size)
-{
-  // non-socket io is mapped to socket io for now.  See pn_pipe()
-  return pn_send(io, socket, buf, size);
-}
-
-ssize_t pn_read(pn_io_t *io, pn_socket_t socket, void *buf, size_t size)
-{
-  return pn_recv(io, socket, buf, size);
-}
-
-void pn_close(pn_io_t *io, pn_socket_t socket)
-{
-  iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, socket);
-  if (iocpd)
-    pni_iocp_begin_close(iocpd);
-  else {
-    closesocket(socket);
-  }
-}
-
-bool pn_wouldblock(pn_io_t *io)
-{
-  return io->wouldblock;
-}
-
-pn_selector_t *pn_io_selector(pn_io_t *io)
-{
-  if (io->iocp->selector == NULL)
-    io->iocp->selector = pni_selector_create(io->iocp);
-  return io->iocp->selector;
-}
-
-static void configure_pipe_socket(pn_io_t *io, pn_socket_t sock)
-{
-  u_long v = 1;
-  ioctlsocket (sock, FIONBIO, &v);
-  ensure_unique(io, sock);
-  iocpdesc_t *iocpd = pni_iocpdesc_create(io->iocp, sock, false);
-  pni_iocpdesc_start(iocpd);
-}
-
-
-static int pni_socket_pair (pn_io_t *io, SOCKET sv[2]) {
-  // no socketpair on windows.  provide pipe() semantics using sockets
-  struct protoent * pe_tcp = getprotobyname("tcp");
-  if (pe_tcp == NULL) {
-    perror("getprotobyname");
-    return -1;
-  }
-
-  SOCKET sock = socket(AF_INET, SOCK_STREAM, pe_tcp->p_proto);
-  if (sock == INVALID_SOCKET) {
-    perror("socket");
-    return -1;
-  }
-
-  BOOL b = 1;
-  if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char *) &b, sizeof(b)) == -1) {
-    perror("setsockopt");
-    closesocket(sock);
-    return -1;
-  }
-  else {
-    struct sockaddr_in addr = {0};
-    addr.sin_family = AF_INET;
-    addr.sin_port = 0;
-    addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
-
-    if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
-      perror("bind");
-      closesocket(sock);
-      return -1;
-    }
-  }
-
-  if (listen(sock, 50) == -1) {
-    perror("listen");
-    closesocket(sock);
-    return -1;
-  }
-
-  if ((sv[1] = socket(AF_INET, SOCK_STREAM, pe_tcp->p_proto)) == INVALID_SOCKET) {
-    perror("sock1");
-    closesocket(sock);
-    return -1;
-  }
-  else {
-    struct sockaddr addr = {0};
-    int l = sizeof(addr);
-    if (getsockname(sock, &addr, &l) == -1) {
-      perror("getsockname");
-      closesocket(sock);
-      return -1;
-    }
-
-    if (connect(sv[1], &addr, sizeof(addr)) == -1) {
-      int err = WSAGetLastError();
-      fprintf(stderr, "connect wsaerrr %d\n", err);
-      closesocket(sock);
-      closesocket(sv[1]);
-      return -1;
-    }
-
-    if ((sv[0] = accept(sock, &addr, &l)) == INVALID_SOCKET) {
-      perror("accept");
-      closesocket(sock);
-      closesocket(sv[1]);
-      return -1;
-    }
-  }
-
-  configure_pipe_socket(io, sv[0]);
-  configure_pipe_socket(io, sv[1]);
-  closesocket(sock);
-  return 0;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/windows/iocp.c
----------------------------------------------------------------------
diff --git a/proton-c/src/windows/iocp.c b/proton-c/src/windows/iocp.c
deleted file mode 100644
index d1abc9a..0000000
--- a/proton-c/src/windows/iocp.c
+++ /dev/null
@@ -1,1176 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#ifndef _WIN32_WINNT
-#define _WIN32_WINNT 0x0501
-#endif
-#if _WIN32_WINNT < 0x0501
-#error "Proton requires Windows API support for XP or later."
-#endif
-#include <winsock2.h>
-#include <mswsock.h>
-#include <Ws2tcpip.h>
-
-#include "platform.h"
-#include <proton/object.h>
-#include <proton/io.h>
-#include <proton/selector.h>
-#include <proton/error.h>
-#include <proton/transport.h>
-#include "iocp.h"
-#include "util.h"
-#include <assert.h>
-
-/*
- * Windows IO Completion Port support for Proton.
- *
- * Overlapped writes are used to avoid lengthy stalls between write
- * completion and starting a new write.  Non-overlapped reads are used
- * since Windows accumulates inbound traffic without stalling and
- * managing read buffers would not avoid a memory copy at the pn_read
- * boundary.
- *
- * A socket must not get a Windows closesocket() unless the
- * application has called pn_close on the socket or a global
- * pn_io_finalize().  On error, the internal accounting for
- * write_closed or read_closed may be updated along with the external
- * event notification.  A socket may be closed if it is never added to
- * the iocpdesc_map or is on its way out of the map.
- */
-
-// Max number of overlapped accepts per listener
-#define IOCP_MAX_ACCEPTS 10
-
-// AcceptEx squishes the local and remote addresses and optional data
-// all together when accepting the connection. Reserve enough for
-// IPv6 addresses, even if the socket is IPv4. The 16 bytes padding
-// per address is required by AcceptEx.
-#define IOCP_SOCKADDRMAXLEN (sizeof(sockaddr_in6) + 16)
-#define IOCP_SOCKADDRBUFLEN (2 * IOCP_SOCKADDRMAXLEN)
-
-static void iocp_log(const char *fmt, ...)
-{
-  va_list ap;
-  va_start(ap, fmt);
-  vfprintf(stderr, fmt, ap);
-  va_end(ap);
-  fflush(stderr);
-}
-
-static void set_iocp_error_status(pn_error_t *error, int code, HRESULT status)
-{
-  char buf[512];
-  if (FormatMessage(FORMAT_MESSAGE_MAX_WIDTH_MASK | FORMAT_MESSAGE_FROM_SYSTEM,
-                    0, status, 0, buf, sizeof(buf), 0))
-    pn_error_set(error, code, buf);
-  else {
-    fprintf(stderr, "pn internal Windows error: %lu\n", GetLastError());
-  }
-}
-
-static void reap_check(iocpdesc_t *);
-static void bind_to_completion_port(iocpdesc_t *iocpd);
-static void iocp_shutdown(iocpdesc_t *iocpd);
-static void start_reading(iocpdesc_t *iocpd);
-static bool is_listener(iocpdesc_t *iocpd);
-static void release_sys_sendbuf(SOCKET s);
-
-static void iocpdesc_fail(iocpdesc_t *iocpd, HRESULT status, const char* text)
-{
-  pni_win32_error(iocpd->error, text, status);
-  if (iocpd->iocp->iocp_trace) {
-    iocp_log("connection terminated: %s\n", pn_error_text(iocpd->error));
-  }
-  iocpd->write_closed = true;
-  iocpd->read_closed = true;
-  iocpd->poll_error = true;
-  pni_events_update(iocpd, iocpd->events & ~(PN_READABLE | PN_WRITABLE));
-}
-
-// Helper functions to use specialized IOCP AcceptEx() and ConnectEx()
-static LPFN_ACCEPTEX lookup_accept_ex(SOCKET s)
-{
-  GUID guid = WSAID_ACCEPTEX;
-  DWORD bytes = 0;
-  LPFN_ACCEPTEX fn;
-  WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
-           &fn, sizeof(fn), &bytes, NULL, NULL);
-  assert(fn);
-  return fn;
-}
-
-static LPFN_CONNECTEX lookup_connect_ex(SOCKET s)
-{
-  GUID guid = WSAID_CONNECTEX;
-  DWORD bytes = 0;
-  LPFN_CONNECTEX fn;
-  WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
-           &fn, sizeof(fn), &bytes, NULL, NULL);
-  assert(fn);
-  return fn;
-}
-
-static LPFN_GETACCEPTEXSOCKADDRS lookup_get_accept_ex_sockaddrs(SOCKET s)
-{
-  GUID guid = WSAID_GETACCEPTEXSOCKADDRS;
-  DWORD bytes = 0;
-  LPFN_GETACCEPTEXSOCKADDRS fn;
-  WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
-           &fn, sizeof(fn), &bytes, NULL, NULL);
-  assert(fn);
-  return fn;
-}
-
-// match accept socket to listener socket
-static iocpdesc_t *create_same_type_socket(iocpdesc_t *iocpd)
-{
-  sockaddr_storage sa;
-  socklen_t salen = sizeof(sa);
-  if (getsockname(iocpd->socket, (sockaddr*)&sa, &salen) == -1)
-    return NULL;
-  SOCKET s = socket(sa.ss_family, SOCK_STREAM, 0); // Currently only work with SOCK_STREAM
-  if (s == INVALID_SOCKET)
-    return NULL;
-  return pni_iocpdesc_create(iocpd->iocp, s, false);
-}
-
-static bool is_listener(iocpdesc_t *iocpd)
-{
-  return iocpd && iocpd->acceptor;
-}
-
-// === Async accept processing
-
-typedef struct {
-  iocp_result_t base;
-  iocpdesc_t *new_sock;
-  char address_buffer[IOCP_SOCKADDRBUFLEN];
-  DWORD unused;
-} accept_result_t;
-
-static accept_result_t *accept_result(iocpdesc_t *listen_sock) {
-  accept_result_t *result = (accept_result_t *)calloc(1, sizeof(accept_result_t));
-  if (result) {
-    result->base.type = IOCP_ACCEPT;
-    result->base.iocpd = listen_sock;
-  }
-  return result;
-}
-
-static void reset_accept_result(accept_result_t *result) {
-  memset(&result->base.overlapped, 0, sizeof (OVERLAPPED));
-  memset(&result->address_buffer, 0, IOCP_SOCKADDRBUFLEN);
-}
-
-struct pni_acceptor_t {
-  int accept_queue_size;
-  pn_list_t *accepts;
-  iocpdesc_t *listen_sock;
-  bool signalled;
-  LPFN_ACCEPTEX fn_accept_ex;
-  LPFN_GETACCEPTEXSOCKADDRS fn_get_accept_ex_sockaddrs;
-};
-
-#define pni_acceptor_compare NULL
-#define pni_acceptor_inspect NULL
-#define pni_acceptor_hashcode NULL
-
-static void pni_acceptor_initialize(void *object)
-{
-  pni_acceptor_t *acceptor = (pni_acceptor_t *) object;
-  acceptor->accepts = pn_list(PN_VOID, IOCP_MAX_ACCEPTS);
-}
-
-static void pni_acceptor_finalize(void *object)
-{
-  pni_acceptor_t *acceptor = (pni_acceptor_t *) object;
-  size_t len = pn_list_size(acceptor->accepts);
-  for (size_t i = 0; i < len; i++)
-    free(pn_list_get(acceptor->accepts, i));
-  pn_free(acceptor->accepts);
-}
-
-static pni_acceptor_t *pni_acceptor(iocpdesc_t *iocpd)
-{
-  static const pn_cid_t CID_pni_acceptor = CID_pn_void;
-  static const pn_class_t clazz = PN_CLASS(pni_acceptor);
-  pni_acceptor_t *acceptor = (pni_acceptor_t *) pn_class_new(&clazz, sizeof(pni_acceptor_t));
-  acceptor->listen_sock = iocpd;
-  acceptor->accept_queue_size = 0;
-  acceptor->signalled = false;
-  pn_socket_t sock = acceptor->listen_sock->socket;
-  acceptor->fn_accept_ex = lookup_accept_ex(sock);
-  acceptor->fn_get_accept_ex_sockaddrs = lookup_get_accept_ex_sockaddrs(sock);
-  return acceptor;
-}
-
-static void begin_accept(pni_acceptor_t *acceptor, accept_result_t *result)
-{
-  if (acceptor->listen_sock->closing) {
-    if (result) {
-      free(result);
-      acceptor->accept_queue_size--;
-    }
-    if (acceptor->accept_queue_size == 0)
-      acceptor->signalled = true;
-    return;
-  }
-
-  if (result) {
-    reset_accept_result(result);
-  } else {
-    if (acceptor->accept_queue_size < IOCP_MAX_ACCEPTS &&
-        pn_list_size(acceptor->accepts) == acceptor->accept_queue_size ) {
-      result = accept_result(acceptor->listen_sock);
-      acceptor->accept_queue_size++;
-    } else {
-      // an async accept is still pending or max concurrent accepts already hit
-      return;
-    }
-  }
-
-  result->new_sock = create_same_type_socket(acceptor->listen_sock);
-  if (result->new_sock) {
-    // Not yet connected.
-    result->new_sock->read_closed = true;
-    result->new_sock->write_closed = true;
-
-    bool success = acceptor->fn_accept_ex(acceptor->listen_sock->socket, result->new_sock->socket,
-                     result->address_buffer, 0, IOCP_SOCKADDRMAXLEN, IOCP_SOCKADDRMAXLEN,
-                     &result->unused, (LPOVERLAPPED) result);
-    if (!success && WSAGetLastError() != ERROR_IO_PENDING) {
-      result->base.status = WSAGetLastError();
-      pn_list_add(acceptor->accepts, result);
-      pni_events_update(acceptor->listen_sock, acceptor->listen_sock->events | PN_READABLE);
-    } else {
-      acceptor->listen_sock->ops_in_progress++;
-      // This socket is equally involved in the async operation.
-      result->new_sock->ops_in_progress++;
-    }
-  } else {
-    iocpdesc_fail(acceptor->listen_sock, WSAGetLastError(), "create accept socket");
-  }
-}
-
-static void complete_accept(accept_result_t *result, HRESULT status)
-{
-  result->new_sock->ops_in_progress--;
-  iocpdesc_t *ld = result->base.iocpd;
-  if (ld->read_closed) {
-    if (!result->new_sock->closing)
-      pni_iocp_begin_close(result->new_sock);
-    free(result);    // discard
-    reap_check(ld);
-  } else {
-    result->base.status = status;
-    pn_list_add(ld->acceptor->accepts, result);
-    pni_events_update(ld, ld->events | PN_READABLE);
-  }
-}
-
-pn_socket_t pni_iocp_end_accept(iocpdesc_t *ld, sockaddr *addr, socklen_t *addrlen, bool *would_block, pn_error_t *error)
-{
-  if (!is_listener(ld)) {
-    set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP);
-    return INVALID_SOCKET;
-  }
-  if (ld->read_closed) {
-    set_iocp_error_status(error, PN_ERR, WSAENOTSOCK);
-    return INVALID_SOCKET;
-  }
-  if (pn_list_size(ld->acceptor->accepts) == 0) {
-    if (ld->events & PN_READABLE && ld->iocp->iocp_trace)
-      iocp_log("listen socket readable with no available accept completions\n");
-    *would_block = true;
-    return INVALID_SOCKET;
-  }
-
-  accept_result_t *result = (accept_result_t *) pn_list_get(ld->acceptor->accepts, 0);
-  pn_list_del(ld->acceptor->accepts, 0, 1);
-  if (!pn_list_size(ld->acceptor->accepts))
-    pni_events_update(ld, ld->events & ~PN_READABLE);  // No pending accepts
-
-  pn_socket_t accept_sock;
-  if (result->base.status) {
-    accept_sock = INVALID_SOCKET;
-    pni_win32_error(ld->error, "accept failure", result->base.status);
-    if (ld->iocp->iocp_trace)
-      iocp_log("%s\n", pn_error_text(ld->error));
-    // App never sees this socket so close it here.
-    pni_iocp_begin_close(result->new_sock);
-  } else {
-    accept_sock = result->new_sock->socket;
-    // AcceptEx special setsockopt:
-    setsockopt(accept_sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&ld->socket,
-                  sizeof (SOCKET));
-    if (addr && addrlen && *addrlen > 0) {
-      sockaddr_storage *local_addr = NULL;
-      sockaddr_storage *remote_addr = NULL;
-      int local_addrlen, remote_addrlen;
-      LPFN_GETACCEPTEXSOCKADDRS fn = ld->acceptor->fn_get_accept_ex_sockaddrs;
-      fn(result->address_buffer, 0, IOCP_SOCKADDRMAXLEN, IOCP_SOCKADDRMAXLEN,
-         (SOCKADDR **) &local_addr, &local_addrlen, (SOCKADDR **) &remote_addr,
-         &remote_addrlen);
-      *addrlen = pn_min(*addrlen, remote_addrlen);
-      memmove(addr, remote_addr, *addrlen);
-    }
-  }
-
-  if (accept_sock != INVALID_SOCKET) {
-    // Connected.
-    result->new_sock->read_closed = false;
-    result->new_sock->write_closed = false;
-  }
-
-  // Done with the completion result, so reuse it
-  result->new_sock = NULL;
-  begin_accept(ld->acceptor, result);
-  return accept_sock;
-}
-
-
-// === Async connect processing
-
-typedef struct {
-  iocp_result_t base;
-  char address_buffer[IOCP_SOCKADDRBUFLEN];
-  struct addrinfo *addrinfo;
-} connect_result_t;
-
-#define connect_result_initialize NULL
-#define connect_result_compare NULL
-#define connect_result_inspect NULL
-#define connect_result_hashcode NULL
-
-static void connect_result_finalize(void *object)
-{
-  connect_result_t *result = (connect_result_t *) object;
-  // Do not release addrinfo until ConnectEx completes
-  if (result->addrinfo)
-    freeaddrinfo(result->addrinfo);
-}
-
-static connect_result_t *connect_result(iocpdesc_t *iocpd, struct addrinfo *addr) {
-  static const pn_cid_t CID_connect_result = CID_pn_void;
-  static const pn_class_t clazz = PN_CLASS(connect_result);
-  connect_result_t *result = (connect_result_t *) pn_class_new(&clazz, sizeof(connect_result_t));
-  if (result) {
-    memset(result, 0, sizeof(connect_result_t));
-    result->base.type = IOCP_CONNECT;
-    result->base.iocpd = iocpd;
-    result->addrinfo = addr;
-  }
-  return result;
-}
-
-pn_socket_t pni_iocp_begin_connect(iocp_t *iocp, pn_socket_t sock, struct addrinfo *addr, pn_error_t *error)
-{
-  // addr lives for the duration of the async connect.  Caller has passed ownership here.
-  // See connect_result_finalize().
-  // Use of Windows-specific ConnectEx() requires our socket to be "loosely" pre-bound:
-  sockaddr_storage sa;
-  memset(&sa, 0, sizeof(sa));
-  sa.ss_family = addr->ai_family;
-  if (bind(sock, (SOCKADDR *) &sa, addr->ai_addrlen)) {
-    pni_win32_error(error, "begin async connection", WSAGetLastError());
-    if (iocp->iocp_trace)
-      iocp_log("%s\n", pn_error_text(error));
-    closesocket(sock);
-    freeaddrinfo(addr);
-    return INVALID_SOCKET;
-  }
-
-  iocpdesc_t *iocpd = pni_iocpdesc_create(iocp, sock, false);
-  bind_to_completion_port(iocpd);
-  LPFN_CONNECTEX fn_connect_ex = lookup_connect_ex(iocpd->socket);
-  connect_result_t *result = connect_result(iocpd, addr);
-  DWORD unused;
-  bool success = fn_connect_ex(iocpd->socket, result->addrinfo->ai_addr, result->addrinfo->ai_addrlen,
-                               NULL, 0, &unused, (LPOVERLAPPED) result);
-  if (!success && WSAGetLastError() != ERROR_IO_PENDING) {
-    pni_win32_error(error, "ConnectEx failure", WSAGetLastError());
-    pn_free(result);
-    iocpd->write_closed = true;
-    iocpd->read_closed = true;
-    if (iocp->iocp_trace)
-      iocp_log("%s\n", pn_error_text(error));
-  } else {
-    iocpd->ops_in_progress++;
-  }
-  return sock;
-}
-
-static void complete_connect(connect_result_t *result, HRESULT status)
-{
-  iocpdesc_t *iocpd = result->base.iocpd;
-  if (iocpd->closing) {
-    pn_free(result);
-    reap_check(iocpd);
-    return;
-  }
-
-  if (status) {
-    iocpdesc_fail(iocpd, status, "Connect failure");
-    // Posix sets selectable events as follows:
-    pni_events_update(iocpd, PN_READABLE | PN_EXPIRED);
-  } else {
-    release_sys_sendbuf(iocpd->socket);
-    if (setsockopt(iocpd->socket, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT,  NULL, 0)) {
-      iocpdesc_fail(iocpd, WSAGetLastError(), "Internal connect failure (update context)");
-    } else {
-      pni_events_update(iocpd, PN_WRITABLE);
-      start_reading(iocpd);
-    }
-  }
-  pn_free(result);
-  return;
-}
-
-
-// === Async writes
-
-static bool write_in_progress(iocpdesc_t *iocpd)
-{
-  return pni_write_pipeline_size(iocpd->pipeline) != 0;
-}
-
-write_result_t *pni_write_result(iocpdesc_t *iocpd, const char *buf, size_t buflen)
-{
-  write_result_t *result = (write_result_t *) calloc(sizeof(write_result_t), 1);
-  if (result) {
-    result->base.type = IOCP_WRITE;
-    result->base.iocpd = iocpd;
-    result->buffer.start = buf;
-    result->buffer.size = buflen;
-  }
-  return result;
-}
-
-static int submit_write(write_result_t *result, const void *buf, size_t len)
-{
-  WSABUF wsabuf;
-  wsabuf.buf = (char *) buf;
-  wsabuf.len = len;
-  memset(&result->base.overlapped, 0, sizeof (OVERLAPPED));
-  return WSASend(result->base.iocpd->socket, &wsabuf, 1, NULL, 0,
-                 (LPOVERLAPPED) result, 0);
-}
-
-ssize_t pni_iocp_begin_write(iocpdesc_t *iocpd, const void *buf, size_t len, bool *would_block, pn_error_t *error)
-{
-  if (len == 0) return 0;
-  *would_block = false;
-  if (is_listener(iocpd)) {
-    set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP);
-    return INVALID_SOCKET;
-  }
-  if (iocpd->closing) {
-    set_iocp_error_status(error, PN_ERR, WSAESHUTDOWN);
-    return SOCKET_ERROR;
-  }
-  if (iocpd->write_closed) {
-    assert(pn_error_code(iocpd->error));
-    pn_error_copy(error, iocpd->error);
-    if (iocpd->iocp->iocp_trace)
-      iocp_log("write error: %s\n", pn_error_text(error));
-    return SOCKET_ERROR;
-  }
-  if (len == 0) return 0;
-  if (!(iocpd->events & PN_WRITABLE)) {
-    *would_block = true;
-    return SOCKET_ERROR;
-  }
-
-  size_t written = 0;
-  size_t requested = len;
-  const char *outgoing = (const char *) buf;
-  size_t available = pni_write_pipeline_reserve(iocpd->pipeline, len);
-  if (!available) {
-    *would_block = true;
-    return SOCKET_ERROR;
-  }
-
-  for (size_t wr_count = 0; wr_count < available; wr_count++) {
-    write_result_t *result = pni_write_pipeline_next(iocpd->pipeline);
-    assert(result);
-    result->base.iocpd = iocpd;
-    ssize_t actual_len = pn_min(len, result->buffer.size);
-    result->requested = actual_len;
-    memmove((void *)result->buffer.start, outgoing, actual_len);
-    outgoing += actual_len;
-    written += actual_len;
-    len -= actual_len;
-
-    int werror = submit_write(result, result->buffer.start, actual_len);
-    if (werror && WSAGetLastError() != ERROR_IO_PENDING) {
-      pni_write_pipeline_return(iocpd->pipeline, result);
-      iocpdesc_fail(iocpd, WSAGetLastError(), "overlapped send");
-      return SOCKET_ERROR;
-    }
-    iocpd->ops_in_progress++;
-  }
-
-  if (!pni_write_pipeline_writable(iocpd->pipeline))
-    pni_events_update(iocpd, iocpd->events & ~PN_WRITABLE);
-  return written;
-}
-
-/*
- * Note: iocp write completion is not "bytes on the wire", it is "peer
- * acked the sent bytes".  Completion can be seconds on a slow
- * consuming peer.
- */
-static void complete_write(write_result_t *result, DWORD xfer_count, HRESULT status)
-{
-  iocpdesc_t *iocpd = result->base.iocpd;
-  if (iocpd->closing) {
-    pni_write_pipeline_return(iocpd->pipeline, result);
-    if (!iocpd->write_closed && !write_in_progress(iocpd))
-      iocp_shutdown(iocpd);
-    reap_check(iocpd);
-    return;
-  }
-  if (status == 0 && xfer_count > 0) {
-    if (xfer_count != result->requested) {
-      // Is this recoverable?  How to preserve order if multiple overlapped writes?
-      pni_write_pipeline_return(iocpd->pipeline, result);
-      iocpdesc_fail(iocpd, WSA_OPERATION_ABORTED, "Partial overlapped write on socket");
-      return;
-    } else {
-      // Success.
-      pni_write_pipeline_return(iocpd->pipeline, result);
-      if (pni_write_pipeline_writable(iocpd->pipeline))
-        pni_events_update(iocpd, iocpd->events | PN_WRITABLE);
-      return;
-    }
-  }
-  // Other error
-  pni_write_pipeline_return(iocpd->pipeline, result);
-  if (status == WSAECONNABORTED || status == WSAECONNRESET || status == WSAENOTCONN
-      || status == ERROR_NETNAME_DELETED) {
-    iocpd->write_closed = true;
-    iocpd->poll_error = true;
-    pni_events_update(iocpd, iocpd->events & ~PN_WRITABLE);
-    pni_win32_error(iocpd->error, "Remote close or timeout", status);
-  } else {
-    iocpdesc_fail(iocpd, status, "IOCP async write error");
-  }
-}
-
-
-// === Async reads
-
-struct read_result_t {
-  iocp_result_t base;
-  size_t drain_count;
-  char unused_buf[1];
-};
-
-static read_result_t *read_result(iocpdesc_t *iocpd)
-{
-  read_result_t *result = (read_result_t *) calloc(sizeof(read_result_t), 1);
-  if (result) {
-    result->base.type = IOCP_READ;
-    result->base.iocpd = iocpd;
-  }
-  return result;
-}
-
-static void begin_zero_byte_read(iocpdesc_t *iocpd)
-{
-  if (iocpd->read_in_progress) return;
-  if (iocpd->read_closed) {
-    pni_events_update(iocpd, iocpd->events | PN_READABLE);
-    return;
-  }
-
-  read_result_t *result = iocpd->read_result;
-  memset(&result->base.overlapped, 0, sizeof (OVERLAPPED));
-  DWORD flags = 0;
-  WSABUF wsabuf;
-  wsabuf.buf = result->unused_buf;
-  wsabuf.len = 0;
-  int rc = WSARecv(iocpd->socket, &wsabuf, 1, NULL, &flags,
-                       &result->base.overlapped, 0);
-  if (rc && WSAGetLastError() != ERROR_IO_PENDING) {
-    iocpdesc_fail(iocpd, WSAGetLastError(), "IOCP read error");
-    return;
-  }
-  iocpd->ops_in_progress++;
-  iocpd->read_in_progress = true;
-}
-
-static void drain_until_closed(iocpdesc_t *iocpd) {
-  size_t max_drain = 16 * 1024;
-  char buf[512];
-  read_result_t *result = iocpd->read_result;
-  while (result->drain_count < max_drain) {
-    int rv = recv(iocpd->socket, buf, 512, 0);
-    if (rv > 0)
-      result->drain_count += rv;
-    else if (rv == 0) {
-      iocpd->read_closed = true;
-      return;
-    } else if (WSAGetLastError() == WSAEWOULDBLOCK) {
-      // wait a little longer
-      start_reading(iocpd);
-      return;
-    }
-    else
-      break;
-  }
-  // Graceful close indication unlikely, force the issue
-  if (iocpd->iocp->iocp_trace)
-    if (result->drain_count >= max_drain)
-      iocp_log("graceful close on reader abandoned (too many chars)\n");
-    else
-      iocp_log("graceful close on reader abandoned: %d\n", WSAGetLastError());
-  iocpd->read_closed = true;
-}
-
-
-static void complete_read(read_result_t *result, DWORD xfer_count, HRESULT status)
-{
-  iocpdesc_t *iocpd = result->base.iocpd;
-  iocpd->read_in_progress = false;
-
-  if (iocpd->closing) {
-    // Application no longer reading, but we are looking for a zero length read
-    if (!iocpd->read_closed)
-      drain_until_closed(iocpd);
-    reap_check(iocpd);
-    return;
-  }
-
-  if (status == 0 && xfer_count == 0) {
-    // Success.
-    pni_events_update(iocpd, iocpd->events | PN_READABLE);
-  } else {
-    iocpdesc_fail(iocpd, status, "IOCP read complete error");
-  }
-}
-
-ssize_t pni_iocp_recv(iocpdesc_t *iocpd, void *buf, size_t size, bool *would_block, pn_error_t *error)
-{
-  if (size == 0) return 0;
-  *would_block = false;
-  if (is_listener(iocpd)) {
-    set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP);
-    return SOCKET_ERROR;
-  }
-  if (iocpd->closing) {
-    // Previous call to pn_close()
-    set_iocp_error_status(error, PN_ERR, WSAESHUTDOWN);
-    return SOCKET_ERROR;
-  }
-  if (iocpd->read_closed) {
-    if (pn_error_code(iocpd->error))
-      pn_error_copy(error, iocpd->error);
-    else
-      set_iocp_error_status(error, PN_ERR, WSAENOTCONN);
-    return SOCKET_ERROR;
-  }
-
-  int count = recv(iocpd->socket, (char *) buf, size, 0);
-  if (count > 0) {
-    pni_events_update(iocpd, iocpd->events & ~PN_READABLE);
-    begin_zero_byte_read(iocpd);
-    return (ssize_t) count;
-  } else if (count == 0) {
-    iocpd->read_closed = true;
-    return 0;
-  }
-  if (WSAGetLastError() == WSAEWOULDBLOCK)
-    *would_block = true;
-  else {
-    set_iocp_error_status(error, PN_ERR, WSAGetLastError());
-    iocpd->read_closed = true;
-  }
-  return SOCKET_ERROR;
-}
-
-static void start_reading(iocpdesc_t *iocpd)
-{
-  begin_zero_byte_read(iocpd);
-}
-
-
-// === The iocp descriptor
-
-static void pni_iocpdesc_initialize(void *object)
-{
-  iocpdesc_t *iocpd = (iocpdesc_t *) object;
-  memset(iocpd, 0, sizeof(iocpdesc_t));
-  iocpd->socket = INVALID_SOCKET;
-}
-
-static void pni_iocpdesc_finalize(void *object)
-{
-  iocpdesc_t *iocpd = (iocpdesc_t *) object;
-  pn_free(iocpd->acceptor);
-  pn_error_free(iocpd->error);
-   if (iocpd->pipeline)
-    if (write_in_progress(iocpd))
-      iocp_log("iocp descriptor write leak\n");
-    else
-      pn_free(iocpd->pipeline);
-  if (iocpd->read_in_progress)
-    iocp_log("iocp descriptor read leak\n");
-  else
-    free(iocpd->read_result);
-}
-
-static uintptr_t pni_iocpdesc_hashcode(void *object)
-{
-  iocpdesc_t *iocpd = (iocpdesc_t *) object;
-  return iocpd->socket;
-}
-
-#define pni_iocpdesc_compare NULL
-#define pni_iocpdesc_inspect NULL
-
-// Reference counted in the iocpdesc map, zombie_list, selector.
-static iocpdesc_t *pni_iocpdesc(pn_socket_t s)
-{
-  static const pn_cid_t CID_pni_iocpdesc = CID_pn_void;
-  static pn_class_t clazz = PN_CLASS(pni_iocpdesc);
-  iocpdesc_t *iocpd = (iocpdesc_t *) pn_class_new(&clazz, sizeof(iocpdesc_t));
-  assert(iocpd);
-  iocpd->socket = s;
-  return iocpd;
-}
-
-static bool is_listener_socket(pn_socket_t s)
-{
-  BOOL tval = false;
-  int tvalsz = sizeof(tval);
-  int code = getsockopt(s, SOL_SOCKET, SO_ACCEPTCONN, (char *)&tval, &tvalsz);
-  return code == 0 && tval;
-}
-
-iocpdesc_t *pni_iocpdesc_create(iocp_t *iocp, pn_socket_t s, bool external) {
-  assert (s != INVALID_SOCKET);
-  assert(!pni_iocpdesc_map_get(iocp, s));
-  bool listening = is_listener_socket(s);
-  iocpdesc_t *iocpd = pni_iocpdesc(s);
-  iocpd->iocp = iocp;
-  if (iocpd) {
-    iocpd->external = external;
-    iocpd->error = pn_error();
-    if (listening) {
-      iocpd->acceptor = pni_acceptor(iocpd);
-    } else {
-      iocpd->pipeline = pni_write_pipeline(iocpd);
-      iocpd->read_result = read_result(iocpd);
-    }
-    pni_iocpdesc_map_push(iocpd);
-  }
-  return iocpd;
-}
-
-iocpdesc_t *pni_deadline_desc(iocp_t *iocp) {
-  // Non IO descriptor for selector deadlines.  Do not add to iocpdesc map or
-  // zombie list.  Selector responsible to free/decref object.
-  iocpdesc_t *iocpd = pni_iocpdesc(PN_INVALID_SOCKET);
-  iocpd->iocp = iocp;
-  iocpd->deadline_desc = true;
-  return iocpd;
-}
-
-// === Fast lookup of a socket's iocpdesc_t
-
-iocpdesc_t *pni_iocpdesc_map_get(iocp_t *iocp, pn_socket_t s) {
-  iocpdesc_t *iocpd = (iocpdesc_t *) pn_hash_get(iocp->iocpdesc_map, s);
-  return iocpd;
-}
-
-void pni_iocpdesc_map_push(iocpdesc_t *iocpd) {
-  pn_hash_put(iocpd->iocp->iocpdesc_map, iocpd->socket, iocpd);
-  pn_decref(iocpd);
-  assert(pn_refcount(iocpd) == 1);
-}
-
-void pni_iocpdesc_map_del(iocp_t *iocp, pn_socket_t s) {
-  pn_hash_del(iocp->iocpdesc_map, (uintptr_t) s);
-}
-
-static void bind_to_completion_port(iocpdesc_t *iocpd)
-{
-  if (iocpd->bound) return;
-  if (!iocpd->iocp->completion_port) {
-    iocpdesc_fail(iocpd, WSAEINVAL, "Incomplete setup, no completion port.");
-    return;
-  }
-
-  if (CreateIoCompletionPort ((HANDLE) iocpd->socket, iocpd->iocp->completion_port, 0, 0))
-    iocpd->bound = true;
-  else {
-    iocpdesc_fail(iocpd, GetLastError(), "IOCP socket setup.");
-  }
-}
-
-static void release_sys_sendbuf(SOCKET s)
-{
-  // Set the socket's send buffer size to zero.
-  int sz = 0;
-  int status = setsockopt(s, SOL_SOCKET, SO_SNDBUF, (const char *)&sz, sizeof(int));
-  assert(status == 0);
-}
-
-void pni_iocpdesc_start(iocpdesc_t *iocpd)
-{
-  if (iocpd->bound) return;
-  bind_to_completion_port(iocpd);
-  if (is_listener(iocpd)) {
-    begin_accept(iocpd->acceptor, NULL);
-  }
-  else {
-    release_sys_sendbuf(iocpd->socket);
-    pni_events_update(iocpd, PN_WRITABLE);
-    start_reading(iocpd);
-  }
-}
-
-static void complete(iocp_result_t *result, bool success, DWORD num_transferred) {
-  result->iocpd->ops_in_progress--;
-  DWORD status = success ? 0 : GetLastError();
-
-  switch (result->type) {
-  case IOCP_ACCEPT:
-    complete_accept((accept_result_t *) result, status);
-    break;
-  case IOCP_CONNECT:
-    complete_connect((connect_result_t *) result, status);
-    break;
-  case IOCP_WRITE:
-    complete_write((write_result_t *) result, num_transferred, status);
-    break;
-  case IOCP_READ:
-    complete_read((read_result_t *) result, num_transferred, status);
-    break;
-  default:
-    assert(false);
-  }
-}
-
-void pni_iocp_drain_completions(iocp_t *iocp)
-{
-  while (true) {
-    DWORD timeout_ms = 0;
-    DWORD num_transferred = 0;
-    ULONG_PTR completion_key = 0;
-    OVERLAPPED *overlapped = 0;
-
-    bool good_op = GetQueuedCompletionStatus (iocp->completion_port, &num_transferred,
-                                               &completion_key, &overlapped, timeout_ms);
-    if (!overlapped)
-      return;  // timed out
-    iocp_result_t *result = (iocp_result_t *) overlapped;
-    complete(result, good_op, num_transferred);
-  }
-}
-
-// returns: -1 on error, 0 on timeout, 1 successful completion
-int pni_iocp_wait_one(iocp_t *iocp, int timeout, pn_error_t *error) {
-  DWORD win_timeout = (timeout < 0) ? INFINITE : (DWORD) timeout;
-  DWORD num_transferred = 0;
-  ULONG_PTR completion_key = 0;
-  OVERLAPPED *overlapped = 0;
-
-  bool good_op = GetQueuedCompletionStatus (iocp->completion_port, &num_transferred,
-                                            &completion_key, &overlapped, win_timeout);
-  if (!overlapped)
-    if (GetLastError() == WAIT_TIMEOUT)
-      return 0;
-    else {
-      if (error)
-        pni_win32_error(error, "GetQueuedCompletionStatus", GetLastError());
-      return -1;
-    }
-
-  iocp_result_t *result = (iocp_result_t *) overlapped;
-  complete(result, good_op, num_transferred);
-  return 1;
-}
-
-// === Close (graceful and otherwise)
-
-// zombie_list is for sockets transitioning out of iocp on their way to zero ops_in_progress
-// and fully closed.
-
-static void zombie_list_add(iocpdesc_t *iocpd)
-{
-  assert(iocpd->closing);
-  if (!iocpd->ops_in_progress) {
-    // No need to make a zombie.
-    if (iocpd->socket != INVALID_SOCKET) {
-      closesocket(iocpd->socket);
-      iocpd->socket = INVALID_SOCKET;
-      iocpd->read_closed = true;
-    }
-    return;
-  }
-  // Allow 2 seconds for graceful shutdown before releasing socket resource.
-  iocpd->reap_time = pn_i_now() + 2000;
-  pn_list_add(iocpd->iocp->zombie_list, iocpd);
-}
-
-static void reap_check(iocpdesc_t *iocpd)
-{
-  if (iocpd->closing && !iocpd->ops_in_progress) {
-    if (iocpd->socket != INVALID_SOCKET) {
-      closesocket(iocpd->socket);
-      iocpd->socket = INVALID_SOCKET;
-    }
-    pn_list_remove(iocpd->iocp->zombie_list, iocpd);
-    // iocpd is decref'ed and possibly released
-  }
-}
-
-pn_timestamp_t pni_zombie_deadline(iocp_t *iocp)
-{
-  if (pn_list_size(iocp->zombie_list)) {
-    iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, 0);
-    return iocpd->reap_time;
-  }
-  return 0;
-}
-
-void pni_zombie_check(iocp_t *iocp, pn_timestamp_t now)
-{
-  pn_list_t *zl = iocp->zombie_list;
-  // Look for stale zombies that should have been reaped by "now"
-  for (size_t idx = 0; idx < pn_list_size(zl); idx++) {
-    iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(zl, idx);
-    if (iocpd->reap_time > now)
-      return;
-    if (iocpd->socket == INVALID_SOCKET)
-      continue;
-    assert(iocpd->ops_in_progress > 0);
-    if (iocp->iocp_trace)
-      iocp_log("async close: graceful close timeout exceeded\n");
-    closesocket(iocpd->socket);
-    iocpd->socket = INVALID_SOCKET;
-    iocpd->read_closed = true;
-    // outstanding ops should complete immediately now
-  }
-}
-
-static void drain_zombie_completions(iocp_t *iocp)
-{
-  // No more pn_selector_select() from App, but zombies still need care and feeding
-  // until their outstanding async actions complete.
-  pni_iocp_drain_completions(iocp);
-
-  // Discard any that have no pending async IO
-  size_t sz = pn_list_size(iocp->zombie_list);
-  for (size_t idx = 0; idx < sz;) {
-    iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, idx);
-    if (!iocpd->ops_in_progress) {
-      pn_list_del(iocp->zombie_list, idx, 1);
-      sz--;
-    } else {
-      idx++;
-    }
-  }
-
-  unsigned shutdown_grace = 2000;
-  char *override = getenv("PN_SHUTDOWN_GRACE");
-  if (override) {
-    int grace = atoi(override);
-    if (grace > 0 && grace < 60000)
-      shutdown_grace = (unsigned) grace;
-  }
-  pn_timestamp_t now = pn_i_now();
-  pn_timestamp_t deadline = now + shutdown_grace;
-
-  while (pn_list_size(iocp->zombie_list)) {
-    if (now >= deadline)
-      break;
-    int rv = pni_iocp_wait_one(iocp, deadline - now, NULL);
-    if (rv < 0) {
-      iocp_log("unexpected IOCP failure on Proton IO shutdown %d\n", GetLastError());
-      break;
-    }
-    now = pn_i_now();
-  }
-  if (now >= deadline && pn_list_size(iocp->zombie_list) && iocp->iocp_trace)
-    // Should only happen if really slow TCP handshakes, i.e. total network failure
-    iocp_log("network failure on Proton shutdown\n");
-}
-
-static pn_list_t *iocp_map_close_all(iocp_t *iocp)
-{
-  // Zombify stragglers, i.e. no pn_close() from the application.
-  pn_list_t *externals = pn_list(PN_OBJECT, 0);
-  for (pn_handle_t entry = pn_hash_head(iocp->iocpdesc_map); entry;
-       entry = pn_hash_next(iocp->iocpdesc_map, entry)) {
-    iocpdesc_t *iocpd = (iocpdesc_t *) pn_hash_value(iocp->iocpdesc_map, entry);
-    // Just listeners first.
-    if (is_listener(iocpd)) {
-      if (iocpd->external) {
-        // Owned by application, just keep a temporary reference to it.
-        // iocp_result_t structs must not be free'd until completed or
-        // the completion port is closed.
-        if (iocpd->ops_in_progress)
-          pn_list_add(externals, iocpd);
-        pni_iocpdesc_map_del(iocp, iocpd->socket);
-      } else {
-        // Make it a zombie.
-        pni_iocp_begin_close(iocpd);
-      }
-    }
-  }
-  pni_iocp_drain_completions(iocp);
-
-  for (pn_handle_t entry = pn_hash_head(iocp->iocpdesc_map); entry;
-       entry = pn_hash_next(iocp->iocpdesc_map, entry)) {
-    iocpdesc_t *iocpd = (iocpdesc_t *) pn_hash_value(iocp->iocpdesc_map, entry);
-    if (iocpd->external) {
-      iocpd->read_closed = true;   // Do not consume from read side
-      iocpd->write_closed = true;  // Do not shutdown write side
-      if (iocpd->ops_in_progress)
-        pn_list_add(externals, iocpd);
-      pni_iocpdesc_map_del(iocp, iocpd->socket);
-    } else {
-      // Make it a zombie.
-      pni_iocp_begin_close(iocpd);
-    }
-  }
-  return externals;
-}
-
-static void zombie_list_hard_close_all(iocp_t *iocp)
-{
-  pni_iocp_drain_completions(iocp);
-  size_t zs = pn_list_size(iocp->zombie_list);
-  for (size_t i = 0; i < zs; i++) {
-    iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, i);
-    if (iocpd->socket != INVALID_SOCKET) {
-      closesocket(iocpd->socket);
-      iocpd->socket = INVALID_SOCKET;
-      iocpd->read_closed = true;
-      iocpd->write_closed = true;
-    }
-  }
-  pni_iocp_drain_completions(iocp);
-
-  // Zombies should be all gone.  Do a sanity check.
-  zs = pn_list_size(iocp->zombie_list);
-  int remaining = 0;
-  int ops = 0;
-  for (size_t i = 0; i < zs; i++) {
-    iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, i);
-    remaining++;
-    ops += iocpd->ops_in_progress;
-  }
-  if (remaining)
-    iocp_log("Proton: %d unfinished close operations (ops count = %d)\n", remaining, ops);
-}
-
-static void iocp_shutdown(iocpdesc_t *iocpd)
-{
-  if (iocpd->socket == PN_INVALID_SOCKET)
-    return;    // Hard close in progress
-  if (shutdown(iocpd->socket, SD_SEND)) {
-    int err = WSAGetLastError();
-    if (err != WSAECONNABORTED && err != WSAECONNRESET && err != WSAENOTCONN)
-      if (iocpd->iocp->iocp_trace)
-        iocp_log("socket shutdown failed %d\n", err);
-  }
-  iocpd->write_closed = true;
-}
-
-void pni_iocp_begin_close(iocpdesc_t *iocpd)
-{
-  assert (!iocpd->closing);
-  if (is_listener(iocpd)) {
-    // Listening socket is easy.  Close the socket which will cancel async ops.
-    pn_socket_t old_sock = iocpd->socket;
-    iocpd->socket = INVALID_SOCKET;
-    iocpd->closing = true;
-    iocpd->read_closed = true;
-    iocpd->write_closed = true;
-    closesocket(old_sock);
-    // Pending accepts will now complete.  Zombie can die when all consumed.
-    zombie_list_add(iocpd);
-    pni_iocpdesc_map_del(iocpd->iocp, old_sock);  // may pn_free *iocpd
-  } else {
-    // Continue async operation looking for graceful close confirmation or timeout.
-    pn_socket_t old_sock = iocpd->socket;
-    iocpd->closing = true;
-    if (!iocpd->write_closed && !write_in_progress(iocpd))
-      iocp_shutdown(iocpd);
-    zombie_list_add(iocpd);
-    pni_iocpdesc_map_del(iocpd->iocp, old_sock);  // may pn_free *iocpd
-  }
-}
-
-
-// === iocp_t
-
-#define pni_iocp_hashcode NULL
-#define pni_iocp_compare NULL
-#define pni_iocp_inspect NULL
-
-void pni_iocp_initialize(void *obj)
-{
-  iocp_t *iocp = (iocp_t *) obj;
-  memset(iocp, 0, sizeof(iocp_t));
-  pni_shared_pool_create(iocp);
-  iocp->completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
-  assert(iocp->completion_port != NULL);
-  iocp->iocpdesc_map = pn_hash(PN_OBJECT, 0, 0.75);
-  iocp->zombie_list = pn_list(PN_OBJECT, 0);
-  iocp->iocp_trace = pn_env_bool("PN_TRACE_DRV");
-  iocp->selector = NULL;
-}
-
-void pni_iocp_finalize(void *obj)
-{
-  iocp_t *iocp = (iocp_t *) obj;
-  // Move sockets to closed state, except external sockets.
-  pn_list_t *externals = iocp_map_close_all(iocp);
-  // Now everything with ops_in_progress is in the zombie_list or the externals list.
-  assert(!pn_hash_head(iocp->iocpdesc_map));
-  pn_free(iocp->iocpdesc_map);
-
-  drain_zombie_completions(iocp);    // Last chance for graceful close
-  zombie_list_hard_close_all(iocp);
-  CloseHandle(iocp->completion_port);  // This cancels all our async ops
-  iocp->completion_port = NULL;
-
-  if (pn_list_size(externals) && iocp->iocp_trace)
-    iocp_log("%d external sockets not closed and removed from Proton IOCP control\n", pn_list_size(externals));
-
-  // Now safe to free everything that might be touched by a former async operation.
-  pn_free(externals);
-  pn_free(iocp->zombie_list);
-  pni_shared_pool_free(iocp);
-}
-
-iocp_t *pni_iocp()
-{
-  static const pn_cid_t CID_pni_iocp = CID_pn_void;
-  static const pn_class_t clazz = PN_CLASS(pni_iocp);
-  iocp_t *iocp = (iocp_t *) pn_class_new(&clazz, sizeof(iocp_t));
-  return iocp;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/windows/iocp.h
----------------------------------------------------------------------
diff --git a/proton-c/src/windows/iocp.h b/proton-c/src/windows/iocp.h
deleted file mode 100644
index 0e052e5..0000000
--- a/proton-c/src/windows/iocp.h
+++ /dev/null
@@ -1,144 +0,0 @@
-#ifndef PROTON_SRC_IOCP_H
-#define PROTON_SRC_IOCP_H 1
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <proton/import_export.h>
-#include <proton/selectable.h>
-#include <proton/type_compat.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-typedef struct pni_acceptor_t pni_acceptor_t;
-typedef struct write_result_t write_result_t;
-typedef struct read_result_t read_result_t;
-typedef struct write_pipeline_t write_pipeline_t;
-typedef struct iocpdesc_t iocpdesc_t;
-
-
-// One per pn_io_t.
-
-struct iocp_t {
-  HANDLE completion_port;
-  pn_hash_t *iocpdesc_map;
-  pn_list_t *zombie_list;
-  int shared_pool_size;
-  char *shared_pool_memory;
-  write_result_t **shared_results;
-  write_result_t **available_results;
-  size_t shared_available_count;
-  size_t writer_count;
-  int loopback_bufsize;
-  bool iocp_trace;
-  pn_selector_t *selector;
-};
-
-
-// One for each socket.
-// This iocpdesc_t structure is ref counted by the iocpdesc_map, zombie_list,
-// selector->iocp_descriptors list.  It should remain ref counted in the
-// zombie_list until ops_in_progress == 0 or the completion port is closed.
-
-struct iocpdesc_t {
-  pn_socket_t socket;
-  iocp_t *iocp;
-  pni_acceptor_t *acceptor;
-  pn_error_t *error;
-  int ops_in_progress;
-  bool read_in_progress;
-  write_pipeline_t *pipeline;
-  read_result_t *read_result;
-  bool external;       // true if socket set up outside Proton
-  bool bound;          // associted with the completion port
-  bool closing;        // pn_close called by application
-  bool read_closed;    // EOF or read error
-  bool write_closed;   // shutdown sent or write error
-  bool poll_error;     // flag posix-like POLLERR/POLLHUP/POLLNVAL
-  bool deadline_desc;  // Socket-less deadline descriptor for selectors
-  pn_selector_t *selector;
-  pn_selectable_t *selectable;
-  int events;
-  int interests;
-  pn_timestamp_t deadline;
-  iocpdesc_t *triggered_list_next;
-  iocpdesc_t *triggered_list_prev;
-  iocpdesc_t *deadlines_next;
-  iocpdesc_t *deadlines_prev;
-  pn_timestamp_t reap_time;;
-};
-
-typedef enum { IOCP_ACCEPT, IOCP_CONNECT, IOCP_READ, IOCP_WRITE } iocp_type_t;
-
-typedef struct {
-  OVERLAPPED overlapped;
-  iocp_type_t type;
-  iocpdesc_t *iocpd;
-  HRESULT status;
-} iocp_result_t;
-
-struct write_result_t {
-  iocp_result_t base;
-  size_t requested;
-  bool in_use;
-  pn_bytes_t buffer;
-};
-
-iocpdesc_t *pni_iocpdesc_create(iocp_t *, pn_socket_t s, bool external);
-iocpdesc_t *pni_iocpdesc_map_get(iocp_t *, pn_socket_t s);
-iocpdesc_t *pni_deadline_desc(iocp_t *);
-void pni_iocpdesc_map_del(iocp_t *, pn_socket_t s);
-void pni_iocpdesc_map_push(iocpdesc_t *iocpd);
-void pni_iocpdesc_start(iocpdesc_t *iocpd);
-void pni_iocp_drain_completions(iocp_t *);
-int pni_iocp_wait_one(iocp_t *, int timeout, pn_error_t *);
-void pni_iocp_start_accepting(iocpdesc_t *iocpd);
-pn_socket_t pni_iocp_end_accept(iocpdesc_t *ld, sockaddr *addr, socklen_t *addrlen, bool *would_block, pn_error_t *error);
-pn_socket_t pni_iocp_begin_connect(iocp_t *, pn_socket_t sock, struct addrinfo *addr, pn_error_t *error);
-ssize_t pni_iocp_begin_write(iocpdesc_t *, const void *, size_t, bool *, pn_error_t *);
-ssize_t pni_iocp_recv(iocpdesc_t *iocpd, void *buf, size_t size, bool *would_block, pn_error_t *error);
-void pni_iocp_begin_close(iocpdesc_t *iocpd);
-iocp_t *pni_iocp();
-
-void pni_events_update(iocpdesc_t *iocpd, int events);
-write_result_t *pni_write_result(iocpdesc_t *iocpd, const char *buf, size_t buflen);
-write_pipeline_t *pni_write_pipeline(iocpdesc_t *iocpd);
-size_t pni_write_pipeline_size(write_pipeline_t *);
-bool pni_write_pipeline_writable(write_pipeline_t *);
-void pni_write_pipeline_return(write_pipeline_t *, write_result_t *);
-size_t pni_write_pipeline_reserve(write_pipeline_t *, size_t);
-write_result_t *pni_write_pipeline_next(write_pipeline_t *);
-void pni_shared_pool_create(iocp_t *);
-void pni_shared_pool_free(iocp_t *);
-void pni_zombie_check(iocp_t *, pn_timestamp_t);
-pn_timestamp_t pni_zombie_deadline(iocp_t *);
-
-pn_selector_t *pni_selector_create(iocp_t *iocp);
-
-int pni_win32_error(pn_error_t *error, const char *msg, HRESULT code);
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif /* iocp.h */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org