You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2016/10/25 20:11:49 UTC
[1/3] qpid-proton git commit: PROTON-1255: Remove
pn_connection_engine_start [Forced Update!]
Repository: qpid-proton
Updated Branches:
refs/heads/aconway-libuv-driver 8dc69787a -> 464c166c2 (forced update)
PROTON-1255: Remove pn_connection_engine_start
Bind the connection automatically in pn_connection_engine_dispatch after the
user has processed the PN_CONNECTION_INIT event. This removes the need to
manually call start and allows user handlers to set security settings
in their handler on PN_CONNECTION_INIT, before the bind.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/507c9356
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/507c9356
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/507c9356
Branch: refs/heads/aconway-libuv-driver
Commit: 507c9356692858a81916a4a8802fbc6b2302bc5f
Parents: 79e4848
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Oct 25 10:45:46 2016 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Oct 25 10:45:46 2016 -0400
----------------------------------------------------------------------
proton-c/bindings/cpp/src/io/connection_engine.cpp | 1 -
.../bindings/go/src/qpid.apache.org/proton/engine.go | 1 -
proton-c/include/proton/connection_engine.h | 6 ------
proton-c/src/engine/connection_engine.c | 13 ++++---------
4 files changed, 4 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/507c9356/proton-c/bindings/cpp/src/io/connection_engine.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/io/connection_engine.cpp b/proton-c/bindings/cpp/src/io/connection_engine.cpp
index d3f2667..4712b3e 100644
--- a/proton-c/bindings/cpp/src/io/connection_engine.cpp
+++ b/proton-c/bindings/cpp/src/io/connection_engine.cpp
@@ -65,7 +65,6 @@ void connection_engine::configure(const connection_options& opts) {
opts.apply_bound(c);
handler_ = opts.handler();
connection_context::get(connection()).collector = c_engine_.collector;
- pn_connection_engine_start(&c_engine_);
}
connection_engine::~connection_engine() {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/507c9356/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
index a0b8888..5680010 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
@@ -283,7 +283,6 @@ func (eng *Engine) readBuffer() []byte {
// disconnected. You can check for errors after exit with Engine.Error().
//
func (eng *Engine) Run() error {
- C.pn_connection_engine_start(&eng.engine)
// Channels for read and write buffers going in and out of the read/write goroutines.
// The channels are unbuffered: we want to exchange buffers in seuquence.
readsIn, writesIn := make(chan []byte), make(chan []byte)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/507c9356/proton-c/include/proton/connection_engine.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/connection_engine.h b/proton-c/include/proton/connection_engine.h
index b1476c7..d9df77b 100644
--- a/proton-c/include/proton/connection_engine.h
+++ b/proton-c/include/proton/connection_engine.h
@@ -87,18 +87,12 @@ typedef struct pn_connection_engine_t {
/// Initialize a pn_connection_engine_t struct with a new connection and
/// transport.
///
-/// Configure connection properties and call connection_engine_start() before
-/// using the engine.
-///
/// Call pn_connection_engine_final to free resources when you are done.
///
///@return 0 on success, a proton error code on failure (@see error.h)
///
PN_EXTERN int pn_connection_engine_init(pn_connection_engine_t* engine);
-/// Start the engine, call after setting security and host properties.
-PN_EXTERN void pn_connection_engine_start(pn_connection_engine_t* engine);
-
/// Free resources used by the engine, set the connection and transport pointers
/// to NULL.
PN_EXTERN void pn_connection_engine_final(pn_connection_engine_t* engine);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/507c9356/proton-c/src/engine/connection_engine.c
----------------------------------------------------------------------
diff --git a/proton-c/src/engine/connection_engine.c b/proton-c/src/engine/connection_engine.c
index adfb145..5d184a1 100644
--- a/proton-c/src/engine/connection_engine.c
+++ b/proton-c/src/engine/connection_engine.c
@@ -36,14 +36,6 @@ int pn_connection_engine_init(pn_connection_engine_t* e) {
return PN_OK;
}
-void pn_connection_engine_start(pn_connection_engine_t* e) {
- /*
- Ignore bind errors. PN_STATE_ERR means we are already bound, any
- other error will be delivered as an event.
- */
- pn_transport_bind(e->transport, e->connection);
-}
-
void pn_connection_engine_final(pn_connection_engine_t* e) {
if (e->transport && e->connection) {
pn_transport_unbind(e->transport);
@@ -105,8 +97,11 @@ static void log_event(pn_connection_engine_t *engine, pn_event_t* event) {
}
pn_event_t* pn_connection_engine_dispatch(pn_connection_engine_t* e) {
- if (e->event)
+ if (e->event) { /* Already returned */
+ if (pn_event_type(e->event) == PN_CONNECTION_INIT)
+ pn_transport_bind(e->transport, e->connection);
pn_collector_pop(e->collector);
+ }
e->event = pn_collector_peek(e->collector);
log_event(e, e->event);
return e->event;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/3] qpid-proton git commit: C driver for C/C++ bindings and direct
C users.
Posted by ac...@apache.org.
C driver for C/C++ bindings and direct C users.
driver.h is an SPI so an IO integration can be shared between C and C++
applications and bindings. Provides common connnect/listen/wakeup API to drive
the pn_connection_engine.
Examples show C sender, receiver and broker using a libuv driver.
The driver API is source compatible so an app can be re-compiled to use a
different driver implementation by changing the PN_DRIVER_INCLUDE setting.
This is not intended for use by non-C/C++ bindings where the binding language
has its own IO and concurrency framework. Such bindings should implement their
own driver in the binding language, using native IO/threading for the
language. This driver can be a structural example, as is the Go binding.
NOTE: preview only, not finished. Issues to address include:
- handle transport ticks
- support for scheduled wakeup (leave task queueing outside like conn wakeup)
- check when driver is "empty" - not monitoring anything. For clean shutdown.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/464c166c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/464c166c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/464c166c
Branch: refs/heads/aconway-libuv-driver
Commit: 464c166c2e31a503bf53fdeb5dba8f1ec9f9fc81
Parents: 507c935
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Oct 24 12:13:40 2016 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Oct 25 16:11:14 2016 -0400
----------------------------------------------------------------------
config.sh.in | 2 +-
examples/CMakeLists.txt | 13 +
examples/c/CMakeLists.txt | 2 +
examples/c/driver/CMakeLists.txt | 42 ++
examples/c/driver/broker.c | 485 +++++++++++++++++++
examples/c/driver/libuv_driver.c | 569 +++++++++++++++++++++++
examples/c/driver/libuv_driver.h | 111 +++++
examples/c/driver/receive.c | 187 ++++++++
examples/c/driver/send.c | 215 +++++++++
examples/c/driver/test.py | 49 ++
examples/cpp/README.dox | 2 +
examples/cpp/mt/broker.cpp | 1 +
examples/exampletest.py | 183 ++++++++
proton-c/docs/api/index.md | 49 +-
proton-c/include/proton/connection_engine.h | 233 +++++-----
proton-c/include/proton/driver.h | 282 +++++++++++
proton-c/include/proton/event.h | 2 +-
proton-c/src/driver/driver.c | 18 +
proton-c/src/engine/connection_engine.c | 17 +-
tools/cmake/Modules/FindLibuv.cmake | 37 ++
20 files changed, 2379 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/464c166c/config.sh.in
----------------------------------------------------------------------
diff --git a/config.sh.in b/config.sh.in
index 4902b61..edb77e6 100755
--- a/config.sh.in
+++ b/config.sh.in
@@ -40,7 +40,7 @@ RUBY_BINDINGS=$PROTON_BINDINGS/ruby
PERL_BINDINGS=$PROTON_BINDINGS/perl
# Python & Jython
-COMMON_PYPATH=$PROTON_HOME/tests/python:$PROTON_HOME/proton-c/bindings/python
+COMMON_PYPATH=$PROTON_HOME/tests/python:$PROTON_HOME/proton-c/bindings/python:$PROTON_HOME/examples
export PYTHONPATH=$COMMON_PYPATH:$PYTHON_BINDINGS
export JYTHONPATH=$COMMON_PYPATH:$PROTON_HOME/proton-j/src/main/resources:$PROTON_JARS
export CLASSPATH=$PROTON_JARS
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/464c166c/examples/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt
index 99e8315..4d744d2 100644
--- a/examples/CMakeLists.txt
+++ b/examples/CMakeLists.txt
@@ -20,6 +20,19 @@
set (Proton_DIR ${CMAKE_CURRENT_SOURCE_DIR})
set (ProtonCpp_DIR ${CMAKE_CURRENT_SOURCE_DIR})
+# Set result to a native search path
+macro(set_search_path result) # args after result are directories or search paths.
+ set(${result} ${ARGN})
+ if (UNIX)
+ string(REPLACE ";" ":" ${result} "${${result}}") # native search path separators.
+ endif()
+ file(TO_NATIVE_PATH "${${result}}" ${result}) # native slash separators
+endmacro()
+
+# Some non-python examples use exampletest.py to drive their self-tests.
+set_search_path(EXAMPLE_PYTHONPATH "${CMAKE_CURRENT_SOURCE_DIR}" "$ENV{PYTHON_PATH}")
+set(EXAMPLE_ENV "PYTHONPATH=${EXAMPLE_PYTHONPATH}")
+
add_subdirectory(c)
add_subdirectory(go)
if (BUILD_CPP)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/464c166c/examples/c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/CMakeLists.txt b/examples/c/CMakeLists.txt
index 1612a86..2fbfffb 100644
--- a/examples/c/CMakeLists.txt
+++ b/examples/c/CMakeLists.txt
@@ -17,6 +17,8 @@
# under the License.
#
+find_package(Proton REQUIRED)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include)
+add_subdirectory(driver)
add_subdirectory(messenger)
add_subdirectory(reactor)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/464c166c/examples/c/driver/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/driver/CMakeLists.txt b/examples/c/driver/CMakeLists.txt
new file mode 100644
index 0000000..63d8777
--- /dev/null
+++ b/examples/c/driver/CMakeLists.txt
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+find_package(Proton REQUIRED)
+
+include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${Proton_INCLUDE_DIRS})
+add_definitions(${COMPILE_WARNING_FLAGS} ${WERROR} ${COMPILE_PLATFORM_FLAGS} ${LINK_TIME_OPTIMIZATION})
+
+find_package(Libuv)
+if (Libuv_FOUND)
+ foreach(name broker send receive)
+ add_executable(libuv_${name} ${name}.c libuv_driver.c)
+ target_link_libraries(libuv_${name} ${Proton_LIBRARIES} ${Libuv_LIBRARIES})
+ set_target_properties(libuv_${name} PROPERTIES
+ COMPILE_DEFINITIONS "PN_DRIVER_INCLUDE=\"libuv_driver.h\"")
+ endforeach()
+
+ # Add a test with the correct environment to find test executables and valgrind.
+ if(WIN32)
+ set(test_path "$<TARGET_FILE_DIR:libuv_broker>;$<TARGET_FILE_DIR:qpid-proton>")
+ else(WIN32)
+ set(test_path "${CMAKE_CURRENT_BINARY_DIR}:$ENV{PATH}")
+ endif(WIN32)
+ set(run_env ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/proton-c/env.py ${EXAMPLE_ENV} "PATH=${test_path}" ${VALGRIND_ENV})
+ add_test(c-driver-libuv ${run_env} -- ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/test.py)
+endif()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/464c166c/examples/c/driver/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/driver/broker.c b/examples/c/driver/broker.c
new file mode 100644
index 0000000..f857a97
--- /dev/null
+++ b/examples/c/driver/broker.c
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/connection_engine.h>
+#include <proton/driver.h>
+#include <proton/engine.h>
+#include <proton/sasl.h>
+#include <proton/transport.h>
+#include <proton/url.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+/* TODO aconway 2016-10-14: this example does not require libuv IO,
+ it uses uv.h only for portable mutex and thread functions.
+*/
+#include <uv.h>
+
+bool verbose = false;
+
+typedef struct broker_connection_t {
+ pn_driver_connection_t dc;
+ bool check_queues;
+} broker_connection_t;
+
+
+void debug(const char* fmt, ...) {
+ if (verbose) {
+ va_list(ap);
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ fputc('\n', stderr);
+ fflush(stderr);
+ }
+}
+
+static void connection_debug(pn_driver_connection_t* dc, const char* fmt, ...) {
+ if (verbose) {
+ va_list(ap);
+ va_start(ap, fmt);
+ fprintf(stderr, "[%p] broker: ", (void*)dc->engine.transport);
+ vfprintf(stderr, fmt, ap);
+ fputc('\n', stderr);
+ fflush(stderr);
+ }
+}
+
+void check(int err, const char* s) {
+ if (err != 0) {
+ perror(s);
+ exit(1);
+ }
+}
+
+void pcheck(int err, const char* s) {
+ if (err != 0) {
+ fprintf(stderr, "%s: %s", s, pn_code(err));
+ exit(1);
+ }
+}
+
+/* Simple re-sizable vector that acts as a queue */
+#define VEC(T) struct { T* data; size_t len, cap; }
+
+#define VEC_INIT(V) \
+ do { \
+ V.len = 0; \
+ V.cap = 16; \
+ void **vp = (void**)&V.data; \
+ *vp = malloc(V.cap * sizeof(*V.data)); \
+ } while(0)
+
+#define VEC_FINAL(V) free(V.data)
+
+#define VEC_PUSH(V, X) \
+ do { \
+ if (V.len == V.cap) { \
+ V.cap *= 2; \
+ void **vp = (void**)&V.data; \
+ *vp = realloc(V.data, V.cap * sizeof(*V.data)); \
+ } \
+ V.data[V.len++] = X; \
+ } while(0) \
+
+#define VEC_POP(V) \
+ do { \
+ if (V.len > 0) \
+ memmove(V.data, V.data+1, (--V.len)*sizeof(*V.data)); \
+ } while(0)
+
+/* Simple thread-safe queue implementation */
+typedef struct queue {
+ uv_mutex_t lock;
+ char* name;
+ VEC(pn_rwbytes_t) messages; /* Messages on the queue */
+ VEC(pn_connection_t*) waiting; /* Connections waiting to send messages from this queue */
+ struct queue *next; /* Next queue in chain */
+} queue;
+
+static void queue_init(queue *q, const char* name, queue *next) {
+ debug("created queue %s", name);
+ uv_mutex_init(&q->lock);
+ q->name = strdup(name);
+ VEC_INIT(q->messages);
+ VEC_INIT(q->waiting);
+ q->next = next;
+}
+
+static void queue_final(queue *q) {
+ uv_mutex_destroy(&q->lock);
+ free(q->name);
+ for (size_t i = 0; i < q->messages.len; ++i)
+ free(q->messages.data[i].start);
+ VEC_FINAL(q->messages);
+ for (size_t i = 0; i < q->waiting.len; ++i)
+ pn_decref(q->waiting.data[i]);
+ VEC_FINAL(q->waiting);
+}
+
+/* Send a message on s, or record s as eating if no messages.
+ Called in s dispatch loop, assumes s has credit.
+*/
+static void queue_send(queue *q, pn_link_t *s) {
+ pn_rwbytes_t m = { 0 };
+ uv_mutex_lock(&q->lock);
+ if (q->messages.len == 0) { /* Empty, record connection as waiting */
+ debug("queue is empty %s", q->name);
+ /* Record connection for wake-up if not already on the list. */
+ pn_connection_t *c = pn_session_connection(pn_link_session(s));
+ size_t i = 0;
+ for (; i < q->waiting.len && q->waiting.data[i] != c; ++i)
+ ;
+ if (i == q->waiting.len) {
+ VEC_PUSH(q->waiting, c);
+ }
+ } else {
+ debug("sending from queue %s", q->name);
+ m = q->messages.data[0];
+ VEC_POP(q->messages);
+ }
+ uv_mutex_unlock(&q->lock);
+ if (m.start) {
+ /* FIXME aconway 2016-10-13: unique tags, see send.c */
+ pn_delivery_t *d = pn_delivery(s, pn_dtag("tag", 4));
+ pn_link_send(s, m.start, m.size);
+ pn_link_advance(s);
+ pn_delivery_settle(d); /* Pre-settled */
+ free(m.start);
+ }
+}
+
+/* Put a message on the queue, called in receiver dispatch loop.
+ If the queue was previously empty, notify waiting senders.
+*/
+static void queue_receive(pn_driver_t *d, queue *q, pn_rwbytes_t m) {
+ debug("received to queue %s", q->name);
+ uv_mutex_lock(&q->lock);
+ VEC_PUSH(q->messages, m);
+ if (q->messages.len == 1) { /* Was empty, notify waiting connections */
+ for (size_t i = 0; i < q->waiting.len; ++i) {
+ pn_connection_t *c = q->waiting.data[i];
+ broker_connection_t *bc = (broker_connection_t*)pn_driver_connection_get(c);
+ if (bc) {
+ bc->check_queues = true;
+ pn_driver_wake(&bc->dc); /* Signal that the connection should check queues */
+ }
+ }
+ q->waiting.len = 0;
+ }
+ uv_mutex_unlock(&q->lock);
+}
+
+/* Thread safe set of queues */
+typedef struct queues_t {
+ uv_mutex_t lock;
+ queue *queues;
+} queues_t;
+
+void queues_init(queues_t *qs) {
+ uv_mutex_init(&qs->lock);
+ qs->queues = NULL;
+}
+
+void queues_final(queues_t *qs) {
+ for (queue *q = qs->queues; q; q = q->next) {
+ queue_final(q);
+ free(q);
+ }
+ uv_mutex_destroy(&qs->lock);
+}
+
+/** Get or create the named queue. */
+queue* queues_get(queues_t *qs, const char* name) {
+ uv_mutex_lock(&qs->lock);
+ queue *q;
+ for (q = qs->queues; q && strcmp(q->name, name) != 0; q = q->next)
+ ;
+ if (!q) {
+ q = (queue*)malloc(sizeof(queue));
+ queue_init(q, name, qs->queues);
+ qs->queues = q;
+ }
+ uv_mutex_unlock(&qs->lock);
+ return q;
+}
+
+/* The broker implementation */
+typedef struct broker {
+ pn_driver_t driver;
+ pn_driver_listener_t listener;
+ queues_t queues;
+ char container_id[256]; /* AMQP container-id */
+ int threads;
+} broker;
+
+void broker_stop(broker *b) {
+ for (int i = 0; i < b->threads; ++i)
+ pn_driver_interrupt(&b->driver);
+}
+
+/* Try to send if link is sender and has credit */
+static void link_send(broker *b, pn_link_t *s) {
+ if (pn_link_is_sender(s) && pn_link_credit(s) > 0) {
+ const char *qname = pn_terminus_get_address(pn_link_source(s));
+ queue *q = queues_get(&b->queues, qname);
+ queue_send(q, s);
+ }
+}
+
+static void queue_unsub(queue *q, pn_connection_t *c) {
+ uv_mutex_lock(&q->lock);
+ for (size_t i = 0; i < q->waiting.len; ++i) {
+ if (q->waiting.data[i] == c){
+ q->waiting.data[i] = q->waiting.data[0]; /* save old [0] */
+ VEC_POP(q->waiting);
+ break;
+ }
+ }
+ uv_mutex_unlock(&q->lock);
+}
+
+/* Unsubscribe from the queue of interest to this link. */
+static void link_unsub(broker *b, pn_link_t *s) {
+ if (pn_link_is_sender(s)) {
+ const char *qname = pn_terminus_get_address(pn_link_source(s));
+ if (qname) {
+ queue *q = queues_get(&b->queues, qname);
+ queue_unsub(q, pn_session_connection(pn_link_session(s)));
+ }
+ }
+}
+
+/* Called in connection's event loop when a connection is woken for messages.*/
+static void connection_unsub(broker *b, pn_connection_t *c) {
+ for (pn_link_t *l = pn_link_head(c, 0); l != NULL; l = pn_link_next(l, 0))
+ link_unsub(b, l);
+}
+
+static void session_unsub(broker *b, pn_session_t *ssn) {
+ pn_connection_t *c = pn_session_connection(ssn);
+ for (pn_link_t *l = pn_link_head(c, 0); l != NULL; l = pn_link_next(l, 0)) {
+ if (pn_link_session(l) == ssn)
+ link_unsub(b, l);
+ }
+}
+
+static void print_condition(pn_event_t *e, pn_condition_t *cond) {
+ if (pn_condition_is_set(cond)) {
+ fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
+ pn_condition_get_name(cond), pn_condition_get_description(cond));
+ }
+}
+
+const int WINDOW=10; /* Incoming credit window */
+
+broker_connection_t *broker_new_connection(broker* b) {
+ broker_connection_t *bc = (broker_connection_t*)calloc(1, sizeof(broker_connection_t));
+ pn_driver_connection_init(&b->driver, &bc->dc);
+ bc->check_queues = false;
+ pn_connection_engine_t *eng = pn_driver_engine(&bc->dc);
+ pn_connection_set_container(eng->connection, b->container_id);
+ pn_transport_set_server(eng->transport);
+ /* No security */
+ pn_transport_require_auth(eng->transport, false);
+ pn_sasl_allowed_mechs(pn_sasl(eng->transport), "ANONYMOUS");
+ return bc;
+}
+
+/* Dispatch events for a broker connection */
+static void broker_dispatch(broker* b, broker_connection_t* bc) {
+ pn_connection_engine_t *eng = pn_driver_engine(&bc->dc);
+ pn_connection_t *c = pn_connection_engine_connection(eng);
+
+ if (bc->check_queues) {
+ int flags = PN_LOCAL_ACTIVE&PN_REMOTE_ACTIVE;
+ for (pn_link_t *l = pn_link_head(c, flags); l != NULL; l = pn_link_next(l, flags))
+ link_send(b, l);
+ bc->check_queues = false;
+ }
+
+ pn_event_t *e;
+ while ((e = pn_connection_engine_dispatch(eng)) != NULL) {
+ connection_debug(&bc->dc, "dispatch event: %s\n",
+ pn_event_type_name(pn_event_type(e)));
+ switch (pn_event_type(e)) {
+ case PN_CONNECTION_REMOTE_OPEN: {
+ pn_connection_open(pn_event_connection(e)); /* Complete the open */
+ break;
+ }
+ case PN_SESSION_REMOTE_OPEN: {
+ pn_session_open(pn_event_session(e));
+ break;
+ }
+ case PN_LINK_REMOTE_OPEN: {
+ pn_link_t *l = pn_event_link(e);
+ if (pn_link_is_sender(l)) {
+ const char *source = pn_terminus_get_address(pn_link_remote_source(l));
+ pn_terminus_set_address(pn_link_source(l), source);
+ } else {
+ const char* target = pn_terminus_get_address(pn_link_remote_target(l));
+ pn_terminus_set_address(pn_link_target(l), target);
+ pn_link_flow(l, WINDOW);
+ }
+ pn_link_open(l);
+ break;
+ }
+ case PN_LINK_FLOW: {
+ link_send(b, pn_event_link(e));
+ break;
+ }
+ case PN_DELIVERY: {
+ pn_delivery_t *d = pn_event_delivery(e);
+ pn_link_t *r = pn_delivery_link(d);
+ if (pn_link_is_receiver(r) &&
+ pn_delivery_readable(d) && !pn_delivery_partial(d))
+ {
+ size_t size = pn_delivery_pending(d);
+ /* The broker does not decode the message, just forwards it. */
+ pn_rwbytes_t m = { size, (char*)malloc(size) };
+ pn_link_recv(r, m.start, m.size);
+ const char *qname = pn_terminus_get_address(pn_link_target(r));
+ queue_receive(&b->driver, queues_get(&b->queues, qname), m);
+ pn_delivery_update(d, PN_ACCEPTED);
+ pn_delivery_settle(d);
+ pn_link_flow(r, WINDOW - pn_link_credit(r));
+ }
+ /* FIXME aconway 2016-09-30: settlement... */
+ break;
+ }
+
+ case PN_TRANSPORT_CLOSED:
+ connection_unsub(b, pn_event_connection(e));
+ print_condition(e, pn_transport_condition(pn_event_transport(e)));
+ break;
+ case PN_CONNECTION_REMOTE_CLOSE:
+ print_condition(e, pn_connection_remote_condition(pn_event_connection(e)));
+ connection_unsub(b, pn_event_connection(e));
+ pn_connection_close(pn_event_connection(e));
+ break;
+ case PN_SESSION_REMOTE_CLOSE:
+ print_condition(e, pn_session_remote_condition(pn_event_session(e)));
+ session_unsub(b, pn_event_session(e));
+ pn_session_close(pn_event_session(e));
+ pn_session_free(pn_event_session(e));
+ break;
+ case PN_LINK_REMOTE_CLOSE:
+ print_condition(e, pn_link_remote_condition(pn_event_link(e)));
+ link_unsub(b, pn_event_link(e));
+ pn_link_close(pn_event_link(e));
+ pn_link_free(pn_event_link(e));
+ break;
+ default:
+ break;
+ }
+ }
+}
+
+static void broker_thread(void *void_broker) {
+ broker *b = (broker*)void_broker;
+ pn_driver_t *d = (pn_driver_t*)&b->driver;
+ while (true) {
+ pn_driver_event_t e = pn_driver_wait(d);
+ switch (e.type) {
+ case PN_DRIVER_LISTENER_READY: {
+ broker_connection_t *bc = broker_new_connection(b);
+ connection_debug(e.connection, "accepting");
+ pn_driver_accept(e.listener, &bc->dc);
+ break;
+ }
+ case PN_DRIVER_CONNECTION_READY:
+ connection_debug(e.connection, "dispatching");
+ broker_dispatch(b, (broker_connection_t*)e.connection);
+ connection_debug(e.connection, "watching");
+ pn_driver_watch(e.connection);
+ break;
+ case PN_DRIVER_INTERRUPT:
+ debug("interrupted");
+ return;
+ case PN_DRIVER_CONNECTION_FINISHED:
+ connection_debug(e.connection, "connection finished");
+ pn_driver_connection_final(e.connection);
+ free(e.connection);
+ break;
+ case PN_DRIVER_LISTENER_FINISHED: {
+ debug("listener finished");
+ const char *errstr = pn_driver_listener_error(e.listener);
+ if (errstr)
+ fprintf(stderr, "listener error: %s\n", errstr);
+ pn_driver_listener_final(e.listener);
+ broker_stop(b);
+ break;
+ }
+ }
+ }
+}
+
+static void usage(const char *arg0) {
+ fprintf(stderr, "Usage: %s [-d] [-a url] [-t thread-count]\n", arg0);
+ exit(1);
+}
+
+int main(int argc, char **argv) {
+ /* Command line options */
+ const char *urlstr = NULL;
+ size_t nthreads = 4; /* Thread count. */
+ int opt;
+ while ((opt = getopt(argc, argv, "a:t:d")) != -1) {
+ switch (opt) {
+ case 'a': urlstr = optarg; break;
+ case 't': nthreads = atoi(optarg); break;
+ case 'd': verbose = true; break;
+ default: usage(argv[0]); break;
+ }
+ }
+ if (optind < argc)
+ usage(argv[0]);
+
+ /* Run the broker */
+ broker b;
+ b.threads = nthreads;
+ pn_driver_init(&b.driver);
+ pn_driver_listener_init(&b.driver, &b.listener);
+ snprintf(b.container_id, sizeof(b.container_id), "%s:%d", argv[0], getpid());
+ queues_init(&b.queues);
+
+ /* Parse the URL or use default values */
+ pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
+ const char *host = url ? pn_url_get_host(url) : NULL;
+ const char *port = url ? pn_url_get_port(url) : NULL;
+ if (!port) port = "amqp";
+
+ pn_driver_listen(&b.listener, host, port, NULL, 16);
+ printf("listening on '%s:%s' %zd threads\n", host, port, nthreads);
+
+ if (url) pn_url_free(url);
+ uv_thread_t* threads = (uv_thread_t*)calloc(sizeof(uv_thread_t), nthreads);
+ for (size_t i = 0; i < nthreads-1; ++i) {
+ check(uv_thread_create(&threads[i], broker_thread, &b), "pthread_create");
+ }
+ broker_thread(&b); /* Use the main thread too. */
+ for (size_t i = 0; i < nthreads-1; ++i) {
+ check(uv_thread_join(&threads[i]), "pthread_join");
+ }
+ pn_driver_final(&b.driver);
+ free(threads);
+ return 0;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/464c166c/examples/c/driver/libuv_driver.c
----------------------------------------------------------------------
diff --git a/examples/c/driver/libuv_driver.c b/examples/c/driver/libuv_driver.c
new file mode 100644
index 0000000..beeff3c
--- /dev/null
+++ b/examples/c/driver/libuv_driver.c
@@ -0,0 +1,569 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ libuv loop functions are thread unsafe. The only exception is uv_async_send()
+ which is a thread safe "wakeup" that can wake the uv_loop from another thread.
+
+ To provide concurrency this driver uses a "leader-worker-follower" model.
+
+ - Multiple threads can be "workers" and concurrently process distinct driver connections
+ or listeners that have been woken by IO and now have non-IO work to do.
+
+ - Only one thread, the "leader", is allowed to call unsafe libuv functions and
+ run the uv_loop to wait for IO events.
+
+ - Threads with no work to do are "followers", they wait on the leader.
+
+ - The leader runs the uv_loop for one iteration, then gives up leadership and
+ becomes a worker. One of the followers becomes the next leader.
+
+ This model is symmetric: any thread can take on any role based on run-time
+ requirements. It also allows the IO and non-IO work associated with an IO
+ wake-up to be processed in a single thread with no context switches.
+
+ Connections and listeners both contain a "dsocket". Requests to modify a
+ dsocket are queued on the driver.leader_q to be handled by the leader. Sockets
+ that are ready for user processing are queued on driver.user_q.
+
+ Connections can be closed by IO (read EOF, read/write error) or by proton
+ events (SASL failures, application closing the connection etc.) Once a
+ connection is fully closed (uv_socket closed + pn_connection_engine_finished)
+
+ Listeners can similarly be closed by IO or by pn_driver_listener_close.
+*/
+
+#include <proton/driver.h>
+
+#include <proton/engine.h>
+#include <proton/message.h>
+#include <proton/object.h>
+#include <proton/transport.h>
+#include <proton/url.h>
+
+#include <stddef.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <uv.h>
+
+const char *COND_NAME = "driver";
+const char *AMQP_PORT = "5672";
+const char *AMQP_SERVICE_NAME = "amqp";
+const char *AMQPS_PORT = "5671";
+const char *AMQPS_SERVICE_NAME = "amqps";
+
+/* Short aliases */
+typedef pn_uv_socket_t dsocket;
+typedef pn_uv_queue_t queue;
+typedef pn_driver_t driver;
+
+/* State of a driver socket */
+typedef enum {
+ INACTIVE, /* Initialized but not yet called connect/listen/accept */
+ RUNNING, /* Normal operation */
+ CLOSING, /* uv_close request pending */
+ CLOSED /* UV close completed */
+} dsocket_state;
+
+/* Special value for dsocket.next pointer when socket is not on any any list. */
+dsocket UNLISTED;
+
+static void push(queue *q, dsocket *ds) {
+ if (ds->next != &UNLISTED) /* Don't move if already listed. */
+ return;
+ ds->next = NULL;
+ if (!q->front) {
+ q->front = q->back = ds;
+ } else {
+ q->back->next = ds;
+ q->back = ds;
+ }
+}
+
+static dsocket* pop(queue *q) {
+ dsocket *ds = q->front;
+ if (ds) {
+ q->front = ds->next;
+ ds->next = &UNLISTED;
+ }
+ return ds;
+}
+
+static void dsocket_init(dsocket* ds, driver* d, bool is_conn) {
+ ds->next = &UNLISTED;
+ ds->state = INACTIVE;
+ ds->driver = d;
+ ds->user = true;
+ ds->is_conn = is_conn;
+ ds->socket.data = ds;
+}
+
+static void set_addr(pn_uv_addr_t* addr, const char *host, const char *service) {
+ /* For platforms that don't know about "amqp" and "amqps" services. */
+ if (strcmp(service, AMQP_SERVICE_NAME) == 0)
+ service = AMQP_PORT;
+ else if (strcmp(service, AMQPS_SERVICE_NAME) == 0)
+ service = AMQPS_PORT;
+ /* Set to "\001" to indicate a NULL as opposed to an empty string "" */
+ strncpy(addr->host, host ? host : "\001", sizeof(addr->host));
+ strncpy(addr->service, service ? service : "\001", sizeof(addr->service));
+}
+
+static void get_addr(pn_uv_addr_t* addr, const char **host, const char **service) {
+ *host = addr->host[0] == '\001' ? NULL : addr->host;
+ *service = addr->service[0] == '\001' ? NULL : addr->service;
+}
+
+static void to_leader_lh(dsocket* ds) {
+ ds->user = false;
+ push(&ds->driver->leader_q, ds);
+ uv_async_send(&ds->driver->async); /* wake up the uv_loop */
+}
+
+static void to_leader(dsocket* ds) {
+ uv_mutex_lock(&ds->driver->lock);
+ to_leader_lh(ds);
+ uv_mutex_unlock(&ds->driver->lock);
+}
+
+static void to_user(dsocket* ds) {
+ uv_mutex_lock(&ds->driver->lock);
+ ds->user = true;
+ push(&ds->driver->user_q, ds);
+ uv_mutex_unlock(&ds->driver->lock);
+}
+
+static void on_close(uv_handle_t *socket) {
+ dsocket *ds = (dsocket*)socket->data;
+ uv_mutex_lock(&ds->driver->lock);
+ ds->state = CLOSED;
+ push(&ds->driver->user_q, ds); /* Return in FINISHED event */
+ uv_mutex_unlock(&ds->driver->lock);
+}
+
+static void do_close(dsocket *ds) {
+ switch ((dsocket_state)ds->state) {
+ case INACTIVE:
+ ds->state = CLOSED;
+ to_user(ds);
+ break;
+ case RUNNING:
+ ds->state = CLOSING;
+ uv_close((uv_handle_t*)&ds->socket, on_close);
+ break;
+ case CLOSING:
+ break;
+ case CLOSED:
+ to_user(ds);
+ break;
+ }
+}
+
+static void set_error(dsocket* ds, const char* fmt, ...) {
+ if (ds->is_conn) {
+ pn_connection_engine_t *eng = &((pn_driver_connection_t*)ds)->engine;
+ pn_condition_t* cond = pn_connection_engine_condition(eng);
+ if (!pn_condition_is_set(cond)) { /* Don't overwrite an existing error */
+ va_list ap;
+ va_start(ap, fmt);
+ pn_condition_vformat(cond, COND_NAME, fmt, ap);
+ va_end(ap);
+ pn_connection_engine_disconnected(eng);
+ }
+ } else { /* Listener */
+ pn_driver_listener_t *dl = (pn_driver_listener_t*)ds;
+ va_list ap;
+ va_start(ap, fmt);
+ vsnprintf(dl->error, sizeof(dl->error), fmt, ap);
+ va_end(ap);
+ }
+ do_close(ds);
+}
+
+static int set_uv_error(dsocket *ds, int err, const char* prefix) {
+ if (err < 0)
+ set_error(ds, "%s: %s", prefix, uv_strerror(err));
+ return err;
+}
+
+static void on_connect(uv_connect_t* connect, int status) {
+ pn_driver_connection_t* dc = (pn_driver_connection_t*)connect->data;
+ if (set_uv_error(&dc->dsocket, status, "cannot connect") == 0) {
+ to_user(&dc->dsocket); /* Process initial events before doing IO */
+ }
+}
+
+static void on_connection(uv_stream_t* server, int status) {
+ pn_driver_listener_t* dl = (pn_driver_listener_t*)server->data;
+ if (status == 0) {
+ ++dl->pending;
+ to_user(&dl->dsocket);
+ } else {
+ set_uv_error(&dl->dsocket, status, "incoming connection error");
+ }
+}
+
+static void do_connect(pn_driver_connection_t *dc) {
+ const char *host, *service;
+ get_addr(&dc->addr, &host, &service);
+ uv_getaddrinfo_t info;
+ int err = uv_getaddrinfo(&dc->dsocket.driver->loop, &info, NULL, host, service, NULL);
+ if (!err) {
+ err = uv_tcp_connect(&dc->connect, &dc->dsocket.socket, info.addrinfo->ai_addr, on_connect);
+ uv_freeaddrinfo(info.addrinfo);
+ }
+ if (err)
+ set_error(&dc->dsocket, "connect to %s:%s: %s", host, service, uv_strerror(err));
+}
+
+static void do_listen(pn_driver_listener_t *dl) {
+ const char *host, *service;
+ get_addr(&dl->addr, &host, &service);
+ uv_getaddrinfo_t info;
+ int err = uv_getaddrinfo(&dl->dsocket.driver->loop, &info, NULL, host, service, NULL);
+ if (!err) {
+ err = uv_tcp_bind(&dl->dsocket.socket, info.addrinfo->ai_addr, 0);
+ if (!err)
+ err = uv_listen((uv_stream_t*)&dl->dsocket.socket, dl->backlog, on_connection);
+ uv_freeaddrinfo(info.addrinfo);
+ }
+ if (err)
+ set_error(&dl->dsocket, "listen on %s:%s: %s", host, service, uv_strerror(err));
+}
+
+static void do_accept(pn_driver_connection_t *dc) {
+ pn_driver_listener_t *dl = dc->listener;
+ const char *host, *service;
+ get_addr(&dl->addr, &host, &service);
+
+ if (dl->pending == 0) {
+ set_error(&dl->dsocket, "accept from %s:%s: %s", host, service,
+ "no connection available");
+ return;
+ }
+ --dl->pending;
+ int err = uv_accept((uv_stream_t*)&dl->dsocket.socket, (uv_stream_t*)&dc->dsocket.socket);
+ if (err) {
+ set_error(&dl->dsocket, "accept from %s:%s: %s", host, service, uv_strerror(err));
+ set_error(&dc->dsocket, "accept from %s:%s: %s", host, service, uv_strerror(err));
+ }
+}
+
+static void do_activate(dsocket *ds) {
+ int err = uv_tcp_init(&ds->driver->loop, &ds->socket);
+ if (err) {
+ set_uv_error(ds, err, "tcp socket init");
+ return;
+ }
+ if (ds->is_conn) {
+ pn_driver_connection_t *dc = (pn_driver_connection_t*)ds;
+ if (dc->is_accept)
+ do_accept(dc);
+ else
+ do_connect(dc);
+ to_user(ds); /* Process initial events */
+ } else {
+ do_listen((pn_driver_listener_t*)ds);
+ }
+}
+
+static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
+ pn_driver_connection_t *dc = (pn_driver_connection_t*)stream->data;
+ if (nread >= 0) {
+ pn_connection_engine_read_done(&dc->engine, nread);
+ if (!dc->writing) { /* Don't go ready if write is pending */
+ uv_read_stop(stream);
+ dc->reading = false;
+ to_user(&dc->dsocket);
+ }
+ } else if (nread == UV_EOF) { /* hangup */
+ pn_connection_engine_read_close(&dc->engine);
+ to_user(&dc->dsocket);
+ } else {
+ set_uv_error(&dc->dsocket, nread, "read");
+ }
+}
+
+static void on_write(uv_write_t* request, int status) {
+ if (status == UV_ECANCELED)
+ return; /* Nothing to do */
+ pn_driver_connection_t *dc = (pn_driver_connection_t*)request->data;
+ if (set_uv_error(&dc->dsocket, status, "write") == 0) {
+ pn_connection_engine_write_done(&dc->engine, dc->writing);
+ dc->writing = 0;
+ if (dc->reading) { /* Cancel the read request before going ready. */
+ uv_read_stop((uv_stream_t*)&dc->dsocket.socket);
+ dc->reading = false;
+ }
+ to_user(&dc->dsocket);
+ }
+}
+
+// Read buffer allocation function just returns the engine's read buffer.
+static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) {
+ pn_driver_connection_t *dc = (pn_driver_connection_t*)stream->data;
+ pn_rwbytes_t rbuf = pn_connection_engine_read_buffer(&dc->engine);
+ *buf = uv_buf_init(rbuf.start, rbuf.size);
+}
+
+static void do_connection(pn_driver_connection_t *dc) {
+ uv_mutex_lock(&dc->dsocket.driver->lock);
+ bool do_wake = dc->dsocket.wake;
+ dc->dsocket.wake = false;
+ uv_mutex_unlock(&dc->dsocket.driver->lock);
+
+ if (do_wake) {
+ /* Detach from the IO loop before sending to user. */
+ if (dc->writing) {
+ uv_cancel((uv_req_t*)&dc->write);
+ dc->writing = 0;
+ }
+ if (dc->reading) {
+ uv_read_stop((uv_stream_t*)&dc->dsocket.socket);
+ dc->reading = false;
+ }
+ to_user(&dc->dsocket);
+ } else if (pn_connection_engine_finished(&dc->engine)) {
+ do_close(&dc->dsocket); /* Request close */
+ } else { /* Check for IO */
+ pn_bytes_t wbuf = pn_connection_engine_write_buffer(&dc->engine);
+ pn_rwbytes_t rbuf = pn_connection_engine_read_buffer(&dc->engine);
+ /* Calling write_buffer can generate events.
+ Make all events are processed before we resume IO, since events may
+ close the transport.
+ */
+ if (pn_collector_peek(dc->engine.collector)) {
+ to_user(&dc->dsocket);
+ } else { /* Really resume IO */
+ if (wbuf.size > 0 && !dc->writing) {
+ dc->writing = wbuf.size;
+ uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
+ uv_write(&dc->write, (uv_stream_t*)&dc->dsocket.socket, &buf, 1, on_write);
+ }
+ if (rbuf.size > 0 && !dc->reading) {
+ dc->reading = true;
+ uv_read_start((uv_stream_t*)&dc->dsocket.socket, alloc_read_buffer, on_read);
+ }
+ }
+ }
+}
+
+static void do_listener(pn_driver_listener_t *dl) {
+ uv_mutex_lock(&dl->dsocket.driver->lock);
+ bool do_wake = dl->dsocket.wake;
+ uv_mutex_unlock(&dl->dsocket.driver->lock);
+ if (do_wake)
+ do_close(&dl->dsocket);
+ else if (dl->pending > 0) /* Ready for another accept */
+ to_user(&dl->dsocket);
+ /* Don't need to resume IO, the uv_listen call remains in force all the time. */
+}
+
+/* Called by leader to handle queued requests */
+static void do_socket(dsocket *ds) {
+ switch ((dsocket_state)ds->state) {
+ case INACTIVE:
+ do_activate(ds);
+ if (ds->state == INACTIVE)
+ ds->state = RUNNING;
+ break;
+ case RUNNING:
+ if (ds->is_conn) {
+ do_connection((pn_driver_connection_t*)ds);
+ } else {
+ do_listener((pn_driver_listener_t*)ds);
+ }
+ break;
+ case CLOSING:
+ break;
+ case CLOSED:
+ to_user(ds); /* Return FINISHED event to user */
+ break;
+ }
+}
+
+/* Fill in event and return true or return false if no events are available. */
+bool get_event_lh(driver *d, pn_driver_event_t *event) {
+ if (d->interrupt > 0) {
+ --d->interrupt;
+ event->type = PN_DRIVER_INTERRUPT;
+ event->connection = NULL;
+ return true;
+ }
+ dsocket *ds = pop(&d->user_q);
+ if (!ds) return false;
+ if (ds->is_conn) {
+ pn_driver_connection_t *dc = event->connection = (pn_driver_connection_t*)ds;
+ if (pn_connection_engine_finished(&dc->engine) && ds->state == CLOSED)
+ event->type = PN_DRIVER_CONNECTION_FINISHED;
+ else
+ event->type = PN_DRIVER_CONNECTION_READY;
+ return true;
+ } else { /* Listener */
+ pn_driver_listener_t *dl = event->listener = (pn_driver_listener_t*)ds;
+ if (dl->dsocket.state == CLOSED) {
+ event->type = PN_DRIVER_LISTENER_FINISHED;
+ return true;
+ }
+ else if (dl->pending > 0) {
+ event->type = PN_DRIVER_LISTENER_READY;
+ return true;
+ }
+ }
+ return false;
+}
+
+
+pn_driver_event_t pn_driver_wait(struct pn_driver_t* d) {
+ uv_mutex_lock(&d->lock);
+ pn_driver_event_t event;
+ /* Try to grab work immediately. */
+ if (!get_event_lh(d, &event)) {
+ /* No work available, follow the leader */
+ while (d->has_leader)
+ uv_cond_wait(&d->cond, &d->lock);
+ d->has_leader = true; /* I am the leader */
+ while (!get_event_lh(d, &event)) { /* Lead till there is work to do. */
+ /* Run IO outside the lock */
+ uv_mutex_unlock(&d->lock);
+ uv_run(&d->loop, UV_RUN_ONCE);
+ uv_mutex_lock(&d->lock);
+ /* Process leader requests outside the lock */
+ for (dsocket* ds = pop(&d->leader_q); ds; ds = pop(&d->leader_q)) {
+ uv_mutex_unlock(&d->lock);
+ do_socket(ds);
+ uv_mutex_lock(&d->lock);
+ }
+ }
+ d->has_leader = false;
+ uv_cond_signal(&d->cond); /* Signal the next leader */
+ }
+ uv_mutex_unlock(&d->lock);
+ return event;
+}
+
+void pn_driver_interrupt(pn_driver_t *d) {
+ uv_mutex_lock(&d->lock);
+ ++d->interrupt;
+ uv_async_send(&d->async); /* Interrupt the UV loop */
+ uv_mutex_unlock(&d->lock);
+}
+
+void pn_driver_connect(pn_driver_connection_t* dc, const char *host, const char *service, const char *network) {
+ dc->is_accept = false;
+ set_addr(&dc->addr, host, service);
+ to_leader(&dc->dsocket);
+}
+
+void pn_driver_watch(pn_driver_connection_t *dc) {
+ to_leader(&dc->dsocket);
+}
+
+static void on_connect(uv_connect_t* connect, int status);
+
+void pn_driver_connection_init(pn_driver_t *d, pn_driver_connection_t *dc) {
+ memset(dc, 0, sizeof(*dc));
+ dsocket_init(&dc->dsocket, d, true);
+ pn_connection_engine_init(&dc->engine);
+ pn_connection_set_context(dc->engine.connection, dc);
+ dc->connect.data = dc;
+ dc->write.data = dc;
+}
+
+void pn_driver_connection_final(pn_driver_connection_t* dc) {
+ pn_connection_engine_final(&dc->engine);
+}
+
+void pn_driver_wake(pn_driver_connection_t* dc) {
+ uv_mutex_lock(&dc->dsocket.driver->lock);
+ dc->dsocket.wake = true;
+ if (!dc->dsocket.user)
+ to_leader_lh(&dc->dsocket);
+ uv_mutex_unlock(&dc->dsocket.driver->lock);
+}
+
+void pn_driver_listener_init(pn_driver_t *d, pn_driver_listener_t *dl) {
+ memset(dl, 0, sizeof(*dl));
+ dsocket_init(&dl->dsocket, d, false);
+}
+
+void pn_driver_listen(pn_driver_listener_t *dl, const char *host, const char *service, const char *network, int backlog) {
+ set_addr(&dl->addr, host, service);
+ dl->backlog = backlog;
+ dl->pending = 0;
+ to_leader(&dl->dsocket);
+}
+
+void pn_driver_accept(pn_driver_listener_t* dl, pn_driver_connection_t* dc) {
+ dc->is_accept = true;
+ dc->listener = dl;
+ to_leader(&dc->dsocket);
+ to_leader(&dl->dsocket); /* Re-activate the listener */
+}
+
+void pn_driver_listener_close(pn_driver_listener_t* dl) {
+ uv_mutex_lock(&dl->dsocket.driver->lock);
+ dl->dsocket.wake = true;
+ if (!dl->dsocket.user)
+ to_leader_lh(&dl->dsocket);
+ uv_mutex_unlock(&dl->dsocket.driver->lock);
+}
+
+const char* pn_driver_listener_error(pn_driver_listener_t* dl) {
+ return dl->error[0] ? dl->error : NULL;
+}
+
+void pn_driver_listener_final(pn_driver_listener_t* l) {
+ /* Nothing to do. */
+}
+
+void pn_driver_init(pn_driver_t *d) {
+ memset(d, '\0', sizeof(*d));
+ uv_mutex_init(&d->lock);
+ uv_cond_init(&d->cond);
+ uv_loop_init(&d->loop);
+ uv_async_init(&d->loop, &d->async, NULL); /* Just wake the loop */
+}
+
+static void on_stopping(uv_handle_t* h, void* v) {
+ uv_close(h, NULL);
+ if (!uv_loop_alive(h->loop))
+ uv_stop(h->loop);
+}
+
+void pn_driver_final(pn_driver_t *pd) {
+ driver *d = (driver*)pd;
+ uv_walk(&d->loop, on_stopping, NULL); /* Close all handles */
+ uv_run(&d->loop, UV_RUN_DEFAULT); /* Run till stop, all handles closed */
+ uv_loop_close(&d->loop);
+ uv_mutex_destroy(&d->lock);
+ uv_cond_destroy(&d->cond);
+}
+
+pn_driver_connection_t *pn_driver_connection_get(pn_connection_t* c) {
+ return c ? (pn_driver_connection_t*)pn_connection_get_context(c) : NULL;
+}
+
+pn_connection_engine_t *pn_driver_engine(pn_driver_connection_t *dc) {
+ return dc ? &dc->engine : NULL;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/464c166c/examples/c/driver/libuv_driver.h
----------------------------------------------------------------------
diff --git a/examples/c/driver/libuv_driver.h b/examples/c/driver/libuv_driver.h
new file mode 100644
index 0000000..f00629c
--- /dev/null
+++ b/examples/c/driver/libuv_driver.h
@@ -0,0 +1,111 @@
+#ifndef LIBUV_DRIVER_H
+#define LIBUV_DRIVER_H
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**@cond INTERNAL */
+
+/*
+ Definitions for libuv driver implementation.
+ Included as part of proton/driver.h if libuv implementation is selected.
+
+ Defines structs specific to libuv driver, see proton/driver.h for public API.
+*/
+
+#include <proton/connection_engine.h>
+#include <uv.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct pn_driver_t pn_driver_t;
+
+typedef struct pn_uv_addr_t {
+ char host[NI_MAXHOST];
+ char service[NI_MAXSERV];
+} pn_uv_addr_t;
+
+typedef struct pn_uv_socket_t {
+ /* Protected by driver.lock */
+ struct pn_uv_socket_t* next;
+ bool user:1; /* In use by user thread */
+ bool wake:1; /* Wake or close requested */
+
+ /* Remaining members only used in leader thread */
+ int state;
+ uv_tcp_t socket;
+ pn_driver_t *driver;
+ bool is_conn:1; /* True for connection, false for listener. */
+} pn_uv_socket_t;
+
+typedef struct pn_driver_connection_t {
+ pn_uv_socket_t dsocket;
+
+ /* Members only used by user or leader thread, never both at the same time. */
+ pn_connection_engine_t engine;
+ union {
+ struct {
+ pn_uv_addr_t addr; /* for connect() */
+ uv_connect_t connect;
+ };
+ pn_driver_listener_t *listener; /* for accept() */
+ };
+ uv_write_t write;
+ size_t writing;
+ bool reading:1;
+ bool is_accept:1;
+} pn_driver_connection_t;
+
+#define PN_UV_MAX_ERR_LEN 128
+
+typedef struct pn_driver_listener_t {
+ pn_uv_socket_t dsocket;
+
+ /* Members only used by leader thread */
+ pn_uv_addr_t addr;
+ size_t backlog;
+ size_t pending;
+ char error[PN_UV_MAX_ERR_LEN];
+} pn_driver_listener_t;
+
+typedef struct pn_uv_queue_t { pn_uv_socket_t *front, *back; } pn_uv_queue_t;
+
+typedef struct pn_driver_t {
+ uv_mutex_t lock;
+ pn_uv_queue_t user_q;
+ pn_uv_queue_t leader_q;
+ size_t interrupt;
+
+ uv_cond_t cond;
+ uv_loop_t loop;
+ uv_async_t async;
+
+ bool has_leader:1;
+} pn_driver_t;
+
+#ifdef __cplusplus
+}
+#endif
+
+/**@endcond INTERNAL */
+
+#endif // LIBUV_DRIVER_H
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/464c166c/examples/c/driver/receive.c
----------------------------------------------------------------------
diff --git a/examples/c/driver/receive.c b/examples/c/driver/receive.c
new file mode 100644
index 0000000..d9c04db
--- /dev/null
+++ b/examples/c/driver/receive.c
@@ -0,0 +1,187 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/connection.h>
+#include <proton/connection_engine.h>
+#include <proton/delivery.h>
+#include <proton/driver.h>
+#include <proton/link.h>
+#include <proton/message.h>
+#include <proton/session.h>
+#include <proton/transport.h>
+#include <proton/url.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+typedef char str[1024];
+
+typedef struct app_data_t {
+ str address;
+ str container_id;
+ pn_rwbytes_t message_buffer;
+ int message_count;
+ int received;
+} app_data_t;
+
+static const int BATCH = 100; /* Batch size for unlimited receive */
+
+static void print_condition(pn_event_t *e, pn_condition_t *cond) {
+ if (pn_condition_is_set(cond))
+ fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
+ pn_condition_get_name(cond), pn_condition_get_description(cond));
+}
+
+static void dispatch(app_data_t* app, pn_connection_engine_t* eng) {
+ for (pn_event_t *event = pn_connection_engine_dispatch(eng);
+ event != NULL;
+ event = pn_connection_engine_dispatch(eng)
+ ) {
+ switch (pn_event_type(event)) {
+
+ case PN_CONNECTION_INIT: {
+ pn_connection_t* c = pn_event_connection(event);
+ pn_connection_set_container(c, app->container_id);
+ pn_connection_open(c);
+ pn_session_t* s = pn_session(c);
+ pn_session_open(s);
+ pn_link_t* l = pn_receiver(s, "my_receiver");
+ pn_terminus_set_address(pn_link_source(l), app->address);
+ pn_link_open(l);
+ /* cannot receive without granting credit: */
+ pn_link_flow(l, app->message_count ? app->message_count : BATCH);
+ } break;
+
+ case PN_DELIVERY: {
+ /* A message has been received */
+ pn_link_t *link = NULL;
+ pn_delivery_t *dlv = pn_event_delivery(event);
+ if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) {
+ link = pn_delivery_link(dlv);
+ /* Accept the delivery */
+ pn_delivery_update(dlv, PN_ACCEPTED);
+ /* done with the delivery, move to the next and free it */
+ pn_link_advance(link);
+ pn_delivery_settle(dlv); /* dlv is now freed */
+
+ if (app->message_count == 0) {
+ /* receive forever - see if more credit is needed */
+ if (pn_link_credit(link) < BATCH/2) {
+ /* Grant enough credit to bring it up to BATCH: */
+ pn_link_flow(link, BATCH - pn_link_credit(link));
+ }
+ } else if (++app->received >= app->message_count) {
+ /* done receiving, close the endpoints */
+ printf("%d messages received\n", app->received);
+ pn_session_t *ssn = pn_link_session(link);
+ pn_link_close(link);
+ pn_session_close(ssn);
+ pn_connection_close(pn_session_connection(ssn));
+ }
+ }
+ } break;
+
+ case PN_TRANSPORT_ERROR:
+ print_condition(event, pn_transport_condition(pn_event_transport(event)));
+ break;
+
+ case PN_CONNECTION_REMOTE_CLOSE:
+ print_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
+ pn_connection_close(pn_event_connection(event));
+ break;
+
+ case PN_SESSION_REMOTE_CLOSE:
+ print_condition(event, pn_session_remote_condition(pn_event_session(event)));
+ pn_connection_close(pn_event_connection(event));
+ break;
+
+ case PN_LINK_REMOTE_CLOSE:
+ case PN_LINK_REMOTE_DETACH:
+ print_condition(event, pn_link_remote_condition(pn_event_link(event)));
+ pn_connection_close(pn_event_connection(event));
+ break;
+
+ default: break;
+ }
+ }
+}
+
+static void usage(const char *arg0) {
+ fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", arg0);
+ exit(1);
+}
+
+int main(int argc, char **argv) {
+ /* Default values for application and connection. */
+ app_data_t app = {0};
+ app.message_count = 100;
+ const char* urlstr = NULL;
+
+ int opt;
+ while((opt = getopt(argc, argv, "a:m:")) != -1) {
+ switch(opt) {
+ case 'a': urlstr = optarg; break;
+ case 'm': app.message_count = atoi(optarg); break;
+ default: usage(argv[0]); break;
+ }
+ }
+ if (optind < argc)
+ usage(argv[0]);
+
+ snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid());
+
+ /* Parse the URL or use default values */
+ pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
+ const char *host = url ? pn_url_get_host(url) : NULL;
+ const char *port = url ? pn_url_get_port(url) : NULL;
+ if (!port) port = "amqp";
+ strncpy(app.address, (url && pn_url_get_path(url)) ? pn_url_get_path(url) : "example", sizeof(app.address));
+
+ /* Create the driver and connect */
+ pn_driver_t driver;
+ pn_driver_init(&driver);
+ pn_driver_connection_t dc;
+ pn_driver_connection_init(&driver, &dc);
+ pn_driver_connect(&dc, host, port, NULL);
+ if (url) pn_url_free(url);
+
+ for (pn_driver_event_t e = pn_driver_wait(&driver);
+ e.type != PN_DRIVER_INTERRUPT;
+ e = pn_driver_wait(&driver))
+ {
+ switch (e.type) {
+ case PN_DRIVER_CONNECTION_READY:
+ dispatch(&app, pn_driver_engine(e.connection));
+ pn_driver_watch(e.connection);
+ break;
+ case PN_DRIVER_CONNECTION_FINISHED:
+ pn_driver_connection_final(e.connection);
+ pn_driver_interrupt(&driver);
+ break;
+ default:
+ break;
+ }
+ }
+ pn_driver_final(&driver);
+ return 0;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/464c166c/examples/c/driver/send.c
----------------------------------------------------------------------
diff --git a/examples/c/driver/send.c b/examples/c/driver/send.c
new file mode 100644
index 0000000..b7e8200
--- /dev/null
+++ b/examples/c/driver/send.c
@@ -0,0 +1,215 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/connection.h>
+#include <proton/connection_engine.h>
+#include <proton/delivery.h>
+#include <proton/driver.h>
+#include <proton/link.h>
+#include <proton/message.h>
+#include <proton/session.h>
+#include <proton/transport.h>
+#include <proton/url.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+typedef char str[1024];
+
+typedef struct app_data_t {
+ str address;
+ str container_id;
+ pn_rwbytes_t message_buffer;
+ int message_count;
+ int sent;
+ int acknowledged;
+} app_data_t;
+
+/* Create a message with a map { "sequence" : number } encode it and return the encoded buffer. */
+static pn_bytes_t encode_message(app_data_t* app) {
+ /* Construct a message with the map { "sequence": app.sent } */
+ pn_message_t* message = pn_message();
+ pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */
+ pn_data_t* body = pn_message_body(message);
+ pn_data_put_map(body);
+ pn_data_enter(body);
+ pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
+ pn_data_put_int(body, app->sent); /* The sequence number */
+ pn_data_exit(body);
+
+ /* encode the message, expanding the encode buffer as needed */
+ if (app->message_buffer.start == NULL) {
+ static const size_t initial_size = 128;
+ app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size));
+ }
+ /* app->message_buffer is the total buffer space available. */
+ /* mbuf wil point at just the portion used by the encoded message */
+ pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start);
+ int status = 0;
+ while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) {
+ app->message_buffer.size *= 2;
+ app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size);
+ mbuf.size = app->message_buffer.size;
+ }
+ if (status != 0) {
+ fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message)));
+ exit(1);
+ }
+ pn_message_free(message);
+ return pn_bytes(mbuf.size, mbuf.start);
+}
+
+static void print_condition(pn_event_t *e, pn_condition_t *cond) {
+ if (pn_condition_is_set(cond))
+ fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
+ pn_condition_get_name(cond), pn_condition_get_description(cond));
+}
+
+/* Dispatch proton events. */
+static void dispatch(app_data_t* app, pn_connection_engine_t* eng) {
+ for (pn_event_t *event = pn_connection_engine_dispatch(eng);
+ event != NULL;
+ event = pn_connection_engine_dispatch(eng)
+ ) {
+ switch (pn_event_type(event)) {
+
+ case PN_CONNECTION_INIT: {
+ pn_connection_t* c = pn_event_connection(event);
+ pn_connection_set_container(c, app->container_id);
+ pn_connection_open(c);
+ pn_session_t* s = pn_session(c);
+ pn_session_open(s);
+ pn_link_t* l = pn_sender(s, "my_sender");
+ pn_terminus_set_address(pn_link_target(l), app->address);
+ pn_link_open(l);
+ } break;
+
+ case PN_LINK_FLOW: {
+ /* The peer has given us some credit, now we can send messages */
+ pn_link_t *sender = pn_event_link(event);
+ while (pn_link_credit(sender) > 0 && app->sent < app->message_count) {
+ ++app->sent;
+ /* TODO aconway 2016-06-30: explain delivery tag */
+ pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
+ pn_bytes_t msgbuf = encode_message(app);
+ pn_link_send(sender, msgbuf.start, msgbuf.size);
+ pn_link_advance(sender);
+ }
+ } break;
+
+ case PN_DELIVERY: {
+ /* We received acknowledgedment from the peer that a message was delivered. */
+ pn_delivery_t* d = pn_event_delivery(event);
+ if (pn_delivery_remote_state(d) == PN_ACCEPTED) {
+ if (++app->acknowledged == app->message_count) {
+ printf("%d messages sent and acknowledged\n", app->acknowledged);
+ pn_connection_close(pn_event_connection(event));
+ }
+ }
+ } break;
+
+ case PN_TRANSPORT_ERROR:
+ print_condition(event, pn_transport_condition(pn_event_transport(event)));
+ break;
+
+ case PN_CONNECTION_REMOTE_CLOSE:
+ print_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
+ pn_connection_close(pn_event_connection(event));
+ break;
+
+ case PN_SESSION_REMOTE_CLOSE:
+ print_condition(event, pn_session_remote_condition(pn_event_session(event)));
+ pn_connection_close(pn_event_connection(event));
+ break;
+
+ case PN_LINK_REMOTE_CLOSE:
+ case PN_LINK_REMOTE_DETACH:
+ print_condition(event, pn_link_remote_condition(pn_event_link(event)));
+ pn_connection_close(pn_event_connection(event));
+ break;
+
+ default: break;
+ }
+ }
+}
+
+static void usage(const char *arg0) {
+ fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", arg0);
+ exit(1);
+}
+
+int main(int argc, char **argv) {
+ /* Default values for application and connection. */
+ app_data_t app = {0};
+ app.message_count = 100;
+ const char* urlstr = NULL;
+
+ int opt;
+ while((opt = getopt(argc, argv, "a:m:")) != -1) {
+ switch(opt) {
+ case 'a': urlstr = optarg; break;
+ case 'm': app.message_count = atoi(optarg); break;
+ default: usage(argv[0]); break;
+ }
+ }
+ if (optind < argc)
+ usage(argv[0]);
+
+ snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid());
+
+ /* Parse the URL or use default values */
+ pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
+ const char *host = url ? pn_url_get_host(url) : NULL;
+ const char *port = url ? pn_url_get_port(url) : NULL;
+ if (!port) port = "amqp";
+ strncpy(app.address, (url && pn_url_get_path(url)) ? pn_url_get_path(url) : "example", sizeof(app.address));
+
+ /* Create the driver and connect */
+ pn_driver_t driver;
+ pn_driver_init(&driver);
+ pn_driver_connection_t dc;
+ pn_driver_connection_init(&driver, &dc);
+ pn_driver_connect(&dc, host, port, NULL);
+ if (url) pn_url_free(url);
+
+ for (pn_driver_event_t e = pn_driver_wait(&driver);
+ e.type != PN_DRIVER_INTERRUPT;
+ e = pn_driver_wait(&driver))
+ {
+ switch (e.type) {
+ case PN_DRIVER_CONNECTION_READY:
+ dispatch(&app, pn_driver_engine(e.connection));
+ pn_driver_watch(e.connection);
+ break;
+ case PN_DRIVER_CONNECTION_FINISHED:
+ pn_driver_connection_final(e.connection);
+ pn_driver_interrupt(&driver);
+ break;
+ default:
+ break;
+ }
+ }
+ pn_driver_final(&driver);
+ free(app.message_buffer.start);
+ return 0;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/464c166c/examples/c/driver/test.py
----------------------------------------------------------------------
diff --git a/examples/c/driver/test.py b/examples/c/driver/test.py
new file mode 100644
index 0000000..bb4d7eb
--- /dev/null
+++ b/examples/c/driver/test.py
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License
+#
+
+# This is a test script to run the examples and verify that they behave as expected.
+
+from exampletest import *
+
+import unittest
+import sys
+
+def python_cmd(name):
+ dir = os.path.dirname(__file__)
+ return [sys.executable, os.path.join(dir, "..", "..", "python", name)]
+
+class CExampleTest(BrokerTestCase):
+ broker_exe = ["libuv_broker"]
+
+ def test_send_receive(self):
+ """Send first then receive"""
+ s = self.proc(["libuv_send", "-a", self.addr])
+ self.assertEqual("100 messages sent and acknowledged\n", s.wait_out())
+ r = self.proc(["libuv_receive", "-a", self.addr])
+ self.assertEqual("100 messages received\n", r.wait_out())
+
+ def test_receive_send(self):
+ """Start receiving first, then send."""
+ r = self.proc(["libuv_receive", "-a", self.addr]);
+ s = self.proc(["libuv_send", "-a", self.addr]);
+ self.assertEqual("100 messages sent and acknowledged\n", s.wait_out())
+ self.assertEqual("100 messages received\n", r.wait_out())
+
+if __name__ == "__main__":
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/464c166c/examples/cpp/README.dox
----------------------------------------------------------------------
diff --git a/examples/cpp/README.dox b/examples/cpp/README.dox
index 1d46ec8..421dd34 100644
--- a/examples/cpp/README.dox
+++ b/examples/cpp/README.dox
@@ -161,3 +161,5 @@ A working example for accessing Service Bus session-enabled queues.
Also provides some general notes on Service Bus usage.
*/
+
+/** FIXME aconway - documentation for driver examples */
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/464c166c/examples/cpp/mt/broker.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/mt/broker.cpp b/examples/cpp/mt/broker.cpp
index 39d7132..0945a07 100644
--- a/examples/cpp/mt/broker.cpp
+++ b/examples/cpp/mt/broker.cpp
@@ -149,6 +149,7 @@ class broker_connection_handler : public proton::messaging_handler {
// A sender sends messages from a queue to a subscriber.
void on_sender_open(proton::sender &sender) OVERRIDE {
+ // FIXME aconway 2016-09-29: need to set the source address here.
queue *q = sender.source().dynamic() ?
queues_.dynamic() : queues_.get(sender.source().address());
std::cout << "sending from " << q->name() << std::endl;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/464c166c/examples/exampletest.py
----------------------------------------------------------------------
diff --git a/examples/exampletest.py b/examples/exampletest.py
new file mode 100644
index 0000000..d40b9cb
--- /dev/null
+++ b/examples/exampletest.py
@@ -0,0 +1,183 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License
+#
+
+# A test library to make it easy to run unittest tests that start,
+# monitor, and report output from sub-processes. In particular
+# it helps with starting processes that listen on random ports.
+
+import unittest
+import os, sys, socket, time, re, inspect, errno, threading
+from random import randrange
+from subprocess import Popen, PIPE, STDOUT
+from copy import copy
+import platform
+from os.path import dirname as dirname
+
+def pick_port():
+ """Pick a random port."""
+ p = randrange(10000, 20000)
+ return p
+
+class ProcError(Exception):
+ """An exception that captures failed process output"""
+ def __init__(self, proc, what="bad exit status"):
+ out = proc.out.strip()
+ if out:
+ out = "\nvvvvvvvvvvvvvvvv\n%s\n^^^^^^^^^^^^^^^^\n" % out
+ else:
+ out = ", no output)"
+ super(Exception, self, ).__init__(
+ "%s %s, code=%s%s" % (proc.args, what, proc.returncode, out))
+
+class NotFoundError(ProcError):
+ pass
+
+class Proc(Popen):
+ """A example process that stores its output, optionally run with valgrind."""
+
+ if "VALGRIND" in os.environ and os.environ["VALGRIND"]:
+ env_args = [os.environ["VALGRIND"], "--error-exitcode=42", "--quiet", "--leak-check=full"]
+ else:
+ env_args = []
+
+ @property
+ def out(self):
+ self._out.seek(0)
+ return self._out.read()
+
+ def __init__(self, args, **kwargs):
+ """Start an example process"""
+ args = list(args)
+ self.args = args
+ self._out = os.tmpfile()
+ try:
+ Popen.__init__(self, self.env_args + self.args, stdout=self._out, stderr=STDOUT, **kwargs)
+ except OSError, e:
+ if e.errno == errno.ENOENT:
+ raise NotFoundError(self, str(e))
+ raise ProcError(self, str(e))
+ except Exception, e:
+ raise ProcError(self, str(e))
+
+ def kill(self):
+ try:
+ if self.poll() is None:
+ Popen.kill(self)
+ except:
+ pass # Already exited.
+ return self.out
+
+ def wait_out(self, timeout=10, expect=0):
+ """Wait for process to exit, return output. Raise ProcError on failure."""
+ t = threading.Thread(target=self.wait)
+ t.start()
+ t.join(timeout)
+ if self.poll() is None: # Still running
+ self.kill()
+ raise ProcError(self, "timeout")
+ if expect is not None and self.poll() != expect:
+ raise ProcError(self)
+ return self.out
+
+# Work-around older python unittest that lacks setUpClass.
+if hasattr(unittest.TestCase, 'setUpClass') and hasattr(unittest.TestCase, 'tearDownClass'):
+ TestCase = unittest.TestCase
+else:
+ class TestCase(unittest.TestCase):
+ """
+ Roughly provides setUpClass and tearDownClass functionality for older python
+ versions in our test scenarios. If subclasses override setUp or tearDown
+ they *must* call the superclass.
+ """
+ def setUp(self):
+ if not hasattr(type(self), '_setup_class_count'):
+ type(self)._setup_class_count = len(
+ inspect.getmembers(
+ type(self),
+ predicate=lambda(m): inspect.ismethod(m) and m.__name__.startswith('test_')))
+ type(self).setUpClass()
+
+ def tearDown(self):
+ self.assertTrue(self._setup_class_count > 0)
+ self._setup_class_count -= 1
+ if self._setup_class_count == 0:
+ type(self).tearDownClass()
+
+class ExampleTestCase(TestCase):
+ """TestCase that manages started processes"""
+ def setUp(self):
+ super(ExampleTestCase, self).setUp()
+ self.procs = []
+
+ def tearDown(self):
+ for p in self.procs:
+ p.kill()
+ super(ExampleTestCase, self).tearDown()
+
+ def proc(self, *args, **kwargs):
+ p = Proc(*args, **kwargs)
+ self.procs.append(p)
+ return p
+
+def wait_port(port, timeout=10):
+ """Wait up to timeout for port to be connectable."""
+ if timeout:
+ deadline = time.time() + timeout
+ while (timeout is None or time.time() < deadline):
+ try:
+ s = socket.create_connection((None, port), timeout) # Works for IPv6 and v4
+ s.close()
+ return
+ except socket.error, e:
+ if e.errno != errno.ECONNREFUSED: # Only retry on connection refused error.
+ raise
+ raise socket.timeout()
+
+
+class BrokerTestCase(ExampleTestCase):
+ """
+ ExampleTest that starts a broker in setUpClass and kills it in tearDownClass.
+ Subclass must set `broker_exe` class variable with the name of the broker executable.
+ """
+
+ @classmethod
+ def setUpClass(cls):
+ cls.port = pick_port()
+ cls.addr = "127.0.0.1:%s/examples" % (cls.port)
+ cls.broker = None # In case Proc throws, create the attribute.
+ cls.broker = Proc(cls.broker_exe + ["-a", cls.addr])
+ try:
+ wait_port(cls.port)
+ except Exception, e:
+ cls.broker.kill()
+ raise ProcError(cls.broker, "timed out waiting for port")
+
+ @classmethod
+ def tearDownClass(cls):
+ if cls.broker: cls.broker.kill()
+
+ def tearDown(self):
+ b = type(self).broker
+ if b and b.poll() != None: # Broker crashed
+ type(self).setUpClass() # Start another for the next test.
+ raise ProcError(b, "broker crash")
+ super(BrokerTestCase, self).tearDown()
+
+if __name__ == "__main__":
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/464c166c/proton-c/docs/api/index.md
----------------------------------------------------------------------
diff --git a/proton-c/docs/api/index.md b/proton-c/docs/api/index.md
index 10aea84..4a6dec6 100644
--- a/proton-c/docs/api/index.md
+++ b/proton-c/docs/api/index.md
@@ -1,5 +1,50 @@
Proton Documentation {#index}
====================
-The proton library contains two APIs: The [Engine API](@ref engine),
-and the [Messenger API](@ref messenger).
+## The Protocol Engine
+
+The [Engine API](@ref engine) is a "pure AMQP" toolkit, it decodes AMQP bytes
+into proton [events](@ref event) and generates AMQP bytes from application
+calls.
+
+The [connection engine](@ref connection_engine) provides a simple bytes in/bytes
+out, event-driven interface so you can read AMQP data from any source, process
+the resulting [events](@ref event) and write AMQP output to any destination.
+
+There is no IO or threading code in this part of the library, so it can be
+embedded in many different environments. The proton project provides language
+bindings (Python, Ruby, Go etc.) that embed it into the standard IO and
+threading facilities of the bound language.
+
+## Integrating with IO
+
+The [Driver SPI](@ref driver) is a portable framework to build single or
+multi-threaded Proton C applications with replaceable IO implementations.
+
+The driver can initiate or listen for connections. Application threads wait for
+a [connection engine](@ref connection_engine) to become ready for processing due
+to an IO event. The application thread uses [Engine API](@ref engine) described
+above, fully decoupled from the IO mechanism.
+
+The driver is the basis for the Proton C++ binding and the
+[Qpid Disptch Router](http://qpid.apache.org/components/dispatch-router/) The
+Proton project provides drivers for many platforms, and you can implement your
+own.
+
+## Messenger and Reactor APIs
+
+The [Messenger](@ref messenger) [Reactor](@ref reactor) APIs were intended
+to be simple APIs that included IO support directly out of the box.
+
+They both had good points but were both based on POSIX-style polling
+assumptions, and did not support concurrent or multi-threaded use. This creates
+several problems:
+
+- Difficult to port (e.g. Windows poll() is inefficient, IOCP has a different model)
+- Difficult to integrate: even in python and ruby, foreign C code blocking on IO violates normal IO and threading assumptions and requires some dubious hacking to make it work.
+- Impossible to use in multi-threaded servers or concurrent languages like Go.
+
+Note however that the Reactor API was built around the Proton @ref engine and
+@ref event APIs. For the most part using the @ref driver or @ref connection_engine
+is the same as using the reactor. Connection setup/teardown and pumping the
+event loop are the main differences.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/3] qpid-proton git commit: C driver for C/C++ bindings and direct
C users.
Posted by ac...@apache.org.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/464c166c/proton-c/include/proton/connection_engine.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/connection_engine.h b/proton-c/include/proton/connection_engine.h
index d9df77b..bae27d5 100644
--- a/proton-c/include/proton/connection_engine.h
+++ b/proton-c/include/proton/connection_engine.h
@@ -20,48 +20,31 @@
* under the License.
*/
-///@file
-///
-/// **Experimental** The Connection Engine API wraps up the proton engine
-/// objects associated with a single connection: pn_connection_t, pn_transport_t
-/// and pn_collector_t. It provides a simple bytes-in/bytes-out interface for IO
-/// and generates pn_event_t events to be handled by the application.
-///
-/// The connection engine can be fed with raw AMQP bytes from any source, and it
-/// generates AMQP byte output to be written to any destination. You can use the
-/// engine to integrate proton AMQP with any IO library, or native IO on any
-/// platform.
-///
-/// The engine is not thread safe but each engine is independent. Separate
-/// engines can be used concurrently. For example a multi-threaded application
-/// can process connections in multiple threads, but serialize work for each
-/// connection to the corresponding engine.
-///
-/// The engine is designed to be thread and IO neutral so it can be integrated with
-/// single or multi-threaded code in reactive or proactive IO frameworks.
-///
-/// Summary of use:
-///
-/// - while !pn_connection_engine_finished()
-/// - Call pn_connection_engine_dispatch() to dispatch events until it returns NULL.
-/// - Read data from your source into pn_connection_engine_read_buffer()
-/// - Call pn_connection_engine_read_done() when complete.
-/// - Write data from pn_connection_engine_write_buffer() to your destination.
-/// - Call pn_connection_engine_write_done() to indicate how much was written.
-///
-/// Note on blocking: the _read/write_buffer and _read/write_done functions can
-/// all generate events that may cause the engine to finish. Before you wait for
-/// IO, always drain pn_connection_engine_dispatch() till it returns NULL and
-/// check pn_connection_engine_finished() in case there is nothing more to do..
-///
-/// Note on error handling: the pn_connection_engine_*() functions do not return
-/// an error code. If an error occurs it will be reported as a
-/// PN_TRANSPORT_ERROR event and pn_connection_engine_finished() will return
-/// true once all final events have been processed.
-///
-/// @defgroup connection_engine The Connection Engine
-/// @{
-///
+
+/**
+ * @file
+ * @defgroup connection_engine Connection Engine
+ * @ingroup engine
+ *
+ * **Experimental**: AMQP input bytes into proton @ref event "events", and
+ * application calls to the @ref engine API into AMQP output bytes.
+ *
+ * Each individual engine is not thread safe but separate engines can be used
+ * concurrently. A multi-threaded application must serialize activity for each
+ * connection but can process separate connections concurrently.
+ *
+ * Note on blocking: the _read/write_buffer and _read/write_done functions can
+ * generate events that may cause the engine to finish. Before you wait for IO,
+ * always drain pn_connection_engine_dispatch() till it returns NULL and check
+ * pn_connection_engine_finished() in case there is nothing more to do.
+ *
+ * Note on error handling: the pn_connection_engine_*() functions do not return
+ * an error code. If an error occurs it will be reported as a
+ * PN_TRANSPORT_ERROR event and pn_connection_engine_finished() will return
+ * true once all final events have been processed.
+ *
+ * @{
+ */
#include <proton/condition.h>
#include <proton/event.h>
@@ -72,11 +55,11 @@
extern "C" {
#endif
-/// A connection engine is a trio of pn_connection_t, pn_transport_t and pn_collector_t.
-/// Use the pn_connection_engine_*() functions to operate on it.
-/// It is a plain struct, not a proton object. Use pn_connection_engine_init to set up
-/// the initial objects and pn_connection_engine_final to release them.
-///
+/**
+ * connection_engine is a plain struct, not a proton object. Use
+ * pn_connection_engine_init to initialize and pn_connection_engine_final before
+ * freeing.
+ */
typedef struct pn_connection_engine_t {
pn_connection_t* connection;
pn_transport_t* transport;
@@ -84,98 +67,126 @@ typedef struct pn_connection_engine_t {
pn_event_t* event;
} pn_connection_engine_t;
-/// Initialize a pn_connection_engine_t struct with a new connection and
-/// transport.
-///
-/// Call pn_connection_engine_final to free resources when you are done.
-///
-///@return 0 on success, a proton error code on failure (@see error.h)
-///
+/**
+ * Initialize a pn_connection_engine_t struct, create a pn_connection_t and
+ * pn_transport_t. You can configure security properties on the connection,
+ * call connection_engine_start() to bind the transport before using the engine.
+ *
+ * @return 0 on success, a proton error code on failure (@see error.h)
+ */
PN_EXTERN int pn_connection_engine_init(pn_connection_engine_t* engine);
-/// Free resources used by the engine, set the connection and transport pointers
-/// to NULL.
+/**
+ * Free resources used by the engine, set the connection and transport pointers
+ * to NULL.
+ */
PN_EXTERN void pn_connection_engine_final(pn_connection_engine_t* engine);
-/// Get the engine's read buffer. Read data from your IO source to buf.start, up
-/// to a max of buf.size. Then call pn_connection_engine_read_done().
-///
-/// buf.size==0 means the engine cannot read presently, calling
-/// pn_connection_engine_dispatch() may create more buffer space.
-///
+/**
+ * Get the engine's read buffer. Read data from your IO source to buf.start, up
+ * to a max of buf.size. Then call pn_connection_engine_read_done().
+ *
+ * buf.size==0 means the engine cannot read presently, calling
+ * pn_connection_engine_dispatch() may create more buffer space.
+ */
PN_EXTERN pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t*);
-/// Consume the first n bytes of data in pn_connection_engine_read_buffer() and
-/// update the buffer.
+/**
+ * Consume the first n bytes of data in pn_connection_engine_read_buffer() and
+ * update the buffer.
+ */
PN_EXTERN void pn_connection_engine_read_done(pn_connection_engine_t*, size_t n);
-/// Close the read side of the transport when no more data is available.
-/// Note there may still be events for pn_connection_engine_dispatch() or data
-/// in pn_connection_engine_write_buffer()
+/**
+ * Close the read side of the transport when no more data is available.
+ * Note there may still be events for pn_connection_engine_dispatch() or data
+ * in pn_connection_engine_write_buffer()
+ */
PN_EXTERN void pn_connection_engine_read_close(pn_connection_engine_t*);
-/// Get the engine's write buffer. Write data from buf.start to your IO destination,
-/// up to a max of buf.size. Then call pn_connection_engine_write_done().
-///
-/// buf.size==0 means the engine has nothing to write presently. Calling
-/// pn_connection_engine_dispatch() may generate more data.
+/**
+ * Get the engine's write buffer. Write data from buf.start to your IO destination,
+ * up to a max of buf.size. Then call pn_connection_engine_write_done().
+
+ * buf.size==0 means the engine has nothing to write presently. Calling
+ * pn_connection_engine_dispatch() may generate more data.
+ */
PN_EXTERN pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t*);
-/// Call when the first n bytes of pn_connection_engine_write_buffer() have been
-/// written to IO and can be re-used for new data. Updates the buffer.
+/**
+ * Call when the first n bytes of pn_connection_engine_write_buffer() have been
+ * written to IO and can be re-used for new data. Updates the buffer.
+ */
PN_EXTERN void pn_connection_engine_write_done(pn_connection_engine_t*, size_t n);
-/// Call when the write side of IO has closed and no more data can be written.
-/// Note that there may still be events for pn_connection_engine_dispatch() or
-/// data to read into pn_connection_engine_read_buffer().
+/**
+ * Call when the write side of IO has closed and no more data can be written.
+ * Note that there may still be events for pn_connection_engine_dispatch() or
+ * data to read into pn_connection_engine_read_buffer().
+ */
PN_EXTERN void pn_connection_engine_write_close(pn_connection_engine_t*);
-/// Close both sides of the transport, equivalent to
-/// pn_connection_engine_read_close(); pn_connection_engine_write_close()
-///
-/// You must still call pn_connection_engine_dispatch() to process final
-/// events.
-///
-/// To provide transport error information to the handler, set it on
-/// pn_connection_engine_condition()
-/// *before* calling pn_connection_engine_disconnected(). This sets
-/// the error on the pn_transport_t object.
-///
-/// Note this does *not* modify the pn_connection_t, so you can distinguish
-/// between a connection close error sent by the remote peer (which will set the
-/// connection condition) and a transport error (which sets the transport
-/// condition.)
-///
+/**
+ * Close both sides of the transport, equivalent to
+ *
+ * pn_connection_engine_read_close(); pn_connection_engine_write_close()
+ *
+ * To pass a transport error to the handler, set it on
+ * pn_connection_engine_condition() *before* calling
+ * pn_connection_engine_disconnected(). Note this is different from an AMQP
+ * close sent by the remote peer, which sets the connection condition.
+ *
+ * You must still call pn_connection_engine_dispatch() to process final
+ * events.
+ */
PN_EXTERN void pn_connection_engine_disconnected(pn_connection_engine_t*);
-/// Get the next available event.
-/// Call in a loop until it returns NULL to dispatch all available events.
-/// Note this call may modify the read and write buffers.
-///
-/// @return Pointer to the next event, or NULL if there are none available.
-///
+/**
+ * Get the next available event.
+ * Call in a loop until it returns NULL to dispatch all available events.
+ * Note this call may modify the read and write buffers.
+ *
+ * @return Pointer to the next event, or NULL if there are none available.
+ *
+ */
PN_EXTERN pn_event_t* pn_connection_engine_dispatch(pn_connection_engine_t*);
-/// Return true if the engine is finished - all data has been written, all
-/// events have been handled and the transport is closed.
+/**
+ * Return true if the engine is finished - all data has been written, all
+ * events have been handled and the transport is closed.
+ */
PN_EXTERN bool pn_connection_engine_finished(pn_connection_engine_t*);
-/// Get the AMQP connection, owned by the pn_connection_engine_t.
+/**
+ * Return true if the engine transport is closed. There may still be
+ * outstanding events to process.
+
+ * Check if the transport has been closed internally (e.g. by an authentication
+ * failure) before blocking for IO.
+ */
+PN_EXTERN bool pn_connection_engine_closed(pn_connection_engine_t*);
+
+/**
+ * Get the AMQP connection, owned by the pn_connection_engine_t.
+ */
PN_EXTERN pn_connection_t* pn_connection_engine_connection(pn_connection_engine_t*);
-/// Get the proton transport, owned by the pn_connection_engine_t.
+/**
+ * Get the proton transport, owned by the pn_connection_engine_t.
+ */
PN_EXTERN pn_transport_t* pn_connection_engine_transport(pn_connection_engine_t*);
-/// Get the condition object for the engine's transport.
-///
-/// Note that IO errors should be set on this, the transport condition, not on
-/// the pn_connection_t condition. The connection's condition is for errors
-/// received via the AMQP protocol, the transport condition is for errors in the
-/// the IO layer such as a socket read or disconnect errors.
-///
+/**
+ * Get the condition object for the engine's transport.
+ *
+ * Note that IO errors should be set on this, the transport condition, not on
+ * the pn_connection_t condition. The connection's condition is for errors
+ * received via the AMQP protocol, the transport condition is for errors in the
+ * the IO layer such as a socket read or disconnect errors.
+ */
PN_EXTERN pn_condition_t* pn_connection_engine_condition(pn_connection_engine_t*);
-///@}
+/** @}*/
#ifdef __cplusplus
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/464c166c/proton-c/include/proton/driver.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/driver.h b/proton-c/include/proton/driver.h
new file mode 100644
index 0000000..145b3c3
--- /dev/null
+++ b/proton-c/include/proton/driver.h
@@ -0,0 +1,282 @@
+#ifndef PROTON_DRIVER_H
+#define PROTON_DRIVER_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* FIXME aconway 2016-10-13: TOO:
+ - handle transport ticks
+ - support for scheduled wakeup (leave task queueing outside like conn wakeup)
+ - check when driver is "empty" - not monitoring anything. For clean shutdown.
+*/
+
+/*@file
+ @defgroup driver
+
+
+*/
+#include <proton/types.h>
+#include <proton/import_export.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct pn_connection_engine_t pn_connection_engine_t;
+
+/**
+ * @defgroup driver The proton Driver API.
+ *
+ * **Experimental**: IO driver for a multi-threaded proton application
+ *
+ * The driver hides the details of the underlying IO platform, and handles IO
+ * read/write events. Multiple threads call pn_driver_wait(), which returns
+ * a @ref pn_connection_engine_t when it has events to dispatch.
+ *
+ * Each connection can only be processed in a single thread at a time. The
+ * pn_driver_wait() threads dispatch all the available events, then call
+ * pn_driver_watch() to let the driver resume monitoring IO for the connection.
+ * Different connections can be processed concurrently.
+ *
+ * The driver allows you to:
+ *
+ * - create outgoing connections
+ * - listen for incoming connections
+ * - wake connections for application processing
+ *
+ * Waking a connection causes it to be returned by a pn_driver_wait() call even
+ * if there are no IO events pending, so that you can safely do application
+ * processing that is not triggered by IO.
+ *
+ * ## Thread Safety
+ *
+ * Functions that take a pn_driver_t* parameter are thread safe.
+ *
+ * Unless noted otherwise, functions that take a pn_driver_connection_t* or
+ * pn_driver_listener_t* are not thread safe. They can be called sequentially
+ * after the connection/listener has been returned by pn_driver_wait() and until
+ * the connection/listener has been passed back to the driver by
+ * pn_driver_watch() or pn_driver_accept()
+ *
+ * ## Error handling
+ *
+ * Driver functions do not return an error code. Errors are indicated by
+ * PN_DRIVER_CONNECTION_FINISHED or PN_DRIVER_LISTENER_FINISHED events where the
+ * connection or listener carries the error information.
+ *
+ * ## Context information
+ *
+ * You can get the pn_driver_connection_t* associated with a pn_connection_t* via
+ * pn_driver_connection_get(). You can attach arbitrary additional data to the
+ * driver connection without extra allocations with this technique:
+
+ struct my_connection {
+ pn_driver_connection_t driver_conn;
+ // your additional data members here.
+ }
+ *
+ * Pass a pointer to the driver_conn member in a my_connection struct to
+ * pn_driver_connection_init() pn_driver_connect() and pn_driver_accept(). When
+ * pn_driver_wait() returns a pn_driver_connection_t*, you can cast it to
+ * my_connection* to access your data.
+ *
+ * You should not use pn_connection_context() with driver connections as
+ * the driver may use it internally.
+
+ *
+ * ## Ease of use features
+ *
+ * This driver provides minimal features to hide the underlying IO platorm.
+ * Additional features can be layered on top, but are not built-in.
+ *
+ * For example: a feature to "inject" function objects to a connection can be
+ * implemented by using the pn_driver_wake() function and associating a
+ * queue of functions with the connection, this is left for higher layers
+ * because the best way to implement it will depend on the environment.
+ *
+ * @{
+ */
+
+/**
+ * The driver struct, initialize with pn_driver_init
+ */
+typedef struct pn_driver_t pn_driver_t;
+
+/**
+ * The driver connection struct, initialize with pn_driver_connection_init.
+ * Call pn_driver_engine() to get the contained pn_connection_engine_t.
+ */
+typedef struct pn_driver_connection_t pn_driver_connection_t;
+
+/**
+ * The driver listener struct, initialize with pn_driver_listener_init
+ */
+typedef struct pn_driver_listener_t pn_driver_listener_t;
+
+/**
+ * Type of event returned by pn_driver_wait()
+ */
+typedef enum {
+ PN_DRIVER_CONNECTION_READY, /**< Connection ready for dispatch */
+ PN_DRIVER_CONNECTION_FINISHED, /**< Connection no longer active */
+ PN_DRIVER_LISTENER_READY, /**< Listener ready for accept */
+ PN_DRIVER_LISTENER_FINISHED, /**< Listener no longer active */
+ PN_DRIVER_INTERRUPT /**< pn_driver_interrupt() called */
+} pn_driver_event_type_t;
+
+/**
+ * Event returned by pn_driver_wait()
+ */
+typedef struct pn_driver_event_t {
+ pn_driver_event_type_t type;
+ union {
+ pn_driver_connection_t *connection;
+ pn_driver_listener_t *listener;
+ };
+} pn_driver_event_t;
+
+/**
+ * Initialize a pn_driver_t struct
+ */
+void pn_driver_init(pn_driver_t *d);
+
+/**
+ * Finalize a driver struct, free all resources. Driver itself can be freed afterwards.
+ */
+void pn_driver_final(pn_driver_t *d);
+
+/**
+ * Wait for a driver event. Can be called in multiple threads concurrently.
+ * It is safe to use the connection/listener in the returned event until it is
+ * passed back to the driver via pn_driver_watch() or pn_driver_accept()
+ */
+pn_driver_event_t pn_driver_wait(pn_driver_t* d);
+
+/**
+ * Return PN_DRIVER_INTERRUPT in a single thread calling wait(). Thread safe.
+ */
+void pn_driver_interrupt(pn_driver_t* d);
+
+/**
+ * Initialize a driver connection struct
+ */
+void pn_driver_connection_init(pn_driver_t*, pn_driver_connection_t*);
+
+/**
+ * Finalize a driver connection after it has been returned by a PN_DRIVER_CONNECTION_FINISHED.
+ * Can be freed after.
+ */
+void pn_driver_connection_final(pn_driver_connection_t *c);
+
+/**
+ * Connect to host:service with an initialized pn_driver_connection_t (thread
+ * safe). When there are events to dispatch, c it will be returned by
+ * pn_driver_wait() in a PN_DRIVER_CONNECTION_READY event. When c is finished
+ * and can be finalized and freed, it will be returned in a
+ * PN_DRIVER_CONNECTION_FINISHED event.
+ *
+ * @param c initialized driver connection struct
+ * @param host network host name
+ * @param service network service (aka port) name
+ * @param network can be NULL, placeholder for future multi-network drivers.
+ */
+void pn_driver_connect(pn_driver_connection_t* c, const char* host, const char* service, const char *network);
+
+/**
+ * Get the pn_connection_engine_t owned by a connection returned by pn_driver_wait().
+ */
+pn_connection_engine_t *pn_driver_engine(pn_driver_connection_t *c);
+
+/**
+ * Pass a connection that was previously returned by pn_driver_wait() back to the
+ * driver so it can monitor IO. It is not safe to use the connection until it is
+ * returned again by pn_driver_wait().
+ */
+void pn_driver_watch(pn_driver_connection_t *c);
+
+/**
+ * Cause a PN_DRIVER_CONNECTION_READY event for dc to be returned as soon as
+ * possible, even if there are no IO events (thread safe).
+ */
+void pn_driver_wake(pn_driver_connection_t *dc);
+
+/**
+ * Initialize a driver listener
+ */
+void pn_driver_listener_init(pn_driver_t*, pn_driver_listener_t *);
+
+/**
+ * Finalize a driver listener
+ */
+void pn_driver_listener_final(pn_driver_listener_t *);
+
+/**
+ * Listen on host:service with an initialized pn_driver_listener_t (thread safe).
+ * When there is an incoming connection pn_driver_wait() will return PN_DRIVER_LISTENER_READY,
+ * you should accept it with pn_driver_accept(). When the listener is closed it will
+ * be returned in a PN_DRIVER_LISTENER_FINISHED event.
+ *
+ * @param l initialized driver listener struct
+ * @param host network host name
+ * @param service network service (aka port) name
+ * @param network can be NULL, placeholder for future multi-network drivers.
+ * @param backlog number of incoming connections to buffer.
+ */
+void pn_driver_listen(pn_driver_listener_t* l, const char* host, const char* service, const char *network, int backlog);
+
+/**
+ * Accept a new connection on a listener returned in a PN_DRIVER_LISTENER_READY
+ * event by pn_driver_wait() It is not safe to use l or c until they are
+ * returned again by pn_driver_wait()
+ *
+ * @param l a ready listener, returned in a PN_DRIVER_LISTENER_READY event.
+ * @param c an initialized connection, not previously used for connect or accept.
+ */
+void pn_driver_accept(pn_driver_listener_t* l, pn_driver_connection_t* c);
+
+/**
+ * Close the listener (thread safe).
+ * Once closed it will be returned in a PN_DRIVER_LISTENER_FINISHED event.
+ */
+void pn_driver_listener_close(pn_driver_listener_t *l);
+
+/**
+ * Get the error (if any) on a listener returned in a PN_DRIVER_LISTENER_FINISHED event.
+ */
+const char* pn_driver_listener_error(pn_driver_listener_t*);
+
+/** Get the pn_driver_connection_t associated with a pn_connection_t if there is one */
+pn_driver_connection_t *pn_driver_connection_get(pn_connection_t*);
+
+/**
+ * @}
+ */
+
+#ifdef __cplusplus
+}
+#endif
+
+/* FIXME aconway 2016-10-20: Build flags to get consistent link/include of driver. Exports. */
+#ifdef PN_DRIVER_INCLUDE
+#include PN_DRIVER_INCLUDE
+#else
+#error "Define PN_DRIVER_INCLUDE as the driver implementation header file"
+#endif
+
+#endif // PROTON_DRIVER_IMPL_H
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/464c166c/proton-c/include/proton/event.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/event.h b/proton-c/include/proton/event.h
index 16d2bda..8b69279 100644
--- a/proton-c/include/proton/event.h
+++ b/proton-c/include/proton/event.h
@@ -302,7 +302,7 @@ typedef enum {
PN_SELECTABLE_WRITABLE,
PN_SELECTABLE_ERROR,
PN_SELECTABLE_EXPIRED,
- PN_SELECTABLE_FINAL
+ PN_SELECTABLE_FINAL,
} pn_event_type_t;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/464c166c/proton-c/src/driver/driver.c
----------------------------------------------------------------------
diff --git a/proton-c/src/driver/driver.c b/proton-c/src/driver/driver.c
new file mode 100644
index 0000000..042f3ce
--- /dev/null
+++ b/proton-c/src/driver/driver.c
@@ -0,0 +1,18 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/464c166c/proton-c/src/engine/connection_engine.c
----------------------------------------------------------------------
diff --git a/proton-c/src/engine/connection_engine.c b/proton-c/src/engine/connection_engine.c
index 5d184a1..e1d1181 100644
--- a/proton-c/src/engine/connection_engine.c
+++ b/proton-c/src/engine/connection_engine.c
@@ -37,14 +37,17 @@ int pn_connection_engine_init(pn_connection_engine_t* e) {
}
void pn_connection_engine_final(pn_connection_engine_t* e) {
- if (e->transport && e->connection) {
+ if (e->transport) {
pn_transport_unbind(e->transport);
- pn_decref(e->transport);
+ pn_transport_free(e->transport);
}
- if (e->collector)
+ if (e->collector) {
+ pn_collector_release(e->collector);
pn_collector_free(e->collector); /* Break cycle with connection */
- if (e->connection)
- pn_decref(e->connection);
+ }
+ if (e->connection) {
+ pn_connection_free(e->connection);
+ }
memset(e, 0, sizeof(*e));
}
@@ -111,6 +114,10 @@ bool pn_connection_engine_finished(pn_connection_engine_t* e) {
return pn_transport_closed(e->transport) && (pn_collector_peek(e->collector) == NULL);
}
+bool pn_connection_engine_closed(pn_connection_engine_t* e) {
+ return pn_transport_closed(e->transport);
+}
+
pn_connection_t* pn_connection_engine_connection(pn_connection_engine_t* e) {
return e->connection;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/464c166c/tools/cmake/Modules/FindLibuv.cmake
----------------------------------------------------------------------
diff --git a/tools/cmake/Modules/FindLibuv.cmake b/tools/cmake/Modules/FindLibuv.cmake
new file mode 100644
index 0000000..ae3ab70
--- /dev/null
+++ b/tools/cmake/Modules/FindLibuv.cmake
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Find libuv include dirs and libraries.
+#
+# Sets the following variables:
+#
+# Libuv_FOUND - True if headers and requested libraries were found
+# Libuv_INCLUDE_DIRS - Libuv include directories
+# Libuv_LIBRARIES - Link these to use libuv.
+#
+# This module reads hints about search locations from variables::
+# LIBUV_ROOT - Preferred installation prefix
+# LIBUV_INCLUDEDIR - Preferred include directory e.g. <prefix>/include
+# LIBUV_LIBRARYDIR - Preferred library directory e.g. <prefix>/lib
+
+find_library(Libuv_LIBRARIES Names uv libuv HINTS ${LIBUV_LIBRARYDIR} ${LIBUV_ROOT})
+find_path(Libuv_INCLUDE_DIRS NAMES uv.h HINTS ${LIBUV_INCLUDEDIR} ${LIBUV_ROOT} ${LIBUV_ROOT}/include ${CMAKE_INSTALL_PREFIX}/include PATHS /usr/include)
+
+include(FindPackageHandleStandardArgs)
+find_package_handle_standard_args(Libuv DEFAULT_MSG Libuv_LIBRARIES Libuv_INCLUDE_DIRS)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org