You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/11/02 20:46:56 UTC
[36/50] qpid-proton git commit: NO-JIRA: [cpp] fix example race
condition, causing occasional hang
NO-JIRA: [cpp] fix example race condition, causing occasional hang
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/1c44c431
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/1c44c431
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/1c44c431
Branch: refs/heads/go1
Commit: 1c44c431ef2acc1b37e88a0891c8e8c03bd30eb5
Parents: 5960f15
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Sep 28 11:11:41 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Sep 28 11:12:42 2018 -0400
----------------------------------------------------------------------
.../multithreaded_client_flow_control.cpp | 48 ++++++++++++++------
1 file changed, 33 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c44c431/cpp/examples/multithreaded_client_flow_control.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/multithreaded_client_flow_control.cpp b/cpp/examples/multithreaded_client_flow_control.cpp
index 93c6b3d..73ea6f6 100644
--- a/cpp/examples/multithreaded_client_flow_control.cpp
+++ b/cpp/examples/multithreaded_client_flow_control.cpp
@@ -61,6 +61,12 @@
std::mutex out_lock;
#define OUT(x) do { std::lock_guard<std::mutex> l(out_lock); x; } while (false)
+// Exception raised if a sender or receiver is closed when trying to send/receive
+class closed : public std::runtime_error {
+ public:
+ closed(const std::string& msg) : std::runtime_error(msg) {}
+};
+
// A thread-safe sending connection that blocks sending threads when there
// is no AMQP credit to send messages.
class sender : private proton::messaging_handler {
@@ -151,12 +157,13 @@ class receiver : private proton::messaging_handler {
proton::work_queue* work_queue_;
std::queue<proton::message> buffer_; // Messages not yet returned by receive()
std::condition_variable can_receive_; // Notify receivers of messages
+ bool closed_;
public:
// Connect to url
receiver(proton::container& cont, const std::string& url, const std::string& address)
- : work_queue_()
+ : work_queue_(0), closed_(false)
{
// NOTE:credit_window(0) disables automatic flow control.
// We will use flow control to match AMQP credit to buffer capacity.
@@ -168,8 +175,10 @@ class receiver : private proton::messaging_handler {
proton::message receive() {
std::unique_lock<std::mutex> l(lock_);
// Wait for buffered messages
- while (!work_queue_ || buffer_.empty())
+ while (!closed_ && (!work_queue_ || buffer_.empty())) {
can_receive_.wait(l);
+ }
+ if (closed_) throw closed("receiver closed");
proton::message m = std::move(buffer_.front());
buffer_.pop();
// Add a lambda to the work queue to call receive_done().
@@ -178,9 +187,16 @@ class receiver : private proton::messaging_handler {
return m;
}
+ // Thread safe
void close() {
std::lock_guard<std::mutex> l(lock_);
- if (work_queue_) work_queue_->add([this]() { this->receiver_.connection().close(); });
+ if (!closed_) {
+ closed_ = true;
+ can_receive_.notify_all();
+ if (work_queue_) {
+ work_queue_->add([this]() { this->receiver_.connection().close(); });
+ }
+ }
}
private:
@@ -229,24 +245,26 @@ void send_thread(sender& s, int n) {
// Receive messages till atomic remaining count is 0.
// remaining is shared among all receiving threads
void receive_thread(receiver& r, std::atomic_int& remaining) {
- auto id = std::this_thread::get_id();
- int n = 0;
- // atomically check and decrement remaining *before* receiving.
- // If it is 0 or less then return, as there are no more
- // messages to receive so calling r.receive() would block forever.
- while (remaining-- > 0) {
- auto m = r.receive();
- ++n;
- OUT(std::cout << id << " received \"" << m.body() << '"' << std::endl);
- }
- OUT(std::cout << id << " received " << n << " messages" << std::endl);
+ try {
+ auto id = std::this_thread::get_id();
+ int n = 0;
+ // atomically check and decrement remaining *before* receiving.
+ // If it is 0 or less then return, as there are no more
+ // messages to receive so calling r.receive() would block forever.
+ while (remaining-- > 0) {
+ auto m = r.receive();
+ ++n;
+ OUT(std::cout << id << " received \"" << m.body() << '"' << std::endl);
+ }
+ OUT(std::cout << id << " received " << n << " messages" << std::endl);
+ } catch (const closed&) {}
}
int main(int argc, const char **argv) {
try {
if (argc != 5) {
std::cerr <<
- "Usage: " << argv[0] << " MESSAGE-COUNT THREAD-COUNT URL\n"
+ "Usage: " << argv[0] << " CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT THREAD-COUNT\n"
"CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n"
"AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n"
"MESSAGE-COUNT: number of messages to send\n"
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org