You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by la...@apache.org on 2022/12/29 16:28:27 UTC

[kudu] branch master updated: [threadpool] Fix unsafe behaviour when SchedulerThread shutdown

This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 155ea5af9 [threadpool] Fix unsafe behaviour when SchedulerThread shutdown
155ea5af9 is described below

commit 155ea5af946501b0ecff707678821473aedf91bc
Author: shenxingwuying <sh...@gmail.com>
AuthorDate: Fri Aug 19 16:28:28 2022 +0800

    [threadpool] Fix unsafe behaviour when SchedulerThread shutdown
    
    When a thread pool that has a SchedulerThread is shut down, the pool's
    'scheduler_' member is deleted and set to nullptr. If a thread pool token
    has not been shut down yet and a task is scheduled on that token at the
    same time, the process may crash because 'scheduler_' is nullptr.
    
    This patch fixes the bug by adding a mutex to class 'ThreadPool' that
    protects the 'scheduler_' member.
    
    Change-Id: I021422f7e51e1007c5bdbc877ab445f70ba12357
    Reviewed-on: http://gerrit.cloudera.org:8080/18867
    Tested-by: Kudu Jenkins
    Reviewed-by: Yingchun Lai <ac...@gmail.com>
---
 src/kudu/util/threadpool-test.cc | 106 +++++++++++++++++++++++++++++++++------
 src/kudu/util/threadpool.cc      |  48 +++++++++---------
 src/kudu/util/threadpool.h       |  28 +++++++----
 3 files changed, 132 insertions(+), 50 deletions(-)

diff --git a/src/kudu/util/threadpool-test.cc b/src/kudu/util/threadpool-test.cc
index 25dc561a9..41d18ad08 100644
--- a/src/kudu/util/threadpool-test.cc
+++ b/src/kudu/util/threadpool-test.cc
@@ -30,6 +30,7 @@
 #include <ostream>
 #include <string>
 #include <thread>
+#include <type_traits>
 #include <vector>
 
 #include <boost/smart_ptr/shared_ptr.hpp>
@@ -90,12 +91,12 @@ class ThreadPoolTest : public KuduTest {
         .Build(&pool_);
   }
 
-  Status RebuildPoolWithScheduler(int min_threads, int max_threads) {
+  Status RebuildPoolWithScheduler(int min_threads, int max_threads, int period_ms = 100) {
     return ThreadPoolBuilder(kDefaultPoolName)
         .set_min_threads(min_threads)
         .set_max_threads(max_threads)
         .set_enable_scheduler()
-        .set_schedule_period_ms(100)
+        .set_schedule_period_ms(period_ms)
         .Build(&pool_);
   }
 
@@ -133,7 +134,7 @@ class SimpleTask {
 };
 
 TEST_F(ThreadPoolTest, TestSimpleTasks) {
-  constexpr int kDelayMs = 1000;
+  constexpr int kDelayMs = 500;
 
   ASSERT_OK(RebuildPoolWithMinMax(4, 4));
 
@@ -150,17 +151,16 @@ TEST_F(ThreadPoolTest, TestSimpleTasks) {
 
   ASSERT_OK(RebuildPoolWithScheduler(4, 4));
   unique_ptr<ThreadPoolToken> token = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
-  MonoTime start = MonoTime::Now();
   ASSERT_OK(token->Schedule([&counter]() {
     SimpleTaskMethod(13, &counter);
   }, kDelayMs));
 
-  pool_->WaitForScheduler();
+  // Sleep long enough for the SchedulerThread to hand the delayed task over to the pool.
+  SleepFor(MonoDelta::FromMilliseconds(2 * kDelayMs));
+  // make sure all tasks executed in thread pool.
+  pool_->Wait();
+  // check the results.
   ASSERT_EQ(10 + 15 + 20 + 15 + 123 + 13, base::subtle::NoBarrier_Load(&counter));
-  MonoDelta delta = MonoTime::Now() - start;
-  MonoDelta expect_upper_limit = MonoDelta::FromMilliseconds(static_cast<int>(kDelayMs * 1.2));
-  MonoDelta expect_lower_limit = MonoDelta::FromMilliseconds(static_cast<int>(kDelayMs * 0.9));
-  ASSERT_TRUE(delta.MoreThan(expect_lower_limit) && delta.LessThan(expect_upper_limit));
 }
 
 static void IssueTraceStatement() {
@@ -181,6 +181,32 @@ TEST_F(ThreadPoolTest, TestTracePropagation) {
   ASSERT_STR_CONTAINS(t->DumpToString(), "hello from task");
 }
 
+static void ContinuousIssueTraceStatement(ThreadPoolToken* token) {
+  Random r(SeedRandom());
+  // delay_ms is a random time: 1ms or 2ms or 3ms.
+  int delay_ms = 1 + static_cast<int>(r.Uniform(3));
+  Status status = token->Schedule(std::bind(&ContinuousIssueTraceStatement, token), delay_ms);
+  // In this case, 'pool_->Shutdown()' shuts down its SchedulerThread first; after that,
+  // 'token->Schedule(...)' returns an IllegalState status.
+  LOG_IF(WARNING, !status.ok()) << "ContinuousIssueTraceStatement: " << status.ToString();
+  ASSERT_TRUE(status.ok() || status.IsIllegalState());
+}
+
+TEST_F(ThreadPoolTest, TestExtremeScheduler) {
+  // This test covers the following scenario:
+  //   1. Many tasks are pending in the SchedulerThread, waiting to be scheduled.
+  //   2. On every periodic pass of the SchedulerThread, many tasks become due.
+  //   3. Under conditions 1 and 2, the SchedulerThread is shut down, and the
+  //      program must keep working (no data races between threads and no core dump).
+  ASSERT_OK(RebuildPoolWithScheduler(1, 1, 1));
+  unique_ptr<ThreadPoolToken> token = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
+  for (int i = 0; i < 2048; i++) {
+    ContinuousIssueTraceStatement(token.get());
+  }
+  SleepFor(MonoDelta::FromMilliseconds(5));
+  pool_->Shutdown();
+}
+
 TEST_F(ThreadPoolTest, TestSubmitAfterShutdown) {
   ASSERT_OK(RebuildPoolWithMinMax(1, 1));
   pool_->Shutdown();
@@ -190,21 +216,71 @@ TEST_F(ThreadPoolTest, TestSubmitAfterShutdown) {
 
   ASSERT_OK(RebuildPoolWithScheduler(1, 1));
   unique_ptr<ThreadPoolToken> token = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
-  ASSERT_OK(token->Schedule(&IssueTraceStatement, 200));
+  Atomic32 counter(0);
+  constexpr const int delay_ms = 100;
+  ASSERT_OK(token->Schedule([&counter]() { SimpleTaskMethod(10, &counter); }, delay_ms));
+  SleepFor(MonoDelta::FromMilliseconds(2 * delay_ms));
+  ASSERT_EQ(10, base::subtle::NoBarrier_Load(&counter));
   token->Shutdown();
-  SleepFor(MonoDelta::FromMilliseconds(500));
+  ASSERT_OK(token->Schedule([&counter]() { SimpleTaskMethod(3, &counter); }, delay_ms));
+  SleepFor(MonoDelta::FromMilliseconds(2 * delay_ms));
+  // The result does not change.
+  ASSERT_EQ(10, base::subtle::NoBarrier_Load(&counter));
   pool_->Shutdown();
-  ASSERT_TRUE(token->Schedule(&IssueTraceStatement, 1000).IsServiceUnavailable());
+  // Submit a delayed task; it is rejected and will never be executed.
+  ASSERT_TRUE(
+      token->Schedule([&counter]() { SimpleTaskMethod(5, &counter); }, delay_ms).IsIllegalState());
+  // The result does not change.
+  ASSERT_EQ(10, base::subtle::NoBarrier_Load(&counter));
 }
 
 TEST_F(ThreadPoolTest, TokenShutdownBeforeSchedulerExecute) {
   ASSERT_OK(RebuildPoolWithScheduler(1, 1));
   unique_ptr<ThreadPoolToken> token = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
-  ASSERT_OK(token->Schedule(&IssueTraceStatement, 200));
+  Atomic32 counter(0);
+  constexpr const int delay_ms = 200;
+  ASSERT_OK(token->Schedule([&counter]() { SimpleTaskMethod(10, &counter); }, delay_ms));
   token->Shutdown();
+  ASSERT_EQ(0, base::subtle::NoBarrier_Load(&counter));
+  SleepFor(MonoDelta::FromMilliseconds(2 * delay_ms));
+  ASSERT_OK(token->Schedule([&counter]() { SimpleTaskMethod(3, &counter); }, delay_ms));
+  SleepFor(MonoDelta::FromMilliseconds(2 * delay_ms));
+  ASSERT_EQ(0, base::subtle::NoBarrier_Load(&counter));
+  pool_->Shutdown();
+  ASSERT_TRUE(
+      token->Schedule([&counter]() { SimpleTaskMethod(3, &counter); }, delay_ms).IsIllegalState());
+  SleepFor(MonoDelta::FromMilliseconds(2 * delay_ms));
+  ASSERT_EQ(0, base::subtle::NoBarrier_Load(&counter));
+}
+
+TEST_F(ThreadPoolTest, SchedulerShutdownBeforeTokenShutdown) {
+  ASSERT_OK(RebuildPoolWithScheduler(1, 1));
+  unique_ptr<ThreadPoolToken> token = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
+  ASSERT_OK(token->Schedule(&IssueTraceStatement, 200));
   SleepFor(MonoDelta::FromMilliseconds(500));
-  ASSERT_TRUE(token->Schedule(&IssueTraceStatement, 1000).IsServiceUnavailable());
   pool_->Shutdown();
+  Status status = token->Schedule(&IssueTraceStatement, 200);
+  ASSERT_TRUE(status.IsIllegalState() || status.IsServiceUnavailable());
+  token->Shutdown();
+}
+
+TEST_F(ThreadPoolTest, SchedulerWithNullToken) {
+  ASSERT_OK(RebuildPoolWithScheduler(1, 1));
+  ASSERT_OK(pool_->Schedule(nullptr, &IssueTraceStatement, MonoTime::Now()));
+
+  Atomic32 counter(0);
+  SimpleTask task(2, &counter);
+
+  ASSERT_OK(pool_->Submit([&counter]() { SimpleTaskMethod(1, &counter); }));
+  ASSERT_OK(pool_->Submit([&task]() { task.Run(); }));
+  // Scheduling with a null token is OK, but the task will be ignored.
+  ASSERT_OK(pool_->Schedule(
+      nullptr, [&counter]() { SimpleTaskMethod(3, &counter); }, MonoTime::Now()));
+  // Wait long enough for the scheduler to process the delayed task, which is then ignored.
+  SleepFor(MonoDelta::FromMilliseconds(200));
+  // make sure all tasks executed in thread pool.
+  pool_->Wait();
+  ASSERT_EQ(1 + 2, base::subtle::NoBarrier_Load(&counter));
 }
 
 TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) {
@@ -286,7 +362,7 @@ TEST_F(ThreadPoolTest, TestThreadPoolWithSchedulerAndNoMinimum) {
   ASSERT_EQ(0, pool_->num_threads());
   ASSERT_EQ(0, pool_->num_active_threads());
   pool_->Shutdown();
-  ASSERT_EQ(nullptr, pool_->scheduler());
+  ASSERT_EQ(nullptr, pool_->scheduler_);
 }
 
 TEST_F(ThreadPoolTest, TestThreadPoolWithNoMaxThreads) {
diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc
index 38e56d423..f8a10758d 100644
--- a/src/kudu/util/threadpool.cc
+++ b/src/kudu/util/threadpool.cc
@@ -33,6 +33,7 @@
 #include "kudu/gutil/sysinfo.h"
 #include "kudu/gutil/walltime.h"
 #include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/thread.h"
 #include "kudu/util/trace.h"
@@ -150,9 +151,9 @@ void SchedulerThread::RunLoop() {
     }
 
     for (const auto& task : pending_tasks) {
-      ThreadPoolToken* token = task.thread_pool_token;
+      ThreadPoolToken* token = task.thread_pool_token();
       while (token != nullptr) {
-        Status s = token->Submit(task.f);
+        Status s = token->Submit(task.func());
         if (s.ok()) {
           break;
         }
@@ -258,14 +259,10 @@ void ThreadPoolToken::Shutdown() {
 
 // Submit a task, running after delay_ms delay some time
 Status ThreadPoolToken::Schedule(std::function<void()> f, int64_t delay_ms) {
-  if (PREDICT_FALSE(!MaySubmitNewTasks())) {
-    return Status::ServiceUnavailable("Thread pool token was shut down");
-  }
   CHECK(mode() == ThreadPool::ExecutionMode::SERIAL);
-  MonoTime excute_time = MonoTime::Now();
-  excute_time.AddDelta(MonoDelta::FromMilliseconds(delay_ms));
-  pool_->scheduler()->Schedule(this, std::move(f), excute_time);
-  return Status::OK();
+  MonoTime execute_time = MonoTime::Now();
+  execute_time.AddDelta(MonoDelta::FromMilliseconds(delay_ms));
+  return pool_->Schedule(this, std::move(f), execute_time);
 }
 
 void ThreadPoolToken::Wait() {
@@ -417,10 +414,12 @@ Status ThreadPool::Init() {
 }
 
 void ThreadPool::Shutdown() {
-  if (scheduler_) {
-    scheduler_->Shutdown();
-    delete scheduler_;
-    scheduler_ = nullptr;
+  {
+    MutexLock l(scheduler_lock_);
+    if (scheduler_) {
+      delete scheduler_;
+      scheduler_ = nullptr;
+    }
   }
 
   MutexLock unique_lock(lock_);
@@ -525,6 +524,17 @@ Status ThreadPool::Submit(std::function<void()> f) {
   return DoSubmit(std::move(f), tokenless_.get());
 }
 
+Status ThreadPool::Schedule(ThreadPoolToken* token,
+                            std::function<void()> f,
+                            MonoTime execute_time) {
+  MutexLock l(scheduler_lock_);
+  if (!scheduler_) {
+    return Status::IllegalState("scheduler thread has been shutdown");
+  }
+  scheduler_->Schedule(token, std::move(f), execute_time);
+  return Status::OK();
+}
+
 Status ThreadPool::DoSubmit(std::function<void()> f, ThreadPoolToken* token) {
   DCHECK(token);
   const MonoTime submit_time = MonoTime::Now();
@@ -655,18 +665,6 @@ void ThreadPool::Wait() {
   }
 }
 
-void ThreadPool::WaitForScheduler() {
-  MutexLock unique_lock(lock_);
-  CheckNotPoolThreadUnlocked();
-  // Generally, ignore scheduler's pending tasks, but
-  // set ScheduledTaskWaitType::WAIT at unit tests.
-  bool wait_scheduler = (scheduler_ != nullptr);
-  while (total_queued_tasks_ > 0 || active_threads_ > 0 ||
-        (wait_scheduler && !scheduler_->empty())) {
-    idle_cond_.Wait();
-  }
-}
-
 bool ThreadPool::WaitUntil(const MonoTime& until) {
   MutexLock unique_lock(lock_);
   CheckNotPoolThreadUnlocked();
diff --git a/src/kudu/util/threadpool.h b/src/kudu/util/threadpool.h
index 9b5400503..6f4119f86 100644
--- a/src/kudu/util/threadpool.h
+++ b/src/kudu/util/threadpool.h
@@ -184,11 +184,19 @@ public:
   // Shutdown the thread and clear the pending tasks.
   Status Shutdown();
 
-  struct SchedulerTask {
-    ThreadPoolToken* thread_pool_token;
-    std::function<void()> f;
+  class SchedulerTask {
+   public:
+    SchedulerTask(ThreadPoolToken* thread_pool_token, std::function<void()> func)
+        : thread_pool_token_(thread_pool_token), func_(std::move(func)) {}
+    ThreadPoolToken* thread_pool_token() const { return thread_pool_token_; }
+    const std::function<void()>& func() const { return func_; }
+
+   private:
+    ThreadPoolToken* thread_pool_token_;
+    std::function<void()> func_;
   };
 
+  // Submit a task to 'token', execute the task at execute_time.
   void Schedule(ThreadPoolToken* token,
                 std::function<void()> f,
                 const MonoTime& execute_time) {
@@ -273,6 +281,9 @@ class ThreadPool {
   // Submits a new task.
   Status Submit(std::function<void()> f) WARN_UNUSED_RESULT;
 
+  // Submit a task to 'token', execute the task at execute_time.
+  Status Schedule(ThreadPoolToken* token, std::function<void()> f, MonoTime execute_time);
+
   // Waits until all the tasks are completed.
   void Wait();
 
@@ -500,13 +511,6 @@ class ThreadPool {
   //  * a new task has been scheduled (i.e. added into the queue)
   void NotifyLoadMeterUnlocked(const MonoDelta& queue_time = MonoDelta());
 
-  SchedulerThread* scheduler() {
-    return scheduler_;
-  }
-
-  // Waits until all the tasks and scheduler's tasks completed.
-  void WaitForScheduler();
-
   // Return the number of threads currently running for this thread pool.
   // Used by tests to avoid tsan test case down.
   int num_active_threads() {
@@ -607,8 +611,12 @@ class ThreadPool {
   // Metrics for the entire thread pool.
   const ThreadPoolMetrics metrics_;
 
+  // Protect 'scheduler_'.
+  mutable Mutex scheduler_lock_;
+
   // TimerThread is used for some scenarios, such as
   // make a task delay execution.
+  // It is shut down and deleted first when 'Shutdown()' is called.
   SchedulerThread* scheduler_;
   uint32_t schedule_period_ms_;