Posted to commits@pulsar.apache.org by mm...@apache.org on 2023/10/30 23:05:06 UTC

(pulsar-client-cpp) branch main updated: Fix possible deadlock of Future when adding a listener after completed (#334)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 77e2d63  Fix possible deadlock of Future when adding a listener after completed (#334)
77e2d63 is described below

commit 77e2d63c5cf7ad9a8f3153359a9c318db4668226
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Oct 31 07:05:01 2023 +0800

    Fix possible deadlock of Future when adding a listener after completed (#334)
    
    * Fix possible deadlock of Future when adding a listener after completed
    
    ### Motivation
    
    There is a case where a deadlock can happen with a `Future`. Assume there
    is a `Promise` and its `Future`.
    
    1. Call `Future::addListener` to add a listener that tries to acquire a
       user-provided mutex (`lock`).
    2. Thread 1: Acquire `lock` first.
    3. Thread 2: Call `Promise::setValue`. The listener is triggered before
       the future is fully completed. Since `lock` is held by Thread 1, the
       listener blocks.
    4. Thread 1: Call `Future::addListener`. Since it sees that
       `InternalState::completed_` is true, it calls `get` to retrieve the
       result and value.
    
    Then a deadlock occurs:
    - The thread calling `setValue` waits for `lock` to be released before
      it can complete `InternalState::future_`.
    - The thread calling `addListener` holds `lock` but waits for
      `InternalState::future_` to be completed.
    
    In a real-world case, suppose a thread acquires a lock before calling
    `ProducerImpl::closeAsync`, while another thread calls `setValue` in
    `ClientConnection::handleSuccess` and the callback of
    `createProducerAsync` tries to acquire the same lock. Then
    `handleSuccess` blocks, and in `closeAsync` the current thread blocks in:
    
    ```c++
        cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId)
            .addListener([self, callback](Result result, const ResponseData&) { callback(result); });
    ```
    
    The stacks:
    
    ```
    Thread 1:
    #11 0x00007fab80da2173 in pulsar::InternalState<...>::complete (this=0x3d53e7a10, result=..., value=...) at lib/Future.h:61
    #13 pulsar::ClientConnection::handleSuccess (this=this@entry=0x2214bc000, success=...) at lib/ClientConnection.cc:1552
    
    Thread 2:
    #8  get (result=..., this=0x3d53e7a10) at lib/Future.h:69
    #9  pulsar::InternalState<...>::addListener (this=this@entry=0x3d53e7a10, listener=...) at lib/Future.h:51
    #11 0x00007fab80e8dc4e in pulsar::ProducerImpl::closeAsync at lib/ProducerImpl.cc:794
    ```
    
    Two factors cause the deadlock:
    1. `completed_` is used to indicate whether the future is completed.
       However, once it becomes true, the future might still not be
       completed, because the value has not been set yet and the listeners
       have not finished.
    2. If `addListener` is called after completion, the listener is still
       pushed to `listeners_` so that previous listeners are executed before
       the new one. This guarantee is unnecessarily strong.
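    
    Point 1 can be reproduced in isolation. The following is a hypothetical,
    single-threaded reduction of the pre-fix ordering in
    `InternalState::complete` (set the flag, run the listeners, then publish
    the value through a `std::promise`); `OldStyleState` and
    `observeOldWindow` are illustrative names, not library code.
    
    ```c++
    #include <atomic>
    #include <chrono>
    #include <functional>
    #include <future>
    #include <utility>
    
    // Hypothetical reduction of the pre-fix ordering: the flag is flipped
    // before the value is published, and listeners run in between.
    struct OldStyleState {
        std::atomic_bool completed_{false};
        std::promise<int> promise_;
        std::shared_future<int> future_{promise_.get_future()};
    
        bool complete(int value, const std::function<void()>& listener) {
            bool expected = false;
            if (!completed_.compare_exchange_strong(expected, true)) {
                return false;  // another caller already completed the state
            }
            listener();                 // runs while completed_ == true ...
            promise_.set_value(value);  // ... but before the value is available
            return true;
        }
    
        // Non-blocking check of whether `future_.get()` would return immediately
        bool valueReady() const {
            return future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
        }
    };
    
    // Returns {flag seen by the listener, value readiness seen by the listener}
    std::pair<bool, bool> observeOldWindow() {
        OldStyleState state;
        bool sawCompleted = false;
        bool sawReady = true;
        state.complete(42, [&state, &sawCompleted, &sawReady] {
            sawCompleted = state.completed_.load();
            sawReady = state.valueReady();
        });
        return {sawCompleted, sawReady};
    }
    ```
    
    Under these assumptions `observeOldWindow()` returns `{true, false}`: a
    listener running in that window sees the flag set while the shared future
    is not ready yet, so a `get` issued from there would block.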
    
    ### Modifications
    
    First, complete the future before calling the listeners.
    
    Then, use an enum to represent the status:
    - INITIAL: `complete` has not been called.
    - COMPLETING: when `complete` is called for the first time, the status
      changes from INITIAL to COMPLETING.
    - COMPLETED: the result and value have been stored and the future is
      completed.
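    
    A minimal sketch of the three-state design, assuming `int` for both the
    result and the value; `MiniState` and `replayDeadlockScenario` are
    hypothetical names, and the code only approximates the `lib/Future.h`
    change below. The replay holds the user mutex on the calling thread, lets
    another thread call `complete` (whose first listener then blocks on that
    mutex), and adds a second listener once the state is COMPLETED.
    
    ```c++
    #include <atomic>
    #include <condition_variable>
    #include <cstdint>
    #include <forward_list>
    #include <functional>
    #include <mutex>
    #include <thread>
    #include <utility>
    
    // Hypothetical, simplified model of the INITIAL/COMPLETING/COMPLETED design
    class MiniState {
       public:
        using Listener = std::function<void(int, int)>;
        enum Status : std::uint8_t { INITIAL, COMPLETING, COMPLETED };
    
        void addListener(Listener listener) {
            std::unique_lock<std::mutex> lock{mutex_};
            if (completed()) {
                // Already completed: copy the stored pair, unlock, run inline
                int result = result_;
                int value = value_;
                lock.unlock();
                listener(result, value);
            } else {
                tail_ = listeners_.emplace_after(tail_, std::move(listener));
            }
        }
    
        bool complete(int result, int value) {
            Status expected = INITIAL;
            if (!status_.compare_exchange_strong(expected, COMPLETING)) {
                return false;  // only the first caller moves INITIAL -> COMPLETING
            }
            std::unique_lock<std::mutex> lock{mutex_};
            result_ = result;
            value_ = value;
            status_ = COMPLETED;  // published before any listener runs
            cond_.notify_all();
            auto pending = std::move(listeners_);
            lock.unlock();
            for (auto& listener : pending) {
                listener(result, value);  // run without holding mutex_
            }
            return true;
        }
    
        bool completed() const noexcept { return status_.load() == COMPLETED; }
    
        int get(int& value) const {
            std::unique_lock<std::mutex> lock{mutex_};
            cond_.wait(lock, [this] { return completed(); });
            value = value_;
            return result_;
        }
    
       private:
        mutable std::mutex mutex_;
        mutable std::condition_variable cond_;
        std::forward_list<Listener> listeners_;
        std::forward_list<Listener>::iterator tail_{listeners_.before_begin()};
        int result_ = 0;
        int value_ = 0;
        std::atomic<Status> status_{INITIAL};
    };
    
    // Hypothetical replay of the Motivation scenario; true if nothing hangs
    bool replayDeadlockScenario() {
        MiniState state;
        std::mutex userMutex;  // the user-provided `lock`
        std::atomic_bool listener1Done{false};
        bool listener2Ran = false;
    
        state.addListener([&](int, int) {  // Listener-1 needs the user mutex
            std::lock_guard<std::mutex> guard{userMutex};
            listener1Done = true;
        });
    
        std::unique_lock<std::mutex> userLock{userMutex};            // holds `lock`
        std::thread completer{[&state] { state.complete(0, 7); }};  // calls setValue
        while (!state.completed()) {
            std::this_thread::yield();
        }
        // Runs Listener-2 inline instead of waiting for Listener-1
        state.addListener([&](int, int value) { listener2Ran = (value == 7); });
        userLock.unlock();  // lets Listener-1, and thus the completer, finish
        completer.join();
        return listener2Ran && listener1Done.load();
    }
    ```
    
    Because `complete` stores the result and value under `mutex_` before
    running any listener, the late `addListener` call copies them and invokes
    the new listener inline, so `replayDeadlockScenario()` returns `true`
    instead of hanging.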
    
    In addition, the implementation of `Future` is simplified.
    https://github.com/apache/pulsar-client-cpp/pull/299 fixed a possible
    mutex crash by introducing `std::future`. However, the root cause was
    that the condition variable was not used correctly:
    
    > Even if the shared variable is atomic, it must be modified while owning the mutex to correctly publish the modification to the waiting thread.
    
    See https://en.cppreference.com/w/cpp/thread/condition_variable
    
    The simplest fix for
    https://github.com/apache/pulsar-client-cpp/pull/298 is to add
    `lock.lock()` before `state->condition.notify_all();`.
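    
    The rule quoted above can be illustrated with a hypothetical one-shot
    signal (`OneShotSignal` and `signalAcrossThreads` are illustrative names,
    not part of the client): the flag is changed while owning the mutex, the
    waiter uses the predicate overload of `wait`, and `notify_all` is called
    while the lock is still held, which is the same ordering the new
    `InternalState::complete` uses.
    
    ```c++
    #include <condition_variable>
    #include <mutex>
    #include <thread>
    
    // Hypothetical one-shot signal following the condition_variable rule above
    class OneShotSignal {
       public:
        void set(int value) {
            std::lock_guard<std::mutex> lock{mutex_};
            value_ = value;
            ready_ = true;       // shared state is modified while owning the mutex
            cond_.notify_all();  // notify while the lock is still held
        }
    
        int wait() {
            std::unique_lock<std::mutex> lock{mutex_};
            // The predicate is re-checked after every wakeup, including spurious ones
            cond_.wait(lock, [this] { return ready_; });
            return value_;
        }
    
       private:
        std::mutex mutex_;
        std::condition_variable cond_;
        bool ready_ = false;
        int value_ = 0;
    };
    
    // Sets the signal from a second thread and waits for it on the calling thread
    int signalAcrossThreads(int value) {
        OneShotSignal oneShot;
        std::thread producer{[&oneShot, value] { oneShot.set(value); }};
        int received = oneShot.wait();
        producer.join();
        return received;
    }
    ```
    
    `signalAcrossThreads(42)` returns 42 regardless of which thread runs
    first: a waiter that arrives late sees `ready_ == true` and does not
    block.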
    
    * Acquire lock again
    
    * Add initial value
---
 lib/ConsumerImpl.h   |  1 +
 lib/Future.h         | 94 +++++++++++++++++++++++++---------------------------
 lib/ProducerImpl.h   |  1 +
 tests/PromiseTest.cc | 53 ++++++++++++++++++-----------
 tests/WaitUtils.h    |  5 +--
 5 files changed, 84 insertions(+), 70 deletions(-)

diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index dd7163f..61d96b1 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -23,6 +23,7 @@
 
 #include <boost/optional.hpp>
 #include <functional>
+#include <list>
 #include <memory>
 #include <utility>
 
diff --git a/lib/Future.h b/lib/Future.h
index 5ee937e..290ebc6 100644
--- a/lib/Future.h
+++ b/lib/Future.h
@@ -20,14 +20,11 @@
 #define LIB_FUTURE_H_
 
 #include <atomic>
-#include <chrono>
+#include <condition_variable>
+#include <forward_list>
 #include <functional>
-#include <future>
-#include <list>
 #include <memory>
 #include <mutex>
-#include <thread>
-#include <utility>
 
 namespace pulsar {
 
@@ -38,71 +35,70 @@ class InternalState {
     using Pair = std::pair<Result, Type>;
     using Lock = std::unique_lock<std::mutex>;
 
+    enum Status : uint8_t
+    {
+        INITIAL,
+        COMPLETING,
+        COMPLETED
+    };
+
     // NOTE: Add the constructor explicitly just to be compatible with GCC 4.8
     InternalState() {}
 
     void addListener(Listener listener) {
         Lock lock{mutex_};
-        listeners_.emplace_back(listener);
-        lock.unlock();
-
         if (completed()) {
-            Type value;
-            Result result = get(value);
-            triggerListeners(result, value);
+            auto result = result_;
+            auto value = value_;
+            lock.unlock();
+            listener(result, value);
+        } else {
+            tailListener_ = listeners_.emplace_after(tailListener_, std::move(listener));
         }
     }
 
     bool complete(Result result, const Type &value) {
-        bool expected = false;
-        if (!completed_.compare_exchange_strong(expected, true)) {
+        Status expected = Status::INITIAL;
+        if (!status_.compare_exchange_strong(expected, Status::COMPLETING)) {
             return false;
         }
-        triggerListeners(result, value);
-        promise_.set_value(std::make_pair(result, value));
-        return true;
-    }
-
-    bool completed() const noexcept { return completed_; }
 
-    Result get(Type &result) {
-        const auto &pair = future_.get();
-        result = pair.second;
-        return pair.first;
-    }
+        // Ensure if another thread calls `addListener` at the same time, that thread can get the value by
+        // `get` before the existing listeners are executed
+        Lock lock{mutex_};
+        result_ = result;
+        value_ = value;
+        status_ = COMPLETED;
+        cond_.notify_all();
 
-    // Only public for test
-    void triggerListeners(Result result, const Type &value) {
-        while (true) {
-            Lock lock{mutex_};
-            if (listeners_.empty()) {
-                return;
+        if (!listeners_.empty()) {
+            auto listeners = std::move(listeners_);
+            lock.unlock();
+            for (auto &&listener : listeners) {
+                listener(result, value);
             }
+        }
 
-            bool expected = false;
-            if (!listenerRunning_.compare_exchange_strong(expected, true)) {
-                // There is another thread that polled a listener that is running, skip polling and release
-                // the lock. Here we wait for some time to avoid busy waiting.
-                std::this_thread::sleep_for(std::chrono::milliseconds(1));
-                continue;
-            }
-            auto listener = std::move(listeners_.front());
-            listeners_.pop_front();
-            lock.unlock();
+        return true;
+    }
 
-            listener(result, value);
-            listenerRunning_ = false;
-        }
+    bool completed() const noexcept { return status_.load() == COMPLETED; }
+
+    Result get(Type &value) const {
+        Lock lock{mutex_};
+        cond_.wait(lock, [this] { return completed(); });
+        value = value_;
+        return result_;
     }
 
    private:
-    std::atomic_bool completed_{false};
-    std::promise<Pair> promise_;
-    std::shared_future<Pair> future_{promise_.get_future()};
-
-    std::list<Listener> listeners_;
     mutable std::mutex mutex_;
-    std::atomic_bool listenerRunning_{false};
+    mutable std::condition_variable cond_;
+    std::forward_list<Listener> listeners_;
+    decltype(listeners_.before_begin()) tailListener_{listeners_.before_begin()};
+    Result result_;
+    Type value_;
+    std::atomic<Status> status_{INITIAL};
 };
 
 template <typename Result, typename Type>
diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h
index 770ac45..91b9544 100644
--- a/lib/ProducerImpl.h
+++ b/lib/ProducerImpl.h
@@ -20,6 +20,7 @@
 #define LIB_PRODUCERIMPL_H_
 
 #include <boost/optional.hpp>
+#include <list>
 #include <memory>
 
 #include "Future.h"
diff --git a/tests/PromiseTest.cc b/tests/PromiseTest.cc
index 29ee2a3..ad67e7d 100644
--- a/tests/PromiseTest.cc
+++ b/tests/PromiseTest.cc
@@ -19,10 +19,13 @@
 #include <gtest/gtest.h>
 
 #include <chrono>
+#include <memory>
+#include <mutex>
 #include <string>
 #include <thread>
 #include <vector>
 
+#include "WaitUtils.h"
 #include "lib/Future.h"
 #include "lib/LogUtils.h"
 
@@ -88,26 +91,38 @@ TEST(PromiseTest, testListeners) {
     ASSERT_EQ(values, (std::vector<std::string>(2, "hello")));
 }
 
-TEST(PromiseTest, testTriggerListeners) {
-    InternalState<int, int> state;
-    state.addListener([](int, const int&) {
-        LOG_INFO("Start task 1...");
-        std::this_thread::sleep_for(std::chrono::seconds(1));
-        LOG_INFO("Finish task 1...");
+TEST(PromiseTest, testListenerDeadlock) {
+    Promise<int, int> promise;
+    auto future = promise.getFuture();
+    auto mutex = std::make_shared<std::mutex>();
+    auto done = std::make_shared<std::atomic_bool>(false);
+
+    future.addListener([mutex, done](int, int) {
+        LOG_INFO("Listener-1 before acquiring the lock");
+        std::lock_guard<std::mutex> lock{*mutex};
+        LOG_INFO("Listener-1 after acquiring the lock");
+        done->store(true);
     });
-    state.addListener([](int, const int&) {
-        LOG_INFO("Start task 2...");
+
+    std::thread t1{[mutex, &future] {
+        std::lock_guard<std::mutex> lock{*mutex};
+        // Make it a great chance that `t2` executes `promise.setValue` first
+        std::this_thread::sleep_for(std::chrono::seconds(2));
+
+        // Since the future is completed, `Future::get` will be called in `addListener` to get the result
+        LOG_INFO("Before adding Listener-2 (acquired the mutex)")
+        future.addListener([](int, int) { LOG_INFO("Listener-2 is triggered"); });
+        LOG_INFO("After adding Listener-2 (releasing the mutex)");
+    }};
+    t1.detach();
+    std::thread t2{[mutex, promise] {
+        // Make there a great chance that `t1` acquires `mutex` first
         std::this_thread::sleep_for(std::chrono::seconds(1));
-        LOG_INFO("Finish task 2...");
-    });
+        LOG_INFO("Before setting value");
+        promise.setValue(0);  // the 1st listener is called, which is blocked at acquiring `mutex`
+        LOG_INFO("After setting value");
+    }};
+    t2.detach();
 
-    auto start = std::chrono::high_resolution_clock::now();
-    auto future1 = std::async(std::launch::async, [&state] { state.triggerListeners(0, 0); });
-    auto future2 = std::async(std::launch::async, [&state] { state.triggerListeners(0, 0); });
-    future1.wait();
-    future2.wait();
-    auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
-                       std::chrono::high_resolution_clock::now() - start)
-                       .count();
-    ASSERT_TRUE(elapsed > 2000) << "elapsed: " << elapsed << "ms";
+    ASSERT_TRUE(waitUntil(std::chrono::seconds(5000), [done] { return done->load(); }));
 }
diff --git a/tests/WaitUtils.h b/tests/WaitUtils.h
index d7db82e..4a03e53 100644
--- a/tests/WaitUtils.h
+++ b/tests/WaitUtils.h
@@ -25,13 +25,13 @@
 namespace pulsar {
 
 template <typename Rep, typename Period>
-inline void waitUntil(std::chrono::duration<Rep, Period> timeout, const std::function<bool()>& condition,
+inline bool waitUntil(std::chrono::duration<Rep, Period> timeout, const std::function<bool()>& condition,
                       long durationMs = 10) {
     auto timeoutMs = std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count();
     while (timeoutMs > 0) {
         auto now = std::chrono::high_resolution_clock::now();
         if (condition()) {
-            break;
+            return true;
         }
         std::this_thread::sleep_for(std::chrono::milliseconds(durationMs));
         auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
@@ -39,6 +39,7 @@ inline void waitUntil(std::chrono::duration<Rep, Period> timeout, const std::fun
                            .count();
         timeoutMs -= elapsed;
     }
+    return false;
 }
 
 }  // namespace pulsar