You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2020/08/12 15:34:11 UTC
[qpid-proton] 01/05: PROTON-2247: Raw connections API for the
proactor - API defined in header files - Simple test application is derived
from the direct.c example
This is an automated email from the ASF dual-hosted git repository.
astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit a2528bb4e1bb37a0d2d765ed0b261ee2e8a56471
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Thu Jan 23 15:46:15 2020 -0500
PROTON-2247: Raw connections API for the proactor
- API defined in header files
- Simple test application is derived from the direct.c example
---
c/CMakeLists.txt | 1 +
c/docs/user.doxygen.in | 1 +
c/examples/CMakeLists.txt | 2 +-
c/examples/raw_connect.c | 256 ++++++++++++++++++++++++++++++++++++
c/examples/raw_echo.c | 240 ++++++++++++++++++++++++++++++++++
c/include/proton/event.h | 115 ++++++++++++++++-
c/include/proton/listener.h | 19 +++
c/include/proton/proactor.h | 32 +++++
c/include/proton/raw_connection.h | 265 ++++++++++++++++++++++++++++++++++++++
c/include/proton/types.h | 11 ++
c/src/proactor/epoll.c | 1 +
c/src/proactor/libuv.c | 1 +
c/src/proactor/win_iocp.cpp | 1 +
13 files changed, 943 insertions(+), 2 deletions(-)
diff --git a/c/CMakeLists.txt b/c/CMakeLists.txt
index 17495a5..b686c42 100644
--- a/c/CMakeLists.txt
+++ b/c/CMakeLists.txt
@@ -302,6 +302,7 @@ set (qpid-proton-include
include/proton/netaddr.h
include/proton/object.h
include/proton/proactor.h
+ include/proton/raw_connection.h
include/proton/sasl.h
include/proton/sasl-plugin.h
include/proton/session.h
diff --git a/c/docs/user.doxygen.in b/c/docs/user.doxygen.in
index ae6894a..8034e6c 100644
--- a/c/docs/user.doxygen.in
+++ b/c/docs/user.doxygen.in
@@ -33,6 +33,7 @@ INLINE_SIMPLE_STRUCTS = YES
HIDE_UNDOC_CLASSES = YES
HIDE_COMPOUND_REFERENCE = YES
HIDE_SCOPE_NAMES = YES
+TYPEDEF_HIDES_STRUCT = YES
MAX_INITIALIZER_LINES = 0
ALPHABETICAL_INDEX = NO
SORT_MEMBER_DOCS = NO
diff --git a/c/examples/CMakeLists.txt b/c/examples/CMakeLists.txt
index 9771ec5..ec29888 100644
--- a/c/examples/CMakeLists.txt
+++ b/c/examples/CMakeLists.txt
@@ -26,7 +26,7 @@ find_package(Threads REQUIRED)
include_directories(${Proton_INCLUDE_DIRS})
add_definitions(${Proton_DEFINITIONS})
-foreach (name broker send receive direct send-abort send-ssl)
+foreach (name broker send receive direct send-abort send-ssl raw_echo raw_connect)
add_executable(c-${name} ${name}.c)
target_link_libraries(c-${name} ${Proton_Proactor_LIBRARIES} ${Proton_Core_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
set_target_properties(c-${name} PROPERTIES
diff --git a/c/examples/raw_connect.c b/c/examples/raw_connect.c
new file mode 100644
index 0000000..d38a3d7
--- /dev/null
+++ b/c/examples/raw_connect.c
@@ -0,0 +1,256 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/condition.h>
+#include <proton/raw_connection.h>
+#include <proton/listener.h>
+#include <proton/netaddr.h>
+#include <proton/proactor.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+typedef struct app_data_t {
+ const char *host, *port;
+ const char *amqp_address;
+
+ pn_proactor_t *proactor;
+ pn_listener_t *listener;
+
+ int connects;
+ int disconnects;
+
+ /* Sender values */
+
+ /* Receiver values */
+} app_data_t;
+
+typedef struct connection_data_t {
+ bool sender;
+} connection_data_t;
+
+static int exit_code = 0;
+
+/* Close the connection and the listener so we will get a
+ * PN_PROACTOR_INACTIVE event and exit, once all outstanding events
+ * are processed.
+ */
+static void close_all(pn_raw_connection_t *c, app_data_t *app) {
+ if (c) pn_raw_connection_close(c);
+ if (app->listener) pn_listener_close(app->listener);
+}
+
+static void check_condition(pn_event_t *e, pn_condition_t *cond, app_data_t *app) {
+ if (pn_condition_is_set(cond)) {
+ fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
+ pn_condition_get_name(cond), pn_condition_get_description(cond));
+ close_all(pn_event_raw_connection(e), app);
+ exit_code = 1;
+ }
+}
+
+static void send_message(pn_raw_connection_t *c, const char* msg) {
+ pn_raw_buffer_t buffer;
+ uint32_t len = strlen(msg);
+ char *buf = (char*) malloc(1024);
+ memcpy(buf, msg, len);
+ buffer.bytes = buf;
+ buffer.capacity = 1024;
+ buffer.offset = 0;
+ buffer.size = len;
+ pn_raw_connection_write_buffers(c, &buffer, 1);
+}
+
+static void recv_message(pn_raw_buffer_t buf) {
+ fwrite(buf.bytes, buf.size, 1, stdout);
+}
+
+connection_data_t *make_receiver_data(void) {
+ connection_data_t *cd = (connection_data_t*) malloc(sizeof(connection_data_t));
+ cd->sender = false;
+ return cd;
+}
+
+connection_data_t *make_sender_data(void) {
+ connection_data_t *cd = (connection_data_t*) malloc(sizeof(connection_data_t));
+ cd->sender = true;
+ return cd;
+}
+
+#define READ_BUFFERS 4
+
+/* This function handles events when we are acting as the receiver */
+static void handle_receive(app_data_t *app, pn_event_t* event) {
+ switch (pn_event_type(event)) {
+
+ case PN_RAW_CONNECTION_CONNECTED: {
+ pn_raw_connection_t *c = pn_event_raw_connection(event);
+ pn_raw_buffer_t buffers[READ_BUFFERS] = {{0}};
+ int i = READ_BUFFERS;
+ for (; i; --i) {
+ pn_raw_buffer_t *buff = &buffers[READ_BUFFERS-i];
+ buff->bytes = (char*) malloc(1024);
+ buff->capacity = 1024;
+ buff->size = 0;
+ buff->offset = 0;
+ }
+ pn_raw_connection_give_read_buffers(c, buffers, READ_BUFFERS);
+ } break;
+
+ case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
+ } break;
+
+ default:
+ break;
+ }
+}
+
+#define WRITE_BUFFERS 4
+
+/* This function handles events when we are acting as the sender */
+static void handle_send(app_data_t* app, pn_event_t* event) {
+ switch (pn_event_type(event)) {
+
+ case PN_RAW_CONNECTION_CONNECTED: {
+ printf("**raw connection connected\n");
+ app->connects++;
+ } break;
+
+ case PN_RAW_CONNECTION_DISCONNECTED: {
+ pn_raw_connection_t *c = pn_event_raw_connection(event);
+ connection_data_t *cd = (connection_data_t*) pn_raw_connection_get_context(c);
+ free(cd);
+ printf("**raw connection disconnected\n");
+ app->disconnects++;
+ check_condition(event, pn_raw_connection_condition(c), app);
+ } break;
+
+ case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: {
+ pn_raw_connection_t *c = pn_event_raw_connection(event);
+ char line[120];
+ if (fgets(line, sizeof(line), stdin)) {
+ send_message(c, line);
+ } else {
+ pn_raw_connection_close(c);
+ }
+ } break;
+
+ /* This path handles both received bytes and freeing buffers at close */
+ case PN_RAW_CONNECTION_READ: {
+ pn_raw_connection_t *c = pn_event_raw_connection(event);
+ pn_raw_buffer_t buffs[READ_BUFFERS];
+ size_t n;
+ while ( (n = pn_raw_connection_take_read_buffers(c, buffs, READ_BUFFERS)) ) {
+ unsigned i;
+ for (i=0; i<n && buffs[i].bytes; ++i) {
+ recv_message(buffs[i]);
+ free(buffs[i].bytes);
+ }
+ }
+ } break;
+
+ case PN_RAW_CONNECTION_CLOSED_READ: {
+ pn_raw_connection_t *c = pn_event_raw_connection(event);
+ pn_raw_connection_close(c);
+ } break;
+
+ case PN_RAW_CONNECTION_WRITTEN: {
+ pn_raw_connection_t *c = pn_event_raw_connection(event);
+ pn_raw_buffer_t buffs[READ_BUFFERS];
+ size_t n;
+ while ( (n = pn_raw_connection_take_written_buffers(c, buffs, READ_BUFFERS)) ) {
+ if (!pn_raw_connection_is_read_closed(c)) {
+ pn_raw_connection_give_read_buffers(c, buffs, n);
+ } else {
+ unsigned i;
+ for (i=0; i<n && buffs[i].bytes; ++i) {
+ free(buffs[i].bytes);
+ }
+ }
+ };
+ } break;
+
+ default:
+ break;
+ }
+}
+
+/* Handle all events, delegate to handle_send or handle_receive
+ Return true to continue, false to exit
+*/
+static bool handle(app_data_t* app, pn_event_t* event) {
+ switch (pn_event_type(event)) {
+
+ case PN_PROACTOR_TIMEOUT: {
+ } break;
+
+ case PN_PROACTOR_INACTIVE: {
+ return false;
+ } break;
+
+ default: {
+ pn_raw_connection_t *c = pn_event_raw_connection(event);
+ if (c) {
+ connection_data_t *cd = (connection_data_t*) pn_raw_connection_get_context(c);
+ if (cd && cd->sender) {
+ handle_send(app, event);
+ } else {
+ handle_receive(app, event);
+ }
+ }
+ }
+ }
+ return exit_code == 0;
+}
+
+void run(app_data_t *app) {
+ /* Loop and handle events */
+ do {
+ pn_event_batch_t *events = pn_proactor_wait(app->proactor);
+ pn_event_t *e;
+ for (e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
+ if (!handle(app, e)) {
+ return;
+ }
+ }
+ pn_proactor_done(app->proactor, events);
+ } while(true);
+}
+
+int main(int argc, char **argv) {
+ struct app_data_t app = {0};
+ char addr[PN_MAX_ADDR];
+ pn_raw_connection_t *c = pn_raw_connection();
+ connection_data_t *cd = make_sender_data();
+
+ app.host = (argc > 1) ? argv[1] : "";
+ app.port = (argc > 2) ? argv[2] : "amqp";
+
+ /* Create the proactor and connect */
+ app.proactor = pn_proactor();
+ pn_raw_connection_set_context(c, cd);
+ pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
+ pn_proactor_raw_connect(app.proactor, c, addr);
+ run(&app);
+ pn_proactor_free(app.proactor);
+ return exit_code;
+}
diff --git a/c/examples/raw_echo.c b/c/examples/raw_echo.c
new file mode 100644
index 0000000..53fd47b
--- /dev/null
+++ b/c/examples/raw_echo.c
@@ -0,0 +1,240 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/condition.h>
+#include <proton/raw_connection.h>
+#include <proton/listener.h>
+#include <proton/netaddr.h>
+#include <proton/proactor.h>
+
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+typedef struct app_data_t {
+ const char *host, *port;
+ const char *amqp_address;
+
+ pn_proactor_t *proactor;
+ pn_listener_t *listener;
+
+ int connects;
+ int disconnects;
+
+ /* Sender values */
+
+ /* Receiver values */
+} app_data_t;
+
+static int exit_code = 0;
+
+/* Close the connection and the listener so we will get a
+ * PN_PROACTOR_INACTIVE event and exit, once all outstanding events
+ * are processed.
+ */
+static void close_all(pn_raw_connection_t *c, app_data_t *app) {
+ if (c) pn_raw_connection_close(c);
+ if (app->listener) pn_listener_close(app->listener);
+}
+
+static bool check_condition(pn_event_t *e, pn_condition_t *cond, app_data_t *app) {
+ if (pn_condition_is_set(cond)) {
+ fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
+ pn_condition_get_name(cond), pn_condition_get_description(cond));
+ return true;
+ }
+
+ return false;
+}
+
+static void check_condition_fatal(pn_event_t *e, pn_condition_t *cond, app_data_t *app) {
+ if (check_condition(e, cond, app)) {
+ close_all(pn_event_raw_connection(e), app);
+ exit_code = 1;
+ }
+}
+
+static void recv_message(pn_raw_buffer_t buf) {
+ fwrite(buf.bytes, buf.size, 1, stdout);
+}
+
+void *make_receiver_data(void) {
+ return NULL;
+}
+
+#define READ_BUFFERS 4
+
+/* This function handles events when we are acting as the receiver */
+static void handle_receive(app_data_t *app, pn_event_t* event) {
+ switch (pn_event_type(event)) {
+
+ case PN_RAW_CONNECTION_CONNECTED: {
+ pn_raw_connection_t *c = pn_event_raw_connection(event);
+ pn_raw_buffer_t buffers[READ_BUFFERS] = {{0}};
+ int i = READ_BUFFERS;
+ printf("**raw connection connected\n");
+ app->connects++;
+ for (; i; --i) {
+ pn_raw_buffer_t *buff = &buffers[READ_BUFFERS-i];
+ buff->bytes = (char*) malloc(1024);
+ buff->capacity = 1024;
+ buff->size = 0;
+ buff->offset = 0;
+ }
+ pn_raw_connection_give_read_buffers(c, buffers, READ_BUFFERS);
+ } break;
+
+ case PN_RAW_CONNECTION_DISCONNECTED: {
+ pn_raw_connection_t *c = pn_event_raw_connection(event);
+ void *cd = pn_raw_connection_get_context(c);
+ free(cd);
+ printf("**raw connection disconnected\n");
+ app->disconnects++;
+ check_condition(event, pn_raw_connection_condition(c), app);
+ } break;
+
+ case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
+ } break;
+
+ /* This path handles both received bytes and freeing buffers at close */
+ case PN_RAW_CONNECTION_READ: {
+ pn_raw_connection_t *c = pn_event_raw_connection(event);
+ pn_raw_buffer_t buffs[READ_BUFFERS];
+ size_t n;
+ while ( (n = pn_raw_connection_take_read_buffers(c, buffs, READ_BUFFERS)) ) {
+ unsigned i;
+ for (i=0; i<n && buffs[i].bytes; ++i) {
+ recv_message(buffs[i]);
+ }
+
+ if (!pn_raw_connection_is_write_closed(c)) {
+ pn_raw_connection_write_buffers(c, buffs, n);
+ } else if (!pn_raw_connection_is_read_closed(c)) {
+ pn_raw_connection_give_read_buffers(c, buffs, n);
+ } else {
+ unsigned i;
+ for (i=0; i<n && buffs[i].bytes; ++i) {
+ free(buffs[i].bytes);
+ }
+ }
+ }
+ } break;
+ case PN_RAW_CONNECTION_CLOSED_WRITE:
+ case PN_RAW_CONNECTION_CLOSED_READ: {
+ pn_raw_connection_t *c = pn_event_raw_connection(event);
+ pn_raw_connection_close(c);
+ } break;
+ case PN_RAW_CONNECTION_WRITTEN: {
+ pn_raw_connection_t *c = pn_event_raw_connection(event);
+ pn_raw_buffer_t buffs[READ_BUFFERS];
+ size_t n;
+ while ( (n = pn_raw_connection_take_written_buffers(c, buffs, READ_BUFFERS)) ) {
+ if (!pn_raw_connection_is_read_closed(c)) {
+ pn_raw_connection_give_read_buffers(c, buffs, n);
+ } else {
+ unsigned i;
+ for (i=0; i<n && buffs[i].bytes; ++i) {
+ free(buffs[i].bytes);
+ }
+ }
+ };
+ } break;
+ default:
+ break;
+ }
+}
+
+#define WRITE_BUFFERS 4
+
+/* Handle all events, delegate raw connection events to handle_receive
+ Return true to continue, false to exit
+*/
+static bool handle(app_data_t* app, pn_event_t* event) {
+ switch (pn_event_type(event)) {
+
+ case PN_LISTENER_OPEN: {
+ char port[256]; /* Get the listening port */
+ pn_netaddr_host_port(pn_listener_addr(pn_event_listener(event)), NULL, 0, port, sizeof(port));
+ printf("**listening on %s\n", port);
+ fflush(stdout);
+ break;
+ }
+ case PN_LISTENER_ACCEPT: {
+ pn_raw_connection_t *c = pn_raw_connection();
+ void *cd = make_receiver_data();
+ pn_raw_connection_set_context(c, cd);
+ pn_listener_raw_accept(pn_event_listener(event), c);
+
+ if (app->connects>2) pn_listener_close(app->listener);
+ } break;
+
+ case PN_LISTENER_CLOSE: {
+ app->listener = NULL; /* Listener is closed */
+ check_condition_fatal(event, pn_listener_condition(pn_event_listener(event)), app);
+ } break;
+
+ case PN_PROACTOR_TIMEOUT: {
+ } break;
+
+ case PN_PROACTOR_INACTIVE: {
+ return false;
+ } break;
+
+ default: {
+ pn_raw_connection_t *c = pn_event_raw_connection(event);
+ if (c) {
+ handle_receive(app, event);
+ }
+ }
+ }
+ return exit_code == 0;
+}
+
+void run(app_data_t *app) {
+ /* Loop and handle events */
+ do {
+ pn_event_batch_t *events = pn_proactor_wait(app->proactor);
+ pn_event_t *e;
+ for (e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
+ if (!handle(app, e)) {
+ return;
+ }
+ }
+ pn_proactor_done(app->proactor, events);
+ } while(true);
+}
+
+int main(int argc, char **argv) {
+ struct app_data_t app = {0};
+ char addr[PN_MAX_ADDR];
+ app.host = (argc > 1) ? argv[1] : "";
+ app.port = (argc > 2) ? argv[2] : "amqp";
+
+ /* Create the proactor and listen */
+ app.proactor = pn_proactor();
+ app.listener = pn_listener();
+ pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
+ pn_proactor_listen(app.proactor, app.listener, addr, 16);
+ run(&app);
+ pn_proactor_free(app.proactor);
+ return exit_code;
+}
diff --git a/c/include/proton/event.h b/c/include/proton/event.h
index 8e5fba2..9562e78 100644
--- a/c/include/proton/event.h
+++ b/c/include/proton/event.h
@@ -347,7 +347,120 @@ typedef enum {
* The listener is listening.
* Events of this type point to the @ref pn_listener_t.
*/
- PN_LISTENER_OPEN
+ PN_LISTENER_OPEN,
+
+ /**
+ * The raw connection connected.
+ * Now would be a good time to give the raw connection some buffers
+ * to read bytes from the underlying socket. If you don't do it
+ * now you will get @ref PN_RAW_CONNECTION_NEED_READ_BUFFERS (and
+ * @ref PN_RAW_CONNECTION_NEED_WRITE_BUFFERS) events when the socket is readable
+ * (or writable) but there are no buffers available.
+ *
+ * Events of this type point to a @ref pn_raw_connection_t
+ */
+ PN_RAW_CONNECTION_CONNECTED,
+
+ /**
+ * The remote end of the raw connection closed the connection so
+ * that we can no longer read.
+ *
+ * When both this and @ref PN_RAW_CONNECTION_CLOSED_WRITE event have
+ * occurred then the @ref PN_RAW_CONNECTION_DISCONNECTED event will be
+ * generated.
+ *
+ * Events of this type point to a @ref pn_raw_connection_t
+ */
+ PN_RAW_CONNECTION_CLOSED_READ,
+
+ /**
+ * The remote end of the raw connection closed the connection so
+ * that we can no longer write.
+ *
+ * When both this and @ref PN_RAW_CONNECTION_CLOSED_READ event have
+ * occurred then the @ref PN_RAW_CONNECTION_DISCONNECTED event will be
+ * generated.
+ *
+ * Events of this type point to a @ref pn_raw_connection_t
+ */
+ PN_RAW_CONNECTION_CLOSED_WRITE,
+
+ /**
+ * The raw connection is disconnected.
+ * No more bytes will be read or written on the connection. Every buffer
+ * in use will already either have been returned using a
+ * @ref PN_RAW_CONNECTION_READ or @ref PN_RAW_CONNECTION_WRITTEN event.
+ * This event will always be the last for this raw connection, and so
+ * the application can clean up the raw connection at this point.
+ *
+ * Events of this type point to a @ref pn_raw_connection_t
+ */
+ PN_RAW_CONNECTION_DISCONNECTED,
+
+ /**
+ * The raw connection might need more read buffers.
+ * The connection is readable, but the connection has no buffer to read the
+ * bytes into. If you supply some buffers now maybe you'll get a
+ * @ref PN_RAW_CONNECTION_READ event soon, but no guarantees.
+ *
+ * This event is edge triggered and you will only get it once until you give
+ * the raw connection some more read buffers.
+ *
+ * Events of this type point to a @ref pn_raw_connection_t
+ */
+ PN_RAW_CONNECTION_NEED_READ_BUFFERS,
+
+ /**
+ * The raw connection might need more write buffers.
+ * The connection is writable but has no buffers to write. If you give the
+ * raw connection something to write using @ref pn_raw_connection_write_buffers
+ * the raw connection can write them. It is not necessary to wait for this event
+ * before sending buffers to write, but it can be used to aid in flow control (maybe).
+ *
+ * This event is edge triggered and you will only get it once until you give
+ * the raw connection something more to write.
+ *
+ * Events of this type point to a @ref pn_raw_connection_t
+ */
+ PN_RAW_CONNECTION_NEED_WRITE_BUFFERS,
+
+ /**
+ * The raw connection read bytes: The bytes that were read are
+ * in one of the read buffers given to the raw connection.
+ *
+ * This event will be sent if there are bytes that have been read
+ * in a buffer owned by the raw connection and there is no
+ * @ref PN_RAW_CONNECTION_READ event still queued.
+ *
+ * When a connection closes all read buffers are returned to the
+ * application using @ref PN_RAW_CONNECTION_READ events with empty buffers.
+ *
+ * Events of this type point to a @ref pn_raw_connection_t
+ */
+ PN_RAW_CONNECTION_READ,
+
+ /**
+ * The raw connection has finished a write and the buffers that were
+ * used are no longer in use and can be recycled.
+ *
+ * This event will be sent if there are buffers that have been written still
+ * owned by the raw connection and there is no @ref PN_RAW_CONNECTION_WRITTEN
+ * event currently queued.
+ *
+ * When a connection closes all write buffers are returned using
+ * @ref PN_RAW_CONNECTION_WRITTEN events.
+ *
+ * Events of this type point to a @ref pn_raw_connection_t
+ */
+ PN_RAW_CONNECTION_WRITTEN,
+
+ /**
+ * The raw connection was woken by @ref pn_raw_connection_wake
+ *
+ * Events of this type point to a @ref pn_raw_connection_t
+ */
+ PN_RAW_CONNECTION_WAKE
+
} pn_event_type_t;
diff --git a/c/include/proton/listener.h b/c/include/proton/listener.h
index e4bbf35..5d2cfe2 100644
--- a/c/include/proton/listener.h
+++ b/c/include/proton/listener.h
@@ -135,6 +135,25 @@ PNP_EXTERN pn_proactor_t *pn_listener_proactor(pn_listener_t *c);
PNP_EXTERN pn_listener_t *pn_event_listener(pn_event_t *event);
/**
+ * Accept an incoming connection request as a raw connection.
+ *
+ * Call after a @ref PN_LISTENER_ACCEPT event.
+ *
+ * Errors are returned as @ref PN_RAW_CONNECTION_DISCONNECTED events by pn_proactor_wait().
+ *
+ * @param[in] listener the listener
+ * @param[in] raw_connection the application must create a raw connection with pn_raw_connection()
+ * this parameter cannot be null.
+ *
+ * The proactor that owns the @p listener *takes ownership* of @p raw_connection and will
+ * automatically call pn_raw_connection_free() after the final @ref
+ * PN_RAW_CONNECTION_DISCONNECTED event is handled, or when pn_proactor_free() is
+ * called.
+ *
+ */
+PNP_EXTERN void pn_listener_raw_accept(pn_listener_t *listener, pn_raw_connection_t *raw_connection);
+
+/**
*@}
*/
diff --git a/c/include/proton/proactor.h b/c/include/proton/proactor.h
index b0303c9..93b9c89 100644
--- a/c/include/proton/proactor.h
+++ b/c/include/proton/proactor.h
@@ -341,6 +341,29 @@ PNP_EXTERN pn_millis_t pn_proactor_now(void);
PNP_EXTERN int64_t pn_proactor_now_64(void);
/**
+ * Connect @p addr and bind to @p raw_connection.
+ *
+ * Errors are returned as @ref PN_RAW_CONNECTION_DISCONNECTED events by pn_proactor_wait().
+ *
+ * @note Thread-safe
+ *
+ * @param[in] proactor the proactor
+ *
+ * @param[in] raw_connection the application must create a raw connection with pn_raw_connection()
+ * this parameter cannot be null.
+ *
+ * @p proactor *takes ownership* of @p raw_connection and will
+ * automatically call pn_raw_connection_free() after the final @ref
+ * PN_RAW_CONNECTION_DISCONNECTED event is handled, or when pn_proactor_free() is
+ * called.
+ *
+ * @param[in] addr the "host:port" network address, constructed by pn_proactor_addr()
+ * An empty host will connect to the local host via the default protocol (IPV6 or IPV4).
+ * An empty port will connect to the standard AMQP port (5672).
+ */
+PNP_EXTERN void pn_proactor_raw_connect(pn_proactor_t *proactor, pn_raw_connection_t *raw_connection, const char *addr);
+
+/**
* @}
*/
@@ -391,6 +414,15 @@ PNP_EXTERN int64_t pn_proactor_now_64(void);
* @ref PN_PROACTOR_TIMEOUT | @copybrief PN_PROACTOR_TIMEOUT
* @ref PN_PROACTOR_INACTIVE | @copybrief PN_PROACTOR_INACTIVE
* @ref PN_CONNECTION_WAKE | @copybrief PN_CONNECTION_WAKE
+ * @ref PN_RAW_CONNECTION_CONNECTED | @copybrief PN_RAW_CONNECTION_CONNECTED
+ * @ref PN_RAW_CONNECTION_CLOSED_READ | @copybrief PN_RAW_CONNECTION_CLOSED_READ
+ * @ref PN_RAW_CONNECTION_CLOSED_WRITE | @copybrief PN_RAW_CONNECTION_CLOSED_WRITE
+ * @ref PN_RAW_CONNECTION_DISCONNECTED | @copybrief PN_RAW_CONNECTION_DISCONNECTED
+ * @ref PN_RAW_CONNECTION_NEED_READ_BUFFERS | @copybrief PN_RAW_CONNECTION_NEED_READ_BUFFERS
+ * @ref PN_RAW_CONNECTION_NEED_WRITE_BUFFERS | @copybrief PN_RAW_CONNECTION_NEED_WRITE_BUFFERS
+ * @ref PN_RAW_CONNECTION_READ | @copybrief PN_RAW_CONNECTION_READ
+ * @ref PN_RAW_CONNECTION_WRITTEN | @copybrief PN_RAW_CONNECTION_WRITTEN
+ * @ref PN_RAW_CONNECTION_WAKE | @copybrief PN_RAW_CONNECTION_WAKE
*
* @}
*/
diff --git a/c/include/proton/raw_connection.h b/c/include/proton/raw_connection.h
new file mode 100644
index 0000000..a483665
--- /dev/null
+++ b/c/include/proton/raw_connection.h
@@ -0,0 +1,265 @@
+#ifndef PROTON_RAW_CONNECTION_H
+#define PROTON_RAW_CONNECTION_H 1
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/condition.h>
+#include <proton/event.h>
+#include <proton/import_export.h>
+#include <proton/types.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * @file
+ *
+ * @addtogroup raw_connection
+ * @{
+ */
+
+/**
+ * A descriptor used to represent a single raw buffer in memory.
+ *
+ * @note The intent of the offset is to allow the actual bytes being read/written to be at a variable
+ * location relative to the head of the buffer because of other data or structures that are important to the application
+ * associated with the data to be written but not themselves read/written to the connection.
+ *
+ * @note For read buffers: When read buffers are returned to the application size will be the number of bytes read.
+ * Read operations will not change the context, bytes or capacity members of the structure.
+ *
+ * @note For write buffers: When write buffers are returned to the application all of the struct members will be returned
+ * unaltered. Also write operations will not modify the bytes of the buffer passed in at all. In principle this means that
+ * the write buffer can be used for multiple writes at the same time as long as the actual buffer is unmodified by the
+ * application at any time the buffer is being used by any raw connection.
+ */
+typedef struct pn_raw_buffer_t {
+ uintptr_t context; /**< Used to associate arbitrary application data with this raw buffer */
+ char *bytes; /**< Pointer to the start of the raw buffer, if this is null then no buffer is represented */
+ uint32_t capacity; /**< Count of available bytes starting at @ref bytes */
+ uint32_t size; /**< Number of bytes read or to be written starting at @ref offset */
+ uint32_t offset; /**< First byte in the buffer to be read or written */
+} pn_raw_buffer_t;
+
+/**
+ * Create a new raw connection for use with the @ref proactor.
+ * See @ref pn_proactor_raw_connect and @ref pn_listener_raw_accept.
+ *
+ * @return A newly allocated pn_raw_connection_t or NULL if there wasn't sufficient memory.
+ *
+ * @note This is the only pn_raw_connection_t function that allocates memory. So an application that
+ * wants good control of out of memory conditions should check the return value for NULL.
+ *
+ * @note It is good practice to create a raw connection and attach application
+ * specific context to it before giving it to the proactor.
+ *
+ * @note There is no way to free a pn_raw_connection_t as once passed to the proactor the proactor owns
+ * it and controls its lifecycle.
+ */
+PNP_EXTERN pn_raw_connection_t *pn_raw_connection(void);
+
+/**
+ * Get the local address of a raw connection. Return `NULL` if not available.
+ * Pointer is invalid after the transport closes (@ref PN_RAW_CONNECTION_DISCONNECTED event is handled)
+ */
+PNP_EXTERN const struct pn_netaddr_t *pn_raw_connection_local_addr(pn_raw_connection_t *connection);
+
+/**
+ * Get the remote address of a raw connection. Return `NULL` if not available.
+ * Pointer is invalid after the transport closes (@ref PN_RAW_CONNECTION_DISCONNECTED event is handled)
+ */
+PNP_EXTERN const struct pn_netaddr_t *pn_raw_connection_remote_addr(pn_raw_connection_t *connection);
+
+/**
+ * Close a raw connection.
+ * This will close the underlying socket and release all buffers held by the raw connection.
+ * It will cause @ref PN_RAW_CONNECTION_READ and @ref PN_RAW_CONNECTION_WRITTEN to be emitted so
+ * the application can clean up buffers given to the raw connection. After that a
+ * @ref PN_RAW_CONNECTION_DISCONNECTED event will be emitted to allow the application to clean up
+ * any other state held by the raw connection.
+ *
+ */
+PNP_EXTERN void pn_raw_connection_close(pn_raw_connection_t *connection);
+
+/**
+ * Query the raw connection for how many more read buffers it can be given
+ */
+PNP_EXTERN size_t pn_raw_connection_read_buffers_capacity(pn_raw_connection_t *connection);
+
+/**
+ * Query the raw connection for how many more write buffers it can be given
+ */
+PNP_EXTERN size_t pn_raw_connection_write_buffers_capacity(pn_raw_connection_t *connection);
+
+/**
+ * Give the raw connection buffers to use for reading from the underlying socket.
+ * If the raw socket has no read buffers then the application will never receive
+ * a @ref PN_RAW_CONNECTION_READ event.
+ *
+ * A @ref PN_RAW_CONNECTION_NEED_READ_BUFFERS event will be generated immediately after
+ * the @ref PN_RAW_CONNECTION_CONNECTED event if there are no read buffers. It will also be
+ * generated whenever the raw connection runs out of read buffers. In both these cases the
+ * event will not be generated again until @ref pn_raw_connection_give_read_buffers is called.
+ *
+ * @return the number of buffers actually given to the raw connection. This will only be different
+ * from the number supplied if the connection has no more space to record more buffers. In this case
+ * the buffers taken will be the earlier buffers in the array supplied, the elements 0 to the
+ * returned value-1.
+ *
+ * @note The buffers given to the raw connection are owned by it until the application
+ * receives a @ref PN_RAW_CONNECTION_READ event giving them back to the application. They must
+ * not be accessed at all (written or even read) from calling @ref pn_raw_connection_give_read_buffers
+ * until receiving this event.
+ *
+ * @note The application should not assume that the @ref PN_RAW_CONNECTION_NEED_READ_BUFFERS
+ * event signifies that the connection is readable.
+ */
+PNP_EXTERN size_t pn_raw_connection_give_read_buffers(pn_raw_connection_t *connection, pn_raw_buffer_t const *buffers, size_t num);
+
+/**
+ * Fetch buffers with bytes read from the raw socket
+ *
+ * @param[out] buffers pointer to an array of @ref pn_raw_buffer_t structures which will be filled in with the read buffer information
+ * @param[in] num the number of buffers allocated in the passed in array of buffers
+ * @return the number of buffers being returned, if there are no read bytes then this will be 0. As many buffers will be returned
+ * as can be given the number that are passed in. So if the number returned is less than the number passed in there are no more buffers
+ * read. But if the number is the same there may be more read buffers to take.
+ *
+ * @note After the application receives @ref PN_RAW_CONNECTION_READ there should be bytes read from the socket and
+ * hence this call should return buffers. It is safe to carry on calling @ref pn_raw_connection_take_read_buffers
+ * until it returns 0.
+ */
+PNP_EXTERN size_t pn_raw_connection_take_read_buffers(pn_raw_connection_t *connection, pn_raw_buffer_t *buffers, size_t num);
+
+/**
+ * Give the raw connection buffers to write to the underlying socket.
+ *
+ * A @ref PN_RAW_CONNECTION_WRITTEN event will be generated once the buffers have been written to the socket.
+ * Until this point the buffers must not be accessed at all (written or even read).
+ *
+ * A @ref PN_RAW_CONNECTION_NEED_WRITE_BUFFERS event will be generated immediately after
+ * the @ref PN_RAW_CONNECTION_CONNECTED event if there are no write buffers. It will also be
+ * generated whenever the raw connection finishes writing all the write buffers. In both these cases the
+ * event will not be generated again until @ref pn_raw_connection_write_buffers is called.
+ *
+ * @return the number of buffers actually recorded by the raw connection to write. This will only be different
+ * from the number supplied if the connection has no more space to record more buffers. In this case
+ * the buffers recorded will be the earlier buffers in the array supplied, the elements 0 to the
+ * returned value-1.
+ *
+ */
+PNP_EXTERN size_t pn_raw_connection_write_buffers(pn_raw_connection_t *connection, pn_raw_buffer_t const *buffers, size_t num);
+
+/**
+ * Return a buffer chain with buffers that have all been written to the raw socket
+ *
+ * @param[out] buffers pointer to an array of @ref pn_raw_buffer_t structures which will be filled in with the written buffer information
+ * @param[in] num the number of buffers allocated in the passed in array of buffers
+ * @return the number of buffers being returned; if there are no written buffers to return then this will be 0. As many buffers as possible
+ * will be returned, up to the number passed in. So if the number returned is less than the number passed in there are no more buffers
+ * written. But if the number is the same there may be more written buffers to take.
+ *
+ * @note After the application receives @ref PN_RAW_CONNECTION_WRITTEN there should be bytes written to the socket and
+ * hence this call should return buffers. It is safe to carry on calling @ref pn_raw_connection_take_written_buffers
+ * until it returns 0.
+ */
+PNP_EXTERN size_t pn_raw_connection_take_written_buffers(pn_raw_connection_t *connection, pn_raw_buffer_t *buffers, size_t num);
+
+/**
+ * Is @p connection closed for read?
+ *
+ * @return true if the raw connection is closed for read.
+ */
+PNP_EXTERN bool pn_raw_connection_is_read_closed(pn_raw_connection_t *connection);
+
+/**
+ * Is @p connection closed for write?
+ *
+ * @return true if the raw connection is closed for write.
+ */
+PNP_EXTERN bool pn_raw_connection_is_write_closed(pn_raw_connection_t *connection);
+
+/**
+ * Return a @ref PN_RAW_CONNECTION_WAKE event for @p connection as soon as possible.
+ *
+ * At least one wake event will be returned, serialized with other @ref proactor_events
+ * for the same raw connection. Wakes can be "coalesced" - if several
+ * @ref pn_raw_connection_wake() calls happen close together, there may be only one
+ * @ref PN_RAW_CONNECTION_WAKE event that occurs after all of them.
+ *
+ * @note Thread-safe
+ */
+PNP_EXTERN void pn_raw_connection_wake(pn_raw_connection_t *connection);
+
+/**
+ * Get additional information about a raw connection error.
+ * There is a raw connection error if the @ref PN_RAW_CONNECTION_DISCONNECTED event
+ * is received and the associated pn_condition_t is non-null (see pn_condition_is_set()).
+ *
+ * The value returned is only valid until the end of the handler for the
+ * @ref PN_RAW_CONNECTION_DISCONNECTED event.
+ */
+PNP_EXTERN pn_condition_t *pn_raw_connection_condition(pn_raw_connection_t *connection);
+
+/**
+ * Get the application context associated with this raw connection.
+ *
+ * The application context for a raw connection may be set using
+ * ::pn_raw_connection_set_context.
+ *
+ * @param[in] connection the raw connection whose context is to be returned.
+ * @return the application context for the raw connection
+ */
+PNP_EXTERN void *pn_raw_connection_get_context(pn_raw_connection_t *connection);
+
+/**
+ * Set a new application context for a raw connection.
+ *
+ * The application context for a raw connection may be retrieved
+ * using ::pn_raw_connection_get_context.
+ *
+ * @param[in] connection the raw connection object
+ * @param[in] context the application context
+ */
+PNP_EXTERN void pn_raw_connection_set_context(pn_raw_connection_t *connection, void *context);
+
+/**
+ * Get the attachments that are associated with a raw connection.
+ */
+PNP_EXTERN pn_record_t *pn_raw_connection_attachments(pn_raw_connection_t *connection);
+
+/**
+ * Return the raw connection associated with an event.
+ *
+ * @return NULL if the event is not associated with a raw connection.
+ */
+PNP_EXTERN pn_raw_connection_t *pn_event_raw_connection(pn_event_t *event);
+
+
+/**
+ * @}
+ */
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* raw_connection.h */
diff --git a/c/include/proton/types.h b/c/include/proton/types.h
index edcca98..f4f496e 100644
--- a/c/include/proton/types.h
+++ b/c/include/proton/types.h
@@ -115,6 +115,10 @@
* @brief **Unsettled API** - A listener for incoming connections.
* @ingroup io
*
+ * @defgroup raw_connection Raw connection
+ * @brief **Unsettled API** - An API allowing raw sockets to be used with the proactor
+ * @ingroup io
+ *
* @defgroup connection_driver Connection driver
* @brief **Unsettled API** - An API for low-level IO integration.
* @ingroup io
@@ -438,6 +442,13 @@ typedef struct pn_transport_t pn_transport_t;
typedef struct pn_proactor_t pn_proactor_t;
/**
+ * A raw network connection used with the proactor.
+ *
+ * @ingroup raw_connection
+ */
+typedef struct pn_raw_connection_t pn_raw_connection_t;
+
+/**
* A batch of events that must be handled in sequence.
*
* A pn_event_batch_t encapsulates potentially multiple events that relate
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 5170aa6..5c07e28 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -70,6 +70,7 @@
#include <proton/transport.h>
#include <proton/listener.h>
#include <proton/netaddr.h>
+#include <proton/raw_connection.h>
#include <assert.h>
#include <stddef.h>
diff --git a/c/src/proactor/libuv.c b/c/src/proactor/libuv.c
index 1c87f94..dd33b41 100644
--- a/c/src/proactor/libuv.c
+++ b/c/src/proactor/libuv.c
@@ -34,6 +34,7 @@
#include <proton/message.h>
#include <proton/netaddr.h>
#include <proton/proactor.h>
+#include <proton/raw_connection.h>
#include <proton/transport.h>
#include <uv.h>
diff --git a/c/src/proactor/win_iocp.cpp b/c/src/proactor/win_iocp.cpp
index 764a7c4..348564c 100644
--- a/c/src/proactor/win_iocp.cpp
+++ b/c/src/proactor/win_iocp.cpp
@@ -29,6 +29,7 @@
#include <proton/transport.h>
#include <proton/listener.h>
#include <proton/proactor.h>
+#include <proton/raw_connection.h>
#include <assert.h>
#include <stddef.h>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org