Posted to commits@pulsar.apache.org by mm...@apache.org on 2023/10/30 23:05:06 UTC
(pulsar-client-cpp) branch main updated: Fix possible deadlock of Future when adding a listener after completed (#334)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 77e2d63 Fix possible deadlock of Future when adding a listener after completed (#334)
77e2d63 is described below
commit 77e2d63c5cf7ad9a8f3153359a9c318db4668226
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Oct 31 07:05:01 2023 +0800
Fix possible deadlock of Future when adding a listener after completed (#334)
* Fix possible deadlock of Future when adding a listener after completed
### Motivation
There is a case where a deadlock can happen for a `Future`. Assume there
is a `Promise` and its `Future`:
1. Call `Future::addListener` to add a listener that tries to acquire a
user-provided mutex (`lock`).
2. Thread 1: Acquire `lock` first.
3. Thread 2: Call `Promise::setValue`. The listener is triggered before
the future is completed. Since `lock` is held by Thread 1, the
listener is blocked.
4. Thread 1: Call `Future::addListener`. Since it detects that
`InternalState::completed_` is true, it calls `get` to retrieve
the result and value.
Then a deadlock happens:
- Thread 1 holds `lock` but waits for `InternalState::future_` to be
completed.
- Thread 2 waits for `lock` to be released before it can complete
`InternalState::future_`.
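The window that makes this possible can be sketched with a hypothetical, simplified model of the pre-fix `InternalState`. The names `OldState` and `valueReadyWhileListenerRuns` are illustrative and not part of the library. In this model the completion flag flips before any listener runs, but the value is published through `std::promise::set_value` only afterwards. A thread that sees the flag and then calls a blocking `get` therefore has to wait for every earlier listener to finish.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <future>
#include <list>
#include <thread>

// Hypothetical model of the pre-fix design: the flag is set first, listeners
// run next, and the value is published last.
struct OldState {
    std::atomic_bool completed{false};
    std::promise<int> promise;
    std::shared_future<int> future{promise.get_future()};
    std::list<std::function<void(int)>> listeners;

    void complete(int value) {
        bool expected = false;
        if (!completed.compare_exchange_strong(expected, true)) {
            return;
        }
        for (auto& listener : listeners) {
            listener(value);  // may block, e.g. on a user-provided mutex
        }
        promise.set_value(value);  // only now can `future.get()` return
    }
};

// Returns true if the value was already available while `completed` was true
// and a listener was still running. With OldState this is always false.
bool valueReadyWhileListenerRuns() {
    OldState state;
    std::promise<void> gate;  // keeps the listener blocked until we open it
    std::shared_future<void> gateOpened = gate.get_future().share();
    state.listeners.emplace_back([gateOpened](int) { gateOpened.wait(); });

    std::thread completer{[&state] { state.complete(42); }};
    while (!state.completed.load()) {
        std::this_thread::yield();  // wait until the flag has flipped
    }
    // A caller that trusts `completed` and calls `future.get()` here would block.
    bool ready = state.future.wait_for(std::chrono::milliseconds(0)) == std::future_status::ready;

    gate.set_value();  // let the listener finish so the value gets published
    completer.join();
    assert(state.future.get() == 42);
    return ready;
}
```

In this sketch the check uses a zero-timeout `wait_for` so it does not hang. Suppose the checking thread instead called `get()` while holding the mutex the listener needs. Then neither thread could make progress.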
In a real-world case, suppose a lock is acquired before calling
`ProducerImpl::closeAsync`. If another thread then calls `setValue` in
`ClientConnection::handleSuccess` and the callback of
`createProducerAsync` tries to acquire the same lock, `handleSuccess` will
be blocked. Then, in `closeAsync`, the current thread will be blocked at:
```c++
cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId)
.addListener([self, callback](Result result, const ResponseData&) { callback(result); });
```
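The stacks (here, Thread 1 is the thread completing the future in
`handleSuccess` and Thread 2 is the thread calling `addListener` in
`closeAsync`):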
```
Thread 1:
#11 0x00007fab80da2173 in pulsar::InternalState<...>::complete (this=0x3d53e7a10, result=..., value=...) at lib/Future.h:61
#13 pulsar::ClientConnection::handleSuccess (this=this@entry=0x2214bc000, success=...) at lib/ClientConnection.cc:1552
Thread 2:
#8 get (result=..., this=0x3d53e7a10) at lib/Future.h:69
#9 pulsar::InternalState<...>::addListener (this=this@entry=0x3d53e7a10, listener=...) at lib/Future.h:51
#11 0x00007fab80e8dc4e in pulsar::ProducerImpl::closeAsync at lib/ProducerImpl.cc:794
```
Two issues cause the deadlock:
1. We use `completed_` to indicate whether the future is completed.
However, even after it becomes true, the future might not actually be
completed, because the value has not been set and the listeners have
not finished.
2. If `addListener` is called after completion, we still push the
listener to `listeners_` so that previous listeners are executed
before the new listener. This ordering guarantee is unnecessarily strong.
### Modifications
First, complete the future before calling the listeners.
Then, use an enum to represent the status:
- INITIAL: `complete` has not been called
- COMPLETING: when `complete` is called for the first time, the status
changes from INITIAL to COMPLETING
- COMPLETED: the future is completed.
In addition, the implementation of `Future` is simplified.
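The status transitions above can be sketched as a simplified, non-generic class. `SketchState` is a hypothetical name. It fixes the result and value types to `int` and uses `std::vector` instead of the patch's `std::forward_list`. It follows the same order of operations as the new `complete`:

- win the INITIAL to COMPLETING transition with a compare-and-swap;
- publish the result and value and switch to COMPLETED under the mutex;
- only then run the queued listeners outside the lock.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Simplified sketch of the three-state approach (int result/value only).
class SketchState {
   public:
    using Listener = std::function<void(int, int)>;
    enum Status : std::uint8_t { INITIAL, COMPLETING, COMPLETED };

    void addListener(Listener listener) {
        std::unique_lock<std::mutex> lock{mutex_};
        if (status_.load() == COMPLETED) {
            // Snapshot the published result, then run the listener on the
            // caller's thread outside the lock, without waiting for earlier listeners.
            int result = result_;
            int value = value_;
            lock.unlock();
            listener(result, value);
        } else {
            listeners_.emplace_back(std::move(listener));
        }
    }

    bool complete(int result, int value) {
        Status expected = INITIAL;
        if (!status_.compare_exchange_strong(expected, COMPLETING)) {
            return false;  // only the first caller proceeds
        }
        std::unique_lock<std::mutex> lock{mutex_};
        result_ = result;
        value_ = value;
        status_ = COMPLETED;  // published under the mutex, before any listener runs
        cond_.notify_all();
        std::vector<Listener> listeners;
        listeners.swap(listeners_);
        lock.unlock();
        for (auto& listener : listeners) {
            listener(result, value);
        }
        return true;
    }

    int get(int& value) const {
        std::unique_lock<std::mutex> lock{mutex_};
        cond_.wait(lock, [this] { return status_.load() == COMPLETED; });
        value = value_;
        return result_;
    }

   private:
    mutable std::mutex mutex_;
    mutable std::condition_variable cond_;
    std::vector<Listener> listeners_;
    int result_ = 0;
    int value_ = 0;
    std::atomic<Status> status_{INITIAL};
};
```

A listener added after completion runs immediately on the caller's thread with a snapshot of the value. It therefore no longer waits for earlier listeners, which matches the relaxed guarantee described in point 2 of the motivation.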
https://github.com/apache/pulsar-client-cpp/pull/299 fixes a possible
mutex crash by introducing `std::future`. However, the root cause is
that the condition variable was not used correctly:
> Even if the shared variable is atomic, it must be modified while owning the mutex to correctly publish the modification to the waiting thread.
See https://en.cppreference.com/w/cpp/thread/condition_variable
The simplest fix for
https://github.com/apache/pulsar-client-cpp/pull/298 is to add
`lock.lock()` before `state->condition.notify_all();`.
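The cppreference rule quoted above can be illustrated with a small, hypothetical `Signal` type, which is not part of the library. The waiter checks a flag, and that flag is changed while holding the same mutex the waiter uses. `notify_all` is called while still holding it, which mirrors re-acquiring the lock before notifying.

Now suppose the flag were written without the mutex. The write and notification could then land between the waiter's predicate check and the moment it blocks, and the wakeup would be lost.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical one-shot signal carrying an int.
class Signal {
   public:
    void set(int value) {
        std::unique_lock<std::mutex> lock{mutex_};
        value_ = value;
        ready_ = true;  // modified while owning the mutex
        cond_.notify_all();
    }

    int wait() {
        std::unique_lock<std::mutex> lock{mutex_};
        cond_.wait(lock, [this] { return ready_; });  // predicate re-checked under the mutex
        return value_;
    }

   private:
    std::mutex mutex_;
    std::condition_variable cond_;
    bool ready_ = false;  // plain bool: every access happens under mutex_
    int value_ = 0;
};
```

Making `ready_` a `std::atomic_bool` would not remove the need to modify it under the mutex. Atomicity alone does not order the write against the waiter's check-then-block sequence.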
* Acquire lock again
* Add initial value
---
lib/ConsumerImpl.h | 1 +
lib/Future.h | 94 +++++++++++++++++++++++++---------------------------
lib/ProducerImpl.h | 1 +
tests/PromiseTest.cc | 53 ++++++++++++++++++-----------
tests/WaitUtils.h | 5 +--
5 files changed, 84 insertions(+), 70 deletions(-)
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index dd7163f..61d96b1 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -23,6 +23,7 @@
#include <boost/optional.hpp>
#include <functional>
+#include <list>
#include <memory>
#include <utility>
diff --git a/lib/Future.h b/lib/Future.h
index 5ee937e..290ebc6 100644
--- a/lib/Future.h
+++ b/lib/Future.h
@@ -20,14 +20,11 @@
#define LIB_FUTURE_H_
#include <atomic>
-#include <chrono>
+#include <condition_variable>
+#include <forward_list>
#include <functional>
-#include <future>
-#include <list>
#include <memory>
#include <mutex>
-#include <thread>
-#include <utility>
namespace pulsar {
@@ -38,71 +35,70 @@ class InternalState {
using Pair = std::pair<Result, Type>;
using Lock = std::unique_lock<std::mutex>;
+ enum Status : uint8_t
+ {
+ INITIAL,
+ COMPLETING,
+ COMPLETED
+ };
+
// NOTE: Add the constructor explicitly just to be compatible with GCC 4.8
InternalState() {}
void addListener(Listener listener) {
Lock lock{mutex_};
- listeners_.emplace_back(listener);
- lock.unlock();
-
if (completed()) {
- Type value;
- Result result = get(value);
- triggerListeners(result, value);
+ auto result = result_;
+ auto value = value_;
+ lock.unlock();
+ listener(result, value);
+ } else {
+ tailListener_ = listeners_.emplace_after(tailListener_, std::move(listener));
}
}
bool complete(Result result, const Type &value) {
- bool expected = false;
- if (!completed_.compare_exchange_strong(expected, true)) {
+ Status expected = Status::INITIAL;
+ if (!status_.compare_exchange_strong(expected, Status::COMPLETING)) {
return false;
}
- triggerListeners(result, value);
- promise_.set_value(std::make_pair(result, value));
- return true;
- }
-
- bool completed() const noexcept { return completed_; }
- Result get(Type &result) {
- const auto &pair = future_.get();
- result = pair.second;
- return pair.first;
- }
+ // Ensure if another thread calls `addListener` at the same time, that thread can get the value by
+ // `get` before the existing listeners are executed
+ Lock lock{mutex_};
+ result_ = result;
+ value_ = value;
+ status_ = COMPLETED;
+ cond_.notify_all();
- // Only public for test
- void triggerListeners(Result result, const Type &value) {
- while (true) {
- Lock lock{mutex_};
- if (listeners_.empty()) {
- return;
+ if (!listeners_.empty()) {
+ auto listeners = std::move(listeners_);
+ lock.unlock();
+ for (auto &&listener : listeners) {
+ listener(result, value);
}
+ }
- bool expected = false;
- if (!listenerRunning_.compare_exchange_strong(expected, true)) {
- // There is another thread that polled a listener that is running, skip polling and release
- // the lock. Here we wait for some time to avoid busy waiting.
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
- continue;
- }
- auto listener = std::move(listeners_.front());
- listeners_.pop_front();
- lock.unlock();
+ return true;
+ }
- listener(result, value);
- listenerRunning_ = false;
- }
+ bool completed() const noexcept { return status_.load() == COMPLETED; }
+
+ Result get(Type &value) const {
+ Lock lock{mutex_};
+ cond_.wait(lock, [this] { return completed(); });
+ value = value_;
+ return result_;
}
private:
- std::atomic_bool completed_{false};
- std::promise<Pair> promise_;
- std::shared_future<Pair> future_{promise_.get_future()};
-
- std::list<Listener> listeners_;
mutable std::mutex mutex_;
- std::atomic_bool listenerRunning_{false};
+ mutable std::condition_variable cond_;
+ std::forward_list<Listener> listeners_;
+ decltype(listeners_.before_begin()) tailListener_{listeners_.before_begin()};
+ Result result_;
+ Type value_;
+ std::atomic<Status> status_{INITIAL};
};
template <typename Result, typename Type>
diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h
index 770ac45..91b9544 100644
--- a/lib/ProducerImpl.h
+++ b/lib/ProducerImpl.h
@@ -20,6 +20,7 @@
#define LIB_PRODUCERIMPL_H_
#include <boost/optional.hpp>
+#include <list>
#include <memory>
#include "Future.h"
diff --git a/tests/PromiseTest.cc b/tests/PromiseTest.cc
index 29ee2a3..ad67e7d 100644
--- a/tests/PromiseTest.cc
+++ b/tests/PromiseTest.cc
@@ -19,10 +19,13 @@
#include <gtest/gtest.h>
#include <chrono>
+#include <memory>
+#include <mutex>
#include <string>
#include <thread>
#include <vector>
+#include "WaitUtils.h"
#include "lib/Future.h"
#include "lib/LogUtils.h"
@@ -88,26 +91,38 @@ TEST(PromiseTest, testListeners) {
ASSERT_EQ(values, (std::vector<std::string>(2, "hello")));
}
-TEST(PromiseTest, testTriggerListeners) {
- InternalState<int, int> state;
- state.addListener([](int, const int&) {
- LOG_INFO("Start task 1...");
- std::this_thread::sleep_for(std::chrono::seconds(1));
- LOG_INFO("Finish task 1...");
+TEST(PromiseTest, testListenerDeadlock) {
+ Promise<int, int> promise;
+ auto future = promise.getFuture();
+ auto mutex = std::make_shared<std::mutex>();
+ auto done = std::make_shared<std::atomic_bool>(false);
+
+ future.addListener([mutex, done](int, int) {
+ LOG_INFO("Listener-1 before acquiring the lock");
+ std::lock_guard<std::mutex> lock{*mutex};
+ LOG_INFO("Listener-1 after acquiring the lock");
+ done->store(true);
});
- state.addListener([](int, const int&) {
- LOG_INFO("Start task 2...");
+
+ std::thread t1{[mutex, &future] {
+ std::lock_guard<std::mutex> lock{*mutex};
+ // Make it a great chance that `t2` executes `promise.setValue` first
+ std::this_thread::sleep_for(std::chrono::seconds(2));
+
+ // Since the future is completed, `Future::get` will be called in `addListener` to get the result
+ LOG_INFO("Before adding Listener-2 (acquired the mutex)")
+ future.addListener([](int, int) { LOG_INFO("Listener-2 is triggered"); });
+ LOG_INFO("After adding Listener-2 (releasing the mutex)");
+ }};
+ t1.detach();
+ std::thread t2{[mutex, promise] {
+ // Make there a great chance that `t1` acquires `mutex` first
std::this_thread::sleep_for(std::chrono::seconds(1));
- LOG_INFO("Finish task 2...");
- });
+ LOG_INFO("Before setting value");
+ promise.setValue(0); // the 1st listener is called, which is blocked at acquiring `mutex`
+ LOG_INFO("After setting value");
+ }};
+ t2.detach();
- auto start = std::chrono::high_resolution_clock::now();
- auto future1 = std::async(std::launch::async, [&state] { state.triggerListeners(0, 0); });
- auto future2 = std::async(std::launch::async, [&state] { state.triggerListeners(0, 0); });
- future1.wait();
- future2.wait();
- auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::high_resolution_clock::now() - start)
- .count();
- ASSERT_TRUE(elapsed > 2000) << "elapsed: " << elapsed << "ms";
+ ASSERT_TRUE(waitUntil(std::chrono::seconds(5000), [done] { return done->load(); }));
}
diff --git a/tests/WaitUtils.h b/tests/WaitUtils.h
index d7db82e..4a03e53 100644
--- a/tests/WaitUtils.h
+++ b/tests/WaitUtils.h
@@ -25,13 +25,13 @@
namespace pulsar {
template <typename Rep, typename Period>
-inline void waitUntil(std::chrono::duration<Rep, Period> timeout, const std::function<bool()>& condition,
+inline bool waitUntil(std::chrono::duration<Rep, Period> timeout, const std::function<bool()>& condition,
long durationMs = 10) {
auto timeoutMs = std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count();
while (timeoutMs > 0) {
auto now = std::chrono::high_resolution_clock::now();
if (condition()) {
- break;
+ return true;
}
std::this_thread::sleep_for(std::chrono::milliseconds(durationMs));
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
@@ -39,6 +39,7 @@ inline void waitUntil(std::chrono::duration<Rep, Period> timeout, const std::fun
.count();
timeoutMs -= elapsed;
}
+ return false;
}
} // namespace pulsar