Posted to commits@kudu.apache.org by al...@apache.org on 2022/07/26 15:35:41 UTC

[kudu] branch master updated: [threadpool] Fix a coredump bug in the scheduler thread

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e48a43c4 [threadpool] Fix a coredump bug in the scheduler thread
6e48a43c4 is described below

commit 6e48a43c4505b4a4d281f7f732c63f51cec6bc63
Author: shenxingwuying <sh...@gmail.com>
AuthorDate: Tue Jun 28 18:13:37 2022 +0800

    [threadpool] Fix a coredump bug in the scheduler thread
    
    The scheduler thread schedules deferred tasks for later execution.
    If a SchedulerTask's threadpool token has been shut down by the time
    the task comes due, submitting the task would coredump.
    
    This patch checks whether the token is nullptr and whether Submit(...)
    failed because the token is shutting down, and adds a unit test.
    
    Change-Id: Ie5bc5511a745f3dc12dfe1a6a1813ece41ccc2a8
    Reviewed-on: http://gerrit.cloudera.org:8080/18674
    Reviewed-by: Yingchun Lai <ac...@gmail.com>
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <al...@apache.org>
---
 src/kudu/util/threadpool-test.cc | 17 +++++++++++--
 src/kudu/util/threadpool.cc      | 32 ++++++++++++++++++-------
 src/kudu/util/threadpool.h       | 52 +++++++++++++++++++++++++++++++++-------
 3 files changed, 82 insertions(+), 19 deletions(-)
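
For context, the failure mode this patch addresses, sketched as a minimal
usage sequence (the pool setup and the DoWork callback are illustrative,
not taken from the patch). Before this change, SchedulerThread::RunLoop()
submitted each due task with CHECK_OK(token->Submit(task.f)), so a token
shut down before its task came due crashed the process:

    // A sketch of the pre-fix crash, assuming a pool built with
    // set_enable_scheduler(); DoWork is a hypothetical callback.
    unique_ptr<ThreadPoolToken> token =
        pool->NewToken(ThreadPool::ExecutionMode::SERIAL);
    ASSERT_OK(token->Schedule([]() { DoWork(); }, 200));  // due in ~200ms
    token->Shutdown();  // the token no longer accepts new tasks
    // ~200ms later the scheduler thread calls token->Submit(), which returns
    // ServiceUnavailable; the old CHECK_OK(...) then aborted with a coredump.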

diff --git a/src/kudu/util/threadpool-test.cc b/src/kudu/util/threadpool-test.cc
index 31e57723c..25dc561a9 100644
--- a/src/kudu/util/threadpool-test.cc
+++ b/src/kudu/util/threadpool-test.cc
@@ -190,10 +190,23 @@ TEST_F(ThreadPoolTest, TestSubmitAfterShutdown) {
 
   ASSERT_OK(RebuildPoolWithScheduler(1, 1));
   unique_ptr<ThreadPoolToken> token = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
+  ASSERT_OK(token->Schedule(&IssueTraceStatement, 200));
+  token->Shutdown();
+  SleepFor(MonoDelta::FromMilliseconds(500));
   pool_->Shutdown();
   ASSERT_TRUE(token->Schedule(&IssueTraceStatement, 1000).IsServiceUnavailable());
 }
 
+TEST_F(ThreadPoolTest, TokenShutdownBeforeSchedulerExecute) {
+  ASSERT_OK(RebuildPoolWithScheduler(1, 1));
+  unique_ptr<ThreadPoolToken> token = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
+  ASSERT_OK(token->Schedule(&IssueTraceStatement, 200));
+  token->Shutdown();
+  SleepFor(MonoDelta::FromMilliseconds(500));
+  ASSERT_TRUE(token->Schedule(&IssueTraceStatement, 1000).IsServiceUnavailable());
+  pool_->Shutdown();
+}
+
 TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) {
   constexpr int kIdleTimeoutMs = 1;
   ASSERT_OK(RebuildPoolWithBuilder(
@@ -255,7 +268,6 @@ TEST_F(ThreadPoolTest, TestThreadPoolWithSchedulerAndNoMinimum) {
   unique_ptr<ThreadPoolToken> token = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
   ASSERT_OK(token->Submit([&latch]() { latch.Wait(); }));
   ASSERT_EQ(4, pool_->num_threads());
-
   ASSERT_OK(token->Schedule([&latch]() { latch.Wait(); }, kDelayMs));
   ASSERT_OK(token->Schedule([&latch]() { latch.Wait(); }, static_cast<int>(kDelayMs * 1.2)));
   ASSERT_EQ(4, pool_->num_threads());
@@ -1052,8 +1064,9 @@ TEST_P(ThreadPoolTestTokenTypes, TestTokenWaitForAll) {
   const int kNumSubmissions = 20;
   Random r(SeedRandom());
   vector<unique_ptr<ThreadPoolToken>> tokens;
+  tokens.resize(kNumTokens);
   for (int i = 0; i < kNumTokens; i++) {
-    tokens.emplace_back(pool_->NewToken(GetParam()));
+    tokens[i] = pool_->NewToken(GetParam());
   }
 
   atomic<int32_t> v(0);
diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc
index 95c16178c..38e56d423 100644
--- a/src/kudu/util/threadpool.cc
+++ b/src/kudu/util/threadpool.cc
@@ -142,16 +142,32 @@ void SchedulerThread::RunLoop() {
     vector<SchedulerTask> pending_tasks;
     {
       MutexLock auto_lock(mutex_);
-      auto upper_it = tasks_.upper_bound(now);
-      for (auto it = tasks_.begin(); it != upper_it; it++) {
+      auto upper_it = future_tasks_.upper_bound(now);
+      for (auto it = future_tasks_.begin(); it != upper_it; it++) {
         pending_tasks.emplace_back(std::move(it->second));
       }
-      tasks_.erase(tasks_.begin(), upper_it);
+      future_tasks_.erase(future_tasks_.begin(), upper_it);
     }
 
     for (const auto& task : pending_tasks) {
-      ThreadPoolToken* token = task.thread_pool_token_;
-      CHECK_OK(token->Submit(task.f));
+      ThreadPoolToken* token = task.thread_pool_token;
+      while (token != nullptr) {
+        Status s = token->Submit(task.f);
+        if (s.ok()) {
+          break;
+        }
+        DCHECK(s.IsServiceUnavailable())
+            << Substitute("threadpool token Submit status: $0", s.ToString());
+
+        if (!token->MaySubmitNewTasks()) {
+          // The threadpool token has been shut down; skip this task.
+          break;
+        }
+        // If ThreadPoolToken::Schedule(...) is called too frequently, the
+        // blocking queue may fill up; in that case, retry submitting the task.
+        VLOG(1) << Substitute("threadpool token Submit status: $0, retry the task", s.ToString());
+        SleepFor(MonoDelta::FromMilliseconds(1));
+      }
     }
   }
 }
@@ -401,14 +417,14 @@ Status ThreadPool::Init() {
 }
 
 void ThreadPool::Shutdown() {
-  MutexLock unique_lock(lock_);
-  CheckNotPoolThreadUnlocked();
-
   if (scheduler_) {
     scheduler_->Shutdown();
     delete scheduler_;
     scheduler_ = nullptr;
   }
+
+  MutexLock unique_lock(lock_);
+  CheckNotPoolThreadUnlocked();
   // Note: this is the same error seen at submission if the pool is at
   // capacity, so clients can't tell them apart. This isn't really a practical
   // concern though because shutting down a pool typically requires clients to
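
Distilled from the RunLoop() hunk above, the new submit path for each due
task behaves as follows (a simplified restatement of the patched code, not
additional functionality):

    ThreadPoolToken* token = task.thread_pool_token;
    while (token != nullptr) {                   // no token: nothing to submit
      Status s = token->Submit(task.f);
      if (s.ok()) break;                         // submitted successfully
      if (!token->MaySubmitNewTasks()) break;    // token shut down: drop the task
      SleepFor(MonoDelta::FromMilliseconds(1));  // queue full: back off and retry
    }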
diff --git a/src/kudu/util/threadpool.h b/src/kudu/util/threadpool.h
index 8c68a88fa..9b5400503 100644
--- a/src/kudu/util/threadpool.h
+++ b/src/kudu/util/threadpool.h
@@ -140,13 +140,38 @@ class ThreadPoolBuilder {
   MonoDelta queue_overload_threshold_;
   ThreadPoolMetrics metrics_;
   bool enable_scheduler_;
-  uint32_t schedule_period_ms_;
+  uint32_t schedule_period_ms_ = 100;
 
   DISALLOW_COPY_AND_ASSIGN(ThreadPoolBuilder);
 };
 
-
 // SchedulerThread for asynchronous delayed task execution.
+//
+// Note: take care with the order of shutdown.
+//
+// First, shut down the ThreadPool and wait for the SchedulerThread to stop.
+// Second, delete all the thread pool tokens to release them back to the ThreadPool.
+// Lastly, release the ThreadPool itself.
+//
+// For example:
+//   static void Func(int n) { ... }
+//
+//   unique_ptr<ThreadPool> scheduler_pool;
+//   ThreadPoolBuilder("scheduler")
+//            .set_min_threads(1)
+//            .set_max_threads(4)
+//            .set_enable_scheduler()
+//            .set_schedule_period_ms(100)
+//            .Build(&scheduler_pool);
+//   unique_ptr<ThreadPoolToken> token =
+//            scheduler_pool->NewToken(ThreadPool::ExecutionMode::SERIAL);
+//   RETURN_NOT_OK(token->Schedule([]() { Func(10); }, 1000));
+//
+// The order of shutdown and release should follow the note above, for example:
+//   scheduler_pool->Shutdown(); // shut down the pool and wait for the scheduler thread to stop.
+//   // The 'token' and 'scheduler_pool' are then released automatically in the
+//   // reverse order of their declaration, which is the proper sequence.
+//
 class SchedulerThread {
 public:
   explicit SchedulerThread(std::string thread_pool_name, uint32_t schedule_period_ms);
@@ -160,18 +185,20 @@ public:
   Status Shutdown();
 
   struct SchedulerTask {
-    ThreadPoolToken* thread_pool_token_;
+    ThreadPoolToken* thread_pool_token;
     std::function<void()> f;
   };
 
-  void Schedule(ThreadPoolToken* token, std::function<void()> f, const MonoTime& execute_time) {
+  void Schedule(ThreadPoolToken* token,
+                std::function<void()> f,
+                const MonoTime& execute_time) {
     MutexLock unique_lock(mutex_);
-    tasks_.insert({execute_time, SchedulerTask({token, std::move(f)})});
+    future_tasks_.insert({execute_time, SchedulerTask({token, std::move(f)})});
   }
 
   bool empty() const {
     MutexLock unique_lock(mutex_);
-    return tasks_.empty();
+    return future_tasks_.empty();
   }
 
 private:
@@ -181,14 +208,21 @@ private:
   void RunLoop();
 
   const std::string thread_pool_name_;
+
   // The scheduler's task-checking period, in milliseconds.
   const uint32_t schedule_period_ms_;
+
   CountDownLatch shutdown_;
-  // Protect `tasks_` data race.
+
+  // Protects `future_tasks_` against data races.
   mutable Mutex mutex_;
 
+  // Thread that drives `future_tasks_` by absolute time.
   scoped_refptr<Thread> thread_;
-  std::multimap<MonoTime, SchedulerTask> tasks_;
+
+  // Tasks ordered by their execution timepoint. The `thread_` executes the
+  // tasks whose timepoint is <= the current time.
+  std::multimap<MonoTime, SchedulerTask> future_tasks_;
 };
 
 // Thread pool with a variable number of threads.
@@ -564,7 +598,7 @@ class ThreadPool {
   boost::intrusive::list<IdleThread> idle_threads_; // NOLINT(build/include_what_you_use)
 
   // ExecutionMode::CONCURRENT token used by the pool for tokenless submission.
-  std::unique_ptr<ThreadPoolToken> tokenless_;
+  std::shared_ptr<ThreadPoolToken> tokenless_;
 
   // The meter to track whether the pool's queue is stalled/overloaded.
   // It's nullptr/none if the queue overload threshold is unset.
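
Finally, the shutdown-order contract from the new SchedulerThread header
comment, consolidated into a single sketch (RunScheduledWork and the body of
Func are hypothetical; the builder setters are the ones shown in the header
comment above):

    static void Func(int n) { LOG(INFO) << "ran with " << n; }

    Status RunScheduledWork() {
      unique_ptr<ThreadPool> scheduler_pool;
      RETURN_NOT_OK(ThreadPoolBuilder("scheduler")
                        .set_min_threads(1)
                        .set_max_threads(4)
                        .set_enable_scheduler()
                        .set_schedule_period_ms(100)
                        .Build(&scheduler_pool));
      unique_ptr<ThreadPoolToken> token =
          scheduler_pool->NewToken(ThreadPool::ExecutionMode::SERIAL);
      RETURN_NOT_OK(token->Schedule([]() { Func(10); }, 1000));  // due in ~1s

      // Shut down the pool first (this stops the scheduler thread); 'token'
      // and 'scheduler_pool' are then destroyed in reverse declaration order,
      // matching the sequence prescribed by the header comment.
      scheduler_pool->Shutdown();
      return Status::OK();
    }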