You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2022/07/26 15:35:41 UTC
[kudu] branch master updated: [threadpool] Fix scheduler thread a coredump bug
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 6e48a43c4 [threadpool] Fix scheduler thread a coredump bug
6e48a43c4 is described below
commit 6e48a43c4505b4a4d281f7f732c63f51cec6bc63
Author: shenxingwuying <sh...@gmail.com>
AuthorDate: Tue Jun 28 18:13:37 2022 +0800
[threadpool] Fix scheduler thread a coredump bug
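The scheduler thread submits delayed tasks to their thread pool tokens.
If a SchedulerTask's thread pool token had already been shut down,
submitting the task failed and the scheduler thread crashed (core dump).
This patch skips a task if its token is nullptr or if Submit(...) fails
because the token has been shut down; if the pool is merely at capacity,
the submission is retried. ThreadPool::Shutdown() now stops the scheduler
thread before acquiring the pool lock. A unit test is added as well.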
Change-Id: Ie5bc5511a745f3dc12dfe1a6a1813ece41ccc2a8
Reviewed-on: http://gerrit.cloudera.org:8080/18674
Reviewed-by: Yingchun Lai <ac...@gmail.com>
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <al...@apache.org>
---
src/kudu/util/threadpool-test.cc | 17 +++++++++++--
src/kudu/util/threadpool.cc | 32 ++++++++++++++++++-------
src/kudu/util/threadpool.h | 52 +++++++++++++++++++++++++++++++++-------
3 files changed, 82 insertions(+), 19 deletions(-)
diff --git a/src/kudu/util/threadpool-test.cc b/src/kudu/util/threadpool-test.cc
index 31e57723c..25dc561a9 100644
--- a/src/kudu/util/threadpool-test.cc
+++ b/src/kudu/util/threadpool-test.cc
@@ -190,10 +190,23 @@ TEST_F(ThreadPoolTest, TestSubmitAfterShutdown) {
ASSERT_OK(RebuildPoolWithScheduler(1, 1));
unique_ptr<ThreadPoolToken> token = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
+ ASSERT_OK(token->Schedule(&IssueTraceStatement, 200));
+ token->Shutdown();
+ SleepFor(MonoDelta::FromMilliseconds(500));
pool_->Shutdown();
ASSERT_TRUE(token->Schedule(&IssueTraceStatement, 1000).IsServiceUnavailable());
}
+TEST_F(ThreadPoolTest, TokenShutdownBeforeSchedulerExecute) {
+ ASSERT_OK(RebuildPoolWithScheduler(1, 1));
+ unique_ptr<ThreadPoolToken> token = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
+ ASSERT_OK(token->Schedule(&IssueTraceStatement, 200));
+ token->Shutdown();
+ SleepFor(MonoDelta::FromMilliseconds(500));
+ ASSERT_TRUE(token->Schedule(&IssueTraceStatement, 1000).IsServiceUnavailable());
+ pool_->Shutdown();
+}
+
TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) {
constexpr int kIdleTimeoutMs = 1;
ASSERT_OK(RebuildPoolWithBuilder(
@@ -255,7 +268,6 @@ TEST_F(ThreadPoolTest, TestThreadPoolWithSchedulerAndNoMinimum) {
unique_ptr<ThreadPoolToken> token = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
ASSERT_OK(token->Submit([&latch]() { latch.Wait(); }));
ASSERT_EQ(4, pool_->num_threads());
-
ASSERT_OK(token->Schedule([&latch]() { latch.Wait(); }, kDelayMs));
ASSERT_OK(token->Schedule([&latch]() { latch.Wait(); }, static_cast<int>(kDelayMs * 1.2)));
ASSERT_EQ(4, pool_->num_threads());
@@ -1052,8 +1064,9 @@ TEST_P(ThreadPoolTestTokenTypes, TestTokenWaitForAll) {
const int kNumSubmissions = 20;
Random r(SeedRandom());
vector<unique_ptr<ThreadPoolToken>> tokens;
+ tokens.resize(kNumTokens);
for (int i = 0; i < kNumTokens; i++) {
- tokens.emplace_back(pool_->NewToken(GetParam()));
+ tokens[i] = pool_->NewToken(GetParam());
}
atomic<int32_t> v(0);
diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc
index 95c16178c..38e56d423 100644
--- a/src/kudu/util/threadpool.cc
+++ b/src/kudu/util/threadpool.cc
@@ -142,16 +142,32 @@ void SchedulerThread::RunLoop() {
vector<SchedulerTask> pending_tasks;
{
MutexLock auto_lock(mutex_);
- auto upper_it = tasks_.upper_bound(now);
- for (auto it = tasks_.begin(); it != upper_it; it++) {
+ auto upper_it = future_tasks_.upper_bound(now);
+ for (auto it = future_tasks_.begin(); it != upper_it; it++) {
pending_tasks.emplace_back(std::move(it->second));
}
- tasks_.erase(tasks_.begin(), upper_it);
+ future_tasks_.erase(future_tasks_.begin(), upper_it);
}
for (const auto& task : pending_tasks) {
- ThreadPoolToken* token = task.thread_pool_token_;
- CHECK_OK(token->Submit(task.f));
+ ThreadPoolToken* token = task.thread_pool_token;
+ while (token != nullptr) {
+ Status s = token->Submit(task.f);
+ if (s.ok()) {
+ break;
+ }
+ DCHECK(s.IsServiceUnavailable())
+ << Substitute("threadpool token Submit status: $0", s.ToString());
+
+ if (!token->MaySubmitNewTasks()) {
+ // threadpool token is Shutdown, skip the task.
+ break;
+ }
+ // If ThreadPoolToken::Schedule(...) is called too frequently, the pool may be at
+ // capacity (ServiceUnavailable); wait briefly and retry submitting the task.
+ VLOG(1) << Substitute("threadpool token Submit status: $0, retry the task", s.ToString());
+ SleepFor(MonoDelta::FromMilliseconds(1));
+ }
}
}
}
@@ -401,14 +417,14 @@ Status ThreadPool::Init() {
}
void ThreadPool::Shutdown() {
- MutexLock unique_lock(lock_);
- CheckNotPoolThreadUnlocked();
-
if (scheduler_) {
scheduler_->Shutdown();
delete scheduler_;
scheduler_ = nullptr;
}
+
+ MutexLock unique_lock(lock_);
+ CheckNotPoolThreadUnlocked();
// Note: this is the same error seen at submission if the pool is at
// capacity, so clients can't tell them apart. This isn't really a practical
// concern though because shutting down a pool typically requires clients to
diff --git a/src/kudu/util/threadpool.h b/src/kudu/util/threadpool.h
index 8c68a88fa..9b5400503 100644
--- a/src/kudu/util/threadpool.h
+++ b/src/kudu/util/threadpool.h
@@ -140,13 +140,38 @@ class ThreadPoolBuilder {
MonoDelta queue_overload_threshold_;
ThreadPoolMetrics metrics_;
bool enable_scheduler_;
- uint32_t schedule_period_ms_;
+ uint32_t schedule_period_ms_ = 100;
DISALLOW_COPY_AND_ASSIGN(ThreadPoolBuilder);
};
-
// SchedulerThread for asynchronized delay task execution.
+//
+// Note: the shutdown order matters.
+//
+// First, shut down the ThreadPool and wait for the SchedulerThread to stop.
+// Second, destroy all ThreadPoolTokens created from the ThreadPool.
+// Finally, destroy the ThreadPool.
+//
+// For example:
+// static void Func(int n) { ... }
+//
+// unique_ptr<ThreadPool> scheduler_pool;
+// RETURN_NOT_OK(ThreadPoolBuilder("scheduler")
+// .set_min_threads(1)
+// .set_max_threads(4)
+// .set_enable_scheduler()
+// .set_schedule_period_ms(100)
+// .Build(&scheduler_pool));
+// unique_ptr<ThreadPoolToken> token =
+// scheduler_pool->NewToken(ThreadPool::ExecutionMode::SERIAL);
+// RETURN_NOT_OK(token->Schedule([]() { Func(10); }, 1000));
+//
+// Shutdown and destruction must follow the order above, for example:
+// scheduler_pool->Shutdown(); // Shut down the pool and wait for the scheduler thread to stop.
+// // 'token' and 'scheduler_pool' are then destroyed automatically in reverse order of
+// // their declaration, which is exactly the sequence prescribed above.
+//
class SchedulerThread {
public:
explicit SchedulerThread(std::string thread_pool_name, uint32_t schedule_period_ms);
@@ -160,18 +185,20 @@ public:
Status Shutdown();
struct SchedulerTask {
- ThreadPoolToken* thread_pool_token_;
+ ThreadPoolToken* thread_pool_token;
std::function<void()> f;
};
- void Schedule(ThreadPoolToken* token, std::function<void()> f, const MonoTime& execute_time) {
+ void Schedule(ThreadPoolToken* token,
+ std::function<void()> f,
+ const MonoTime& execute_time) {
MutexLock unique_lock(mutex_);
- tasks_.insert({execute_time, SchedulerTask({token, std::move(f)})});
+ future_tasks_.insert({execute_time, SchedulerTask({token, std::move(f)})});
}
bool empty() const {
MutexLock unique_lock(mutex_);
- return tasks_.empty();
+ return future_tasks_.empty();
}
private:
@@ -181,14 +208,21 @@ private:
void RunLoop();
const std::string thread_pool_name_;
+
// scheduler's period checking time.
const uint32_t schedule_period_ms_;
+
CountDownLatch shutdown_;
- // Protect `tasks_` data race.
+
+ // Protects `future_tasks_` against data races.
mutable Mutex mutex_;
+ // Thread that drives `future_tasks_` according to their absolute execution times.
scoped_refptr<Thread> thread_;
- std::multimap<MonoTime, SchedulerTask> tasks_;
+
+ // Tasks ordered by their execution time point. `thread_` submits every task
+ // whose time point is <= the current time (see RunLoop()).
+ std::multimap<MonoTime, SchedulerTask> future_tasks_;
};
// Thread pool with a variable number of threads.
@@ -564,7 +598,7 @@ class ThreadPool {
boost::intrusive::list<IdleThread> idle_threads_; // NOLINT(build/include_what_you_use)
// ExecutionMode::CONCURRENT token used by the pool for tokenless submission.
- std::unique_ptr<ThreadPoolToken> tokenless_;
+ std::shared_ptr<ThreadPoolToken> tokenless_;
// The meter to track whether the pool's queue is stalled/overloaded.
// It's nullptr/none if the queue overload threshold is unset.