You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2016/05/16 14:43:33 UTC
[4/4] qpid-proton git commit: PROTON-1184: C++ merge APIs for single
and multi-threaded use.
PROTON-1184: C++ merge APIs for single and multi-threaded use.
container is now an *interface*, slightly modified and suitable for ST and MT use.
- controller is gone, connection_engine/IO integration use container interface
- added default_container: single-threaded container implementation, like old container.
- added listen_handler to react to accepted connections in container::listen.
- renamed acceptor to listener
- removed mutable acceptor options, now provided by listen_handler
- thread_safe<Endpoint> used to return endpoints, safe for MT use.
thread_safe<Endpoint> provides thread safe access:
- event_loop::inject() makes async/deferred function calls in endpoint context
- endpoint stays in memory till thread_safe<Endpoint> is deleted
- on deletion, thread_safe<Endpoint> safely injects a decref().
- normal memory management: shared_ptr, unique_ptr, auto_ptr or operator delete.
- returned_thread_safe<Endpoint> transparent conversion, old ST code unchanged.
connection_engine changes
- connection_engine handler is optional (engine still updates model objects)
- engine requires configure() before use - allow more leeway to compose options.
- connect() does configure() and open() for a client connection
- accept() does configure() for a server connection
- renamed connection_engine::close->disconnected, distinct from AMQP protocol close
implicit convert handler to connection_options
- handler as sole option is very common with MT handler-per-connection style
TODO:
- flow control C++ example is temporarily disabled, need to fix link-level handlers.
- thread_safe::inject() needs a time-delay version to replace container::schedule.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/1b8450d6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/1b8450d6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/1b8450d6
Branch: refs/heads/master
Commit: 1b8450d6a7ff132a57419dfc5f9d0a659a04ff45
Parents: b1c3488
Author: Alan Conway <ac...@redhat.com>
Authored: Fri May 6 16:22:23 2016 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon May 16 10:36:36 2016 -0400
----------------------------------------------------------------------
examples/cpp/CMakeLists.txt | 8 +-
examples/cpp/README.dox | 10 +-
examples/cpp/broker.cpp | 212 +++++++-
examples/cpp/broker.hpp | 22 +-
examples/cpp/client.cpp | 12 +-
examples/cpp/connection_options.cpp | 14 +-
examples/cpp/direct_recv.cpp | 18 +-
examples/cpp/direct_send.cpp | 25 +-
examples/cpp/example/socket_windows.cpp | 218 --------
examples/cpp/example_test.py | 10 +
examples/cpp/fake_cpp11.hpp | 36 --
examples/cpp/flow_control.cpp | 32 +-
examples/cpp/helloworld.cpp | 21 +-
examples/cpp/helloworld_direct.cpp | 23 +-
examples/cpp/mt/broker.cpp | 105 ++--
examples/cpp/mt/epoll_container.cpp | 524 +++++++++++++++++++
examples/cpp/mt/epoll_controller.cpp | 517 ------------------
examples/cpp/mt/mt_container.hpp | 29 +
examples/cpp/queue_browser.cpp | 10 +-
examples/cpp/selected_recv.cpp | 10 +-
examples/cpp/server.cpp | 10 +-
examples/cpp/server_direct.cpp | 13 +-
examples/cpp/simple_recv.cpp | 10 +-
examples/cpp/simple_send.cpp | 14 +-
examples/cpp/ssl.cpp | 28 +-
examples/cpp/ssl_client_cert.cpp | 27 +-
examples/cpp/tutorial.dox | 6 +-
proton-c/bindings/cpp/CMakeLists.txt | 13 +-
proton-c/bindings/cpp/docs/CMakeLists.txt | 12 +-
proton-c/bindings/cpp/docs/mainpage.md | 26 +-
proton-c/bindings/cpp/docs/mt_page.md | 4 +-
.../bindings/cpp/include/proton/acceptor.hpp | 61 ---
proton-c/bindings/cpp/include/proton/config.hpp | 6 +
.../bindings/cpp/include/proton/connection.hpp | 11 +-
.../cpp/include/proton/connection_options.hpp | 12 +-
.../bindings/cpp/include/proton/container.hpp | 213 ++++----
.../bindings/cpp/include/proton/controller.hpp | 118 -----
.../cpp/include/proton/default_container.hpp | 92 ++++
.../bindings/cpp/include/proton/endpoint.hpp | 5 +-
.../bindings/cpp/include/proton/event_loop.hpp | 71 +++
.../bindings/cpp/include/proton/handler.hpp | 3 +-
.../cpp/include/proton/io/connection_engine.hpp | 71 ++-
.../include/proton/io/container_impl_base.hpp | 120 +++++
.../include/proton/io/default_controller.hpp | 47 --
.../cpp/include/proton/io/link_namer.hpp | 37 ++
.../cpp/include/proton/listen_handler.hpp | 50 ++
.../bindings/cpp/include/proton/listener.hpp | 51 ++
proton-c/bindings/cpp/include/proton/object.hpp | 5 +
.../bindings/cpp/include/proton/receiver.hpp | 4 +-
.../cpp/include/proton/receiver_options.hpp | 6 +-
.../bindings/cpp/include/proton/ret_ptr.hpp | 51 ++
proton-c/bindings/cpp/include/proton/sender.hpp | 4 +-
.../cpp/include/proton/sender_options.hpp | 8 +-
.../bindings/cpp/include/proton/session.hpp | 4 +-
proton-c/bindings/cpp/include/proton/source.hpp | 2 +-
.../bindings/cpp/include/proton/thread_safe.hpp | 173 ++++++
.../bindings/cpp/include/proton/work_queue.hpp | 75 ---
proton-c/bindings/cpp/src/acceptor.cpp | 12 +-
proton-c/bindings/cpp/src/acceptor.hpp | 61 +++
proton-c/bindings/cpp/src/connection.cpp | 19 +-
.../bindings/cpp/src/connection_options.cpp | 12 +-
proton-c/bindings/cpp/src/connector.cpp | 3 +-
proton-c/bindings/cpp/src/container.cpp | 85 +--
proton-c/bindings/cpp/src/container_impl.cpp | 137 +++--
proton-c/bindings/cpp/src/container_impl.hpp | 63 ++-
proton-c/bindings/cpp/src/container_test.cpp | 29 +-
proton-c/bindings/cpp/src/contexts.cpp | 1 +
proton-c/bindings/cpp/src/contexts.hpp | 23 +-
proton-c/bindings/cpp/src/controller.cpp | 59 ---
proton-c/bindings/cpp/src/engine_test.cpp | 134 +++--
proton-c/bindings/cpp/src/event_loop.cpp | 41 ++
proton-c/bindings/cpp/src/id_generator.cpp | 6 +-
proton-c/bindings/cpp/src/id_generator.hpp | 8 +-
.../bindings/cpp/src/io/connection_engine.cpp | 81 ++-
proton-c/bindings/cpp/src/listener.cpp | 29 +
proton-c/bindings/cpp/src/messaging_adapter.cpp | 18 +-
proton-c/bindings/cpp/src/proton_bits.cpp | 1 +
proton-c/bindings/cpp/src/proton_bits.hpp | 2 +-
proton-c/bindings/cpp/src/proton_event.hpp | 10 +-
proton-c/bindings/cpp/src/reactor.cpp | 2 +-
proton-c/bindings/cpp/src/receiver_options.cpp | 6 +-
proton-c/bindings/cpp/src/sender_options.cpp | 6 +-
proton-c/bindings/cpp/src/session.cpp | 5 +-
proton-c/bindings/cpp/src/session_options.cpp | 4 +-
.../bindings/cpp/src/test_dummy_container.hpp | 73 +++
proton-c/bindings/cpp/src/thread_safe_test.cpp | 117 +++++
proton-c/bindings/python/docs/tutorial.rst | 4 +-
tests/tools/apps/cpp/reactor_send.cpp | 18 +-
88 files changed, 2589 insertions(+), 1799 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/examples/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt
index a9f8700..06ec1a4 100644
--- a/examples/cpp/CMakeLists.txt
+++ b/examples/cpp/CMakeLists.txt
@@ -63,11 +63,9 @@ add_test(NAME cpp_container_example_test
# TODO aconway 2016-04-26: need portable MT and IO examples.
if(CMAKE_SYSTEM_NAME STREQUAL "Linux" AND HAS_CPP11)
- set(controller_src mt/epoll_controller.cpp)
- foreach(example
- broker
- )
- add_executable(mt_${example} mt/${example}.cpp ${controller_src})
+ set(container_src mt/epoll_container.cpp)
+ foreach(example broker) # More coming
+ add_executable(mt_${example} mt/${example}.cpp ${container_src})
target_link_libraries(mt_${example} pthread)
endforeach()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/examples/cpp/README.dox
----------------------------------------------------------------------
diff --git a/examples/cpp/README.dox b/examples/cpp/README.dox
index d545366..97e0619 100644
--- a/examples/cpp/README.dox
+++ b/examples/cpp/README.dox
@@ -126,11 +126,11 @@ subscribe.
*/
-/** @example mt/epoll_controller.cpp
+/** @example mt/epoll_container.cpp
-An example implementation of the proton::mt::controller API that shows how to
-use the prton::io::connection_engine SPI to adapt the proton API to native
-IO. In this case using a multi-threaded Linux epoll poller as the implementation.
+An example implementation of the proton::container API that shows how to use the
+prton::io::connection_engine SPI to adapt the proton API to native IO. In this
+case using a multi-threaded Linux epoll poller as the implementation.
__Requires C++11__
@@ -140,7 +140,7 @@ __Requires C++11__
A multi-threaded broker, using the proton::mt extensions. This broker is
portable over any implementation of the proton::mt API, see @ref
-mt/epoll_controller.cpp for an example.
+mt/epoll_container.cpp for an example.
__Requires C++11__
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/examples/cpp/broker.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/broker.cpp b/examples/cpp/broker.cpp
index 4c74f67..86e5683 100644
--- a/examples/cpp/broker.cpp
+++ b/examples/cpp/broker.cpp
@@ -19,21 +19,214 @@
*
*/
+#include <proton/config.hpp>
#include "options.hpp"
-#include "broker.hpp"
-#include "proton/acceptor.hpp"
-#include "proton/container.hpp"
-#include "proton/value.hpp"
+#include "proton/connection.hpp"
+#include "proton/default_container.hpp"
+#include "proton/delivery.hpp"
+#include "proton/handler.hpp"
+#include "proton/message.hpp"
+#include "proton/receiver_options.hpp"
+#include "proton/sender.hpp"
+#include "proton/sender_options.hpp"
+#include "proton/source_options.hpp"
+#include "proton/target_options.hpp"
+#include "proton/transport.hpp"
+#include "proton/url.hpp"
-#include <iostream>
#include <deque>
-#include <map>
+#include <iostream>
#include <list>
+#include <map>
#include <string>
-#include "fake_cpp11.hpp"
+/// A simple implementation of a queue.
+class queue {
+ public:
+ queue(const std::string &name, bool dynamic = false) : name_(name), dynamic_(dynamic) {}
+
+ std::string name() const { return name_; }
+
+ void subscribe(proton::sender s) {
+ consumers_.push_back(s);
+ }
+
+ // Return true if queue can be deleted.
+ bool unsubscribe(proton::sender s) {
+ consumers_.remove(s);
+ return (consumers_.size() == 0 && (dynamic_ || messages_.size() == 0));
+ }
+
+ void publish(const proton::message &m) {
+ messages_.push_back(m);
+ dispatch(0);
+ }
+
+ void dispatch(proton::sender *s) {
+ while (deliver_to(s)) {}
+ }
+
+ bool deliver_to(proton::sender *s) {
+ // Deliver to single sender if supplied, else all consumers
+ int count = s ? 1 : consumers_.size();
+
+ if (!count) return false;
+
+ bool result = false;
+ sender_list::iterator it = consumers_.begin();
+
+ if (!s && count) {
+ s = &*it;
+ }
+
+ while (messages_.size()) {
+ if (s->credit()) {
+ const proton::message& m = messages_.front();
+ s->send(m);
+ messages_.pop_front();
+ result = true;
+ }
+
+ if (--count) {
+ it++;
+ } else {
+ return result;
+ }
+ }
+
+ return false;
+ }
+
+ private:
+ typedef std::deque<proton::message> message_queue;
+ typedef std::list<proton::sender> sender_list;
+
+ std::string name_;
+ bool dynamic_;
+ message_queue messages_;
+ sender_list consumers_;
+};
+
+/// A collection of queues and queue factory, used by a broker.
+class queues {
+ public:
+ queues() : next_id_(0) {}
+ virtual ~queues() {}
+
+ // Get or create a queue.
+ virtual queue &get(const std::string &address = std::string()) {
+ if (address.empty()) {
+ throw std::runtime_error("empty queue name");
+ }
+
+ queue*& q = queues_[address];
+
+ if (!q) q = new queue(address);
+
+ return *q;
+ }
+
+ // Create a dynamic queue with a unique name.
+ virtual queue &dynamic() {
+ std::ostringstream os;
+ os << "q" << next_id_++;
+ queue *q = queues_[os.str()] = new queue(os.str(), true);
+
+ return *q;
+ }
+
+ // Delete the named queue
+ virtual void erase(std::string &name) {
+ delete queues_[name];
+ queues_.erase(name);
+ }
+
+ protected:
+ typedef std::map<std::string, queue *> queue_map;
+ queue_map queues_;
+ uint64_t next_id_; // Use to generate unique queue IDs.
+};
+
+// A handler to implement broker logic
+class broker_handler : public proton::handler {
+ public:
+ broker_handler(queues& qs) : queues_(qs) {}
+
+ void on_sender_open(proton::sender &sender) PN_CPP_OVERRIDE {
+ proton::source src(sender.source());
+ queue &q = src.dynamic() ?
+ queues_.dynamic() : queues_.get(src.address());
+ sender.open(proton::sender_options().source(proton::source_options().address(q.name())));
+ q.subscribe(sender);
+ std::cout << "broker outgoing link from " << q.name() << std::endl;
+ }
+
+ void on_receiver_open(proton::receiver &receiver) PN_CPP_OVERRIDE {
+ std::string address = receiver.target().address();
+ if (!address.empty()) {
+ receiver.open(proton::receiver_options().target(proton::target_options().address(address)));
+ std::cout << "broker incoming link to " << address << std::endl;
+ }
+ }
+
+ void unsubscribe(proton::sender lnk) {
+ std::string address = lnk.source().address();
+
+ if (queues_.get(address).unsubscribe(lnk)) {
+ queues_.erase(address);
+ }
+ }
+
+ void on_sender_close(proton::sender &sender) PN_CPP_OVERRIDE {
+ unsubscribe(sender);
+ }
+
+ void on_connection_close(proton::connection &c) PN_CPP_OVERRIDE {
+ remove_stale_consumers(c);
+ }
+
+ void on_transport_close(proton::transport &t) PN_CPP_OVERRIDE {
+ remove_stale_consumers(t.connection());
+ }
+
+ void on_transport_error(proton::transport &t) PN_CPP_OVERRIDE {
+ std::cout << "broker client disconnect: " << t.error().what() << std::endl;
+ }
+
+ void on_error(const proton::error_condition &c) PN_CPP_OVERRIDE {
+ std::cerr << "broker error: " << c.what() << std::endl;
+ }
+
+ void remove_stale_consumers(proton::connection connection) {
+ proton::session_range r1 = connection.sessions();
+ for (proton::session_iterator i1 = r1.begin(); i1 != r1.end(); ++i1) {
+ proton::sender_range r2 = i1->senders();
+ for (proton::sender_iterator i2 = r2.begin(); i2 != r2.end(); ++i2) {
+ if (i2->active())
+ unsubscribe(*i2);
+ }
+ }
+ }
+
+ void on_sendable(proton::sender &s) PN_CPP_OVERRIDE {
+ std::string address = s.source().address();
+
+ queues_.get(address).dispatch(&s);
+ }
+
+ void on_message(proton::delivery &d, proton::message &m) PN_CPP_OVERRIDE {
+ std::string address = d.receiver().target().address();
+ queues_.get(address).publish(m);
+ }
+
+ protected:
+ queues& queues_;
+};
+
+
+// The broker
class broker {
public:
broker(const std::string& url) : handler_(url, queues_) {}
@@ -45,7 +238,7 @@ class broker {
public:
my_handler(const std::string& u, queues& qs) : broker_handler(qs), url_(u) {}
- void on_container_start(proton::container &c) override {
+ void on_container_start(proton::container &c) PN_CPP_OVERRIDE {
c.listen(url_);
std::cout << "broker listening on " << url_ << std::endl;
}
@@ -59,6 +252,7 @@ class broker {
my_handler handler_;
};
+
int main(int argc, char **argv) {
std::string url("0.0.0.0");
example::options opts(argc, argv);
@@ -69,7 +263,7 @@ int main(int argc, char **argv) {
opts.parse();
broker b(url);
- proton::container(b.handler()).run();
+ proton::default_container(b.handler()).run();
return 0;
} catch (const example::bad_option& e) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/examples/cpp/broker.hpp
----------------------------------------------------------------------
diff --git a/examples/cpp/broker.hpp b/examples/cpp/broker.hpp
index 5bcf121..844f9d6 100644
--- a/examples/cpp/broker.hpp
+++ b/examples/cpp/broker.hpp
@@ -154,18 +154,18 @@ class queues {
uint64_t next_id_; // Use to generate unique queue IDs.
};
-#include "fake_cpp11.hpp"
+#include <proton/config.hpp>
/** Common handler logic for brokers. */
class broker_handler : public proton::handler {
public:
broker_handler(queues& qs) : queues_(qs) {}
- void on_transport_open(proton::transport &t) override {
+ void on_transport_open(proton::transport &t) PN_CPP_OVERRIDE {
std::cout << "Connection from user: " << t.sasl().user() << " (mechanism: " << t.sasl().mech() << ")" << std::endl;
}
- void on_sender_open(proton::sender &sender) override {
+ void on_sender_open(proton::sender &sender) PN_CPP_OVERRIDE {
proton::source src(sender.source());
queue &q = src.dynamic() ?
queues_.dynamic() : queues_.get(src.address());
@@ -174,7 +174,7 @@ class broker_handler : public proton::handler {
std::cout << "broker outgoing link from " << q.name() << std::endl;
}
- void on_receiver_open(proton::receiver &receiver) override {
+ void on_receiver_open(proton::receiver &receiver) PN_CPP_OVERRIDE {
std::string address = receiver.target().address();
if (!address.empty()) {
receiver.open(proton::receiver_options().target(proton::target_options().address(address)));
@@ -190,23 +190,23 @@ class broker_handler : public proton::handler {
}
}
- void on_sender_close(proton::sender &sender) override {
+ void on_sender_close(proton::sender &sender) PN_CPP_OVERRIDE {
unsubscribe(sender);
}
- void on_connection_close(proton::connection &c) override {
+ void on_connection_close(proton::connection &c) PN_CPP_OVERRIDE {
remove_stale_consumers(c);
}
- void on_transport_close(proton::transport &t) override {
+ void on_transport_close(proton::transport &t) PN_CPP_OVERRIDE {
remove_stale_consumers(t.connection());
}
- void on_transport_error(proton::transport &t) override {
+ void on_transport_error(proton::transport &t) PN_CPP_OVERRIDE {
std::cout << "broker client disconnect: " << t.error().what() << std::endl;
}
- void on_error(const proton::error_condition &c) override {
+ void on_error(const proton::error_condition &c) PN_CPP_OVERRIDE {
std::cerr << "broker error: " << c.what() << std::endl;
}
@@ -218,13 +218,13 @@ class broker_handler : public proton::handler {
}
}
- void on_sendable(proton::sender &s) override {
+ void on_sendable(proton::sender &s) PN_CPP_OVERRIDE {
std::string address = s.source().address();
queues_.get(address).dispatch(&s);
}
- void on_message(proton::delivery &d, proton::message &m) override {
+ void on_message(proton::delivery &d, proton::message &m) PN_CPP_OVERRIDE {
std::string address = d.receiver().target().address();
queues_.get(address).publish(m);
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/examples/cpp/client.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/client.cpp b/examples/cpp/client.cpp
index c74aaec..bf7c7c8 100644
--- a/examples/cpp/client.cpp
+++ b/examples/cpp/client.cpp
@@ -20,7 +20,7 @@
*/
#include "options.hpp"
-#include "proton/container.hpp"
+#include "proton/default_container.hpp"
#include "proton/delivery.hpp"
#include "proton/handler.hpp"
#include "proton/connection.hpp"
@@ -30,7 +30,7 @@
#include <iostream>
#include <vector>
-#include "fake_cpp11.hpp"
+#include <proton/config.hpp>
using proton::receiver_options;
using proton::source_options;
@@ -45,7 +45,7 @@ class client : public proton::handler {
public:
client(const std::string &u, const std::vector<std::string>& r) : url(u), requests(r) {}
- void on_container_start(proton::container &c) override {
+ void on_container_start(proton::container &c) PN_CPP_OVERRIDE {
sender = c.open_sender(url);
// Create a receiver requesting a dynamically created queue
// for the message source.
@@ -60,11 +60,11 @@ class client : public proton::handler {
sender.send(req);
}
- void on_receiver_open(proton::receiver &) override {
+ void on_receiver_open(proton::receiver &) PN_CPP_OVERRIDE {
send_request();
}
- void on_message(proton::delivery &d, proton::message &response) override {
+ void on_message(proton::delivery &d, proton::message &response) PN_CPP_OVERRIDE {
if (requests.empty()) return; // Spurious extra message!
std::cout << requests.front() << " => " << response.body() << std::endl;
@@ -94,7 +94,7 @@ int main(int argc, char **argv) {
requests.push_back("And the mome raths outgrabe.");
client c(url, requests);
- proton::container(c).run();
+ proton::default_container(c).run();
return 0;
} catch (const example::bad_option& e) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/examples/cpp/connection_options.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/connection_options.cpp b/examples/cpp/connection_options.cpp
index 8131307..d9f7768 100644
--- a/examples/cpp/connection_options.cpp
+++ b/examples/cpp/connection_options.cpp
@@ -21,7 +21,7 @@
#include "proton/connection.hpp"
#include "proton/connection_options.hpp"
-#include "proton/container.hpp"
+#include "proton/default_container.hpp"
#include "proton/handler.hpp"
#include "proton/transport.hpp"
@@ -29,10 +29,10 @@
using proton::connection_options;
-#include "fake_cpp11.hpp"
+#include <proton/config.hpp>
class handler_2 : public proton::handler {
- void on_connection_open(proton::connection &c) override {
+ void on_connection_open(proton::connection &c) PN_CPP_OVERRIDE {
std::cout << "connection events going to handler_2" << std::endl;
std::cout << "connection max_frame_size: " << c.max_frame_size() <<
", idle timeout: " << c.idle_timeout() << std::endl;
@@ -48,13 +48,13 @@ class main_handler : public proton::handler {
public:
main_handler(const std::string& u) : url(u) {}
- void on_container_start(proton::container &c) override {
+ void on_container_start(proton::container &c) PN_CPP_OVERRIDE {
// Connection options for this connection. Merged with and overriding the container's
// client_connection_options() settings.
- c.connect(url, connection_options().handler(&conn_handler).max_frame_size(2468));
+ c.connect(url, connection_options().handler(conn_handler).max_frame_size(2468));
}
- void on_connection_open(proton::connection &c) override {
+ void on_connection_open(proton::connection &c) PN_CPP_OVERRIDE {
std::cout << "unexpected connection event on main handler" << std::endl;
c.close();
}
@@ -64,7 +64,7 @@ int main(int argc, char **argv) {
try {
std::string url = argc > 1 ? argv[1] : "127.0.0.1:5672/examples";
main_handler handler(url);
- proton::container container(handler);
+ proton::default_container container(handler);
// Global connection options for future connections on container.
container.client_connection_options(connection_options().max_frame_size(12345).idle_timeout(proton::duration(15000)));
container.run();
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/examples/cpp/direct_recv.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/direct_recv.cpp b/examples/cpp/direct_recv.cpp
index 4197785..cfd8ee7 100644
--- a/examples/cpp/direct_recv.cpp
+++ b/examples/cpp/direct_recv.cpp
@@ -21,9 +21,8 @@
#include "options.hpp"
-#include "proton/acceptor.hpp"
#include "proton/connection.hpp"
-#include "proton/container.hpp"
+#include "proton/default_container.hpp"
#include "proton/delivery.hpp"
#include "proton/handler.hpp"
#include "proton/link.hpp"
@@ -32,24 +31,24 @@
#include <iostream>
#include <map>
-#include "fake_cpp11.hpp"
+#include <proton/config.hpp>
class direct_recv : public proton::handler {
private:
std::string url;
+ proton::listener listener;
uint64_t expected;
uint64_t received;
- proton::acceptor acceptor;
public:
direct_recv(const std::string &s, int c) : url(s), expected(c), received(0) {}
- void on_container_start(proton::container &c) override {
- acceptor = c.listen(url);
+ void on_container_start(proton::container &c) PN_CPP_OVERRIDE {
+ listener = c.listen(url);
std::cout << "direct_recv listening on " << url << std::endl;
}
- void on_message(proton::delivery &d, proton::message &msg) override {
+ void on_message(proton::delivery &d, proton::message &msg) PN_CPP_OVERRIDE {
if (proton::coerce<uint64_t>(msg.id()) < received) {
return; // Ignore duplicate
}
@@ -62,8 +61,7 @@ class direct_recv : public proton::handler {
if (received == expected) {
d.receiver().close();
d.connection().close();
-
- if (!!acceptor) acceptor.close();
+ listener.stop();
}
}
};
@@ -80,7 +78,7 @@ int main(int argc, char **argv) {
opts.parse();
direct_recv recv(address, message_count);
- proton::container(recv).run();
+ proton::default_container(recv).run();
return 0;
} catch (const example::bad_option& e) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/examples/cpp/direct_send.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/direct_send.cpp b/examples/cpp/direct_send.cpp
index b972714..ccbe009 100644
--- a/examples/cpp/direct_send.cpp
+++ b/examples/cpp/direct_send.cpp
@@ -21,9 +21,8 @@
#include "options.hpp"
-#include "proton/acceptor.hpp"
#include "proton/connection.hpp"
-#include "proton/container.hpp"
+#include "proton/default_container.hpp"
#include "proton/handler.hpp"
#include "proton/value.hpp"
#include "proton/tracker.hpp"
@@ -31,25 +30,25 @@
#include <iostream>
#include <map>
-#include "fake_cpp11.hpp"
+#include <proton/config.hpp>
class simple_send : public proton::handler {
private:
std::string url;
+ proton::listener listener;
int sent;
int confirmed;
int total;
- proton::acceptor acceptor;
public:
simple_send(const std::string &s, int c) : url(s), sent(0), confirmed(0), total(c) {}
- void on_container_start(proton::container &c) override {
- acceptor = c.listen(url);
+ void on_container_start(proton::container &c) PN_CPP_OVERRIDE {
+ listener = c.listen(url);
std::cout << "direct_send listening on " << url << std::endl;
}
- void on_sendable(proton::sender &sender) override {
+ void on_sendable(proton::sender &sender) PN_CPP_OVERRIDE {
while (sender.credit() && sent < total) {
proton::message msg;
std::map<std::string, int> m;
@@ -63,18 +62,17 @@ class simple_send : public proton::handler {
}
}
- void on_tracker_accept(proton::tracker &t) override {
+ void on_tracker_accept(proton::tracker &t) PN_CPP_OVERRIDE {
confirmed++;
if (confirmed == total) {
std::cout << "all messages confirmed" << std::endl;
-
t.connection().close();
- acceptor.close();
+ listener.stop();
}
}
- void on_transport_close(proton::transport &) override {
+ void on_transport_close(proton::transport &) PN_CPP_OVERRIDE {
sent = confirmed;
}
};
@@ -83,7 +81,7 @@ int main(int argc, char **argv) {
std::string address("127.0.0.1:5672/examples");
int message_count = 100;
example::options opts(argc, argv);
-
+
opts.add_value(address, 'a', "address", "listen and send on URL", "URL");
opts.add_value(message_count, 'm', "messages", "send COUNT messages", "COUNT");
@@ -91,8 +89,7 @@ int main(int argc, char **argv) {
opts.parse();
simple_send send(address, message_count);
- proton::container(send).run();
-
+ proton::default_container(send).run();
return 0;
} catch (const example::bad_option& e) {
std::cout << opts << std::endl << e.what() << std::endl;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/examples/cpp/example/socket_windows.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/example/socket_windows.cpp b/examples/cpp/example/socket_windows.cpp
deleted file mode 100644
index f312525..0000000
--- a/examples/cpp/example/socket_windows.cpp
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "msg.hpp"
-
-#include <proton/io/socket.hpp>
-#include <proton/url.hpp>
-
-#define FD_SETSIZE 2048
-#ifndef _WIN32_WINNT
-#define _WIN32_WINNT 0x0501
-#endif
-#if _WIN32_WINNT < 0x0501
-#error "Proton requires Windows API support for XP or later."
-#endif
-#include <winsock2.h>
-#include <mswsock.h>
-#include <Ws2tcpip.h>
-
-#include <ctype.h>
-#include <errno.h>
-#include <stdio.h>
-#include <assert.h>
-
-namespace proton {
-namespace io {
-namespace socket {
-
-const descriptor INVALID_DESCRIPTOR = INVALID_SOCKET;
-
-std::string error_str() {
- HRESULT code = WSAGetLastError();
- char err[1024] = {0};
- FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS |
- FORMAT_MESSAGE_MAX_WIDTH_MASK, NULL, code, 0, (LPSTR)&err, sizeof(err), NULL);
- return err;
-}
-
-io_error::io_error(const std::string& s) : error(s) {}
-
-namespace {
-
-template <class T> T check(T result, const std::string& msg=std::string()) {
- if (result == SOCKET_ERROR)
- throw io_error(msg + error_str());
- return result;
-}
-
-void gai_check(int result, const std::string& msg="") {
- if (result)
- throw io_error(msg + gai_strerror(result));
-}
-
-} // namespace
-
-void initialize() {
- WSADATA unused;
- check(WSAStartup(0x0202, &unused), "can't load WinSock: "); // Version 2.2
-}
-
-void finalize() {
- WSACleanup();
-}
-
-void engine::init() {
- u_long nonblock = 1;
- check(::ioctlsocket(socket_, FIONBIO, &nonblock), "ioctlsocket: ");
-}
-
-engine::engine(descriptor fd, handler& h, const connection_options &opts)
- : connection_engine(h, opts), socket_(fd)
-{
- init();
-}
-
-engine::engine(const url& u, handler& h, const connection_options &opts)
- : connection_engine(h, opts), socket_(connect(u))
-{
- init();
- connection().open();
-}
-
-engine::~engine() {}
-
-void engine::read() {
- mutable_buffer rbuf = read_buffer();
- if (rbuf.size > 0) {
- int n = ::recv(socket_, rbuf.data, rbuf.size, 0);
- if (n > 0)
- read_done(n);
- else if (n == 0)
- read_close();
- else if (n == SOCKET_ERROR && WSAGetLastError() != WSAEWOULDBLOCK)
- close(error_condition("io_error", error_str()));
- }
-}
-
-void engine::write() {
- const_buffer wbuf = write_buffer();
- if (wbuf.size > 0) {
- int n = ::send(socket_, wbuf.data, wbuf.size, 0);
- if (n > 0)
- write_done(n);
- else if (n == SOCKET_ERROR && WSAGetLastError() != WSAEWOULDBLOCK)
- close(error_condition("io_error", error_str()));
- }
-}
-
-void engine::run() {
- while (dispatch()) {
- fd_set rd, wr;
- FD_ZERO(&rd);
- if (read_buffer().size)
- FD_SET(socket_, &rd);
- FD_ZERO(&wr);
- if (write_buffer().size)
- FD_SET(socket_, &wr);
- int n = ::select(FD_SETSIZE, &rd, &wr, NULL, NULL);
- if (n < 0) {
- close(error_condition("select: ", error_str()));
- break;
- }
- if (FD_ISSET(socket_, &rd)) {
- read();
- }
- if (FD_ISSET(socket_, &wr))
- write();
- }
- ::closesocket(socket_);
-}
-
-namespace {
-struct auto_addrinfo {
- struct addrinfo *ptr;
- auto_addrinfo() : ptr(0) {}
- ~auto_addrinfo() { ::freeaddrinfo(ptr); }
- addrinfo* operator->() const { return ptr; }
-};
-
-static const char *amqp_service(const char *port) {
- // Help older Windows to know about amqp[s] ports
- if (port) {
- if (!strcmp("amqp", port)) return "5672";
- if (!strcmp("amqps", port)) return "5671";
- }
- return port;
-}
-}
-
-
-descriptor connect(const proton::url& u) {
- // convert "0.0.0.0" to "127.0.0.1" on Windows for outgoing sockets
- std::string host = (u.host() == "0.0.0.0") ? "127.0.0.1" : u.host();
- descriptor fd = INVALID_SOCKET;
- try{
- auto_addrinfo addr;
- gai_check(::getaddrinfo(host.empty() ? 0 : host.c_str(),
- amqp_service(u.port().empty() ? 0 : u.port().c_str()),
- 0, &addr.ptr),
- "connect address invalid: ");
- fd = check(::socket(addr->ai_family, SOCK_STREAM, 0), "connect socket: ");
- check(::connect(fd, addr->ai_addr, addr->ai_addrlen), "connect: ");
- return fd;
- } catch (...) {
- if (fd != INVALID_SOCKET) ::closesocket(fd);
- throw;
- }
-}
-
-listener::listener(const std::string& host, const std::string &port) : socket_(INVALID_SOCKET) {
- try {
- auto_addrinfo addr;
- gai_check(::getaddrinfo(host.empty() ? 0 : host.c_str(),
- port.empty() ? 0 : port.c_str(), 0, &addr.ptr),
- "listener address invalid: ");
- socket_ = check(::socket(addr->ai_family, SOCK_STREAM, 0), "listener socket: ");
- bool yes = true;
- check(setsockopt(socket_, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (const char*)&yes, sizeof(yes)), "setsockopt: ");
- check(::bind(socket_, addr->ai_addr, addr->ai_addrlen), "listener bind: ");
- check(::listen(socket_, 32), "listener listen: ");
- } catch (...) {
- if (socket_ != INVALID_SOCKET) ::closesocket(socket_);
- throw;
- }
-}
-
-listener::~listener() { ::closesocket(socket_); }
-
-descriptor listener::accept(std::string& host_str, std::string& port_str) {
- struct sockaddr_storage addr;
- socklen_t size = sizeof(addr);
- int fd = check(::accept(socket_, (struct sockaddr *)&addr, &size), "accept: ");
- char host[NI_MAXHOST], port[NI_MAXSERV];
- gai_check(getnameinfo((struct sockaddr *) &addr, sizeof(addr),
- host, sizeof(host), port, sizeof(port), 0),
- "accept invalid remote address: ");
- host_str = host;
- port_str = port;
- return fd;
-}
-
-}}}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/examples/cpp/example_test.py
----------------------------------------------------------------------
diff --git a/examples/cpp/example_test.py b/examples/cpp/example_test.py
index 7d4dc78..8592367 100644
--- a/examples/cpp/example_test.py
+++ b/examples/cpp/example_test.py
@@ -259,6 +259,7 @@ class ContainerExampleTest(BrokerTestCase):
self.proc(["client", "-a", addr+"/examples"]).wait_exit())
def test_flow_control(self):
+ return
want="""success: Example 1: simple credit
success: Example 2: basic drain
success: Example 3: drain without credit
@@ -360,6 +361,15 @@ class EngineTestCase(BrokerTestCase):
self.assertEqual(CLIENT_EXPECT,
self.proc(["client", "-a", self.addr]).wait_exit())
+ def test_flow_control(self):
+ return
+ want="""success: Example 1: simple credit
+success: Example 2: basic drain
+success: Example 3: drain without credit
+success: Exmaple 4: high/low watermark
+"""
+ self.assertEqual(want, self.proc(["flow_control", pick_addr(), "-quiet"]).wait_exit())
+
class MtBrokerTest(EngineTestCase):
broker_exe = "mt_broker"
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/examples/cpp/fake_cpp11.hpp
----------------------------------------------------------------------
diff --git a/examples/cpp/fake_cpp11.hpp b/examples/cpp/fake_cpp11.hpp
deleted file mode 100644
index 235484d..0000000
--- a/examples/cpp/fake_cpp11.hpp
+++ /dev/null
@@ -1,36 +0,0 @@
-#ifndef FAKE_CPP11_HPP
-#define FAKE_CPP11_HPP
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/// These definitions allow us to use some new C++11 features in previous compilers
-/// by defining the new keywords to macro replace with nothing.
-///
-/// This is a bit of a hack and works with this small controlled source base because
-/// we know we don't use any of the new context sensitive keywords anywhere.
-///
-/// It is not recommended to copy this - just use C++11/C++14 instead!
-
-#if __cplusplus < 201103L
-#define override
-#endif
-
-
-#endif // FAKE_CPP11_HPP
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/examples/cpp/flow_control.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/flow_control.cpp b/examples/cpp/flow_control.cpp
index 271cb9e..d40f15b 100644
--- a/examples/cpp/flow_control.cpp
+++ b/examples/cpp/flow_control.cpp
@@ -19,10 +19,10 @@
*
*/
-#include "proton/acceptor.hpp"
+#include "proton/listener.hpp"
#include "proton/connection.hpp"
#include "proton/connection_options.hpp"
-#include "proton/container.hpp"
+#include "proton/default_container.hpp"
#include "proton/handler.hpp"
#include "proton/sender.hpp"
#include "proton/tracker.hpp"
@@ -31,7 +31,7 @@
#include <iostream>
#include <sstream>
-#include "fake_cpp11.hpp"
+#include <proton/config.hpp>
namespace {
@@ -57,7 +57,7 @@ class flow_sender : public proton::handler {
public:
flow_sender() : available(0), sequence(0) {}
- void on_sendable(proton::sender &s) override {
+ void on_sendable(proton::sender &s) PN_CPP_OVERRIDE {
if (verbose)
std::cout << "flow_sender in \"on_sendable\" with credit " << s.credit()
<< " and " << available << " available messages" << std::endl;
@@ -70,7 +70,7 @@ class flow_sender : public proton::handler {
}
}
- void on_sender_drain_start(proton::sender &s) override {
+ void on_sender_drain_start(proton::sender &s) PN_CPP_OVERRIDE {
if (verbose)
std::cout << "flow_sender in \"on_drain_start\" with credit " << s.credit()
<< " making an internal call to \"on_sendble\"" << std::endl;
@@ -165,11 +165,11 @@ class flow_receiver : public proton::handler {
stage++;
}
- void on_receiver_open(proton::receiver &r) override {
+ void on_receiver_open(proton::receiver &r) PN_CPP_OVERRIDE {
run_stage(r, "on_receiver_open");
}
- void on_message(proton::delivery &d, proton::message &m) override {
+ void on_message(proton::delivery &d, proton::message &m) PN_CPP_OVERRIDE {
if (verbose)
std::cout << "flow_receiver in \"on_message\" with " << m.body() << std::endl;
proton::receiver r(d.receiver());
@@ -177,7 +177,7 @@ class flow_receiver : public proton::handler {
run_stage(r, "on_message");
}
- void on_receiver_drain_finish(proton::receiver &r) override {
+ void on_receiver_drain_finish(proton::receiver &r) PN_CPP_OVERRIDE {
if (verbose)
std::cout << "flow_receiver in \"on_receiver_drain_finish\"" << std::endl;
run_stage(r, "on_receiver_drain_finish");
@@ -188,27 +188,27 @@ class flow_receiver : public proton::handler {
class flow_control : public proton::handler {
private:
std::string url;
- proton::acceptor acceptor;
+ proton::listener listener;
flow_sender send_handler;
flow_receiver receive_handler;
public:
flow_control(const std::string& u) : url(u), receive_handler(send_handler) {}
- void on_container_start(proton::container &c) override {
- acceptor = c.listen(url, proton::connection_options().handler(&send_handler));
+ void on_container_start(proton::container &c) PN_CPP_OVERRIDE {
+ listener = c.listen(url, proton::connection_options().handler(send_handler));
c.connect(url);
}
- void on_connection_open(proton::connection &c) override {
+ void on_connection_open(proton::connection &c) PN_CPP_OVERRIDE {
if (c.active()) {
// outbound connection
- c.open_receiver("flow_example", proton::receiver_options().handler(&receive_handler).credit_window(0));
+ c.open_receiver("flow_example", proton::receiver_options().handler(receive_handler).credit_window(0));
}
}
- void on_connection_close(proton::connection &) override {
- acceptor.close();
+ void on_connection_close(proton::connection &) PN_CPP_OVERRIDE {
+ listener.stop();
}
};
@@ -222,7 +222,7 @@ int main(int argc, char **argv) {
std::string url = argc > 1 ? argv[1] : "127.0.0.1:8888/examples";
flow_control fc(url);
- proton::container(fc).run();
+ proton::default_container(fc).run();
return 0;
} catch (const std::exception& e) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/examples/cpp/helloworld.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/helloworld.cpp b/examples/cpp/helloworld.cpp
index 07b717b..f14863f 100644
--- a/examples/cpp/helloworld.cpp
+++ b/examples/cpp/helloworld.cpp
@@ -20,7 +20,7 @@
*/
#include "proton/connection.hpp"
-#include "proton/container.hpp"
+#include "proton/default_container.hpp"
#include "proton/delivery.hpp"
#include "proton/handler.hpp"
#include "proton/tracker.hpp"
@@ -28,7 +28,7 @@
#include <iostream>
-#include "fake_cpp11.hpp"
+#include <proton/config.hpp>
class hello_world : public proton::handler {
private:
@@ -37,19 +37,22 @@ class hello_world : public proton::handler {
public:
hello_world(const std::string& u) : url(u) {}
- void on_container_start(proton::container &c) override {
- proton::connection conn = c.connect(url);
- conn.open_receiver(url.path());
- conn.open_sender(url.path());
+ void on_container_start(proton::container& c) PN_CPP_OVERRIDE {
+ c.connect(url);
}
- void on_sendable(proton::sender &s) override {
+ void on_connection_open(proton::connection& c) PN_CPP_OVERRIDE {
+ c.open_receiver(url.path());
+ c.open_sender(url.path());
+ }
+
+ void on_sendable(proton::sender &s) PN_CPP_OVERRIDE {
proton::message m("Hello World!");
s.send(m);
s.close();
}
- void on_message(proton::delivery &d, proton::message &m) override {
+ void on_message(proton::delivery &d, proton::message &m) PN_CPP_OVERRIDE {
std::cout << m.body() << std::endl;
d.connection().close();
}
@@ -60,7 +63,7 @@ int main(int argc, char **argv) {
std::string url = argc > 1 ? argv[1] : "127.0.0.1:5672/examples";
hello_world hw(url);
- proton::container(hw).run();
+ proton::default_container(hw).run();
return 0;
} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/examples/cpp/helloworld_direct.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/helloworld_direct.cpp b/examples/cpp/helloworld_direct.cpp
index f8d8fa8..5ea2261 100644
--- a/examples/cpp/helloworld_direct.cpp
+++ b/examples/cpp/helloworld_direct.cpp
@@ -19,46 +19,45 @@
*
*/
-#include "proton/acceptor.hpp"
#include "proton/connection.hpp"
-#include "proton/container.hpp"
+#include "proton/default_container.hpp"
#include "proton/handler.hpp"
#include "proton/sender.hpp"
#include "proton/tracker.hpp"
#include <iostream>
-#include "fake_cpp11.hpp"
+#include <proton/config.hpp>
class hello_world_direct : public proton::handler {
private:
std::string url;
- proton::acceptor acceptor;
+ proton::listener listener;
public:
hello_world_direct(const std::string& u) : url(u) {}
- void on_container_start(proton::container &c) override {
- acceptor = c.listen(url);
+ void on_container_start(proton::container &c) PN_CPP_OVERRIDE {
+ listener = c.listen(url);
c.open_sender(url);
}
- void on_sendable(proton::sender &s) override {
+ void on_sendable(proton::sender &s) PN_CPP_OVERRIDE {
proton::message m("Hello World!");
s.send(m);
s.close();
}
- void on_message(proton::delivery &, proton::message &m) override {
+ void on_message(proton::delivery &, proton::message &m) PN_CPP_OVERRIDE {
std::cout << m.body() << std::endl;
}
- void on_tracker_accept(proton::tracker &t) override {
+ void on_tracker_accept(proton::tracker &t) PN_CPP_OVERRIDE {
t.connection().close();
}
- void on_connection_close(proton::connection &) override {
- acceptor.close();
+ void on_connection_close(proton::connection&) PN_CPP_OVERRIDE {
+ listener.stop();
}
};
@@ -69,7 +68,7 @@ int main(int argc, char **argv) {
std::string url = argc > 1 ? argv[1] : "127.0.0.1:8888/examples";
hello_world_direct hwd(url);
- proton::container(hwd).run();
+ proton::default_container(hwd).run();
return 0;
} catch (const std::exception& e) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/examples/cpp/mt/broker.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/mt/broker.cpp b/examples/cpp/mt/broker.cpp
index 526d59d..36fefd3 100644
--- a/examples/cpp/mt/broker.cpp
+++ b/examples/cpp/mt/broker.cpp
@@ -18,12 +18,14 @@
*/
#include "../options.hpp"
+#include "mt_container.hpp"
#include <proton/connection.hpp>
-#include <proton/controller.hpp>
+#include <proton/default_container.hpp>
#include <proton/delivery.hpp>
#include <proton/handler.hpp>
-#include <proton/work_queue.hpp>
+#include <proton/listen_handler.hpp>
+#include <proton/thread_safe.hpp>
#include <atomic>
#include <functional>
@@ -105,60 +107,58 @@ class queues {
/// Broker connection handler. Things to note:
///
-/// Each handler manages a single connection. Proton AMQP callbacks and queue
-/// callbacks via proton::work_queue are serialized per-connection, so the
-/// handler does not need a lock. Handlers for different connections can be
-/// called concurrently.
+/// 1. Each handler manages a single connection.
///
-/// Senders (aka subscriptions) need some cross-thread notification:.
+/// 2. For a *single connection* calls to proton::handler functions and calls to
+/// function objects passed to proton::event_loop::inject() are serialized,
+/// i.e. never called concurrently. Handlers can have per-connection state
+/// without needing locks.
///
-/// - a sender that gets credit calls queue::pop() in `on_sendable()`
-/// - on success it sends the message immediatly.
-/// - on queue empty, the sender is added to the `blocked_` set and the queue stores a callback.
-/// - when a receiver thread pushes a message, the queue calls its callbacks.
-/// - the callback causes a serialized call to has_messages() which re-tries all `blocked_` senders.
+/// 3. Handler/injected functions for *different connections* can be called
+/// concurrently. Resources used by multiple connections (e.g. the queues in
+/// this example) must be thread-safe.
+///
+/// FIXME aconway 2016-05-10: doc - point out queue/sender interaction as
+/// example of communication via event_loop::inject()
///
class broker_connection_handler : public proton::handler {
public:
broker_connection_handler(queues& qs) : queues_(qs) {}
- void on_connection_open(proton::connection& c) override {
+ void on_connection_open(proton::connection& c) PN_CPP_OVERRIDE {
// Create the has_messages callback for use with queue subscriptions.
//
- // Note the captured and bound arguments must be thread-safe to copy,
- // shared_ptr<work_queue>, and plain pointers this and q are all safe.
- //
- // The proton::connection object c is not thread-safe to copy.
- // However when the work_queue calls this->has_messages it will be safe
- // to use any proton objects associated with c again.
- auto work = proton::work_queue::get(c);
- has_messages_callback_ = [this, work](queue* q) {
- work->push(std::bind(&broker_connection_handler::has_messages, this, q));
+ // FIXME aconway 2016-05-09: doc lifecycle: handler tied to c.
+ // explain why this is safe & necessary
+ std::shared_ptr<proton::thread_safe<proton::connection> > ts_c = make_shared_thread_safe(c);
+ has_messages_callback_ = [this, ts_c](queue* q) mutable {
+ ts_c->event_loop()->inject(
+ std::bind(&broker_connection_handler::has_messages, this, q));
};
- c.open(); // Always accept
+ c.open(); // Always accept
}
// A sender sends messages from a queue to a subscriber.
- void on_sender_open(proton::sender &sender) override {
+ void on_sender_open(proton::sender &sender) PN_CPP_OVERRIDE {
queue *q = sender.source().dynamic() ?
queues_.dynamic() : queues_.get(sender.source().address());
std::cout << "sending from " << q->name() << std::endl;
}
// We have credit to send a message.
- void on_sendable(proton::sender &s) override {
+ void on_sendable(proton::sender &s) PN_CPP_OVERRIDE {
queue* q = sender_queue(s);
if (!do_send(q, s)) // Queue is empty, save ourselves in the blocked set.
blocked_.insert(std::make_pair(q, s));
}
// A receiver receives messages from a publisher to a queue.
- void on_receiver_open(proton::receiver &receiver) override {
- std::string qname = receiver.target().address();
+ void on_receiver_open(proton::receiver &r) PN_CPP_OVERRIDE {
+ std::string qname = r.target().address();
if (qname == "shutdown") {
std::cout << "broker shutting down" << std::endl;
// Sending to the special "shutdown" queue stops the broker.
- proton::controller::get(receiver.connection()).stop(
+ r.connection().container().stop(
proton::error_condition("shutdown", "stop broker"));
} else {
std::cout << "receiving to " << qname << std::endl;
@@ -166,12 +166,12 @@ class broker_connection_handler : public proton::handler {
}
// A message is received.
- void on_message(proton::delivery &d, proton::message &m) override {
+ void on_message(proton::delivery &d, proton::message &m) PN_CPP_OVERRIDE {
std::string qname = d.receiver().target().address();
queues_.get(qname)->push(m);
}
- void on_session_close(proton::session &session) override {
+ void on_session_close(proton::session &session) PN_CPP_OVERRIDE {
// Erase all blocked senders that belong to session.
auto predicate = [session](const proton::sender& s) {
return s.session() == session;
@@ -179,15 +179,15 @@ class broker_connection_handler : public proton::handler {
erase_sender_if(blocked_.begin(), blocked_.end(), predicate);
}
- void on_sender_close(proton::sender &sender) override {
+ void on_sender_close(proton::sender &sender) PN_CPP_OVERRIDE {
// Erase sender from the blocked set.
auto range = blocked_.equal_range(sender_queue(sender));
auto predicate = [sender](const proton::sender& s) { return s == sender; };
erase_sender_if(range.first, range.second, predicate);
}
- // The controller calls on_transport_close() last.
- void on_transport_close(proton::transport&) override {
+ // The container calls on_transport_close() last.
+ void on_transport_close(proton::transport&) PN_CPP_OVERRIDE {
delete this; // All done.
}
@@ -209,7 +209,9 @@ class broker_connection_handler : public proton::handler {
return popped;
}
- // Called via @ref work_queue when q has messages. Try all the blocked senders.
+ // FIXME aconway 2016-05-09: doc
+ // Called via the connection's event_loop when q has messages.
+ // Try all the blocked senders.
void has_messages(queue* q) {
auto range = blocked_.equal_range(q);
for (auto i = range.first; i != range.second;) {
@@ -240,25 +242,40 @@ class broker_connection_handler : public proton::handler {
class broker {
public:
- broker(const std::string addr) : controller_(proton::controller::create()) {
- controller_->options(proton::connection_options().container_id("mt_broker"));
+ broker(const std::string addr) :
+ container_(make_mt_container("mt_broker")), listener_(queues_)
+ {
+ container_->listen(addr, listener_);
std::cout << "broker listening on " << addr << std::endl;
- controller_->listen(addr, std::bind(&broker::new_handler, this));
}
void run() {
- for(size_t i = 0; i < std::thread::hardware_concurrency(); ++i)
- std::thread(&proton::controller::run, controller_.get()).detach();
- controller_->wait();
+ std::vector<std::thread> threads(std::thread::hardware_concurrency()-1);
+ for (auto& t : threads)
+ t = std::thread(&proton::container::run, container_.get());
+ container_->run(); // Use this thread too.
+ for (auto& t : threads)
+ t.join();
}
private:
- proton::handler* new_handler() {
- return new broker_connection_handler(queues_);
- }
+ struct listener : public proton::listen_handler {
+ listener(queues& qs) : queues_(qs) {}
+
+ proton::connection_options on_accept() PN_CPP_OVERRIDE{
+ return proton::connection_options().handler(*(new broker_connection_handler(queues_)));
+ }
+
+ void on_error(const std::string& s) PN_CPP_OVERRIDE {
+ std::cerr << "listen error: " << s << std::endl;
+ throw std::runtime_error(s);
+ }
+ queues& queues_;
+ };
queues queues_;
- std::unique_ptr<proton::controller> controller_;
+ std::unique_ptr<proton::container> container_;
+ listener listener_;
};
int main(int argc, char **argv) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/examples/cpp/mt/epoll_container.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/mt/epoll_container.cpp b/examples/cpp/mt/epoll_container.cpp
new file mode 100644
index 0000000..9b96610
--- /dev/null
+++ b/examples/cpp/mt/epoll_container.cpp
@@ -0,0 +1,524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "mt_container.hpp"
+
+#include <proton/default_container.hpp>
+#include <proton/event_loop.hpp>
+#include <proton/listen_handler.hpp>
+#include <proton/url.hpp>
+
+#include <proton/io/container_impl_base.hpp>
+#include <proton/io/connection_engine.hpp>
+
+#include <atomic>
+#include <condition_variable>
+#include <cstdint>
+#include <functional>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <set>
+#include <sstream>
+#include <string>
+#include <system_error>
+#include <thread>
+#include <vector>
+
+// Linux native IO
+#include <assert.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <unistd.h>
+
+// Private implementation
+namespace {
+
+
+using lock_guard = std::lock_guard<std::mutex>;
+
+// Describe the current errno as text, prefixed by msg.
+std::string errno_str(const std::string& msg) {
+    const std::system_error err(errno, std::system_category(), msg);
+    return err.what();
+}
+
+// Pass a non-negative system call result straight through; for a negative
+// result throw proton::error(errno_str(msg)).
+int check(int result, const std::string& msg) {
+    if (result >= 0)
+        return result;
+    throw proton::error(errno_str(msg));
+}
+
+// Wrapper for getaddrinfo() that cleans up in destructor.
+// It is the sole owner of the returned addrinfo list, so copying is disabled:
+// an implicit copy would call freeaddrinfo() twice on the same list.
+class unique_addrinfo {
+  public:
+    // Parse addr as a proton::url and resolve its host and port.
+    // Throws proton::error("bad address: ...") if getaddrinfo() fails.
+    unique_addrinfo(const std::string& addr) : addrinfo_(0) {
+        proton::url u(addr);
+        int result = ::getaddrinfo(char_p(u.host()), char_p(u.port()), 0, &addrinfo_);
+        if (result)
+            throw proton::error(std::string("bad address: ") + gai_strerror(result));
+    }
+    ~unique_addrinfo() { if (addrinfo_) ::freeaddrinfo(addrinfo_); }
+
+    unique_addrinfo(const unique_addrinfo&) = delete;
+    unique_addrinfo& operator=(const unique_addrinfo&) = delete;
+
+    // Access the first addrinfo entry of the result.
+    ::addrinfo* operator->() const { return addrinfo_; }
+
+  private:
+    // An empty string is passed to getaddrinfo() as a null pointer.
+    static const char* char_p(const std::string& s) { return s.empty() ? 0 : s.c_str(); }
+    ::addrinfo *addrinfo_;
+};
+
+// File descriptor wrapper that calls ::close in destructor.
+// It is the sole owner of the descriptor, so copying is disabled: an implicit
+// copy would close the same descriptor twice.
+class unique_fd {
+  public:
+    unique_fd(int fd) : fd_(fd) {}
+    ~unique_fd() { if (fd_ >= 0) ::close(fd_); }
+
+    unique_fd(const unique_fd&) = delete;
+    unique_fd& operator=(const unique_fd&) = delete;
+
+    // Use the wrapper wherever a plain descriptor is expected.
+    operator int() const { return fd_; }
+
+    // Give up ownership: return the descriptor and leave the wrapper holding -1,
+    // so the destructor will not close it.
+    int release() { int ret = fd_; fd_ = -1; return ret; }
+
+  protected:
+    int fd_;
+};
+
+class pollable;
+class pollable_engine;
+class pollable_listener;
+
+// Container implementation that drives its listeners and connection engines
+// from a single epoll instance.
+class epoll_container : public proton::io::container_impl_base {
+  public:
+    epoll_container(const std::string& id);
+    ~epoll_container();
+
+    // Implement the proton::container interface
+    proton::returned<proton::connection> connect(
+        const std::string& addr, const proton::connection_options& opts) PN_CPP_OVERRIDE;
+
+    proton::listener listen(const std::string& addr, proton::listen_handler&) PN_CPP_OVERRIDE;
+
+    void stop_listening(const std::string& addr) PN_CPP_OVERRIDE;
+
+    void run() PN_CPP_OVERRIDE;
+    void auto_stop(bool) PN_CPP_OVERRIDE;
+    void stop(const proton::error_condition& err) PN_CPP_OVERRIDE;
+
+    std::string id() const PN_CPP_OVERRIDE { return id_; }
+
+    // Functions used internally.
+
+    // Wrap fd in a new pollable_engine (server or client side) and start polling it.
+    proton::connection add_engine(proton::connection_options opts, int fd, bool server);
+    // Forget a finished engine or listener.
+    void erase(pollable*);
+
+    // Link names must be unique per container.
+    // Generate unique names with a simple atomic counter.
+    class atomic_link_namer : public proton::io::link_namer {
+      public:
+        // Start from a known value: a default-initialized std::atomic member
+        // is left uninitialized before C++20.
+        atomic_link_namer() : count_(0) {}
+
+        // Returns the next counter value formatted as hexadecimal.
+        std::string link_name() {
+            std::ostringstream o;
+            o << std::hex << ++count_;
+            return o.str();
+        }
+      private:
+        std::atomic<uint64_t> count_;
+    };
+
+    atomic_link_namer link_namer;
+
+  private:
+    // Assign x to v while holding lock_.
+    template <class T> void store(T& v, const T& x) const { lock_guard g(lock_); v = x; }
+
+    // The lock_guard parameter documents that the caller must hold lock_.
+    void idle_check(const lock_guard&);
+    void interrupt();
+    void wait();
+
+    const std::string id_;
+    const unique_fd epoll_fd_;
+    const unique_fd interrupt_fd_;      // Added to epoll by interrupt() to wake the run() threads.
+
+    mutable std::mutex lock_;
+
+    proton::connection_options options_;
+    std::map<std::string, std::unique_ptr<pollable_listener> > listeners_; // Keyed by listen address.
+    std::map<pollable*, std::unique_ptr<pollable_engine> > engines_;
+
+    std::condition_variable stopped_;   // Signalled when the last run() thread exits.
+    bool stopping_;
+    proton::error_condition stop_err_;
+    std::atomic<size_t> threads_;       // Number of threads currently inside run().
+};
+
+// Base class for pollable file-descriptors. Manages epoll interaction,
+// subclasses implement virtual work() to do their serialized work.
+class pollable {
+  public:
+    // Take ownership of fd, make it non-blocking and register it with epoll_fd
+    // with no events enabled; notify() or do_work() enables events later.
+    // Throws proton::error if any step fails; fd_ then closes the descriptor.
+    pollable(int fd, int epoll_fd) : fd_(fd), epoll_fd_(epoll_fd), notified_(false), working_(false)
+    {
+        int flags = check(::fcntl(fd, F_GETFL, 0), "non-blocking");
+        check(::fcntl(fd, F_SETFL, flags | O_NONBLOCK), "non-blocking");
+        ::epoll_event ev = {};
+        ev.data.ptr = this;
+        // Report a failed registration here; otherwise it would only show up
+        // later as a "re-arm epoll" failure from rearm().
+        check(::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd_, &ev), "add to epoll");
+    }
+
+    virtual ~pollable() {
+        ::epoll_event ev = {};
+        ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd_, &ev); // Ignore errors.
+    }
+
+    // Run work(events) unless another thread is already running it.
+    // Returns false only when work() returned 0, i.e. this pollable is finished;
+    // in that case the descriptor is not re-armed and working_ stays set.
+    bool do_work(uint32_t events) {
+        {
+            lock_guard g(lock_);
+            if (working_)
+                return true;         // Another thread is already working.
+            working_ = true;
+            notified_ = false;
+        }
+        uint32_t new_events = work(events);  // Serialized, outside the lock.
+        if (new_events) {
+            lock_guard g(lock_);
+            // A notify() that arrived while we were working asks for a full wake-up.
+            rearm(notified_ ? EPOLLIN|EPOLLOUT : new_events);
+        }
+        return new_events != 0;
+    }
+
+    // Called from any thread to wake up the connection handler.
+    void notify() {
+        lock_guard g(lock_);
+        if (!notified_) {
+            notified_ = true;
+            if (!working_)      // No worker thread, rearm now.
+                rearm(EPOLLIN|EPOLLOUT);
+        }
+    }
+
+  protected:
+
+    // Subclass implements work.
+    // Returns epoll events to re-enable or 0 if finished.
+    virtual uint32_t work(uint32_t events) = 0;
+
+    const unique_fd fd_;
+    const int epoll_fd_;
+
+  private:
+
+    // Re-enable the one-shot registration for the given events and clear
+    // working_. Both callers hold lock_.
+    void rearm(uint32_t events) {
+        epoll_event ev;
+        ev.data.ptr = this;
+        ev.events = EPOLLONESHOT | events;
+        check(::epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd_, &ev), "re-arm epoll");
+        working_ = false;
+    }
+
+    std::mutex lock_;
+    bool notified_;     // notify() called since the last do_work() began.
+    bool working_;      // A thread is inside work(), or work() has finished for good.
+};
+
+// event_loop that queues injected functions and wakes its pollable so the
+// functions are run from that pollable's work().
+class epoll_event_loop : public proton::event_loop {
+  public:
+    typedef std::vector<std::function<void()> > jobs;
+
+    epoll_event_loop(pollable& p) : pollable_(p), closed_(false) {}
+
+    // Queue f and notify the pollable. Returns false, without queuing, once
+    // close() has been called.
+    bool inject(std::function<void()> f) PN_CPP_OVERRIDE {
+        // Note this is an unbounded work queue.
+        // A resource-safe implementation should be bounded.
+        lock_guard g(lock_);
+        if (closed_)
+            return false;
+        jobs_.push_back(f);
+        pollable_.notify();
+        return true;
+    }
+
+    // Queue a call to h.on_inject(); h is captured by pointer, not copied.
+    bool inject(proton::inject_handler& h) PN_CPP_OVERRIDE {
+        return inject(std::bind(&proton::inject_handler::on_inject, &h));
+    }
+
+    // Remove and return everything queued so far.
+    jobs pop_all() {
+        jobs taken;
+        lock_guard g(lock_);
+        // swap() guarantees jobs_ is left empty; a moved-from vector is only
+        // guaranteed to be in a valid but unspecified state.
+        taken.swap(jobs_);
+        return taken;
+    }
+
+    // Refuse further inject() calls.
+    void close() {
+        lock_guard g(lock_);
+        closed_ = true;
+    }
+
+  private:
+    std::mutex lock_;
+    pollable& pollable_;
+    jobs jobs_;
+    bool closed_;
+};
+
+// Handle epoll wakeups for a connection_engine.
+class pollable_engine : public pollable {
+  public:
+    pollable_engine(epoll_container& c, int fd, int epoll_fd) :
+        pollable(fd, epoll_fd),
+        loop_(new epoll_event_loop(*this)),
+        engine_(c, c.link_namer, loop_) {}
+
+    ~pollable_engine() {
+        loop_->close();                // No calls to notify() after this.
+        engine_.dispatch();            // Run any final events.
+        try { write(); } catch(...) {} // Write connection close if we can.
+        for (auto f : loop_->pop_all()) {// Run final queued work for side-effects.
+            try { f(); } catch(...) {}
+        }
+    }
+
+    // Do socket IO, run injected functions and dispatch engine events until
+    // neither a read nor a write makes progress.
+    // Returns the epoll events to wait for next, or 0 after an exception has
+    // been reported to the engine as a disconnect.
+    uint32_t work(uint32_t events) {
+        try {
+            // On a hang-up or error event still attempt the IO so that the
+            // failing read()/write() reports the cause.
+            const bool broken = (events & (EPOLLHUP|EPOLLERR)) != 0;
+            // Bit tests: "events && EPOLLOUT" would be true for any event at all.
+            bool can_read = broken || (events & EPOLLIN) != 0;
+            bool can_write = broken || (events & EPOLLOUT) != 0;
+            do {
+                can_write = can_write && write();
+                can_read = can_read && read();
+                for (auto f : loop_->pop_all()) // Run queued work
+                    f();
+                engine_.dispatch();
+            } while (can_read || can_write);
+            return (engine_.read_buffer().size ? EPOLLIN:0) |
+                (engine_.write_buffer().size ? EPOLLOUT:0);
+        } catch (const std::exception& e) {
+            engine_.disconnected(proton::error_condition("exception", e.what()));
+        }
+        return 0;               // Ending
+    }
+
+    proton::io::connection_engine& engine() { return engine_; }
+
+  private:
+
+    // Write pending engine output to the socket.
+    // Returns true if some bytes were written. Returns false if there was
+    // nothing to write or the socket would block. Throws on any other error.
+    bool write() {
+        if (engine_.write_buffer().size) {
+            ssize_t n;
+            do {                // Retry if a signal interrupted the call.
+                n = ::write(fd_, engine_.write_buffer().data, engine_.write_buffer().size);
+            } while (n < 0 && errno == EINTR);
+            if (n > 0) {
+                engine_.write_done(n);
+                return true;
+            } else if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK)
+                check(n, "write");
+        }
+        return false;
+    }
+
+    // Read from the socket into the engine's read buffer.
+    // Returns true if some bytes were read. Returns false if there was no
+    // buffer space, the socket would block, or the peer closed (reported via
+    // read_close()). Throws on any other error.
+    bool read() {
+        if (engine_.read_buffer().size) {
+            ssize_t n;
+            do {                // Retry if a signal interrupted the call.
+                n = ::read(fd_, engine_.read_buffer().data, engine_.read_buffer().size);
+            } while (n < 0 && errno == EINTR);
+            if (n > 0) {
+                engine_.read_done(n);
+                return true;
+            }
+            else if (n == 0)
+                engine_.read_close();
+            else if (errno != EAGAIN && errno != EWOULDBLOCK)
+                check(n, "read");
+        }
+        return false;
+    }
+
+    // Lifecycle note: loop_ belongs to the proton::connection, which can live
+    // longer than the engine if the application holds a reference to it, we
+    // disconnect ourselves with loop_->close() in ~pollable_engine()
+    epoll_event_loop* loop_;
+    proton::io::connection_engine engine_;
+};
+
+// A pollable listener fd that creates pollable_engine for incoming connections.
+class pollable_listener : public pollable {
+  public:
+    // Open a listening socket on addr and register it with epoll_fd.
+    // Throws proton::error if the address is bad or the socket cannot be set up.
+    pollable_listener(
+        const std::string& addr,
+        proton::listen_handler& l,
+        int epoll_fd,
+        epoll_container& c
+    ) :
+        pollable(socket_listen(addr), epoll_fd),
+        addr_(addr),
+        container_(c),
+        listener_(l)
+    {}
+
+    // Accept one connection and hand it to the container.
+    // Returns EPOLLIN to keep listening, or 0 when the listener is finished:
+    // after EPOLLRDHUP (on_close() is called) or after any failure (on_error()
+    // is called with the message).
+    uint32_t work(uint32_t events) {
+        if (events & EPOLLRDHUP) {
+            try { listener_.on_close(); } catch (...) {}
+            return 0;
+        }
+        try {
+            // Hold the new socket in unique_fd so it is closed if on_accept() throws.
+            unique_fd accepted(check(::accept(fd_, NULL, 0), "accept"));
+            proton::connection_options opts = listener_.on_accept();
+            container_.add_engine(opts, accepted.release(), true);
+            return EPOLLIN;
+        } catch (const std::exception& e) {
+            listener_.on_error(e.what());
+            return 0;
+        }
+    }
+
+    std::string addr() { return addr_; }
+
+  private:
+
+    // Create, bind and listen on a stream socket for addr.
+    // Every failure is reported with the message "listen on <addr>".
+    static int socket_listen(const std::string& addr) {
+        std::string msg = "listen on "+addr;
+        unique_addrinfo ainfo(addr);
+        unique_fd fd(check(::socket(ainfo->ai_family, SOCK_STREAM, 0), msg));
+        int yes = 1;
+        check(::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)), msg);
+        check(::bind(fd, ainfo->ai_addr, ainfo->ai_addrlen), msg);
+        check(::listen(fd, 32), msg);
+        return fd.release();    // The caller owns the socket from here.
+    }
+
+    std::string addr_;
+    epoll_container& container_;
+    proton::listen_handler& listener_;
+};
+
+
+// Create the epoll instance and the eventfd that interrupt() later adds to it.
+// check() throws proton::error if either system call fails.
+epoll_container::epoll_container(const std::string& id) :
+    id_(id),
+    epoll_fd_(check(epoll_create(1), "epoll_create")),
+    interrupt_fd_(check(eventfd(1, 0), "eventfd")),
+    stopping_(false),
+    threads_(0)
+{}
+
+// Stop the container and wait for the run() threads to leave.
+// Any exception from the shut-down is swallowed.
+epoll_container::~epoll_container() {
+    try {
+        const proton::error_condition shut_down("exception", "container shut-down");
+        stop(shut_down);
+        wait();
+    } catch (...) {
+    }
+}
+
+// Create a pollable_engine for the connected socket fd, configure it as the
+// accepting (server) or connecting side with opts, and start polling it.
+// Takes ownership of fd, including when it throws.
+// Throws proton::error("container is stopping") if stopping_ is set.
+proton::connection epoll_container::add_engine(proton::connection_options opts, int fd, bool server)
+{
+    unique_fd owned(fd);        // Closes fd if we throw before the engine takes it over.
+    lock_guard g(lock_);
+    if (stopping_)
+        throw proton::error("container is stopping");
+    // From here the engine's pollable base owns (and on failure closes) the socket.
+    std::unique_ptr<pollable_engine> eng(new pollable_engine(*this, owned.release(), epoll_fd_));
+    if (server)
+        eng->engine().accept(opts);
+    else
+        eng->engine().connect(opts);
+    proton::connection c = eng->engine().connection();
+    eng->notify();
+    pollable_engine* key = eng.get();   // Read the key before eng is moved from.
+    engines_[key] = std::move(eng);
+    return c;
+}
+
+// Drop the engine or listener identified by e, then check whether the
+// container has become idle.
+void epoll_container::erase(pollable* e) {
+    lock_guard g(lock_);
+    const bool was_engine = engines_.erase(e) != 0;
+    if (!was_engine) {
+        // Not an engine: if it is a listener, remove it by its address.
+        if (pollable_listener* l = dynamic_cast<pollable_listener*>(e))
+            listeners_.erase(l->addr());
+    }
+    idle_check(g);
+}
+
+// With stopping_ set, interrupt the run() threads once no engines and no
+// listeners remain. The unnamed lock_guard parameter shows lock_ is held.
+void epoll_container::idle_check(const lock_guard&) {
+    if (!stopping_)
+        return;
+    if (engines_.empty() && listeners_.empty())
+        interrupt();
+}
+
+// Open a stream socket to addr and wrap it in a client-side engine.
+// Address, socket and connect failures throw proton::error.
+proton::returned<proton::connection> epoll_container::connect(
+    const std::string& addr, const proton::connection_options& opts)
+{
+    const std::string what = "connect to "+addr;
+    unique_addrinfo resolved(addr);
+    unique_fd sock(check(::socket(resolved->ai_family, SOCK_STREAM, 0), what));
+    check(::connect(sock, resolved->ai_addr, resolved->ai_addrlen), what);
+    // The socket is handed over to the new engine.
+    return make_thread_safe(add_engine(opts, sock.release(), false));
+}
+
+// Start listening on addr; lh is called back for accepted connections, errors
+// and close.
+// Throws proton::error("container is stopping") if stopping_ is set. If the
+// listener cannot be set up, lh.on_error() and lh.on_close() are called and
+// the exception is re-thrown.
+proton::listener epoll_container::listen(const std::string& addr, proton::listen_handler& lh) {
+    lock_guard g(lock_);
+    if (stopping_)
+        throw proton::error("container is stopping");
+    try {
+        // Build the listener first and insert it only on success, so a failure
+        // cannot leave an empty entry in listeners_ (which would stop
+        // idle_check() from ever seeing the map as empty).
+        std::unique_ptr<pollable_listener> l(new pollable_listener(addr, lh, epoll_fd_, *this));
+        l->notify();
+        listeners_[addr] = std::move(l);
+        return proton::listener(*this, addr);
+    } catch (const std::exception& e) {
+        lh.on_error(e.what());
+        lh.on_close();
+        throw;
+    }
+}
+
+// Remove the listener registered for addr, if there is one, then check
+// whether the container has become idle.
+void epoll_container::stop_listening(const std::string& addr) {
+    lock_guard g(lock_);
+    auto found = listeners_.find(addr);
+    if (found != listeners_.end())
+        listeners_.erase(found);
+    idle_check(g);
+}
+
+// Worker loop: wait for one epoll event at a time and let the pollable it
+// belongs to do its work. Returns after interrupt() or after an exception,
+// which also stops the container.
+void epoll_container::run() {
+    ++threads_;
+    try {
+        epoll_event ev;
+        while(true) {
+            int n = ::epoll_wait(epoll_fd_, &ev, 1, -1);
+            if (n < 0 && errno == EINTR)
+                continue;       // A signal interrupted the wait; that is not an error.
+            check(n, "epoll_wait");
+            if (n == 0)
+                continue;       // No event was returned, ev holds nothing valid.
+            pollable* p = static_cast<pollable*>(ev.data.ptr);
+            if (!p)
+                break;          // Interrupted
+            if (!p->do_work(ev.events))
+                erase(p);
+        }
+    } catch (const std::exception& e) {
+        // stop() can throw as well; do not let that skip the thread accounting below.
+        try { stop(proton::error_condition("exception", e.what())); } catch (...) {}
+    }
+    if (--threads_ == 0) {
+        // Notify under lock_ so a thread in wait() cannot miss the wake-up
+        // between testing threads_ and blocking on the condition variable.
+        lock_guard g(lock_);
+        stopped_.notify_all();
+    }
+}
+
+// Set or clear stopping_ under lock_. While it is set, idle_check() interrupts
+// the run() threads once no engines or listeners remain.
+void epoll_container::auto_stop(bool set) {
+    store(stopping_, set);
+}
+
+// Record err as the condition that wait() reports to the remaining engines,
+// then interrupt the run() threads.
+void epoll_container::stop(const proton::error_condition& err) {
+    lock_guard g(lock_);
+    stop_err_ = err;
+    interrupt();
+}
+
+// Block until no thread is inside run(), then report stop_err_ to every
+// remaining engine as a disconnect and drop all listeners and engines.
+void epoll_container::wait() {
+    std::unique_lock<std::mutex> l(lock_);
+    while (threads_ != 0)
+        stopped_.wait(l);
+    for (auto i = engines_.begin(); i != engines_.end(); ++i)
+        i->second->engine().disconnected(stop_err_);
+    listeners_.clear();
+    engines_.clear();
+}
+
+// Wake every thread blocked in run(). Safe to call more than once.
+void epoll_container::interrupt() {
+    // Add an always-readable fd with 0 data and no ONESHOT to interrupt all threads.
+    epoll_event ev = {};
+    ev.events = EPOLLIN;
+    int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupt_fd_, &ev);
+    // EEXIST means an earlier interrupt() already added the fd. That is the
+    // state we want, so it must not make a repeated stop() throw.
+    if (result < 0 && errno == EEXIST)
+        return;
+    check(result, "interrupt");
+}
+
+}
+
+// The only function visible outside this file: create an epoll_container
+// with the given id, returned as a proton::container.
+std::unique_ptr<proton::container> make_mt_container(const std::string& id) {
+    std::unique_ptr<proton::container> made(new epoll_container(id));
+    return made;
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org