You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/06/11 22:14:51 UTC

[kudu] branch master updated: [util] add BlockingQueueTest.MultiThreadPerf test scenario

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 621cc24  [util] add BlockingQueueTest.MultiThreadPerf test scenario
621cc24 is described below

commit 621cc24b19b0b1bf5f11d31791a3a115be62b5af
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Wed Jun 10 20:11:10 2020 -0700

    [util] add BlockingQueueTest.MultiThreadPerf test scenario
    
    Added a new performance test scenario for BlockingQueue.
    
    Change-Id: If52e4e15d3c72ad3479334f0f3fa8bb10b8c50c6
    Reviewed-on: http://gerrit.cloudera.org:8080/16064
    Reviewed-by: Bankim Bhavsar <ba...@cloudera.com>
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/util/blocking_queue-test.cc | 157 ++++++++++++++++++++++++++++++++++-
 1 file changed, 156 insertions(+), 1 deletion(-)

diff --git a/src/kudu/util/blocking_queue-test.cc b/src/kudu/util/blocking_queue-test.cc
index 38d38dd..7fbbba7 100644
--- a/src/kudu/util/blocking_queue-test.cc
+++ b/src/kudu/util/blocking_queue-test.cc
@@ -17,28 +17,46 @@
 
 #include "kudu/util/blocking_queue.h"
 
+#include <algorithm>
 #include <cstddef>
 #include <cstdint>
 #include <list>
 #include <map>
 #include <memory>
+#include <ostream>
+#include <numeric>
 #include <string>
 #include <thread>
 #include <vector>
 
+#include <gflags/gflags.h>
+#include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/mutex.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
-
+#include "kudu/util/test_util.h"
+
+DEFINE_uint32(num_blocking_writers, 3,
+              "number of threads calling BlockingQueue::BlockingPut()");
+DEFINE_uint32(num_non_blocking_writers, 2,
+              "number of threads calling BlockingQueue::Put()");
+DEFINE_uint32(num_blocking_readers, 5,
+              "number of threads calling BlockingQueue::BlockingGet()");
+DEFINE_uint32(runtime_sec, 5, "duration of the test (seconds)");
+DEFINE_uint32(queue_capacity, 64, "capacity of the queue (number of elements)");
+
+using std::accumulate;
 using std::string;
 using std::thread;
 using std::unique_ptr;
 using std::vector;
+using strings::Substitute;
 
 namespace kudu {
 
@@ -332,4 +350,141 @@ TEST(BlockingQueueTest, TestMultipleThreads) {
   test.Run();
 }
 
+class BlockingQueueMultiThreadPerfTest : public ::testing::Test {
+ public:
+  void BlockingGetTask(size_t* counter) {
+    barrier_.CountDown();
+    barrier_.Wait();
+
+    uint64_t elem = 0;
+    while (true) {
+      auto s = queue_.BlockingGet(&elem);
+      if (!s.ok()) {
+        CHECK(s.IsAborted()) << s.ToString();
+        return;
+      }
+      ++(*counter);
+    }
+  }
+
+  void BlockingPutTask(size_t* counter) {
+    barrier_.CountDown();
+    barrier_.Wait();
+
+    uint64_t elem = 0;
+    while (true) {
+      auto s = queue_.BlockingPut(elem++);
+      if (!s.ok()) {
+        CHECK(s.IsAborted()) << s.ToString();
+        return;
+      }
+      ++(*counter);
+    }
+  }
+
+  void NonBlockingPutTask(size_t* counter) {
+    barrier_.CountDown();
+    barrier_.Wait();
+
+    uint64_t elem = 0;
+    while (true) {
+      auto result = queue_.Put(elem++);
+      if (result == QUEUE_SHUTDOWN) {
+        return;
+      }
+      switch (result) {
+        case QUEUE_SUCCESS:
+          ++(*counter);
+          continue;
+        case QUEUE_FULL:
+          SleepFor(MonoDelta::FromMicroseconds(25));
+          continue;
+        default:
+          LOG(FATAL) << "unexpected queue status: " << result;
+      }
+    }
+  }
+
+ protected:
+  BlockingQueueMultiThreadPerfTest()
+      : num_blocking_writers_(FLAGS_num_blocking_writers),
+        num_non_blocking_writers_(FLAGS_num_non_blocking_writers),
+        num_blocking_readers_(FLAGS_num_blocking_readers),
+        runtime_(MonoDelta::FromSeconds(FLAGS_runtime_sec)),
+        queue_(FLAGS_queue_capacity),
+        barrier_(num_blocking_writers_ +
+                 num_non_blocking_writers_ +
+                 num_blocking_readers_) {
+  }
+
+  const size_t num_blocking_writers_;
+  const size_t num_non_blocking_writers_;
+  const size_t num_blocking_readers_;
+  const MonoDelta runtime_;
+  BlockingQueue<uint64_t> queue_;
+  vector<thread> threads_;
+  CountDownLatch barrier_;
+};
+
+// This is a test scenario to assess the performance of BlockingQueue in the
+// terms of call rates when multiple concurrent writers and readers are present.
+TEST_F(BlockingQueueMultiThreadPerfTest, RequestRates) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  vector<size_t> blocking_read_counts(num_blocking_readers_, 0);
+  for (size_t i = 0; i < num_blocking_readers_; ++i) {
+    threads_.emplace_back(&BlockingQueueMultiThreadPerfTest::BlockingGetTask,
+                          this, &blocking_read_counts[i]);
+  }
+
+  vector<size_t> blocking_write_counts(num_blocking_writers_, 0);
+  for (size_t i = 0; i < num_blocking_writers_; ++i) {
+    threads_.emplace_back(&BlockingQueueMultiThreadPerfTest::BlockingPutTask,
+                          this, &blocking_write_counts[i]);
+  }
+
+  vector<size_t> non_blocking_write_counts(num_non_blocking_writers_, 0);
+  for (size_t i = 0; i < num_non_blocking_writers_; ++i) {
+    threads_.emplace_back(&BlockingQueueMultiThreadPerfTest::NonBlockingPutTask,
+                          this, &non_blocking_write_counts[i]);
+  }
+
+  SleepFor(runtime_);
+  queue_.Shutdown();
+
+  for_each(threads_.begin(), threads_.end(), [](thread& t) { t.join(); });
+
+  const auto blocking_reads_num = accumulate(
+      blocking_read_counts.begin(), blocking_read_counts.end(), 0UL);
+  const auto blocking_writes_num = accumulate(
+      blocking_write_counts.begin(), blocking_write_counts.end(), 0UL);
+  const auto non_blocking_writes_num = accumulate(
+      non_blocking_write_counts.begin(), non_blocking_write_counts.end(), 0UL);
+
+  LOG(INFO) << "number of successful BlockingGet() calls: "
+            << blocking_reads_num;
+  LOG(INFO) << "number of successful BlockingPut() calls: "
+            << blocking_writes_num;
+  LOG(INFO) << "number of successful Put() calls: "
+            << non_blocking_writes_num;
+
+  LOG(INFO) << Substitute(
+      "BlockingGet() rate: $0 calls/sec",
+      static_cast<double>(blocking_reads_num) / runtime_.ToSeconds());
+  LOG(INFO) << Substitute(
+      "BlockingPut() rate: $0 calls/sec",
+      static_cast<double>(blocking_writes_num) / runtime_.ToSeconds());
+  LOG(INFO) << Substitute(
+      "Put() (non-blocking) rate: $0 calls/sec",
+      static_cast<double>(non_blocking_writes_num) / runtime_.ToSeconds());
+  LOG(INFO) << Substitute(
+      "total Blocking{Get,Put}() rate: $0 calls/sec",
+      static_cast<double>(blocking_reads_num + blocking_writes_num) / runtime_.ToSeconds());
+  LOG(INFO) << Substitute(
+      "total rate: $0 calls/sec",
+      static_cast<double>(blocking_reads_num +
+                          blocking_writes_num +
+                          non_blocking_writes_num) / runtime_.ToSeconds());
+}
+
 }  // namespace kudu