You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2020/08/12 15:34:11 UTC

[qpid-proton] 01/05: PROTON-2247: Raw connections API for the proactor - API defined in header files - Simple test application is derived from the direct.c example

This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit a2528bb4e1bb37a0d2d765ed0b261ee2e8a56471
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Thu Jan 23 15:46:15 2020 -0500

    PROTON-2247: Raw connections API for the proactor
    - API defined in header files
    - Simple test application is derived from the direct.c example
---
 c/CMakeLists.txt                  |   1 +
 c/docs/user.doxygen.in            |   1 +
 c/examples/CMakeLists.txt         |   2 +-
 c/examples/raw_connect.c          | 256 ++++++++++++++++++++++++++++++++++++
 c/examples/raw_echo.c             | 240 ++++++++++++++++++++++++++++++++++
 c/include/proton/event.h          | 115 ++++++++++++++++-
 c/include/proton/listener.h       |  19 +++
 c/include/proton/proactor.h       |  32 +++++
 c/include/proton/raw_connection.h | 265 ++++++++++++++++++++++++++++++++++++++
 c/include/proton/types.h          |  11 ++
 c/src/proactor/epoll.c            |   1 +
 c/src/proactor/libuv.c            |   1 +
 c/src/proactor/win_iocp.cpp       |   1 +
 13 files changed, 943 insertions(+), 2 deletions(-)

diff --git a/c/CMakeLists.txt b/c/CMakeLists.txt
index 17495a5..b686c42 100644
--- a/c/CMakeLists.txt
+++ b/c/CMakeLists.txt
@@ -302,6 +302,7 @@ set (qpid-proton-include
   include/proton/netaddr.h
   include/proton/object.h
   include/proton/proactor.h
+  include/proton/raw_connection.h
   include/proton/sasl.h
   include/proton/sasl-plugin.h
   include/proton/session.h
diff --git a/c/docs/user.doxygen.in b/c/docs/user.doxygen.in
index ae6894a..8034e6c 100644
--- a/c/docs/user.doxygen.in
+++ b/c/docs/user.doxygen.in
@@ -33,6 +33,7 @@ INLINE_SIMPLE_STRUCTS   = YES
 HIDE_UNDOC_CLASSES      = YES
 HIDE_COMPOUND_REFERENCE = YES
 HIDE_SCOPE_NAMES        = YES
+TYPEDEF_HIDES_STRUCT    = YES
 MAX_INITIALIZER_LINES   = 0
 ALPHABETICAL_INDEX      = NO
 SORT_MEMBER_DOCS        = NO
diff --git a/c/examples/CMakeLists.txt b/c/examples/CMakeLists.txt
index 9771ec5..ec29888 100644
--- a/c/examples/CMakeLists.txt
+++ b/c/examples/CMakeLists.txt
@@ -26,7 +26,7 @@ find_package(Threads REQUIRED)
 include_directories(${Proton_INCLUDE_DIRS})
 add_definitions(${Proton_DEFINITIONS})
 
-foreach (name broker send receive direct send-abort send-ssl)
+foreach (name broker send receive direct send-abort send-ssl raw_echo raw_connect)
   add_executable(c-${name} ${name}.c)
   target_link_libraries(c-${name} ${Proton_Proactor_LIBRARIES} ${Proton_Core_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
   set_target_properties(c-${name} PROPERTIES
diff --git a/c/examples/raw_connect.c b/c/examples/raw_connect.c
new file mode 100644
index 0000000..d38a3d7
--- /dev/null
+++ b/c/examples/raw_connect.c
@@ -0,0 +1,256 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/condition.h>
+#include <proton/raw_connection.h>
+#include <proton/listener.h>
+#include <proton/netaddr.h>
+#include <proton/proactor.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+typedef struct app_data_t {
+  const char *host, *port;
+  const char *amqp_address;
+
+  pn_proactor_t *proactor;
+  pn_listener_t *listener;
+
+  int connects;
+  int disconnects;
+
+  /* Sender values */
+
+  /* Receiver values */
+} app_data_t;
+
+typedef struct connection_data_t {
+  bool sender;
+} connection_data_t;
+
+static int exit_code = 0;
+
+/* Close the connection and the listener so we will get a
+ * PN_PROACTOR_INACTIVE event and exit, once all outstanding events
+ * are processed.
+ */
+static void close_all(pn_raw_connection_t *c, app_data_t *app) {
+  if (c) pn_raw_connection_close(c);
+  if (app->listener) pn_listener_close(app->listener);
+}
+
+static void check_condition(pn_event_t *e, pn_condition_t *cond, app_data_t *app) {
+  if (pn_condition_is_set(cond)) {
+    fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
+            pn_condition_get_name(cond), pn_condition_get_description(cond));
+    close_all(pn_event_raw_connection(e), app);
+    exit_code = 1;
+  }
+}
+
+static void send_message(pn_raw_connection_t *c, const char* msg) {
+  pn_raw_buffer_t buffer;
+  uint32_t len = strlen(msg);
+  char *buf = (char*) malloc(1024);
+  memcpy(buf, msg, len);
+  buffer.bytes = buf;
+  buffer.capacity = 1024;
+  buffer.offset = 0;
+  buffer.size = len;
+  pn_raw_connection_write_buffers(c, &buffer, 1);
+}
+
+static void recv_message(pn_raw_buffer_t buf) {
+  fwrite(buf.bytes, buf.size, 1, stdout);
+}
+
+connection_data_t *make_receiver_data(void) {
+  connection_data_t *cd = (connection_data_t*) malloc(sizeof(connection_data_t));
+  cd->sender = false;
+  return cd;
+}
+
+connection_data_t *make_sender_data(void) {
+  connection_data_t *cd = (connection_data_t*) malloc(sizeof(connection_data_t));
+  cd->sender = true;
+  return cd;
+}
+
+#define READ_BUFFERS 4
+
+/* This function handles events when we are acting as the receiver */
+static void handle_receive(app_data_t *app, pn_event_t* event) {
+  switch (pn_event_type(event)) {
+
+    case PN_RAW_CONNECTION_CONNECTED: {
+      pn_raw_connection_t *c = pn_event_raw_connection(event);
+      pn_raw_buffer_t buffers[READ_BUFFERS] = {{0}};
+      int i = READ_BUFFERS;
+      for (; i; --i) {
+        pn_raw_buffer_t *buff = &buffers[READ_BUFFERS-i];
+        buff->bytes = (char*) malloc(1024);
+        buff->capacity = 1024;
+        buff->size = 0;
+        buff->offset = 0;
+      }
+      pn_raw_connection_give_read_buffers(c, buffers, READ_BUFFERS);
+    } break;
+
+    case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
+    } break;
+
+    default:
+      break;
+  }
+}
+
+#define WRITE_BUFFERS 4
+
+/* This function handles events when we are acting as the sender */
+static void handle_send(app_data_t* app, pn_event_t* event) {
+  switch (pn_event_type(event)) {
+
+    case PN_RAW_CONNECTION_CONNECTED: {
+      printf("**raw connection connected\n");
+      app->connects++;
+    } break;
+
+    case PN_RAW_CONNECTION_DISCONNECTED: {
+      pn_raw_connection_t *c = pn_event_raw_connection(event);
+      connection_data_t *cd = (connection_data_t*) pn_raw_connection_get_context(c);
+      free(cd);
+      printf("**raw connection disconnected\n");
+      app->disconnects++;
+      check_condition(event, pn_raw_connection_condition(c), app);
+    } break;
+
+    case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: {
+      pn_raw_connection_t *c = pn_event_raw_connection(event);
+      char line[120];
+      if (fgets(line, sizeof(line), stdin)) {
+        send_message(c, line);
+      } else {
+        pn_raw_connection_close(c);
+      }
+    } break;
+
+    /* This path handles both received bytes and freeing buffers at close */
+    case PN_RAW_CONNECTION_READ: {
+      pn_raw_connection_t *c = pn_event_raw_connection(event);
+      pn_raw_buffer_t buffs[READ_BUFFERS];
+      size_t n;
+      while ( (n = pn_raw_connection_take_read_buffers(c, buffs, READ_BUFFERS)) ) {
+        unsigned i;
+        for (i=0; i<n && buffs[i].bytes; ++i) {
+          recv_message(buffs[i]);
+          free(buffs[i].bytes);
+        }
+      }
+    } break;
+
+    case PN_RAW_CONNECTION_CLOSED_READ: {
+      pn_raw_connection_t *c = pn_event_raw_connection(event);
+      pn_raw_connection_close(c);
+    } break;
+
+    case PN_RAW_CONNECTION_WRITTEN: {
+      pn_raw_connection_t *c = pn_event_raw_connection(event);
+      pn_raw_buffer_t buffs[READ_BUFFERS];
+      size_t n;
+      while ( (n = pn_raw_connection_take_written_buffers(c, buffs, READ_BUFFERS)) ) {
+        if (!pn_raw_connection_is_read_closed(c)) {
+          pn_raw_connection_give_read_buffers(c, buffs, n);
+        } else {
+          unsigned i;
+          for (i=0; i<n && buffs[i].bytes; ++i) {
+            free(buffs[i].bytes);
+          }
+        }
+      };
+    } break;
+
+    default:
+      break;
+  }
+}
+
+/* Handle all events, delegate to handle_send or handle_receive
+   Return true to continue, false to exit
+*/
+static bool handle(app_data_t* app, pn_event_t* event) {
+  switch (pn_event_type(event)) {
+
+    case PN_PROACTOR_TIMEOUT: {
+    }  break;
+
+    case PN_PROACTOR_INACTIVE: {
+      return false;
+    } break;
+
+    default: {
+      pn_raw_connection_t *c = pn_event_raw_connection(event);
+      if (c) {
+        connection_data_t *cd = (connection_data_t*) pn_raw_connection_get_context(c);
+        if (cd && cd->sender) {
+            handle_send(app, event);
+        } else {
+            handle_receive(app, event);
+        }
+      }
+    }
+  }
+  return exit_code == 0;
+}
+
+void run(app_data_t *app) {
+  /* Loop and handle events */
+  do {
+    pn_event_batch_t *events = pn_proactor_wait(app->proactor);
+    pn_event_t *e;
+    for (e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
+      if (!handle(app, e)) {
+        return;
+      }
+    }
+    pn_proactor_done(app->proactor, events);
+  } while(true);
+}
+
+int main(int argc, char **argv) {
+  struct app_data_t app = {0};
+  char addr[PN_MAX_ADDR];
+  pn_raw_connection_t *c = pn_raw_connection();
+  connection_data_t *cd = make_sender_data();
+
+  app.host = (argc > 1) ? argv[1] : "";
+  app.port = (argc > 2) ? argv[2] : "amqp";
+
+  /* Create the proactor and connect */
+  app.proactor = pn_proactor();
+  pn_raw_connection_set_context(c, cd);
+  pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
+  pn_proactor_raw_connect(app.proactor, c, addr);
+  run(&app);
+  pn_proactor_free(app.proactor);
+  return exit_code;
+}
diff --git a/c/examples/raw_echo.c b/c/examples/raw_echo.c
new file mode 100644
index 0000000..53fd47b
--- /dev/null
+++ b/c/examples/raw_echo.c
@@ -0,0 +1,240 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/condition.h>
+#include <proton/raw_connection.h>
+#include <proton/listener.h>
+#include <proton/netaddr.h>
+#include <proton/proactor.h>
+
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+typedef struct app_data_t {
+  const char *host, *port;
+  const char *amqp_address;
+
+  pn_proactor_t *proactor;
+  pn_listener_t *listener;
+
+  int connects;
+  int disconnects;
+
+  /* Sender values */
+
+  /* Receiver values */
+} app_data_t;
+
+static int exit_code = 0;
+
+/* Close the connection and the listener so we will get a
+ * PN_PROACTOR_INACTIVE event and exit, once all outstanding events
+ * are processed.
+ */
+static void close_all(pn_raw_connection_t *c, app_data_t *app) {
+  if (c) pn_raw_connection_close(c);
+  if (app->listener) pn_listener_close(app->listener);
+}
+
+static bool check_condition(pn_event_t *e, pn_condition_t *cond, app_data_t *app) {
+  if (pn_condition_is_set(cond)) {
+    fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
+            pn_condition_get_name(cond), pn_condition_get_description(cond));
+    return true;
+  }
+
+  return false;
+}
+
+static void check_condition_fatal(pn_event_t *e, pn_condition_t *cond, app_data_t *app) {
+  if (check_condition(e, cond, app)) {
+    close_all(pn_event_raw_connection(e), app);
+    exit_code = 1;
+  }
+}
+
+static void recv_message(pn_raw_buffer_t buf) {
+  fwrite(buf.bytes, buf.size, 1, stdout);
+}
+
+void *make_receiver_data(void) {
+  return NULL;
+}
+
+#define READ_BUFFERS 4
+
+/* This function handles events when we are acting as the receiver */
+static void handle_receive(app_data_t *app, pn_event_t* event) {
+  switch (pn_event_type(event)) {
+
+    case PN_RAW_CONNECTION_CONNECTED: {
+      pn_raw_connection_t *c = pn_event_raw_connection(event);
+      pn_raw_buffer_t buffers[READ_BUFFERS] = {{0}};
+      int i = READ_BUFFERS;
+      printf("**raw connection connected\n");
+      app->connects++;
+      for (; i; --i) {
+        pn_raw_buffer_t *buff = &buffers[READ_BUFFERS-i];
+        buff->bytes = (char*) malloc(1024);
+        buff->capacity = 1024;
+        buff->size = 0;
+        buff->offset = 0;
+      }
+      pn_raw_connection_give_read_buffers(c, buffers, READ_BUFFERS);
+    } break;
+
+    case PN_RAW_CONNECTION_DISCONNECTED: {
+      pn_raw_connection_t *c = pn_event_raw_connection(event);
+      void *cd = pn_raw_connection_get_context(c);
+      free(cd);
+      printf("**raw connection disconnected\n");
+      app->disconnects++;
+      check_condition(event, pn_raw_connection_condition(c), app);
+    } break;
+
+    case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
+    } break;
+
+    /* This path handles both received bytes and freeing buffers at close */
+    case PN_RAW_CONNECTION_READ: {
+      pn_raw_connection_t *c = pn_event_raw_connection(event);
+      pn_raw_buffer_t buffs[READ_BUFFERS];
+      size_t n;
+      while ( (n = pn_raw_connection_take_read_buffers(c, buffs, READ_BUFFERS)) ) {
+        unsigned i;
+        for (i=0; i<n && buffs[i].bytes; ++i) {
+          recv_message(buffs[i]);
+        }
+
+        if (!pn_raw_connection_is_write_closed(c)) {
+          pn_raw_connection_write_buffers(c, buffs, n);
+        } else if (!pn_raw_connection_is_read_closed(c)) {
+          pn_raw_connection_give_read_buffers(c, buffs, n);
+        } else {
+          unsigned i;
+          for (i=0; i<n && buffs[i].bytes; ++i) {
+            free(buffs[i].bytes);
+          }
+        }
+      }
+    } break;
+    case PN_RAW_CONNECTION_CLOSED_WRITE:
+    case PN_RAW_CONNECTION_CLOSED_READ: {
+      pn_raw_connection_t *c = pn_event_raw_connection(event);
+      pn_raw_connection_close(c);
+    } break;
+    case PN_RAW_CONNECTION_WRITTEN: {
+      pn_raw_connection_t *c = pn_event_raw_connection(event);
+      pn_raw_buffer_t buffs[READ_BUFFERS];
+      size_t n;
+      while ( (n = pn_raw_connection_take_written_buffers(c, buffs, READ_BUFFERS)) ) {
+        if (!pn_raw_connection_is_read_closed(c)) {
+          pn_raw_connection_give_read_buffers(c, buffs, n);
+        } else {
+          unsigned i;
+          for (i=0; i<n && buffs[i].bytes; ++i) {
+            free(buffs[i].bytes);
+          }
+        }
+      };
+    } break;
+    default:
+      break;
+  }
+}
+
+#define WRITE_BUFFERS 4
+
+/* Handle all events, delegating raw connection events to handle_receive
+   Return true to continue, false to exit
+*/
+static bool handle(app_data_t* app, pn_event_t* event) {
+  switch (pn_event_type(event)) {
+
+    case PN_LISTENER_OPEN: {
+      char port[256];             /* Get the listening port */
+      pn_netaddr_host_port(pn_listener_addr(pn_event_listener(event)), NULL, 0, port, sizeof(port));
+      printf("**listening on %s\n", port);
+      fflush(stdout);
+      break;
+    }
+    case PN_LISTENER_ACCEPT: {
+      pn_raw_connection_t *c = pn_raw_connection();
+      void *cd = make_receiver_data();
+      pn_raw_connection_set_context(c, cd);
+      pn_listener_raw_accept(pn_event_listener(event), c);
+
+      if (app->connects>2) pn_listener_close(app->listener);
+    } break;
+
+    case PN_LISTENER_CLOSE: {
+      app->listener = NULL;        /* Listener is closed */
+      check_condition_fatal(event, pn_listener_condition(pn_event_listener(event)), app);
+    } break;
+
+    case PN_PROACTOR_TIMEOUT: {
+    }  break;
+
+    case PN_PROACTOR_INACTIVE: {
+      return false;
+    } break;
+
+    default: {
+      pn_raw_connection_t *c = pn_event_raw_connection(event);
+      if (c) {
+          handle_receive(app, event);
+      }
+    }
+  }
+  return exit_code == 0;
+}
+
+void run(app_data_t *app) {
+  /* Loop and handle events */
+  do {
+    pn_event_batch_t *events = pn_proactor_wait(app->proactor);
+    pn_event_t *e;
+    for (e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
+      if (!handle(app, e)) {
+        return;
+      }
+    }
+    pn_proactor_done(app->proactor, events);
+  } while(true);
+}
+
+int main(int argc, char **argv) {
+  struct app_data_t app = {0};
+  char addr[PN_MAX_ADDR];
+  app.host = (argc > 1) ? argv[1] : "";
+  app.port = (argc > 2) ? argv[2] : "amqp";
+
+  /* Create the proactor and listen */
+  app.proactor = pn_proactor();
+  app.listener = pn_listener();
+  pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
+  pn_proactor_listen(app.proactor, app.listener, addr, 16);
+  run(&app);
+  pn_proactor_free(app.proactor);
+  return exit_code;
+}
diff --git a/c/include/proton/event.h b/c/include/proton/event.h
index 8e5fba2..9562e78 100644
--- a/c/include/proton/event.h
+++ b/c/include/proton/event.h
@@ -347,7 +347,120 @@ typedef enum {
    * The listener is listening.
    * Events of this type point to the @ref pn_listener_t.
    */
-  PN_LISTENER_OPEN
+  PN_LISTENER_OPEN,
+
+  /**
+   * The raw connection connected.
+   * Now would be a good time to give the raw connection some buffers
+   * to read bytes from the underlying socket. If you don't do it
+   * now you will get @ref PN_RAW_CONNECTION_NEED_READ_BUFFERS (and
+   * @ref PN_RAW_CONNECTION_NEED_WRITE_BUFFERS) events when the socket is readable
+   * (or writable) but there are no buffers available.
+   *
+   * Events of this type point to a @ref pn_raw_connection_t
+   */
+  PN_RAW_CONNECTION_CONNECTED,
+
+  /**
+   * The remote end of the raw connection closed the connection so
+   * that we can no longer read.
+   *
+   * When both this and the @ref PN_RAW_CONNECTION_CLOSED_WRITE event have
+   * occurred then the @ref PN_RAW_CONNECTION_DISCONNECTED event will be
+   * generated.
+   *
+   * Events of this type point to a @ref pn_raw_connection_t
+   */
+  PN_RAW_CONNECTION_CLOSED_READ,
+
+  /**
+   * The remote end of the raw connection closed the connection so
+   * that we can no longer write.
+   *
+   * When both this and the @ref PN_RAW_CONNECTION_CLOSED_READ event have
+   * occurred then the @ref PN_RAW_CONNECTION_DISCONNECTED event will be
+   * generated.
+   *
+   * Events of this type point to a @ref pn_raw_connection_t
+   */
+  PN_RAW_CONNECTION_CLOSED_WRITE,
+
+  /**
+   * The raw connection is disconnected.
+   * No more bytes will be read or written on the connection. Every buffer
+   * in use will already have been returned using either a
+   * @ref PN_RAW_CONNECTION_READ or a @ref PN_RAW_CONNECTION_WRITTEN event.
+   * This event will always be the last for this raw connection, and so
+   * the application can clean up the raw connection at this point.
+   *
+   * Events of this type point to a @ref pn_raw_connection_t
+   */
+  PN_RAW_CONNECTION_DISCONNECTED,
+
+  /**
+   * The raw connection might need more read buffers.
+   * The connection is readable, but the connection has no buffer to read the
+   * bytes into. If you supply some buffers now maybe you'll get a
+   * @ref PN_RAW_CONNECTION_READ event soon, but no guarantees.
+   *
+   * This event is edge triggered and you will only get it once until you give
+   * the raw connection some more read buffers.
+   *
+   * Events of this type point to a @ref pn_raw_connection_t
+   */
+  PN_RAW_CONNECTION_NEED_READ_BUFFERS,
+
+  /**
+   * The raw connection might need more write buffers.
+   * The connection is writable but has no buffers to write. If you give the
+   * raw connection something to write using @ref pn_raw_connection_write_buffers
+   * the raw connection can write them. It is not necessary to wait for this event
+   * before sending buffers to write, but it can be used to aid in flow control (maybe).
+   *
+   * This event is edge triggered and you will only get it once until you give
+   * the raw connection something more to write.
+   *
+   * Events of this type point to a @ref pn_raw_connection_t
+   */
+  PN_RAW_CONNECTION_NEED_WRITE_BUFFERS,
+
+  /**
+   * The raw connection read bytes: The bytes that were read are
+   * in one of the read buffers given to the raw connection.
+   *
+   * This event will be sent if there are bytes that have been read
+   * in a buffer owned by the raw connection and there is no
+   * @ref PN_RAW_CONNECTION_READ event still queued.
+   *
+   * When a connection closes all read buffers are returned to the
+   * application using @ref PN_RAW_CONNECTION_READ events with empty buffers.
+   *
+   * Events of this type point to a @ref pn_raw_connection_t
+   */
+  PN_RAW_CONNECTION_READ,
+
+  /**
+   * The raw connection has finished a write and the buffers that were
+   * used are no longer in use and can be recycled.
+   *
+   * This event will be sent if there are buffers that have been written still
+   * owned by the raw connection and there is no @ref PN_RAW_CONNECTION_WRITTEN
+   * event currently queued.
+   *
+   * When a connection closes all write buffers are returned using
+   * @ref PN_RAW_CONNECTION_WRITTEN events.
+   *
+   * Events of this type point to a @ref pn_raw_connection_t
+   */
+  PN_RAW_CONNECTION_WRITTEN,
+
+  /**
+   * The raw connection was woken by @ref pn_raw_connection_wake
+   *
+   * Events of this type point to a @ref pn_raw_connection_t
+   */
+  PN_RAW_CONNECTION_WAKE
+
 } pn_event_type_t;
 
 
diff --git a/c/include/proton/listener.h b/c/include/proton/listener.h
index e4bbf35..5d2cfe2 100644
--- a/c/include/proton/listener.h
+++ b/c/include/proton/listener.h
@@ -135,6 +135,25 @@ PNP_EXTERN pn_proactor_t *pn_listener_proactor(pn_listener_t *c);
 PNP_EXTERN pn_listener_t *pn_event_listener(pn_event_t *event);
 
 /**
+ * Accept an incoming connection request as a raw connection.
+ *
+ * Call after a @ref PN_LISTENER_ACCEPT event.
+ *
+ * Errors are returned as @ref PN_RAW_CONNECTION_DISCONNECTED events by pn_proactor_wait().
+ *
+ * @param[in] listener the listener
+ * @param[in] raw_connection the application must create a raw connection with pn_raw_connection();
+ * this parameter cannot be null.
+ *
+ * The proactor that owns the @p listener *takes ownership* of @p raw_connection and will
+ * automatically call pn_raw_connection_free() after the final @ref
+ * PN_RAW_CONNECTION_DISCONNECTED event is handled, or when pn_proactor_free() is
+ * called.
+ *
+ */
+PNP_EXTERN void pn_listener_raw_accept(pn_listener_t *listener, pn_raw_connection_t *raw_connection);
+
+/**
  *@}
  */
 
diff --git a/c/include/proton/proactor.h b/c/include/proton/proactor.h
index b0303c9..93b9c89 100644
--- a/c/include/proton/proactor.h
+++ b/c/include/proton/proactor.h
@@ -341,6 +341,29 @@ PNP_EXTERN pn_millis_t pn_proactor_now(void);
 PNP_EXTERN int64_t pn_proactor_now_64(void);
 
 /**
+ * Connect @p addr and bind to @p raw_connection.
+ *
+ * Errors are returned as @ref PN_RAW_CONNECTION_DISCONNECTED events by pn_proactor_wait().
+ *
+ * @note Thread-safe
+ *
+ * @param[in] proactor the proactor
+ *
+ * @param[in] raw_connection the application must create a raw connection with pn_raw_connection()
+ * this parameter cannot be null.
+ *
+ * @p proactor *takes ownership* of @p raw_connection and will
+ * automatically call pn_raw_connection_free() after the final @ref
+ * PN_RAW_CONNECTION_DISCONNECTED event is handled, or when pn_proactor_free() is
+ * called.
+ *
+ * @param[in] addr the "host:port" network address, constructed by pn_proactor_addr()
+ * An empty host will connect to the local host via the default protocol (IPV6 or IPV4).
+ * An empty port will connect to the standard AMQP port (5672).
+ */
+PNP_EXTERN void pn_proactor_raw_connect(pn_proactor_t *proactor, pn_raw_connection_t *raw_connection, const char *addr);
+
+/**
  * @}
  */
 
@@ -391,6 +414,15 @@ PNP_EXTERN int64_t pn_proactor_now_64(void);
  * @ref PN_PROACTOR_TIMEOUT | @copybrief PN_PROACTOR_TIMEOUT
  * @ref PN_PROACTOR_INACTIVE | @copybrief PN_PROACTOR_INACTIVE
  * @ref PN_CONNECTION_WAKE | @copybrief PN_CONNECTION_WAKE
+ * @ref PN_RAW_CONNECTION_CONNECTED | @copybrief PN_RAW_CONNECTION_CONNECTED
+ * @ref PN_RAW_CONNECTION_CLOSED_READ | @copybrief PN_RAW_CONNECTION_CLOSED_READ
+ * @ref PN_RAW_CONNECTION_CLOSED_WRITE | @copybrief PN_RAW_CONNECTION_CLOSED_WRITE
+ * @ref PN_RAW_CONNECTION_DISCONNECTED | @copybrief PN_RAW_CONNECTION_DISCONNECTED
+ * @ref PN_RAW_CONNECTION_NEED_READ_BUFFERS | @copybrief PN_RAW_CONNECTION_NEED_READ_BUFFERS
+ * @ref PN_RAW_CONNECTION_NEED_WRITE_BUFFERS | @copybrief PN_RAW_CONNECTION_NEED_WRITE_BUFFERS
+ * @ref PN_RAW_CONNECTION_READ | @copybrief PN_RAW_CONNECTION_READ
+ * @ref PN_RAW_CONNECTION_WRITTEN | @copybrief PN_RAW_CONNECTION_WRITTEN
+ * @ref PN_RAW_CONNECTION_WAKE | @copybrief PN_RAW_CONNECTION_WAKE
  *
  * @}
  */
diff --git a/c/include/proton/raw_connection.h b/c/include/proton/raw_connection.h
new file mode 100644
index 0000000..a483665
--- /dev/null
+++ b/c/include/proton/raw_connection.h
@@ -0,0 +1,265 @@
+#ifndef PROTON_RAW_CONNECTION_H
+#define PROTON_RAW_CONNECTION_H 1
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/condition.h>
+#include <proton/event.h>
+#include <proton/import_export.h>
+#include <proton/types.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * @file
+ *
+ * @addtogroup raw_connection
+ * @{
+ */
+
+/**
+ * A descriptor used to represent a single raw buffer in memory.
+ *
+ * @note The intent of the offset is to allow the actual bytes being read/written to be at a variable
+ * location relative to the head of the buffer. This leaves room for other data or structures that the application
+ * associates with the data but that are not themselves read from or written to the connection.
+ *
+ * @note For read buffers: When read buffers are returned to the application, size will be the number of bytes read.
+ * Read operations will not change the context, bytes or capacity members of the structure.
+ *
+ * @note For write buffers: When write buffers are returned to the application all of the struct members will be returned
+ * unaltered. Also write operations will not modify the bytes of the buffer passed in at all. In principle this means that
+ * the write buffer can be used for multiple writes at the same time as long as the actual buffer is unmodified by the
+ * application at any time the buffer is being used by any raw connection.
+ */
+typedef struct pn_raw_buffer_t {
+  uintptr_t context; /**< Used to associate arbitrary application data with this raw buffer */
+  char *bytes; /**< Pointer to the start of the raw buffer; if this is null then no buffer is represented */
+  uint32_t capacity; /**< Count of available bytes starting at @ref bytes */
+  uint32_t size; /**< Number of bytes read, or to be written, starting at @ref offset */
+  uint32_t offset; /**< Position, relative to the head of the buffer, of the first byte to be read or written */
+} pn_raw_buffer_t;
+
+/**
+ * Create a new raw connection for use with the @ref proactor.
+ * See @ref pn_proactor_raw_connect and @ref pn_listener_raw_accept.
+ *
+ * @return A newly allocated pn_raw_connection_t or NULL if there wasn't sufficient memory.
+ *
+ * @note This is the only pn_raw_connection_t function that allocates memory. So an application that
+ * wants good control of out of memory conditions should check the return value for NULL.
+ *
+ * @note It is good practice to create a raw connection and attach application
+ * specific context to it before giving it to the proactor.
+ *
+ * @note There is no way to free a pn_raw_connection_t as, once passed to the proactor, the proactor owns
+ * it and controls its lifecycle.
+ */
+PNP_EXTERN pn_raw_connection_t *pn_raw_connection(void);
+
+/**
+ * Get the local address of a raw connection. Return `NULL` if not available.
+ * Pointer is invalid after the transport closes (@ref PN_RAW_CONNECTION_DISCONNECTED event is handled)
+ */
+PNP_EXTERN const struct pn_netaddr_t *pn_raw_connection_local_addr(pn_raw_connection_t *connection);
+
+/**
+ * Get the remote address of a raw connection. Return `NULL` if not available.
+ * Pointer is invalid after the transport closes (@ref PN_RAW_CONNECTION_DISCONNECTED event is handled)
+ */
+PNP_EXTERN const struct pn_netaddr_t *pn_raw_connection_remote_addr(pn_raw_connection_t *connection);
+
+/**
+ * Close a raw connection.
+ * This will close the underlying socket and release all buffers held by the raw connection.
+ * It will cause @ref PN_RAW_CONNECTION_READ and @ref PN_RAW_CONNECTION_WRITTEN to be emitted so
+ * the application can clean up buffers given to the raw connection. After that a
+ * @ref PN_RAW_CONNECTION_DISCONNECTED event will be emitted to allow the application to clean up
+ * any other state held by the raw connection.
+ *
+ */
+PNP_EXTERN void pn_raw_connection_close(pn_raw_connection_t *connection);
+
+/**
+ * Query the raw connection for how many more read buffers it can be given
+ */
+PNP_EXTERN size_t pn_raw_connection_read_buffers_capacity(pn_raw_connection_t *connection);
+
+/**
+ * Query the raw connection for how many more write buffers it can be given
+ */
+PNP_EXTERN size_t pn_raw_connection_write_buffers_capacity(pn_raw_connection_t *connection);
+
+/**
+ * Give the raw connection buffers to use for reading from the underlying socket.
+ * If the raw socket has no read buffers then the application will never receive
+ * a @ref PN_RAW_CONNECTION_READ event.
+ *
+ * A @ref PN_RAW_CONNECTION_NEED_READ_BUFFERS event will be generated immediately after
+ * the @ref PN_RAW_CONNECTION_CONNECTED event if there are no read buffers. It will also be
+ * generated whenever the raw connection runs out of read buffers. In both these cases the
+ * event will not be generated again until @ref pn_raw_connection_give_read_buffers is called.
+ *
+ * @return the number of buffers actually given to the raw connection. This will only be different
+ * from the number supplied if the connection has no more space to record more buffers. In this case
+ * the buffers taken will be the earlier buffers in the array supplied, the elements 0 to the
+ * returned value-1.
+ *
+ * @note The buffers given to the raw connection are owned by it until the application
+ * receives a @ref PN_RAW_CONNECTION_READ event giving them back to the application. They must
+ * not be accessed at all (written or even read) from calling @ref pn_raw_connection_give_read_buffers
+ * until receiving this event.
+ *
+ * @note The application should not assume that the @ref PN_RAW_CONNECTION_NEED_READ_BUFFERS
+ * event signifies that the connection is readable.
+ */
+PNP_EXTERN size_t pn_raw_connection_give_read_buffers(pn_raw_connection_t *connection, pn_raw_buffer_t const *buffers, size_t num);
+
+/**
+ * Fetch buffers with bytes read from the raw socket
+ *
+ * @param[out] buffers pointer to an array of @ref pn_raw_buffer_t structures which will be filled in with the read buffer information
+ * @param[in] num the number of buffers allocated in the passed in array of buffers
+ * @return the number of buffers being returned; if there are no read bytes then this will be 0. As many buffers will be returned
+ * as can be, given the number that are passed in. So if the number returned is less than the number passed in there are no more buffers
+ * read. But if the number is the same there may be more read buffers to take.
+ *
+ * @note After the application receives @ref PN_RAW_CONNECTION_READ there should be bytes read from the socket and
+ * hence this call should return buffers. It is safe to carry on calling @ref pn_raw_connection_take_read_buffers
+ * until it returns 0.
+ */
+PNP_EXTERN size_t pn_raw_connection_take_read_buffers(pn_raw_connection_t *connection, pn_raw_buffer_t *buffers, size_t num);
+
+/**
+ * Give the raw connection buffers to write to the underlying socket.
+ *
+ * A @ref PN_RAW_CONNECTION_WRITTEN event will be generated once the buffers have been written to the socket;
+ * until this point the buffers must not be accessed at all (written or even read).
+ *
+ * A @ref PN_RAW_CONNECTION_NEED_WRITE_BUFFERS event will be generated immediately after
+ * the @ref PN_RAW_CONNECTION_CONNECTED event if there are no write buffers. It will also be
+ * generated whenever the raw connection finishes writing all the write buffers. In both these cases the
+ * event will not be generated again until @ref pn_raw_connection_write_buffers is called.
+ *
+ * @return the number of buffers actually recorded by the raw connection to write. This will only be different
+ * from the number supplied if the connection has no more space to record more buffers. In this case
+ * the buffers recorded will be the earlier buffers in the array supplied, the elements 0 to the
+ * returned value-1.
+ *
+ */
+PNP_EXTERN size_t pn_raw_connection_write_buffers(pn_raw_connection_t *connection, pn_raw_buffer_t const *buffers, size_t num);
+
+/**
+ * Return a buffer chain with buffers that have all been written to the raw socket
+ *
+ * @param[out] buffers pointer to an array of @ref pn_raw_buffer_t structures which will be filled in with the written buffer information
+ * @param[in] num the number of buffers allocated in the passed in array of buffers
+ * @return the number of buffers being returned; if there are no written buffers to return then this will be 0. As many buffers will be returned
+ * as can be, given the number that are passed in. So if the number returned is less than the number passed in there are no more buffers
+ * written. But if the number is the same there may be more written buffers to take.
+ *
+ * @note After the application receives @ref PN_RAW_CONNECTION_WRITTEN there should be bytes written to the socket and
+ * hence this call should return buffers. It is safe to carry on calling @ref pn_raw_connection_take_written_buffers
+ * until it returns 0.
+ */
+PNP_EXTERN size_t pn_raw_connection_take_written_buffers(pn_raw_connection_t *connection, pn_raw_buffer_t *buffers, size_t num);
+
+/**
+ * Is @p connection closed for read?
+ *
+ * @return true if the raw connection is closed for read.
+ */
+PNP_EXTERN bool pn_raw_connection_is_read_closed(pn_raw_connection_t *connection);
+
+/**
+ * Is @p connection closed for write?
+ *
+ * @return true if the raw connection is closed for write.
+ */
+PNP_EXTERN bool pn_raw_connection_is_write_closed(pn_raw_connection_t *connection);
+
+/**
+ * Return a @ref PN_RAW_CONNECTION_WAKE event for @p connection as soon as possible.
+ *
+ * At least one wake event will be returned, serialized with other @ref proactor_events
+ * for the same raw connection.  Wakes can be "coalesced" - if several
+ * @ref pn_raw_connection_wake() calls happen close together, there may be only one
+ * @ref PN_RAW_CONNECTION_WAKE event that occurs after all of them.
+ *
+ * @note Thread-safe
+ */
+PNP_EXTERN void pn_raw_connection_wake(pn_raw_connection_t *connection);
+
+/**
+ * Get additional information about a raw connection error.
+ * There is a raw connection error if the @ref PN_RAW_CONNECTION_DISCONNECTED event
+ * is received and the associated pn_condition_t is non-null (see pn_condition_is_set()).
+ *
+ * The value returned is only valid until the end of the handler for the
+ * @ref PN_RAW_CONNECTION_DISCONNECTED event.
+ */
+PNP_EXTERN pn_condition_t *pn_raw_connection_condition(pn_raw_connection_t *connection);
+
+/**
+ * Get the application context associated with this raw connection.
+ *
+ * The application context for a raw connection may be set using
+ * ::pn_raw_connection_set_context.
+ *
+ * @param[in] connection the raw connection whose context is to be returned.
+ * @return the application context for the raw connection
+ */
+PNP_EXTERN void *pn_raw_connection_get_context(pn_raw_connection_t *connection);
+
+/**
+ * Set a new application context for a raw connection.
+ *
+ * The application context for a raw connection may be retrieved
+ * using ::pn_raw_connection_get_context.
+ *
+ * @param[in] connection the raw connection object
+ * @param[in] context the application context
+ */
+PNP_EXTERN void pn_raw_connection_set_context(pn_raw_connection_t *connection, void *context);
+
+/**
+ * Get the attachments that are associated with a raw connection.
+ */
+PNP_EXTERN pn_record_t *pn_raw_connection_attachments(pn_raw_connection_t *connection);
+
+/**
+ * Return the raw connection associated with an event.
+ *
+ * @return NULL if the event is not associated with a raw connection.
+ */
+PNP_EXTERN pn_raw_connection_t *pn_event_raw_connection(pn_event_t *event);
+
+
+/**
+ * @}
+ */
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* raw_connection.h */
diff --git a/c/include/proton/types.h b/c/include/proton/types.h
index edcca98..f4f496e 100644
--- a/c/include/proton/types.h
+++ b/c/include/proton/types.h
@@ -115,6 +115,10 @@
  * @brief **Unsettled API** - A listener for incoming connections.
  * @ingroup io
  *
+ * @defgroup raw_connection Raw connection
+ * @brief **Unsettled API** - An API allowing raw sockets to be used with the proactor
+ * @ingroup io
+ *
  * @defgroup connection_driver Connection driver
  * @brief **Unsettled API** - An API for low-level IO integration.
  * @ingroup io
@@ -438,6 +442,13 @@ typedef struct pn_transport_t pn_transport_t;
 typedef struct pn_proactor_t pn_proactor_t;
 
 /**
+ * A raw network connection used with the proactor.
+ *
+ * @ingroup raw_connection
+ */
+typedef struct pn_raw_connection_t pn_raw_connection_t;
+
+/**
  * A batch of events that must be handled in sequence.
  *
  * A pn_event_batch_t encapsulates potentially multiple events that relate
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 5170aa6..5c07e28 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -70,6 +70,7 @@
 #include <proton/transport.h>
 #include <proton/listener.h>
 #include <proton/netaddr.h>
+#include <proton/raw_connection.h>
 
 #include <assert.h>
 #include <stddef.h>
diff --git a/c/src/proactor/libuv.c b/c/src/proactor/libuv.c
index 1c87f94..dd33b41 100644
--- a/c/src/proactor/libuv.c
+++ b/c/src/proactor/libuv.c
@@ -34,6 +34,7 @@
 #include <proton/message.h>
 #include <proton/netaddr.h>
 #include <proton/proactor.h>
+#include <proton/raw_connection.h>
 #include <proton/transport.h>
 
 #include <uv.h>
diff --git a/c/src/proactor/win_iocp.cpp b/c/src/proactor/win_iocp.cpp
index 764a7c4..348564c 100644
--- a/c/src/proactor/win_iocp.cpp
+++ b/c/src/proactor/win_iocp.cpp
@@ -29,6 +29,7 @@
 #include <proton/transport.h>
 #include <proton/listener.h>
 #include <proton/proactor.h>
+#include <proton/raw_connection.h>
 
 #include <assert.h>
 #include <stddef.h>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org