You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/21 12:32:00 UTC

[pulsar] branch branch-2.6 updated: [C++] Fix race condition in BlockingQueue (#8765)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new c970b3a  [C++] Fix race condition in BlockingQueue (#8765)
c970b3a is described below

commit c970b3ac22aa10399e65858e17655fc895143b35
Author: TT <er...@users.noreply.github.com>
AuthorDate: Tue Dec 8 17:42:57 2020 +0800

    [C++] Fix race condition in BlockingQueue (#8765)
    
    ### Motivation
    
    BlockingQueue has race condition that can cause threads waiting forever in multithreading environment. ProducerImpl uses BlockingQueue as pendingMessagesQueue_ and can be blocked forever at it. This PR fixes race condition in BlockingQueue.
    
    #### Race condition details
    https://github.com/apache/pulsar/blob/91e2f832178d9ffd5d78161145d895910296c2d9/pulsar-client-cpp/lib/BlockingQueue.h#L172-L185
    Use BlockingQueue::Pop as example, its procedure is:
    1. lock
    2. check wasFull and then change queue state
    3. unlock
    4. if wasFull, notify one thread waiting at queueFullCondition
    
    Race condition sequence:
    1. queue is full and there are multiple threads waiting on queueFullCondition
    2. thread A call Pop, lock, wasFull is true, unlock -> queue has one free space
    3. thread B call Pop, lock, wasFull is false, unlock -> queue has two free spaces
    4. thread A notify one thread waiting at queueFullCondition
    5. queue is no loger full again
    6. result: except one thread is notified by A, other threads waiting on queueFullCondition are waiting forever
    
    ### Modifications
    
    * Use notify_all instead of notify_one to notify threads waiting on condition variables
      Reason: Currently only notify threads when queue is full or empty. After unlock, other threads may change queue state, so thread to notify condition can not determine how queue state changed and should use notify_all in case of more then one  change occured.
    
    ### Verifying this change
    
      - Add a test case BlockingQueueTest.testPushPopRace to test concurrent push and pop
    
    (cherry picked from commit 18b38766fbcf3d5944824e828566edca310ee9d8)
---
 pulsar-client-cpp/lib/BlockingQueue.h        | 12 ++++-----
 pulsar-client-cpp/tests/BlockingQueueTest.cc | 39 ++++++++++++++++++++++++++++
 2 files changed, 45 insertions(+), 6 deletions(-)

diff --git a/pulsar-client-cpp/lib/BlockingQueue.h b/pulsar-client-cpp/lib/BlockingQueue.h
index 5e466bd..2814c4f 100644
--- a/pulsar-client-cpp/lib/BlockingQueue.h
+++ b/pulsar-client-cpp/lib/BlockingQueue.h
@@ -126,7 +126,7 @@ class BlockingQueue {
         lock.unlock();
         if (wasEmpty) {
             // Notify that an element is pushed
-            queueEmptyCondition.notify_one();
+            queueEmptyCondition.notify_all();
         }
     }
 
@@ -145,7 +145,7 @@ class BlockingQueue {
 
         if (wasEmpty) {
             // Notify that an element is pushed
-            queueEmptyCondition.notify_one();
+            queueEmptyCondition.notify_all();
         }
     }
 
@@ -163,7 +163,7 @@ class BlockingQueue {
 
         if (wasEmpty) {
             // Notify that an element is pushed
-            queueEmptyCondition.notify_one();
+            queueEmptyCondition.notify_all();
         }
 
         return true;
@@ -180,7 +180,7 @@ class BlockingQueue {
 
         if (wasFull) {
             // Notify that an element is popped
-            queueFullCondition.notify_one();
+            queueFullCondition.notify_all();
         }
     }
 
@@ -196,7 +196,7 @@ class BlockingQueue {
 
         if (wasFull) {
             // Notify that an element is popped
-            queueFullCondition.notify_one();
+            queueFullCondition.notify_all();
         }
     }
 
@@ -274,7 +274,7 @@ class BlockingQueue {
 
         if (wasFull) {
             // Notify that one spot is now available
-            queueFullCondition.notify_one();
+            queueFullCondition.notify_all();
         }
     }
 
diff --git a/pulsar-client-cpp/tests/BlockingQueueTest.cc b/pulsar-client-cpp/tests/BlockingQueueTest.cc
index 4047e5e..42644e9 100644
--- a/pulsar-client-cpp/tests/BlockingQueueTest.cc
+++ b/pulsar-client-cpp/tests/BlockingQueueTest.cc
@@ -19,6 +19,8 @@
 #include <gtest/gtest.h>
 #include <lib/BlockingQueue.h>
 
+#include <future>
+#include <iostream>
 #include <thread>
 
 class ProducerWorker {
@@ -215,3 +217,40 @@ TEST(BlockingQueueTest, testReservedSpot) {
         ASSERT_EQ(0, queue.size());
     }
 }
+
+TEST(BlockingQueueTest, testPushPopRace) {
+    auto test_logic = []() {
+        size_t size = 5;
+        BlockingQueue<int> queue(size);
+
+        std::vector<std::unique_ptr<ProducerWorker>> producers;
+        for (int i = 0; i < 5; ++i) {
+            producers.emplace_back(new ProducerWorker{queue});
+            producers.back()->produce(1000);
+        }
+
+        // wait for queue full
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+        std::vector<std::unique_ptr<ConsumerWorker>> consumers;
+        for (int i = 0; i < 5; ++i) {
+            consumers.emplace_back(new ConsumerWorker{queue});
+            consumers.back()->consume(1000);
+        }
+
+        auto future = std::async(std::launch::async, [&]() {
+            for (auto& p : producers) p->join();
+            for (auto& c : consumers) c->join();
+        });
+        auto ret = future.wait_for(std::chrono::seconds(5));
+        if (ret == std::future_status::ready) {
+            std::cerr << "Exiting";
+            exit(0);
+        } else {
+            std::cerr << "Threads are not exited in time";
+            exit(1);
+        }
+    };
+
+    ASSERT_EXIT(test_logic(), ::testing::ExitedWithCode(0), "Exiting");
+}