Posted to commits@qpid.apache.org by as...@apache.org on 2020/08/12 15:34:10 UTC

[qpid-proton] branch master updated (c8aba5e -> 728a2ef)

This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git.


    from c8aba5e  PROTON-2226: remove assertion check that generates false positive
     new a2528bb  PROTON-2247: Raw connections API for the proactor - API defined in header files - Simple test application is derived from the direct.c example
     new f1d6197  PROTON-2247: Common raw connection implementation: - Buffer management - Read/Write logic - Stubbed out implementation specific code
     new 1b4bded  PROTON-2247: Tests for common raw connection code
     new 7dc9240  PROTON-2247: Epoll implementation of raw connection API
     new 728a2ef  PROTON-2247: Work on raw echo to improve output and add some wakes

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 c/CMakeLists.txt                         |   4 +-
 c/docs/user.doxygen.in                   |   1 +
 c/examples/CMakeLists.txt                |   2 +-
 c/examples/raw_connect.c                 | 256 +++++++++++
 c/examples/raw_echo.c                    | 321 ++++++++++++++
 c/include/proton/cid.h                   |   3 +-
 c/include/proton/event.h                 | 115 ++++-
 c/include/proton/listener.h              |  19 +
 c/include/proton/proactor.h              |  32 ++
 c/include/proton/raw_connection.h        | 265 +++++++++++
 c/include/proton/types.h                 |  11 +
 c/src/core/event.c                       | 161 +++----
 c/src/core/memory.c                      |   2 +-
 c/src/proactor/epoll-internal.h          |  84 +++-
 c/src/proactor/epoll.c                   | 134 +++---
 c/src/proactor/epoll_raw_connection.c    | 376 ++++++++++++++++
 c/src/proactor/libuv.c                   |   9 +
 c/src/proactor/raw_connection-internal.h | 109 +++++
 c/src/proactor/raw_connection.c          | 581 +++++++++++++++++++++++++
 c/src/proactor/win_iocp.cpp              |   9 +
 c/tests/CMakeLists.txt                   |   4 +
 c/tests/raw_connection_test.cpp          | 723 +++++++++++++++++++++++++++++++
 22 files changed, 3040 insertions(+), 181 deletions(-)
 create mode 100644 c/examples/raw_connect.c
 create mode 100644 c/examples/raw_echo.c
 create mode 100644 c/include/proton/raw_connection.h
 create mode 100644 c/src/proactor/epoll_raw_connection.c
 create mode 100644 c/src/proactor/raw_connection-internal.h
 create mode 100644 c/src/proactor/raw_connection.c
 create mode 100644 c/tests/raw_connection_test.cpp


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-proton] 02/05: PROTON-2247: Common raw connection implementation: - Buffer management - Read/Write logic - Stubbed out implementation specific code

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit f1d61976d2064d4a9d92af3dc8cd304b956f9184
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Mon Jun 22 13:36:10 2020 -0400

    PROTON-2247: Common raw connection implementation:
    - Buffer management
    - Read/Write logic
    - Stubbed out implementation specific code
---
 c/CMakeLists.txt                         |   1 +
 c/include/proton/cid.h                   |   3 +-
 c/src/core/event.c                       | 161 ++++-----
 c/src/core/memory.c                      |   2 +-
 c/src/proactor/epoll.c                   |   8 +
 c/src/proactor/libuv.c                   |   8 +
 c/src/proactor/raw_connection-internal.h | 108 ++++++
 c/src/proactor/raw_connection.c          | 578 +++++++++++++++++++++++++++++++
 c/src/proactor/win_iocp.cpp              |   8 +
 9 files changed, 776 insertions(+), 101 deletions(-)
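
For orientation, here is a minimal sketch (not part of the commit) of how an application feeds
and reclaims buffers through the API this commit implements, using only calls that appear in the
diff below. The buffer pool, its sizes and the helper names are illustrative assumptions:

    /* Hypothetical helpers showing the give/take buffer cycle. */
    #include <stdio.h>
    #include <proton/raw_connection.h>

    #define POOL_COUNT 16          /* illustrative pool size */
    #define POOL_BYTES 1024

    static char pool[POOL_COUNT][POOL_BYTES];

    /* Hand as many empty read buffers to the connection as it will accept. */
    static void give_read_buffers(pn_raw_connection_t *rc) {
      pn_raw_buffer_t bufs[POOL_COUNT] = {{0}};
      size_t want = pn_raw_connection_read_buffers_capacity(rc);
      if (want > POOL_COUNT) want = POOL_COUNT;
      for (size_t i = 0; i < want; i++) {
        bufs[i].bytes = pool[i];
        bufs[i].capacity = POOL_BYTES;
      }
      pn_raw_connection_give_read_buffers(rc, bufs, want);
    }

    /* Reclaim buffers that have been filled; offset/size delimit the bytes read. */
    static void take_read_buffers(pn_raw_connection_t *rc) {
      pn_raw_buffer_t done[POOL_COUNT];
      size_t n = pn_raw_connection_take_read_buffers(rc, done, POOL_COUNT);
      for (size_t i = 0; i < n; i++) {
        /* the data sits at done[i].bytes + done[i].offset, length done[i].size */
        fwrite(done[i].bytes + done[i].offset, 1, done[i].size, stdout);
      }
    }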

diff --git a/c/CMakeLists.txt b/c/CMakeLists.txt
index b686c42..6b2c167 100644
--- a/c/CMakeLists.txt
+++ b/c/CMakeLists.txt
@@ -333,6 +333,7 @@ string(TOLOWER "${PROACTOR}" PROACTOR)
 set (qpid-proton-proactor-common
   src/proactor/proactor-internal.c
   src/proactor/netaddr-internal.c
+  src/proactor/raw_connection.c
   )
 
 if (PROACTOR STREQUAL "epoll" OR (NOT PROACTOR AND NOT BUILD_PROACTOR))
diff --git a/c/include/proton/cid.h b/c/include/proton/cid.h
index d35c8ac..7e0c052 100644
--- a/c/include/proton/cid.h
+++ b/c/include/proton/cid.h
@@ -67,7 +67,8 @@ typedef enum {
   CID_pn_listener,
   CID_pn_proactor,
 
-  CID_pn_listener_socket
+  CID_pn_listener_socket,
+  CID_pn_raw_connection
 } pn_cid_t;
 
 /**
diff --git a/c/src/core/event.c b/c/src/core/event.c
index 5a20d87..cecc615 100644
--- a/c/src/core/event.c
+++ b/c/src/core/event.c
@@ -298,109 +298,72 @@ pn_record_t *pn_event_attachments(pn_event_t *event)
   return event->attachments;
 }
 
+
 const char *pn_event_type_name(pn_event_type_t type)
 {
+  #define CASE(X) case X: return #X
   switch (type) {
-  case PN_EVENT_NONE:
-    return "PN_EVENT_NONE";
-  case PN_REACTOR_INIT:
-    return "PN_REACTOR_INIT";
-  case PN_REACTOR_QUIESCED:
-    return "PN_REACTOR_QUIESCED";
-  case PN_REACTOR_FINAL:
-    return "PN_REACTOR_FINAL";
-  case PN_TIMER_TASK:
-    return "PN_TIMER_TASK";
-  case PN_CONNECTION_INIT:
-    return "PN_CONNECTION_INIT";
-  case PN_CONNECTION_BOUND:
-    return "PN_CONNECTION_BOUND";
-  case PN_CONNECTION_UNBOUND:
-    return "PN_CONNECTION_UNBOUND";
-  case PN_CONNECTION_REMOTE_OPEN:
-    return "PN_CONNECTION_REMOTE_OPEN";
-  case PN_CONNECTION_LOCAL_OPEN:
-    return "PN_CONNECTION_LOCAL_OPEN";
-  case PN_CONNECTION_REMOTE_CLOSE:
-    return "PN_CONNECTION_REMOTE_CLOSE";
-  case PN_CONNECTION_LOCAL_CLOSE:
-    return "PN_CONNECTION_LOCAL_CLOSE";
-  case PN_CONNECTION_FINAL:
-    return "PN_CONNECTION_FINAL";
-  case PN_SESSION_INIT:
-    return "PN_SESSION_INIT";
-  case PN_SESSION_REMOTE_OPEN:
-    return "PN_SESSION_REMOTE_OPEN";
-  case PN_SESSION_LOCAL_OPEN:
-    return "PN_SESSION_LOCAL_OPEN";
-  case PN_SESSION_REMOTE_CLOSE:
-    return "PN_SESSION_REMOTE_CLOSE";
-  case PN_SESSION_LOCAL_CLOSE:
-    return "PN_SESSION_LOCAL_CLOSE";
-  case PN_SESSION_FINAL:
-    return "PN_SESSION_FINAL";
-  case PN_LINK_INIT:
-    return "PN_LINK_INIT";
-  case PN_LINK_REMOTE_OPEN:
-    return "PN_LINK_REMOTE_OPEN";
-  case PN_LINK_LOCAL_OPEN:
-    return "PN_LINK_LOCAL_OPEN";
-  case PN_LINK_REMOTE_CLOSE:
-    return "PN_LINK_REMOTE_CLOSE";
-  case PN_LINK_LOCAL_DETACH:
-    return "PN_LINK_LOCAL_DETACH";
-  case PN_LINK_REMOTE_DETACH:
-    return "PN_LINK_REMOTE_DETACH";
-  case PN_LINK_LOCAL_CLOSE:
-    return "PN_LINK_LOCAL_CLOSE";
-  case PN_LINK_FLOW:
-    return "PN_LINK_FLOW";
-  case PN_LINK_FINAL:
-    return "PN_LINK_FINAL";
-  case PN_DELIVERY:
-    return "PN_DELIVERY";
-  case PN_TRANSPORT:
-    return "PN_TRANSPORT";
-  case PN_TRANSPORT_AUTHENTICATED:
-    return "PN_TRANSPORT_AUTHENTICATED";
-  case PN_TRANSPORT_ERROR:
-    return "PN_TRANSPORT_ERROR";
-  case PN_TRANSPORT_HEAD_CLOSED:
-    return "PN_TRANSPORT_HEAD_CLOSED";
-  case PN_TRANSPORT_TAIL_CLOSED:
-    return "PN_TRANSPORT_TAIL_CLOSED";
-  case PN_TRANSPORT_CLOSED:
-    return "PN_TRANSPORT_CLOSED";
-  case PN_SELECTABLE_INIT:
-    return "PN_SELECTABLE_INIT";
-  case PN_SELECTABLE_UPDATED:
-    return "PN_SELECTABLE_UPDATED";
-  case PN_SELECTABLE_READABLE:
-    return "PN_SELECTABLE_READABLE";
-  case PN_SELECTABLE_WRITABLE:
-    return "PN_SELECTABLE_WRITABLE";
-  case PN_SELECTABLE_ERROR:
-    return "PN_SELECTABLE_ERROR";
-  case PN_SELECTABLE_EXPIRED:
-    return "PN_SELECTABLE_EXPIRED";
-  case PN_SELECTABLE_FINAL:
-    return "PN_SELECTABLE_FINAL";
-   case PN_CONNECTION_WAKE:
-    return "PN_CONNECTION_WAKE";
-   case PN_LISTENER_ACCEPT:
-    return "PN_LISTENER_ACCEPT";
-   case PN_LISTENER_CLOSE:
-    return "PN_LISTENER_CLOSE";
-   case PN_PROACTOR_INTERRUPT:
-    return "PN_PROACTOR_INTERRUPT";
-   case PN_PROACTOR_TIMEOUT:
-    return "PN_PROACTOR_TIMEOUT";
-   case PN_PROACTOR_INACTIVE:
-    return "PN_PROACTOR_INACTIVE";
-   case PN_LISTENER_OPEN:
-    return "PN_LISTENER_OPEN";
-   default:
+  CASE(PN_EVENT_NONE);
+  CASE(PN_REACTOR_INIT);
+  CASE(PN_REACTOR_QUIESCED);
+  CASE(PN_REACTOR_FINAL);
+  CASE(PN_TIMER_TASK);
+  CASE(PN_CONNECTION_INIT);
+  CASE(PN_CONNECTION_BOUND);
+  CASE(PN_CONNECTION_UNBOUND);
+  CASE(PN_CONNECTION_REMOTE_OPEN);
+  CASE(PN_CONNECTION_LOCAL_OPEN);
+  CASE(PN_CONNECTION_REMOTE_CLOSE);
+  CASE(PN_CONNECTION_LOCAL_CLOSE);
+  CASE(PN_CONNECTION_FINAL);
+  CASE(PN_SESSION_INIT);
+  CASE(PN_SESSION_REMOTE_OPEN);
+  CASE(PN_SESSION_LOCAL_OPEN);
+  CASE(PN_SESSION_REMOTE_CLOSE);
+  CASE(PN_SESSION_LOCAL_CLOSE);
+  CASE(PN_SESSION_FINAL);
+  CASE(PN_LINK_INIT);
+  CASE(PN_LINK_REMOTE_OPEN);
+  CASE(PN_LINK_LOCAL_OPEN);
+  CASE(PN_LINK_REMOTE_CLOSE);
+  CASE(PN_LINK_LOCAL_DETACH);
+  CASE(PN_LINK_REMOTE_DETACH);
+  CASE(PN_LINK_LOCAL_CLOSE);
+  CASE(PN_LINK_FLOW);
+  CASE(PN_LINK_FINAL);
+  CASE(PN_DELIVERY);
+  CASE(PN_TRANSPORT);
+  CASE(PN_TRANSPORT_AUTHENTICATED);
+  CASE(PN_TRANSPORT_ERROR);
+  CASE(PN_TRANSPORT_HEAD_CLOSED);
+  CASE(PN_TRANSPORT_TAIL_CLOSED);
+  CASE(PN_TRANSPORT_CLOSED);
+  CASE(PN_SELECTABLE_INIT);
+  CASE(PN_SELECTABLE_UPDATED);
+  CASE(PN_SELECTABLE_READABLE);
+  CASE(PN_SELECTABLE_WRITABLE);
+  CASE(PN_SELECTABLE_ERROR);
+  CASE(PN_SELECTABLE_EXPIRED);
+  CASE(PN_SELECTABLE_FINAL);
+  CASE(PN_CONNECTION_WAKE);
+  CASE(PN_LISTENER_ACCEPT);
+  CASE(PN_LISTENER_CLOSE);
+  CASE(PN_PROACTOR_INTERRUPT);
+  CASE(PN_PROACTOR_TIMEOUT);
+  CASE(PN_PROACTOR_INACTIVE);
+  CASE(PN_LISTENER_OPEN);
+  CASE(PN_RAW_CONNECTION_CONNECTED);
+  CASE(PN_RAW_CONNECTION_DISCONNECTED);
+  CASE(PN_RAW_CONNECTION_CLOSED_READ);
+  CASE(PN_RAW_CONNECTION_CLOSED_WRITE);
+  CASE(PN_RAW_CONNECTION_NEED_READ_BUFFERS);
+  CASE(PN_RAW_CONNECTION_NEED_WRITE_BUFFERS);
+  CASE(PN_RAW_CONNECTION_READ);
+  CASE(PN_RAW_CONNECTION_WRITTEN);
+  CASE(PN_RAW_CONNECTION_WAKE);
+  default:
     return "PN_UNKNOWN";
   }
   return NULL;
+#undef CASE
 }
diff --git a/c/src/core/memory.c b/c/src/core/memory.c
index 213eab1..2b00c06 100644
--- a/c/src/core/memory.c
+++ b/c/src/core/memory.c
@@ -61,7 +61,7 @@ static struct stats {
   size_t subrequested;
   size_t suballoc;
   size_t subdealloc;
-} stats[CID_pn_listener_socket+1] = {{0}}; // Just happens to be the last CID
+} stats[CID_pn_raw_connection+1] = {{0}}; // Just happens to be the last CID
 
 static bool debug_memory = false;
 
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 5c07e28..3b5f32e 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -3047,3 +3047,11 @@ int64_t pn_proactor_now_64(void) {
   clock_gettime(CLOCK_MONOTONIC, &t);
   return t.tv_sec * 1000 + t.tv_nsec / 1000000;
 }
+
+// Empty stubs for raw connection code
+pn_raw_connection_t *pn_raw_connection(void) { return NULL; }
+void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const char *addr) {}
+void pn_listener_raw_accept(pn_listener_t *l, pn_raw_connection_t *rc) {}
+void pn_raw_connection_wake(pn_raw_connection_t *conn) {}
+const struct pn_netaddr_t *pn_raw_connection_local_addr(pn_raw_connection_t *connection) { return NULL; }
+const struct pn_netaddr_t *pn_raw_connection_remote_addr(pn_raw_connection_t *connection) { return NULL; }
diff --git a/c/src/proactor/libuv.c b/c/src/proactor/libuv.c
index dd33b41..409d6c5 100644
--- a/c/src/proactor/libuv.c
+++ b/c/src/proactor/libuv.c
@@ -1349,3 +1349,11 @@ pn_millis_t pn_proactor_now(void) {
 int64_t pn_proactor_now_64(void) {
   return uv_hrtime() / 1000000; // uv_hrtime returns time in nanoseconds
 }
+
+// Empty stubs for raw connection code
+pn_raw_connection_t *pn_raw_connection(void) { return NULL; }
+void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const char *addr) {}
+void pn_listener_raw_accept(pn_listener_t *l, pn_raw_connection_t *rc) {}
+void pn_raw_connection_wake(pn_raw_connection_t *conn) {}
+const struct pn_netaddr_t *pn_raw_connection_local_addr(pn_raw_connection_t *connection) { return NULL; }
+const struct pn_netaddr_t *pn_raw_connection_remote_addr(pn_raw_connection_t *connection) { return NULL; }
diff --git a/c/src/proactor/raw_connection-internal.h b/c/src/proactor/raw_connection-internal.h
new file mode 100644
index 0000000..79633e4
--- /dev/null
+++ b/c/src/proactor/raw_connection-internal.h
@@ -0,0 +1,108 @@
+#ifndef PROACTOR_RAW_CONNECTION_INTERNAL_H
+#define PROACTOR_RAW_CONNECTION_INTERNAL_H
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+enum {
+  read_buffer_count = 16,
+  write_buffer_count = 16
+};
+
+typedef enum {
+  buff_rempty    = 0,
+  buff_unread    = 1,
+  buff_read      = 2,
+  buff_wempty    = 4,
+  buff_unwritten = 5,
+  buff_written   = 6
+} buff_type;
+
+typedef uint16_t buff_ptr; // This is always the index+1 so that 0 can be special
+
+typedef struct pbuffer_t {
+  uintptr_t context;
+  char *bytes;
+  uint32_t capacity;
+  uint32_t size;
+  uint32_t offset;
+  buff_ptr next;
+  uint8_t type; // For debugging
+} pbuffer_t;
+
+struct pn_raw_connection_t {
+  pbuffer_t rbuffers[read_buffer_count];
+  pbuffer_t wbuffers[write_buffer_count];
+  pn_condition_t *condition;
+  pn_collector_t *collector;
+  pn_record_t *attachments;
+  uint32_t unwritten_offset;
+  uint16_t rbuffer_count;
+  uint16_t wbuffer_count;
+
+  buff_ptr rbuffer_first_empty;
+  buff_ptr rbuffer_first_unused;
+  buff_ptr rbuffer_last_unused;
+  buff_ptr rbuffer_first_read;
+  buff_ptr rbuffer_last_read;
+
+  buff_ptr wbuffer_first_empty;
+  buff_ptr wbuffer_first_towrite;
+  buff_ptr wbuffer_last_towrite;
+  buff_ptr wbuffer_first_written;
+  buff_ptr wbuffer_last_written;
+  bool rneedbufferevent;
+  bool wneedbufferevent;
+  bool rpending;
+  bool wpending;
+  bool rclosed;
+  bool wclosed;
+  bool rclosedpending;
+  bool wclosedpending;
+  bool rdrainpending;
+  bool wdrainpending;
+  bool disconnectpending;
+};
+
+/*
+ * Raw connection internal API
+ */
+bool pni_raw_validate(pn_raw_connection_t *conn);
+void pni_raw_connected(pn_raw_connection_t *conn);
+void pni_raw_disconnect(pn_raw_connection_t *conn);
+void pni_raw_process(pn_raw_connection_t *conn, int events, bool wake);
+void pni_raw_wake(pn_raw_connection_t *conn);
+void pni_raw_read(pn_raw_connection_t *conn, int sock, long (*recv)(int, void*, size_t), void (*set_error)(pn_raw_connection_t *, const char *, int));
+void pni_raw_write(pn_raw_connection_t *conn, int sock, long (*send)(int, const void*, size_t), void (*set_error)(pn_raw_connection_t *, const char *, int));
+bool pni_raw_can_read(pn_raw_connection_t *conn);
+bool pni_raw_can_write(pn_raw_connection_t *conn);
+pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn);
+void pni_raw_initialize(pn_raw_connection_t *conn);
+void pni_raw_finalize(pn_raw_connection_t *conn);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // PROACTOR_RAW_CONNECTION_INTERNAL_H
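
A note on the lists declared above: buff_ptr links store index+1 so that 0 can serve as the
null/end marker, which is why the implementation always indexes rbuffers[i-1] and wbuffers[i-1].
A self-contained sketch of just that convention, with illustrative names (link_t, slot) that are
not taken from the commit:

    /* Standalone illustration of the index+1 linking used by pbuffer_t.next:
     * 0 means "none"; any other value v names array slot v-1. */
    #include <stdint.h>
    #include <stdio.h>

    typedef uint16_t link_t;                 /* plays the role of buff_ptr */
    struct slot { int payload; link_t next; };

    int main(void) {
      struct slot slots[4] = {
        {10, 2}, {20, 3}, {30, 4}, {40, 0}   /* slot 4 terminates the chain */
      };
      for (link_t i = 1; i; i = slots[i - 1].next)   /* same loop shape as pni_raw_validate() */
        printf("%d\n", slots[i - 1].payload);        /* prints 10 20 30 40 */
      return 0;
    }
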
diff --git a/c/src/proactor/raw_connection.c b/c/src/proactor/raw_connection.c
new file mode 100644
index 0000000..3e4a74c
--- /dev/null
+++ b/c/src/proactor/raw_connection.c
@@ -0,0 +1,578 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/* Enable POSIX features beyond c99 for modern pthread and standard strerror_r() */
+#ifndef _POSIX_C_SOURCE
+#define _POSIX_C_SOURCE 200809L
+#endif
+/* Avoid GNU extensions, in particular the incompatible alternative strerror_r() */
+#undef _GNU_SOURCE
+
+#include "proton/raw_connection.h"
+
+#include "proton/event.h"
+#include "proton/listener.h"
+#include "proton/object.h"
+#include "proton/proactor.h"
+#include "proton/types.h"
+
+#include "core/util.h"
+#include "proactor-internal.h"
+
+#include <assert.h>
+#include <errno.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "raw_connection-internal.h"
+
+PN_STRUCT_CLASSDEF(pn_raw_connection)
+
+void pni_raw_initialize(pn_raw_connection_t *conn) {
+  // Link together free lists
+  for (buff_ptr i = 1; i<=read_buffer_count; i++) {
+    conn->rbuffers[i-1].next = i==read_buffer_count ? 0 : i+1;
+    conn->rbuffers[i-1].type = buff_rempty;
+    conn->wbuffers[i-1].next = i==read_buffer_count ? 0 : i+1;
+    conn->wbuffers[i-1].type = buff_wempty;
+  }
+
+  //conn->batch.next_event = pni_raw_batch_next;
+  conn->condition = pn_condition();
+  conn->collector = pn_collector();
+  conn->attachments = pn_record();
+
+  conn->rbuffer_first_empty = 1;
+  conn->wbuffer_first_empty = 1;
+}
+
+bool pni_raw_validate(pn_raw_connection_t *conn) {
+  int rempty_count = 0;
+  for (buff_ptr i = conn->rbuffer_first_empty; i; i = conn->rbuffers[i-1].next) {
+    if (conn->rbuffers[i-1].type != buff_rempty) return false;
+    rempty_count++;
+  }
+  int runused_count = 0;
+  for (buff_ptr i = conn->rbuffer_first_unused; i; i = conn->rbuffers[i-1].next) {
+    if (conn->rbuffers[i-1].type != buff_unread) return false;
+    runused_count++;
+  }
+  int rread_count = 0;
+  for (buff_ptr i = conn->rbuffer_first_read; i; i = conn->rbuffers[i-1].next) {
+    if (conn->rbuffers[i-1].type != buff_read) return false;
+    rread_count++;
+  }
+  if (rempty_count+runused_count+rread_count != read_buffer_count) return false;
+  if (!conn->rbuffer_first_unused && conn->rbuffer_last_unused) return false;
+  if (conn->rbuffer_last_unused &&
+    (conn->rbuffers[conn->rbuffer_last_unused-1].type != buff_unread || conn->rbuffers[conn->rbuffer_last_unused-1].next != 0)) return false;
+  if (!conn->rbuffer_first_read && conn->rbuffer_last_read) return false;
+  if (conn->rbuffer_last_read &&
+    (conn->rbuffers[conn->rbuffer_last_read-1].type != buff_read || conn->rbuffers[conn->rbuffer_last_read-1].next != 0)) return false;
+
+  int wempty_count = 0;
+  for (buff_ptr i = conn->wbuffer_first_empty; i; i = conn->wbuffers[i-1].next) {
+    if (conn->wbuffers[i-1].type != buff_wempty) return false;
+    wempty_count++;
+  }
+  int wunwritten_count = 0;
+  for (buff_ptr i = conn->wbuffer_first_towrite; i; i = conn->wbuffers[i-1].next) {
+    if (conn->wbuffers[i-1].type != buff_unwritten) return false;
+    wunwritten_count++;
+  }
+  int wwritten_count = 0;
+  for (buff_ptr i = conn->wbuffer_first_written; i; i = conn->wbuffers[i-1].next) {
+    if (conn->wbuffers[i-1].type != buff_written) return false;
+    wwritten_count++;
+  }
+  if (wempty_count+wunwritten_count+wwritten_count != write_buffer_count) return false;
+  if (!conn->wbuffer_first_towrite && conn->wbuffer_last_towrite) return false;
+  if (conn->wbuffer_last_towrite &&
+    (conn->wbuffers[conn->wbuffer_last_towrite-1].type != buff_unwritten || conn->wbuffers[conn->wbuffer_last_towrite-1].next != 0)) return false;
+  if (!conn->wbuffer_first_written && conn->wbuffer_last_written) return false;
+  if (conn->wbuffer_last_written &&
+    (conn->wbuffers[conn->wbuffer_last_written-1].type != buff_written || conn->wbuffers[conn->wbuffer_last_written-1].next != 0)) return false;
+  return true;
+}
+
+void pni_raw_finalize(pn_raw_connection_t *conn) {
+  pn_condition_free(conn->condition);
+  pn_collector_free(conn->collector);
+  pn_free(conn->attachments);
+}
+
+size_t pn_raw_connection_read_buffers_capacity(pn_raw_connection_t *conn) {
+  assert(conn);
+  return read_buffer_count - conn->rbuffer_count;
+}
+
+size_t pn_raw_connection_write_buffers_capacity(pn_raw_connection_t *conn) {
+  assert(conn);
+  return write_buffer_count-conn->wbuffer_count;
+}
+
+size_t pn_raw_connection_give_read_buffers(pn_raw_connection_t *conn, pn_raw_buffer_t const *buffers, size_t num) {
+  assert(conn);
+  size_t can_take = pn_min(num, pn_raw_connection_read_buffers_capacity(conn));
+  if ( can_take==0 ) return 0;
+
+  buff_ptr current = conn->rbuffer_first_empty;
+  assert(current);
+
+  buff_ptr previous;
+  for (size_t i = 0; i < can_take; i++) {
+    // Get next free
+    assert(conn->rbuffers[current-1].type == buff_rempty);
+    conn->rbuffers[current-1].context = buffers[i].context;
+    conn->rbuffers[current-1].bytes = buffers[i].bytes;
+    conn->rbuffers[current-1].capacity = buffers[i].capacity;
+    conn->rbuffers[current-1].size = 0;
+    conn->rbuffers[current-1].offset = buffers[i].offset;
+    conn->rbuffers[current-1].type = buff_unread;
+
+    previous = current;
+    current = conn->rbuffers[current-1].next;
+  }
+  if (!conn->rbuffer_last_unused) {
+    conn->rbuffer_last_unused = previous;
+  }
+
+  conn->rbuffers[previous-1].next = conn->rbuffer_first_unused;
+  conn->rbuffer_first_unused = conn->rbuffer_first_empty;
+  conn->rbuffer_first_empty = current;
+
+  conn->rbuffer_count += can_take;
+  conn->rneedbufferevent = false;
+  return can_take;
+}
+
+size_t pn_raw_connection_take_read_buffers(pn_raw_connection_t *conn, pn_raw_buffer_t *buffers, size_t num) {
+  assert(conn);
+  size_t count = 0;
+
+  buff_ptr current = conn->rbuffer_first_read;
+  if (!current) return 0;
+
+  buff_ptr previous;
+  for (; current && count < num; count++) {
+    assert(conn->rbuffers[current-1].type == buff_read);
+    buffers[count].context = conn->rbuffers[current-1].context;
+    buffers[count].bytes = conn->rbuffers[current-1].bytes;
+    buffers[count].capacity = conn->rbuffers[current-1].capacity;
+    buffers[count].size = conn->rbuffers[current-1].size;
+    buffers[count].offset = conn->rbuffers[current-1].offset - conn->rbuffers[current-1].size;
+    conn->rbuffers[current-1].type = buff_rempty;
+
+    previous = current;
+    current = conn->rbuffers[current-1].next;
+  }
+  if (!count) return 0;
+
+  conn->rbuffers[previous-1].next = conn->rbuffer_first_empty;
+  conn->rbuffer_first_empty = conn->rbuffer_first_read;
+
+  conn->rbuffer_first_read = current;
+  if (!current) {
+    conn->rbuffer_last_read = 0;
+  }
+  conn->rbuffer_count -= count;
+  return count;
+}
+
+size_t pn_raw_connection_write_buffers(pn_raw_connection_t *conn, pn_raw_buffer_t const *buffers, size_t num) {
+  assert(conn);
+  size_t can_take = pn_min(num, pn_raw_connection_write_buffers_capacity(conn));
+  if ( can_take==0 ) return 0;
+
+  buff_ptr current = conn->wbuffer_first_empty;
+  assert(current);
+
+  buff_ptr previous;
+  for (size_t i = 0; i < can_take; i++) {
+    // Get next free
+    assert(conn->wbuffers[current-1].type == buff_wempty);
+    conn->wbuffers[current-1].context = buffers[i].context;
+    conn->wbuffers[current-1].bytes = buffers[i].bytes;
+    conn->wbuffers[current-1].capacity = buffers[i].capacity;
+    conn->wbuffers[current-1].size = buffers[i].size;
+    conn->wbuffers[current-1].offset = buffers[i].offset;
+    conn->wbuffers[current-1].type = buff_unwritten;
+
+    previous = current;
+    current = conn->wbuffers[current-1].next;
+  }
+
+  if (!conn->wbuffer_first_towrite) {
+    conn->wbuffer_first_towrite = conn->wbuffer_first_empty;
+  }
+  if (conn->wbuffer_last_towrite) {
+    conn->wbuffers[conn->wbuffer_last_towrite-1].next = conn->wbuffer_first_empty;
+  }
+
+  conn->wbuffer_last_towrite = previous;
+  conn->wbuffers[previous-1].next = 0;
+  conn->wbuffer_first_empty = current;
+
+  conn->wbuffer_count += can_take;
+  conn->wneedbufferevent = false;
+  return can_take;
+}
+
+size_t pn_raw_connection_take_written_buffers(pn_raw_connection_t *conn, pn_raw_buffer_t *buffers, size_t num) {
+  assert(conn);
+  size_t count = 0;
+
+  buff_ptr current = conn->wbuffer_first_written;
+  if (!current) return 0;
+
+  buff_ptr previous;
+  for (; current && count < num; count++) {
+    assert(conn->wbuffers[current-1].type == buff_written);
+    buffers[count].context = conn->wbuffers[current-1].context;
+    buffers[count].bytes = conn->wbuffers[current-1].bytes;
+    buffers[count].capacity = conn->wbuffers[current-1].capacity;
+    buffers[count].size = conn->wbuffers[current-1].size;
+    buffers[count].offset = conn->wbuffers[current-1].offset;
+    conn->wbuffers[current-1].type = buff_wempty;
+
+    previous = current;
+    current = conn->wbuffers[current-1].next;
+  }
+  if (!count) return 0;
+
+  conn->wbuffers[previous-1].next = conn->wbuffer_first_empty;
+  conn->wbuffer_first_empty = conn->wbuffer_first_written;
+
+  conn->wbuffer_first_written = current;
+  if (!current) {
+    conn->wbuffer_last_written = 0;
+  }
+  conn->wbuffer_count -= count;
+  return count;
+}
+
+static inline void pni_raw_put_event(pn_raw_connection_t *conn, pn_event_type_t type) {
+  pn_collector_put(conn->collector, PN_CLASSCLASS(pn_raw_connection), (void*)conn, type);
+}
+
+static inline void pni_raw_release_buffers(pn_raw_connection_t *conn) {
+  for(;conn->rbuffer_first_unused;) {
+    buff_ptr p = conn->rbuffer_first_unused;
+    assert(conn->rbuffers[p-1].type == buff_unread);
+    conn->rbuffers[p-1].size = 0;
+    if (!conn->rbuffer_first_read) {
+      conn->rbuffer_first_read = p;
+    }
+    if (conn->rbuffer_last_read) {
+      conn->rbuffers[conn->rbuffer_last_read-1].next = p;
+    }
+    conn->rbuffer_last_read = p;
+    conn->rbuffer_first_unused = conn->rbuffers[p-1].next;
+
+    conn->rbuffers[p-1].next = 0;
+    conn->rbuffers[p-1].type = buff_read;
+  }
+  conn->rbuffer_last_unused = 0;
+  for(;conn->wbuffer_first_towrite;) {
+    buff_ptr p = conn->wbuffer_first_towrite;
+    assert(conn->wbuffers[p-1].type == buff_unwritten);
+    if (!conn->wbuffer_first_written) {
+      conn->wbuffer_first_written = p;
+    }
+    if (conn->wbuffer_last_written) {
+      conn->wbuffers[conn->wbuffer_last_written-1].next = p;
+    }
+    conn->wbuffer_last_written = p;
+    conn->wbuffer_first_towrite = conn->wbuffers[p-1].next;
+
+    conn->wbuffers[p-1].next = 0;
+    conn->wbuffers[p-1].type = buff_written;
+  }
+  conn->wbuffer_last_towrite = 0;
+  conn->rdrainpending = (bool)(conn->rbuffer_first_read);
+  conn->wdrainpending = (bool)(conn->wbuffer_first_written);
+}
+
+void pni_raw_disconnect(pn_raw_connection_t *conn) {
+  pni_raw_release_buffers(conn);
+  conn->disconnectpending = true;
+}
+
+void pni_raw_connected(pn_raw_connection_t *conn) {
+  pn_condition_clear(conn->condition);
+  pni_raw_put_event(conn, PN_RAW_CONNECTION_CONNECTED);
+}
+
+void pni_raw_wake(pn_raw_connection_t *conn) {
+  pni_raw_put_event(conn, PN_RAW_CONNECTION_WAKE);
+}
+
+void pni_raw_read(pn_raw_connection_t *conn, int sock, long (*recv)(int, void*, size_t), void(*set_error)(pn_raw_connection_t *, const char *, int)) {
+  assert(conn);
+  bool closed = false;
+  for(;conn->rbuffer_first_unused;) {
+    buff_ptr p = conn->rbuffer_first_unused;
+    assert(conn->rbuffers[p-1].type == buff_unread);
+    char *bytes = conn->rbuffers[p-1].bytes+conn->rbuffers[p-1].offset;
+    size_t s = conn->rbuffers[p-1].capacity-conn->rbuffers[p-1].offset;
+    int r = recv(sock, bytes, s);
+    if (r < 0) {
+      switch (errno) {
+        // Interrupted system call try again
+        case EINTR: continue;
+
+        // Would block
+        case EWOULDBLOCK: goto finished_reading;
+
+        // Detected an error
+        default:
+          set_error(conn, "recv error", errno);
+          pn_raw_connection_close(conn);
+          return;
+      }
+    }
+    conn->rbuffers[p-1].size += r;
+    conn->rbuffers[p-1].offset += r;
+
+    if (!conn->rbuffer_first_read) {
+      conn->rbuffer_first_read = p;
+    }
+    if (conn->rbuffer_last_read) {
+      conn->rbuffers[conn->rbuffer_last_read-1].next = p;
+    }
+    conn->rbuffer_last_read = p;
+    conn->rbuffer_first_unused = conn->rbuffers[p-1].next;
+
+    conn->rbuffers[p-1].next = 0;
+    conn->rbuffers[p-1].type = buff_read;
+
+    // Checking for end of stream here ensures that there is a buffer at the end with nothing in it
+    if (r == 0) {
+      closed = true;
+      break;
+    }
+  }
+finished_reading:
+  if (!conn->rbuffer_first_unused) {
+    conn->rbuffer_last_unused = 0;
+  }
+  // Read something - we are now either out of buffers; end of stream; or blocked for read
+  if (conn->rbuffer_first_read && !conn->rpending) {
+    conn->rpending = true;
+  }
+  // Socket closed for read
+  if (closed) {
+    if (!conn->rclosed) {
+      conn->rclosed = true;
+      conn->rclosedpending = true;
+      if (conn->wclosed) {
+        pni_raw_disconnect(conn);
+      }
+    }
+  }
+  return;
+}
+
+void pni_raw_write(pn_raw_connection_t *conn, int sock, long (*send)(int, const void*, size_t), void(*set_error)(pn_raw_connection_t *, const char *, int)) {
+  assert(conn);
+  bool closed = false;
+  for(;conn->wbuffer_first_towrite;) {
+    buff_ptr p = conn->wbuffer_first_towrite;
+    assert(conn->wbuffers[p-1].type == buff_unwritten);
+    char *bytes = conn->wbuffers[p-1].bytes+conn->wbuffers[p-1].offset+conn->unwritten_offset;
+    size_t s = conn->wbuffers[p-1].size-conn->unwritten_offset;
+    int r = send(sock,  bytes, s);
+    if (r < 0) {
+      // Negative return: decide what to do based on errno
+      switch (errno) {
+        // Interrupted system call try again
+        case EINTR: continue;
+
+        case EWOULDBLOCK:
+          goto finished_writing;
+
+        default:
+          set_error(conn, "send error", errno);
+          pn_raw_connection_close(conn);
+          return;
+      }
+    }
+    // return of 0 was never observed in testing and the documentation
+    // implies that 0 could only be returned if 0 bytes were sent; however
+    // leaving this case here seems safe.
+    if (r == 0 && s > 0) {
+      closed = true;
+      break;
+    }
+
+    // Only wrote a partial buffer  - adjust buffer
+    if (r != (int)s) {
+      conn->unwritten_offset += r;
+      break;
+    }
+
+    conn->unwritten_offset = 0;
+
+    if (!conn->wbuffer_first_written) {
+      conn->wbuffer_first_written = p;
+    }
+    if (conn->wbuffer_last_written) {
+      conn->wbuffers[conn->wbuffer_last_written-1].next = p;
+    }
+    conn->wbuffer_last_written = p;
+    conn->wbuffer_first_towrite = conn->wbuffers[p-1].next;
+
+    conn->wbuffers[p-1].next = 0;
+    conn->wbuffers[p-1].type = buff_written;
+  }
+finished_writing:
+  if (!conn->wbuffer_first_towrite) {
+    conn->wbuffer_last_towrite = 0;
+  }
+  // Wrote something; end of stream; out of buffers; or blocked for write
+  if (conn->wbuffer_first_written && !conn->wpending) {
+    conn->wpending = true;
+  }
+  // Socket closed for write
+  if (closed) {
+    if (!conn->wclosed) {
+      conn->wclosed = true;
+      conn->wclosedpending = true;
+      if (conn->rclosed) {
+        pni_raw_disconnect(conn);
+      }
+    }
+  }
+  return;
+}
+
+bool pni_raw_can_read(pn_raw_connection_t *conn) {
+  return !conn->rclosed && conn->rbuffer_first_unused;
+}
+
+bool pni_raw_can_write(pn_raw_connection_t *conn) {
+  return !conn->wclosed && conn->wbuffer_first_towrite;
+}
+
+pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn) {
+  assert(conn);
+  do {
+    pn_event_t *event = pn_collector_next(conn->collector);
+    if (event) {
+      pn_event_type_t type = pn_event_type(event);
+      switch (type) {
+        default: break;
+      }
+    } else if (conn->rpending) {
+      pni_raw_put_event(conn, PN_RAW_CONNECTION_READ);
+      conn->rpending = false;
+      continue;
+    } else if (conn->wpending) {
+      pni_raw_put_event(conn, PN_RAW_CONNECTION_WRITTEN);
+      conn->wpending = false;
+      continue;
+    } else if (conn->rclosedpending) {
+      pni_raw_put_event(conn, PN_RAW_CONNECTION_CLOSED_READ);
+      conn->rclosedpending = false;
+      continue;
+    } else if (conn->wclosedpending) {
+      pni_raw_put_event(conn, PN_RAW_CONNECTION_CLOSED_WRITE);
+      conn->wclosedpending = false;
+      continue;
+    } else if (conn->rdrainpending) {
+      pni_raw_put_event(conn, PN_RAW_CONNECTION_READ);
+      conn->rdrainpending = false;
+      continue;
+    } else if (conn->wdrainpending) {
+      pni_raw_put_event(conn, PN_RAW_CONNECTION_WRITTEN);
+      conn->wdrainpending = false;
+      continue;
+    } else if (conn->disconnectpending) {
+      pni_raw_put_event(conn, PN_RAW_CONNECTION_DISCONNECTED);
+      conn->disconnectpending = false;
+      continue;
+    } else if (!conn->wclosed && !conn->wbuffer_first_towrite && !conn->wneedbufferevent) {
+      // Ran out of write buffers
+      pni_raw_put_event(conn, PN_RAW_CONNECTION_NEED_WRITE_BUFFERS);
+      conn->wneedbufferevent = true;
+      continue;
+    } else if (!conn->rclosed && !conn->rbuffer_first_unused && !conn->rneedbufferevent) {
+      // Ran out of read buffers
+      pni_raw_put_event(conn, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
+      conn->rneedbufferevent = true;
+      continue;
+    }
+    return pni_log_event(conn, event);
+  } while (true);
+}
+
+void pn_raw_connection_close(pn_raw_connection_t *conn) {
+  // TODO: Do we need different flags here?
+  // TODO: What is the precise semantics for close?
+  bool rclosed = conn->rclosed;
+  if (!rclosed) {
+    conn->rclosed = true;
+    conn->rclosedpending = true;
+  }
+  bool wclosed = conn->wclosed;
+  if (!wclosed) {
+    conn->wclosed = true;
+    conn->wclosedpending = true;
+  }
+  if (!rclosed || !wclosed) {
+    pni_raw_disconnect(conn);
+  }
+}
+
+bool pn_raw_connection_is_read_closed(pn_raw_connection_t *conn) {
+  assert(conn);
+  return conn->rclosed;
+}
+
+bool pn_raw_connection_is_write_closed(pn_raw_connection_t *conn) {
+  assert(conn);
+  return conn->wclosed;
+}
+
+pn_condition_t *pn_raw_connection_condition(pn_raw_connection_t *conn) {
+  assert(conn);
+  return conn->condition;
+}
+
+void *pn_raw_connection_get_context(pn_raw_connection_t *conn) {
+  assert(conn);
+  return pn_record_get(conn->attachments, PN_LEGCTX);
+}
+
+void pn_raw_connection_set_context(pn_raw_connection_t *conn, void *context) {
+  assert(conn);
+  pn_record_set(conn->attachments, PN_LEGCTX, context);
+}
+
+pn_record_t *pn_raw_connection_attachments(pn_raw_connection_t *conn) {
+  assert(conn);
+  return conn->attachments;
+}
+
+pn_raw_connection_t *pn_event_raw_connection(pn_event_t *event) {
+  return (pn_event_class(event) == PN_CLASSCLASS(pn_raw_connection)) ? (pn_raw_connection_t*)pn_event_context(event) : NULL;
+}
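
The implementation-specific part that remains stubbed in the proactors below reduces to the
callbacks passed into pni_raw_read() and pni_raw_write() above. The following is a minimal
POSIX-style sketch of such a shim, in the spirit of what a backend or the unit tests supply;
the shim_* names are illustrative, an already non-blocking socket or MSG_DONTWAIT is assumed,
and the "proton:io" condition name is only an example:

    #include <errno.h>
    #include <string.h>
    #include <sys/socket.h>

    #include <proton/condition.h>
    #include <proton/raw_connection.h>
    #include "raw_connection-internal.h"

    /* recv/send shims matching long (*)(int, void *, size_t). pni_raw_read() and
     * pni_raw_write() examine errno themselves (EINTR -> retry, EWOULDBLOCK -> stop),
     * so the shims only forward to the socket calls. */
    static long shim_recv(int fd, void *buf, size_t len) {
      return recv(fd, buf, len, MSG_DONTWAIT);
    }

    static long shim_send(int fd, const void *buf, size_t len) {
      return send(fd, buf, len, MSG_DONTWAIT);
    }

    /* Error callback: record the failure on the connection's condition. */
    static void shim_error(pn_raw_connection_t *rc, const char *msg, int err) {
      pn_condition_format(pn_raw_connection_condition(rc), "proton:io",
                          "%s: %s", msg, strerror(err));
    }

    /* A backend would then call, when the socket becomes readable or writable:
     *   pni_raw_read(rc, fd, shim_recv, shim_error);
     *   pni_raw_write(rc, fd, shim_send, shim_error);
     */
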
diff --git a/c/src/proactor/win_iocp.cpp b/c/src/proactor/win_iocp.cpp
index 348564c..88737e4 100644
--- a/c/src/proactor/win_iocp.cpp
+++ b/c/src/proactor/win_iocp.cpp
@@ -3420,3 +3420,11 @@ pn_millis_t pn_proactor_now(void) {
 int64_t pn_proactor_now_64(void) {
   return GetTickCount64();
 }
+
+// Empty stubs for raw connection code
+pn_raw_connection_t *pn_raw_connection(void) { return NULL; }
+void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const char *addr) {}
+void pn_listener_raw_accept(pn_listener_t *l, pn_raw_connection_t *rc) {}
+void pn_raw_connection_wake(pn_raw_connection_t *conn) {}
+const struct pn_netaddr_t *pn_raw_connection_local_addr(pn_raw_connection_t *connection) { return NULL; }
+const struct pn_netaddr_t *pn_raw_connection_remote_addr(pn_raw_connection_t *connection) { return NULL; }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-proton] 03/05: PROTON-2247: Tests for common raw connection code

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 1b4bdedb696988271e9455b70d95f6fef90ca3b3
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Thu Mar 12 14:30:27 2020 -0400

    PROTON-2247: Tests for common raw connection code
---
 c/tests/CMakeLists.txt          |   4 +
 c/tests/raw_connection_test.cpp | 723 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 727 insertions(+)
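
The tests drive the raw connection directly and repeatedly drain it with pni_raw_event_next(),
which synthesizes the READ/WRITTEN/CLOSED_*/NEED_*_BUFFERS/DISCONNECTED events from the
connection's pending state. A condensed sketch of that drain pattern (the drain_events helper
is illustrative, not from the test file):

    #include <proton/event.h>
    #include <proton/raw_connection.h>
    #include "proactor/raw_connection-internal.h"

    /* Pop events until the connection has nothing more to report.
     * pni_raw_event_next() returns NULL once nothing is pending; the assertions in the
     * test file below treat that as PN_EVENT_NONE. */
    static int drain_events(pn_raw_connection_t *rc) {
      int count = 0;
      pn_event_t *e;
      while ((e = pni_raw_event_next(rc)) != NULL && pn_event_type(e) != PN_EVENT_NONE) {
        ++count;    /* a real test would switch on pn_event_type(e) here */
      }
      return count;
    }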

diff --git a/c/tests/CMakeLists.txt b/c/tests/CMakeLists.txt
index 6c762b4..3417304 100644
--- a/c/tests/CMakeLists.txt
+++ b/c/tests/CMakeLists.txt
@@ -75,6 +75,10 @@ if (CMAKE_CXX_COMPILER)
     add_c_test(c-proactor-test pn_test_proactor.cpp proactor_test.cpp)
     target_link_libraries(c-proactor-test qpid-proton-core qpid-proton-proactor ${PLATFORM_LIBS})
 
+    list(TRANSFORM qpid-proton-proactor PREPEND "../" OUTPUT_VARIABLE qpid-proton-proactor-src)
+    add_c_test(c-raw-connection-test raw_connection_test.cpp ${qpid-proton-proactor-src})
+    target_link_libraries(c-raw-connection-test qpid-proton-core ${PLATFORM_LIBS} ${PROACTOR_LIBS})
+
     add_c_test(c-ssl-proactor-test pn_test_proactor.cpp ssl_proactor_test.cpp)
     target_link_libraries(c-ssl-proactor-test qpid-proton-core qpid-proton-proactor ${PLATFORM_LIBS})
 
diff --git a/c/tests/raw_connection_test.cpp b/c/tests/raw_connection_test.cpp
new file mode 100644
index 0000000..d32103b
--- /dev/null
+++ b/c/tests/raw_connection_test.cpp
@@ -0,0 +1,723 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/raw_connection.h>
+#include "proactor/raw_connection-internal.h"
+
+#include "pn_test.hpp"
+
+#ifdef _WIN32
+#include <errno.h>
+#else
+#include <sys/socket.h>
+#include <unistd.h>
+#include <errno.h>
+#endif
+
+#include <string.h>
+
+#include <array>
+#include <utility>
+#include <string>
+#include <vector>
+
+using namespace pn_test;
+using Catch::Matchers::Contains;
+using Catch::Matchers::Equals;
+
+namespace {
+  pn_raw_connection_t* mk_raw_connection() {
+    pn_raw_connection_t* rc = (pn_raw_connection_t*) calloc(1, sizeof(struct pn_raw_connection_t));
+    pni_raw_initialize(rc);
+    return rc;
+  }
+  void free_raw_connection(pn_raw_connection_t* c) {
+    pni_raw_finalize(c);
+    free(c);
+  }
+  int read_err;
+  void set_read_error(pn_raw_connection_t*, const char*, int err) {
+    read_err = err;
+  }
+  int write_err;
+  void set_write_error(pn_raw_connection_t*, const char*, int err) {
+    write_err = err;
+  }
+
+  size_t max_send_size = 0;
+  size_t max_recv_size = 0;
+
+#ifdef MSG_DONTWAIT
+  long rcv(int fd, void* b, size_t s) {
+    read_err = 0;
+    if (max_recv_size && max_recv_size < s) s = max_recv_size;
+    return ::recv(fd, b, s, MSG_DONTWAIT);
+  }
+
+  void freepair(int fds[2]) {
+      ::close(fds[0]);
+      ::close(fds[1]);
+  }
+
+  void rcv_stop(int fd) {
+      ::shutdown(fd, SHUT_RD);
+  }
+
+  void snd_stop(int fd) {
+      ::shutdown(fd, SHUT_WR);
+  }
+
+#ifdef MSG_NOSIGNAL
+  long snd(int fd, const void* b, size_t s) {
+    write_err = 0;
+    if (max_send_size && max_send_size < s) s = max_send_size;
+    return ::send(fd, b, s, MSG_NOSIGNAL | MSG_DONTWAIT);
+  }
+
+  int makepair(int fds[2]) {
+    return ::socketpair(AF_LOCAL, SOCK_STREAM, PF_UNSPEC, fds);
+  }
+#elif defined(SO_NOSIGPIPE)
+  long snd(int fd, const void* b, size_t s) {
+    write_err = 0;
+    if (max_send_size && max_send_size < s) s = max_send_size;
+    return ::send(fd, b, s, MSG_DONTWAIT);
+  }
+
+  int makepair(int fds[2]) {
+    int rc = ::socketpair(AF_LOCAL, SOCK_STREAM, PF_UNSPEC, fds);
+    if (rc == 0) {
+      int optval = 1;
+      ::setsockopt(fds[0], SOL_SOCKET, SO_NOSIGPIPE, &optval, sizeof(optval));
+      ::setsockopt(fds[1], SOL_SOCKET, SO_NOSIGPIPE, &optval, sizeof(optval));
+    }
+    return rc;
+  }
+#endif
+#else
+  // Simple mock-up of the read/write functions of a socketpair, for testing on
+  // systems without socketpairs (Windows, really)
+  // TODO: perhaps this should be used everywhere
+  static const uint16_t buffsize = 4096;
+  struct fbuf {
+    uint8_t buff[buffsize*2] = {};
+    int linked_fd = -1;
+    size_t head = 0;
+    size_t size = 0;
+    bool rclosed = true;
+    bool wclosed = true;
+
+    bool closed() {
+      return rclosed && wclosed && linked_fd == -1;
+    }
+
+    void open_linked(int linked_fd0) {
+      CHECK(closed());
+      CHECK(head == 0);
+      CHECK(size == 0);
+      linked_fd = linked_fd0;
+      rclosed = false;
+      wclosed = false;
+    }
+
+    void shutdown_rd() {
+      rclosed = true;
+    }
+
+    void shutdown_wrt() {
+      wclosed = true;
+    }
+
+    void close() {
+      CHECK_FALSE(closed());
+      linked_fd = -1;
+      rclosed = true;
+      wclosed = true;
+      head = 0;
+      size = 0;
+    }
+  };
+
+  static std::vector<fbuf> buffers;
+
+  long rcv(int fd, void* b, size_t s){
+    CHECK(fd < buffers.size());
+    read_err = 0;
+    if (max_recv_size && max_recv_size < s) s = max_recv_size;
+
+    fbuf& buffer = buffers[fd];
+    if (buffer.size == 0) {
+      if (buffer.rclosed) {
+        return 0;
+      } else {
+        errno = EWOULDBLOCK;
+        return -1;
+      }
+    }
+
+    if (buffer.size < s) s = buffer.size;
+
+    ::memcpy(b, &buffer.buff[buffer.head], s);
+    buffer.head += s % buffsize;
+    buffer.size -= s;
+    return s;
+  }
+
+  long snd(int fd, const void* b, size_t s){
+    CHECK(fd < buffers.size());
+    write_err = 0;
+    if (max_send_size && max_send_size < s) s = max_send_size;
+
+    // Write to linked buffer
+    fbuf& buffer = buffers[fd];
+    fbuf& linked_buffer = buffers[buffer.linked_fd];
+    if (linked_buffer.size == buffsize) {
+      errno = EWOULDBLOCK;
+      return -1;
+    }
+    if (linked_buffer.rclosed) {
+      errno = EPIPE;
+      return -1;
+    }
+    if (s + linked_buffer.size > buffsize) s = buffsize - linked_buffer.size;
+    ::memcpy(&linked_buffer.buff[linked_buffer.head+linked_buffer.size], b, s);
+    // If we wrote into the second half then write again into the hole at the front
+    if (linked_buffer.head+linked_buffer.size > buffsize) {
+      size_t r = linked_buffer.head+linked_buffer.size - buffsize;
+      ::memmove(&linked_buffer.buff[0], &linked_buffer.buff[buffsize], r);
+    }
+    linked_buffer.size += s;
+    return s;
+  }
+
+  void rcv_stop(int fd) {
+    CHECK(fd < buffers.size());
+    buffers[fd].shutdown_rd();
+  }
+
+  void snd_stop(int fd) {
+    CHECK(fd < buffers.size());
+    buffers[fd].shutdown_wrt();
+    buffers[buffers[fd].linked_fd].shutdown_rd();
+  }
+
+  int makepair(int fds[2]) {
+    size_t maximum_fd = buffers.size();
+    buffers.resize( buffers.size()+2);
+    buffers[maximum_fd].open_linked(maximum_fd+1);
+    buffers[maximum_fd+1].open_linked(maximum_fd);
+    fds[0] = maximum_fd;
+    fds[1] = maximum_fd+1;
+    return 0;
+  }
+
+  void freepair(int fds[2]) {
+    CHECK(fds[0] < buffers.size());
+    CHECK(fds[1] < buffers.size());
+    buffers[fds[0]].close();
+    buffers[fds[1]].close();
+  }
+#endif
+
+  // Block of memory for buffers
+  const size_t BUFFMEMSIZE = 8*1024;
+  const size_t RBUFFCOUNT = 32;
+  const size_t WBUFFCOUNT = 32;
+
+  char rbuffer_memory[BUFFMEMSIZE];
+  char *rbuffer_brk = rbuffer_memory;
+
+  pn_raw_buffer_t rbuffs[RBUFFCOUNT];
+  pn_raw_buffer_t wbuffs[WBUFFCOUNT];
+
+  class BufferAllocator {
+    char* buffer;
+    uint32_t size;
+    uint32_t brk;
+
+  public:
+    BufferAllocator(char* b, uint32_t s) : buffer(b), size(s), brk(0) {};
+
+    char* next(uint32_t s) {
+      if ( brk+s > size) return NULL;
+
+      char *r = buffer+brk;
+      brk += s;
+      return r;
+    }
+
+    template <class B>
+    B next_buffer(uint32_t s);
+
+    template <class B, int N>
+    void split_buffers(B (&buffers)[N]) {
+      uint32_t buffsize  = (size-brk)/N;
+      uint32_t remainder = (size-brk)%N;
+      for (int i = 0; i<N; ++i) {
+        buffers[i] = next_buffer<B>(i==0 ? buffsize+remainder : buffsize);
+      }
+    }
+  };
+
+  template <>
+  pn_raw_buffer_t BufferAllocator::next_buffer(uint32_t s) {
+    pn_raw_buffer_t b = {};
+    b.bytes = next(s);
+    if (b.bytes) {b.capacity = s; b.size = s;}
+    return b;
+  }
+}
+
+char message[] =
+"Jabberwocky\n"
+"By Lewis Carroll\n"
+"\n"
+"'Twas brillig, and the slithy toves\n"
+"Did gyre and gimble in the wabe:\n"
+"All mimsy were the borogroves,\n"
+"And the mome raths outgrabe.\n"
+"\n"
+"Beware the Jabberwock, my son!\n"
+"The jaws that bite, the claws that catch!\n"
+"Beware the Jubjub bird, and shun\n"
+"The frumious Bandersnatch!\n"
+"\n"
+"He took his vorpal sword in hand;\n"
+"Long time the manxome foe he sought-\n"
+"So rested he by the Tumtum tree\n"
+"And stood awhile in thought.\n"
+"\n"
+"And, as in uffish thought he stood,\n"
+"The Jabberwock with eyes of flame,\n"
+"Came whiffling through the tulgey wood,\n"
+"And burbled as it came!\n"
+"\n"
+"One, two! One, two! And through and through,\n"
+"The vorpal blade went snicker-snack!\n"
+"He left it dead, and with its head\n"
+"He went galumphing back.\n"
+"\n"
+"\"And hast thou slain the JabberWock?\n"
+"Come to my arms, my beamish boy!\n"
+"O frabjous day! Callooh! Callay!\"\n"
+"He chortled in his joy.\n"
+"\n"
+"'Twas brillig, and the slithy toves\n"
+"Did gyre and gimble in the wabe:\n"
+"All mimsy were the borogroves,\n"
+"And the mome raths outgrabe.\n"
+;
+
+TEST_CASE("raw connection") {
+  auto_free<pn_raw_connection_t, free_raw_connection> p(mk_raw_connection());
+  max_send_size = 0;
+
+  REQUIRE(p);
+  REQUIRE(pni_raw_validate(p));
+  CHECK_FALSE(pn_raw_connection_is_read_closed(p));
+  CHECK_FALSE(pn_raw_connection_is_write_closed(p));
+
+  int rbuff_count = pn_raw_connection_read_buffers_capacity(p);
+  CHECK(rbuff_count>0);
+  int wbuff_count = pn_raw_connection_write_buffers_capacity(p);
+  CHECK(wbuff_count>0);
+
+  BufferAllocator rb(rbuffer_memory, sizeof(rbuffer_memory));
+  BufferAllocator wb(message, sizeof(message));
+
+  rb.split_buffers(rbuffs);
+  wb.split_buffers(wbuffs);
+
+  int rtaken = pn_raw_connection_give_read_buffers(p, rbuffs, RBUFFCOUNT);
+  REQUIRE(pni_raw_validate(p));
+  REQUIRE(rtaken==rbuff_count);
+
+  SECTION("Write multiple per event loop") {
+    int wtaken = 0;
+    for (size_t i = 0; i < WBUFFCOUNT; ++i) {
+      int taken = pn_raw_connection_write_buffers(p, &wbuffs[i], 1);
+      if (taken==0) break;
+      REQUIRE(pni_raw_validate(p));
+      REQUIRE(taken==1);
+      wtaken += taken;
+    }
+
+    CHECK(pn_raw_connection_read_buffers_capacity(p) == 0);
+    CHECK(pn_raw_connection_write_buffers_capacity(p) == 0);
+
+    std::vector<pn_raw_buffer_t> read(rtaken);
+    std::vector<pn_raw_buffer_t> written(wtaken);
+
+    SECTION("Simple tests using a looped back socketpair") {
+      int fds[2];
+      REQUIRE(makepair(fds) == 0);
+      pni_raw_connected(p);
+
+      // First event is always connected
+      REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_CONNECTED);
+      // No need-buffers event as we already gave buffers
+      REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_EVENT_NONE);
+
+      SECTION("Write then read") {
+        pni_raw_write(p, fds[0], snd, set_write_error);
+        CHECK(write_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_WRITTEN);
+        CHECK(pn_raw_connection_write_buffers_capacity(p) == 0);
+        int wgiven = pn_raw_connection_take_written_buffers(p, &written[0], written.size());
+        REQUIRE(pni_raw_validate(p));
+        CHECK(wgiven==wtaken);
+
+        // Write more
+        for (size_t i = wtaken; i < WBUFFCOUNT; ++i) {
+          int taken = pn_raw_connection_write_buffers(p, &wbuffs[i], 1);
+          if (taken==0) break;
+          REQUIRE(pni_raw_validate(p));
+          CHECK(taken==1);
+          wtaken += taken;
+        }
+
+        pni_raw_write(p, fds[0], snd, set_write_error);
+        CHECK(write_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_WRITTEN);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_NEED_WRITE_BUFFERS);
+        wgiven += pn_raw_connection_take_written_buffers(p, &written[0], written.size());
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_EVENT_NONE);
+        CHECK(wgiven==wtaken);
+
+        // At this point we've written every buffer
+        CHECK(pn_raw_connection_write_buffers_capacity(p) == wbuff_count);
+
+        pni_raw_read(p, fds[1], rcv, set_read_error);
+        CHECK(read_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_READ);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_EVENT_NONE);
+        CHECK(pn_raw_connection_read_buffers_capacity(p) == 0);
+        int rgiven = pn_raw_connection_take_read_buffers(p, &read[0], read.size());
+        REQUIRE(pni_raw_validate(p));
+        CHECK(rgiven > 0);
+
+        CHECK(pn_raw_connection_read_buffers_capacity(p) == rgiven);
+
+        // At this point we should have read everything - make sure it matches
+        char* start = message;
+        for (int i = 0; i < rgiven; ++i) {
+          CHECK(read[i].size > 0);
+          CHECK(std::string(read[i].bytes, read[i].size) == std::string(start, read[i].size));
+          start += read[i].size;
+        }
+        REQUIRE(start-message == sizeof(message));
+      }
+      SECTION("Write then read, short writes") {
+        max_send_size = 10;
+        int wgiven = 0;
+        do {
+          do {
+            pni_raw_write(p, fds[0], snd, set_write_error);
+            CHECK(write_err == 0);
+            REQUIRE(pni_raw_validate(p));
+          } while (pn_event_type(pni_raw_event_next(p)) != PN_RAW_CONNECTION_WRITTEN);
+          CHECK(pn_raw_connection_write_buffers_capacity(p) == wgiven);
+          int given = pn_raw_connection_take_written_buffers(p, &written[wgiven], written.size()-wgiven);
+          REQUIRE(pni_raw_validate(p));
+          CHECK(given == 1);
+          CHECK(written[wgiven].offset == wbuffs[wgiven].offset);
+          CHECK(written[wgiven].size == wbuffs[wgiven].size);
+          wgiven += given;
+        } while (wgiven != wtaken);
+
+        // Write more
+        for (size_t i = wtaken; i < WBUFFCOUNT; ++i) {
+          int taken = pn_raw_connection_write_buffers(p, &wbuffs[i], 1);
+          if (taken==0) break;
+          REQUIRE(pni_raw_validate(p));
+          CHECK(taken==1);
+          wtaken += taken;
+        }
+
+        do {
+          do {
+            pni_raw_write(p, fds[0], snd, set_write_error);
+            CHECK(write_err == 0);
+            REQUIRE(pni_raw_validate(p));
+          } while (pn_event_type(pni_raw_event_next(p)) != PN_RAW_CONNECTION_WRITTEN);
+        } while (pn_event_type(pni_raw_event_next(p)) != PN_RAW_CONNECTION_NEED_WRITE_BUFFERS);
+        wgiven += pn_raw_connection_take_written_buffers(p, &written[0], written.size());
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_EVENT_NONE);
+        CHECK(wgiven==wtaken);
+
+        // At this point we've written every buffer
+        CHECK(pn_raw_connection_write_buffers_capacity(p) == wbuff_count);
+
+        pni_raw_read(p, fds[1], rcv, set_read_error);
+        CHECK(read_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_READ);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_EVENT_NONE);
+        CHECK(pn_raw_connection_read_buffers_capacity(p) == 0);
+        int rgiven = pn_raw_connection_take_read_buffers(p, &read[0], read.size());
+        REQUIRE(pni_raw_validate(p));
+        CHECK(rgiven > 0);
+
+        CHECK(pn_raw_connection_read_buffers_capacity(p) == rgiven);
+
+        // At this point we should have read everything - make sure it matches
+        char* start = message;
+        for (int i = 0; i < rgiven; ++i) {
+          CHECK(read[i].size > 0);
+          CHECK(std::string(read[i].bytes, read[i].size) == std::string(start, read[i].size));
+          start += read[i].size;
+        }
+        REQUIRE(start-message == sizeof(message));
+      }
+      freepair(fds);
+    }
+  }
+
+  SECTION("Write once per event loop") {
+    int wtaken = pn_raw_connection_write_buffers(p, wbuffs, WBUFFCOUNT);
+    REQUIRE(pni_raw_validate(p));
+    CHECK(wtaken==wbuff_count);
+
+    CHECK(pn_raw_connection_read_buffers_capacity(p) == 0);
+    CHECK(pn_raw_connection_write_buffers_capacity(p) == 0);
+
+    std::vector<pn_raw_buffer_t> read(rtaken);
+    std::vector<pn_raw_buffer_t> written(wtaken);
+
+    SECTION("Check no change in buffer use without read/write") {
+
+      int rgiven = pn_raw_connection_take_read_buffers(p, &read[0], rtaken);
+      REQUIRE(pni_raw_validate(p));
+      CHECK(rgiven==0);
+      int wgiven = pn_raw_connection_take_written_buffers(p, &written[0], wtaken);
+      REQUIRE(pni_raw_validate(p));
+      CHECK(wgiven==0);
+    }
+
+    SECTION("Simple tests using a looped back socketpair") {
+      int fds[2];
+      REQUIRE(makepair(fds) == 0);
+      pni_raw_connected(p);
+
+      // First event is always connected
+      REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_CONNECTED);
+      // No need-buffers event as we already gave buffers
+      REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_EVENT_NONE);
+
+      SECTION("Ensure nothing is read if nothing is written") {
+        pni_raw_read(p, fds[1], rcv, set_read_error);
+        CHECK(read_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        CHECK(pn_raw_connection_read_buffers_capacity(p) == 0);
+        CHECK(pn_raw_connection_take_read_buffers(p, &read[0], read.size()) == 0);
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pni_raw_event_next(p) == NULL);
+
+        snd_stop(fds[0]);
+        pni_raw_read(p, fds[1], rcv, set_read_error);
+        CHECK(read_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        CHECK(pn_raw_connection_is_read_closed(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_READ);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_CLOSED_READ);
+        rcv_stop(fds[1]);
+        pni_raw_write(p, fds[0], snd, set_write_error);
+        CHECK(write_err == EPIPE);
+        REQUIRE(pni_raw_validate(p));
+        CHECK(pn_raw_connection_is_write_closed(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_CLOSED_WRITE);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_READ);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_WRITTEN);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_DISCONNECTED);
+      }
+
+      SECTION("Read/Write interleaved") {
+        pni_raw_write(p, fds[0], snd, set_write_error);
+        CHECK(write_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        CHECK(pn_raw_connection_write_buffers_capacity(p) == 0);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_WRITTEN);
+        int wgiven = pn_raw_connection_take_written_buffers(p, &written[0], written.size());
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_NEED_WRITE_BUFFERS);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_EVENT_NONE);
+        CHECK(wgiven==wtaken);
+        CHECK(pn_raw_connection_write_buffers_capacity(p) == wbuff_count);
+
+        pni_raw_read(p, fds[1], rcv, set_read_error);
+        CHECK(read_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        CHECK(pn_raw_connection_read_buffers_capacity(p) == 0);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_READ);
+        int rgiven = pn_raw_connection_take_read_buffers(p, &read[0], read.size());
+        REQUIRE(pni_raw_validate(p));
+        CHECK(rgiven > 0);
+        CHECK(pn_raw_connection_read_buffers_capacity(p) == rgiven);
+
+        // Write more
+        wtaken += pn_raw_connection_write_buffers(p, &wbuffs[wtaken], WBUFFCOUNT-wtaken);
+        REQUIRE(pni_raw_validate(p));
+        CHECK(wtaken==WBUFFCOUNT);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_EVENT_NONE);
+
+        pni_raw_write(p, fds[0], snd, set_write_error);
+        CHECK(write_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_WRITTEN);
+        wgiven += pn_raw_connection_take_written_buffers(p, &written[0], written.size());
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_NEED_WRITE_BUFFERS);
+        CHECK(wgiven==wtaken);
+
+        // At this point we've written every buffer
+
+        pni_raw_read(p, fds[1], rcv, set_read_error);
+        CHECK(read_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_READ);
+        int rgiven_before = rgiven;
+        rgiven += pn_raw_connection_take_read_buffers(p, &read[rgiven], read.size()-rgiven);
+        REQUIRE(pni_raw_validate(p));
+        CHECK(rgiven > rgiven_before);
+
+        CHECK(pn_raw_connection_read_buffers_capacity(p) == rgiven);
+        CHECK(pn_raw_connection_write_buffers_capacity(p) == wbuff_count);
+
+        // At this point we should have read everything - make sure it matches
+        char* start = message;
+        for (int i = 0; i < rgiven; ++i) {
+          CHECK(read[i].size > 0);
+          CHECK(std::string(read[i].bytes, read[i].size) == std::string(start, read[i].size));
+          start += read[i].size;
+        }
+        REQUIRE(start-message == sizeof(message));
+      }
+
+      SECTION("Write then read") {
+        pni_raw_write(p, fds[0], snd, set_write_error);
+        CHECK(write_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_WRITTEN);
+        CHECK(pn_raw_connection_write_buffers_capacity(p) == 0);
+        int wgiven = pn_raw_connection_take_written_buffers(p, &written[0], written.size());
+        REQUIRE(pni_raw_validate(p));
+        CHECK(wgiven==wtaken);
+
+        // Write more
+        wtaken += pn_raw_connection_write_buffers(p, &wbuffs[wtaken], WBUFFCOUNT-wtaken);
+        REQUIRE(pni_raw_validate(p));
+        CHECK(wtaken==WBUFFCOUNT);
+
+        pni_raw_write(p, fds[0], snd, set_write_error);
+        CHECK(write_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_WRITTEN);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_NEED_WRITE_BUFFERS);
+        wgiven += pn_raw_connection_take_written_buffers(p, &written[0], written.size());
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_EVENT_NONE);
+        CHECK(wgiven==wtaken);
+
+        // At this point we've written every buffer
+        CHECK(pn_raw_connection_write_buffers_capacity(p) == wbuff_count);
+
+        pni_raw_read(p, fds[1], rcv, set_read_error);
+        CHECK(read_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_READ);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_EVENT_NONE);
+        CHECK(pn_raw_connection_read_buffers_capacity(p) == 0);
+        int rgiven = pn_raw_connection_take_read_buffers(p, &read[0], read.size());
+        REQUIRE(pni_raw_validate(p));
+        CHECK(rgiven > 0);
+
+        CHECK(pn_raw_connection_read_buffers_capacity(p) == rgiven);
+
+        // At this point we should have read everything - make sure it matches
+        char* start = message;
+        for (int i = 0; i < rgiven; ++i) {
+          CHECK(read[i].size > 0);
+          CHECK(std::string(read[i].bytes, read[i].size) == std::string(start, read[i].size));
+          start += read[i].size;
+        }
+        REQUIRE(start-message == sizeof(message));
+      }
+
+      SECTION("Write, close, then read") {
+        pni_raw_write(p, fds[0], snd, set_write_error);
+        CHECK(write_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_WRITTEN);
+        CHECK(pn_raw_connection_write_buffers_capacity(p) == 0);
+        int wgiven = pn_raw_connection_take_written_buffers(p, &written[0], written.size());
+        REQUIRE(pni_raw_validate(p));
+        CHECK(wgiven==wtaken);
+
+        // Write more
+        wtaken += pn_raw_connection_write_buffers(p, &wbuffs[wtaken], WBUFFCOUNT-wtaken);
+        REQUIRE(pni_raw_validate(p));
+        CHECK(wtaken==WBUFFCOUNT);
+
+        pni_raw_write(p, fds[0], snd, set_write_error);
+        CHECK(write_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_WRITTEN);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_NEED_WRITE_BUFFERS);
+        wgiven += pn_raw_connection_take_written_buffers(p, &written[0], written.size());
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_EVENT_NONE);
+        CHECK(wgiven==wtaken);
+
+        // At this point we've written every buffer
+        CHECK(pn_raw_connection_write_buffers_capacity(p) == wbuff_count);
+
+        snd_stop(fds[0]);
+        pni_raw_read(p, fds[1], rcv, set_read_error);
+        CHECK(read_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_READ);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_CLOSED_READ);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_EVENT_NONE);
+        CHECK(pn_raw_connection_read_buffers_capacity(p) == 0);
+        int rgiven = pn_raw_connection_take_read_buffers(p, &read[0], read.size());
+        REQUIRE(pni_raw_validate(p));
+        CHECK(rgiven > 0);
+
+        CHECK(pn_raw_connection_read_buffers_capacity(p) == rgiven);
+        CHECK(read[rgiven-1].size == 0);
+
+        // At this point we should have read everything - make sure it matches
+        char* start = message;
+        for (int i = 0; i < rgiven-1; ++i) {
+          CHECK(read[i].size > 0);
+          CHECK(std::string(read[i].bytes, read[i].size) == std::string(start, read[i].size));
+          start += read[i].size;
+        }
+        REQUIRE(start-message == sizeof(message));
+      }
+
+      freepair(fds);
+    }
+  }
+}
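
The tests above drive the internal pni_raw_read()/pni_raw_write() entry points directly over a socketpair, but the buffer lifecycle they exercise is the same one an application sees through the public API: give empty buffers on PN_RAW_CONNECTION_CONNECTED, take filled ones on PN_RAW_CONNECTION_READ, and recycle written ones on PN_RAW_CONNECTION_WRITTEN. A minimal sketch of that loop in C (illustrative only: the 4 x 1 KB buffer sizing, the on_raw_event() name and the omission of close handling and buffer freeing are assumptions, not part of the tested code):

    #include <proton/event.h>
    #include <proton/raw_connection.h>
    #include <stdlib.h>

    #define BUFFS 4

    /* Sketch of an echoing raw-connection event handler that mirrors the
     * buffer lifecycle exercised by the unit tests above. */
    static void on_raw_event(pn_event_t *event) {
      pn_raw_connection_t *c = pn_event_raw_connection(event);
      pn_raw_buffer_t bufs[BUFFS];
      size_t n;
      int i;
      switch (pn_event_type(event)) {
        case PN_RAW_CONNECTION_CONNECTED:
          /* Hand over some empty buffers so reading can start */
          for (i = 0; i < BUFFS; ++i) {
            bufs[i].bytes = (char*) malloc(1024);
            bufs[i].capacity = 1024;
            bufs[i].size = 0;
            bufs[i].offset = 0;
          }
          pn_raw_connection_give_read_buffers(c, bufs, BUFFS);
          break;
        case PN_RAW_CONNECTION_READ:
          /* Take back the filled buffers and echo them straight out again */
          while ( (n = pn_raw_connection_take_read_buffers(c, bufs, BUFFS)) ) {
            pn_raw_connection_write_buffers(c, bufs, n);
          }
          break;
        case PN_RAW_CONNECTION_WRITTEN:
          /* Written buffers come back to the application; recycle them as
           * read buffers while the read side is still open */
          while ( (n = pn_raw_connection_take_written_buffers(c, bufs, BUFFS)) ) {
            if (!pn_raw_connection_is_read_closed(c))
              pn_raw_connection_give_read_buffers(c, bufs, n);
          }
          break;
        default:
          break;
      }
    }

The raw_echo.c example later in this series follows the same shape, with per-connection bookkeeping and close handling added on top.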


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-proton] 05/05: PROTON-2247: Work on raw echo to improve output and add some wakes

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 728a2ef3522b6214e4a4a4dcd0bab2fdc60d0135
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Tue Jun 9 23:22:26 2020 -0400

    PROTON-2247: Work on raw echo to improve output and add some wakes
---
 c/examples/raw_echo.c | 117 ++++++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 99 insertions(+), 18 deletions(-)

diff --git a/c/examples/raw_echo.c b/c/examples/raw_echo.c
index 53fd47b..53a4ada 100644
--- a/c/examples/raw_echo.c
+++ b/c/examples/raw_echo.c
@@ -37,6 +37,9 @@ typedef struct app_data_t {
   pn_proactor_t *proactor;
   pn_listener_t *listener;
 
+  int64_t first_idle_time;
+  int64_t try_accept_time;
+  int64_t wake_conn_time;
   int connects;
   int disconnects;
 
@@ -45,6 +48,17 @@ typedef struct app_data_t {
   /* Receiver values */
 } app_data_t;
 
+#define MAX_CONNECTIONS 5
+
+typedef struct conn_data_t {
+  pn_raw_connection_t *connection;
+  int64_t last_recv_time;
+  int bytes;
+  int buffers;
+} conn_data_t;
+
+static conn_data_t conn_data[MAX_CONNECTIONS] = {{0}};
+
 static int exit_code = 0;
 
 /* Close the connection and the listener so we will get a
@@ -77,10 +91,22 @@ static void recv_message(pn_raw_buffer_t buf) {
   fwrite(buf.bytes, buf.size, 1, stdout);
 }
 
-void *make_receiver_data(void) {
+conn_data_t *make_conn_data(pn_raw_connection_t *c) {
+  int i;
+  for (i = 0; i < MAX_CONNECTIONS; ++i) {
+    if (!conn_data[i].connection) {
+      conn_data[i].connection = c;
+      return &conn_data[i];
+    }
+  }
   return NULL;
 }
 
+void free_conn_data(conn_data_t *c) {
+  if (!c) return;
+  c->connection = NULL;
+}
+
 #define READ_BUFFERS 4
 
 /* This function handles events when we are acting as the receiver */
@@ -89,27 +115,42 @@ static void handle_receive(app_data_t *app, pn_event_t* event) {
 
     case PN_RAW_CONNECTION_CONNECTED: {
       pn_raw_connection_t *c = pn_event_raw_connection(event);
+      conn_data_t *cd = (conn_data_t *) pn_raw_connection_get_context(c);
       pn_raw_buffer_t buffers[READ_BUFFERS] = {{0}};
-      int i = READ_BUFFERS;
-      printf("**raw connection connected\n");
-      app->connects++;
-      for (; i; --i) {
-        pn_raw_buffer_t *buff = &buffers[READ_BUFFERS-i];
-        buff->bytes = (char*) malloc(1024);
-        buff->capacity = 1024;
-        buff->size = 0;
-        buff->offset = 0;
+      if (cd) {
+        int i = READ_BUFFERS;
+        printf("**raw connection %ld connected\n", cd-conn_data);
+        app->connects++;
+        for (; i; --i) {
+          pn_raw_buffer_t *buff = &buffers[READ_BUFFERS-i];
+          buff->bytes = (char*) malloc(1024);
+          buff->capacity = 1024;
+          buff->size = 0;
+          buff->offset = 0;
+        }
+        pn_raw_connection_give_read_buffers(c, buffers, READ_BUFFERS);
+      } else {
+        printf("**raw connection connected: not connected\n");
       }
-      pn_raw_connection_give_read_buffers(c, buffers, READ_BUFFERS);
+    } break;
+
+    case PN_RAW_CONNECTION_WAKE: {
+      pn_raw_connection_t *c = pn_event_raw_connection(event);
+      conn_data_t *cd = (conn_data_t *) pn_raw_connection_get_context(c);
+      printf("**raw connection %ld woken\n", cd-conn_data);
     } break;
 
     case PN_RAW_CONNECTION_DISCONNECTED: {
       pn_raw_connection_t *c = pn_event_raw_connection(event);
-      void *cd = pn_raw_connection_get_context(c);
-      free(cd);
-      printf("**raw connection disconnected\n");
+      conn_data_t *cd = (conn_data_t *) pn_raw_connection_get_context(c);
+      if (cd) {
+        printf("**raw connection %ld disconnected: bytes: %d, buffers: %d\n", cd-conn_data, cd->bytes, cd->buffers);
+      } else {
+        printf("**raw connection disconnected: not connected\n");
+      }
       app->disconnects++;
       check_condition(event, pn_raw_connection_condition(c), app);
+      free_conn_data(cd);
     } break;
 
     case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
@@ -118,13 +159,17 @@ static void handle_receive(app_data_t *app, pn_event_t* event) {
     /* This path handles both received bytes and freeing buffers at close */
     case PN_RAW_CONNECTION_READ: {
       pn_raw_connection_t *c = pn_event_raw_connection(event);
+      conn_data_t *cd = (conn_data_t *) pn_raw_connection_get_context(c);
       pn_raw_buffer_t buffs[READ_BUFFERS];
       size_t n;
+      cd->last_recv_time = pn_proactor_now_64();
       while ( (n = pn_raw_connection_take_read_buffers(c, buffs, READ_BUFFERS)) ) {
         unsigned i;
         for (i=0; i<n && buffs[i].bytes; ++i) {
+          cd->bytes += buffs[i].size;
           recv_message(buffs[i]);
         }
+        cd->buffers += n;
 
         if (!pn_raw_connection_is_write_closed(c)) {
           pn_raw_connection_write_buffers(c, buffs, n);
@@ -179,12 +224,29 @@ static bool handle(app_data_t* app, pn_event_t* event) {
       break;
     }
     case PN_LISTENER_ACCEPT: {
+      pn_listener_t *listener = pn_event_listener(event);
       pn_raw_connection_t *c = pn_raw_connection();
-      void *cd = make_receiver_data();
-      pn_raw_connection_set_context(c, cd);
-      pn_listener_raw_accept(pn_event_listener(event), c);
+      void *cd = make_conn_data(c);
+      int64_t now = pn_proactor_now_64();
+
+      if (cd) {
+        app->first_idle_time = 0;
+        app->try_accept_time = 0;
+        if (app->wake_conn_time < now) {
+          app->wake_conn_time = now + 5000;
+          pn_proactor_set_timeout(pn_listener_proactor(listener), 5000);
+        }
+        pn_raw_connection_set_context(c, cd);
+
+        pn_listener_raw_accept(listener, c);
+      } else {
+        printf("**too many connections, trying again later...\n");
+
+        /* No other way to reject connection */
+        pn_listener_raw_accept(listener, c);
+        pn_raw_connection_close(c);
+      }
 
-      if (app->connects>2) pn_listener_close(app->listener);
     } break;
 
     case PN_LISTENER_CLOSE: {
@@ -193,6 +255,25 @@ static bool handle(app_data_t* app, pn_event_t* event) {
     } break;
 
     case PN_PROACTOR_TIMEOUT: {
+      int64_t now = pn_proactor_now_64();
+      pn_millis_t timeout = 5000;
+      if (app->connects - app->disconnects == 0) {
+        timeout = 20000;
+        if (app->first_idle_time == 0) {
+          printf("**idle detected, shutting down in %dms\n", (int) timeout);
+          app->first_idle_time = now;
+        } else if (app->first_idle_time + 20000 <= now) {
+          pn_listener_close(app->listener);
+          break;
+        }
+      } else if (now >= app->wake_conn_time) {
+        int i;
+        for (i = 0; i < MAX_CONNECTIONS; ++i) {
+          if (conn_data[i].connection) pn_raw_connection_wake(conn_data[i].connection);
+        }
+        app->wake_conn_time = now + 5000;
+      }
+      pn_proactor_set_timeout(pn_event_proactor(event), timeout);
     }  break;
 
     case PN_PROACTOR_INACTIVE: {
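
The timeout and wake changes above boil down to a two-step handshake: the PN_PROACTOR_TIMEOUT branch runs in the proactor's context, requests a wake on every live raw connection and re-arms the timer, and each woken connection later receives PN_RAW_CONNECTION_WAKE in its own event batch (handled in handle_receive() above). A condensed sketch of the timeout side, where the conns[] table is a hypothetical stand-in for the example's conn_data[] bookkeeping:

    #include <proton/proactor.h>
    #include <proton/raw_connection.h>

    #define MAX_CONNS 5

    /* Hypothetical registry of open raw connections, standing in for the
     * example's conn_data[] table. */
    static pn_raw_connection_t *conns[MAX_CONNS];

    /* Called from the PN_PROACTOR_TIMEOUT branch of the event handler:
     * ask each live connection to wake up and re-arm the 5 second timer. */
    static void wake_all(pn_proactor_t *proactor) {
      int i;
      for (i = 0; i < MAX_CONNS; ++i) {
        if (conns[i]) pn_raw_connection_wake(conns[i]);
      }
      pn_proactor_set_timeout(proactor, 5000);
    }

pn_raw_connection_wake() only requests the wake; the work that should happen on wake-up belongs in the PN_RAW_CONNECTION_WAKE case of that connection's own handler.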


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-proton] 04/05: PROTON-2247: Epoll implementation of raw connection API

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 7dc9240bd52a74225bd1c7a24a84ba37a08b30e4
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Fri Jun 19 14:00:16 2020 -0400

    PROTON-2247: Epoll implementation of raw connection API
---
 c/CMakeLists.txt                         |   2 +-
 c/src/proactor/epoll-internal.h          |  84 ++++++-
 c/src/proactor/epoll.c                   | 141 +++++-------
 c/src/proactor/epoll_raw_connection.c    | 376 +++++++++++++++++++++++++++++++
 c/src/proactor/raw_connection-internal.h |   1 +
 c/src/proactor/raw_connection.c          |   7 +-
 6 files changed, 523 insertions(+), 88 deletions(-)

diff --git a/c/CMakeLists.txt b/c/CMakeLists.txt
index 6b2c167..9e224de 100644
--- a/c/CMakeLists.txt
+++ b/c/CMakeLists.txt
@@ -340,7 +340,7 @@ if (PROACTOR STREQUAL "epoll" OR (NOT PROACTOR AND NOT BUILD_PROACTOR))
   check_symbol_exists(epoll_wait "sys/epoll.h" HAVE_EPOLL)
   if (HAVE_EPOLL)
     set (PROACTOR_OK epoll)
-    set (qpid-proton-proactor src/proactor/epoll.c ${qpid-proton-proactor-common})
+    set (qpid-proton-proactor src/proactor/epoll.c src/proactor/epoll_raw_connection.c ${qpid-proton-proactor-common})
     set (PROACTOR_LIBS Threads::Threads ${TIME_LIB})
   endif()
 endif()
diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index fd02817..78cad14 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -22,8 +22,18 @@
  *
  */
 
+/* Enable POSIX features beyond c99 for modern pthread and standard strerror_r() */
+#ifndef _POSIX_C_SOURCE
+#define _POSIX_C_SOURCE 200809L
+#endif
+/* Avoid GNU extensions, in particular the incompatible alternative strerror_r() */
+#undef _GNU_SOURCE
+
 #include <stdbool.h>
 #include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+
 #include <pthread.h>
 
 #include <netdb.h>
@@ -50,7 +60,8 @@ typedef enum {
   PCONNECTION_IO,
   PCONNECTION_TIMER,
   LISTENER_IO,
-  PROACTOR_TIMER
+  PROACTOR_TIMER,
+  RAW_CONNECTION_IO
 } epoll_type_t;
 
 // Data to use with epoll.
@@ -73,7 +84,8 @@ typedef struct ptimer_t {
 typedef enum {
   PROACTOR,
   PCONNECTION,
-  LISTENER
+  LISTENER,
+  RAW_CONNECTION
 } pcontext_type_t;
 
 typedef struct pcontext_t {
@@ -82,7 +94,7 @@ typedef struct pcontext_t {
   pcontext_type_t type;
   bool working;
   bool on_wake_list;
-  bool wake_pending;             // unprocessed eventfd wake callback (convert to bool?)
+  bool wake_pending;             // unprocessed eventfd wake callback
   struct pcontext_t *wake_next; // wake list, guarded by proactor eventfd_mutex
   bool closing;
   // Next 4 are protected by the proactor mutex
@@ -209,7 +221,7 @@ typedef struct pconnection_t {
   ptimer_t timer;  // TODO: review one timerfd per connection
   const char *host, *port;
   uint32_t new_events;
-  int wake_count;
+  int wake_count; // TODO: protected by context.mutex so should be moved in there (also really bool)
   bool server;                /* accept, not connect */
   bool tick_pending;
   bool timer_armed;
@@ -279,6 +291,70 @@ struct pn_listener_t {
   uint32_t sched_io_events;
 };
 
+typedef char strerrorbuf[1024];      /* used for pstrerror message buffer */
+void pstrerror(int err, strerrorbuf msg);
+
+// In general all locks to be held singly and shortly (possibly as spin locks).
+// See above about lock ordering.
+
+static inline void pmutex_init(pthread_mutex_t *pm){
+  pthread_mutexattr_t attr;
+
+  pthread_mutexattr_init(&attr);
+  pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ADAPTIVE_NP);
+  if (pthread_mutex_init(pm, &attr)) {
+    perror("pthread failure");
+    abort();
+  }
+}
+
+static inline void pmutex_finalize(pthread_mutex_t *m) { pthread_mutex_destroy(m); }
+static inline void lock(pmutex *m) { pthread_mutex_lock(m); }
+static inline void unlock(pmutex *m) { pthread_mutex_unlock(m); }
+
+static inline bool pconnection_has_event(pconnection_t *pc) {
+  return pn_connection_driver_has_event(&pc->driver);
+}
+
+static inline bool listener_has_event(pn_listener_t *l) {
+  return pn_collector_peek(l->collector) || (l->pending_count);
+}
+
+static inline bool proactor_has_event(pn_proactor_t *p) {
+  return pn_collector_peek(p->collector);
+}
+
+bool wake_if_inactive(pn_proactor_t *p);
+int pclosefd(pn_proactor_t *p, int fd);
+
+void proactor_add(pcontext_t *ctx);
+bool proactor_remove(pcontext_t *ctx);
+
+bool unassign_thread(tslot_t *ts, tslot_state new_state);
+
+void pcontext_init(pcontext_t *ctx, pcontext_type_t t, pn_proactor_t *p);
+bool wake(pcontext_t *ctx);
+void wake_notify(pcontext_t *ctx);
+void wake_done(pcontext_t *ctx);
+
+void psocket_init(psocket_t* ps, pn_proactor_t* p, epoll_type_t type);
+bool start_polling(epoll_extended_t *ee, int epollfd);
+void stop_polling(epoll_extended_t *ee, int epollfd);
+void rearm_polling(epoll_extended_t *ee, int epollfd);
+
+int pgetaddrinfo(const char *host, const char *port, int flags, struct addrinfo **res);
+void configure_socket(int sock);
+
+accepted_t *listener_accepted_next(pn_listener_t *listener);
+
+pcontext_t *pni_psocket_raw_context(psocket_t *ps);
+pn_event_batch_t *pni_raw_connection_process(pcontext_t *c, bool sched_wake);
+
+typedef struct praw_connection_t praw_connection_t;
+pcontext_t *pni_raw_connection_context(praw_connection_t *rc);
+praw_connection_t *pni_batch_raw_connection(pn_event_batch_t* batch);
+void pni_raw_connection_done(praw_connection_t *rc);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 3b5f32e..1e693fe 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -106,10 +106,8 @@
 // could be several eventfds with random assignment of wakeables.
 
 
-typedef char strerrorbuf[1024];      /* used for pstrerror message buffer */
-
 /* Like strerror_r but provide a default message if strerror_r fails */
-static void pstrerror(int err, strerrorbuf msg) {
+void pstrerror(int err, strerrorbuf msg) {
   int e = strerror_r(err, msg, sizeof(strerrorbuf));
   if (e) snprintf(msg, sizeof(strerrorbuf), "unknown error %d", err);
 }
@@ -128,24 +126,6 @@ static void pstrerror(int err, strerrorbuf msg) {
 // First define a proactor mutex (pmutex) and timer mechanism (ptimer) to taste.
 // ========================================================================
 
-// In general all locks to be held singly and shortly (possibly as spin locks).
-// See above about lock ordering.
-
-static void pmutex_init(pthread_mutex_t *pm){
-  pthread_mutexattr_t attr;
-
-  pthread_mutexattr_init(&attr);
-  pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ADAPTIVE_NP);
-  if (pthread_mutex_init(pm, &attr)) {
-    perror("pthread failure");
-    abort();
-  }
-}
-
-static void pmutex_finalize(pthread_mutex_t *m) { pthread_mutex_destroy(m); }
-static inline void lock(pmutex *m) { pthread_mutex_lock(m); }
-static inline void unlock(pmutex *m) { pthread_mutex_unlock(m); }
-
 /* epoll_ctl()/epoll_wait() do not form a memory barrier, so cached memory
    writes to struct epoll_extended_t in the EPOLL_ADD thread might not be
    visible to epoll_wait() thread. This function creates a memory barrier,
@@ -286,7 +266,7 @@ const char *AMQP_PORT_NAME = "amqp";
 PN_STRUCT_CLASSDEF(pn_proactor)
 PN_STRUCT_CLASSDEF(pn_listener)
 
-static bool start_polling(epoll_extended_t *ee, int epollfd) {
+bool start_polling(epoll_extended_t *ee, int epollfd) {
   if (ee->polling)
     return false;
   ee->polling = true;
@@ -297,7 +277,7 @@ static bool start_polling(epoll_extended_t *ee, int epollfd) {
   return (epoll_ctl(epollfd, EPOLL_CTL_ADD, ee->fd, &ev) == 0);
 }
 
-static void stop_polling(epoll_extended_t *ee, int epollfd) {
+void stop_polling(epoll_extended_t *ee, int epollfd) {
   // TODO: check for error, return bool or just log?
   // TODO: is EPOLL_CTL_DEL ever needed beyond auto de-register when ee->fd is closed?
   if (ee->fd == -1 || !ee->polling || epollfd == -1)
@@ -312,6 +292,19 @@ static void stop_polling(epoll_extended_t *ee, int epollfd) {
   ee->polling = false;
 }
 
+void rearm_polling(epoll_extended_t *ee, int epollfd) {
+  struct epoll_event ev = {0};
+  ev.data.ptr = ee;
+  ev.events = ee->wanted | EPOLLONESHOT;
+  memory_barrier(ee);
+  if (epoll_ctl(epollfd, EPOLL_CTL_MOD, ee->fd, &ev) == -1)
+    EPOLL_FATAL("arming polled file descriptor", errno);
+}
+
+static void rearm(pn_proactor_t *p, epoll_extended_t *ee) {
+  rearm_polling(ee, p->epollfd);
+}
+
 /*
  * The proactor maintains a number of serialization contexts: each
  * connection, each listener, the proactor itself.  The serialization
@@ -339,7 +332,7 @@ static void stop_polling(epoll_extended_t *ee, int epollfd) {
 // Fake thread for temporarily disabling the scheduling of a context.
 static struct tslot_t *REWAKE_PLACEHOLDER = (struct tslot_t*) -1;
 
-static void pcontext_init(pcontext_t *ctx, pcontext_type_t t, pn_proactor_t *p) {
+void pcontext_init(pcontext_t *ctx, pcontext_type_t t, pn_proactor_t *p) {
   memset(ctx, 0, sizeof(*ctx));
   pmutex_init(&ctx->mutex);
   ctx->proactor = p;
@@ -350,8 +343,6 @@ static void pcontext_finalize(pcontext_t* ctx) {
   pmutex_finalize(&ctx->mutex);
 }
 
-static void rearm(pn_proactor_t *p, epoll_extended_t *ee);
-
 /*
  * Wake strategy with eventfd.
  *  - wakees can be in the list only once
@@ -411,7 +402,9 @@ static void pop_wake(pcontext_t *ctx) {
 }
 
 // part1: call with ctx->owner lock held, return true if notify required by caller
-static bool wake(pcontext_t *ctx) {
+// Note that this will return false if either there is a pending wake OR if we are already
+// in the connection context that is to be woken (as we don't have to wake it up)
+bool wake(pcontext_t *ctx) {
   bool notify = false;
 
   if (!ctx->wake_pending) {
@@ -440,7 +433,7 @@ static bool wake(pcontext_t *ctx) {
 }
 
 // part2: make OS call without lock held
-static inline void wake_notify(pcontext_t *ctx) {
+void wake_notify(pcontext_t *ctx) {
   pn_proactor_t *p = ctx->proactor;
   if (p->eventfd == -1)
     return;
@@ -448,7 +441,7 @@ static inline void wake_notify(pcontext_t *ctx) {
 }
 
 // call with owner lock held, once for each pop from the wake list
-static inline void wake_done(pcontext_t *ctx) {
+void wake_done(pcontext_t *ctx) {
 //  assert(ctx->wake_pending > 0);
   ctx->wake_pending = false;
 }
@@ -572,7 +565,7 @@ static bool rewake(pcontext_t *ctx) {
 }
 
 // Call with sched lock
-static bool unassign_thread(tslot_t *ts, tslot_state new_state) {
+bool unassign_thread(tslot_t *ts, tslot_state new_state) {
   pcontext_t *ctx = ts->context;
   bool notify = false;
   bool deleting = (ts->state == DELETING);
@@ -660,7 +653,7 @@ static void make_runnable(pcontext_t *ctx) {
 
 
 
-static void psocket_init(psocket_t* ps, pn_proactor_t* p, epoll_type_t type)
+void psocket_init(psocket_t* ps, pn_proactor_t* p, epoll_type_t type)
 {
   ps->epoll_io.fd = -1;
   ps->epoll_io.type = type;
@@ -698,8 +691,6 @@ static void set_pconnection(pn_connection_t* c, pconnection_t *pc) {
 static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool wake, bool topup);
 static void write_flush(pconnection_t *pc);
 static void listener_begin_close(pn_listener_t* l);
-static void proactor_add(pcontext_t *ctx);
-static bool proactor_remove(pcontext_t *ctx);
 static void poller_done(struct pn_proactor_t* p, tslot_t *ts);
 
 static inline pconnection_t *psocket_pconnection(psocket_t* ps) {
@@ -741,18 +732,6 @@ static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
     containerof(batch, pconnection_t, batch) : NULL;
 }
 
-static inline bool pconnection_has_event(pconnection_t *pc) {
-  return pn_connection_driver_has_event(&pc->driver);
-}
-
-static inline bool listener_has_event(pn_listener_t *l) {
-  return pn_collector_peek(l->collector) || (l->pending_count);
-}
-
-static inline bool proactor_has_event(pn_proactor_t *p) {
-  return pn_collector_peek(p->collector);
-}
-
 static void psocket_error_str(psocket_t *ps, const char *msg, const char* what) {
   pconnection_t *pc = psocket_pconnection(ps);
   if (pc) {
@@ -780,15 +759,6 @@ static void psocket_gai_error(psocket_t *ps, int gai_err, const char* what) {
   psocket_error_str(ps, gai_strerror(gai_err), what);
 }
 
-static void rearm(pn_proactor_t *p, epoll_extended_t *ee) {
-  struct epoll_event ev = {0};
-  ev.data.ptr = ee;
-  ev.events = ee->wanted | EPOLLONESHOT;
-  memory_barrier(ee);
-  if (epoll_ctl(p->epollfd, EPOLL_CTL_MOD, ee->fd, &ev) == -1)
-    EPOLL_FATAL("arming polled file descriptor", errno);
-}
-
 static void listener_accepted_append(pn_listener_t *listener, accepted_t item) {
   if (listener->pending_first+listener->pending_count >= listener->backlog) return;
 
@@ -796,7 +766,7 @@ static void listener_accepted_append(pn_listener_t *listener, accepted_t item) {
   listener->pending_count++;
 }
 
-static accepted_t *listener_accepted_next(pn_listener_t *listener) {
+accepted_t *listener_accepted_next(pn_listener_t *listener) {
   if (!listener->pending_count) return NULL;
 
   listener->pending_count--;
@@ -861,7 +831,7 @@ static void proactor_rearm_overflow(pn_proactor_t *p) {
 }
 
 // Close an FD and rearm overflow listeners.  Call with no listener locks held.
-static int pclosefd(pn_proactor_t *p, int fd) {
+int pclosefd(pn_proactor_t *p, int fd) {
   int err = close(fd);
   if (!err) proactor_rearm_overflow(p);
   return err;
@@ -1258,7 +1228,6 @@ static void pconnection_connected_lh(pconnection_t *pc);
 static void pconnection_maybe_connect_lh(pconnection_t *pc);
 
 static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool sched_wake, bool topup) {
-  bool inbound_wake = sched_wake;
   bool rearm_timer = false;
   bool timer_fired = false;
   bool waking = false;
@@ -1281,10 +1250,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
     pc->tick_pending = true;
     timer_fired = false;
   }
-  if (inbound_wake) {
-    wake_done(&pc->context);
-    inbound_wake = false;
-  }
+  if (sched_wake) wake_done(&pc->context);
 
   if (rearm_timer)
     pc->timer_armed = false;
@@ -1462,7 +1428,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
   return NULL;
 }
 
-static void configure_socket(int sock) {
+void configure_socket(int sock) {
   int flags = fcntl(sock, F_GETFL);
   flags |= O_NONBLOCK;
   (void)fcntl(sock, F_SETFL, flags); // TODO: check for error
@@ -1536,7 +1502,7 @@ static void pconnection_maybe_connect_lh(pconnection_t *pc) {
   pc->disconnected = true;
 }
 
-static int pgetaddrinfo(const char *host, const char *port, int flags, struct addrinfo **res)
+int pgetaddrinfo(const char *host, const char *port, int flags, struct addrinfo **res)
 {
   struct addrinfo hints = { 0 };
   hints.ai_family = AF_UNSPEC;
@@ -1550,7 +1516,7 @@ static inline bool is_inactive(pn_proactor_t *p) {
 }
 
 /* If inactive set need_inactive and return true if the proactor needs a wakeup */
-static bool wake_if_inactive(pn_proactor_t *p) {
+bool wake_if_inactive(pn_proactor_t *p) {
   if (is_inactive(p)) {
     p->need_inactive = true;
     return wake(&p->context);
@@ -2280,7 +2246,7 @@ static pn_event_batch_t *proactor_process(pn_proactor_t *p, bool timeout, bool i
   return NULL;
 }
 
-static void proactor_add(pcontext_t *ctx) {
+void proactor_add(pcontext_t *ctx) {
   pn_proactor_t *p = ctx->proactor;
   lock(&p->context.mutex);
   if (p->contexts) {
@@ -2294,7 +2260,7 @@ static void proactor_add(pcontext_t *ctx) {
 
 // call with psocket's mutex held
 // return true if safe for caller to free psocket
-static bool proactor_remove(pcontext_t *ctx) {
+bool proactor_remove(pcontext_t *ctx) {
   pn_proactor_t *p = ctx->proactor;
   // Disassociate this context from scheduler
   if (!p->shutting_down) {
@@ -2420,6 +2386,11 @@ static pn_event_batch_t *process(pcontext_t *ctx) {
     batch = listener_process(l, n_events, ctx_wake);
     break;
   }
+  case RAW_CONNECTION: {
+    unlock(&p->sched_mutex);
+    batch = pni_raw_connection_process(ctx, ctx_wake);
+    break;
+  }
   default:
     assert(NULL);
   }
@@ -2469,20 +2440,20 @@ static pcontext_t *post_event(pn_proactor_t *p, struct epoll_event *evp) {
     ctx->sched_pending = true;
     break;
 
-  case PCONNECTION_IO: {
-    psocket_t *ps = containerof(ee, psocket_t, epoll_io);
-    pconnection_t *pc = psocket_pconnection(ps);
+  case PCONNECTION_TIMER: {
+    pconnection_t *pc = containerof(containerof(ee, ptimer_t, epoll_io), pconnection_t, timer);
     assert(pc);
     ctx = &pc->context;
-    ps->sched_io_events = evp->events;
+    pc->sched_timeout = true;
     ctx->sched_pending = true;
     break;
   }
-  case PCONNECTION_TIMER: {
-    pconnection_t *pc = containerof(containerof(ee, ptimer_t, epoll_io), pconnection_t, timer);
+  case PCONNECTION_IO: {
+    psocket_t *ps = containerof(ee, psocket_t, epoll_io);
+    pconnection_t *pc = psocket_pconnection(ps);
     assert(pc);
     ctx = &pc->context;
-    pc->sched_timeout = true;;
+    ps->sched_io_events = evp->events;
     ctx->sched_pending = true;
     break;
   }
@@ -2495,6 +2466,13 @@ static pcontext_t *post_event(pn_proactor_t *p, struct epoll_event *evp) {
     ctx->sched_pending = true;
     break;
   }
+  case RAW_CONNECTION_IO: {
+    psocket_t *ps = containerof(ee, psocket_t, epoll_io);
+    ctx = pni_psocket_raw_context(ps);
+    ps->sched_io_events = evp->events;
+    ctx->sched_pending = true;
+    break;
+  }
   }
   if (ctx && !ctx->runnable && !ctx->runner)
     return ctx;
@@ -2860,6 +2838,14 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
     check_earmark_override(p, ts);
     return;
   }
+  praw_connection_t *rc = pni_batch_raw_connection(batch);
+  if (rc) {
+    tslot_t *ts = pni_raw_connection_context(rc)->runner;
+    pni_raw_connection_done(rc);
+    // rc possibly freed/invalid
+    check_earmark_override(p, ts);
+    return;
+  }
   pn_proactor_t *bp = batch_proactor(batch);
   if (bp == p) {
     bool notify = false;
@@ -2962,6 +2948,7 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
     bool do_free = false;
     bool ctx_notify = false;
     pmutex *ctx_mutex = NULL;
+    // TODO: Need to extend this for raw connections too
     pconnection_t *pc = pcontext_pconnection(ctx);
     if (pc) {
       ctx_mutex = &pc->context.mutex;
@@ -3047,11 +3034,3 @@ int64_t pn_proactor_now_64(void) {
   clock_gettime(CLOCK_MONOTONIC, &t);
   return t.tv_sec * 1000 + t.tv_nsec / 1000000;
 }
-
-// Empty stubs for raw connection code
-pn_raw_connection_t *pn_raw_connection(void) { return NULL; }
-void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const char *addr) {}
-void pn_listener_raw_accept(pn_listener_t *l, pn_raw_connection_t *rc) {}
-void pn_raw_connection_wake(pn_raw_connection_t *conn) {}
-const struct pn_netaddr_t *pn_raw_connection_local_addr(pn_raw_connection_t *connection) { return NULL; }
-const struct pn_netaddr_t *pn_raw_connection_remote_addr(pn_raw_connection_t *connection) { return NULL; }
diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c
new file mode 100644
index 0000000..2b9c4d4
--- /dev/null
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -0,0 +1,376 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/* This is currently epoll implementation specific - and will need changing for the other proactors */
+
+#include "epoll-internal.h"
+#include "proactor-internal.h"
+#include "raw_connection-internal.h"
+
+#include <proton/proactor.h>
+#include <proton/listener.h>
+#include <proton/netaddr.h>
+#include <proton/raw_connection.h>
+
+#include <alloca.h>
+#include <assert.h>
+#include <errno.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <sys/epoll.h>
+
+/* epoll specific raw connection struct */
+struct praw_connection_t {
+  pcontext_t context;
+  struct pn_raw_connection_t raw_connection;
+  psocket_t psocket;
+  struct pn_netaddr_t local, remote; /* Actual addresses */
+  pmutex rearm_mutex;                /* protects pconnection_rearm from out of order arming*/
+  pn_event_batch_t batch;
+  struct addrinfo *addrinfo;         /* Resolved address list */
+  struct addrinfo *ai;               /* Current connect address */
+  bool connected;
+  bool disconnected;
+  bool waking; // TODO: This is actually protected by context.mutex so should be moved into context (pconnection too)
+};
+
+static void psocket_error(praw_connection_t *rc, int err, const char* msg) {
+  pn_condition_t *cond = rc->raw_connection.condition;
+  if (!pn_condition_is_set(cond)) { /* Preserve older error information */
+    strerrorbuf what;
+    pstrerror(err, what);
+    char addr[PN_MAX_ADDR];
+    pn_netaddr_str(&rc->remote, addr, sizeof(addr));
+    pn_condition_format(cond, PNI_IO_CONDITION, "%s - %s %s", what, msg, addr);
+  }
+}
+
+static void psocket_gai_error(praw_connection_t *rc, int gai_err, const char* what, const char *addr) {
+  pn_condition_format(rc->raw_connection.condition, PNI_IO_CONDITION, "%s - %s %s",
+                      gai_strerror(gai_err), what, addr);
+}
+
+static void praw_connection_connected_lh(praw_connection_t *prc) {
+  // Need to check socket for connection error
+  prc->connected = true;
+  if (prc->addrinfo) {
+    freeaddrinfo(prc->addrinfo);
+    prc->addrinfo = NULL;
+  }
+  prc->ai = NULL;
+  socklen_t len = sizeof(prc->remote.ss);
+  (void)getpeername(prc->psocket.epoll_io.fd, (struct sockaddr*)&prc->remote.ss, &len);
+
+  pni_raw_connected(&prc->raw_connection);
+}
+
+/* multi-address connections may call praw_connection_start multiple times with different FDs */
+static void praw_connection_start(praw_connection_t *prc, int fd) {
+  int efd = prc->psocket.proactor->epollfd;
+
+  /* Get the local socket name now, get the peer name in pconnection_connected */
+  socklen_t len = sizeof(prc->local.ss);
+  (void)getsockname(fd, (struct sockaddr*)&prc->local.ss, &len);
+
+  epoll_extended_t *ee = &prc->psocket.epoll_io;
+  if (ee->polling) {     /* This is not the first attempt, stop polling and close the old FD */
+    int fd = ee->fd;     /* Save fd, it will be set to -1 by stop_polling */
+    stop_polling(ee, efd);
+    pclosefd(prc->psocket.proactor, fd);
+  }
+  ee->fd = fd;
+  ee->wanted = EPOLLIN | EPOLLOUT;
+  start_polling(ee, efd);  // TODO: check for error
+}
+
+/* Called on initial connect, and if connection fails to try another address */
+static void praw_connection_maybe_connect_lh(praw_connection_t *prc) {
+  while (prc->ai) {            /* Have an address */
+    struct addrinfo *ai = prc->ai;
+    prc->ai = prc->ai->ai_next; /* Move to next address in case this fails */
+    int fd = socket(ai->ai_family, SOCK_STREAM, 0);
+    if (fd >= 0) {
+      configure_socket(fd);
+      if (!connect(fd, ai->ai_addr, ai->ai_addrlen) || errno == EINPROGRESS) {
+
+        /* Until we finish connecting save away the address we're trying to connect to */
+        memcpy((struct sockaddr *) &prc->remote.ss, ai->ai_addr, ai->ai_addrlen);
+
+        praw_connection_start(prc, fd);
+        return;               /* Async connection started */
+      } else {
+        close(fd);
+      }
+    }
+    /* connect failed immediately, go round the loop to try the next addr */
+  }
+  int err;
+  socklen_t errlen = sizeof(err);
+  getsockopt(prc->psocket.epoll_io.fd, SOL_SOCKET, SO_ERROR, (void *)&err, &errlen);
+  psocket_error(prc, err, "on connect");
+
+  freeaddrinfo(prc->addrinfo);
+  prc->addrinfo = NULL;
+  prc->disconnected = true;
+}
+
+//
+// Raw socket API
+//
+static pn_event_t * pni_raw_batch_next(pn_event_batch_t *batch);
+
+static void praw_connection_init(praw_connection_t *prc, pn_proactor_t *p, pn_raw_connection_t *rc) {
+  pcontext_init(&prc->context, RAW_CONNECTION, p);
+  psocket_init(&prc->psocket, p, RAW_CONNECTION_IO);
+
+  prc->connected = false;
+  prc->disconnected = false;
+  prc->waking = false;
+  prc->batch.next_event = pni_raw_batch_next;
+
+  pmutex_init(&prc->rearm_mutex);
+}
+
+static void praw_connection_cleanup(praw_connection_t *prc) {
+  int fd = prc->psocket.epoll_io.fd;
+  stop_polling(&prc->psocket.epoll_io, prc->psocket.proactor->epollfd);
+  if (fd != -1)
+    pclosefd(prc->psocket.proactor, fd);
+
+  lock(&prc->context.mutex);
+  bool can_free = proactor_remove(&prc->context);
+  unlock(&prc->context.mutex);
+  if (can_free) {
+    free(prc);
+  }
+  // else proactor_disconnect logic owns prc and its final free
+}
+
+pn_raw_connection_t *pn_raw_connection(void) {
+  praw_connection_t *conn = (praw_connection_t*) calloc(1, sizeof(praw_connection_t));
+  if (!conn) return NULL;
+
+  pni_raw_initialize(&conn->raw_connection);
+
+  return &conn->raw_connection;
+}
+
+void pn_raw_connection_free(pn_raw_connection_t *conn) {
+}
+
+void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const char *addr) {
+  assert(rc);
+  praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
+  praw_connection_init(prc, p, rc);
+  // TODO: check case of proactor shutting down
+
+  lock(&prc->context.mutex);
+  proactor_add(&prc->context);
+
+  bool notify = false;
+  bool notify_proactor = false;
+
+  const char *host;
+  const char *port;
+  size_t addrlen = strlen(addr);
+  char *addr_buf = (char*) alloca(addrlen+1);
+  pni_parse_addr(addr, addr_buf, addrlen+1, &host, &port);
+
+  int gai_error = pgetaddrinfo(host, port, 0, &prc->addrinfo);
+  if (!gai_error) {
+    prc->ai = prc->addrinfo;
+    praw_connection_maybe_connect_lh(prc); /* Start connection attempts */
+    if (prc->disconnected) notify = wake(&prc->context);
+  } else {
+    psocket_gai_error(prc, gai_error, "connect to ", addr);
+    prc->disconnected = true;
+    notify = wake(&prc->context);
+    lock(&p->context.mutex);
+    notify_proactor = wake_if_inactive(p);
+    unlock(&p->context.mutex);
+  }
+
+  /* We need to issue INACTIVE on immediate failure */
+  unlock(&prc->context.mutex);
+  if (notify) wake_notify(&prc->context);
+  if (notify_proactor) wake_notify(&p->context);
+}
+
+void pn_listener_raw_accept(pn_listener_t *l, pn_raw_connection_t *rc) {
+  assert(rc);
+  praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
+  praw_connection_init(prc, pn_listener_proactor(l), rc);
+  // TODO: fuller sanity check on input args
+
+  int err = 0;
+  int fd = -1;
+  bool notify = false;
+  lock(&l->context.mutex);
+  if (l->context.closing)
+    err = EBADF;
+  else {
+    accepted_t *a = listener_accepted_next(l);
+    if (a) {
+      fd = a->accepted_fd;
+      a->accepted_fd = -1;
+    }
+    else err = EWOULDBLOCK;
+  }
+
+  proactor_add(&prc->context);
+
+  lock(&prc->context.mutex);
+  if (fd >= 0) {
+    configure_socket(fd);
+    praw_connection_start(prc, fd);
+    praw_connection_connected_lh(prc);
+  } else {
+    psocket_error(prc, err, "pn_listener_accept");
+  }
+
+  if (!l->context.working && listener_has_event(l)) {
+    notify = wake(&l->context);
+  }
+  unlock(&prc->context.mutex);
+  unlock(&l->context.mutex);
+  if (notify) wake_notify(&l->context);
+}
+
+const pn_netaddr_t *pn_raw_connection_local_addr(pn_raw_connection_t *rc) {
+  praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
+  if (!prc) return NULL;
+  return &prc->local;
+}
+
+const pn_netaddr_t *pn_raw_connection_remote_addr(pn_raw_connection_t *rc) {
+  praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
+  if (!prc) return NULL;
+  return &prc->remote;
+}
+
+void pn_raw_connection_wake(pn_raw_connection_t *rc) {
+  bool notify = false;
+  praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
+  if (prc) {
+    lock(&prc->context.mutex);
+    if (!prc->context.closing) {
+      prc->waking = true;
+      notify = wake(&prc->context);
+    }
+    unlock(&prc->context.mutex);
+  }
+  if (notify) wake_notify(&prc->context);
+}
+
+static pn_event_t *pni_raw_batch_next(pn_event_batch_t *batch) {
+  pn_raw_connection_t *raw = &containerof(batch, praw_connection_t, batch)->raw_connection;
+  return pni_raw_event_next(raw);
+}
+
+pcontext_t *pni_psocket_raw_context(psocket_t* ps) {
+  return &containerof(ps, praw_connection_t, psocket)->context;
+}
+
+praw_connection_t *pni_batch_raw_connection(pn_event_batch_t *batch) {
+  return (batch->next_event == pni_raw_batch_next) ?
+    containerof(batch, praw_connection_t, batch) : NULL;
+}
+
+pcontext_t *pni_raw_connection_context(praw_connection_t *rc) {
+  return &rc->context;
+}
+
+static long snd(int fd, const void* b, size_t s) {
+  return send(fd, b, s, MSG_NOSIGNAL | MSG_DONTWAIT);
+}
+
+static long rcv(int fd, void* b, size_t s) {
+  return recv(fd, b, s, MSG_DONTWAIT);
+}
+
+static void  set_error(pn_raw_connection_t *conn, const char *msg, int err) {
+  psocket_error(containerof(conn, praw_connection_t, raw_connection), err, msg);
+}
+
+pn_event_batch_t *pni_raw_connection_process(pcontext_t *c, bool sched_wake) {
+  praw_connection_t *rc = containerof(c, praw_connection_t, context);
+  int events = rc->psocket.sched_io_events;
+  int fd = rc->psocket.epoll_io.fd;
+  if (!rc->connected) {
+    if (events & (EPOLLHUP | EPOLLERR)) {
+      praw_connection_maybe_connect_lh(rc);
+    }
+    if (rc->disconnected) {
+      pni_raw_disconnect(&rc->raw_connection);
+      return &rc->batch;
+    }
+    if (events & (EPOLLHUP | EPOLLERR)) {
+      return NULL;
+    }
+    praw_connection_connected_lh(rc);
+  }
+
+  bool wake = false;
+  lock(&c->mutex);
+  c->working = true;
+  if (sched_wake) wake_done(c);
+  wake = sched_wake || rc->waking;
+  rc->waking = false;
+  unlock(&c->mutex);
+
+  if (wake) pni_raw_wake(&rc->raw_connection);
+  if (events & EPOLLIN) pni_raw_read(&rc->raw_connection, fd, rcv, set_error);
+  if (events & EPOLLOUT) pni_raw_write(&rc->raw_connection, fd, snd, set_error);
+  return &rc->batch;
+}
+
+void pni_raw_connection_done(praw_connection_t *rc) {
+  bool self_notify = false;
+  lock(&rc->context.mutex);
+  pn_proactor_t *p = rc->context.proactor;
+  tslot_t *ts = rc->context.runner;
+  rc->context.working = false;
+  self_notify = rc->waking && wake(&rc->context);
+  unlock(&rc->context.mutex);
+  if (self_notify) wake_notify(&rc->context);
+
+  pn_raw_connection_t *raw = &rc->raw_connection;
+  int wanted =
+    (pni_raw_can_read(raw)  ? EPOLLIN : 0) |
+    (pni_raw_can_write(raw) ? EPOLLOUT : 0);
+  if (wanted) {
+    rc->psocket.epoll_io.wanted = wanted;
+    rearm_polling(&rc->psocket.epoll_io, p->epollfd);  // TODO: check for error
+  } else {
+    bool finished_disconnect = raw->rclosed && raw->wclosed && !raw->disconnectpending;
+    if (finished_disconnect) {
+      // If we're closed and we've sent the disconnect then close
+      pni_raw_finalize(raw);
+      praw_connection_cleanup(rc);
+    }
+  }
+
+  lock(&p->sched_mutex);
+  bool notify = unassign_thread(ts, UNUSED);
+  unlock(&p->sched_mutex);
+  if (notify) wake_notify(&p->context);
+}
diff --git a/c/src/proactor/raw_connection-internal.h b/c/src/proactor/raw_connection-internal.h
index 79633e4..02c3af2 100644
--- a/c/src/proactor/raw_connection-internal.h
+++ b/c/src/proactor/raw_connection-internal.h
@@ -83,6 +83,7 @@ struct pn_raw_connection_t {
   bool rdrainpending;
   bool wdrainpending;
   bool disconnectpending;
+  bool wakepending;
 };
 
 /*
diff --git a/c/src/proactor/raw_connection.c b/c/src/proactor/raw_connection.c
index 3e4a74c..a225df9 100644
--- a/c/src/proactor/raw_connection.c
+++ b/c/src/proactor/raw_connection.c
@@ -56,7 +56,6 @@ void pni_raw_initialize(pn_raw_connection_t *conn) {
     conn->wbuffers[i-1].type = buff_wempty;
   }
 
-  //conn->batch.next_event = pni_raw_batch_next;
   conn->condition = pn_condition();
   conn->collector = pn_collector();
   conn->attachments = pn_record();
@@ -323,7 +322,7 @@ void pni_raw_connected(pn_raw_connection_t *conn) {
 }
 
 void pni_raw_wake(pn_raw_connection_t *conn) {
-  pni_raw_put_event(conn, PN_RAW_CONNECTION_WAKE);
+  conn->wakepending = true;
 }
 
 void pni_raw_read(pn_raw_connection_t *conn, int sock, long (*recv)(int, void*, size_t), void(*set_error)(pn_raw_connection_t *, const char *, int)) {
@@ -482,6 +481,10 @@ pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn) {
       switch (type) {
         default: break;
       }
+    } else if (conn->wakepending) {
+      pni_raw_put_event(conn, PN_RAW_CONNECTION_WAKE);
+      conn->wakepending = false;
+      continue;
     } else if (conn->rpending) {
       pni_raw_put_event(conn, PN_RAW_CONNECTION_READ);
       conn->rpending = false;
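
A consequence of the wakepending change just above: pni_raw_wake() no longer queues an event directly, it only marks the connection, and pni_raw_event_next() turns that mark into at most one PN_RAW_CONNECTION_WAKE per drain, so repeated wake requests between batches collapse into a single event. A sketch of that behaviour using the internal pni_* entry points, in the style of the unit tests from earlier in this series (assumes a connection p that is already initialized, connected and otherwise idle, with no reads or writes outstanding; plain assert stands in for the test framework's macros):

    /* Uses the internal declarations from src/proactor/raw_connection-internal.h,
     * just as the unit tests do. */
    #include <proton/event.h>
    #include <proton/raw_connection.h>
    #include <assert.h>

    static void check_wake_collapses(pn_raw_connection_t *p) {
      pni_raw_wake(p);
      pni_raw_wake(p);   /* no-op: wakepending is already set */
      assert(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_WAKE);
      assert(pn_event_type(pni_raw_event_next(p)) == PN_EVENT_NONE);
    }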


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-proton] 01/05: PROTON-2247: Raw connections API for the proactor - API defined in header files - Simple test application is derived from the direct.c example

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit a2528bb4e1bb37a0d2d765ed0b261ee2e8a56471
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Thu Jan 23 15:46:15 2020 -0500

    PROTON-2247: Raw connections API for the proactor
    - API defined in header files
    - Simple test application is derived from the direct.c example
---
 c/CMakeLists.txt                  |   1 +
 c/docs/user.doxygen.in            |   1 +
 c/examples/CMakeLists.txt         |   2 +-
 c/examples/raw_connect.c          | 256 ++++++++++++++++++++++++++++++++++++
 c/examples/raw_echo.c             | 240 ++++++++++++++++++++++++++++++++++
 c/include/proton/event.h          | 115 ++++++++++++++++-
 c/include/proton/listener.h       |  19 +++
 c/include/proton/proactor.h       |  32 +++++
 c/include/proton/raw_connection.h | 265 ++++++++++++++++++++++++++++++++++++++
 c/include/proton/types.h          |  11 ++
 c/src/proactor/epoll.c            |   1 +
 c/src/proactor/libuv.c            |   1 +
 c/src/proactor/win_iocp.cpp       |   1 +
 13 files changed, 943 insertions(+), 2 deletions(-)

diff --git a/c/CMakeLists.txt b/c/CMakeLists.txt
index 17495a5..b686c42 100644
--- a/c/CMakeLists.txt
+++ b/c/CMakeLists.txt
@@ -302,6 +302,7 @@ set (qpid-proton-include
   include/proton/netaddr.h
   include/proton/object.h
   include/proton/proactor.h
+  include/proton/raw_connection.h
   include/proton/sasl.h
   include/proton/sasl-plugin.h
   include/proton/session.h
diff --git a/c/docs/user.doxygen.in b/c/docs/user.doxygen.in
index ae6894a..8034e6c 100644
--- a/c/docs/user.doxygen.in
+++ b/c/docs/user.doxygen.in
@@ -33,6 +33,7 @@ INLINE_SIMPLE_STRUCTS   = YES
 HIDE_UNDOC_CLASSES      = YES
 HIDE_COMPOUND_REFERENCE = YES
 HIDE_SCOPE_NAMES        = YES
+TYPEDEF_HIDES_STRUCT    = YES
 MAX_INITIALIZER_LINES   = 0
 ALPHABETICAL_INDEX      = NO
 SORT_MEMBER_DOCS        = NO
diff --git a/c/examples/CMakeLists.txt b/c/examples/CMakeLists.txt
index 9771ec5..ec29888 100644
--- a/c/examples/CMakeLists.txt
+++ b/c/examples/CMakeLists.txt
@@ -26,7 +26,7 @@ find_package(Threads REQUIRED)
 include_directories(${Proton_INCLUDE_DIRS})
 add_definitions(${Proton_DEFINITIONS})
 
-foreach (name broker send receive direct send-abort send-ssl)
+foreach (name broker send receive direct send-abort send-ssl raw_echo raw_connect)
   add_executable(c-${name} ${name}.c)
   target_link_libraries(c-${name} ${Proton_Proactor_LIBRARIES} ${Proton_Core_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
   set_target_properties(c-${name} PROPERTIES
diff --git a/c/examples/raw_connect.c b/c/examples/raw_connect.c
new file mode 100644
index 0000000..d38a3d7
--- /dev/null
+++ b/c/examples/raw_connect.c
@@ -0,0 +1,256 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/condition.h>
+#include <proton/raw_connection.h>
+#include <proton/listener.h>
+#include <proton/netaddr.h>
+#include <proton/proactor.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+typedef struct app_data_t {
+  const char *host, *port;
+  const char *amqp_address;
+
+  pn_proactor_t *proactor;
+  pn_listener_t *listener;
+
+  int connects;
+  int disconnects;
+
+  /* Sender values */
+
+  /* Receiver values */
+} app_data_t;
+
+typedef struct connection_data_t {
+  bool sender;
+} connection_data_t;
+
+static int exit_code = 0;
+
+/* Close the connection and the listener so we will get a
+ * PN_PROACTOR_INACTIVE event and exit, once all outstanding events
+ * are processed.
+ */
+static void close_all(pn_raw_connection_t *c, app_data_t *app) {
+  if (c) pn_raw_connection_close(c);
+  if (app->listener) pn_listener_close(app->listener);
+}
+
+static void check_condition(pn_event_t *e, pn_condition_t *cond, app_data_t *app) {
+  if (pn_condition_is_set(cond)) {
+    fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
+            pn_condition_get_name(cond), pn_condition_get_description(cond));
+    close_all(pn_event_raw_connection(e), app);
+    exit_code = 1;
+  }
+}
+
+static void send_message(pn_raw_connection_t *c, const char* msg) {
+  pn_raw_buffer_t buffer;
+  uint32_t len = strlen(msg);
+  char *buf = (char*) malloc(1024);
+  memcpy(buf, msg, len);
+  buffer.bytes = buf;
+  buffer.capacity = 1024;
+  buffer.offset = 0;
+  buffer.size = len;
+  pn_raw_connection_write_buffers(c, &buffer, 1);
+}
+
+static void recv_message(pn_raw_buffer_t buf) {
+  fwrite(buf.bytes, buf.size, 1, stdout);
+}
+
+connection_data_t *make_receiver_data(void) {
+  connection_data_t *cd = (connection_data_t*) malloc(sizeof(connection_data_t));
+  cd->sender = false;
+  return cd;
+}
+
+connection_data_t *make_sender_data(void) {
+  connection_data_t *cd = (connection_data_t*) malloc(sizeof(connection_data_t));
+  cd->sender = true;
+  return cd;
+}
+
+#define READ_BUFFERS 4
+
+/* This function handles events when we are acting as the receiver */
+static void handle_receive(app_data_t *app, pn_event_t* event) {
+  switch (pn_event_type(event)) {
+
+    case PN_RAW_CONNECTION_CONNECTED: {
+      pn_raw_connection_t *c = pn_event_raw_connection(event);
+      pn_raw_buffer_t buffers[READ_BUFFERS] = {{0}};
+      int i = READ_BUFFERS;
+      for (; i; --i) {
+        pn_raw_buffer_t *buff = &buffers[READ_BUFFERS-i];
+        buff->bytes = (char*) malloc(1024);
+        buff->capacity = 1024;
+        buff->size = 0;
+        buff->offset = 0;
+      }
+      pn_raw_connection_give_read_buffers(c, buffers, READ_BUFFERS);
+    } break;
+
+    case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
+    } break;
+
+    default:
+      break;
+  }
+}
+
+#define WRITE_BUFFERS 4
+
+/* This function handles events when we are acting as the sender */
+static void handle_send(app_data_t* app, pn_event_t* event) {
+  switch (pn_event_type(event)) {
+
+    case PN_RAW_CONNECTION_CONNECTED: {
+      printf("**raw connection connected\n");
+      app->connects++;
+    } break;
+
+    case PN_RAW_CONNECTION_DISCONNECTED: {
+      pn_raw_connection_t *c = pn_event_raw_connection(event);
+      connection_data_t *cd = (connection_data_t*) pn_raw_connection_get_context(c);
+      free(cd);
+      printf("**raw connection disconnected\n");
+      app->disconnects++;
+      check_condition(event, pn_raw_connection_condition(c), app);
+    } break;
+
+    case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: {
+      pn_raw_connection_t *c = pn_event_raw_connection(event);
+      char line[120];
+      if (fgets(line, sizeof(line), stdin)) {
+        send_message(c, line);
+      } else {
+        pn_raw_connection_close(c);
+      }
+    } break;
+
+    /* This path handles both received bytes and freeing buffers at close */
+    case PN_RAW_CONNECTION_READ: {
+      pn_raw_connection_t *c = pn_event_raw_connection(event);
+      pn_raw_buffer_t buffs[READ_BUFFERS];
+      size_t n;
+      while ( (n = pn_raw_connection_take_read_buffers(c, buffs, READ_BUFFERS)) ) {
+        unsigned i;
+        for (i=0; i<n && buffs[i].bytes; ++i) {
+          recv_message(buffs[i]);
+          free(buffs[i].bytes);
+        }
+      }
+    } break;
+
+    case PN_RAW_CONNECTION_CLOSED_READ: {
+      pn_raw_connection_t *c = pn_event_raw_connection(event);
+      pn_raw_connection_close(c);
+    } break;
+
+    case PN_RAW_CONNECTION_WRITTEN: {
+      pn_raw_connection_t *c = pn_event_raw_connection(event);
+      pn_raw_buffer_t buffs[READ_BUFFERS];
+      size_t n;
+      while ( (n = pn_raw_connection_take_written_buffers(c, buffs, READ_BUFFERS)) ) {
+        if (!pn_raw_connection_is_read_closed(c)) {
+          pn_raw_connection_give_read_buffers(c, buffs, n);
+        } else {
+          unsigned i;
+          for (i=0; i<n && buffs[i].bytes; ++i) {
+            free(buffs[i].bytes);
+          }
+        }
+      };
+    } break;
+
+    default:
+      break;
+  }
+}
+
+/* Handle all events, delegate to handle_send or handle_receive
+   Return true to continue, false to exit
+*/
+static bool handle(app_data_t* app, pn_event_t* event) {
+  switch (pn_event_type(event)) {
+
+    case PN_PROACTOR_TIMEOUT: {
+    }  break;
+
+    case PN_PROACTOR_INACTIVE: {
+      return false;
+    } break;
+
+    default: {
+      pn_raw_connection_t *c = pn_event_raw_connection(event);
+      if (c) {
+        connection_data_t *cd = (connection_data_t*) pn_raw_connection_get_context(c);
+        if (cd && cd->sender) {
+            handle_send(app, event);
+        } else {
+            handle_receive(app, event);
+        }
+      }
+    }
+  }
+  return exit_code == 0;
+}
+
+void run(app_data_t *app) {
+  /* Loop and handle events */
+  do {
+    pn_event_batch_t *events = pn_proactor_wait(app->proactor);
+    pn_event_t *e;
+    for (e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
+      if (!handle(app, e)) {
+        return;
+      }
+    }
+    pn_proactor_done(app->proactor, events);
+  } while(true);
+}
+
+int main(int argc, char **argv) {
+  struct app_data_t app = {0};
+  char addr[PN_MAX_ADDR];
+  pn_raw_connection_t *c = pn_raw_connection();
+  connection_data_t *cd = make_sender_data();
+
+  app.host = (argc > 1) ? argv[1] : "";
+  app.port = (argc > 2) ? argv[2] : "amqp";
+
+  /* Create the proactor and connect */
+  app.proactor = pn_proactor();
+  pn_raw_connection_set_context(c, cd);
+  pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
+  pn_proactor_raw_connect(app.proactor, c, addr);
+  run(&app);
+  pn_proactor_free(app.proactor);
+  return exit_code;
+}
diff --git a/c/examples/raw_echo.c b/c/examples/raw_echo.c
new file mode 100644
index 0000000..53fd47b
--- /dev/null
+++ b/c/examples/raw_echo.c
@@ -0,0 +1,240 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/condition.h>
+#include <proton/raw_connection.h>
+#include <proton/listener.h>
+#include <proton/netaddr.h>
+#include <proton/proactor.h>
+
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+typedef struct app_data_t {
+  const char *host, *port;
+  const char *amqp_address;
+
+  pn_proactor_t *proactor;
+  pn_listener_t *listener;
+
+  int connects;
+  int disconnects;
+
+  /* Sender values */
+
+  /* Receiver values */
+} app_data_t;
+
+static int exit_code = 0;
+
+/* Close the connection and the listener so we will get a
+ * PN_PROACTOR_INACTIVE event and exit, once all outstanding events
+ * are processed.
+ */
+static void close_all(pn_raw_connection_t *c, app_data_t *app) {
+  if (c) pn_raw_connection_close(c);
+  if (app->listener) pn_listener_close(app->listener);
+}
+
+static bool check_condition(pn_event_t *e, pn_condition_t *cond, app_data_t *app) {
+  if (pn_condition_is_set(cond)) {
+    fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
+            pn_condition_get_name(cond), pn_condition_get_description(cond));
+    return true;
+  }
+
+  return false;
+}
+
+static void check_condition_fatal(pn_event_t *e, pn_condition_t *cond, app_data_t *app) {
+  if (check_condition(e, cond, app)) {
+    close_all(pn_event_raw_connection(e), app);
+    exit_code = 1;
+  }
+}
+
+static void recv_message(pn_raw_buffer_t buf) {
+  fwrite(buf.bytes, buf.size, 1, stdout);
+}
+
+void *make_receiver_data(void) {
+  return NULL;
+}
+
+#define READ_BUFFERS 4
+
+/* This function handles events when we are acting as the receiver */
+static void handle_receive(app_data_t *app, pn_event_t* event) {
+  switch (pn_event_type(event)) {
+
+    case PN_RAW_CONNECTION_CONNECTED: {
+      pn_raw_connection_t *c = pn_event_raw_connection(event);
+      pn_raw_buffer_t buffers[READ_BUFFERS] = {{0}};
+      int i = READ_BUFFERS;
+      printf("**raw connection connected\n");
+      app->connects++;
+      for (; i; --i) {
+        pn_raw_buffer_t *buff = &buffers[READ_BUFFERS-i];
+        buff->bytes = (char*) malloc(1024);
+        buff->capacity = 1024;
+        buff->size = 0;
+        buff->offset = 0;
+      }
+      pn_raw_connection_give_read_buffers(c, buffers, READ_BUFFERS);
+    } break;
+
+    case PN_RAW_CONNECTION_DISCONNECTED: {
+      pn_raw_connection_t *c = pn_event_raw_connection(event);
+      void *cd = pn_raw_connection_get_context(c);
+      free(cd);
+      printf("**raw connection disconnected\n");
+      app->disconnects++;
+      check_condition(event, pn_raw_connection_condition(c), app);
+    } break;
+
+    case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
+    } break;
+
+    /* This path handles both received bytes and freeing buffers at close */
+    case PN_RAW_CONNECTION_READ: {
+      pn_raw_connection_t *c = pn_event_raw_connection(event);
+      pn_raw_buffer_t buffs[READ_BUFFERS];
+      size_t n;
+      while ( (n = pn_raw_connection_take_read_buffers(c, buffs, READ_BUFFERS)) ) {
+        unsigned i;
+        for (i=0; i<n && buffs[i].bytes; ++i) {
+          recv_message(buffs[i]);
+        }
+
+        if (!pn_raw_connection_is_write_closed(c)) {
+          pn_raw_connection_write_buffers(c, buffs, n);
+        } else if (!pn_raw_connection_is_read_closed(c)) {
+          pn_raw_connection_give_read_buffers(c, buffs, n);
+        } else {
+          unsigned i;
+          for (i=0; i<n && buffs[i].bytes; ++i) {
+            free(buffs[i].bytes);
+          }
+        }
+      }
+    } break;
+    case PN_RAW_CONNECTION_CLOSED_WRITE:
+    case PN_RAW_CONNECTION_CLOSED_READ: {
+      pn_raw_connection_t *c = pn_event_raw_connection(event);
+      pn_raw_connection_close(c);
+    } break;
+    case PN_RAW_CONNECTION_WRITTEN: {
+      pn_raw_connection_t *c = pn_event_raw_connection(event);
+      pn_raw_buffer_t buffs[READ_BUFFERS];
+      size_t n;
+      while ( (n = pn_raw_connection_take_written_buffers(c, buffs, READ_BUFFERS)) ) {
+        if (!pn_raw_connection_is_read_closed(c)) {
+          pn_raw_connection_give_read_buffers(c, buffs, n);
+        } else {
+          unsigned i;
+          for (i=0; i<n && buffs[i].bytes; ++i) {
+            free(buffs[i].bytes);
+          }
+        }
+      };
+    } break;
+    default:
+      break;
+  }
+}
+
+#define WRITE_BUFFERS 4
+
+/* Handle all events, delegate to handle_receive for raw connection events.
+   Return true to continue, false to exit
+*/
+static bool handle(app_data_t* app, pn_event_t* event) {
+  switch (pn_event_type(event)) {
+
+    case PN_LISTENER_OPEN: {
+      char port[256];             /* Get the listening port */
+      pn_netaddr_host_port(pn_listener_addr(pn_event_listener(event)), NULL, 0, port, sizeof(port));
+      printf("**listening on %s\n", port);
+      fflush(stdout);
+      break;
+    }
+    case PN_LISTENER_ACCEPT: {
+      pn_raw_connection_t *c = pn_raw_connection();
+      void *cd = make_receiver_data();
+      pn_raw_connection_set_context(c, cd);
+      pn_listener_raw_accept(pn_event_listener(event), c);
+
+      if (app->connects>2) pn_listener_close(app->listener);
+    } break;
+
+    case PN_LISTENER_CLOSE: {
+      app->listener = NULL;        /* Listener is closed */
+      check_condition_fatal(event, pn_listener_condition(pn_event_listener(event)), app);
+    } break;
+
+    case PN_PROACTOR_TIMEOUT: {
+    }  break;
+
+    case PN_PROACTOR_INACTIVE: {
+      return false;
+    } break;
+
+    default: {
+      pn_raw_connection_t *c = pn_event_raw_connection(event);
+      if (c) {
+          handle_receive(app, event);
+      }
+    }
+  }
+  return exit_code == 0;
+}
+
+void run(app_data_t *app) {
+  /* Loop and handle events */
+  do {
+    pn_event_batch_t *events = pn_proactor_wait(app->proactor);
+    pn_event_t *e;
+    for (e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
+      if (!handle(app, e)) {
+        return;
+      }
+    }
+    pn_proactor_done(app->proactor, events);
+  } while(true);
+}
+
+int main(int argc, char **argv) {
+  struct app_data_t app = {0};
+  char addr[PN_MAX_ADDR];
+  app.host = (argc > 1) ? argv[1] : "";
+  app.port = (argc > 2) ? argv[2] : "amqp";
+
+  /* Create the proactor and connect */
+  app.proactor = pn_proactor();
+  app.listener = pn_listener();
+  pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
+  pn_proactor_listen(app.proactor, app.listener, addr, 16);
+  run(&app);
+  pn_proactor_free(app.proactor);
+  return exit_code;
+}
diff --git a/c/include/proton/event.h b/c/include/proton/event.h
index 8e5fba2..9562e78 100644
--- a/c/include/proton/event.h
+++ b/c/include/proton/event.h
@@ -347,7 +347,120 @@ typedef enum {
    * The listener is listening.
    * Events of this type point to the @ref pn_listener_t.
    */
-  PN_LISTENER_OPEN
+  PN_LISTENER_OPEN,
+
+  /**
+   * The raw connection connected.
+   * Now would be a good time to give the raw connection some buffers
+   * to read bytes from the underlying socket. If you don't do it
+   * now you will get @ref PN_RAW_CONNECTION_NEED_READ_BUFFERS (and
+   * @ref PN_RAW_CONNECTION_NEED_WRITE_BUFFERS) events when the socket is readable
+   * (or writable) but there are no buffers available.
+   *
+   * Events of this type point to a @ref pn_raw_connection_t
+   */
+  PN_RAW_CONNECTION_CONNECTED,
+
+  /**
+   * The remote end of the raw connection closed the connection so
+   * that we can no longer read.
+   *
+   * When both this and the @ref PN_RAW_CONNECTION_CLOSED_WRITE event have
+   * occurred, the @ref PN_RAW_CONNECTION_DISCONNECTED event will be
+   * generated.
+   *
+   * Events of this type point to a @ref pn_raw_connection_t
+   */
+  PN_RAW_CONNECTION_CLOSED_READ,
+
+  /**
+   * The remote end of the raw connection closed the connection so
+   * that we can no longer write.
+   *
+   * When both this and the @ref PN_RAW_CONNECTION_CLOSED_READ event have
+   * occurred, the @ref PN_RAW_CONNECTION_DISCONNECTED event will be
+   * generated.
+   *
+   * Events of this type point to a @ref pn_raw_connection_t
+   */
+  PN_RAW_CONNECTION_CLOSED_WRITE,
+
+  /**
+   * The raw connection is disconnected.
+   * No more bytes will be read or written on the connection. Every buffer
+   * in use will already have been returned to the application by a
+   * @ref PN_RAW_CONNECTION_READ or @ref PN_RAW_CONNECTION_WRITTEN event.
+   * This event will always be the last for this raw connection, and so
+   * the application can clean up the raw connection at this point.
+   *
+   * Events of this type point to a @ref pn_raw_connection_t
+   */
+  PN_RAW_CONNECTION_DISCONNECTED,
+
+  /**
+   * The raw connection might need more read buffers.
+   * The connection is readable, but it has no buffer to read the
+   * bytes into. If you supply some buffers now you may get a
+   * @ref PN_RAW_CONNECTION_READ event soon, but there is no guarantee.
+   *
+   * This event is edge triggered and you will only get it once until you give
+   * the raw connection some more read buffers.
+   *
+   * Events of this type point to a @ref pn_raw_connection_t
+   */
+  PN_RAW_CONNECTION_NEED_READ_BUFFERS,
+
+  /**
+   * The raw connection might need more write buffers.
+   * The connection is writable but has no buffers to write. If you give the
+   * raw connection buffers to write using @ref pn_raw_connection_write_buffers
+   * it can write them. It is not necessary to wait for this event
+   * before sending buffers to write, but it can be used to aid flow control.
+   *
+   * This event is edge triggered and you will only get it once until you give
+   * the raw connection something more to write.
+   *
+   * Events of this type point to a @ref pn_raw_connection_t
+   */
+  PN_RAW_CONNECTION_NEED_WRITE_BUFFERS,
+
+  /**
+   * The raw connection read bytes: The bytes that were read are
+   * in one of the read buffers given to the raw connection.
+   *
+   * This event will be sent if there are bytes that have been read
+   * in a buffer owned by the raw connection and no
+   * @ref PN_RAW_CONNECTION_READ event is still queued.
+   *
+   * When a connection closes all read buffers are returned to the
+   * application using @ref PN_RAW_CONNECTION_READ events with empty buffers.
+   *
+   * Events of this type point to a @ref pn_raw_connection_t
+   */
+  PN_RAW_CONNECTION_READ,
+
+  /**
+   * The raw connection has finished a write and the buffers that were
+   * used are no longer in use and can be recycled.
+   *
+   * This event will be sent if there are written buffers still
+   * owned by the raw connection and no @ref PN_RAW_CONNECTION_WRITTEN
+   * event is currently queued.
+   *
+   * When a connection closes all write buffers are returned using
+   * @ref PN_RAW_CONNECTION_WRITTEN events.
+   *
+   * Events of this type point to a @ref pn_raw_connection_t
+   */
+  PN_RAW_CONNECTION_WRITTEN,
+
+  /**
+   * The raw connection was woken by @ref pn_raw_connection_wake
+   *
+   * Events of this type point to a @ref pn_raw_connection_t
+   */
+  PN_RAW_CONNECTION_WAKE
+
 } pn_event_type_t;
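Taken together, these event types describe the full lifecycle of a raw connection. The sketch below shows one compact way to react to them; the 4096-byte buffer size and the choice to simply free returned buffers (discarding the read data) are illustrative simplifications, not a recommended policy.

#include <proton/raw_connection.h>
#include <stdlib.h>

/* Illustrative handler: give one read buffer when needed, free buffers as they
 * come back, and close once either direction shuts down. */
static void on_raw_event(pn_event_t *e) {
  pn_raw_connection_t *c = pn_event_raw_connection(e);
  switch (pn_event_type(e)) {
    case PN_RAW_CONNECTION_CONNECTED:
    case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
      pn_raw_buffer_t b = {0};
      b.bytes = (char*) malloc(4096);
      b.capacity = 4096;
      pn_raw_connection_give_read_buffers(c, &b, 1);
    } break;
    case PN_RAW_CONNECTION_READ:
    case PN_RAW_CONNECTION_WRITTEN: {
      pn_raw_buffer_t b;
      while (pn_raw_connection_take_read_buffers(c, &b, 1)) free(b.bytes);
      while (pn_raw_connection_take_written_buffers(c, &b, 1)) free(b.bytes);
    } break;
    case PN_RAW_CONNECTION_CLOSED_READ:
    case PN_RAW_CONNECTION_CLOSED_WRITE:
      pn_raw_connection_close(c);
      break;
    case PN_RAW_CONNECTION_DISCONNECTED:
      /* Last event for this connection: free any per-connection state here. */
      break;
    default:
      break;
  }
}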
 
 
diff --git a/c/include/proton/listener.h b/c/include/proton/listener.h
index e4bbf35..5d2cfe2 100644
--- a/c/include/proton/listener.h
+++ b/c/include/proton/listener.h
@@ -135,6 +135,25 @@ PNP_EXTERN pn_proactor_t *pn_listener_proactor(pn_listener_t *c);
 PNP_EXTERN pn_listener_t *pn_event_listener(pn_event_t *event);
 
 /**
+ * Accept an incoming connection request as a raw connection.
+ *
+ * Call after a @ref PN_LISTENER_ACCEPT event.
+ *
+ * Errors are returned as @ref PN_RAW_CONNECTION_DISCONNECTED by pn_proactor_wait().
+ *
+ * @param[in] listener the listener
+ * @param[in] raw_connection a raw connection created by the application with pn_raw_connection();
+ * this parameter cannot be NULL.
+ *
+ * The proactor that owns the @p listener *takes ownership* of @p raw_connection and will
+ * automatically call pn_raw_connection_free() after the final @ref
+ * PN_RAW_CONNECTION_DISCONNECTED event is handled, or when pn_proactor_free() is
+ * called.
+ *
+ */
+PNP_EXTERN void pn_listener_raw_accept(pn_listener_t *listener, pn_raw_connection_t *raw_connection);
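For illustration, a PN_LISTENER_ACCEPT handler using this call might look like the following sketch; on_listener_accept is a placeholder name and the NULL context stands in for whatever per-connection state the application attaches.

#include <proton/listener.h>
#include <proton/raw_connection.h>
#include <stddef.h>

static void on_listener_accept(pn_event_t *e) {
  pn_raw_connection_t *rc = pn_raw_connection();     /* may return NULL on allocation failure */
  if (!rc) return;
  pn_raw_connection_set_context(rc, NULL);           /* attach application state before accepting */
  pn_listener_raw_accept(pn_event_listener(e), rc);  /* the proactor now owns rc */
}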
+
+/**
  *@}
  */
 
diff --git a/c/include/proton/proactor.h b/c/include/proton/proactor.h
index b0303c9..93b9c89 100644
--- a/c/include/proton/proactor.h
+++ b/c/include/proton/proactor.h
@@ -341,6 +341,29 @@ PNP_EXTERN pn_millis_t pn_proactor_now(void);
 PNP_EXTERN int64_t pn_proactor_now_64(void);
 
 /**
+ * Connect @p addr and bind to @p raw_connection.
+ *
+ * Errors are returned as @ref PN_RAW_CONNECTION_DISCONNECTED events by pn_proactor_wait().
+ *
+ * @note Thread-safe
+ *
+ * @param[in] proactor the proactor
+ *
+ * @param[in] raw_connection a raw connection created by the application with pn_raw_connection();
+ * this parameter cannot be NULL.
+ *
+ * @p proactor *takes ownership* of @p raw_connection and will
+ * automatically call pn_raw_connection_free() after the final @ref
+ * PN_RAW_CONNECTION_DISCONNECTED event is handled, or when pn_proactor_free() is
+ * called.
+ *
+ * @param[in] addr the "host:port" network address, constructed by pn_proactor_addr()
+ * An empty host will connect to the local host via the default protocol (IPV6 or IPV4).
+ * An empty port will connect to the standard AMQP port (5672).
+ */
+PNP_EXTERN void pn_proactor_raw_connect(pn_proactor_t *proactor, pn_raw_connection_t *raw_connection, const char *addr);
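As an illustration, connecting a raw connection might be sketched as below; "localhost" and "8080" are placeholder host and port values.

#include <proton/proactor.h>
#include <proton/raw_connection.h>

static void connect_raw(pn_proactor_t *proactor) {
  char addr[PN_MAX_ADDR];
  pn_raw_connection_t *rc = pn_raw_connection();
  if (!rc) return;                                       /* allocation failed */
  pn_proactor_addr(addr, sizeof(addr), "localhost", "8080");
  pn_proactor_raw_connect(proactor, rc, addr);           /* the proactor now owns rc */
}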
+
+/**
  * @}
  */
 
@@ -391,6 +414,15 @@ PNP_EXTERN int64_t pn_proactor_now_64(void);
  * @ref PN_PROACTOR_TIMEOUT | @copybrief PN_PROACTOR_TIMEOUT
  * @ref PN_PROACTOR_INACTIVE | @copybrief PN_PROACTOR_INACTIVE
  * @ref PN_CONNECTION_WAKE | @copybrief PN_CONNECTION_WAKE
+ * @ref PN_RAW_CONNECTION_CONNECTED | @copybrief PN_RAW_CONNECTION_CONNECTED
+ * @ref PN_RAW_CONNECTION_CLOSED_READ | @copybrief PN_RAW_CONNECTION_CLOSED_READ
+ * @ref PN_RAW_CONNECTION_CLOSED_WRITE | @copybrief PN_RAW_CONNECTION_CLOSED_WRITE
+ * @ref PN_RAW_CONNECTION_DISCONNECTED | @copybrief PN_RAW_CONNECTION_DISCONNECTED
+ * @ref PN_RAW_CONNECTION_NEED_READ_BUFFERS | @copybrief PN_RAW_CONNECTION_NEED_READ_BUFFERS
+ * @ref PN_RAW_CONNECTION_NEED_WRITE_BUFFERS | @copybrief PN_RAW_CONNECTION_NEED_WRITE_BUFFERS
+ * @ref PN_RAW_CONNECTION_READ | @copybrief PN_RAW_CONNECTION_READ
+ * @ref PN_RAW_CONNECTION_WRITTEN | @copybrief PN_RAW_CONNECTION_WRITTEN
+ * @ref PN_RAW_CONNECTION_WAKE | @copybrief PN_RAW_CONNECTION_WAKE
  *
  * @}
  */
diff --git a/c/include/proton/raw_connection.h b/c/include/proton/raw_connection.h
new file mode 100644
index 0000000..a483665
--- /dev/null
+++ b/c/include/proton/raw_connection.h
@@ -0,0 +1,265 @@
+#ifndef PROTON_RAW_CONNECTION_H
+#define PROTON_RAW_CONNECTION_H 1
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/condition.h>
+#include <proton/event.h>
+#include <proton/import_export.h>
+#include <proton/types.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * @file
+ *
+ * @addtogroup raw_connection
+ * @{
+ */
+
+/**
+ * A descriptor used to represent a single raw buffer in memory.
+ *
+ * @note The intent of the offset is to allow the actual bytes being read/written to be at a variable
+ * location relative to the head of the buffer. This lets the application keep data or structures that are
+ * associated with the buffer, but that are not themselves read/written to the connection, at the head of the buffer.
+ *
+ * @note For read buffers: When read buffers are returned to the application size will be the number of bytes read.
+ * Read operations will not change the context, bytes or capacity members of the structure.
+ *
+ * @note For write buffers: When write buffers are returned to the application all of the struct members will be returned
+ * unaltered. Also write operations will not modify the bytes of the buffer passed in at all. In principle this means that
+ * the write buffer can be used for multiple writes at the same time as long as the actual buffer is unmodified by the
+ * application at any time the buffer is being used by any raw connection.
+ */
+typedef struct pn_raw_buffer_t {
+  uintptr_t context; /**< Used to associate arbitrary application data with this raw buffer */
+  char *bytes; /**< Pointer to the start of the raw buffer, if this is null then no buffer is represented */
+  uint32_t capacity; /**< Count of available bytes starting at @ref bytes */
+  uint32_t size; /**< Number of bytes read or to be written starting at @ref offset */
+  uint32_t offset; /**< First byte in the buffer to be read or written */
+} pn_raw_buffer_t;
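To illustrate the offset field, the sketch below keeps a small application header at the front of the allocation and points offset past it so that only the payload is written to the connection; HDR and CAP are arbitrary sizes chosen for the example.

#include <proton/raw_connection.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

enum { HDR = 16, CAP = 1024 };

/* Build a write buffer whose first HDR bytes hold application data that never
 * reaches the socket; the caller must ensure len <= CAP - HDR. */
static pn_raw_buffer_t make_payload_buffer(const char *payload, uint32_t len) {
  pn_raw_buffer_t b = {0};
  b.bytes = (char*) malloc(CAP);
  b.capacity = CAP;
  b.offset = HDR;                      /* first byte to be written */
  b.size = len;                        /* number of bytes to write, starting at offset */
  memcpy(b.bytes + HDR, payload, len);
  return b;
}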
+
+/**
+ * Create a new raw connection for use with the @ref proactor.
+ * See @ref pn_proactor_raw_connect and @ref pn_listener_raw_accept.
+ *
+ * @return A newly allocated pn_raw_connection_t or NULL if there wasn't sufficient memory.
+ *
+ * @note This is the only pn_raw_connection_t function that allocates memory. So an application that
+ * wants good control of out of memory conditions should check the return value for NULL.
+ *
+ * @note It is good practice to create a raw connection and attach application-specific
+ * context to it before giving it to the proactor.
+ *
+ * @note There is no way to free a pn_raw_connection_t: once passed to the proactor, the proactor
+ * owns it and controls its lifecycle.
+ */
+PNP_EXTERN pn_raw_connection_t *pn_raw_connection(void);
+
+/**
+ * Get the local address of a raw connection. Return `NULL` if not available.
+ * Pointer is invalid after the transport closes (@ref PN_RAW_CONNECTION_DISCONNECTED event is handled)
+ */
+PNP_EXTERN const struct pn_netaddr_t *pn_raw_connection_local_addr(pn_raw_connection_t *connection);
+
+/**
+ * Get the remote address of a raw connection. Return `NULL` if not available.
+ * Pointer is invalid after the transport closes (@ref PN_RAW_CONNECTION_DISCONNECTED event is handled)
+ */
+PNP_EXTERN const struct pn_netaddr_t *pn_raw_connection_remote_addr(pn_raw_connection_t *connection);
+
+/**
+ * Close a raw connection.
+ * This will close the underlying socket and release all buffers held by the raw connection.
+ * It will cause @ref PN_RAW_CONNECTION_READ and @ref PN_RAW_CONNECTION_WRITTEN to be emitted so
+ * the application can clean up buffers given to the raw connection. After that a
+ * @ref PN_RAW_CONNECTION_DISCONNECTED event will be emitted to allow the application to clean up
+ * any other state held by the raw connection.
+ *
+ */
+PNP_EXTERN void pn_raw_connection_close(pn_raw_connection_t *connection);
+
+/**
+ * Query the raw connection for how many more read buffers it can be given
+ */
+PNP_EXTERN size_t pn_raw_connection_read_buffers_capacity(pn_raw_connection_t *connection);
+
+/**
+ * Query the raw connection for how many more write buffers it can be given
+ */
+PNP_EXTERN size_t pn_raw_connection_write_buffers_capacity(pn_raw_connection_t *connection);
+
+/**
+ * Give the raw connection buffers to use for reading from the underlying socket.
+ * If the raw socket has no read buffers then the application will never receive
+ * a @ref PN_RAW_CONNECTION_READ event.
+ *
+ * A @ref PN_RAW_CONNECTION_NEED_READ_BUFFERS event will be generated immediately after
+ * the @ref PN_RAW_CONNECTION_CONNECTED event if there are no read buffers. It will also be
+ * generated whenever the raw connection runs out of read buffers. In both these cases the
+ * event will not be generated again until @ref pn_raw_connection_give_read_buffers is called.
+ *
+ * @return the number of buffers actually given to the raw connection. This will only be different
+ * from the number supplied if the connection has no more space to record more buffers. In this case
+ * the buffers taken will be the earlier buffers in the array supplied, the elements 0 to the
+ * returned value-1.
+ *
+ * @note The buffers given to the raw connection are owned by it until the application
+ * receives a @ref PN_RAW_CONNECTION_READ event giving them back to the application. They must
+ * not be accessed at all (written or even read) from calling @ref pn_raw_connection_give_read_buffers
+ * until receiving this event.
+ *
+ * @note The application should not assume that the @ref PN_RAW_CONNECTION_NEED_READ_BUFFERS
+ * event signifies that the connection is readable.
+ */
+PNP_EXTERN size_t pn_raw_connection_give_read_buffers(pn_raw_connection_t *connection, pn_raw_buffer_t const *buffers, size_t num);
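A sketch of topping up read buffers, for example from a PN_RAW_CONNECTION_CONNECTED or PN_RAW_CONNECTION_NEED_READ_BUFFERS handler; the 1024-byte size is arbitrary and giving buffers one at a time is only for brevity.

#include <proton/raw_connection.h>
#include <stdlib.h>

static void top_up_read_buffers(pn_raw_connection_t *c) {
  size_t n = pn_raw_connection_read_buffers_capacity(c);
  while (n--) {
    pn_raw_buffer_t b = {0};
    b.bytes = (char*) malloc(1024);
    b.capacity = 1024;
    pn_raw_connection_give_read_buffers(c, &b, 1);
  }
}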
+
+/**
+ * Fetch buffers with bytes read from the raw socket
+ *
+ * @param[out] buffers pointer to an array of @ref pn_raw_buffer_t structures which will be filled in with the read buffer information
+ * @param[in] num the number of buffers allocated in the passed in array of buffers
+ * @return the number of buffers being returned; if there are no read bytes then this will be 0. As many buffers will be returned
+ * as fit in the array passed in. So if the number returned is less than the number passed in there are no more buffers
+ * read, but if the number is the same there may be more read buffers to take.
+ *
+ * @note After the application receives @ref PN_RAW_CONNECTION_READ there should be bytes read from the socket and
+ * hence this call should return buffers. It is safe to carry on calling @ref pn_raw_connection_take_read_buffers
+ * until it returns 0.
+ */
+PNP_EXTERN size_t pn_raw_connection_take_read_buffers(pn_raw_connection_t *connection, pn_raw_buffer_t *buffers, size_t num);
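A sketch of draining read buffers after a PN_RAW_CONNECTION_READ event; here the payload is written to stdout and the buffers are freed rather than recycled, purely for illustration.

#include <proton/raw_connection.h>
#include <stdio.h>
#include <stdlib.h>

static void drain_read_buffers(pn_raw_connection_t *c) {
  pn_raw_buffer_t buffs[4];
  size_t n;
  while ((n = pn_raw_connection_take_read_buffers(c, buffs, 4)) > 0) {
    size_t i;
    for (i = 0; i < n; ++i) {
      /* size is the number of bytes read, starting at offset */
      fwrite(buffs[i].bytes + buffs[i].offset, 1, buffs[i].size, stdout);
      free(buffs[i].bytes);
    }
  }
}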
+
+/**
+ * Give the raw connection buffers to write to the underlying socket.
+ *
+ * A @ref PN_RAW_CONNECTION_WRITTEN event will be generated once the buffers have been written to the socket;
+ * until this point the buffers must not be accessed at all (written or even read).
+ *
+ * A @ref PN_RAW_CONNECTION_NEED_WRITE_BUFFERS event will be generated immediately after
+ * the @ref PN_RAW_CONNECTION_CONNECTED event if there are no write buffers. It will also be
+ * generated whenever the raw connection finishes writing all the write buffers. In both these cases the
+ * event will not be generated again until @ref pn_raw_connection_write_buffers is called.
+ *
+ * @return the number of buffers actually recorded by the raw connection to write. This will only be different
+ * from the number supplied if the connection has no more space to record more buffers. In this case
+ * the buffers recorded will be the earlier buffers in the array supplied, the elements 0 to the
+ * returned value-1.
+ *
+ */
+PNP_EXTERN size_t pn_raw_connection_write_buffers(pn_raw_connection_t *connection, pn_raw_buffer_t const *buffers, size_t num);
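A sketch of queuing a single write; copying the data into a heap buffer is an illustrative choice, and a zero return is handled by freeing the buffer that was not accepted.

#include <proton/raw_connection.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

static void queue_write(pn_raw_connection_t *c, const char *data, uint32_t len) {
  pn_raw_buffer_t b = {0};
  b.bytes = (char*) malloc(len);
  b.capacity = len;
  b.size = len;
  memcpy(b.bytes, data, len);
  if (pn_raw_connection_write_buffers(c, &b, 1) == 0) {
    free(b.bytes);                     /* not recorded: the connection had no space */
  }
}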
+
+/**
+ * Fetch buffers that have been completely written to the raw socket
+ *
+ * @param[out] buffers pointer to an array of @ref pn_raw_buffer_t structures which will be filled in with the written buffer information
+ * @param[in] num the number of buffers allocated in the passed in array of buffers
+ * @return the number of buffers being returned; if there are no written buffers to return then this will be 0. As many buffers will be returned
+ * as fit in the array passed in. So if the number returned is less than the number passed in there are no more buffers
+ * written, but if the number is the same there may be more written buffers to take.
+ *
+ * @note After the application receives @ref PN_RAW_CONNECTION_WRITTEN there should be bytes written to the socket and
+ * hence this call should return buffers. It is safe to carry on calling @ref pn_raw_connection_take_written_buffers
+ * until it returns 0.
+ */
+PNP_EXTERN size_t pn_raw_connection_take_written_buffers(pn_raw_connection_t *connection, pn_raw_buffer_t *buffers, size_t num);
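A sketch of reclaiming buffers after a PN_RAW_CONNECTION_WRITTEN event; freeing them is the simplest policy, though an application could instead recycle them as read buffers, as the examples in this patch do.

#include <proton/raw_connection.h>
#include <stdlib.h>

static void reclaim_written_buffers(pn_raw_connection_t *c) {
  pn_raw_buffer_t buffs[4];
  size_t n;
  while ((n = pn_raw_connection_take_written_buffers(c, buffs, 4)) > 0) {
    size_t i;
    for (i = 0; i < n; ++i) free(buffs[i].bytes);
  }
}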
+
+/**
+ * Is @p connection closed for read?
+ *
+ * @return true if the raw connection is closed for read.
+ */
+PNP_EXTERN bool pn_raw_connection_is_read_closed(pn_raw_connection_t *connection);
+
+/**
+ * Is @p connection closed for write?
+ *
+ * @return true if the raw connection is closed for write.
+ */
+PNP_EXTERN bool pn_raw_connection_is_write_closed(pn_raw_connection_t *connection);
+
+/**
+ * Return a @ref PN_RAW_CONNECTION_WAKE event for @p connection as soon as possible.
+ *
+ * At least one wake event will be returned, serialized with other @ref proactor_events
+ * for the same raw connection.  Wakes can be "coalesced" - if several
+ * @ref pn_raw_connection_wake() calls happen close together, there may be only one
+ * @ref PN_RAW_CONNECTION_WAKE event that occurs after all of them.
+ *
+ * @note Thread-safe
+ */
+PNP_EXTERN void pn_raw_connection_wake(pn_raw_connection_t *connection);
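A trivial sketch: any thread that holds a pointer to the raw connection can request a wake, and several requests made close together may surface as a single PN_RAW_CONNECTION_WAKE event.

#include <proton/raw_connection.h>

static void notify_raw_connection(pn_raw_connection_t *c) {
  pn_raw_connection_wake(c);           /* at least one wake event will follow */
}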
+
+/**
+ * Get additional information about a raw connection error.
+ * There is a raw connection error if the @ref PN_RAW_CONNECTION_DISCONNECTED event
+ * is received and the associated pn_condition_t is set (@see pn_condition_is_set).
+ *
+ * The value returned is only valid until the end of handler for the
+ * @ref PN_RAW_CONNECTION_DISCONNECTED event.
+ */
+PNP_EXTERN pn_condition_t *pn_raw_connection_condition(pn_raw_connection_t *connection);
+
+/**
+ * Get the application context associated with this raw connection.
+ *
+ * The application context for a raw connection may be set using
+ * ::pn_raw_connection_set_context.
+ *
+ * @param[in] connection the raw connection whose context is to be returned.
+ * @return the application context for the raw connection
+ */
+PNP_EXTERN void *pn_raw_connection_get_context(pn_raw_connection_t *connection);
+
+/**
+ * Set a new application context for a raw connection.
+ *
+ * The application context for a raw connection may be retrieved
+ * using ::pn_raw_connection_get_context.
+ *
+ * @param[in] connection the raw connection object
+ * @param[in] context the application context
+ */
+PNP_EXTERN void pn_raw_connection_set_context(pn_raw_connection_t *connection, void *context);
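A sketch of keeping per-connection application state via the context; conn_state_t is an illustrative application type, not part of Proton.

#include <proton/raw_connection.h>
#include <stdlib.h>

typedef struct conn_state_t { int bytes_seen; } conn_state_t;

static void attach_state(pn_raw_connection_t *c) {
  conn_state_t *s = (conn_state_t*) calloc(1, sizeof(conn_state_t));
  pn_raw_connection_set_context(c, s);  /* free this in the PN_RAW_CONNECTION_DISCONNECTED handler */
}

static void record_bytes(pn_raw_connection_t *c, int n) {
  conn_state_t *s = (conn_state_t*) pn_raw_connection_get_context(c);
  if (s) s->bytes_seen += n;
}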
+
+/**
+ * Get the attachments that are associated with a raw connection.
+ */
+PNP_EXTERN pn_record_t *pn_raw_connection_attachments(pn_raw_connection_t *connection);
+
+/**
+ * Return the raw connection associated with an event.
+ *
+ * @return NULL if the event is not associated with a raw connection.
+ */
+PNP_EXTERN pn_raw_connection_t *pn_event_raw_connection(pn_event_t *event);
+
+
+/**
+ * @}
+ */
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* raw_connection.h */
diff --git a/c/include/proton/types.h b/c/include/proton/types.h
index edcca98..f4f496e 100644
--- a/c/include/proton/types.h
+++ b/c/include/proton/types.h
@@ -115,6 +115,10 @@
  * @brief **Unsettled API** - A listener for incoming connections.
  * @ingroup io
  *
+ * @defgroup raw_connection Raw connection
+ * @brief **Unsettled API** - An API allowing raw sockets to be used with proactor
+ * @ingroup io
+ *
  * @defgroup connection_driver Connection driver
  * @brief **Unsettled API** - An API for low-level IO integration.
  * @ingroup io
@@ -438,6 +442,13 @@ typedef struct pn_transport_t pn_transport_t;
 typedef struct pn_proactor_t pn_proactor_t;
 
 /**
+ * A raw network connection used with the proactor.
+ *
+ * @ingroup raw_connection
+ */
+typedef struct pn_raw_connection_t pn_raw_connection_t;
+
+/**
  * A batch of events that must be handled in sequence.
  *
  * A pn_event_batch_t encapsulates potentially multiple events that relate
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 5170aa6..5c07e28 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -70,6 +70,7 @@
 #include <proton/transport.h>
 #include <proton/listener.h>
 #include <proton/netaddr.h>
+#include <proton/raw_connection.h>
 
 #include <assert.h>
 #include <stddef.h>
diff --git a/c/src/proactor/libuv.c b/c/src/proactor/libuv.c
index 1c87f94..dd33b41 100644
--- a/c/src/proactor/libuv.c
+++ b/c/src/proactor/libuv.c
@@ -34,6 +34,7 @@
 #include <proton/message.h>
 #include <proton/netaddr.h>
 #include <proton/proactor.h>
+#include <proton/raw_connection.h>
 #include <proton/transport.h>
 
 #include <uv.h>
diff --git a/c/src/proactor/win_iocp.cpp b/c/src/proactor/win_iocp.cpp
index 764a7c4..348564c 100644
--- a/c/src/proactor/win_iocp.cpp
+++ b/c/src/proactor/win_iocp.cpp
@@ -29,6 +29,7 @@
 #include <proton/transport.h>
 #include <proton/listener.h>
 #include <proton/proactor.h>
+#include <proton/raw_connection.h>
 
 #include <assert.h>
 #include <stddef.h>

