You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2017/07/21 17:02:01 UTC
[05/20] qpid-proton git commit: PROTON-1400: [C++ binding] Proactor
container implementation - Remove all reactor use - Rearrange object context
code - Change container includes to proactor container includes - Add
sender/receiver options API to connection [...]
PROTON-1400: [C++ binding] Proactor container implementation
- Remove all reactor use
- Rearrange object context code
- Change container includes to proactor container includes
- Add sender/receiver options API to connection so we never need container in handlers
- Rework connection_driver to remove all use of container
- Change signature of listen_handler callbacks to supply the listener
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/9fad779c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/9fad779c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/9fad779c
Branch: refs/heads/master
Commit: 9fad779c98dcc2ccc75e5055d7333e9dd862c235
Parents: 6f88f52
Author: Andrew Stitcher <as...@apache.org>
Authored: Wed Feb 8 02:32:36 2017 -0500
Committer: Andrew Stitcher <as...@apache.org>
Committed: Fri Jul 21 12:50:06 2017 -0400
----------------------------------------------------------------------
examples/cpp/ssl.cpp | 9 +-
proton-c/bindings/cpp/CMakeLists.txt | 6 +-
.../bindings/cpp/include/proton/connection.hpp | 6 +
.../bindings/cpp/include/proton/container.hpp | 6 -
.../cpp/include/proton/internal/config.hpp | 8 +
.../cpp/include/proton/io/connection_driver.hpp | 25 +-
.../include/proton/io/container_impl_base.hpp | 144 -------
.../cpp/include/proton/listen_handler.hpp | 6 +-
.../bindings/cpp/include/proton/listener.hpp | 18 +-
proton-c/bindings/cpp/src/connection.cpp | 27 +-
.../bindings/cpp/src/connection_driver_test.cpp | 11 +-
.../bindings/cpp/src/connection_options.cpp | 30 +-
proton-c/bindings/cpp/src/container.cpp | 20 +-
proton-c/bindings/cpp/src/container_test.cpp | 11 +-
proton-c/bindings/cpp/src/contexts.cpp | 55 +--
proton-c/bindings/cpp/src/event_loop.cpp | 2 +-
proton-c/bindings/cpp/src/include/contexts.hpp | 56 +--
.../cpp/src/include/messaging_adapter.hpp | 2 -
.../cpp/src/include/proactor_container_impl.hpp | 133 ++++++
.../src/include/proactor_event_loop_impl.hpp | 54 +++
.../bindings/cpp/src/include/proton_bits.hpp | 18 +-
.../bindings/cpp/src/include/proton_event.hpp | 16 +-
.../cpp/src/include/test_dummy_container.hpp | 82 ----
.../bindings/cpp/src/io/connection_driver.cpp | 31 +-
proton-c/bindings/cpp/src/listener.cpp | 11 +-
proton-c/bindings/cpp/src/messaging_adapter.cpp | 21 +-
.../cpp/src/proactor_container_impl.cpp | 419 +++++++++++++++++++
proton-c/bindings/cpp/src/receiver.cpp | 1 -
proton-c/bindings/cpp/src/receiver_options.cpp | 2 +-
proton-c/bindings/cpp/src/reconnect_timer.cpp | 1 -
proton-c/bindings/cpp/src/sender_options.cpp | 2 +-
proton-c/bindings/cpp/src/session_options.cpp | 2 +-
32 files changed, 765 insertions(+), 470 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/examples/cpp/ssl.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/ssl.cpp b/examples/cpp/ssl.cpp
index 2e901c2..00bbccd 100644
--- a/examples/cpp/ssl.cpp
+++ b/examples/cpp/ssl.cpp
@@ -66,16 +66,16 @@ namespace {
struct server_handler : public proton::messaging_handler {
- std::string url;
+ proton::listener listener;
void on_connection_open(proton::connection &c) OVERRIDE {
std::cout << "Inbound server connection connected via SSL. Protocol: " <<
c.transport().ssl().protocol() << std::endl;
- c.container().stop_listening(url); // Just expecting the one connection.
+ listener.stop(); // Just expecting the one connection.
}
void on_transport_error(proton::transport &t) OVERRIDE {
- t.connection().container().stop_listening(url);
+ listener.stop();
}
void on_message(proton::delivery &, proton::message &m) OVERRIDE {
@@ -122,8 +122,7 @@ class hello_world_direct : public proton::messaging_handler {
} else throw std::logic_error("bad verify mode: " + verify);
c.client_connection_options(client_opts);
- s_handler.url = url;
- c.listen(url);
+ s_handler.listener = c.listen(url);
c.open_sender(url);
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt
index d1c6fd1..295a99e 100644
--- a/proton-c/bindings/cpp/CMakeLists.txt
+++ b/proton-c/bindings/cpp/CMakeLists.txt
@@ -32,9 +32,8 @@ set(qpid-proton-cpp-source
src/map.cpp
src/connection.cpp
src/connection_options.cpp
- src/connector.cpp
src/container.cpp
- src/container_impl.cpp
+ src/proactor_container_impl.cpp
src/contexts.cpp
src/data.cpp
src/decimal.cpp
@@ -58,7 +57,6 @@ set(qpid-proton-cpp-source
src/proton_bits.cpp
src/proton_event.cpp
src/proton_handler.cpp
- src/reactor.cpp
src/receiver.cpp
src/receiver_options.cpp
src/reconnect_timer.cpp
@@ -91,7 +89,7 @@ set_source_files_properties (
add_library(qpid-proton-cpp SHARED ${qpid-proton-cpp-source})
-target_link_libraries (qpid-proton-cpp ${PLATFORM_LIBS} qpid-proton)
+target_link_libraries (qpid-proton-cpp ${PLATFORM_LIBS} qpid-proton-core qpid-proton-proactor)
set_target_properties (
qpid-proton-cpp
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/include/proton/connection.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection.hpp b/proton-c/bindings/cpp/include/proton/connection.hpp
index a4046be..331ba82 100644
--- a/proton-c/bindings/cpp/include/proton/connection.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection.hpp
@@ -106,6 +106,12 @@ PN_CPP_CLASS_EXTERN connection : public internal::object<pn_connection_t>, publi
PN_CPP_EXTERN receiver open_receiver(const std::string &addr,
const receiver_options &);
+ /// @copydoc container::sender_options
+ PN_CPP_EXTERN class sender_options sender_options() const;
+
+ /// @copydoc container::receiver_options
+ PN_CPP_EXTERN class receiver_options receiver_options() const;
+
/// Return all sessions on this connection.
PN_CPP_EXTERN session_range sessions() const;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/include/proton/container.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/container.hpp b/proton-c/bindings/cpp/include/proton/container.hpp
index 6f10c3c..be83e5e 100644
--- a/proton-c/bindings/cpp/include/proton/container.hpp
+++ b/proton-c/bindings/cpp/include/proton/container.hpp
@@ -73,12 +73,6 @@ class PN_CPP_CLASS_EXTERN container {
/// Connect to `url` and send an open request to the remote peer.
PN_CPP_EXTERN returned<connection> connect(const std::string& url);
- /// @cond INTERNAL
- /// Stop listening on url, must match the url string given to listen().
- /// You can also use the proton::listener object returned by listen()
- PN_CPP_EXTERN void stop_listening(const std::string& url);
- /// @endcond
-
/// Start listening on url.
///
/// Calls to the @ref listen_handler are serialized for this listener,
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/include/proton/internal/config.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/internal/config.hpp b/proton-c/bindings/cpp/include/proton/internal/config.hpp
index da7f480..79d201c 100644
--- a/proton-c/bindings/cpp/include/proton/internal/config.hpp
+++ b/proton-c/bindings/cpp/include/proton/internal/config.hpp
@@ -95,6 +95,14 @@
#define PN_CPP_HAS_CHRONO PN_CPP_HAS_CPP11
#endif
+#ifndef PN_CPP_HAS_STD_MUTEX
+#define PN_CPP_HAS_STD_MUTEX PN_CPP_HAS_CPP11
+#endif
+
+#ifndef PN_CPP_HAS_STD_ATOMIC
+#define PN_CPP_HAS_STD_ATOMIC PN_CPP_HAS_CPP11
+#endif
+
#endif // PROTON_INTERNAL_CONFIG_HPP
/// @endcond
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
index 56deb00..8d0be85 100644
--- a/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
+++ b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
@@ -94,28 +94,11 @@ struct const_buffer {
class
PN_CPP_CLASS_EXTERN connection_driver {
public:
- /// An engine that is not associated with a proton::container or
- /// proton::event_loop.
- ///
- /// Accessing the container or event_loop for this connection in
- /// a proton::messaging_handler will throw a proton::error exception.
- ///
+ /// An engine without a container id.
PN_CPP_EXTERN connection_driver();
- /// Create a connection driver associated with a proton::container and
- /// optional event_loop. If the event_loop is not provided attempts to use
- /// it will throw proton::error.
- ///
- /// Takes ownership of the event_loop. Note the proton::connection created
- /// by this connection_driver can outlive the connection_driver itself if
- /// the user pins it in memory using the proton::thread_safe<> template.
- /// The event_loop is deleted when, and only when, the proton::connection is.
- ///
- PN_CPP_EXTERN connection_driver(proton::container&);
-#if PN_CPP_HAS_RVALUE_REFERENCES
- /// @copydoc connection_driver()
- PN_CPP_EXTERN connection_driver(proton::container&, event_loop&& loop);
-#endif
+ /// Create a connection driver associated with a container id.
+ PN_CPP_EXTERN connection_driver(const std::string&);
PN_CPP_EXTERN ~connection_driver();
@@ -207,8 +190,8 @@ PN_CPP_CLASS_EXTERN connection_driver {
connection_driver(const connection_driver&);
connection_driver& operator=(const connection_driver&);
+ std::string container_id_;
messaging_handler* handler_;
- proton::container* container_;
pn_connection_driver_t driver_;
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/include/proton/io/container_impl_base.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/container_impl_base.hpp b/proton-c/bindings/cpp/include/proton/io/container_impl_base.hpp
deleted file mode 100644
index a04b4ff..0000000
--- a/proton-c/bindings/cpp/include/proton/io/container_impl_base.hpp
+++ /dev/null
@@ -1,144 +0,0 @@
-#ifndef PROTON_IO_CONTAINER_IMPL_BASE_HPP
-#define PROTON_IO_CONTAINER_IMPL_BASE_HPP
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "../container.hpp"
-
-#include <future>
-#include <mutex>
-#include <sstream>
-
-namespace proton {
-namespace io {
-
-/// **Experimental** - A base container implementation.
-///
-/// This is a thread-safe partial implementation of the
-/// proton::container interface to reduce boilerplate code in
-/// container implementations. Requires C++11.
-///
-/// You can ignore this class if you want to implement the functions
-/// in a different way.
-class container_impl_base : public standard_container {
- public:
- // Pull in base class functions here so that name search finds all the overloads
- using standard_container::open_receiver;
- using standard_container::open_sender;
-
- /// @see proton::container::client_connection_options
- void client_connection_options(const connection_options & opts) {
- store(client_copts_, opts);
- }
-
- /// @see proton::container::client_connection_options
- connection_options client_connection_options() const {
- return load(client_copts_);
- }
-
- /// @see proton::container::server_connection_options
- void server_connection_options(const connection_options & opts) {
- store(server_copts_, opts);
- }
-
- /// @see proton::container::server_connection_options
- connection_options server_connection_options() const {
- return load(server_copts_);
- }
-
- /// @see proton::container::sender_options
- void sender_options(const class sender_options & opts) {
- store(sender_opts_, opts);
- }
-
- /// @see proton::container::sender_options
- class sender_options sender_options() const {
- return load(sender_opts_);
- }
-
- /// @see proton::container::receiver_options
- void receiver_options(const class receiver_options & opts) {
- store(receiver_opts_, opts);
- }
-
- /// @see proton::container::receiver_options
- class receiver_options receiver_options() const {
- return load(receiver_opts_);
- }
-
- /// @see proton::container::open_sender
- returned<sender> open_sender(
- const std::string &url, const class sender_options &opts, const connection_options &copts)
- {
- return open_link<sender, class sender_options>(url, opts, copts, &connection::open_sender);
- }
-
- /// @see proton::container::open_receiver
- returned<receiver> open_receiver(
- const std::string &url, const class receiver_options &opts, const connection_options &copts)
- {
- return open_link<receiver>(url, opts, copts, &connection::open_receiver);
- }
-
- private:
- template<class T, class Opts>
- returned<T> open_link(
- const std::string &url_str, const Opts& opts, const connection_options& copts,
- T (connection::*open_fn)(const std::string&, const Opts&))
- {
- std::string addr = url(url_str).path();
- std::shared_ptr<thread_safe<connection> > ts_connection = connect(url_str, copts);
- std::promise<returned<T> > result_promise;
- auto do_open = [ts_connection, addr, opts, open_fn, &result_promise]() {
- try {
- connection c = ts_connection->unsafe();
- returned<T> s = make_thread_safe((c.*open_fn)(addr, opts));
- result_promise.set_value(s);
- } catch (...) {
- result_promise.set_exception(std::current_exception());
- }
- };
- ts_connection->event_loop()->inject(do_open);
- std::future<returned<T> > result_future = result_promise.get_future();
- if (!result_future.valid())
- throw error(url_str+": connection closed");
- return result_future.get();
- }
-
- mutable std::mutex lock_;
- template <class T> T load(const T& v) const {
- std::lock_guard<std::mutex> g(lock_);
- return v;
- }
- template <class T> void store(T& v, const T& x) const {
- std::lock_guard<std::mutex> g(lock_);
- v = x;
- }
- connection_options client_copts_, server_copts_;
- class receiver_options receiver_opts_;
- class sender_options sender_opts_;
-};
-
-} // io
-} // proton
-
-#endif // PROTON_IO_CONTAINER_IMPL_BASE_HPP
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/include/proton/listen_handler.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/listen_handler.hpp b/proton-c/bindings/cpp/include/proton/listen_handler.hpp
index 99f7558..08d5e76 100644
--- a/proton-c/bindings/cpp/include/proton/listen_handler.hpp
+++ b/proton-c/bindings/cpp/include/proton/listen_handler.hpp
@@ -41,14 +41,14 @@ class listen_handler {
/// the connection. messaging_handler::on_connection_open() will be called with
/// the proton::connection, it can call connection::open() to accept or
/// connection::close() to reject the connection.
- virtual connection_options on_accept()= 0;
+ virtual connection_options on_accept(listener&)= 0;
/// Called if there is a listening error, with an error message.
/// close() will also be called.
- virtual void on_error(const std::string&) {}
+ virtual void on_error(listener&, const std::string&) {}
/// Called when this listen_handler is no longer needed, and can be deleted.
- virtual void on_close() {}
+ virtual void on_close(listener&) {}
};
} // proton
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/include/proton/listener.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/listener.hpp b/proton-c/bindings/cpp/include/proton/listener.hpp
index 4b4ca24..c7f95a7 100644
--- a/proton-c/bindings/cpp/include/proton/listener.hpp
+++ b/proton-c/bindings/cpp/include/proton/listener.hpp
@@ -20,30 +20,30 @@
* under the License.
*/
-#include "./fwd.hpp"
#include "./internal/export.hpp"
-#include <string>
+struct pn_listener_t;
namespace proton {
/// A listener for incoming connections.
class PN_CPP_CLASS_EXTERN listener {
+ /// @cond INTERNAL
+ listener(pn_listener_t*);
+ /// @endcond
+
public:
/// Create an empty listener.
PN_CPP_EXTERN listener();
- /// @cond INTERNAL
- PN_CPP_EXTERN listener(container&, const std::string&);
- /// @endcond
-
/// Stop listening on the address provided to the call to
/// container::listen that returned this listener.
PN_CPP_EXTERN void stop();
- private:
- std::string url_;
- container* container_;
+ private:
+ pn_listener_t* listener_;
+
+ friend class container;
};
} // proton
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/connection.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connection.cpp b/proton-c/bindings/cpp/src/connection.cpp
index f706df4..113a06f 100644
--- a/proton-c/bindings/cpp/src/connection.cpp
+++ b/proton-c/bindings/cpp/src/connection.cpp
@@ -38,7 +38,6 @@
#include <proton/connection.h>
#include <proton/session.h>
#include <proton/transport.h>
-#include <proton/reactor.h>
#include <proton/object.h>
namespace proton {
@@ -72,13 +71,7 @@ std::string connection::user() const {
container& connection::container() const {
class container* c = connection_context::get(pn_object()).container;
- if (!c) {
- pn_reactor_t *r = pn_object_reactor(pn_object());
- if (r)
- c = &container_context::get(r);
- }
- if (!c)
- throw proton::error("connection does not have a container");
+ if (!c) throw proton::error("No container");
return *c;
}
@@ -133,7 +126,7 @@ sender connection::open_sender(const std::string &addr) {
return open_sender(addr, sender_options());
}
-sender connection::open_sender(const std::string &addr, const sender_options &opts) {
+sender connection::open_sender(const std::string &addr, const class sender_options &opts) {
return default_session().open_sender(addr, opts);
}
@@ -141,11 +134,25 @@ receiver connection::open_receiver(const std::string &addr) {
return open_receiver(addr, receiver_options());
}
-receiver connection::open_receiver(const std::string &addr, const receiver_options &opts)
+receiver connection::open_receiver(const std::string &addr, const class receiver_options &opts)
{
return default_session().open_receiver(addr, opts);
}
+class sender_options connection::sender_options() const {
+ connection_context& ctx = connection_context::get(pn_object());
+ return ctx.container ?
+ ctx.container->sender_options() :
+ proton::sender_options();
+}
+
+class receiver_options connection::receiver_options() const {
+ connection_context& ctx = connection_context::get(pn_object());
+ return ctx.container ?
+ ctx.container->receiver_options() :
+ proton::receiver_options();
+}
+
error_condition connection::error() const {
return make_wrapper(pn_connection_remote_condition(pn_object()));
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/connection_driver_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connection_driver_test.cpp b/proton-c/bindings/cpp/src/connection_driver_test.cpp
index a5771f9..ae18ebe 100644
--- a/proton-c/bindings/cpp/src/connection_driver_test.cpp
+++ b/proton-c/bindings/cpp/src/connection_driver_test.cpp
@@ -31,6 +31,7 @@
#include "proton/sender.hpp"
#include "proton/sender_options.hpp"
#include "proton/source_options.hpp"
+#include "proton/thread_safe.hpp"
#include "proton/types_fwd.hpp"
#include "proton/uuid.hpp"
@@ -57,8 +58,8 @@ struct in_memory_driver : public connection_driver {
byte_stream& writes;
int spinning;
- in_memory_driver(byte_stream& rd, byte_stream& wr) :
- reads(rd), writes(wr), spinning(0) {}
+ in_memory_driver(byte_stream& rd, byte_stream& wr, const std::string& name) :
+ connection_driver(name), reads(rd), writes(wr), spinning(0) {}
void do_read() {
mutable_buffer rbuf = read_buffer();
@@ -102,8 +103,10 @@ struct driver_pair {
byte_stream ab, ba;
in_memory_driver a, b;
- driver_pair(const connection_options& oa, const connection_options& ob)
- : a(ba, ab), b(ab, ba)
+ driver_pair(const connection_options& oa, const connection_options& ob,
+ const std::string& name=""
+ ) :
+ a(ba, ab, name+"a"), b(ab, ba, name+"b")
{
a.connect(oa);
b.accept(ob);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/connection_options.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connection_options.cpp b/proton-c/bindings/cpp/src/connection_options.cpp
index 506e84e..4644094 100644
--- a/proton-c/bindings/cpp/src/connection_options.cpp
+++ b/proton-c/bindings/cpp/src/connection_options.cpp
@@ -18,6 +18,7 @@
* under the License.
*
*/
+#include "proton/fwd.hpp"
#include "proton/connection_options.hpp"
#include "proton/messaging_handler.hpp"
#include "proton/reconnect_timer.hpp"
@@ -27,12 +28,12 @@
#include "acceptor.hpp"
#include "contexts.hpp"
-#include "connector.hpp"
#include "messaging_adapter.hpp"
#include "msg.hpp"
#include "proton_bits.hpp"
#include <proton/connection.h>
+#include <proton/proactor.h>
#include <proton/transport.h>
namespace proton {
@@ -74,15 +75,14 @@ class connection_options::impl {
*/
void apply_unbound(connection& c) {
pn_connection_t *pnc = unwrap(c);
- container::impl::connector *outbound = dynamic_cast<container::impl::connector*>(
- connection_context::get(unwrap(c)).handler.get());
// Only apply connection options if uninit.
bool uninit = c.uninitialized();
if (!uninit) return;
+ bool outbound = !connection_context::get(pnc).listener_context_;
if (reconnect.set && outbound)
- outbound->reconnect_timer(reconnect.value);
+ connection_context::get(pnc).reconnect.reset(new reconnect_timer(reconnect.value));
if (container_id.set)
pn_connection_set_container(pnc, container_id.value.c_str());
if (virtual_host.set)
@@ -97,31 +97,23 @@ class connection_options::impl {
// Transport options. pnt is NULL between reconnect attempts
// and if there is a pipelined open frame.
pn_connection_t *pnc = unwrap(c);
- container::impl::connector *outbound = dynamic_cast<container::impl::connector*>(
- connection_context::get(unwrap(c)).handler.get());
-
pn_transport_t *pnt = pn_connection_transport(pnc);
if (!pnt) return;
// SSL
- if (outbound && outbound->address().scheme() == url::AMQPS) {
+ connection_context& cc = connection_context::get(pnc);
+ bool outbound = !cc.listener_context_;
+ if (outbound && ssl_client_options.set) {
// A side effect of pn_ssl() is to set the ssl peer
// hostname to the connection hostname, which has
// already been adjusted for the virtual_host option.
pn_ssl_t *ssl = pn_ssl(pnt);
if (pn_ssl_init(ssl, ssl_client_options.value.pn_domain(), NULL))
throw error(MSG("client SSL/TLS initialization error"));
- } else if (!outbound) {
- // TODO aconway 2016-05-13: reactor only
- pn_acceptor_t *pnp = pn_connection_acceptor(pnc);
- if (pnp) {
- listener_context &lc(listener_context::get(pnp));
- if (lc.ssl) {
- pn_ssl_t *ssl = pn_ssl(pnt);
- if (pn_ssl_init(ssl, ssl_server_options.value.pn_domain(), NULL))
- throw error(MSG("server SSL/TLS initialization error"));
- }
- }
+ } else if (!outbound && ssl_server_options.set) {
+ pn_ssl_t *ssl = pn_ssl(pnt);
+ if (pn_ssl_init(ssl, ssl_server_options.value.pn_domain(), NULL))
+ throw error(MSG("server SSL/TLS initialization error"));
}
// SASL
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/container.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container.cpp b/proton-c/bindings/cpp/src/container.cpp
index 3daa925..b98da78 100644
--- a/proton-c/bindings/cpp/src/container.cpp
+++ b/proton-c/bindings/cpp/src/container.cpp
@@ -27,7 +27,7 @@
#include "proton/listener.hpp"
#include "proton/thread_safe.hpp"
-#include "container_impl.hpp"
+#include "proactor_container_impl.hpp"
namespace proton {
@@ -65,24 +65,12 @@ returned<receiver> container::open_receiver(const std::string &url, const proton
return open_receiver(url, receiver_options(), co);
}
-namespace{
- struct listen_opts : public listen_handler {
- connection_options opts;
- listen_opts(const connection_options& o) : opts(o) {}
- connection_options on_accept() { return opts; }
- void on_close() { delete this; }
- };
-}
-
listener container::listen(const std::string& url, const connection_options& opts) {
- // Note: listen_opts::on_close() calls delete(this) so this is not a leak.
- // The container will always call on_closed() even if there are errors or exceptions.
- listen_opts* lh = new listen_opts(opts);
- return listen(url, *lh);
+ return impl_->listen(url, opts);
}
listener container::listen(const std::string &url) {
- return listen(url, connection_options());
+ return impl_->listen(url);
}
void container::stop() { stop(error_condition()); }
@@ -93,8 +81,6 @@ returned<connection> container::connect(const std::string& url, const connection
listener container::listen(const std::string& url, listen_handler& l) { return impl_->listen(url, l); }
-void container::stop_listening(const std::string& url) { impl_->stop_listening(url); }
-
void container::run() { impl_->run(); }
void container::auto_stop(bool set) { impl_->auto_stop(set); }
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/container_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container_test.cpp b/proton-c/bindings/cpp/src/container_test.cpp
index e02aff5..d210268 100644
--- a/proton-c/bindings/cpp/src/container_test.cpp
+++ b/proton-c/bindings/cpp/src/container_test.cpp
@@ -124,12 +124,12 @@ struct test_listener : public proton::listen_handler {
bool on_accept_, on_close_;
std::string on_error_;
test_listener() : on_accept_(false), on_close_(false) {}
- proton::connection_options on_accept() PN_CPP_OVERRIDE {
+ proton::connection_options on_accept(proton::listener&) PN_CPP_OVERRIDE {
on_accept_ = true;
return proton::connection_options();
}
- void on_close() PN_CPP_OVERRIDE { on_close_ = true; }
- void on_error(const std::string& e) PN_CPP_OVERRIDE { on_error_ = e; }
+ void on_close(proton::listener&) PN_CPP_OVERRIDE { on_close_ = true; }
+ void on_error(proton::listener&, const std::string& e) PN_CPP_OVERRIDE { on_error_ = e; }
};
int test_container_bad_address() {
@@ -179,6 +179,11 @@ class stop_tester : public proton::messaging_handler {
state = 5;
}
+ void on_transport_error(proton::transport & t) PN_CPP_OVERRIDE {
+ // Do nothing - ignore transport errors - we're going to get one when
+ // the container stops.
+ }
+
public:
stop_tester(): state(0) {}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/contexts.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/contexts.cpp b/proton-c/bindings/cpp/src/contexts.cpp
index b1a234f..81ef5eb 100644
--- a/proton-c/bindings/cpp/src/contexts.cpp
+++ b/proton-c/bindings/cpp/src/contexts.cpp
@@ -21,7 +21,6 @@
#include "contexts.hpp"
#include "msg.hpp"
-#include "reactor.hpp"
#include "proton_bits.hpp"
#include "proton/error.hpp"
@@ -29,8 +28,9 @@
#include <proton/connection.h>
#include <proton/object.h>
#include <proton/link.h>
+#include <proton/listener.h>
#include <proton/message.h>
-#include <proton/reactor.h>
+#include "proton/reconnect_timer.hpp"
#include <proton/session.h>
#include <typeinfo>
@@ -48,16 +48,10 @@ pn_class_t cpp_context_class = PN_CLASS(cpp_context);
// Handles
PN_HANDLE(CONNECTION_CONTEXT)
-PN_HANDLE(CONTAINER_CONTEXT)
PN_HANDLE(LISTENER_CONTEXT)
+PN_HANDLE(SESSION_CONTEXT)
PN_HANDLE(LINK_CONTEXT)
-void set_context(pn_record_t* record, pn_handle_t handle, const pn_class_t *clazz, void* value)
-{
- pn_record_def(record, handle, clazz);
- pn_record_set(record, handle, value);
-}
-
template <class T>
T* get_context(pn_record_t* record, pn_handle_t handle) {
return reinterpret_cast<T*>(pn_record_get(record, handle));
@@ -71,45 +65,24 @@ void *context::alloc(size_t n) { return pn_object_new(&cpp_context_class, n); }
pn_class_t* context::pn_class() { return &cpp_context_class; }
+connection_context::connection_context() :
+ container(0), default_session(0), link_gen(0), handler(0), listener_context_(0)
+{}
-context::id connection_context::id(pn_connection_t* c) {
- return context::id(pn_connection_attachments(c), CONNECTION_CONTEXT);
-}
-
-void container_context::set(const reactor& r, container& c) {
- set_context(pn_reactor_attachments(unwrap(r)), CONTAINER_CONTEXT, PN_VOID, &c);
+connection_context& connection_context::get(pn_connection_t *c) {
+ return ref<connection_context>(id(pn_connection_attachments(c), CONNECTION_CONTEXT));
}
-container &container_context::get(pn_reactor_t *pn_reactor) {
- container *ctx = get_context<container>(pn_reactor_attachments(pn_reactor), CONTAINER_CONTEXT);
- if (!ctx) throw error(MSG("Reactor has no C++ container context"));
- return *ctx;
+listener_context& listener_context::get(pn_listener_t* l) {
+ return ref<listener_context>(id(pn_listener_attachments(l), LISTENER_CONTEXT));
}
-listener_context& listener_context::get(pn_acceptor_t* a) {
- // TODO aconway 2016-05-13: reactor only
- // A Proton C pn_acceptor_t is really just a selectable
- pn_selectable_t *sel = reinterpret_cast<pn_selectable_t*>(a);
-
- listener_context* ctx =
- get_context<listener_context>(pn_selectable_attachments(sel), LISTENER_CONTEXT);
- if (!ctx) {
- ctx = context::create<listener_context>();
- set_context(pn_selectable_attachments(sel), LISTENER_CONTEXT, context::pn_class(), ctx);
- pn_decref(ctx);
- }
- return *ctx;
+link_context& link_context::get(pn_link_t* l) {
+ return ref<link_context>(id(pn_link_attachments(l), LINK_CONTEXT));
}
-link_context& link_context::get(pn_link_t* l) {
- link_context* ctx =
- get_context<link_context>(pn_link_attachments(l), LINK_CONTEXT);
- if (!ctx) {
- ctx = context::create<link_context>();
- set_context(pn_link_attachments(l), LINK_CONTEXT, context::pn_class(), ctx);
- pn_decref(ctx);
- }
- return *ctx;
+session_context& session_context::get(pn_session_t* s) {
+ return ref<session_context>(id(pn_session_attachments(s), SESSION_CONTEXT));
}
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/event_loop.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/event_loop.cpp b/proton-c/bindings/cpp/src/event_loop.cpp
index ea4ee71..ab39aa7 100644
--- a/proton-c/bindings/cpp/src/event_loop.cpp
+++ b/proton-c/bindings/cpp/src/event_loop.cpp
@@ -20,7 +20,7 @@
#include "proton/event_loop.hpp"
#include "contexts.hpp"
-#include "event_loop_impl.hpp"
+#include "proactor_event_loop_impl.hpp"
#include <proton/session.h>
#include <proton/link.h>
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/include/contexts.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/contexts.hpp b/proton-c/bindings/cpp/src/include/contexts.hpp
index 742b346..c096a6e 100644
--- a/proton-c/bindings/cpp/src/include/contexts.hpp
+++ b/proton-c/bindings/cpp/src/include/contexts.hpp
@@ -34,16 +34,16 @@
#include "proton_handler.hpp"
-struct pn_session_t;
-struct pn_event_t;
-struct pn_reactor_t;
struct pn_record_t;
-struct pn_acceptor_t;
+struct pn_link_t;
+struct pn_session_t;
+struct pn_connection_t;
+struct pn_listener_t;
namespace proton {
class proton_handler;
-class reactor;
+class reconnect_timer;
// Base class for C++ classes that are used as proton contexts.
// Contexts are pn_objects managed by pn reference counts, the C++ value is allocated in-place.
@@ -82,51 +82,53 @@ class context {
static void *alloc(size_t n);
};
+class listener_context;
+
// Connection context used by all connections.
class connection_context : public context {
public:
- connection_context() : container(0), default_session(0), link_gen(0) {}
+ connection_context();
+ static connection_context& get(pn_connection_t *c);
class container* container;
pn_session_t *default_session; // Owned by connection.
message event_message; // re-used by messaging_adapter for performance.
io::link_namer* link_gen; // Link name generator.
- internal::pn_unique_ptr<proton_handler> handler;
+ messaging_handler* handler;
+ internal::pn_unique_ptr<reconnect_timer> reconnect;
+ listener_context* listener_context_;
event_loop event_loop_;
-
- static connection_context& get(pn_connection_t *c) { return ref<connection_context>(id(c)); }
-
- protected:
- static context::id id(pn_connection_t*);
-};
-
-void container_context(const reactor&, container&);
-
-class container_context {
- public:
- static void set(const reactor& r, container& c);
- static container& get(pn_reactor_t*);
};
class listener_context : public context {
public:
- static listener_context& get(pn_acceptor_t* c);
- listener_context() : listen_handler_(0), ssl(false) {}
- connection_options get_options() { return listen_handler_->on_accept(); }
- class listen_handler* listen_handler_;
- bool ssl;
+ listener_context() : listen_handler_(0) {}
+ static listener_context& get(pn_listener_t* c);
+
+ listen_handler* listen_handler_;
+ internal::pn_unique_ptr<const connection_options> connection_options_;
};
class link_context : public context {
public:
+ link_context() : handler(0), credit_window(10), pending_credit(0), auto_accept(true), auto_settle(true), draining(false) {}
static link_context& get(pn_link_t* l);
- link_context() : credit_window(10), auto_accept(true), auto_settle(true), draining(false), pending_credit(0) {}
+
+ messaging_handler* handler;
int credit_window;
+ uint32_t pending_credit;
bool auto_accept;
bool auto_settle;
bool draining;
- uint32_t pending_credit;
+};
+
+class session_context : public context {
+ public:
+ session_context() : handler(0) {}
+ static session_context& get(pn_session_t* s);
+
+ messaging_handler* handler;
};
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/include/messaging_adapter.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/messaging_adapter.hpp b/proton-c/bindings/cpp/src/include/messaging_adapter.hpp
index 5371eec..d7eb6a0 100644
--- a/proton-c/bindings/cpp/src/include/messaging_adapter.hpp
+++ b/proton-c/bindings/cpp/src/include/messaging_adapter.hpp
@@ -39,8 +39,6 @@ class messaging_adapter : public proton_handler
public:
messaging_adapter(messaging_handler &delegate) : delegate_(delegate) {}
- void on_reactor_init(proton_event &e);
- void on_reactor_final(proton_event & e);
void on_link_flow(proton_event &e);
void on_delivery(proton_event &e);
void on_connection_remote_open(proton_event &e);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
new file mode 100644
index 0000000..8c12c02
--- /dev/null
+++ b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
@@ -0,0 +1,133 @@
+#ifndef PROTON_CPP_PROACTOR_CONTAINERIMPL_H
+#define PROTON_CPP_PROACTOR_CONTAINERIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "proton/fwd.hpp"
+#include "proton/container.hpp"
+#include "proton/connection.hpp"
+#include "proton/connection_options.hpp"
+#include "proton/duration.hpp"
+#include "proton/error_condition.hpp"
+#include "proton/messaging_handler.hpp"
+#include "proton/receiver.hpp"
+#include "proton/receiver_options.hpp"
+#include "proton/sender.hpp"
+#include "proton/sender_options.hpp"
+
+#include "proton_bits.hpp"
+#include "proton_handler.hpp"
+
+#include <list>
+#include <map>
+#include <string>
+#include <vector>
+
+struct pn_proactor_t;
+struct pn_listener_t;
+struct pn_event_t;
+
+namespace proton {
+
+// Proactor-based implementation class behind proton::container.
+// Owns the pn_proactor_t, the container-wide default options and the
+// list of scheduled (deferred) tasks.
+class container::impl {
+ public:
+ // `c` is the public container this object implements, `id` the container id;
+ // the handler pointer is stored in handler_.
+ impl(container& c, const std::string& id, messaging_handler* = 0);
+ ~impl();
+ std::string id() const { return id_; }
+ // Outgoing connections / links; the string argument is the URL to connect to.
+ returned<connection> connect(const std::string&, const connection_options&);
+ returned<sender> open_sender(
+ const std::string&, const proton::sender_options &, const connection_options &);
+ returned<receiver> open_receiver(
+ const std::string&, const proton::receiver_options &, const connection_options &);
+ // Listening: plain, with fixed connection options for accepted connections,
+ // or with a listen_handler.
+ listener listen(const std::string&);
+ listener listen(const std::string&, const connection_options& lh);
+ listener listen(const std::string&, listen_handler& lh);
+ // Container-wide defaults (setter / getter pairs).
+ void client_connection_options(const connection_options &);
+ connection_options client_connection_options() const { return client_connection_options_; }
+ void server_connection_options(const connection_options &);
+ connection_options server_connection_options() const { return server_connection_options_; }
+ void sender_options(const proton::sender_options&);
+ class sender_options sender_options() const { return sender_options_; }
+ void receiver_options(const proton::receiver_options&);
+ class receiver_options receiver_options() const { return receiver_options_; }
+ void run();
+ void stop(const error_condition& err);
+ void auto_stop(bool set);
+ // Run a task after the given delay. The void_function0 overload keeps only a
+ // reference/pointer to the function object, so the caller must keep it alive.
+ void schedule(duration, void_function0&);
+#if PN_CPP_HAS_STD_FUNCTION
+ void schedule(duration, std::function<void()>);
+#endif
+ // Forwarders to internal::set_messaging_handler / internal::get_messaging_handler.
+ template <class T> static void set_handler(T s, messaging_handler* h);
+ template <class T> static messaging_handler* get_handler(T s);
+
+ private:
+ // Shared implementation of the listen() and connect()/open_*() entry points.
+ pn_listener_t* listen_common_lh(const std::string&);
+ connection connect_common(const std::string&, const connection_options&);
+
+ // Event loop to run in each container thread
+ static void thread(impl&);
+ bool handle(pn_event_t*);
+ // Run the scheduled tasks that are due and re-arm the proactor timeout.
+ void run_timer_jobs();
+
+ container& container_;
+
+ struct scheduled {
+ timestamp time; // absolute time at which the task is due
+#if PN_CPP_HAS_STD_FUNCTION
+ std::function<void()> task;
+#else
+ void_function0* task_;
+ void task();
+#endif
+
+ // We want to get the *earliest* first so the test is "reversed"
+ bool operator < (const scheduled& r) const { return r.time < time; }
+ };
+ std::vector<scheduled> deferred_; // This vector is kept as a heap
+
+ pn_proactor_t* proactor_; // created in the constructor, freed in the destructor
+ messaging_handler* handler_;
+ std::string id_;
+ connection_options client_connection_options_;
+ connection_options server_connection_options_;
+ proton::sender_options sender_options_;
+ proton::receiver_options receiver_options_;
+
+ proton::error_condition stop_err_;
+ bool auto_stop_; // returned by handle() for PN_PROACTOR_INACTIVE
+ bool stopping_; // when set, connect/listen throw "container is stopping"
+};
+
+// Attach messaging handler `mh` to `s` by forwarding to
+// internal::set_messaging_handler.
+template <class T>
+void container::impl::set_handler(T s, messaging_handler* mh)
+{
+    internal::set_messaging_handler(s, mh);
+}
+
+// Return the messaging handler attached to `s` by forwarding to
+// internal::get_messaging_handler.
+template <class T>
+messaging_handler* container::impl::get_handler(T s)
+{
+    messaging_handler* attached = internal::get_messaging_handler(s);
+    return attached;
+}
+
+
+}
+
+#endif /*!PROTON_CPP_PROACTOR_CONTAINERIMPL_H*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/include/proactor_event_loop_impl.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/proactor_event_loop_impl.hpp b/proton-c/bindings/cpp/src/include/proactor_event_loop_impl.hpp
new file mode 100644
index 0000000..8fa7acf
--- /dev/null
+++ b/proton-c/bindings/cpp/src/include/proactor_event_loop_impl.hpp
@@ -0,0 +1,54 @@
+#ifndef PROTON_CPP_EVENT_LOOP_IMPL_HPP
+#define PROTON_CPP_EVENT_LOOP_IMPL_HPP
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "proton/fwd.hpp"
+
+struct pn_connection_t;
+
+namespace proton {
+
+// Per-connection work queue behind proton::event_loop.
+// Jobs are queued by inject() and executed by run_all_jobs().
+class event_loop::impl {
+ public:
+ // Bind the queue to the connection that is woken when work is injected.
+ impl(pn_connection_t*);
+
+ // Queue a job and wake the connection; returns false (job not queued)
+ // once finished() has been called. The void_function0 overload does not
+ // copy the function object, so the caller must keep it alive.
+ bool inject(void_function0& f);
+#if PN_CPP_HAS_STD_FUNCTION
+ bool inject(std::function<void()> f);
+ typedef std::vector<std::function<void()> > jobs;
+#else
+ typedef std::vector<void_function0*> jobs;
+#endif
+
+
+ // Run every queued job; exceptions thrown by jobs are ignored.
+ void run_all_jobs();
+ // Stop accepting new jobs.
+ void finished();
+
+ jobs jobs_; // pending jobs, in injection order
+ pn_connection_t* connection_; // passed to pn_connection_wake() by inject()
+ bool finished_; // set by finished(); checked by inject()
+};
+
+}
+
+#endif // PROTON_CPP_EVENT_LOOP_IMPL_HPP
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/include/proton_bits.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/proton_bits.hpp b/proton-c/bindings/cpp/src/include/proton_bits.hpp
index 53f2230..e72f343 100644
--- a/proton-c/bindings/cpp/src/include/proton_bits.hpp
+++ b/proton-c/bindings/cpp/src/include/proton_bits.hpp
@@ -24,6 +24,8 @@
#include <string>
#include <iosfwd>
+#include "contexts.hpp"
+
/**@file
*
* Assorted internal proton utilities.
@@ -65,6 +67,7 @@ class terminus;
class source;
class target;
class reactor;
+class messaging_handler;
std::string error_str(long code);
@@ -127,12 +130,19 @@ public:
static typename wrapped<T>::type* unwrap(const T& t) { return t.pn_object(); }
};
-// Get attachments for various proton-c types
+template <class T> struct context {};
+template <> struct context<link> {typedef link_context type; };
+template <> struct context<receiver> {typedef link_context type; };
+template <> struct context<sender> {typedef link_context type; };
+template <> struct context<session> {typedef session_context type; };
+template <> struct context<connection> {typedef connection_context type; };
+
+template <class T>
+inline void set_messaging_handler(T t, messaging_handler* mh) { context<T>::type::get(factory<T>::unwrap(t)).handler = mh; }
+
template <class T>
-inline pn_record_t* get_attachments(T*);
+inline messaging_handler* get_messaging_handler(T* t) { return context<typename internal::wrapper<T>::type>::type::get(t).handler; }
-template <> inline pn_record_t* get_attachments(pn_session_t* s) { return pn_session_attachments(s); }
-template <> inline pn_record_t* get_attachments(pn_link_t* l) { return pn_link_attachments(l); }
}
template <class T>
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/include/proton_event.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/proton_event.hpp b/proton-c/bindings/cpp/src/include/proton_event.hpp
index 374da85..be324e7 100644
--- a/proton-c/bindings/cpp/src/include/proton_event.hpp
+++ b/proton-c/bindings/cpp/src/include/proton_event.hpp
@@ -266,23 +266,12 @@ class proton_event
};
///@}
- proton_event(pn_event_t *ce, class container* cont) :
- pn_event_(ce),
- container_(cont)
+ proton_event(pn_event_t *ce) :
+ pn_event_(ce)
{}
pn_event_t* pn_event() const { return pn_event_; }
- /** Return a reference to the container, throws proton::error if there is none. */
- class container& container() const {
- if (!container_)
- throw proton::error("event does not have a container");
- return *container_;
- }
-
- /** Return a pointer to the container if there is one, NULL otherwise. */
- class container* container_ptr() const { return container_; }
-
/// Get type of event
event_type type() const { return event_type(pn_event_type(pn_event_)); }
@@ -290,7 +279,6 @@ class proton_event
private:
pn_event_t *pn_event_;
- class container* container_;
};
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/include/test_dummy_container.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/test_dummy_container.hpp b/proton-c/bindings/cpp/src/include/test_dummy_container.hpp
deleted file mode 100644
index daed435..0000000
--- a/proton-c/bindings/cpp/src/include/test_dummy_container.hpp
+++ /dev/null
@@ -1,82 +0,0 @@
-#ifndef TEST_DUMMY_CONTAINER_HPP
-#define TEST_DUMMY_CONTAINER_HPP
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "proton/container.hpp"
-#include "proton/event_loop.hpp"
-#include "proton/thread_safe.hpp"
-
-namespace test {
-
-using namespace proton;
-
-class dummy_container : public standard_container {
- public:
- dummy_container(const std::string cid="") :
- id_(cid), fail("not implemented for dummy_container") {}
-
- // Pull in base class functions here so that name search finds all the overloads
- using standard_container::stop;
- using standard_container::connect;
- using standard_container::listen;
- using standard_container::open_receiver;
- using standard_container::open_sender;
-
- returned<connection> connect(const std::string&, const connection_options&) { throw fail; }
- listener listen(const std::string& , listen_handler& ) { throw fail; }
- void stop_listening(const std::string&) { throw fail; }
- void run() { throw fail; }
- void auto_stop(bool) { throw fail; }
- void stop(const proton::error_condition& ) { throw fail; }
- returned<sender> open_sender(const std::string &, const proton::sender_options &, const connection_options&) { throw fail; }
- returned<receiver> open_receiver( const std::string &, const proton::receiver_options &, const connection_options &) { throw fail; }
- std::string id() const { return id_; }
- void client_connection_options(const connection_options &o) { ccopts_ = o; }
- connection_options client_connection_options() const { return ccopts_; }
- void server_connection_options(const connection_options &o) { scopts_ = o; }
- connection_options server_connection_options() const { return scopts_; }
- void sender_options(const class sender_options &o) { sopts_ = o; }
- class sender_options sender_options() const { return sopts_; }
- void receiver_options(const class receiver_options &o) { ropts_ = o; }
- class receiver_options receiver_options() const { return ropts_; }
-#if PN_CPP_HAS_STD_FUNCTION
- void schedule(duration, std::function<void()>) { throw fail; }
-#endif
- void schedule(duration, void_function0&) { throw fail; }
-
- private:
- std::string id_;
- connection_options ccopts_, scopts_;
- class sender_options sopts_;
- class receiver_options ropts_;
- std::runtime_error fail;
-};
-
-class dummy_event_loop : public event_loop {
-#if PN_CPP_HAS_CPP11
- bool inject(std::function<void()> f) PN_CPP_OVERRIDE { f(); return true; }
-#endif
- bool inject(proton::void_function0& h) PN_CPP_OVERRIDE { h(); return true; }
-};
-
-}
-
-#endif // TEST_DUMMY_CONTAINER_HPP
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/io/connection_driver.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/io/connection_driver.cpp b/proton-c/bindings/cpp/src/io/connection_driver.cpp
index da8c2a4..d7c5e5c 100644
--- a/proton-c/bindings/cpp/src/io/connection_driver.cpp
+++ b/proton-c/bindings/cpp/src/io/connection_driver.cpp
@@ -47,23 +47,12 @@ void connection_driver::init() {
}
}
-connection_driver::connection_driver() : handler_(0), container_(0) { init(); }
+connection_driver::connection_driver() : handler_(0) { init(); }
-connection_driver::connection_driver(class container& cont) : handler_(0), container_(&cont) {
+connection_driver::connection_driver(const std::string& id) : container_id_(id), handler_(0) {
init();
- connection_context& ctx = connection_context::get(unwrap(connection()));
- ctx.container = container_;
}
-#if PN_CPP_HAS_RVALUE_REFERENCES
-connection_driver::connection_driver(class container& cont, event_loop&& loop) : handler_(0), container_(&cont) {
- init();
- connection_context& ctx = connection_context::get(unwrap(connection()));
- ctx.container = container_;
- ctx.event_loop_ = loop.impl_.get();
-}
-#endif
-
connection_driver::~connection_driver() {
pn_connection_driver_destroy(&driver_);
}
@@ -79,10 +68,7 @@ void connection_driver::configure(const connection_options& opts, bool server) {
void connection_driver::connect(const connection_options& opts) {
connection_options all;
- if (container_) {
- all.container_id(container_->id());
- all.update(container_->client_connection_options());
- }
+ all.container_id(container_id_);
all.update(opts);
configure(all, false);
connection().open();
@@ -90,10 +76,7 @@ void connection_driver::connect(const connection_options& opts) {
void connection_driver::accept(const connection_options& opts) {
connection_options all;
- if (container_) {
- all.container_id(container_->id());
- all.update(container_->server_connection_options());
- }
+ all.container_id(container_id_);
all.update(opts);
configure(all, true);
}
@@ -105,7 +88,7 @@ bool connection_driver::has_events() const {
bool connection_driver::dispatch() {
pn_event_t* c_event;
while ((c_event = pn_connection_driver_next_event(&driver_)) != NULL) {
- proton_event cpp_event(c_event, container_);
+ proton_event cpp_event(c_event);
try {
if (handler_ != 0) {
messaging_adapter adapter(*handler_);
@@ -163,8 +146,4 @@ proton::transport connection_driver::transport() const {
return make_wrapper(driver_.transport);
}
-proton::container* connection_driver::container() const {
- return container_;
-}
-
}}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/listener.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/listener.cpp b/proton-c/bindings/cpp/src/listener.cpp
index 2639f5e..a9ca53d 100644
--- a/proton-c/bindings/cpp/src/listener.cpp
+++ b/proton-c/bindings/cpp/src/listener.cpp
@@ -18,12 +18,15 @@
*/
#include "proton/listener.hpp"
-#include "proton/container.hpp"
+
+#include <proton/listener.h>
+
+#include "contexts.hpp"
namespace proton {
-listener::listener() : container_(0) {}
-listener::listener(container& c, const std::string& u) : url_(u), container_(&c) {}
-void listener::stop() { if (container_) container_->stop_listening(url_); }
+listener::listener(): listener_(0) {}
+listener::listener(pn_listener_t* l) : listener_(l) {}
+void listener::stop() { if (listener_) pn_listener_close(listener_); }
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/messaging_adapter.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/messaging_adapter.cpp b/proton-c/bindings/cpp/src/messaging_adapter.cpp
index a70703e..613808b 100644
--- a/proton-c/bindings/cpp/src/messaging_adapter.cpp
+++ b/proton-c/bindings/cpp/src/messaging_adapter.cpp
@@ -58,16 +58,6 @@ void credit_topup(pn_link_t *link) {
}
}
-void messaging_adapter::on_reactor_init(proton_event &pe) {
- container* c = pe.container_ptr();
- if (c) delegate_.on_container_start(*c);
-}
-
-void messaging_adapter::on_reactor_final(proton_event &pe) {
- container* c = pe.container_ptr();
- if (c) delegate_.on_container_stop(*c);
-}
-
void messaging_adapter::on_link_flow(proton_event &pe) {
pn_event_t *pne = pe.pn_event();
pn_link_t *lnk = pn_event_link(pne);
@@ -281,24 +271,17 @@ void messaging_adapter::on_link_local_open(proton_event &pe) {
void messaging_adapter::on_link_remote_open(proton_event &pe) {
pn_link_t *lnk = pn_event_link(pe.pn_event());
- container* c = pe.container_ptr();
if (pn_link_is_receiver(lnk)) {
receiver r(make_wrapper<receiver>(lnk));
delegate_.on_receiver_open(r);
if (is_local_unititialised(pn_link_state(lnk))) {
- if (c)
- r.open(c->receiver_options());
- else
- r.open();
+ r.open(r.connection().receiver_options());
}
} else {
sender s(make_wrapper<sender>(lnk));
delegate_.on_sender_open(s);
if (is_local_unititialised(pn_link_state(lnk))) {
- if (c)
- s.open(c->sender_options());
- else
- s.open();
+ s.open(s.connection().sender_options());
}
}
credit_topup(lnk);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/proactor_container_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/proactor_container_impl.cpp b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
new file mode 100644
index 0000000..2b6b1de
--- /dev/null
+++ b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "proactor_container_impl.hpp"
+#include "proactor_event_loop_impl.hpp"
+
+#include "proton/error_condition.hpp"
+#include "proton/function.hpp"
+#include "proton/listener.hpp"
+#include "proton/listen_handler.hpp"
+#include "proton/thread_safe.hpp"
+#include "proton/url.hpp"
+
+#include "proton/connection.h"
+#include "proton/listener.h"
+#include "proton/proactor.h"
+#include "proton/transport.h"
+
+#include "contexts.hpp"
+#include "messaging_adapter.hpp"
+#include "proton_bits.hpp"
+#include "proton_event.hpp"
+
+#include <assert.h>
+
+#include <algorithm>
+#include <vector>
+
+namespace proton {
+
+// Bind the work queue to connection `c`; the queue starts out accepting jobs.
+event_loop::impl::impl(pn_connection_t* c)
+    : connection_(c),
+      finished_(false)
+{
+}
+
+// Close the queue: every later inject() call returns false.
+void event_loop::impl::finished() { finished_ = true; }
+
+#if PN_CPP_HAS_STD_FUNCTION
+// Queue `f` and wake the connection via pn_connection_wake().
+// Returns false, without queuing, once finished() has been called.
+bool event_loop::impl::inject(std::function<void()> f) {
+    // Note this is an unbounded work queue.
+    // A resource-safe implementation should be bounded.
+    const bool accepting = !finished_;
+    if (accepting) {
+        jobs_.push_back(f);
+        pn_connection_wake(connection_);
+    }
+    return accepting;
+}
+
+// void_function0 overload: queues a wrapper that captures `f` by reference,
+// so `f` must stay alive until the job has run.
+bool event_loop::impl::inject(proton::void_function0& f) {
+    std::function<void()> call_f = [&f]() { f(); };
+    return inject(call_f);
+}
+
+// Run every job queued so far. The queue is detached first, so jobs injected
+// while running are left in jobs_ for the next call.
+void event_loop::impl::run_all_jobs() {
+    jobs pending;
+    pending.swap(jobs_);
+
+    for (jobs::iterator job = pending.begin(); job != pending.end(); ++job) {
+        try {
+            (*job)();
+        } catch (...) {
+            // Exceptions from queued work are deliberately ignored.
+        }
+    }
+}
+#else
+// Pre-C++11 variant: queue a pointer to `f` (not a copy, so `f` must stay
+// alive until it has run) and wake the connection via pn_connection_wake().
+// Returns false, without queuing, once finished() has been called.
+bool event_loop::impl::inject(proton::void_function0& f) {
+    // Note this is an unbounded work queue.
+    // A resource-safe implementation should be bounded.
+    const bool accepting = !finished_;
+    if (accepting) {
+        jobs_.push_back(&f);
+        pn_connection_wake(connection_);
+    }
+    return accepting;
+}
+
+// Pre-C++11 variant: run every job queued so far, ignoring exceptions.
+//
+// The queue is detached into a local vector before anything runs. A job may
+// call inject() on this same loop; iterating jobs_ in place would then
+// invalidate the iterator when the vector grows, and a trailing clear()
+// could discard work that was never executed. Jobs injected while running
+// stay in jobs_ and are picked up by the next call, matching the
+// std::function variant.
+void event_loop::impl::run_all_jobs() {
+    jobs pending;
+    pending.swap(jobs_);
+
+    for (jobs::iterator f = pending.begin(); f != pending.end(); ++f) {
+        try {
+            (**f)();
+        } catch (...) {
+            // Exceptions from queued work are deliberately ignored.
+        }
+    }
+}
+#endif
+// Build the implementation for public container `c` with container id `id`
+// and handler `mh`. Creates the proactor that the destructor frees.
+container::impl::impl(container& c, const std::string& id, messaging_handler* mh)
+    : container_(c),
+      proactor_(pn_proactor()),
+      handler_(mh),
+      id_(id),
+      auto_stop_(true),
+      stopping_(false)
+{
+}
+
+// Stop the container, swallowing any exception so the destructor never
+// throws, then free the proactor.
+container::impl::~impl() {
+    try {
+        const error_condition shutdown("exception", "container shut-down");
+        stop(shutdown);
+        // NOTE: a wait() call was left commented out at this point.
+    } catch (...) {
+        // Best effort only.
+    }
+    pn_proactor_free(proactor_);
+}
+
+// Create a connection to `addr`, open it with the container's client
+// defaults overridden by `user_opts`, and hand it to the proactor.
+// Throws proton::error if the container is stopping.
+proton::connection container::impl::connect_common(
+    const std::string& addr,
+    const proton::connection_options& user_opts)
+{
+    if (stopping_) {
+        throw proton::error("container is stopping");
+    }
+
+    // Container-wide defaults first, then the caller's overrides.
+    connection_options effective(client_connection_options_);
+    effective.update(user_opts);
+    messaging_handler* conn_handler = effective.handler();
+
+    proton::url target(addr);
+    pn_connection_t* pnc = pn_connection();
+
+    // Populate the C++ context attached to the new connection.
+    connection_context& ctx = connection_context::get(pnc);
+    ctx.container = &container_;
+    ctx.handler = conn_handler;
+    ctx.event_loop_ = new event_loop::impl(pnc);
+
+    pn_connection_set_container(pnc, id_.c_str());
+    pn_connection_set_hostname(pnc, target.host().c_str());
+    if (!target.user().empty()) {
+        pn_connection_set_user(pnc, target.user().c_str());
+    }
+    if (!target.password().empty()) {
+        pn_connection_set_password(pnc, target.password().c_str());
+    }
+
+    connection conn = make_wrapper(pnc);
+    conn.open(effective);
+
+    // The first pn_proactor_addr call only measures the address string;
+    // the second formats it into a buffer of that size (plus terminator).
+    const int addr_len = pn_proactor_addr(0, 0, target.host().c_str(), target.port().c_str());
+    std::vector<char> formatted(addr_len + 1);
+    pn_proactor_addr(&formatted[0], addr_len + 1, target.host().c_str(), target.port().c_str());
+    pn_proactor_connect(proactor_, pnc, &formatted[0]);
+    return conn;
+}
+
+// Public connect: same as connect_common(), with the result wrapped by
+// make_thread_safe().
+proton::returned<proton::connection> container::impl::connect(
+    const std::string& addr,
+    const proton::connection_options& user_opts)
+{
+    return make_thread_safe(connect_common(addr, user_opts));
+}
+
+// Connect to `url` (connection options `o2`) and open a sender on the
+// connection's default session, addressed at the URL path. Link options are
+// the container's sender defaults overridden by `o1`.
+returned<sender> container::impl::open_sender(const std::string &url, const proton::sender_options &o1, const connection_options &o2) {
+    proton::sender_options link_opts(sender_options_);
+    link_opts.update(o1);
+
+    connection conn = connect_common(url, o2);
+    proton::session ssn = conn.default_session();
+    return make_thread_safe(ssn.open_sender(proton::url(url).path(), link_opts));
+}
+
+// Connect to `url` (connection options `o2`) and open a receiver on the
+// connection's default session, addressed at the URL path. Link options are
+// the container's receiver defaults overridden by `o1`.
+returned<receiver> container::impl::open_receiver(const std::string &url, const proton::receiver_options &o1, const connection_options &o2) {
+    proton::receiver_options link_opts(receiver_options_);
+    link_opts.update(o1);
+
+    connection conn = connect_common(url, o2);
+    proton::session ssn = conn.default_session();
+    return make_thread_safe(ssn.open_receiver(proton::url(url).path(), link_opts));
+}
+
+// Create a proton-c listener for `addr` and start listening on the proactor.
+// Throws proton::error if the container is stopping.
+pn_listener_t* container::impl::listen_common_lh(const std::string& addr) {
+    if (stopping_) {
+        throw proton::error("container is stopping");
+    }
+
+    proton::url parsed(addr);
+
+    // The first pn_proactor_addr call only measures the address string;
+    // the second formats it into a buffer of that size (plus terminator).
+    const int addr_len = pn_proactor_addr(0, 0, parsed.host().c_str(), parsed.port().c_str());
+    std::vector<char> formatted(addr_len + 1);
+    pn_proactor_addr(&formatted[0], addr_len + 1, parsed.host().c_str(), parsed.port().c_str());
+
+    const int backlog = 16;
+    pn_listener_t* pnl = pn_listener();
+    pn_proactor_listen(proactor_, pnl, &formatted[0], backlog);
+    return pnl;
+}
+
+// Listen on `addr` with neither per-listener options nor a listen_handler.
+proton::listener container::impl::listen(const std::string& addr) {
+    return proton::listener(listen_common_lh(addr));
+}
+
+// Listen on `addr`, storing a copy of `opts` in the listener's context.
+proton::listener container::impl::listen(const std::string& addr, const proton::connection_options& opts) {
+    pn_listener_t* pnl = listen_common_lh(addr);
+    listener_context::get(pnl).connection_options_.reset(new connection_options(opts));
+    return proton::listener(pnl);
+}
+
+// Listen on `addr`, recording `lh` (by pointer, not owned) in the listener's
+// context.
+proton::listener container::impl::listen(const std::string& addr, proton::listen_handler& lh) {
+    pn_listener_t* pnl = listen_common_lh(addr);
+    listener_context::get(pnl).listen_handler_ = &lh;
+    return proton::listener(pnl);
+}
+
+#if PN_CPP_HAS_STD_FUNCTION
+// void_function0 overload: schedules a wrapper that captures `f` by
+// reference, so `f` must stay alive until the task has run.
+void container::impl::schedule(duration delay, void_function0& f) {
+    std::function<void()> call_f = [&f]() { f(); };
+    schedule(delay, call_f);
+}
+
+// Schedule `f` to run once `delay` has elapsed.
+//
+// The proactor has a single timeout, so it must always be armed for the
+// *earliest* pending task. Arming it with `delay` unconditionally would let
+// a later-due task postpone an earlier one that is already pending.
+void container::impl::schedule(duration delay, std::function<void()> f) {
+    const timestamp now = timestamp::now();
+
+    // Record the task; deferred_ is kept as a heap with the earliest on top.
+    deferred_.emplace_back(scheduled{now+delay, f});
+    std::push_heap(deferred_.begin(), deferred_.end());
+
+    // (Re)arm the timeout for the head of the queue; fire at once if overdue.
+    const scheduled& head = deferred_.front();
+    if (head.time<=now)
+        pn_proactor_set_timeout(proactor_, 0);
+    else
+        pn_proactor_set_timeout(proactor_, (head.time-now).milliseconds());
+}
+#else
+// Pre-C++11 fallback: invoke the function object that task_ points to.
+void container::impl::scheduled::task() {
+    void_function0& fn = *task_;
+    fn();
+}
+
+// Pre-C++11 variant: schedule `f` to run once `delay` has elapsed. Only a
+// pointer to `f` is stored, so the caller must keep it alive until it has run.
+//
+// The proactor has a single timeout, so it must always be armed for the
+// *earliest* pending task. Arming it with `delay` unconditionally would let
+// a later-due task postpone an earlier one that is already pending.
+void container::impl::schedule(duration delay, void_function0& f) {
+    const timestamp now = timestamp::now();
+
+    // Record the task; deferred_ is kept as a heap with the earliest on top.
+    scheduled s={now+delay, &f};
+    deferred_.push_back(s);
+    std::push_heap(deferred_.begin(), deferred_.end());
+
+    // (Re)arm the timeout for the head of the queue; fire at once if overdue.
+    const scheduled& head = deferred_.front();
+    if (head.time<=now)
+        pn_proactor_set_timeout(proactor_, 0);
+    else
+        pn_proactor_set_timeout(proactor_, (head.time-now).milliseconds());
+}
+#endif
+
+// Setters for the container-wide defaults. Each one replaces the stored
+// value with a copy of `opts`.
+
+// Default options for outgoing connections (starting point in connect_common()).
+void container::impl::client_connection_options(const connection_options &opts) { client_connection_options_ = opts; }
+
+// Default options for accepted connections.
+void container::impl::server_connection_options(const connection_options &opts) { server_connection_options_ = opts; }
+
+// Default sender options (starting point in open_sender()).
+void container::impl::sender_options(const proton::sender_options &opts) { sender_options_ = opts; }
+
+// Default receiver options (starting point in open_receiver()).
+void container::impl::receiver_options(const proton::receiver_options &opts) { receiver_options_ = opts; }
+
+// Run every scheduled task that is due, then re-arm the proactor timeout for
+// the earliest task still pending (if any).
+//
+// Due tasks are first moved out of deferred_ and only then executed. A task
+// is allowed to call schedule(); running it while it is still the heap front
+// would let that call reallocate the vector (destroying the running task and
+// leaving a dangling pointer) and reorder the heap, so that the wrong entry
+// could be popped afterwards. Tasks scheduled by a running task are handled
+// by a later timeout, never by this pass. Safe to call with nothing scheduled.
+void container::impl::run_timer_jobs() {
+    const timestamp started = timestamp::now();
+
+    // Collect everything that is due, earliest first.
+    std::vector<scheduled> due;
+    while ( !deferred_.empty() && deferred_.front().time<=started ) {
+        std::pop_heap(deferred_.begin(), deferred_.end());
+        due.push_back(deferred_.back());
+        deferred_.pop_back();
+    }
+
+    for (std::vector<scheduled>::iterator i = due.begin(); i != due.end(); ++i)
+        i->task();
+
+    // If there are no more scheduled items finish now
+    if ( deferred_.empty() ) return;
+
+    // Re-arm for the current head, measured from now because the tasks above
+    // may have taken time; fire at once if the head is already overdue.
+    const timestamp now = timestamp::now();
+    const scheduled& head = deferred_.front();
+    if (head.time<=now)
+        pn_proactor_set_timeout(proactor_, 0);
+    else
+        pn_proactor_set_timeout(proactor_, (head.time-now).milliseconds());
+}
+
+// Process a single proactor event.
+//
+// Steps, in order:
+//  1. If the event belongs to a connection, run all jobs queued on that
+//     connection's event loop.
+//  2. Handle proactor, listener and connection housekeeping events internally;
+//     these never reach a messaging_handler.
+//  3. Dispatch any other event to the most specific messaging_handler found:
+//     link, then session, then connection, then the container's handler_.
+//
+// Returns auto_stop_ for PN_PROACTOR_INACTIVE and false for every other event.
+// container::impl::thread() leaves its loop once a true value has been returned.
+bool container::impl::handle(pn_event_t* event) {
+
+ // If we have any pending connection work, do it now
+ pn_connection_t* c = pn_event_connection(event);
+ if (c) {
+ event_loop::impl* loop = connection_context::get(c).event_loop_.impl_.get();
+ loop->run_all_jobs();
+ }
+
+ // Process events that shouldn't be sent to messaging_handler
+ switch (pn_event_type(event)) {
+
+ case PN_PROACTOR_INACTIVE: /* listener and all connections closed */
+ return auto_stop_;
+
+ // We never interrupt the proactor so ignore
+ case PN_PROACTOR_INTERRUPT:
+ return false;
+
+ case PN_PROACTOR_TIMEOUT:
+ // Maybe we got a timeout and have nothing scheduled (not sure if this is possible)
+ if ( deferred_.size()==0 ) return false;
+
+ run_timer_jobs();
+ return false;
+
+ case PN_LISTENER_OPEN:
+ return false;
+
+ case PN_LISTENER_ACCEPT: {
+ pn_listener_t* l = pn_event_listener(event);
+ // Note: this 'c' is a brand new connection and shadows the 'c' declared at the top
+ pn_connection_t* c = pn_connection();
+ listener_context &lc(listener_context::get(l));
+ pn_connection_set_container(c, id_.c_str());
+ // Start from the container-wide server options, then layer on either the
+ // listen handler's on_accept() result or the options stored on the listener
+ connection_options opts = server_connection_options_;
+ if (lc.listen_handler_) {
+ listener lstr(l);
+ opts.update(lc.listen_handler_->on_accept(lstr));
+ }
+ else if (!!lc.connection_options_) opts.update(*lc.connection_options_);
+ // Keep the effective options on the listener context; PN_CONNECTION_BOUND
+ // reads them back through cc.listener_context_ to call apply_bound()
+ lc.connection_options_.reset(new connection_options(opts));
+ // Handler applied separately
+ connection_context& cc = connection_context::get(c);
+ cc.container = &container_;
+ cc.listener_context_ = &lc;
+ cc.handler = opts.handler();
+ cc.event_loop_ = new event_loop::impl(c);
+ pn_listener_accept(l, c);
+ return false;
+ }
+ case PN_LISTENER_CLOSE: {
+ pn_listener_t* l = pn_event_listener(event);
+ listener_context &lc(listener_context::get(l));
+ listener lstnr(l);
+ if (lc.listen_handler_) {
+ // Report an error (if the listener has a condition set) before the close callback.
+ // Note: this 'c' is the listener's condition and shadows the connection 'c' above
+ pn_condition_t* c = pn_listener_condition(l);
+ if (pn_condition_is_set(c)) {
+ lc.listen_handler_->on_error(lstnr, make_wrapper(c).what());
+ }
+ lc.listen_handler_->on_close(lstnr);
+ }
+ return false;
+ }
+ // If the event was just connection wake then there isn't anything more to do
+ case PN_CONNECTION_WAKE:
+ return false;
+
+ // Connection driver will bind a new transport to the connection at this point
+ case PN_CONNECTION_INIT:
+ return false;
+
+ case PN_CONNECTION_BOUND: {
+ // Need to apply post bind connection options
+ pn_connection_t* c = pn_event_connection(event);
+ connection conn = make_wrapper(c);
+ connection_context& cc = connection_context::get(c);
+ // A listener context is only attached in PN_LISTENER_ACCEPT, so its presence
+ // selects the options saved at accept time; otherwise use the client options
+ if (cc.listener_context_) {
+ cc.listener_context_->connection_options_->apply_bound(conn);
+ } else {
+ client_connection_options_.apply_bound(conn);
+ }
+
+ return false;
+ }
+ default:
+ break;
+ }
+
+ // Figure out the handler for the primary object for event
+ messaging_handler* mh = 0;
+
+ // First try for a link (send/receiver) handler
+ pn_link_t *link = pn_event_link(event);
+ if (link) mh = get_handler(link);
+
+ // Try for session handler if no link handler
+ pn_session_t *session = pn_event_session(event);
+ if (session && !mh) mh = get_handler(session);
+
+ // Try for connection handler if none of the above
+ pn_connection_t *connection = pn_event_connection(event);
+ if (connection && !mh) mh = get_handler(connection);
+
+ // Use container handler if nothing more specific (must be a container handler)
+ if (!mh) mh = handler_;
+
+ // If we still have no handler don't do anything!
+ // This is pretty unusual, but possible if we use the default constructor for container
+ if (!mh) return false;
+
+ // TODO: Currently create a throwaway messaging_adapter and proton_event so we can call dispatch, a bit inefficient
+ messaging_adapter ma(*mh);
+ proton_event pe(event);
+ pe.dispatch(ma);
+ return false;
+}
+
+// Event loop body: wait for a batch of events from ci's proactor, pass each
+// event to ci.handle(), and hand the batch back. The loop ends after the first
+// batch in which any handle() call returned true; the rest of that batch is
+// still processed.
+void container::impl::thread(container::impl& ci) {
+ bool stop_requested = false;
+ while (!stop_requested) {
+ pn_event_batch_t* batch = pn_proactor_wait(ci.proactor_);
+ for (pn_event_t* ev = pn_event_batch_next(batch); ev; ev = pn_event_batch_next(batch)) {
+ if (ci.handle(ev)) stop_requested = true;
+ }
+ pn_proactor_done(ci.proactor_, batch);
+ }
+}
+
+// Run the event loop on the calling thread.
+// The container start/stop callbacks are invoked directly here, around the
+// loop, and only when a container handler is set.
+void container::impl::run() {
+ if (handler_) {
+ handler_->on_container_start(container_);
+ }
+
+ thread(*this);
+
+ if (handler_) {
+ handler_->on_container_stop(container_);
+ }
+}
+
+// Set the auto_stop_ flag, which handle() returns when the proactor reports
+// PN_PROACTOR_INACTIVE.
+void container::impl::auto_stop(bool set) {
+ this->auto_stop_ = set;
+}
+
+// Begin stopping the container: set the auto_stop_ and stopping_ flags, then
+// disconnect everything on the proactor, passing err as the condition.
+void container::impl::stop(const proton::error_condition& err) {
+ auto_stop_ = true;
+ stopping_ = true;
+
+ // Build a temporary C condition from err for the disconnect call; freed below
+ pn_condition_t* cond = pn_condition();
+ set_error_condition(err, cond);
+ pn_proactor_disconnect(proactor_, cond);
+ pn_condition_free(cond);
+}
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/receiver.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/receiver.cpp b/proton-c/bindings/cpp/src/receiver.cpp
index 68d55d0..b7239a5 100644
--- a/proton-c/bindings/cpp/src/receiver.cpp
+++ b/proton-c/bindings/cpp/src/receiver.cpp
@@ -34,7 +34,6 @@
#include <proton/session.h>
#include <proton/link.h>
#include <proton/event.h>
-#include <proton/reactor.h>
namespace proton {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/receiver_options.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/receiver_options.cpp b/proton-c/bindings/cpp/src/receiver_options.cpp
index 4a4d80f..2b134bc 100644
--- a/proton-c/bindings/cpp/src/receiver_options.cpp
+++ b/proton-c/bindings/cpp/src/receiver_options.cpp
@@ -27,7 +27,7 @@
#include <proton/link.h>
#include "contexts.hpp"
-#include "container_impl.hpp"
+#include "proactor_container_impl.hpp"
#include "messaging_adapter.hpp"
#include "proton_bits.hpp"
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/reconnect_timer.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/reconnect_timer.cpp b/proton-c/bindings/cpp/src/reconnect_timer.cpp
index c63f8a1..a299b0e 100644
--- a/proton-c/bindings/cpp/src/reconnect_timer.cpp
+++ b/proton-c/bindings/cpp/src/reconnect_timer.cpp
@@ -23,7 +23,6 @@
#include "proton/error.hpp"
#include "msg.hpp"
#include <proton/types.h>
-#include <proton/reactor.h>
namespace proton {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/sender_options.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/sender_options.cpp b/proton-c/bindings/cpp/src/sender_options.cpp
index 4f501e6..9305666 100644
--- a/proton-c/bindings/cpp/src/sender_options.cpp
+++ b/proton-c/bindings/cpp/src/sender_options.cpp
@@ -24,7 +24,7 @@
#include "proton/source_options.hpp"
#include "proton/target_options.hpp"
-#include "container_impl.hpp"
+#include "proactor_container_impl.hpp"
#include "contexts.hpp"
#include "messaging_adapter.hpp"
#include "proton_bits.hpp"
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/session_options.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/session_options.cpp b/proton-c/bindings/cpp/src/session_options.cpp
index 2147fd4..fc03ebb 100644
--- a/proton-c/bindings/cpp/src/session_options.cpp
+++ b/proton-c/bindings/cpp/src/session_options.cpp
@@ -27,7 +27,7 @@
#include <proton/session.h>
#include "messaging_adapter.hpp"
-#include "container_impl.hpp"
+#include "proactor_container_impl.hpp"
#include "proton_bits.hpp"
namespace proton {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org