You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/09/01 15:04:04 UTC
[36/50] qpid-proton git commit: PROTON-1557: c++ fix race conditions
in proactor_container_impl
PROTON-1557: c++ fix race conditions in proactor_container_impl
Fixed:
- use of option member variables outside of the guard.
- reference counting of pn_ objects outside of the handler after pn_proactor_connect
- creating returned<> wrappers from raw pointers.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/38b27b50
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/38b27b50
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/38b27b50
Branch: refs/heads/go1
Commit: 38b27b5035658454be9ddc2bae80c589576960f2
Parents: bd10259
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Aug 29 18:08:01 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Aug 29 18:29:45 2017 -0400
----------------------------------------------------------------------
examples/cpp/multithreaded_client.cpp | 30 ++++----
.../cpp/multithreaded_client_flow_control.cpp | 35 +++++----
.../bindings/cpp/include/proton/returned.hpp | 15 ++--
.../cpp/src/include/proactor_container_impl.hpp | 3 +-
.../bindings/cpp/src/include/proton_bits.hpp | 15 ++--
.../cpp/src/proactor_container_impl.cpp | 75 ++++++++++++--------
proton-c/bindings/cpp/src/returned.cpp | 6 +-
tools/py/proctest.py | 2 +
8 files changed, 110 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38b27b50/examples/cpp/multithreaded_client.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/multithreaded_client.cpp b/examples/cpp/multithreaded_client.cpp
index 955655c..4119bbf 100644
--- a/examples/cpp/multithreaded_client.cpp
+++ b/examples/cpp/multithreaded_client.cpp
@@ -47,6 +47,10 @@
#include <string>
#include <thread>
+// Lock output from threads to avoid scrambling
+std::mutex out_lock;
+#define OUT(x) do { std::lock_guard<std::mutex> l(out_lock); x; } while (false)
+
// Handler for a single thread-safe sending and receiving connection.
class client : public proton::messaging_handler {
// Invariant
@@ -64,7 +68,7 @@ class client : public proton::messaging_handler {
std::condition_variable messages_ready_;
public:
- client(const std::string& url, const std::string& address) : url_(url), address_(address) {}
+ client(const std::string& url, const std::string& address) : url_(url), address_(address), work_queue_(0) {}
// Thread safe
void send(const proton::message& msg) {
@@ -112,25 +116,21 @@ class client : public proton::messaging_handler {
}
void on_sender_open(proton::sender& s) override {
- {
- // sender_ and work_queue_ must be set atomically
- std::lock_guard<std::mutex> l(lock_);
- sender_ = s;
- work_queue_ = &s.work_queue();
- }
+ // sender_ and work_queue_ must be set atomically
+ std::lock_guard<std::mutex> l(lock_);
+ sender_ = s;
+ work_queue_ = &s.work_queue();
sender_ready_.notify_all();
}
void on_message(proton::delivery& dlv, proton::message& msg) override {
- {
- std::lock_guard<std::mutex> l(lock_);
- messages_.push(msg);
- }
+ std::lock_guard<std::mutex> l(lock_);
+ messages_.push(msg);
messages_ready_.notify_all();
}
void on_error(const proton::error_condition& e) override {
- std::cerr << "unexpected error: " << e << std::endl;
+ OUT(std::cerr << "unexpected error: " << e << std::endl);
exit(1);
}
};
@@ -157,7 +157,7 @@ int main(int argc, const char** argv) {
for (int i = 0; i < n_messages; ++i) {
proton::message msg(std::to_string(i + 1));
cl.send(msg);
- std::cout << "sent: " << msg.body() << std::endl;
+ OUT(std::cout << "sent: " << msg.body() << std::endl);
}
});
@@ -165,7 +165,7 @@ int main(int argc, const char** argv) {
std::thread receiver([&]() {
for (int i = 0; i < n_messages; ++i) {
auto msg = cl.receive();
- std::cout << "received: " << msg.body() << std::endl;
+ OUT(std::cout << " received: " << msg.body() << std::endl);
++received;
}
});
@@ -174,7 +174,7 @@ int main(int argc, const char** argv) {
receiver.join();
cl.close();
container_thread.join();
- std::cout << "received " << received << " messages" << std::endl;
+ std::cout << received << " messages sent and received" << std::endl;
return 0;
} catch (const std::exception& e) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38b27b50/examples/cpp/multithreaded_client_flow_control.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/multithreaded_client_flow_control.cpp b/examples/cpp/multithreaded_client_flow_control.cpp
index 9eec782..8764793 100644
--- a/examples/cpp/multithreaded_client_flow_control.cpp
+++ b/examples/cpp/multithreaded_client_flow_control.cpp
@@ -56,6 +56,10 @@
#include <string>
#include <thread>
+// Lock output from threads to avoid scrambling
+std::mutex out_lock;
+#define OUT(x) do { std::lock_guard<std::mutex> l(out_lock); x; } while (false)
+
// A thread-safe sending connection that blocks sending threads when there
// is no AMQP credit to send messages.
class sender : private proton::messaging_handler {
@@ -127,7 +131,7 @@ class sender : private proton::messaging_handler {
}
void on_error(const proton::error_condition& e) override {
- std::cerr << "unexpected error: " << e << std::endl;
+ OUT(std::cerr << "unexpected error: " << e << std::endl);
exit(1);
}
};
@@ -202,7 +206,7 @@ class receiver : private proton::messaging_handler {
}
void on_error(const proton::error_condition& e) override {
- std::cerr << "unexpected error: " << e << std::endl;
+ OUT(std::cerr << "unexpected error: " << e << std::endl);
exit(1);
}
};
@@ -210,28 +214,31 @@ class receiver : private proton::messaging_handler {
// ==== Example code using the sender and receiver
// Send n messages
-void send_thread(sender& s, int n, bool print) {
+void send_thread(sender& s, int n) {
auto id = std::this_thread::get_id();
for (int i = 0; i < n; ++i) {
std::ostringstream ss;
ss << std::this_thread::get_id() << ":" << i;
s.send(proton::message(ss.str()));
- if (print) std::cout << "received: " << ss.str() << std::endl;
+ OUT(std::cout << id << " received: " << ss.str() << std::endl);
}
- std::cout << id << " sent " << n << std::endl;
+ OUT(std::cout << id << " sent " << n << std::endl);
}
// Receive messages till atomic remaining count is 0.
// remaining is shared among all receiving threads
-void receive_thread(receiver& r, std::atomic_int& remaining, bool print) {
+void receive_thread(receiver& r, std::atomic_int& remaining) {
auto id = std::this_thread::get_id();
int n = 0;
+ // atomically check and decrement remaining *before* receiving.
+ // If it is 0 or less then return, as there are no more
+ // messages to receive so calling r.receive() would block forever.
while (remaining-- > 0) {
auto m = r.receive();
++n;
- if (print) std::cout << id << "received: " << m.body() << std::endl;
+ OUT(std::cout << id << " received: " << m.body() << std::endl);
}
- std::cout << id << " received " << n << " messages" << std::endl;
+ OUT(std::cout << id << " received " << n << " messages" << std::endl);
}
int main(int argc, const char **argv) {
@@ -250,10 +257,10 @@ int main(int argc, const char **argv) {
const char *address = argv[2];
int n_messages = atoi(argv[3]);
int n_threads = atoi(argv[4]);
+ int count = n_messages * n_threads;
// Total messages to be received, multiple receiver threads will decrement this.
- std::atomic_int remaining(n_messages * n_threads);
- bool print = remaining < 1000; // Don't print for long runs, dominates run time
+ std::atomic_int remaining(count);
// Run the proton container
proton::container container;
@@ -267,17 +274,19 @@ int main(int argc, const char **argv) {
// Starting receivers first gives all receivers a chance to compete for messages.
std::vector<std::thread> threads;
for (int i = 0; i < n_threads; ++i)
- threads.push_back(std::thread([&]() { receive_thread(recv, remaining, print); }));
+ threads.push_back(std::thread([&]() { receive_thread(recv, remaining); }));
for (int i = 0; i < n_threads; ++i)
- threads.push_back(std::thread([&]() { send_thread(send, n_messages, print); }));
+ threads.push_back(std::thread([&]() { send_thread(send, n_messages); }));
// Wait for threads to finish
for (auto& t : threads)
t.join();
send.close();
recv.close();
-
container_thread.join();
+ if (remaining > 0)
+ throw std::runtime_error("not all messages were received");
+ std::cout << count << " messages sent and received" << std::endl;
return 0;
} catch (const std::exception& e) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38b27b50/proton-c/bindings/cpp/include/proton/returned.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/returned.hpp b/proton-c/bindings/cpp/include/proton/returned.hpp
index 28c45a0..45de4c4 100644
--- a/proton-c/bindings/cpp/include/proton/returned.hpp
+++ b/proton-c/bindings/cpp/include/proton/returned.hpp
@@ -36,7 +36,7 @@
namespace proton {
namespace internal {
-template <class T> class factory;
+class returned_factory;
}
/// Return type for container functions
@@ -51,13 +51,20 @@ template <class T>
class PN_CPP_CLASS_EXTERN returned
{
public:
- PN_CPP_EXTERN returned(const T&);
+ /// Copy operator required to return a value
+ /// @note thread safe
+ PN_CPP_EXTERN returned(const returned<T>&);
+
+ /// Convert to the proton::object
+ ///
+ /// @note **Thread unsafe** do not use in a multi-threaded application.
PN_CPP_EXTERN operator T() const;
private:
typename T::pn_type* ptr_;
- returned& operator=(const returned&);
- template <class U> friend class internal::factory;
+ returned(typename T::pn_type*);
+ returned& operator=(const returned&); // Not defined
+ friend class internal::returned_factory;
};
} // proton
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38b27b50/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
index ac54156..0aa62a5 100644
--- a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
+++ b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
@@ -98,7 +98,8 @@ class container::impl {
class connection_work_queue;
class container_work_queue;
pn_listener_t* listen_common_lh(const std::string&);
- connection connect_common(const std::string&, const connection_options&);
+ pn_connection_t* make_connection_lh(const url& url, const connection_options&);
+ void start_connection(const url& url, pn_connection_t* c);
// Event loop to run in each container thread
void thread();
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38b27b50/proton-c/bindings/cpp/src/include/proton_bits.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/proton_bits.hpp b/proton-c/bindings/cpp/src/include/proton_bits.hpp
index 035ffb7..675249f 100644
--- a/proton-c/bindings/cpp/src/include/proton_bits.hpp
+++ b/proton-c/bindings/cpp/src/include/proton_bits.hpp
@@ -124,7 +124,6 @@ class factory {
public:
static T wrap(typename wrapped<T>::type* t) { return t; }
static typename wrapped<T>::type* unwrap(const T& t) { return t.pn_object(); }
- static returned<T> make_returned(const T& t) { return returned<T>(t); }
};
template <class T> struct context {};
@@ -140,7 +139,14 @@ inline void set_messaging_handler(T t, messaging_handler* mh) { context<T>::type
template <class T>
inline messaging_handler* get_messaging_handler(T* t) { return context<typename internal::wrapper<T>::type>::type::get(t).handler; }
-}
+class returned_factory {
+ public:
+ template <class T> static returned<T> make(typename internal::wrapped<T>::type* pn) {
+ return returned<T>(pn);
+ }
+};
+
+} // namespace internal
template <class T>
typename internal::wrapper<T>::type make_wrapper(T* t) { return internal::factory<typename internal::wrapper<T>::type>::wrap(t); }
@@ -151,9 +157,8 @@ U make_wrapper(typename internal::wrapped<U>::type* t) { return internal::factor
template <class T>
typename internal::wrapped<T>::type* unwrap(const T& t) { return internal::factory<T>::unwrap(t); }
-template <class T>
-returned<T> make_returned(const T& t) {
- return internal::factory<T>::make_returned(t);
+template <class T> returned<T> make_returned(typename internal::wrapped<T>::type* pn) {
+ return internal::returned_factory::make<T>(pn);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38b27b50/proton-c/bindings/cpp/src/proactor_container_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/proactor_container_impl.cpp b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
index 1389306..9870210 100644
--- a/proton-c/bindings/cpp/src/proactor_container_impl.cpp
+++ b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
@@ -152,9 +152,9 @@ void container::impl::remove_work_queue(container::impl::container_work_queue* l
work_queues_.erase(l);
}
-proton::connection container::impl::connect_common(
- const std::string& addr,
- const proton::connection_options& user_opts)
+pn_connection_t* container::impl::make_connection_lh(
+ const url& url,
+ const connection_options& user_opts)
{
if (stopping_)
throw proton::error("container is stopping");
@@ -163,7 +163,6 @@ proton::connection container::impl::connect_common(
opts.update(user_opts);
messaging_handler* mh = opts.handler();
- proton::url url(addr);
pn_connection_t *pnc = pn_connection();
connection_context& cc(connection_context::get(pnc));
cc.container = &container_;
@@ -177,42 +176,58 @@ proton::connection container::impl::connect_common(
if (!url.password().empty())
pn_connection_set_password(pnc, url.password().c_str());
- connection conn = make_wrapper(pnc);
- conn.open(opts);
- // Figure out correct string len then create connection address
- int len = pn_proactor_addr(0, 0, url.host().c_str(), url.port().c_str());
- std::vector<char> caddr(len+1);
- pn_proactor_addr(&caddr[0], len+1, url.host().c_str(), url.port().c_str());
- pn_proactor_connect(proactor_, pnc, &caddr[0]);
- return conn;
+ make_wrapper(pnc).open(opts);
+ return pnc; // 1 refcount from pn_connection()
}
-returned<proton::connection> container::impl::connect(
+void container::impl::start_connection(const url& url, pn_connection_t *pnc) {
+ char caddr[PN_MAX_ADDR];
+ pn_proactor_addr(caddr, sizeof(caddr), url.host().c_str(), url.port().c_str());
+ pn_proactor_connect(proactor_, pnc, caddr); // Takes ownership of pnc
+}
+
+returned<connection> container::impl::connect(
const std::string& addr,
const proton::connection_options& user_opts)
{
- connection conn = connect_common(addr, user_opts);
GUARD(lock_);
- return make_returned(conn);
+ proton::url url(addr);
+ pn_connection_t *pnc = make_connection_lh(url, user_opts);
+ start_connection(url, pnc);
+ return make_returned<proton::connection>(pnc);
}
-returned<sender> container::impl::open_sender(const std::string &url, const proton::sender_options &o1, const connection_options &o2) {
- proton::sender_options lopts(sender_options_);
- lopts.update(o1);
- connection conn = connect_common(url, o2);
-
- GUARD(lock_);
- return make_returned(conn.default_session().open_sender(proton::url(url).path(), lopts));
+returned<sender> container::impl::open_sender(const std::string &urlstr, const proton::sender_options &o1, const connection_options &o2)
+{
+ proton::url url(urlstr);
+ pn_link_t* pnl = 0;
+ pn_connection_t* pnc = 0;
+ {
+ GUARD(lock_);
+ proton::sender_options lopts(sender_options_);
+ lopts.update(o1);
+ pnc = make_connection_lh(url, o2);
+ connection conn(make_wrapper(pnc));
+ pnl = unwrap(conn.default_session().open_sender(url.path(), lopts));
+ } // There must be no refcounting after here
+ start_connection(url, pnc); // Takes ownership of pnc
+ return make_returned<sender>(pnl); // Unsafe returned pointer
}
-returned<receiver> container::impl::open_receiver(const std::string &url, const proton::receiver_options &o1, const connection_options &o2) {
- proton::receiver_options lopts(receiver_options_);
- lopts.update(o1);
- connection conn = connect_common(url, o2);
-
- GUARD(lock_);
- return make_returned(
- conn.default_session().open_receiver(proton::url(url).path(), lopts));
+returned<receiver> container::impl::open_receiver(const std::string &urlstr, const proton::receiver_options &o1, const connection_options &o2) {
+ proton::url url(urlstr);
+ pn_link_t* pnl = 0;
+ pn_connection_t* pnc = 0;
+ {
+ GUARD(lock_);
+ proton::receiver_options lopts(receiver_options_);
+ lopts.update(o1);
+ pnc = make_connection_lh(url, o2);
+ connection conn(make_wrapper(pnc));
+ pnl = unwrap(conn.default_session().open_receiver(url.path(), lopts));
+ } // There must be no refcounting after here
+ start_connection(url, pnc);
+ return make_returned<receiver>(pnl);
}
pn_listener_t* container::impl::listen_common_lh(const std::string& addr) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38b27b50/proton-c/bindings/cpp/src/returned.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/returned.cpp b/proton-c/bindings/cpp/src/returned.cpp
index a6c6101..cd8c1cb 100644
--- a/proton-c/bindings/cpp/src/returned.cpp
+++ b/proton-c/bindings/cpp/src/returned.cpp
@@ -28,14 +28,14 @@
namespace proton {
-template <class T> PN_CPP_EXTERN returned<T>::returned(const T& t) : ptr_(unwrap(t)) {}
-
-//template <class T> PN_CPP_EXTERN returned<T>::returned(const returned<T>& x) : ptr_(x.ptr_) {}
+template <class T> PN_CPP_EXTERN returned<T>::returned(const returned<T>& x) : ptr_(x.ptr_) {}
template <class T> PN_CPP_EXTERN returned<T>::operator T() const {
return internal::factory<T>::wrap(ptr_);
}
+template <class T> returned<T>::returned(typename T::pn_type* p) : ptr_(p) {}
+
// Explicit instantiations for allowed types
template class PN_CPP_CLASS_EXTERN returned<connection>;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38b27b50/tools/py/proctest.py
----------------------------------------------------------------------
diff --git a/tools/py/proctest.py b/tools/py/proctest.py
index 047dc44..ebd7db2 100644
--- a/tools/py/proctest.py
+++ b/tools/py/proctest.py
@@ -95,6 +95,8 @@ class Proc(Popen):
self.kwargs = kwargs
self._out = tempfile.TemporaryFile()
try:
+ if (os.getenv("PROCTEST_VERBOSE")):
+ sys.stderr.write("\nstart proc: %s\n" % self.args)
Popen.__init__(self, self.args, stdout=self._out, stderr=STDOUT, **kwargs)
except OSError, e:
if e.errno == errno.ENOENT:
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org