You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/09/28 18:07:56 UTC

qpid-proton git commit: NO-JIRA: [cpp] fix example race condition, causing occasional hang

Repository: qpid-proton
Updated Branches:
  refs/heads/master 5960f15df -> 1c44c431e


NO-JIRA: [cpp] fix example race condition, causing occasional hang


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/1c44c431
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/1c44c431
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/1c44c431

Branch: refs/heads/master
Commit: 1c44c431ef2acc1b37e88a0891c8e8c03bd30eb5
Parents: 5960f15
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Sep 28 11:11:41 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Sep 28 11:12:42 2018 -0400

----------------------------------------------------------------------
 .../multithreaded_client_flow_control.cpp       | 48 ++++++++++++++------
 1 file changed, 33 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c44c431/cpp/examples/multithreaded_client_flow_control.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/multithreaded_client_flow_control.cpp b/cpp/examples/multithreaded_client_flow_control.cpp
index 93c6b3d..73ea6f6 100644
--- a/cpp/examples/multithreaded_client_flow_control.cpp
+++ b/cpp/examples/multithreaded_client_flow_control.cpp
@@ -61,6 +61,12 @@
 std::mutex out_lock;
 #define OUT(x) do { std::lock_guard<std::mutex> l(out_lock); x; } while (false)
 
+// Exception raised if a sender or receiver is closed when trying to send/receive
+class closed : public std::runtime_error {
+  public:
+    closed(const std::string& msg) : std::runtime_error(msg) {}
+};
+
 // A thread-safe sending connection that blocks sending threads when there
 // is no AMQP credit to send messages.
 class sender : private proton::messaging_handler {
@@ -151,12 +157,13 @@ class receiver : private proton::messaging_handler {
     proton::work_queue* work_queue_;
     std::queue<proton::message> buffer_; // Messages not yet returned by receive()
     std::condition_variable can_receive_; // Notify receivers of messages
+    bool closed_;
 
   public:
 
     // Connect to url
     receiver(proton::container& cont, const std::string& url, const std::string& address)
-        : work_queue_()
+        : work_queue_(0), closed_(false)
     {
         // NOTE:credit_window(0) disables automatic flow control.
         // We will use flow control to match AMQP credit to buffer capacity.
@@ -168,8 +175,10 @@ class receiver : private proton::messaging_handler {
     proton::message receive() {
         std::unique_lock<std::mutex> l(lock_);
         // Wait for buffered messages
-        while (!work_queue_ || buffer_.empty())
+        while (!closed_ && (!work_queue_ || buffer_.empty())) {
             can_receive_.wait(l);
+        }
+        if (closed_) throw closed("receiver closed");
         proton::message m = std::move(buffer_.front());
         buffer_.pop();
         // Add a lambda to the work queue to call receive_done().
@@ -178,9 +187,16 @@ class receiver : private proton::messaging_handler {
         return m;
     }
 
+    // Thread safe
     void close() {
         std::lock_guard<std::mutex> l(lock_);
-        if (work_queue_) work_queue_->add([this]() { this->receiver_.connection().close(); });
+        if (!closed_) {
+            closed_ = true;
+            can_receive_.notify_all();
+            if (work_queue_) {
+                work_queue_->add([this]() { this->receiver_.connection().close(); });
+            }
+        }
     }
 
   private:
@@ -229,24 +245,26 @@ void send_thread(sender& s, int n) {
 // Receive messages till atomic remaining count is 0.
 // remaining is shared among all receiving threads
 void receive_thread(receiver& r, std::atomic_int& remaining) {
-    auto id = std::this_thread::get_id();
-    int n = 0;
-    // atomically check and decrement remaining *before* receiving.
-    // If it is 0 or less then return, as there are no more
-    // messages to receive so calling r.receive() would block forever.
-    while (remaining-- > 0) {
-        auto m = r.receive();
-        ++n;
-        OUT(std::cout << id << " received \"" << m.body() << '"' << std::endl);
-    }
-    OUT(std::cout << id << " received " << n << " messages" << std::endl);
+    try {
+        auto id = std::this_thread::get_id();
+        int n = 0;
+        // atomically check and decrement remaining *before* receiving.
+        // If it is 0 or less then return, as there are no more
+        // messages to receive so calling r.receive() would block forever.
+        while (remaining-- > 0) {
+            auto m = r.receive();
+            ++n;
+            OUT(std::cout << id << " received \"" << m.body() << '"' << std::endl);
+        }
+        OUT(std::cout << id << " received " << n << " messages" << std::endl);
+    } catch (const closed&) {}
 }
 
 int main(int argc, const char **argv) {
     try {
         if (argc != 5) {
             std::cerr <<
-                "Usage: " << argv[0] << " MESSAGE-COUNT THREAD-COUNT URL\n"
+                "Usage: " << argv[0] << " CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT THREAD-COUNT\n"
                 "CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n"
                 "AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n"
                 "MESSAGE-COUNT: number of messages to send\n"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org