You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2016/11/25 21:01:28 UTC
[32/48] qpid-proton git commit: PROTON-1344: proactor batch events,
rename connection_driver
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/src/io/connection_engine.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/io/connection_engine.cpp b/proton-c/bindings/cpp/src/io/connection_engine.cpp
deleted file mode 100644
index 5e6483f..0000000
--- a/proton-c/bindings/cpp/src/io/connection_engine.cpp
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "proton/io/connection_engine.hpp"
-
-#include "proton/event_loop.hpp"
-#include "proton/error.hpp"
-#include "proton/messaging_handler.hpp"
-#include "proton/uuid.hpp"
-
-#include "contexts.hpp"
-#include "messaging_adapter.hpp"
-#include "msg.hpp"
-#include "proton_bits.hpp"
-#include "proton_event.hpp"
-
-#include <proton/connection.h>
-#include <proton/transport.h>
-#include <proton/event.h>
-
-#include <algorithm>
-
-
-namespace proton {
-namespace io {
-
-void connection_engine::init() {
- if (pn_connection_engine_init(&engine_, pn_connection(), pn_transport()) != 0) {
- this->~connection_engine(); // Dtor won't be called on throw from ctor.
- throw proton::error(std::string("connection_engine allocation failed"));
- }
-}
-
-connection_engine::connection_engine() : handler_(0), container_(0) { init(); }
-
-connection_engine::connection_engine(class container& cont, event_loop* loop) : handler_(0), container_(&cont) {
- init();
- connection_context& ctx = connection_context::get(connection());
- ctx.container = container_;
- ctx.event_loop.reset(loop);
-}
-
-connection_engine::~connection_engine() {
- pn_connection_engine_destroy(&engine_);
-}
-
-void connection_engine::configure(const connection_options& opts, bool server) {
- proton::connection c(connection());
- opts.apply_unbound(c);
- if (server) pn_transport_set_server(engine_.transport);
- pn_connection_engine_bind(&engine_);
- opts.apply_bound(c);
- handler_ = opts.handler();
- connection_context::get(connection()).collector = engine_.collector;
-}
-
-void connection_engine::connect(const connection_options& opts) {
- connection_options all;
- if (container_) {
- all.container_id(container_->id());
- all.update(container_->client_connection_options());
- }
- all.update(opts);
- configure(all, false);
- connection().open();
-}
-
-void connection_engine::accept(const connection_options& opts) {
- connection_options all;
- if (container_) {
- all.container_id(container_->id());
- all.update(container_->server_connection_options());
- }
- all.update(opts);
- configure(all, true);
-}
-
-bool connection_engine::dispatch() {
- pn_event_t* c_event;
- while ((c_event = pn_connection_engine_event(&engine_)) != NULL) {
- proton_event cpp_event(c_event, container_);
- try {
- if (handler_ != 0) {
- messaging_adapter adapter(*handler_);
- cpp_event.dispatch(adapter);
- }
- } catch (const std::exception& e) {
- pn_condition_t *cond = pn_transport_condition(engine_.transport);
- if (!pn_condition_is_set(cond)) {
- pn_condition_format(cond, "exception", "%s", e.what());
- }
- }
- pn_connection_engine_pop_event(&engine_);
- }
- return !pn_connection_engine_finished(&engine_);
-}
-
-mutable_buffer connection_engine::read_buffer() {
- pn_rwbytes_t buffer = pn_connection_engine_read_buffer(&engine_);
- return mutable_buffer(buffer.start, buffer.size);
-}
-
-void connection_engine::read_done(size_t n) {
- return pn_connection_engine_read_done(&engine_, n);
-}
-
-void connection_engine::read_close() {
- pn_connection_engine_read_close(&engine_);
-}
-
-const_buffer connection_engine::write_buffer() {
- pn_bytes_t buffer = pn_connection_engine_write_buffer(&engine_);
- return const_buffer(buffer.start, buffer.size);
-}
-
-void connection_engine::write_done(size_t n) {
- return pn_connection_engine_write_done(&engine_, n);
-}
-
-void connection_engine::write_close() {
- pn_connection_engine_write_close(&engine_);
-}
-
-void connection_engine::disconnected(const proton::error_condition& err) {
- pn_condition_t* condition = pn_transport_condition(engine_.transport);
- if (!pn_condition_is_set(condition)) {
- set_error_condition(err, condition);
- }
- pn_connection_engine_close(&engine_);
-}
-
-proton::connection connection_engine::connection() const {
- return make_wrapper(engine_.connection);
-}
-
-proton::transport connection_engine::transport() const {
- return make_wrapper(engine_.transport);
-}
-
-proton::container* connection_engine::container() const {
- return container_;
-}
-
-}}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/src/receiver.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/receiver.cpp b/proton-c/bindings/cpp/src/receiver.cpp
index b84722c..e5ec55a 100644
--- a/proton-c/bindings/cpp/src/receiver.cpp
+++ b/proton-c/bindings/cpp/src/receiver.cpp
@@ -74,7 +74,7 @@ void receiver::drain() {
// Create dummy flow event where "drain finish" can be detected.
pn_connection_t *pnc = pn_session_connection(pn_link_session(pn_object()));
connection_context& cctx = connection_context::get(pnc);
- // connection_engine collector is per connection. Reactor collector is global.
+ // connection_driver collector is per connection. Reactor collector is global.
pn_collector_t *coll = cctx.collector;
if (!coll)
coll = pn_reactor_collector(pn_object_reactor(pnc));
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/src/thread_safe_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/thread_safe_test.cpp b/proton-c/bindings/cpp/src/thread_safe_test.cpp
index f8dc3d8..5b5d487 100644
--- a/proton-c/bindings/cpp/src/thread_safe_test.cpp
+++ b/proton-c/bindings/cpp/src/thread_safe_test.cpp
@@ -24,7 +24,7 @@
#include "proton_bits.hpp"
#include "proton/thread_safe.hpp"
-#include "proton/io/connection_engine.hpp"
+#include "proton/io/connection_driver.hpp"
#include <proton/connection.h>
@@ -37,7 +37,7 @@ void test_new() {
pn_connection_t* c = 0;
thread_safe<connection>* p = 0;
{
- io::connection_engine e;
+ io::connection_driver e;
c = unwrap(e.connection());
int r = pn_refcount(c);
ASSERT(r >= 1); // engine may have internal refs (transport, collector).
@@ -54,7 +54,7 @@ void test_new() {
{
std::shared_ptr<thread_safe<connection> > sp;
{
- io::connection_engine e;
+ io::connection_driver e;
c = unwrap(e.connection());
sp = make_shared_thread_safe(e.connection());
}
@@ -63,7 +63,7 @@ void test_new() {
{
std::unique_ptr<thread_safe<connection> > up;
{
- io::connection_engine e;
+ io::connection_driver e;
c = unwrap(e.connection());
up = make_unique_thread_safe(e.connection());
}
@@ -78,7 +78,7 @@ void test_convert() {
connection c;
pn_connection_t* pc = 0;
{
- io::connection_engine eng;
+ io::connection_driver eng;
c = eng.connection();
pc = unwrap(c); // Unwrap in separate scope to avoid confusion from temp values.
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/docs/api/index.md
----------------------------------------------------------------------
diff --git a/proton-c/docs/api/index.md b/proton-c/docs/api/index.md
index ccd679d..9c6009f 100644
--- a/proton-c/docs/api/index.md
+++ b/proton-c/docs/api/index.md
@@ -5,35 +5,31 @@ Proton Documentation {#index}
The [Engine API](@ref engine) is a "pure AMQP" toolkit, it decodes AMQP bytes
into proton [events](@ref event) and generates AMQP bytes from application
-calls.
+calls. There is no IO or threading code in this part of the library.
-The [connection engine](@ref connection_engine) provides a simple bytes in/bytes
-out, event-driven interface so you can read AMQP data from any source, process
-the resulting [events](@ref event) and write AMQP output to any destination.
+## Proactive event-driven programming
-There is no IO or threading code in this part of the library, so it can be
-embedded in many different environments. The proton project provides language
-bindings (Python, Ruby, Go etc.) that embed it into the standard IO and
-threading facilities of the bound language.
+The [Proactor API](@ref proactor) is a pro-active, asynchronous framework to
+build single or multi-threaded Proton C applications. It manages the IO
+transport layer so you can write portable, event-driven AMQP code using the @ref
+engine API.
-## Integrating with IO
+## IO Integration
-The [Proactor API](@ref proactor) is a pro-active, asynchronous framewokr to
-build single or multi-threaded Proton C applications.
+The [connection driver](@ref connection_driver) provides a simple bytes in/bytes
+out, event-driven interface so you can read AMQP data from any source, process
+the resulting [events](@ref event) and write AMQP output to any destination. It
+lets you use proton in alternate event loops, or for specialized embedded
+applications.
-For advanced use-cases it is possible to write your own implementation of the
-proactor API for an unusual IO or threading framework. Any proton application
+It is also possible to write your own implementation of the @ref proactor if you
+are dealing with an unusual IO or threading framework. Any proton application
written to the proactor API will be able to use your implementation.
-## Messenger and Reactor APIs
-
-The [Messenger](@ref messenger) [Reactor](@ref reactor) APIs were intended
-to be simple APIs that included IO support directly out of the box.
+## Messenger and Reactor APIs (deprecated)
-They both had good points but were both based on the assumption of a single-threaded
-environment using a POSIX-like poll() call. This was a problem for performance on some
-platforms and did not support multi-threaded applications.
+The [Messenger](@ref messenger) and [Reactor](@ref reactor) APIs are older APIs
+that were limited to single-threaded applications.
-Note however that application code which interacts with the AMQP @ref engine and
-processes AMQP @ref "events" event is the same for the proactor and reactor APIs,
-so is quite easy to convert. The main difference is in how connections are set up.
+Existing @ref reactor applications can be converted easily to use the @ref proactor,
+since they share the same @ref engine API and @ref event set.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/include/proton/connection.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/connection.h b/proton-c/include/proton/connection.h
index 0ed23b0..70fad73 100644
--- a/proton-c/include/proton/connection.h
+++ b/proton-c/include/proton/connection.h
@@ -38,7 +38,7 @@ extern "C" {
/**
* @file
*
- * Connection API for the proton Engine.
+ * Connection API for the proton @ref engine
*
* @defgroup connection Connection
* @ingroup engine
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/include/proton/connection_driver.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/connection_driver.h b/proton-c/include/proton/connection_driver.h
new file mode 100644
index 0000000..4fa3fb9
--- /dev/null
+++ b/proton-c/include/proton/connection_driver.h
@@ -0,0 +1,243 @@
+#ifndef PROTON_CONNECTION_DRIVER_H
+#define PROTON_CONNECTION_DRIVER_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * @file
+ *
+ * @defgroup connection_driver Connection Driver
+ *
+ * **Experimental**: Toolkit for integrating proton with arbitrary network or IO
+ * transports. Provides a single point of control for an AMQP connection and
+ * a simple bytes-in/bytes-out interface that lets you:
+ *
+ * - process AMQP-encoded bytes from some input byte stream
+ * - generate ::pn_event_t events for your application to handle
+ * - encode resulting AMQP output bytes for some output byte stream
+ *
+ * The pn_connection_driver_*() functions provide a simplified API and extra
+ * logic to use ::pn_connection_t and ::pn_transport_t as a unit. You can also
+ * access them directly for features that are not exposed via the @ref
+ * connection_driver API.
+ *
+ * The driver buffers events and data; you should run it until
+ * pn_connection_driver_finished() is true, to ensure all reading, writing and
+ * event handling (including ERROR and FINAL events) is finished.
+ *
+ * ## Error handling
+ *
+ * The pn_connection_driver_*() functions do not return an error code. IO errors set
+ * the transport condition and are returned as a PN_TRANSPORT_ERROR event. The
+ * integration code can set errors using pn_connection_driver_errorf().
+ *
+ * ## IO patterns
+ *
+ * This API supports asynchronous, proactive, non-blocking and reactive IO. An
+ * integration does not have to follow a fixed dispatch-read-write sequence,
+ * but note that you should handle all available events before calling
+ * pn_connection_driver_read_buffer() and check that `size` is non-zero before
+ * starting a blocking or asynchronous read call. A `read` started while there
+ * are unprocessed CLOSE events in the buffer may never complete.
+ *
+ * AMQP is a full-duplex, asynchronous protocol. The "read" and "write" sides of
+ * an AMQP connection can close separately.
+ *
+ * ## Thread safety
+ *
+ * The @ref engine types are not thread safe, but each connection and its
+ * associated types forms an independent unit. Different connections can be
+ * processed concurrently by different threads.
+ *
+ * @{
+ */
+
+#include <proton/import_export.h>
+#include <proton/event.h>
+#include <proton/types.h>
+
+#include <stdarg.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Struct containing the 3 elements needed to drive AMQP IO and events, aggregated as a unit.
+ */
+typedef struct pn_connection_driver_t {
+ pn_connection_t *connection;
+ pn_transport_t *transport;
+ pn_event_batch_t batch;
+} pn_connection_driver_t;
+
+/**
+ * Set #connection and #transport to the provided values, or create a new
+ * @ref pn_connection_t or @ref pn_transport_t if either is NULL.
+ * The provided values belong to the connection driver and will be freed by
+ * pn_connection_driver_destroy()
+ *
+ * The transport is bound automatically after the PN_CONNECTION_INIT has been
+ * handled by the application. It can be bound earlier with
+ * pn_connection_driver_bind().
+ *
+ * The following functions must be called before the transport is
+ * bound to have effect: pn_connection_set_username(), pn_connection_set_password(),
+ * pn_transport_set_server()
+ *
+ * @return PN_OUT_OF_MEMORY if any allocation fails.
+ */
+PN_EXTERN int pn_connection_driver_init(pn_connection_driver_t*, pn_connection_t*, pn_transport_t*);
+
+/** Force binding of the transport.
+ * This happens automatically after the PN_CONNECTION_INIT is processed.
+ *
+ * @return PN_STATE_ERR if the transport is already bound.
+ */
+PN_EXTERN int pn_connection_driver_bind(pn_connection_driver_t *d);
+
+/**
+ * Unbind, release and free #connection and #transport. Set all pointers to
+ * NULL. Does not free the @ref pn_connection_driver_t struct itself.
+ */
+PN_EXTERN void pn_connection_driver_destroy(pn_connection_driver_t *);
+
+/**
+ * Get the read buffer.
+ *
+ * Copy data from your input byte source to buf.start, up to buf.size.
+ * Call pn_connection_driver_read_done() when reading is complete.
+ *
+ * buf.size==0 means reading is not possible: no buffer space or the read side is closed.
+ */
+PN_EXTERN pn_rwbytes_t pn_connection_driver_read_buffer(pn_connection_driver_t *);
+
+/**
+ * Process the first n bytes of data in pn_connection_driver_read_buffer() and
+ * reclaim the buffer space.
+ */
+PN_EXTERN void pn_connection_driver_read_done(pn_connection_driver_t *, size_t n);
+
+/**
+ * Close the read side. Call when the IO can no longer be read.
+ */
+PN_EXTERN void pn_connection_driver_read_close(pn_connection_driver_t *);
+
+/**
+ * True if read side is closed.
+ */
+PN_EXTERN bool pn_connection_driver_read_closed(pn_connection_driver_t *);
+
+/**
+ * Get the write buffer.
+ *
+ * Write data from buf.start to your IO destination, up to a max of buf.size.
+ * Call pn_connection_driver_write_done() when writing is complete.
+ *
+ * buf.size==0 means there is nothing to write.
+ */
+ PN_EXTERN pn_bytes_t pn_connection_driver_write_buffer(pn_connection_driver_t *);
+
+/**
+ * Call when the first n bytes of pn_connection_driver_write_buffer() have been
+ * written to IO. Reclaims the buffer space and reset the write buffer.
+ */
+PN_EXTERN void pn_connection_driver_write_done(pn_connection_driver_t *, size_t n);
+
+/**
+ * Close the write side. Call when IO can no longer be written to.
+ */
+PN_EXTERN void pn_connection_driver_write_close(pn_connection_driver_t *);
+
+/**
+ * True if write side is closed.
+ */
+PN_EXTERN bool pn_connection_driver_write_closed(pn_connection_driver_t *);
+
+/**
+ * Close both the read and write sides.
+ */
+PN_EXTERN void pn_connection_driver_close(pn_connection_driver_t * c);
+
+/**
+ * Get the next event to handle.
+ *
+ * @return pointer is valid till the next call of
+ * pn_connection_driver_next_event(). NULL if there are no more events available now,
+ * reading/writing may produce more.
+ */
+PN_EXTERN pn_event_t* pn_connection_driver_next_event(pn_connection_driver_t *);
+
+/**
+ * True if pn_connection_driver_next_event() will return a non-NULL event.
+ */
+PN_EXTERN bool pn_connection_driver_has_event(pn_connection_driver_t *);
+
+/**
+ * Return true if the driver is closed for reading and writing and there are
+ * no more events.
+ *
+ * Call pn_connection_driver_destroy() to free all related memory.
+ */
+PN_EXTERN bool pn_connection_driver_finished(pn_connection_driver_t *);
+
+/**
+ * Set IO error information.
+ *
+ * The name and formatted description are set on the transport condition, and
+ * returned as a PN_TRANSPORT_ERROR event from pn_connection_driver_next_event().
+ *
+ * You must call this *before* pn_connection_driver_read_close() or
+ * pn_connection_driver_write_close() to ensure the error is processed.
+ */
+PN_EXTERN void pn_connection_driver_errorf(pn_connection_driver_t *d, const char *name, const char *fmt, ...);
+
+/**
+ * Set IO error information via a va_list, see pn_connection_driver_errorf()
+ */
+PN_EXTERN void pn_connection_driver_verrorf(pn_connection_driver_t *d, const char *name, const char *fmt, va_list);
+
+/**
+ * Log a string message using the connection's transport log.
+ */
+PN_EXTERN void pn_connection_driver_log(pn_connection_driver_t *d, const char *msg);
+
+/**
+ * Log a printf formatted message using the connection's transport log.
+ */
+PN_EXTERN void pn_connection_driver_logf(pn_connection_driver_t *d, char *fmt, ...);
+
+/**
+ * Log a printf formatted message via a va_list using the connection's transport log.
+ */
+PN_EXTERN void pn_connection_driver_vlogf(pn_connection_driver_t *d, const char *fmt, va_list ap);
+
+/**
+ * If batch is part of a connection_driver, return the connection_driver address,
+ * else return NULL
+ */
+PN_EXTERN pn_connection_driver_t* pn_event_batch_connection_driver(pn_event_batch_t *batch);
+///@}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // PROTON_CONNECTION_DRIVER_H
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/include/proton/connection_engine.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/connection_engine.h b/proton-c/include/proton/connection_engine.h
deleted file mode 100644
index b7022a9..0000000
--- a/proton-c/include/proton/connection_engine.h
+++ /dev/null
@@ -1,313 +0,0 @@
-#ifndef PROTON_CONNECTION_ENGINE_H
-#define PROTON_CONNECTION_ENGINE_H
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * @file
- *
- * **Experimental** The connection IO API is a set of functions to simplify
- * integrating proton with different IO and concurrency platforms. The portable
- * parts of a Proton application should use the @ref engine types. We will
- * use "application" to mean the portable part of the application and
- * "integration" to mean code that integrates with a particular IO platform.
- *
- * The connection_engine functions take a @ref pn_connection_t\*, and perform common
- * tasks involving the @ref pn_connection_t and it's @ref pn_transport_t and
- * @ref pn_collector_t so you can treat them as a unit. You can also work with
- * these types directly for features not available via @ref connection_engine API.
- *
- * @defgroup connection_engine Connection Engine
- *
- * **Experimental**: Toolkit for integrating proton with arbitrary network or IO
- * transports. Provides a single point of control for an AMQP connection and
- * a simple bytes-in/bytes-out interface that lets you:
- *
- * - process AMQP-encoded bytes from some input byte stream
- * - generate @ref pn_event_t events for your application to handle
- * - encode resulting AMQP output bytes for some output byte stream
- *
- * The engine contains a @ref pn_connection_t, @ref pn_transport_t and @ref
- * pn_collector_t and provides functions to operate on all three as a unit for
- * IO integration. You can also use them directly for anything not covered by
- * this API
- *
- * For example a simple blocking IO integration with the imaginary "my_io" library:
- *
- * pn_connection_engine_t ce;
- * pn_connection_engine_init(&ce);
- * while (!pn_connection_engine_finished(&ce) {
- * // Dispatch events to be handled by the application.
- * pn_event_t *e;
- * while ((e = pn_connection_engine_event(&ce))!= NULL) {
- * my_app_handle(e); // Pass to the application handler
- * switch (pn_event_type(e)) {
- * case PN_CONNECTION_INIT: pn_connection_engine_bind(&ce);
- * // Only for full-duplex IO where read/write can shutdown separately.
- * case PN_TRANSPORT_CLOSE_READ: my_io_shutdown_read(...); break;
- * case PN_TRANSPORT_CLOSE_WRITE: my_io_shutdown_write(...); break;
- * default: break;
- * };
- * e = pn_connection_engine_pop_event(&ce);
- * }
- * // Read from my_io into the connection buffer
- * pn_rwbytes_t readbuf = pn_connection_engine_read_buffer(&ce);
- * if (readbuf.size) {
- * size_t n = my_io_read(readbuf.start, readbuf.size, ...);
- * if (n > 0) {
- * pn_connection_engine_read_done(&ce, n);
- * } else if (n < 0) {
- * pn_connection_engine_errorf(&ce, "read-err", "something-bad (%d): %s", n, ...);
- * pn_connection_engine_read_close(&ce);
- * }
- * }
- * // Write from connection buffer to my_io
- * pn_bytes_t writebuf = pn_connection_engine_write_buffer(&ce);
- * if (writebuf.size) {
- * size_t n = my_io_write_data(writebuf.start, writebuf.size, ...);
- * if (n < 0) {
- * pn_connection_engine_errorf(&ce, "write-err", "something-bad (%d): %s", d, ...);
- * pn_connection_engine_write_close(&ce);
- * } else {
- * pn_connection_engine_write_done(&ce, n);
- * }
- * }
- * }
- * // If my_io doesn't have separate read/write shutdown, then we should close it now.
- * my_io_close(...);
- *
- * AMQP is a full-duplex, asynchronous protocol. The "read" and "write" sides of
- * an AMQP connection can close separately, the example shows how to handle this
- * for full-duplex IO or IO with a simple close.
- *
- * The engine buffers events, you must keep processing till
- * pn_connection_engine_finished() is true, to ensure all reading, writing and event
- * handling (including ERROR and FINAL events) is completely finished.
- *
- * ## Error handling
- *
- * The pn_connection_engine_*() functions do not return an error code. IO errors set
- * the transport condition and are returned as a PN_TRANSPORT_ERROR. The integration
- * code can set errors using pn_connection_engine_errorf()
- *
- * ## Other IO patterns
- *
- * This API supports asynchronous, proactive, non-blocking and reactive IO. An
- * integration does not have to follow the dispatch-read-write sequence above,
- * but note that you should handle all available events before calling
- * pn_connection_engine_read_buffer() and check that `size` is non-zero before
- * starting a blocking or asynchronous read call. A `read` started while there
- * are unprocessed CLOSE events in the buffer may never complete.
- *
- * ## Thread safety
- *
- * The @ref engine types are not thread safe, but each connection and its
- * associated types forms an independent unit. Different connections can be
- * processed concurrently by different threads.
- *
- * @defgroup connection_engine Connection IO
- * @{
- */
-
-#include <proton/import_export.h>
-#include <proton/event.h>
-#include <proton/types.h>
-
-#include <stdarg.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-/**
- * Struct containing a connection, transport and collector. See
- * pn_connection_engine_init(), pn_connection_engine_destroy() and pn_connection_engine()
- */
-typedef struct pn_connection_engine_t {
- pn_connection_t *connection;
- pn_transport_t *transport;
- pn_collector_t *collector;
-} pn_connection_engine_t;
-
-/**
- * Set #connection and #transport to the provided values, or create a new
- * @ref pn_connection_t or @ref pn_transport_t if either is NULL.
- * The provided values belong to the connection engine and will be freed by
- * pn_connection_engine_destroy()
- *
- * Create a new @ref pn_collector_t and set as #collector.
- *
- * The transport and connection are *not* bound at this point. You should
- * configure them as needed and let the application handle the
- * PN_CONNECTION_INIT from pn_connection_engine_event() before calling
- * pn_connection_engine_bind().
- *
- * @return if any allocation fails call pn_connection_engine_destroy() and return PN_OUT_OF_MEMORY
- */
-PN_EXTERN int pn_connection_engine_init(pn_connection_engine_t*, pn_connection_t*, pn_transport_t*);
-
-/**
- * Bind the connection to the transport when the external IO is ready.
- *
- * The following functions (if called at all) must be called *before* bind:
- * pn_connection_set_username(), pn_connection_set_password(), pn_transport_set_server()
- *
- * If there is an external IO error during setup, set a transport error, close
- * the transport and then bind. The error events are reported to the application
- * via pn_connection_engine_event().
- *
- * @return an error code if the bind fails.
- */
-PN_EXTERN int pn_connection_engine_bind(pn_connection_engine_t *);
-
-/**
- * Unbind, release and free #connection, #transpot and #collector. Set all pointers to NULL.
- * Does not free the @ref pn_connection_engine_t struct itself.
- */
-PN_EXTERN void pn_connection_engine_destroy(pn_connection_engine_t *);
-
-/**
- * Get the read buffer.
- *
- * Copy data from your input byte source to buf.start, up to buf.size.
- * Call pn_connection_engine_read_done() when reading is complete.
- *
- * buf.size==0 means reading is not possible: no buffer space or the read side is closed.
- */
-PN_EXTERN pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t *);
-
-/**
- * Process the first n bytes of data in pn_connection_engine_read_buffer() and
- * reclaim the buffer space.
- */
-PN_EXTERN void pn_connection_engine_read_done(pn_connection_engine_t *, size_t n);
-
-/**
- * Close the read side. Call when the IO can no longer be read.
- */
-PN_EXTERN void pn_connection_engine_read_close(pn_connection_engine_t *);
-
-/**
- * True if read side is closed.
- */
-PN_EXTERN bool pn_connection_engine_read_closed(pn_connection_engine_t *);
-
-/**
- * Get the write buffer.
- *
- * Write data from buf.start to your IO destination, up to a max of buf.size.
- * Call pn_connection_engine_write_done() when writing is complete.
- *
- * buf.size==0 means there is nothing to write.
- */
- PN_EXTERN pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t *);
-
-/**
- * Call when the first n bytes of pn_connection_engine_write_buffer() have been
- * written to IO. Reclaims the buffer space and reset the write buffer.
- */
-PN_EXTERN void pn_connection_engine_write_done(pn_connection_engine_t *, size_t n);
-
-/**
- * Close the write side. Call when IO can no longer be written to.
- */
-PN_EXTERN void pn_connection_engine_write_close(pn_connection_engine_t *);
-
-/**
- * True if write side is closed.
- */
-PN_EXTERN bool pn_connection_engine_write_closed(pn_connection_engine_t *);
-
-/**
- * Close both sides side.
- */
-PN_EXTERN void pn_connection_engine_close(pn_connection_engine_t * c);
-
-/**
- * Get the current event. Call pn_connection_engine_done() when done handling it.
- * Note that if PN_TRACE_EVT is enabled this will log the event, so you should
- * avoid calling it more than once per event. Use pn_connection_engine_has_event()
- * to silently test if any events are available.
- *
- * @return NULL if there are no more events ready. Reading/writing data may produce more.
- */
-PN_EXTERN pn_event_t* pn_connection_engine_event(pn_connection_engine_t *);
-
-/**
- * True if pn_connection_engine_event() will return a non-NULL event.
- */
-PN_EXTERN bool pn_connection_engine_has_event(pn_connection_engine_t *);
-
-/**
- * Drop the current event, advance pn_connection_engine_event() to the next event.
- */
-PN_EXTERN void pn_connection_engine_pop_event(pn_connection_engine_t *);
-
-/**
- * Return true if the the engine is closed for reading and writing and there are
- * no more events.
- *
- * Call pn_connection_engine_free() to free all related memory.
- */
-PN_EXTERN bool pn_connection_engine_finished(pn_connection_engine_t *);
-
-/**
- * Set IO error information.
- *
- * The name and formatted description are set on the transport condition, and
- * returned as a PN_TRANSPORT_ERROR event from pn_connection_engine_event().
- *
- * You must call this *before* pn_connection_engine_read_close() or
- * pn_connection_engine_write_close() to ensure the error is processed.
- *
- * If there is already a transport condition set, this call does nothing. For
- * more complex cases, you can work with the transport condition directly using:
- *
- * pn_condition_t *cond = pn_transport_condition(pn_connection_transport(conn));
- */
-PN_EXTERN void pn_connection_engine_errorf(pn_connection_engine_t *ce, const char *name, const char *fmt, ...);
-
-/**
- * Set IO error information via a va_list, see pn_connection_engine_errorf()
- */
-PN_EXTERN void pn_connection_engine_verrorf(pn_connection_engine_t *ce, const char *name, const char *fmt, va_list);
-
-/**
- * Log a string message using the connection's transport log.
- */
-PN_EXTERN void pn_connection_engine_log(pn_connection_engine_t *ce, const char *msg);
-
-/**
- * Log a printf formatted message using the connection's transport log.
- */
-PN_EXTERN void pn_connection_engine_logf(pn_connection_engine_t *ce, char *fmt, ...);
-
-/**
- * Log a printf formatted message using the connection's transport log.
- */
-PN_EXTERN void pn_connection_engine_vlogf(pn_connection_engine_t *ce, const char *fmt, va_list ap);
-
-///@}
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif // PROTON_CONNECTION_ENGINE_H
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/include/proton/cproton.i
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/cproton.i b/proton-c/include/proton/cproton.i
index ffcf830..931437e 100644
--- a/proton-c/include/proton/cproton.i
+++ b/proton-c/include/proton/cproton.i
@@ -39,6 +39,9 @@ typedef unsigned long int uintptr_t;
%ignore pn_bytes_t;
%ignore pn_rwbytes_t;
+/* pn_event_batch_t is not used directly by bindings */
+%ignore pn_event_batch_t;
+
/* There is no need to wrap pn_class_t aa it is an internal implementation detail and cannot be used outside the library */
%ignore pn_class_t;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/include/proton/event.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/event.h b/proton-c/include/proton/event.h
index 4dca2d5..31d4bdd 100644
--- a/proton-c/include/proton/event.h
+++ b/proton-c/include/proton/event.h
@@ -428,6 +428,32 @@ PN_EXTERN pn_event_t *pn_collector_peek(pn_collector_t *collector);
PN_EXTERN bool pn_collector_pop(pn_collector_t *collector);
/**
+ * Return the next event to be handled.
+ *
+ * Returns the head event if it has not previously been returned by
+ * pn_collector_next(), otherwise does pn_collector_pop() and returns
+ * the new head event.
+ *
+ * The returned pointer is valid till the next call of pn_collector_pop(),
+ * pn_collector_next(), pn_collector_release() or pn_collector_free()
+ *
+ * @param[in] collector a collector object
+ * @return the next event.
+ */
+PN_EXTERN pn_event_t *pn_collector_next(pn_collector_t *collector);
+
+/**
+ * Return the same event as the previous call to pn_collector_next()
+ *
+ * The returned pointer is valid till the next call of pn_collector_pop(),
+ * pn_collector_next(), pn_collector_release() or pn_collector_free()
+ *
+ * @param[in] collector a collector object
+ * @return a pointer to the event returned by previous call to pn_collector_next()
+ */
+PN_EXTERN pn_event_t *pn_collector_prev(pn_collector_t *collector);
+
+/**
* Check if there are more events after the current event. If this
* returns true, then pn_collector_peek() will return an event even
* after pn_collector_pop() is called.
@@ -506,6 +532,36 @@ PN_EXTERN pn_transport_t *pn_event_transport(pn_event_t *event);
*/
PN_EXTERN pn_record_t *pn_event_attachments(pn_event_t *event);
+/**
+ * **Experimental**: A batch of events to handle. Call pn_event_batch_next() in
+ * a loop until it returns NULL to handle them.
+ */
+typedef struct pn_event_batch_t pn_event_batch_t;
+
+/* NOTE: there is deliberately no peek(), more() or other look-ahead on an event
+ * batch. We want to know exactly which events have been handled, next() only
+ * allows the user to get each event exactly once, in order.
+ */
+
+/**
+ * **Experimental**: Remove the next event from the batch and return it. NULL
+ * means the batch is empty. The returned event pointer is valid until
+ * pn_event_batch_next() is called again on the same batch.
+ */
+PN_EXTERN pn_event_t *pn_event_batch_next(pn_event_batch_t *batch);
+
+/**
+ *@cond INTERNAL
+ * pn_event_batch_next() can be re-implemented for different behaviors in different contexts.
+ */
+struct pn_event_batch_t {
+ pn_event_t *(*next_event)(pn_event_batch_t *batch);
+};
+
+/**
+ *@endcond
+ */
+
#ifdef __cplusplus
}
#endif
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/include/proton/proactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
index 49d7b6a..e23a24f 100644
--- a/proton-c/include/proton/proactor.h
+++ b/proton-c/include/proton/proactor.h
@@ -33,30 +33,27 @@ typedef struct pn_condition_t pn_condition_t;
/**
* @file
*
- * **Experimental**: Proactor API for the the proton @ref engine.
+ * **Experimental**: Proactor API for the proton @ref engine
*
* @defgroup proactor Proactor
*
* **Experimental**: Proactor API for portable, multi-threaded, asynchronous applications.
*
- * The proactor establishes and listens for connections. It creates the @ref
- * "transport" transport that sends and receives data over the network and
- * delivers @ref "events" event to application threads for processing.
+ * The proactor establishes and listens for connections. It creates
+ * the @ref transport that sends and receives data over the network and
+ * delivers @ref event to application threads for handling.
*
- * ## Multi-threading
- *
- * The @ref proactor is thread-safe, but the @ref "protocol engine" is not. The
- * proactor ensures that each @ref "connection" connection and its associated
- * values (@ref session, @ref link etc.) is processed sequentially, even if there
- * are multiple application threads. See pn_proactor_wait()
+ * **Multi-threading**:
+ * The @ref proactor is thread-safe, but the @ref engine is not. The proactor
+ * ensures that each @ref connection and its associated values (@ref session,
+ * @ref link etc.) is handled sequentially, even if there are multiple
+ * application threads. See pn_proactor_wait()
*
* @{
*/
/**
- * The proactor creates and manage @ref "transports" transport and delivers @ref
- * "event" events to the application.
- *
+ * The proactor.
*/
typedef struct pn_proactor_t pn_proactor_t;
@@ -70,13 +67,6 @@ pn_proactor_t *pn_proactor(void);
*/
void pn_proactor_free(pn_proactor_t*);
-/* FIXME aconway 2016-11-12: connect and listen need options to enable
- things like websockets, alternate encryption or other features.
- The "extra" parameter will be replaced by an "options" parameter
- that will include providing extra data and other manipulators
- to affect how the connection is processed.
-*/
-
/**
* Asynchronous connect: a connection and transport will be created, the
* relevant events will be returned by pn_proactor_wait()
@@ -104,13 +94,27 @@ int pn_proactor_connect(pn_proactor_t*, const char *host, const char *port, pn_b
pn_listener_t *pn_proactor_listen(pn_proactor_t *, const char *host, const char *port, int backlog, pn_bytes_t extra);
/**
- * Wait for an event. Can be called in multiple threads concurrently.
- * You must call pn_event_done() when the event has been handled.
+ * Wait for events to handle. Call pn_proactor_done() after handling events.
+ *
+ * Thread safe: pn_proactor_wait() can be called concurrently, but the events in
+ * the returned ::pn_event_batch_t must be handled sequentially.
+ *
+ * The proactor always returns events that must be handled sequentially in the
+ * same batch or sequentially in a later batch after pn_proactor_done(). Any
+ * events returned concurrently by pn_proactor_wait() are safe to handle
+ * concurrently.
+ */
+pn_event_batch_t *pn_proactor_wait(pn_proactor_t* d);
+
+/**
+ * Call when done handling events.
*
- * The proactor ensures that events that cannot be handled concurrently
- * (e.g. events for for the same connection) are never returned concurrently.
+ * It is generally most efficient to handle the entire batch in the thread
+ * that calls pn_proactor_wait(), then call pn_proactor_done(). If you call
+ * pn_proactor_done() earlier, the remaining events will be returned again by
+ * pn_proactor_wait(), possibly to another thread.
*/
-pn_event_t *pn_proactor_wait(pn_proactor_t* d);
+void pn_proactor_done(pn_proactor_t* d, pn_event_batch_t *events);
/**
* Cause PN_PROACTOR_INTERRUPT to be returned to exactly one thread calling wait()
@@ -146,14 +150,6 @@ void pn_connection_wake(pn_connection_t *c);
pn_proactor_t *pn_connection_proactor(pn_connection_t *c);
/**
- * Call when a proactor event has been handled. Does nothing if not a proactor event.
- *
- * Thread safe: May be called from any thread but must be called exactly once
- * for each event returned by pn_proactor_wait()
- */
-void pn_event_done(pn_event_t *);
-
-/**
* Get the proactor that created the event or NULL.
*/
pn_proactor_t *pn_event_proactor(pn_event_t *);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/src/core/connection_driver.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/connection_driver.c b/proton-c/src/core/connection_driver.c
index f31ddb0..3393e64 100644
--- a/proton-c/src/core/connection_driver.c
+++ b/proton-c/src/core/connection_driver.c
@@ -20,144 +20,149 @@
#include "engine-internal.h"
#include <proton/condition.h>
#include <proton/connection.h>
-#include <proton/connection_engine.h>
+#include <proton/connection_driver.h>
#include <proton/transport.h>
#include <string.h>
-int pn_connection_engine_init(pn_connection_engine_t* ce, pn_connection_t *c, pn_transport_t *t) {
- ce->connection = c ? c : pn_connection();
- ce->transport = t ? t : pn_transport();
- ce->collector = pn_collector();
- if (!ce->connection || !ce->transport || !ce->collector) {
- pn_connection_engine_destroy(ce);
+struct driver_batch {
+ pn_event_batch_t batch;
+};
+
+static pn_event_t *batch_next(pn_event_batch_t *batch) {
+ pn_connection_driver_t *d =
+ (pn_connection_driver_t*)((char*)batch - offsetof(pn_connection_driver_t, batch));
+ pn_collector_t *collector = pn_connection_collector(d->connection);
+ pn_event_t *handled = pn_collector_prev(collector);
+ if (handled && pn_event_type(handled) == PN_CONNECTION_INIT) {
+ pn_transport_bind(d->transport, d->connection); /* Init event handled, auto-bind */
+ }
+ pn_event_t *next = pn_collector_next(collector);
+ if (next && d->transport->trace & PN_TRACE_EVT) {
+ pn_string_clear(d->transport->scratch);
+ pn_inspect(next, d->transport->scratch);
+ pn_transport_log(d->transport, pn_string_get(d->transport->scratch));
+ }
+ return next;
+}
+
+int pn_connection_driver_init(pn_connection_driver_t* d, pn_connection_t *c, pn_transport_t *t) {
+ memset(d, 0, sizeof(*d));
+ d->batch.next_event = &batch_next;
+ d->connection = c ? c : pn_connection();
+ d->transport = t ? t : pn_transport();
+ pn_collector_t *collector = pn_collector();
+ if (!d->connection || !d->transport || !collector) {
+ if (collector) pn_collector_free(collector);
+ pn_connection_driver_destroy(d);
return PN_OUT_OF_MEMORY;
}
- pn_connection_collect(ce->connection, ce->collector);
+ pn_connection_collect(d->connection, collector);
return 0;
}
-int pn_connection_engine_bind(pn_connection_engine_t *ce) {
- return pn_transport_bind(ce->transport, ce->connection);
+int pn_connection_driver_bind(pn_connection_driver_t *d) {
+ return pn_transport_bind(d->transport, d->connection);
}
-void pn_connection_engine_destroy(pn_connection_engine_t *ce) {
- if (ce->transport) {
- pn_transport_unbind(ce->transport);
- pn_transport_free(ce->transport);
+void pn_connection_driver_destroy(pn_connection_driver_t *d) {
+ if (d->transport) {
+ pn_transport_unbind(d->transport);
+ pn_transport_free(d->transport);
+ }
+ if (d->connection) {
+ pn_collector_t *collector = pn_connection_collector(d->connection);
+ pn_connection_free(d->connection);
+ pn_collector_free(collector);
}
- if (ce->collector) pn_collector_free(ce->collector);
- if (ce->connection) pn_connection_free(ce->connection);
- memset(ce, 0, sizeof(*ce));
+ memset(d, 0, sizeof(*d));
}
-pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t *ce) {
- ssize_t cap = pn_transport_capacity(ce->transport);
- return (cap > 0) ? pn_rwbytes(cap, pn_transport_tail(ce->transport)) : pn_rwbytes(0, 0);
+pn_rwbytes_t pn_connection_driver_read_buffer(pn_connection_driver_t *d) {
+ ssize_t cap = pn_transport_capacity(d->transport);
+ return (cap > 0) ? pn_rwbytes(cap, pn_transport_tail(d->transport)) : pn_rwbytes(0, 0);
}
-void pn_connection_engine_read_done(pn_connection_engine_t *ce, size_t n) {
- if (n > 0) pn_transport_process(ce->transport, n);
+void pn_connection_driver_read_done(pn_connection_driver_t *d, size_t n) {
+ if (n > 0) pn_transport_process(d->transport, n);
}
-bool pn_connection_engine_read_closed(pn_connection_engine_t *ce) {
- return pn_transport_capacity(ce->transport) < 0;
+bool pn_connection_driver_read_closed(pn_connection_driver_t *d) {
+ return pn_transport_capacity(d->transport) < 0;
}
-void pn_connection_engine_read_close(pn_connection_engine_t *ce) {
- if (!pn_connection_engine_read_closed(ce)) {
- pn_transport_close_tail(ce->transport);
+void pn_connection_driver_read_close(pn_connection_driver_t *d) {
+ if (!pn_connection_driver_read_closed(d)) {
+ pn_transport_close_tail(d->transport);
}
}
-pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t *ce) {
- ssize_t pending = pn_transport_pending(ce->transport);
+pn_bytes_t pn_connection_driver_write_buffer(pn_connection_driver_t *d) {
+ ssize_t pending = pn_transport_pending(d->transport);
return (pending > 0) ?
- pn_bytes(pending, pn_transport_head(ce->transport)) : pn_bytes_null;
+ pn_bytes(pending, pn_transport_head(d->transport)) : pn_bytes_null;
}
-void pn_connection_engine_write_done(pn_connection_engine_t *ce, size_t n) {
+void pn_connection_driver_write_done(pn_connection_driver_t *d, size_t n) {
if (n > 0)
- pn_transport_pop(ce->transport, n);
+ pn_transport_pop(d->transport, n);
}
-bool pn_connection_engine_write_closed(pn_connection_engine_t *ce) {
- return pn_transport_pending(ce->transport) < 0;
+bool pn_connection_driver_write_closed(pn_connection_driver_t *d) {
+ return pn_transport_pending(d->transport) < 0;
}
-void pn_connection_engine_write_close(pn_connection_engine_t *ce) {
- if (!pn_connection_engine_write_closed(ce)) {
- pn_transport_close_head(ce->transport);
+void pn_connection_driver_write_close(pn_connection_driver_t *d) {
+ if (!pn_connection_driver_write_closed(d)) {
+ pn_transport_close_head(d->transport);
}
}
-void pn_connection_engine_close(pn_connection_engine_t *ce) {
- pn_connection_engine_read_close(ce);
- pn_connection_engine_write_close(ce);
-}
-
-pn_event_t* pn_connection_engine_event(pn_connection_engine_t *ce) {
- pn_event_t *e = ce->collector ? pn_collector_peek(ce->collector) : NULL;
- if (e) {
- pn_transport_t *t = ce->transport;
- if (t && t->trace & PN_TRACE_EVT) {
- /* This can log the same event twice if pn_connection_engine_event is called
- * twice but for debugging it is much better to log before handling than after.
- */
- pn_string_clear(t->scratch);
- pn_inspect(e, t->scratch);
- pn_transport_log(t, pn_string_get(t->scratch));
- }
- }
- return e;
+void pn_connection_driver_close(pn_connection_driver_t *d) {
+ pn_connection_driver_read_close(d);
+ pn_connection_driver_write_close(d);
}
-bool pn_connection_engine_has_event(pn_connection_engine_t *ce) {
- return ce->collector && pn_collector_peek(ce->collector);
+pn_event_t* pn_connection_driver_next_event(pn_connection_driver_t *d) {
+ return pn_event_batch_next(&d->batch);
}
-void pn_connection_engine_pop_event(pn_connection_engine_t *ce) {
- if (ce->collector) {
- pn_event_t *e = pn_collector_peek(ce->collector);
- if (pn_event_type(e) == PN_TRANSPORT_CLOSED) { /* The last event ever */
- /* Events can accumulate behind the TRANSPORT_CLOSED before the
- * PN_TRANSPORT_CLOSED event is handled. They can never be processed
- * so release them.
- */
- pn_collector_release(ce->collector);
- } else {
- pn_collector_pop(ce->collector);
- }
-
- }
+bool pn_connection_driver_has_event(pn_connection_driver_t *d) {
+ return pn_collector_peek(pn_connection_collector(d->connection));
}
-bool pn_connection_engine_finished(pn_connection_engine_t *ce) {
- return pn_transport_closed(ce->transport) && !pn_connection_engine_has_event(ce);
+bool pn_connection_driver_finished(pn_connection_driver_t *d) {
+ return pn_transport_closed(d->transport) && !pn_connection_driver_has_event(d);
}
-void pn_connection_engine_verrorf(pn_connection_engine_t *ce, const char *name, const char *fmt, va_list ap) {
- pn_transport_t *t = ce->transport;
+void pn_connection_driver_verrorf(pn_connection_driver_t *d, const char *name, const char *fmt, va_list ap) {
+ pn_transport_t *t = d->transport;
pn_condition_t *cond = pn_transport_condition(t);
pn_string_vformat(t->scratch, fmt, ap);
pn_condition_set_name(cond, name);
pn_condition_set_description(cond, pn_string_get(t->scratch));
}
-void pn_connection_engine_errorf(pn_connection_engine_t *ce, const char *name, const char *fmt, ...) {
+void pn_connection_driver_errorf(pn_connection_driver_t *d, const char *name, const char *fmt, ...) {
va_list ap;
va_start(ap, fmt);
- pn_connection_engine_verrorf(ce, name, fmt, ap);
+ pn_connection_driver_verrorf(d, name, fmt, ap);
va_end(ap);
}
-void pn_connection_engine_log(pn_connection_engine_t *ce, const char *msg) {
- pn_transport_log(ce->transport, msg);
+void pn_connection_driver_log(pn_connection_driver_t *d, const char *msg) {
+ pn_transport_log(d->transport, msg);
+}
+
+void pn_connection_driver_vlogf(pn_connection_driver_t *d, const char *fmt, va_list ap) {
+ pn_transport_vlogf(d->transport, fmt, ap);
}
-void pn_connection_engine_vlogf(pn_connection_engine_t *ce, const char *fmt, va_list ap) {
- pn_transport_vlogf(ce->transport, fmt, ap);
+void pn_connection_driver_vlog(pn_connection_driver_t *d, const char *msg) {
+ pn_transport_log(d->transport, msg);
}
-void pn_connection_engine_vlog(pn_connection_engine_t *ce, const char *msg) {
- pn_transport_log(ce->transport, msg);
+pn_connection_driver_t* pn_event_batch_connection_driver(pn_event_batch_t *batch) {
+ return (batch->next_event == batch_next) ?
+ (pn_connection_driver_t*)((char*)batch - offsetof(pn_connection_driver_t, batch)) :
+ NULL;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/src/core/connection_engine.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/connection_engine.c b/proton-c/src/core/connection_engine.c
deleted file mode 100644
index f31ddb0..0000000
--- a/proton-c/src/core/connection_engine.c
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "engine-internal.h"
-#include <proton/condition.h>
-#include <proton/connection.h>
-#include <proton/connection_engine.h>
-#include <proton/transport.h>
-#include <string.h>
-
-int pn_connection_engine_init(pn_connection_engine_t* ce, pn_connection_t *c, pn_transport_t *t) {
- ce->connection = c ? c : pn_connection();
- ce->transport = t ? t : pn_transport();
- ce->collector = pn_collector();
- if (!ce->connection || !ce->transport || !ce->collector) {
- pn_connection_engine_destroy(ce);
- return PN_OUT_OF_MEMORY;
- }
- pn_connection_collect(ce->connection, ce->collector);
- return 0;
-}
-
-int pn_connection_engine_bind(pn_connection_engine_t *ce) {
- return pn_transport_bind(ce->transport, ce->connection);
-}
-
-void pn_connection_engine_destroy(pn_connection_engine_t *ce) {
- if (ce->transport) {
- pn_transport_unbind(ce->transport);
- pn_transport_free(ce->transport);
- }
- if (ce->collector) pn_collector_free(ce->collector);
- if (ce->connection) pn_connection_free(ce->connection);
- memset(ce, 0, sizeof(*ce));
-}
-
-pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t *ce) {
- ssize_t cap = pn_transport_capacity(ce->transport);
- return (cap > 0) ? pn_rwbytes(cap, pn_transport_tail(ce->transport)) : pn_rwbytes(0, 0);
-}
-
-void pn_connection_engine_read_done(pn_connection_engine_t *ce, size_t n) {
- if (n > 0) pn_transport_process(ce->transport, n);
-}
-
-bool pn_connection_engine_read_closed(pn_connection_engine_t *ce) {
- return pn_transport_capacity(ce->transport) < 0;
-}
-
-void pn_connection_engine_read_close(pn_connection_engine_t *ce) {
- if (!pn_connection_engine_read_closed(ce)) {
- pn_transport_close_tail(ce->transport);
- }
-}
-
-pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t *ce) {
- ssize_t pending = pn_transport_pending(ce->transport);
- return (pending > 0) ?
- pn_bytes(pending, pn_transport_head(ce->transport)) : pn_bytes_null;
-}
-
-void pn_connection_engine_write_done(pn_connection_engine_t *ce, size_t n) {
- if (n > 0)
- pn_transport_pop(ce->transport, n);
-}
-
-bool pn_connection_engine_write_closed(pn_connection_engine_t *ce) {
- return pn_transport_pending(ce->transport) < 0;
-}
-
-void pn_connection_engine_write_close(pn_connection_engine_t *ce) {
- if (!pn_connection_engine_write_closed(ce)) {
- pn_transport_close_head(ce->transport);
- }
-}
-
-void pn_connection_engine_close(pn_connection_engine_t *ce) {
- pn_connection_engine_read_close(ce);
- pn_connection_engine_write_close(ce);
-}
-
-pn_event_t* pn_connection_engine_event(pn_connection_engine_t *ce) {
- pn_event_t *e = ce->collector ? pn_collector_peek(ce->collector) : NULL;
- if (e) {
- pn_transport_t *t = ce->transport;
- if (t && t->trace & PN_TRACE_EVT) {
- /* This can log the same event twice if pn_connection_engine_event is called
- * twice but for debugging it is much better to log before handling than after.
- */
- pn_string_clear(t->scratch);
- pn_inspect(e, t->scratch);
- pn_transport_log(t, pn_string_get(t->scratch));
- }
- }
- return e;
-}
-
-bool pn_connection_engine_has_event(pn_connection_engine_t *ce) {
- return ce->collector && pn_collector_peek(ce->collector);
-}
-
-void pn_connection_engine_pop_event(pn_connection_engine_t *ce) {
- if (ce->collector) {
- pn_event_t *e = pn_collector_peek(ce->collector);
- if (pn_event_type(e) == PN_TRANSPORT_CLOSED) { /* The last event ever */
- /* Events can accumulate behind the TRANSPORT_CLOSED before the
- * PN_TRANSPORT_CLOSED event is handled. They can never be processed
- * so release them.
- */
- pn_collector_release(ce->collector);
- } else {
- pn_collector_pop(ce->collector);
- }
-
- }
-}
-
-bool pn_connection_engine_finished(pn_connection_engine_t *ce) {
- return pn_transport_closed(ce->transport) && !pn_connection_engine_has_event(ce);
-}
-
-void pn_connection_engine_verrorf(pn_connection_engine_t *ce, const char *name, const char *fmt, va_list ap) {
- pn_transport_t *t = ce->transport;
- pn_condition_t *cond = pn_transport_condition(t);
- pn_string_vformat(t->scratch, fmt, ap);
- pn_condition_set_name(cond, name);
- pn_condition_set_description(cond, pn_string_get(t->scratch));
-}
-
-void pn_connection_engine_errorf(pn_connection_engine_t *ce, const char *name, const char *fmt, ...) {
- va_list ap;
- va_start(ap, fmt);
- pn_connection_engine_verrorf(ce, name, fmt, ap);
- va_end(ap);
-}
-
-void pn_connection_engine_log(pn_connection_engine_t *ce, const char *msg) {
- pn_transport_log(ce->transport, msg);
-}
-
-void pn_connection_engine_vlogf(pn_connection_engine_t *ce, const char *fmt, va_list ap) {
- pn_transport_vlogf(ce->transport, fmt, ap);
-}
-
-void pn_connection_engine_vlog(pn_connection_engine_t *ce, const char *msg) {
- pn_transport_log(ce->transport, msg);
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/src/core/event.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/event.c b/proton-c/src/core/event.c
index 7882327..2a0a5cf 100644
--- a/proton-c/src/core/event.c
+++ b/proton-c/src/core/event.c
@@ -28,7 +28,8 @@ struct pn_collector_t {
pn_list_t *pool;
pn_event_t *head;
pn_event_t *tail;
- bool freed;
+ bool freed:1;
+ bool head_returned:1; /* Head has been returned by pn_collector_next() */
};
struct pn_event_t {
@@ -51,11 +52,8 @@ static void pn_collector_initialize(pn_collector_t *collector)
static void pn_collector_drain(pn_collector_t *collector)
{
assert(collector);
-
- while (pn_collector_peek(collector)) {
- pn_collector_pop(collector);
- }
-
+ while (pn_collector_next(collector))
+ ;
assert(!collector->head);
assert(!collector->tail);
}
@@ -175,6 +173,7 @@ pn_event_t *pn_collector_peek(pn_collector_t *collector)
bool pn_collector_pop(pn_collector_t *collector)
{
+ collector->head_returned = false;
pn_event_t *event = collector->head;
if (event) {
collector->head = event->next;
@@ -190,6 +189,19 @@ bool pn_collector_pop(pn_collector_t *collector)
return true;
}
+pn_event_t *pn_collector_next(pn_collector_t *collector)
+{
+ if (collector->head_returned) {
+ pn_collector_pop(collector);
+ }
+ collector->head_returned = collector->head;
+ return collector->head;
+}
+
+pn_event_t *pn_collector_prev(pn_collector_t *collector) {
+ return collector->head_returned ? collector->head : NULL;
+}
+
bool pn_collector_more(pn_collector_t *collector)
{
assert(collector);
@@ -386,3 +398,7 @@ const char *pn_event_type_name(pn_event_type_t type)
}
return NULL;
}
+
+pn_event_t *pn_event_batch_next(pn_event_batch_t *batch) {
+ return batch->next_event(batch);
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/src/tests/refcount.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/refcount.c b/proton-c/src/tests/refcount.c
index a36d01c..267c861 100644
--- a/proton-c/src/tests/refcount.c
+++ b/proton-c/src/tests/refcount.c
@@ -313,7 +313,8 @@ static void test_transport_connection(void) {
}
static void drain(pn_collector_t *collector) {
- while (pn_collector_peek(collector)) { pn_collector_pop(collector); }
+ while (pn_collector_next(collector))
+ ;
}
static void test_collector_connection_transport(void) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/qpid-proton-cpp.syms
----------------------------------------------------------------------
diff --git a/qpid-proton-cpp.syms b/qpid-proton-cpp.syms
index 7a25651..c24e898 100644
--- a/qpid-proton-cpp.syms
+++ b/qpid-proton-cpp.syms
@@ -30,24 +30,24 @@ proton::connection::transport() const
proton::connection::user(std::string const&)
proton::connection::~connection()
-proton::connection_engine::can_read() const
-proton::connection_engine::can_write() const
-proton::connection_engine::closed() const
-proton::connection_engine::connection() const
-proton::connection_engine::connection_engine(proton::handler&, proton::connection_options const&)
-proton::connection_engine::container::container(std::string const&)
-proton::connection_engine::container::id() const
-proton::connection_engine::container::make_options()
-proton::connection_engine::container::options(proton::connection_options const&)
-proton::connection_engine::container::~container()
-proton::connection_engine::dispatch()
-proton::connection_engine::io_error::io_error(std::string const&)
-proton::connection_engine::io_error::~io_error()
-proton::connection_engine::no_opts
-proton::connection_engine::process(int)
-proton::connection_engine::try_read()
-proton::connection_engine::try_write()
-proton::connection_engine::~connection_engine()
+proton::connection_driver::can_read() const
+proton::connection_driver::can_write() const
+proton::connection_driver::closed() const
+proton::connection_driver::connection() const
+proton::connection_driver::connection_driver(proton::handler&, proton::connection_options const&)
+proton::connection_driver::container::container(std::string const&)
+proton::connection_driver::container::id() const
+proton::connection_driver::container::make_options()
+proton::connection_driver::container::options(proton::connection_options const&)
+proton::connection_driver::container::~container()
+proton::connection_driver::dispatch()
+proton::connection_driver::io_error::io_error(std::string const&)
+proton::connection_driver::io_error::~io_error()
+proton::connection_driver::no_opts
+proton::connection_driver::process(int)
+proton::connection_driver::try_read()
+proton::connection_driver::try_write()
+proton::connection_driver::~connection_driver()
proton::connection_options::connection_options()
proton::connection_options::connection_options(proton::connection_options const&)
@@ -587,8 +587,8 @@ proton::value::value(proton::value const&)
# Only types with the following info can be thrown across shared abject boundary
# Or correctly dynamically cast by user
typeinfo for proton::connection
-typeinfo for proton::connection_engine
-typeinfo for proton::connection_engine::io_error
+typeinfo for proton::connection_driver
+typeinfo for proton::connection_driver::io_error
typeinfo for proton::conversion_error
typeinfo for proton::endpoint
typeinfo for proton::error
@@ -600,8 +600,8 @@ typeinfo for proton::session
typeinfo for proton::timeout_error
typeinfo for proton::url_error
typeinfo name for proton::connection
-typeinfo name for proton::connection_engine
-typeinfo name for proton::connection_engine::io_error
+typeinfo name for proton::connection_driver
+typeinfo name for proton::connection_driver::io_error
typeinfo name for proton::conversion_error
typeinfo name for proton::endpoint
typeinfo name for proton::error
@@ -613,8 +613,8 @@ typeinfo name for proton::session
typeinfo name for proton::timeout_error
typeinfo name for proton::url_error
vtable for proton::connection
-vtable for proton::connection_engine
-vtable for proton::connection_engine::io_error
+vtable for proton::connection_driver
+vtable for proton::connection_driver::io_error
vtable for proton::conversion_error
vtable for proton::endpoint
vtable for proton::error
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org