Posted to commits@kudu.apache.org by al...@apache.org on 2020/09/03 22:14:14 UTC

[kudu] branch master updated: [util] add performance test scenario for ThreadPool

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 45d1e66  [util] add performance test scenario for ThreadPool
45d1e66 is described below

commit 45d1e6612f5cb273137f7240136f78b5a944d9e7
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Tue Sep 1 17:13:50 2020 -0700

    [util] add performance test scenario for ThreadPool
    
    Added a test scenario to assess ThreadPool's performance in the absence
    and the presence of QueueLoadMeter.  The newly added scenario uses
    a mix of serial and concurrent ThreadPool tokens.
    
    Below are the results when running a RELEASE build on a 48-core
    E5-2680 Xeon @ 2.50GHz machine:
    
      Processed 600000 tasks in real 1.366s       user 0.933s sys 54.512s
      Processing rate (QueueLoadMeter disabled): 439345.75398195261 tasks/sec
    
      Processed 600000 tasks in real 2.484s       user 1.815s sys 111.655s
      Processing rate (QueueLoadMeter  enabled): 241564.74471673707 tasks/sec
    
    Change-Id: I470d204f3b3a22407361a920882a93d6f21dd274
    Reviewed-on: http://gerrit.cloudera.org:8080/16401
    Tested-by: Alexey Serbin <as...@cloudera.com>
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/util/threadpool-test.cc | 88 ++++++++++++++++++++++++++++++++++++++--
 1 file changed, 84 insertions(+), 4 deletions(-)
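
For readers who want to see the measured pattern in isolation, the sketch below
condenses what the new scenario does: build a pool with or without the load
meter, hand out serial and concurrent tokens, submit no-op tasks, and derive a
tasks/sec rate from the wall time. It is an illustration only, not the
committed test: it assumes Kudu's public ThreadPoolBuilder::Build() and
ThreadPool::Shutdown() calls (not shown in this diff), uses arbitrary thread
and task counts, and drops the scheduler threads and Barrier that the real
scenario uses to submit tasks concurrently.

  // A condensed sketch of the benchmark pattern, assuming the ThreadPool API
  // visible in the diff plus ThreadPoolBuilder::Build() and
  // ThreadPool::Shutdown(); names and sizing here are illustrative only.
  #include <memory>

  #include <glog/logging.h>

  #include "kudu/gutil/strings/substitute.h"
  #include "kudu/util/monotime.h"
  #include "kudu/util/status.h"
  #include "kudu/util/stopwatch.h"
  #include "kudu/util/threadpool.h"

  using std::unique_ptr;
  using strings::Substitute;

  namespace kudu {

  void RunThreadPoolRateSketch(bool load_meter_enabled) {
    constexpr int kNumTasks = 100000;

    ThreadPoolBuilder builder("perf-sketch");
    builder.set_min_threads(4);
    builder.set_max_threads(4);
    if (load_meter_enabled) {
      // Enabling the load meter is the only difference between the two runs
      // being compared; the exact threshold value is not important.
      builder.set_queue_overload_threshold(MonoDelta::FromMilliseconds(1));
    }
    unique_ptr<ThreadPool> pool;
    CHECK_OK(builder.Build(&pool));

    // One serial and one concurrent token, mirroring the mixed-token idea
    // of the committed scenario.
    unique_ptr<ThreadPoolToken> serial(
        pool->NewToken(ThreadPool::ExecutionMode::SERIAL));
    unique_ptr<ThreadPoolToken> concurrent(
        pool->NewToken(ThreadPool::ExecutionMode::CONCURRENT));

    Stopwatch sw(Stopwatch::ALL_THREADS);
    sw.start();
    for (int i = 0; i < kNumTasks; ++i) {
      // No-op tasks: only queueing and dispatch overhead is measured.
      CHECK_OK((i % 2 == 0 ? serial : concurrent)->Submit([](){}));
    }
    pool->Wait();
    sw.stop();

    LOG(INFO) << Substitute("$0 tasks/sec",
                            kNumTasks / sw.elapsed().wall_seconds());
    pool->Shutdown();
  }

  } // namespace kudu

Run once with load_meter_enabled set to false and once with it set to true, the
two logged rates correspond to the "disabled"/"enabled" figures quoted in the
commit message above.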

diff --git a/src/kudu/util/threadpool-test.cc b/src/kudu/util/threadpool-test.cc
index cecd01b..7e98818 100644
--- a/src/kudu/util/threadpool-test.cc
+++ b/src/kudu/util/threadpool-test.cc
@@ -19,6 +19,7 @@
 
 #include <unistd.h>
 
+#include <algorithm>
 #include <atomic>
 #include <cstdint>
 #include <functional>
@@ -29,7 +30,6 @@
 #include <ostream>
 #include <string>
 #include <thread>
-#include <utility>
 #include <vector>
 
 #include <boost/smart_ptr/shared_ptr.hpp>
@@ -50,6 +50,7 @@
 #include "kudu/util/random.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
+#include "kudu/util/stopwatch.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 #include "kudu/util/trace.h"
@@ -673,7 +674,8 @@ TEST_F(ThreadPoolTest, QueueLoadMeter) {
 
   // Another mixed case: submit many long running tasks via a serial token
   // and many long running tasks that can run concurrently. The queue should
-  // become overloaded.
+  // become overloaded once the task at the head of the queue is kept there
+  // for longer than the kQueueTimeThresholdMs time interval.
   {
     constexpr auto kNumTokens = 1;
     vector<unique_ptr<ThreadPoolToken>> tokens;
@@ -692,8 +694,8 @@ TEST_F(ThreadPoolTest, QueueLoadMeter) {
     }
     ASSERT_FALSE(pool_->QueueOverloaded());
 
-    // Add several light tasks in addition to the scheduled serial ones. This
-    // should not overload the queue.
+    // Add heavy tasks in addition to the scheduled serial ones. The queue
+    // should become overloaded after the kQueueTimeThresholdMs time interval.
     for (auto i = 0; i < 2 * kMaxThreads; ++i) {
       ASSERT_OK(pool_->Submit([](){
         SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
@@ -706,6 +708,84 @@ TEST_F(ThreadPoolTest, QueueLoadMeter) {
   }
 }
 
+// A test for various scenarios to assess ThreadPool's performance.
+class ThreadPoolPerformanceTest :
+    public ThreadPoolTest,
+    public testing::WithParamInterface<bool> {
+};
+INSTANTIATE_TEST_CASE_P(LoadMeterPresence, ThreadPoolPerformanceTest,
+                        ::testing::Values(false, true));
+
+// A scenario to assess ThreadPool's performance in the absence/presence
+// of the QueueLoadMeter. The scenario uses a mix of serial and concurrent
+// task tokens.
+TEST_P(ThreadPoolPerformanceTest, ConcurrentAndSerialTasksMix) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  constexpr auto kNumTasksPerSchedulerThread = 25000;
+  const auto kNumCPUs = base::NumCPUs();
+  const auto kMaxThreads = std::max(1, kNumCPUs / 2);
+  const auto kNumSchedulerThreads = std::max(2, kNumCPUs / 2);
+  const auto kNumSerialTokens = kNumSchedulerThreads / 4;
+  const auto load_meter_enabled = GetParam();
+
+  ThreadPoolBuilder builder(kDefaultPoolName);
+  builder.set_min_threads(kMaxThreads);
+  builder.set_max_threads(kMaxThreads);
+  if (load_meter_enabled) {
+    // The exact value of the queue overload threshold isn't important in this
+    // test scenario. With a low enough setting and a huge number of scheduled
+    // tasks, this guarantees that the queue becomes overloaded and all code
+    // paths in QueueLoadMeter are covered.
+    builder.set_queue_overload_threshold(MonoDelta::FromMilliseconds(1));
+  }
+  ASSERT_OK(RebuildPoolWithBuilder(builder));
+
+  vector<unique_ptr<ThreadPoolToken>> tokens;
+  tokens.reserve(kNumSchedulerThreads);
+  for (auto i = 0; i < kNumSchedulerThreads; ++i) {
+    tokens.emplace_back(pool_->NewToken(
+        i < kNumSerialTokens ? ThreadPool::ExecutionMode::SERIAL
+                             : ThreadPool::ExecutionMode::CONCURRENT));
+  }
+
+  vector<thread> threads;
+  threads.reserve(kNumSchedulerThreads);
+  Barrier b(kNumSchedulerThreads + 1);
+
+  for (auto si = 0; si < kNumSchedulerThreads; ++si) {
+    threads.emplace_back([&, si]() {
+      unique_ptr<ThreadPoolToken> token(pool_->NewToken(
+          (si < kNumSerialTokens) ? ThreadPool::ExecutionMode::SERIAL
+                                  : ThreadPool::ExecutionMode::CONCURRENT));
+      b.Wait();
+      for (auto i = 0; i < kNumTasksPerSchedulerThread; ++i) {
+        CHECK_OK(token->Submit([](){}));
+      }
+    });
+  }
+
+  Stopwatch sw(Stopwatch::ALL_THREADS);
+  b.Wait();
+  sw.start();
+  pool_->Wait();
+  sw.stop();
+
+  for (auto& t : threads) {
+    t.join();
+  }
+
+  const auto time_elapsed = sw.elapsed();
+  LOG(INFO) << Substitute("Processed $0 tasks in $1",
+                          kNumSchedulerThreads * kNumTasksPerSchedulerThread,
+                          time_elapsed.ToString());
+  LOG(INFO) << Substitute(
+      "Processing rate (QueueLoadMeter $0): $1 tasks/sec",
+      load_meter_enabled ? " enabled" : "disabled",
+      static_cast<double>(kNumSchedulerThreads * kNumTasksPerSchedulerThread) /
+          time_elapsed.wall_seconds());
+}
+
 // Test that a thread pool will crash if asked to run its own blocking
 // functions in a pool thread.
 //