You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2017/07/21 17:01:58 UTC

[02/20] qpid-proton git commit: PROTON-1400: [C++ binding] Make proactor container thread safe

PROTON-1400: [C++ binding] Make proactor container thread safe


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e4eca5c3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e4eca5c3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e4eca5c3

Branch: refs/heads/master
Commit: e4eca5c3d92b917edca4629c6bc01155c4678baf
Parents: 8aee73b
Author: Andrew Stitcher <as...@apache.org>
Authored: Fri May 26 17:00:31 2017 -0400
Committer: Andrew Stitcher <as...@apache.org>
Committed: Fri Jul 21 12:50:06 2017 -0400

----------------------------------------------------------------------
 .../bindings/cpp/include/proton/container.hpp   |  22 +++-
 .../cpp/include/proton/internal/config.hpp      |   4 +
 proton-c/bindings/cpp/src/container.cpp         |   6 +-
 .../cpp/src/include/proactor_container_impl.hpp |  28 +++-
 .../cpp/src/proactor_container_impl.cpp         | 127 +++++++++++++++----
 5 files changed, 155 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e4eca5c3/proton-c/bindings/cpp/include/proton/container.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/container.hpp b/proton-c/bindings/cpp/include/proton/container.hpp
index 0739517..8b517bf 100644
--- a/proton-c/bindings/cpp/include/proton/container.hpp
+++ b/proton-c/bindings/cpp/include/proton/container.hpp
@@ -29,11 +29,11 @@
 #include "./internal/export.hpp"
 #include "./internal/pn_unique_ptr.hpp"
 
-#ifdef PN_CPP_HAS_STD_FUNCTION
-#include <functional>
-#endif
 #include <string>
 
+/// If the library can support multithreaded containers then PN_CPP_SUPPORTS_THREADS will be set.
+#define PN_CPP_SUPPORTS_THREADS PN_CPP_HAS_STD_THREAD && PN_CPP_HAS_STD_MUTEX && PN_CPP_HAS_STD_ATOMIC
+
 namespace proton {
 
 /// A top-level container of connections, sessions, senders, and
@@ -55,6 +55,13 @@ class PN_CPP_CLASS_EXTERN container {
     /// Create a container.
     PN_CPP_EXTERN container(const std::string& id="");
 
+    /// Destroy a container.
+    /// Note that you may not delete a container from within any of the threads running
+    /// any of the container's messaging_handlers. Specifically if you delete the container
+    /// from within a handler you cause a deadlock or a crash.
+    ///
+    /// The only safe place to delete a container is after all of the threads running a container
+    /// have finished and all of the run functions have returned.
     PN_CPP_EXTERN ~container();
 
     /// Connect to `url` and send an open request to the remote peer.
@@ -95,9 +102,16 @@ class PN_CPP_CLASS_EXTERN container {
     /// Returns when the container stops.
     /// @see auto_stop() and stop().
     ///
-    /// With a multithreaded container, call run() in multiple threads to create a thread pool.
+    /// If you are using C++11 or later you may use a multithreaded container. In this case you may
+    /// call run() in multiple threads to create a thread pool. Or aternatively call run with an
+    /// integer parameter specifying the number of threads for the thread pool.
     PN_CPP_EXTERN void run();
 
+#if PN_CPP_SUPPORTS_THREADS
+    /// @copydoc run()
+    PN_CPP_EXTERN void run(int threads);
+#endif
+
     /// If true, stop the container when all active connections and listeners are closed.
     /// If false the container will keep running till stop() is called.
     ///

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e4eca5c3/proton-c/bindings/cpp/include/proton/internal/config.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/internal/config.hpp b/proton-c/bindings/cpp/include/proton/internal/config.hpp
index 79d201c..54b014b 100644
--- a/proton-c/bindings/cpp/include/proton/internal/config.hpp
+++ b/proton-c/bindings/cpp/include/proton/internal/config.hpp
@@ -103,6 +103,10 @@
 #define PN_CPP_HAS_STD_ATOMIC PN_CPP_HAS_CPP11
 #endif
 
+#ifndef PN_CPP_HAS_STD_THREAD
+#define PN_CPP_HAS_STD_THREAD PN_CPP_HAS_CPP11
+#endif
+
 #endif // PROTON_INTERNAL_CONFIG_HPP
 
 /// @endcond

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e4eca5c3/proton-c/bindings/cpp/src/container.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container.cpp b/proton-c/bindings/cpp/src/container.cpp
index c2af659..35f645c 100644
--- a/proton-c/bindings/cpp/src/container.cpp
+++ b/proton-c/bindings/cpp/src/container.cpp
@@ -81,7 +81,11 @@ returned<connection> container::connect(const std::string& url, const connection
 
 listener container::listen(const std::string& url, listen_handler& l) { return impl_->listen(url, l); }
 
-void container::run() { impl_->run(); }
+void container::run() { impl_->run(1); }
+
+#if PN_CPP_SUPPORTS_THREADS
+void container::run(int threads) { impl_->run(threads); }
+#endif
 
 void container::auto_stop(bool set) { impl_->auto_stop(set); }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e4eca5c3/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
index fc963f7..ac54156 100644
--- a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
+++ b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
@@ -43,6 +43,21 @@
 #include <string>
 #include <vector>
 
+#if PN_CPP_SUPPORTS_THREADS
+#include <atomic>
+#include <mutex>
+# define MUTEX(x) std::mutex x;
+# define GUARD(x) std::lock_guard<std::mutex> g(x)
+# define ONCE_FLAG(x) std::once_flag x;
+# define CALL_ONCE(x, ...) std::call_once(x, __VA_ARGS__)
+# define ATOMIC_INT(x) std::atomic<int> x;
+#else
+# define MUTEX(x)
+# define GUARD(x)
+# define ONCE_FLAG(x)
+# define CALL_ONCE(x, f, o) ((o)->*(f))()
+# define ATOMIC_INT(x) int x;
+#endif
 struct pn_proactor_t;
 struct pn_listener_t;
 struct pn_event_t;
@@ -70,7 +85,7 @@ class container::impl {
     class sender_options sender_options() const { return sender_options_; }
     void receiver_options(const proton::receiver_options&);
     class receiver_options receiver_options() const { return receiver_options_; }
-    void run();
+    void run(int threads);
     void stop(const error_condition& err);
     void auto_stop(bool set);
     void schedule(duration, work);
@@ -86,16 +101,23 @@ class container::impl {
     connection connect_common(const std::string&, const connection_options&);
 
     // Event loop to run in each container thread
-    static void thread(impl&);
+    void thread();
     bool handle(pn_event_t*);
     void run_timer_jobs();
 
+    ATOMIC_INT(threads_)
+    MUTEX(lock_)
+    ONCE_FLAG(start_once_)
+    ONCE_FLAG(stop_once_)
     container& container_;
 
     typedef std::set<container_work_queue*> work_queues;
     work_queues work_queues_;
     container_work_queue* add_work_queue();
     void remove_work_queue(container_work_queue*);
+    void start_event();
+    void stop_event();
+
     struct scheduled {
         timestamp time; // duration from epoch for task
         work task;
@@ -112,8 +134,8 @@ class container::impl {
     connection_options server_connection_options_;
     proton::sender_options sender_options_;
     proton::receiver_options receiver_options_;
+    error_condition disconnect_error_;
 
-    proton::error_condition stop_err_;
     bool auto_stop_;
     bool stopping_;
 };

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e4eca5c3/proton-c/bindings/cpp/src/proactor_container_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/proactor_container_impl.cpp b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
index 62115fd..b900d6f 100644
--- a/proton-c/bindings/cpp/src/proactor_container_impl.cpp
+++ b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
@@ -41,21 +41,27 @@
 #include <algorithm>
 #include <vector>
 
+#if PN_CPP_SUPPORTS_THREADS
+# include <thread>
+#endif
+
 namespace proton {
 
 class container::impl::common_work_queue : public work_queue::impl {
   public:
-    common_work_queue(container::impl& c): container_(c), finished_(false) {}
+    common_work_queue(container::impl& c): container_(c), finished_(false), running_(false) {}
 
     typedef std::vector<work> jobs;
 
     void run_all_jobs();
-    void finished() { finished_ = true; }
+    void finished() { GUARD(lock_); finished_ = true; }
     void schedule(duration, work);
 
+    MUTEX(lock_)
     container::impl& container_;
     jobs jobs_;
     bool finished_;
+    bool running_;
 };
 
 void container::impl::common_work_queue::schedule(duration d, work f) {
@@ -68,11 +74,22 @@ void container::impl::common_work_queue::schedule(duration d, work f) {
 void container::impl::common_work_queue::run_all_jobs() {
     jobs j;
     // Lock this operation for mt
-    std::swap(j, jobs_);
+    {
+        GUARD(lock_);
+        // Ensure that we never run work from this queue concurrently
+        if (running_) return;
+        running_ = true;
+        // But allow adding to the queue concurrently to running
+        std::swap(j, jobs_);
+    }
     // Run queued work, but ignore any exceptions
     for (jobs::iterator f = j.begin(); f != j.end(); ++f) try {
         (*f)();
     } catch (...) {};
+    {
+        GUARD(lock_);
+        running_ = false;
+    }
     return;
 }
 
@@ -88,6 +105,7 @@ class container::impl::connection_work_queue : public common_work_queue {
 bool container::impl::connection_work_queue::add(work f) {
     // Note this is an unbounded work queue.
     // A resource-safe implementation should be bounded.
+    GUARD(lock_);
     if (finished_) return false;
     jobs_.push_back(f);
     pn_connection_wake(connection_);
@@ -105,6 +123,7 @@ class container::impl::container_work_queue : public common_work_queue {
 bool container::impl::container_work_queue::add(work f) {
     // Note this is an unbounded work queue.
     // A resource-safe implementation should be bounded.
+    GUARD(lock_);
     if (finished_) return false;
     jobs_.push_back(f);
     pn_proactor_set_timeout(container_.proactor_, 0);
@@ -116,15 +135,11 @@ class work_queue::impl* container::impl::make_work_queue(container& c) {
 }
 
 container::impl::impl(container& c, const std::string& id, messaging_handler* mh)
-    : container_(c), proactor_(pn_proactor()), handler_(mh), id_(id),
+    : threads_(0), container_(c), proactor_(pn_proactor()), handler_(mh), id_(id),
       auto_stop_(true), stopping_(false)
 {}
 
 container::impl::~impl() {
-    try {
-        stop(error_condition("exception", "container shut-down"));
-        //wait();
-    } catch (...) {}
     pn_proactor_free(proactor_);
 }
 
@@ -178,6 +193,7 @@ proton::returned<proton::connection> container::impl::connect(
     const proton::connection_options& user_opts)
 {
     connection conn = connect_common(addr, user_opts);
+    GUARD(lock_);
     return make_thread_safe(conn);
 }
 
@@ -186,6 +202,7 @@ returned<sender> container::impl::open_sender(const std::string &url, const prot
     lopts.update(o1);
     connection conn = connect_common(url, o2);
 
+    GUARD(lock_);
     return make_thread_safe(conn.default_session().open_sender(proton::url(url).path(), lopts));
 }
 
@@ -194,6 +211,7 @@ returned<receiver> container::impl::open_receiver(const std::string &url, const
     lopts.update(o1);
     connection conn = connect_common(url, o2);
 
+    GUARD(lock_);
     return make_thread_safe(
         conn.default_session().open_receiver(proton::url(url).path(), lopts));
 }
@@ -215,11 +233,13 @@ pn_listener_t* container::impl::listen_common_lh(const std::string& addr) {
 }
 
 proton::listener container::impl::listen(const std::string& addr) {
+    GUARD(lock_);
     pn_listener_t* listener = listen_common_lh(addr);
     return proton::listener(listener);
 }
 
 proton::listener container::impl::listen(const std::string& addr, const proton::connection_options& opts) {
+    GUARD(lock_);
     pn_listener_t* listener = listen_common_lh(addr);
     listener_context& lc=listener_context::get(listener);
     lc.connection_options_.reset(new connection_options(opts));
@@ -227,6 +247,7 @@ proton::listener container::impl::listen(const std::string& addr, const proton::
 }
 
 proton::listener container::impl::listen(const std::string& addr, proton::listen_handler& lh) {
+    GUARD(lock_);
     pn_listener_t* listener = listen_common_lh(addr);
     listener_context& lc=listener_context::get(listener);
     lc.listen_handler_ = &lh;
@@ -234,6 +255,7 @@ proton::listener container::impl::listen(const std::string& addr, proton::listen
 }
 
 void container::impl::schedule(duration delay, work f) {
+    GUARD(lock_);
     timestamp now = timestamp::now();
 
     // Record timeout; Add callback to timeout sorted list
@@ -247,18 +269,22 @@ void container::impl::schedule(duration delay, work f) {
 }
 
 void container::impl::client_connection_options(const connection_options &opts) {
+    GUARD(lock_);
     client_connection_options_ = opts;
 }
 
 void container::impl::server_connection_options(const connection_options &opts) {
+    GUARD(lock_);
     server_connection_options_ = opts;
 }
 
 void container::impl::sender_options(const proton::sender_options &opts) {
+    GUARD(lock_);
     sender_options_ = opts;
 }
 
 void container::impl::receiver_options(const proton::receiver_options &opts) {
+    GUARD(lock_);
     receiver_options_ = opts;
 }
 
@@ -294,13 +320,18 @@ bool container::impl::handle(pn_event_t* event) {
     switch (pn_event_type(event)) {
 
     case PN_PROACTOR_INACTIVE: /* listener and all connections closed */
-        return auto_stop_;
+        // If we're stopping interrupt all other threads still running
+        if (auto_stop_) pn_proactor_interrupt(proactor_);
+        return false;
 
-    // We never interrupt the proactor so ignore
+    // We only interrupt to stop threads
     case PN_PROACTOR_INTERRUPT:
-        return false;
+        // Interrupt any other threads still running
+        if (threads_>1) pn_proactor_interrupt(proactor_);
+        return true;
 
     case PN_PROACTOR_TIMEOUT: {
+        GUARD(lock_);
         // Can get an immediate timeout, if we have a container event loop inject
         if  ( deferred_.size()>0 ) {
             run_timer_jobs();
@@ -402,30 +433,78 @@ bool container::impl::handle(pn_event_t* event) {
     return false;
 }
 
-void container::impl::thread(container::impl& ci) {
-  bool finished = false;
-  do {
-    pn_event_batch_t *events = pn_proactor_wait(ci.proactor_);
-    pn_event_t *e;
-    while ((e = pn_event_batch_next(events))) {
-      finished = ci.handle(e) || finished;
-    }
-    pn_proactor_done(ci.proactor_, events);
-  } while(!finished);
+void container::impl::thread() {
+    ++threads_;
+    bool finished = false;
+    do {
+      pn_event_batch_t *events = pn_proactor_wait(proactor_);
+      pn_event_t *e;
+      try {
+        while ((e = pn_event_batch_next(events))) {
+          finished = handle(e);
+          if (finished) break;
+        }
+      } catch (proton::error& e) {
+        // If we caught an exception then shutdown the (other threads of the) container
+        disconnect_error_ = error_condition("exception", e.what());
+        if (!stopping_) stop(disconnect_error_);
+        finished = true;
+      } catch (...) {
+        // If we caught an exception then shutdown the (other threads of the) container
+        disconnect_error_ = error_condition("exception", "container shut-down by unknown exception");
+        if (!stopping_) stop(disconnect_error_);
+        finished = true;
+      }
+      pn_proactor_done(proactor_, events);
+    } while(!finished);
+    --threads_;
 }
 
-void container::impl::run() {
-    // Have to "manually" generate container events
+void container::impl::start_event() {
     if (handler_) handler_->on_container_start(container_);
-    thread(*this);
+}
+
+void container::impl::stop_event() {
     if (handler_) handler_->on_container_stop(container_);
 }
 
+void container::impl::run(int threads) {
+    // Have to "manually" generate container events
+    CALL_ONCE(start_once_, &impl::start_event, this);
+
+#if PN_CPP_SUPPORTS_THREADS
+    // Run handler threads
+    std::vector<std::thread> ts(threads-1);
+    if (threads>1) {
+      for (auto& t : ts) t = std::thread(&impl::thread, this);
+    }
+
+    thread();      // Use this thread too.
+
+    // Wait for the other threads to stop
+    if (threads>1) {
+      for (auto& t : ts) t.join();
+    }
+#else
+    // Run a single handler thread (As we have no threading API)
+    thread();
+#endif
+
+    if (threads_==0) CALL_ONCE(stop_once_, &impl::stop_event, this);
+
+    // Throw an exception if we disconnected the proactor because of an exception
+    if (!disconnect_error_.empty()) {
+      throw proton::error(disconnect_error_.description());
+    };
+}
+
 void container::impl::auto_stop(bool set) {
+    GUARD(lock_);
     auto_stop_ = set;
 }
 
 void container::impl::stop(const proton::error_condition& err) {
+    GUARD(lock_);
     auto_stop_ = true;
     stopping_ = true;
     pn_condition_t* error_condition = pn_condition();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org