You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by la...@apache.org on 2022/12/29 16:28:27 UTC
[kudu] branch master updated: [threadpool] Fix unsafe behaviour when SchedulerThread shutdown
This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 155ea5af9 [threadpool] Fix unsafe behaviour when SchedulerThread shutdown
155ea5af9 is described below
commit 155ea5af946501b0ecff707678821473aedf91bc
Author: shenxingwuying <sh...@gmail.com>
AuthorDate: Fri Aug 19 16:28:28 2022 +0800
[threadpool] Fix unsafe behaviour when SchedulerThread shutdown
When shutting down a thread pool that has a SchedulerThread, the pool's member
variable 'scheduler_' is deleted and set to nullptr. If, at the same time, a thread
pool token that has not been shut down schedules a task, the process may crash
(core dump) because 'scheduler_' is dereferenced while it is nullptr.
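This patch fixes the bug by adding a mutex to class 'ThreadPool' that protects
the 'scheduler_' member. Scheduling a task after the scheduler thread has been
shut down now returns Status::IllegalState instead of crashing.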
Change-Id: I021422f7e51e1007c5bdbc877ab445f70ba12357
Reviewed-on: http://gerrit.cloudera.org:8080/18867
Tested-by: Kudu Jenkins
Reviewed-by: Yingchun Lai <ac...@gmail.com>
---
src/kudu/util/threadpool-test.cc | 106 +++++++++++++++++++++++++++++++++------
src/kudu/util/threadpool.cc | 48 +++++++++---------
src/kudu/util/threadpool.h | 28 +++++++----
3 files changed, 132 insertions(+), 50 deletions(-)
diff --git a/src/kudu/util/threadpool-test.cc b/src/kudu/util/threadpool-test.cc
index 25dc561a9..41d18ad08 100644
--- a/src/kudu/util/threadpool-test.cc
+++ b/src/kudu/util/threadpool-test.cc
@@ -30,6 +30,7 @@
#include <ostream>
#include <string>
#include <thread>
+#include <type_traits>
#include <vector>
#include <boost/smart_ptr/shared_ptr.hpp>
@@ -90,12 +91,12 @@ class ThreadPoolTest : public KuduTest {
.Build(&pool_);
}
- Status RebuildPoolWithScheduler(int min_threads, int max_threads) {
+ Status RebuildPoolWithScheduler(int min_threads, int max_threads, int period_ms = 100) {
return ThreadPoolBuilder(kDefaultPoolName)
.set_min_threads(min_threads)
.set_max_threads(max_threads)
.set_enable_scheduler()
- .set_schedule_period_ms(100)
+ .set_schedule_period_ms(period_ms)
.Build(&pool_);
}
@@ -133,7 +134,7 @@ class SimpleTask {
};
TEST_F(ThreadPoolTest, TestSimpleTasks) {
- constexpr int kDelayMs = 1000;
+ constexpr int kDelayMs = 500;
ASSERT_OK(RebuildPoolWithMinMax(4, 4));
@@ -150,17 +151,16 @@ TEST_F(ThreadPoolTest, TestSimpleTasks) {
ASSERT_OK(RebuildPoolWithScheduler(4, 4));
unique_ptr<ThreadPoolToken> token = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
- MonoTime start = MonoTime::Now();
ASSERT_OK(token->Schedule([&counter]() {
SimpleTaskMethod(13, &counter);
}, kDelayMs));
- pool_->WaitForScheduler();
+ // make sure the tasks scheduled and executed in SchedulerThread.
+ SleepFor(MonoDelta::FromMilliseconds(2 * kDelayMs));
+ // make sure all tasks executed in thread pool.
+ pool_->Wait();
+ // check the results.
ASSERT_EQ(10 + 15 + 20 + 15 + 123 + 13, base::subtle::NoBarrier_Load(&counter));
- MonoDelta delta = MonoTime::Now() - start;
- MonoDelta expect_upper_limit = MonoDelta::FromMilliseconds(static_cast<int>(kDelayMs * 1.2));
- MonoDelta expect_lower_limit = MonoDelta::FromMilliseconds(static_cast<int>(kDelayMs * 0.9));
- ASSERT_TRUE(delta.MoreThan(expect_lower_limit) && delta.LessThan(expect_upper_limit));
}
static void IssueTraceStatement() {
@@ -181,6 +181,32 @@ TEST_F(ThreadPoolTest, TestTracePropagation) {
ASSERT_STR_CONTAINS(t->DumpToString(), "hello from task");
}
+static void ContinuousIssueTraceStatement(ThreadPoolToken* token) {
+ Random r(SeedRandom());
+ // delay_ms is a random time: 1ms or 2ms or 3ms.
+ int delay_ms = 1 + static_cast<int>(r.Uniform(3));
+ Status status = token->Schedule(std::bind(&ContinuousIssueTraceStatement, token), delay_ms);
+ // At the case, 'pool_->Shutdown();' would shutdown its SchedulerThread firstly, after that
+ // 'token->Scheduler(...)' would return a IllegalState.
+ LOG_IF(WARNING, !status.ok()) << "ContinuousIssueTraceStatement: " << status.ToString();
+ ASSERT_TRUE(status.ok() || status.IsIllegalState());
+}
+
+TEST_F(ThreadPoolTest, TestExtremeScheduler) {
+ // The case is for the scenario:
+ // 1. There are many tasks in SchedulerThread need to be scheduled.
+ // 2. Every periodic schedule of SchedulerThread, many tasks can be scheduled.
+ // 3. At 1, 2 conditions, SchedulerThread's shutdown happened
+ // and program should work well(no threads data race and core dump).
+ ASSERT_OK(RebuildPoolWithScheduler(1, 1, 1));
+ unique_ptr<ThreadPoolToken> token = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
+ for (int i = 0; i < 2048; i++) {
+ ContinuousIssueTraceStatement(token.get());
+ }
+ SleepFor(MonoDelta::FromMilliseconds(5));
+ pool_->Shutdown();
+}
+
TEST_F(ThreadPoolTest, TestSubmitAfterShutdown) {
ASSERT_OK(RebuildPoolWithMinMax(1, 1));
pool_->Shutdown();
@@ -190,21 +216,71 @@ TEST_F(ThreadPoolTest, TestSubmitAfterShutdown) {
ASSERT_OK(RebuildPoolWithScheduler(1, 1));
unique_ptr<ThreadPoolToken> token = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
- ASSERT_OK(token->Schedule(&IssueTraceStatement, 200));
+ Atomic32 counter(0);
+ constexpr const int delay_ms = 100;
+ ASSERT_OK(token->Schedule([&counter]() { SimpleTaskMethod(10, &counter); }, delay_ms));
+ SleepFor(MonoDelta::FromMilliseconds(2 * delay_ms));
+ ASSERT_EQ(10, base::subtle::NoBarrier_Load(&counter));
token->Shutdown();
- SleepFor(MonoDelta::FromMilliseconds(500));
+ ASSERT_OK(token->Schedule([&counter]() { SimpleTaskMethod(3, &counter); }, delay_ms));
+ SleepFor(MonoDelta::FromMilliseconds(2 * delay_ms));
+ // The result does not change.
+ ASSERT_EQ(10, base::subtle::NoBarrier_Load(&counter));
pool_->Shutdown();
- ASSERT_TRUE(token->Schedule(&IssueTraceStatement, 1000).IsServiceUnavailable());
+ // Submit a delayed task, but it will not really executed.
+ ASSERT_TRUE(
+ token->Schedule([&counter]() { SimpleTaskMethod(5, &counter); }, delay_ms).IsIllegalState());
+ // The result does not change.
+ ASSERT_EQ(10, base::subtle::NoBarrier_Load(&counter));
}
TEST_F(ThreadPoolTest, TokenShutdownBeforeSchedulerExecute) {
ASSERT_OK(RebuildPoolWithScheduler(1, 1));
unique_ptr<ThreadPoolToken> token = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
- ASSERT_OK(token->Schedule(&IssueTraceStatement, 200));
+ Atomic32 counter(0);
+ constexpr const int delay_ms = 200;
+ ASSERT_OK(token->Schedule([&counter]() { SimpleTaskMethod(10, &counter); }, delay_ms));
token->Shutdown();
+ ASSERT_EQ(0, base::subtle::NoBarrier_Load(&counter));
+ SleepFor(MonoDelta::FromMilliseconds(2 * delay_ms));
+ ASSERT_OK(token->Schedule([&counter]() { SimpleTaskMethod(3, &counter); }, delay_ms));
+ SleepFor(MonoDelta::FromMilliseconds(2 * delay_ms));
+ ASSERT_EQ(0, base::subtle::NoBarrier_Load(&counter));
+ pool_->Shutdown();
+ ASSERT_TRUE(
+ token->Schedule([&counter]() { SimpleTaskMethod(3, &counter); }, delay_ms).IsIllegalState());
+ SleepFor(MonoDelta::FromMilliseconds(2 * delay_ms));
+ ASSERT_EQ(0, base::subtle::NoBarrier_Load(&counter));
+}
+
+TEST_F(ThreadPoolTest, SchedulerShutdownBeforeTokenShutdown) {
+ ASSERT_OK(RebuildPoolWithScheduler(1, 1));
+ unique_ptr<ThreadPoolToken> token = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
+ ASSERT_OK(token->Schedule(&IssueTraceStatement, 200));
SleepFor(MonoDelta::FromMilliseconds(500));
- ASSERT_TRUE(token->Schedule(&IssueTraceStatement, 1000).IsServiceUnavailable());
pool_->Shutdown();
+ Status status = token->Schedule(&IssueTraceStatement, 200);
+ ASSERT_TRUE(status.IsIllegalState() || status.IsServiceUnavailable());
+ token->Shutdown();
+}
+
+TEST_F(ThreadPoolTest, SchedulerWithNullToken) {
+ ASSERT_OK(RebuildPoolWithScheduler(1, 1));
+ ASSERT_OK(pool_->Schedule(nullptr, &IssueTraceStatement, MonoTime::Now()));
+
+ Atomic32 counter(0);
+ SimpleTask task(2, &counter);
+
+ ASSERT_OK(pool_->Submit([&counter]() { SimpleTaskMethod(1, &counter); }));
+ ASSERT_OK(pool_->Submit([&task]() { task.Run(); }));
+ // token is null is ok, but the task will ignore.
+ ASSERT_OK(pool_->Schedule(
+ nullptr, [&counter]() { SimpleTaskMethod(3, &counter); }, MonoTime::Now()));
+ // make sure the delayed task execute, the task would be ignored.
+ SleepFor(MonoDelta::FromMilliseconds(200));
+ // make sure all tasks executed in thread pool.
+ pool_->Wait();
+ ASSERT_EQ(1 + 2, base::subtle::NoBarrier_Load(&counter));
}
TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) {
@@ -286,7 +362,7 @@ TEST_F(ThreadPoolTest, TestThreadPoolWithSchedulerAndNoMinimum) {
ASSERT_EQ(0, pool_->num_threads());
ASSERT_EQ(0, pool_->num_active_threads());
pool_->Shutdown();
- ASSERT_EQ(nullptr, pool_->scheduler());
+ ASSERT_EQ(nullptr, pool_->scheduler_);
}
TEST_F(ThreadPoolTest, TestThreadPoolWithNoMaxThreads) {
diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc
index 38e56d423..f8a10758d 100644
--- a/src/kudu/util/threadpool.cc
+++ b/src/kudu/util/threadpool.cc
@@ -33,6 +33,7 @@
#include "kudu/gutil/sysinfo.h"
#include "kudu/gutil/walltime.h"
#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/thread.h"
#include "kudu/util/trace.h"
@@ -150,9 +151,9 @@ void SchedulerThread::RunLoop() {
}
for (const auto& task : pending_tasks) {
- ThreadPoolToken* token = task.thread_pool_token;
+ ThreadPoolToken* token = task.thread_pool_token();
while (token != nullptr) {
- Status s = token->Submit(task.f);
+ Status s = token->Submit(task.func());
if (s.ok()) {
break;
}
@@ -258,14 +259,10 @@ void ThreadPoolToken::Shutdown() {
// Submit a task, running after delay_ms delay some time
Status ThreadPoolToken::Schedule(std::function<void()> f, int64_t delay_ms) {
- if (PREDICT_FALSE(!MaySubmitNewTasks())) {
- return Status::ServiceUnavailable("Thread pool token was shut down");
- }
CHECK(mode() == ThreadPool::ExecutionMode::SERIAL);
- MonoTime excute_time = MonoTime::Now();
- excute_time.AddDelta(MonoDelta::FromMilliseconds(delay_ms));
- pool_->scheduler()->Schedule(this, std::move(f), excute_time);
- return Status::OK();
+ MonoTime execute_time = MonoTime::Now();
+ execute_time.AddDelta(MonoDelta::FromMilliseconds(delay_ms));
+ return pool_->Schedule(this, std::move(f), execute_time);
}
void ThreadPoolToken::Wait() {
@@ -417,10 +414,12 @@ Status ThreadPool::Init() {
}
void ThreadPool::Shutdown() {
- if (scheduler_) {
- scheduler_->Shutdown();
- delete scheduler_;
- scheduler_ = nullptr;
+ {
+ MutexLock l(scheduler_lock_);
+ if (scheduler_) {
+ delete scheduler_;
+ scheduler_ = nullptr;
+ }
}
MutexLock unique_lock(lock_);
@@ -525,6 +524,17 @@ Status ThreadPool::Submit(std::function<void()> f) {
return DoSubmit(std::move(f), tokenless_.get());
}
+Status ThreadPool::Schedule(ThreadPoolToken* token,
+ std::function<void()> f,
+ MonoTime execute_time) {
+ MutexLock l(scheduler_lock_);
+ if (!scheduler_) {
+ return Status::IllegalState("scheduler thread has been shutdown");
+ }
+ scheduler_->Schedule(token, std::move(f), execute_time);
+ return Status::OK();
+}
+
Status ThreadPool::DoSubmit(std::function<void()> f, ThreadPoolToken* token) {
DCHECK(token);
const MonoTime submit_time = MonoTime::Now();
@@ -655,18 +665,6 @@ void ThreadPool::Wait() {
}
}
-void ThreadPool::WaitForScheduler() {
- MutexLock unique_lock(lock_);
- CheckNotPoolThreadUnlocked();
- // Generally, ignore scheduler's pending tasks, but
- // set ScheduledTaskWaitType::WAIT at unit tests.
- bool wait_scheduler = (scheduler_ != nullptr);
- while (total_queued_tasks_ > 0 || active_threads_ > 0 ||
- (wait_scheduler && !scheduler_->empty())) {
- idle_cond_.Wait();
- }
-}
-
bool ThreadPool::WaitUntil(const MonoTime& until) {
MutexLock unique_lock(lock_);
CheckNotPoolThreadUnlocked();
diff --git a/src/kudu/util/threadpool.h b/src/kudu/util/threadpool.h
index 9b5400503..6f4119f86 100644
--- a/src/kudu/util/threadpool.h
+++ b/src/kudu/util/threadpool.h
@@ -184,11 +184,19 @@ public:
// Shutdown the thread and clear the pending tasks.
Status Shutdown();
- struct SchedulerTask {
- ThreadPoolToken* thread_pool_token;
- std::function<void()> f;
+ class SchedulerTask {
+ public:
+ SchedulerTask(ThreadPoolToken* thread_pool_token, std::function<void()> func)
+ : thread_pool_token_(thread_pool_token), func_(std::move(func)) {}
+ ThreadPoolToken* thread_pool_token() const { return thread_pool_token_; }
+ const std::function<void()>& func() const { return func_; }
+
+ private:
+ ThreadPoolToken* thread_pool_token_;
+ std::function<void()> func_;
};
+ // Submit a task to 'token', execute the task at execute_time.
void Schedule(ThreadPoolToken* token,
std::function<void()> f,
const MonoTime& execute_time) {
@@ -273,6 +281,9 @@ class ThreadPool {
// Submits a new task.
Status Submit(std::function<void()> f) WARN_UNUSED_RESULT;
+ // Submit a task to 'token', execute the task at execute_time.
+ Status Schedule(ThreadPoolToken* token, std::function<void()> f, MonoTime execute_time);
+
// Waits until all the tasks are completed.
void Wait();
@@ -500,13 +511,6 @@ class ThreadPool {
// * a new task has been scheduled (i.e. added into the queue)
void NotifyLoadMeterUnlocked(const MonoDelta& queue_time = MonoDelta());
- SchedulerThread* scheduler() {
- return scheduler_;
- }
-
- // Waits until all the tasks and scheduler's tasks completed.
- void WaitForScheduler();
-
// Return the number of threads currently running for this thread pool.
// Used by tests to avoid tsan test case down.
int num_active_threads() {
@@ -607,8 +611,12 @@ class ThreadPool {
// Metrics for the entire thread pool.
const ThreadPoolMetrics metrics_;
+ // Protect 'scheduler_'.
+ mutable Mutex scheduler_lock_;
+
// TimerThread is used for some scenarios, such as
// make a task delay execution.
+ // It would shut down and be deleted firstly when 'Shutdown'.
SchedulerThread* scheduler_;
uint32_t schedule_period_ms_;