You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/09/01 15:04:04 UTC

[36/50] qpid-proton git commit: PROTON-1557: c++ fix race conditions in proactor_container_impl

PROTON-1557: c++ fix race conditions in proactor_container_impl

Fixed:
- using of option member variables outside of guard.
- reference couting of pn_ objects outside of handler after pn_proactor_connect
- creating returned<> wrappers from raw pointers.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/38b27b50
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/38b27b50
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/38b27b50

Branch: refs/heads/go1
Commit: 38b27b5035658454be9ddc2bae80c589576960f2
Parents: bd10259
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Aug 29 18:08:01 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Aug 29 18:29:45 2017 -0400

----------------------------------------------------------------------
 examples/cpp/multithreaded_client.cpp           | 30 ++++----
 .../cpp/multithreaded_client_flow_control.cpp   | 35 +++++----
 .../bindings/cpp/include/proton/returned.hpp    | 15 ++--
 .../cpp/src/include/proactor_container_impl.hpp |  3 +-
 .../bindings/cpp/src/include/proton_bits.hpp    | 15 ++--
 .../cpp/src/proactor_container_impl.cpp         | 75 ++++++++++++--------
 proton-c/bindings/cpp/src/returned.cpp          |  6 +-
 tools/py/proctest.py                            |  2 +
 8 files changed, 110 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38b27b50/examples/cpp/multithreaded_client.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/multithreaded_client.cpp b/examples/cpp/multithreaded_client.cpp
index 955655c..4119bbf 100644
--- a/examples/cpp/multithreaded_client.cpp
+++ b/examples/cpp/multithreaded_client.cpp
@@ -47,6 +47,10 @@
 #include <string>
 #include <thread>
 
+// Lock output from threads to avoid scramblin
+std::mutex out_lock;
+#define OUT(x) do { std::lock_guard<std::mutex> l(out_lock); x; } while (false)
+
 // Handler for a single thread-safe sending and receiving connection.
 class client : public proton::messaging_handler {
     // Invariant
@@ -64,7 +68,7 @@ class client : public proton::messaging_handler {
     std::condition_variable messages_ready_;
 
   public:
-    client(const std::string& url, const std::string& address) : url_(url), address_(address) {}
+    client(const std::string& url, const std::string& address) : url_(url), address_(address), work_queue_(0) {}
 
     // Thread safe
     void send(const proton::message& msg) {
@@ -112,25 +116,21 @@ class client : public proton::messaging_handler {
     }
 
     void on_sender_open(proton::sender& s) override {
-        {
-            // sender_ and work_queue_ must be set atomically
-            std::lock_guard<std::mutex> l(lock_);
-            sender_ = s;
-            work_queue_ = &s.work_queue();
-        }
+        // sender_ and work_queue_ must be set atomically
+        std::lock_guard<std::mutex> l(lock_);
+        sender_ = s;
+        work_queue_ = &s.work_queue();
         sender_ready_.notify_all();
     }
 
     void on_message(proton::delivery& dlv, proton::message& msg) override {
-        {
-            std::lock_guard<std::mutex> l(lock_);
-            messages_.push(msg);
-        }
+        std::lock_guard<std::mutex> l(lock_);
+        messages_.push(msg);
         messages_ready_.notify_all();
     }
 
     void on_error(const proton::error_condition& e) override {
-        std::cerr << "unexpected error: " << e << std::endl;
+        OUT(std::cerr << "unexpected error: " << e << std::endl);
         exit(1);
     }
 };
@@ -157,7 +157,7 @@ int main(int argc, const char** argv) {
                 for (int i = 0; i < n_messages; ++i) {
                     proton::message msg(std::to_string(i + 1));
                     cl.send(msg);
-                    std::cout << "sent: " << msg.body() << std::endl;
+                    OUT(std::cout << "sent: " << msg.body() << std::endl);
                 }
             });
 
@@ -165,7 +165,7 @@ int main(int argc, const char** argv) {
         std::thread receiver([&]() {
                 for (int i = 0; i < n_messages; ++i) {
                     auto msg = cl.receive();
-                    std::cout << "received: " << msg.body() << std::endl;
+                    OUT(std::cout << " received: " << msg.body() << std::endl);
                     ++received;
                 }
             });
@@ -174,7 +174,7 @@ int main(int argc, const char** argv) {
         receiver.join();
         cl.close();
         container_thread.join();
-        std::cout << "received " << received << " messages" << std::endl;
+        std::cout << received << " messages sent and received" << std::endl;
 
         return 0;
     } catch (const std::exception& e) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38b27b50/examples/cpp/multithreaded_client_flow_control.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/multithreaded_client_flow_control.cpp b/examples/cpp/multithreaded_client_flow_control.cpp
index 9eec782..8764793 100644
--- a/examples/cpp/multithreaded_client_flow_control.cpp
+++ b/examples/cpp/multithreaded_client_flow_control.cpp
@@ -56,6 +56,10 @@
 #include <string>
 #include <thread>
 
+// Lock output from threads to avoid scramblin
+std::mutex out_lock;
+#define OUT(x) do { std::lock_guard<std::mutex> l(out_lock); x; } while (false)
+
 // A thread-safe sending connection that blocks sending threads when there
 // is no AMQP credit to send messages.
 class sender : private proton::messaging_handler {
@@ -127,7 +131,7 @@ class sender : private proton::messaging_handler {
     }
 
     void on_error(const proton::error_condition& e) override {
-        std::cerr << "unexpected error: " << e << std::endl;
+        OUT(std::cerr << "unexpected error: " << e << std::endl);
         exit(1);
     }
 };
@@ -202,7 +206,7 @@ class receiver : private proton::messaging_handler {
     }
 
     void on_error(const proton::error_condition& e) override {
-        std::cerr << "unexpected error: " << e << std::endl;
+        OUT(std::cerr << "unexpected error: " << e << std::endl);
         exit(1);
     }
 };
@@ -210,28 +214,31 @@ class receiver : private proton::messaging_handler {
 // ==== Example code using the sender and receiver
 
 // Send n messages
-void send_thread(sender& s, int n, bool print) {
+void send_thread(sender& s, int n) {
     auto id = std::this_thread::get_id();
     for (int i = 0; i < n; ++i) {
         std::ostringstream ss;
         ss << std::this_thread::get_id() << ":" << i;
         s.send(proton::message(ss.str()));
-        if (print) std::cout << "received: " << ss.str() << std::endl;
+        OUT(std::cout << id << " received: " << ss.str() << std::endl);
     }
-    std::cout << id << " sent " << n << std::endl;
+    OUT(std::cout << id << " sent " << n << std::endl);
 }
 
 // Receive messages till atomic remaining count is 0.
 // remaining is shared among all receiving threads
-void receive_thread(receiver& r, std::atomic_int& remaining, bool print) {
+void receive_thread(receiver& r, std::atomic_int& remaining) {
     auto id = std::this_thread::get_id();
     int n = 0;
+    // atomically check and decrement remaining *before* receiving.
+    // If it is 0 or less then return, as there are no more
+    // messages to receive so calling r.receive() would block forever.
     while (remaining-- > 0) {
         auto m = r.receive();
         ++n;
-        if (print) std::cout << id << "received: " << m.body() << std::endl;
+        OUT(std::cout << id << " received: " << m.body() << std::endl);
     }
-    std::cout << id << " received " << n << " messages" << std::endl;
+    OUT(std::cout << id << " received " << n << " messages" << std::endl);
 }
 
 int main(int argc, const char **argv) {
@@ -250,10 +257,10 @@ int main(int argc, const char **argv) {
         const char *address = argv[2];
         int n_messages = atoi(argv[3]);
         int n_threads = atoi(argv[4]);
+        int count = n_messages * n_threads;
 
         // Total messages to be received, multiple receiver threads will decrement this.
-        std::atomic_int remaining(n_messages * n_threads);
-        bool print = remaining < 1000; // Don't print for long runs, dominates run time
+        std::atomic_int remaining(count);
 
         // Run the proton container
         proton::container container;
@@ -267,17 +274,19 @@ int main(int argc, const char **argv) {
         // Starting receivers first gives all receivers a chance to compete for messages.
         std::vector<std::thread> threads;
         for (int i = 0; i < n_threads; ++i)
-            threads.push_back(std::thread([&]() { receive_thread(recv, remaining, print); }));
+            threads.push_back(std::thread([&]() { receive_thread(recv, remaining); }));
         for (int i = 0; i < n_threads; ++i)
-            threads.push_back(std::thread([&]() { send_thread(send, n_messages, print); }));
+            threads.push_back(std::thread([&]() { send_thread(send, n_messages); }));
 
         // Wait for threads to finish
         for (auto& t : threads)
             t.join();
         send.close();
         recv.close();
-
         container_thread.join();
+        if (remaining > 0)
+            throw std::runtime_error("not all messages were received");
+        std::cout << count << " messages sent and received" << std::endl;
 
         return 0;
     } catch (const std::exception& e) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38b27b50/proton-c/bindings/cpp/include/proton/returned.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/returned.hpp b/proton-c/bindings/cpp/include/proton/returned.hpp
index 28c45a0..45de4c4 100644
--- a/proton-c/bindings/cpp/include/proton/returned.hpp
+++ b/proton-c/bindings/cpp/include/proton/returned.hpp
@@ -36,7 +36,7 @@
 namespace proton {
 
 namespace internal {
-template <class T> class factory;
+class returned_factory;
 }
 
 /// Return type for container functions
@@ -51,13 +51,20 @@ template <class T>
 class PN_CPP_CLASS_EXTERN returned
 {
   public:
-    PN_CPP_EXTERN returned(const T&);
+    /// Copy operator required to return a value
+    /// @note thread safe
+    PN_CPP_EXTERN returned(const returned<T>&);
+
+    /// Convert to the proton::object
+    ///
+    /// @note **Thread unsafe** do not use in a multi-threaded application.
     PN_CPP_EXTERN operator T() const;
 
   private:
     typename T::pn_type* ptr_;
-    returned& operator=(const returned&);
-    template <class U> friend class internal::factory;
+    returned(typename T::pn_type*);
+    returned& operator=(const returned&); // Not defined
+  friend class internal::returned_factory;
 };
 
 } // proton

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38b27b50/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
index ac54156..0aa62a5 100644
--- a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
+++ b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
@@ -98,7 +98,8 @@ class container::impl {
     class connection_work_queue;
     class container_work_queue;
     pn_listener_t* listen_common_lh(const std::string&);
-    connection connect_common(const std::string&, const connection_options&);
+    pn_connection_t* make_connection_lh(const url& url, const connection_options&);
+    void start_connection(const url& url, pn_connection_t* c);
 
     // Event loop to run in each container thread
     void thread();

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38b27b50/proton-c/bindings/cpp/src/include/proton_bits.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/proton_bits.hpp b/proton-c/bindings/cpp/src/include/proton_bits.hpp
index 035ffb7..675249f 100644
--- a/proton-c/bindings/cpp/src/include/proton_bits.hpp
+++ b/proton-c/bindings/cpp/src/include/proton_bits.hpp
@@ -124,7 +124,6 @@ class factory {
 public:
     static T wrap(typename wrapped<T>::type* t) { return t; }
     static typename wrapped<T>::type* unwrap(const T& t) { return t.pn_object(); }
-    static returned<T> make_returned(const T& t) { return returned<T>(t); }
 };
 
 template <class T> struct context {};
@@ -140,7 +139,14 @@ inline void set_messaging_handler(T t, messaging_handler* mh) { context<T>::type
 template <class T>
 inline messaging_handler* get_messaging_handler(T* t) { return context<typename internal::wrapper<T>::type>::type::get(t).handler; }
 
-}
+class returned_factory {
+  public:
+    template <class T> static returned<T> make(typename internal::wrapped<T>::type* pn) {
+        return returned<T>(pn);
+    }
+};
+
+} // namespace internal
 
 template <class T>
 typename internal::wrapper<T>::type make_wrapper(T* t) { return internal::factory<typename internal::wrapper<T>::type>::wrap(t); }
@@ -151,9 +157,8 @@ U make_wrapper(typename internal::wrapped<U>::type* t) { return internal::factor
 template <class T>
 typename internal::wrapped<T>::type* unwrap(const T& t) { return internal::factory<T>::unwrap(t); }
 
-template <class T>
-returned<T> make_returned(const T& t) {
-    return internal::factory<T>::make_returned(t);
+template <class T> returned<T> make_returned(typename internal::wrapped<T>::type* pn) {
+    return internal::returned_factory::make<T>(pn);
 }
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38b27b50/proton-c/bindings/cpp/src/proactor_container_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/proactor_container_impl.cpp b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
index 1389306..9870210 100644
--- a/proton-c/bindings/cpp/src/proactor_container_impl.cpp
+++ b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
@@ -152,9 +152,9 @@ void container::impl::remove_work_queue(container::impl::container_work_queue* l
     work_queues_.erase(l);
 }
 
-proton::connection container::impl::connect_common(
-    const std::string& addr,
-    const proton::connection_options& user_opts)
+pn_connection_t* container::impl::make_connection_lh(
+    const url& url,
+    const connection_options& user_opts)
 {
     if (stopping_)
         throw proton::error("container is stopping");
@@ -163,7 +163,6 @@ proton::connection container::impl::connect_common(
     opts.update(user_opts);
     messaging_handler* mh = opts.handler();
 
-    proton::url url(addr);
     pn_connection_t *pnc = pn_connection();
     connection_context& cc(connection_context::get(pnc));
     cc.container = &container_;
@@ -177,42 +176,58 @@ proton::connection container::impl::connect_common(
     if (!url.password().empty())
         pn_connection_set_password(pnc, url.password().c_str());
 
-    connection conn = make_wrapper(pnc);
-    conn.open(opts);
-    // Figure out correct string len then create connection address
-    int len = pn_proactor_addr(0, 0, url.host().c_str(), url.port().c_str());
-    std::vector<char> caddr(len+1);
-    pn_proactor_addr(&caddr[0], len+1, url.host().c_str(), url.port().c_str());
-    pn_proactor_connect(proactor_, pnc, &caddr[0]);
-    return conn;
+    make_wrapper(pnc).open(opts);
+    return pnc;                 // 1 refcount from pn_connection()
 }
 
-returned<proton::connection> container::impl::connect(
+void container::impl::start_connection(const url& url, pn_connection_t *pnc) {
+    char caddr[PN_MAX_ADDR];
+    pn_proactor_addr(caddr, sizeof(caddr), url.host().c_str(), url.port().c_str());
+    pn_proactor_connect(proactor_, pnc, caddr); // Takes ownership of pnc
+}
+
+returned<connection> container::impl::connect(
     const std::string& addr,
     const proton::connection_options& user_opts)
 {
-    connection conn = connect_common(addr, user_opts);
     GUARD(lock_);
-    return make_returned(conn);
+    proton::url url(addr);
+    pn_connection_t *pnc = make_connection_lh(url, user_opts);
+    start_connection(url, pnc);
+    return make_returned<proton::connection>(pnc);
 }
 
-returned<sender> container::impl::open_sender(const std::string &url, const proton::sender_options &o1, const connection_options &o2) {
-    proton::sender_options lopts(sender_options_);
-    lopts.update(o1);
-    connection conn = connect_common(url, o2);
-
-    GUARD(lock_);
-    return make_returned(conn.default_session().open_sender(proton::url(url).path(), lopts));
+returned<sender> container::impl::open_sender(const std::string &urlstr, const proton::sender_options &o1, const connection_options &o2)
+{
+    proton::url url(urlstr);
+    pn_link_t* pnl = 0;
+    pn_connection_t* pnc = 0;
+    {
+        GUARD(lock_);
+        proton::sender_options lopts(sender_options_);
+        lopts.update(o1);
+        pnc = make_connection_lh(url, o2);
+        connection conn(make_wrapper(pnc));
+        pnl = unwrap(conn.default_session().open_sender(url.path(), lopts));
+    }                                   // There must be no refcounting after here
+    start_connection(url, pnc);         // Takes ownership of pnc
+    return make_returned<sender>(pnl);  // Unsafe returned pointer
 }
 
-returned<receiver> container::impl::open_receiver(const std::string &url, const proton::receiver_options &o1, const connection_options &o2) {
-    proton::receiver_options lopts(receiver_options_);
-    lopts.update(o1);
-    connection conn = connect_common(url, o2);
-
-    GUARD(lock_);
-    return make_returned(
-        conn.default_session().open_receiver(proton::url(url).path(), lopts));
+returned<receiver> container::impl::open_receiver(const std::string &urlstr, const proton::receiver_options &o1, const connection_options &o2) {
+    proton::url url(urlstr);
+    pn_link_t* pnl = 0;
+    pn_connection_t* pnc = 0;
+    {
+        GUARD(lock_);
+        proton::receiver_options lopts(receiver_options_);
+        lopts.update(o1);
+        pnc = make_connection_lh(url, o2);
+        connection conn(make_wrapper(pnc));
+        pnl = unwrap(conn.default_session().open_receiver(url.path(), lopts));
+    }                                   // There must be no refcounting after here
+    start_connection(url, pnc);
+    return make_returned<receiver>(pnl);
 }
 
 pn_listener_t* container::impl::listen_common_lh(const std::string& addr) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38b27b50/proton-c/bindings/cpp/src/returned.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/returned.cpp b/proton-c/bindings/cpp/src/returned.cpp
index a6c6101..cd8c1cb 100644
--- a/proton-c/bindings/cpp/src/returned.cpp
+++ b/proton-c/bindings/cpp/src/returned.cpp
@@ -28,14 +28,14 @@
 
 namespace proton {
 
-template <class T> PN_CPP_EXTERN returned<T>::returned(const T& t) : ptr_(unwrap(t)) {}
-
-//template <class T> PN_CPP_EXTERN returned<T>::returned(const returned<T>& x) : ptr_(x.ptr_) {}
+template <class T> PN_CPP_EXTERN returned<T>::returned(const returned<T>& x) : ptr_(x.ptr_) {}
 
 template <class T> PN_CPP_EXTERN returned<T>::operator T() const {
     return internal::factory<T>::wrap(ptr_);
 }
 
+template <class T> returned<T>::returned(typename T::pn_type* p) : ptr_(p) {}
+
 // Explicit instantiations for allowed types
 
 template class PN_CPP_CLASS_EXTERN returned<connection>;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38b27b50/tools/py/proctest.py
----------------------------------------------------------------------
diff --git a/tools/py/proctest.py b/tools/py/proctest.py
index 047dc44..ebd7db2 100644
--- a/tools/py/proctest.py
+++ b/tools/py/proctest.py
@@ -95,6 +95,8 @@ class Proc(Popen):
         self.kwargs = kwargs
         self._out = tempfile.TemporaryFile()
         try:
+            if (os.getenv("PROCTEST_VERBOSE")):
+                sys.stderr.write("\nstart proc: %s\n" % self.args)
             Popen.__init__(self, self.args, stdout=self._out, stderr=STDOUT, **kwargs)
         except OSError, e:
             if e.errno == errno.ENOENT:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org