You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2017/07/21 17:01:58 UTC
[02/20] qpid-proton git commit: PROTON-1400: [C++ binding] Make
proactor container thread safe
PROTON-1400: [C++ binding] Make proactor container thread safe
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e4eca5c3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e4eca5c3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e4eca5c3
Branch: refs/heads/master
Commit: e4eca5c3d92b917edca4629c6bc01155c4678baf
Parents: 8aee73b
Author: Andrew Stitcher <as...@apache.org>
Authored: Fri May 26 17:00:31 2017 -0400
Committer: Andrew Stitcher <as...@apache.org>
Committed: Fri Jul 21 12:50:06 2017 -0400
----------------------------------------------------------------------
.../bindings/cpp/include/proton/container.hpp | 22 +++-
.../cpp/include/proton/internal/config.hpp | 4 +
proton-c/bindings/cpp/src/container.cpp | 6 +-
.../cpp/src/include/proactor_container_impl.hpp | 28 +++-
.../cpp/src/proactor_container_impl.cpp | 127 +++++++++++++++----
5 files changed, 155 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e4eca5c3/proton-c/bindings/cpp/include/proton/container.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/container.hpp b/proton-c/bindings/cpp/include/proton/container.hpp
index 0739517..8b517bf 100644
--- a/proton-c/bindings/cpp/include/proton/container.hpp
+++ b/proton-c/bindings/cpp/include/proton/container.hpp
@@ -29,11 +29,11 @@
#include "./internal/export.hpp"
#include "./internal/pn_unique_ptr.hpp"
-#ifdef PN_CPP_HAS_STD_FUNCTION
-#include <functional>
-#endif
#include <string>
+/// If the library can support multithreaded containers then PN_CPP_SUPPORTS_THREADS will be set.
+#define PN_CPP_SUPPORTS_THREADS PN_CPP_HAS_STD_THREAD && PN_CPP_HAS_STD_MUTEX && PN_CPP_HAS_STD_ATOMIC
+
namespace proton {
/// A top-level container of connections, sessions, senders, and
@@ -55,6 +55,13 @@ class PN_CPP_CLASS_EXTERN container {
/// Create a container.
PN_CPP_EXTERN container(const std::string& id="");
+ /// Destroy a container.
+ /// Note that you may not delete a container from within any of the threads running
+ /// any of the container's messaging_handlers. Specifically if you delete the container
+ /// from within a handler you cause a deadlock or a crash.
+ ///
+ /// The only safe place to delete a container is after all of the threads running a container
+ /// have finished and all of the run functions have returned.
PN_CPP_EXTERN ~container();
/// Connect to `url` and send an open request to the remote peer.
@@ -95,9 +102,16 @@ class PN_CPP_CLASS_EXTERN container {
/// Returns when the container stops.
/// @see auto_stop() and stop().
///
- /// With a multithreaded container, call run() in multiple threads to create a thread pool.
+ /// If you are using C++11 or later you may use a multithreaded container. In this case you may
+ /// call run() in multiple threads to create a thread pool. Or aternatively call run with an
+ /// integer parameter specifying the number of threads for the thread pool.
PN_CPP_EXTERN void run();
+#if PN_CPP_SUPPORTS_THREADS
+ /// @copydoc run()
+ PN_CPP_EXTERN void run(int threads);
+#endif
+
/// If true, stop the container when all active connections and listeners are closed.
/// If false the container will keep running till stop() is called.
///
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e4eca5c3/proton-c/bindings/cpp/include/proton/internal/config.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/internal/config.hpp b/proton-c/bindings/cpp/include/proton/internal/config.hpp
index 79d201c..54b014b 100644
--- a/proton-c/bindings/cpp/include/proton/internal/config.hpp
+++ b/proton-c/bindings/cpp/include/proton/internal/config.hpp
@@ -103,6 +103,10 @@
#define PN_CPP_HAS_STD_ATOMIC PN_CPP_HAS_CPP11
#endif
+#ifndef PN_CPP_HAS_STD_THREAD
+#define PN_CPP_HAS_STD_THREAD PN_CPP_HAS_CPP11
+#endif
+
#endif // PROTON_INTERNAL_CONFIG_HPP
/// @endcond
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e4eca5c3/proton-c/bindings/cpp/src/container.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container.cpp b/proton-c/bindings/cpp/src/container.cpp
index c2af659..35f645c 100644
--- a/proton-c/bindings/cpp/src/container.cpp
+++ b/proton-c/bindings/cpp/src/container.cpp
@@ -81,7 +81,11 @@ returned<connection> container::connect(const std::string& url, const connection
listener container::listen(const std::string& url, listen_handler& l) { return impl_->listen(url, l); }
-void container::run() { impl_->run(); }
+void container::run() { impl_->run(1); }
+
+#if PN_CPP_SUPPORTS_THREADS
+void container::run(int threads) { impl_->run(threads); }
+#endif
void container::auto_stop(bool set) { impl_->auto_stop(set); }
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e4eca5c3/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
index fc963f7..ac54156 100644
--- a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
+++ b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
@@ -43,6 +43,21 @@
#include <string>
#include <vector>
+#if PN_CPP_SUPPORTS_THREADS
+#include <atomic>
+#include <mutex>
+# define MUTEX(x) std::mutex x;
+# define GUARD(x) std::lock_guard<std::mutex> g(x)
+# define ONCE_FLAG(x) std::once_flag x;
+# define CALL_ONCE(x, ...) std::call_once(x, __VA_ARGS__)
+# define ATOMIC_INT(x) std::atomic<int> x;
+#else
+# define MUTEX(x)
+# define GUARD(x)
+# define ONCE_FLAG(x)
+# define CALL_ONCE(x, f, o) ((o)->*(f))()
+# define ATOMIC_INT(x) int x;
+#endif
struct pn_proactor_t;
struct pn_listener_t;
struct pn_event_t;
@@ -70,7 +85,7 @@ class container::impl {
class sender_options sender_options() const { return sender_options_; }
void receiver_options(const proton::receiver_options&);
class receiver_options receiver_options() const { return receiver_options_; }
- void run();
+ void run(int threads);
void stop(const error_condition& err);
void auto_stop(bool set);
void schedule(duration, work);
@@ -86,16 +101,23 @@ class container::impl {
connection connect_common(const std::string&, const connection_options&);
// Event loop to run in each container thread
- static void thread(impl&);
+ void thread();
bool handle(pn_event_t*);
void run_timer_jobs();
+ ATOMIC_INT(threads_)
+ MUTEX(lock_)
+ ONCE_FLAG(start_once_)
+ ONCE_FLAG(stop_once_)
container& container_;
typedef std::set<container_work_queue*> work_queues;
work_queues work_queues_;
container_work_queue* add_work_queue();
void remove_work_queue(container_work_queue*);
+ void start_event();
+ void stop_event();
+
struct scheduled {
timestamp time; // duration from epoch for task
work task;
@@ -112,8 +134,8 @@ class container::impl {
connection_options server_connection_options_;
proton::sender_options sender_options_;
proton::receiver_options receiver_options_;
+ error_condition disconnect_error_;
- proton::error_condition stop_err_;
bool auto_stop_;
bool stopping_;
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e4eca5c3/proton-c/bindings/cpp/src/proactor_container_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/proactor_container_impl.cpp b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
index 62115fd..b900d6f 100644
--- a/proton-c/bindings/cpp/src/proactor_container_impl.cpp
+++ b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
@@ -41,21 +41,27 @@
#include <algorithm>
#include <vector>
+#if PN_CPP_SUPPORTS_THREADS
+# include <thread>
+#endif
+
namespace proton {
class container::impl::common_work_queue : public work_queue::impl {
public:
- common_work_queue(container::impl& c): container_(c), finished_(false) {}
+ common_work_queue(container::impl& c): container_(c), finished_(false), running_(false) {}
typedef std::vector<work> jobs;
void run_all_jobs();
- void finished() { finished_ = true; }
+ void finished() { GUARD(lock_); finished_ = true; }
void schedule(duration, work);
+ MUTEX(lock_)
container::impl& container_;
jobs jobs_;
bool finished_;
+ bool running_;
};
void container::impl::common_work_queue::schedule(duration d, work f) {
@@ -68,11 +74,22 @@ void container::impl::common_work_queue::schedule(duration d, work f) {
void container::impl::common_work_queue::run_all_jobs() {
jobs j;
// Lock this operation for mt
- std::swap(j, jobs_);
+ {
+ GUARD(lock_);
+ // Ensure that we never run work from this queue concurrently
+ if (running_) return;
+ running_ = true;
+ // But allow adding to the queue concurrently to running
+ std::swap(j, jobs_);
+ }
// Run queued work, but ignore any exceptions
for (jobs::iterator f = j.begin(); f != j.end(); ++f) try {
(*f)();
} catch (...) {};
+ {
+ GUARD(lock_);
+ running_ = false;
+ }
return;
}
@@ -88,6 +105,7 @@ class container::impl::connection_work_queue : public common_work_queue {
bool container::impl::connection_work_queue::add(work f) {
// Note this is an unbounded work queue.
// A resource-safe implementation should be bounded.
+ GUARD(lock_);
if (finished_) return false;
jobs_.push_back(f);
pn_connection_wake(connection_);
@@ -105,6 +123,7 @@ class container::impl::container_work_queue : public common_work_queue {
bool container::impl::container_work_queue::add(work f) {
// Note this is an unbounded work queue.
// A resource-safe implementation should be bounded.
+ GUARD(lock_);
if (finished_) return false;
jobs_.push_back(f);
pn_proactor_set_timeout(container_.proactor_, 0);
@@ -116,15 +135,11 @@ class work_queue::impl* container::impl::make_work_queue(container& c) {
}
container::impl::impl(container& c, const std::string& id, messaging_handler* mh)
- : container_(c), proactor_(pn_proactor()), handler_(mh), id_(id),
+ : threads_(0), container_(c), proactor_(pn_proactor()), handler_(mh), id_(id),
auto_stop_(true), stopping_(false)
{}
container::impl::~impl() {
- try {
- stop(error_condition("exception", "container shut-down"));
- //wait();
- } catch (...) {}
pn_proactor_free(proactor_);
}
@@ -178,6 +193,7 @@ proton::returned<proton::connection> container::impl::connect(
const proton::connection_options& user_opts)
{
connection conn = connect_common(addr, user_opts);
+ GUARD(lock_);
return make_thread_safe(conn);
}
@@ -186,6 +202,7 @@ returned<sender> container::impl::open_sender(const std::string &url, const prot
lopts.update(o1);
connection conn = connect_common(url, o2);
+ GUARD(lock_);
return make_thread_safe(conn.default_session().open_sender(proton::url(url).path(), lopts));
}
@@ -194,6 +211,7 @@ returned<receiver> container::impl::open_receiver(const std::string &url, const
lopts.update(o1);
connection conn = connect_common(url, o2);
+ GUARD(lock_);
return make_thread_safe(
conn.default_session().open_receiver(proton::url(url).path(), lopts));
}
@@ -215,11 +233,13 @@ pn_listener_t* container::impl::listen_common_lh(const std::string& addr) {
}
proton::listener container::impl::listen(const std::string& addr) {
+ GUARD(lock_);
pn_listener_t* listener = listen_common_lh(addr);
return proton::listener(listener);
}
proton::listener container::impl::listen(const std::string& addr, const proton::connection_options& opts) {
+ GUARD(lock_);
pn_listener_t* listener = listen_common_lh(addr);
listener_context& lc=listener_context::get(listener);
lc.connection_options_.reset(new connection_options(opts));
@@ -227,6 +247,7 @@ proton::listener container::impl::listen(const std::string& addr, const proton::
}
proton::listener container::impl::listen(const std::string& addr, proton::listen_handler& lh) {
+ GUARD(lock_);
pn_listener_t* listener = listen_common_lh(addr);
listener_context& lc=listener_context::get(listener);
lc.listen_handler_ = &lh;
@@ -234,6 +255,7 @@ proton::listener container::impl::listen(const std::string& addr, proton::listen
}
void container::impl::schedule(duration delay, work f) {
+ GUARD(lock_);
timestamp now = timestamp::now();
// Record timeout; Add callback to timeout sorted list
@@ -247,18 +269,22 @@ void container::impl::schedule(duration delay, work f) {
}
void container::impl::client_connection_options(const connection_options &opts) {
+ GUARD(lock_);
client_connection_options_ = opts;
}
void container::impl::server_connection_options(const connection_options &opts) {
+ GUARD(lock_);
server_connection_options_ = opts;
}
void container::impl::sender_options(const proton::sender_options &opts) {
+ GUARD(lock_);
sender_options_ = opts;
}
void container::impl::receiver_options(const proton::receiver_options &opts) {
+ GUARD(lock_);
receiver_options_ = opts;
}
@@ -294,13 +320,18 @@ bool container::impl::handle(pn_event_t* event) {
switch (pn_event_type(event)) {
case PN_PROACTOR_INACTIVE: /* listener and all connections closed */
- return auto_stop_;
+ // If we're stopping interrupt all other threads still running
+ if (auto_stop_) pn_proactor_interrupt(proactor_);
+ return false;
- // We never interrupt the proactor so ignore
+ // We only interrupt to stop threads
case PN_PROACTOR_INTERRUPT:
- return false;
+ // Interrupt any other threads still running
+ if (threads_>1) pn_proactor_interrupt(proactor_);
+ return true;
case PN_PROACTOR_TIMEOUT: {
+ GUARD(lock_);
// Can get an immediate timeout, if we have a container event loop inject
if ( deferred_.size()>0 ) {
run_timer_jobs();
@@ -402,30 +433,78 @@ bool container::impl::handle(pn_event_t* event) {
return false;
}
-void container::impl::thread(container::impl& ci) {
- bool finished = false;
- do {
- pn_event_batch_t *events = pn_proactor_wait(ci.proactor_);
- pn_event_t *e;
- while ((e = pn_event_batch_next(events))) {
- finished = ci.handle(e) || finished;
- }
- pn_proactor_done(ci.proactor_, events);
- } while(!finished);
+void container::impl::thread() {
+ ++threads_;
+ bool finished = false;
+ do {
+ pn_event_batch_t *events = pn_proactor_wait(proactor_);
+ pn_event_t *e;
+ try {
+ while ((e = pn_event_batch_next(events))) {
+ finished = handle(e);
+ if (finished) break;
+ }
+ } catch (proton::error& e) {
+ // If we caught an exception then shutdown the (other threads of the) container
+ disconnect_error_ = error_condition("exception", e.what());
+ if (!stopping_) stop(disconnect_error_);
+ finished = true;
+ } catch (...) {
+ // If we caught an exception then shutdown the (other threads of the) container
+ disconnect_error_ = error_condition("exception", "container shut-down by unknown exception");
+ if (!stopping_) stop(disconnect_error_);
+ finished = true;
+ }
+ pn_proactor_done(proactor_, events);
+ } while(!finished);
+ --threads_;
}
-void container::impl::run() {
- // Have to "manually" generate container events
+void container::impl::start_event() {
if (handler_) handler_->on_container_start(container_);
- thread(*this);
+}
+
+void container::impl::stop_event() {
if (handler_) handler_->on_container_stop(container_);
}
+void container::impl::run(int threads) {
+ // Have to "manually" generate container events
+ CALL_ONCE(start_once_, &impl::start_event, this);
+
+#if PN_CPP_SUPPORTS_THREADS
+ // Run handler threads
+ std::vector<std::thread> ts(threads-1);
+ if (threads>1) {
+ for (auto& t : ts) t = std::thread(&impl::thread, this);
+ }
+
+ thread(); // Use this thread too.
+
+ // Wait for the other threads to stop
+ if (threads>1) {
+ for (auto& t : ts) t.join();
+ }
+#else
+ // Run a single handler thread (As we have no threading API)
+ thread();
+#endif
+
+ if (threads_==0) CALL_ONCE(stop_once_, &impl::stop_event, this);
+
+ // Throw an exception if we disconnected the proactor because of an exception
+ if (!disconnect_error_.empty()) {
+ throw proton::error(disconnect_error_.description());
+ };
+}
+
void container::impl::auto_stop(bool set) {
+ GUARD(lock_);
auto_stop_ = set;
}
void container::impl::stop(const proton::error_condition& err) {
+ GUARD(lock_);
auto_stop_ = true;
stopping_ = true;
pn_condition_t* error_condition = pn_condition();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org