You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/06/11 22:14:51 UTC
[kudu] branch master updated: [util] add
BlockingQueueTest.MultiThreadPerf test scenario
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 621cc24 [util] add BlockingQueueTest.MultiThreadPerf test scenario
621cc24 is described below
commit 621cc24b19b0b1bf5f11d31791a3a115be62b5af
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Wed Jun 10 20:11:10 2020 -0700
[util] add BlockingQueueTest.MultiThreadPerf test scenario
Added a new performance test scenario for BlockingQueue.
Change-Id: If52e4e15d3c72ad3479334f0f3fa8bb10b8c50c6
Reviewed-on: http://gerrit.cloudera.org:8080/16064
Reviewed-by: Bankim Bhavsar <ba...@cloudera.com>
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
src/kudu/util/blocking_queue-test.cc | 157 ++++++++++++++++++++++++++++++++++-
1 file changed, 156 insertions(+), 1 deletion(-)
diff --git a/src/kudu/util/blocking_queue-test.cc b/src/kudu/util/blocking_queue-test.cc
index 38d38dd..7fbbba7 100644
--- a/src/kudu/util/blocking_queue-test.cc
+++ b/src/kudu/util/blocking_queue-test.cc
@@ -17,28 +17,46 @@
#include "kudu/util/blocking_queue.h"
+#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <list>
#include <map>
#include <memory>
+#include <ostream>
+#include <numeric>
#include <string>
#include <thread>
#include <vector>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
#include <gtest/gtest.h>
+#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/monotime.h"
#include "kudu/util/mutex.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
-
+#include "kudu/util/test_util.h"
+
+DEFINE_uint32(num_blocking_writers, 3,
+ "number of threads calling BlockingQueue::BlockingPut()");
+DEFINE_uint32(num_non_blocking_writers, 2,
+ "number of threads calling BlockingQueue::Put()");
+DEFINE_uint32(num_blocking_readers, 5,
+ "number of threads calling BlockingQueue::BlockingGet()");
+DEFINE_uint32(runtime_sec, 5, "duration of the test (seconds)");
+DEFINE_uint32(queue_capacity, 64, "capacity of the queue (number of elements)");
+
+using std::accumulate;
using std::string;
using std::thread;
using std::unique_ptr;
using std::vector;
+using strings::Substitute;
namespace kudu {
@@ -332,4 +350,141 @@ TEST(BlockingQueueTest, TestMultipleThreads) {
test.Run();
}
+class BlockingQueueMultiThreadPerfTest : public ::testing::Test {
+ public:
+ void BlockingGetTask(size_t* counter) {
+ barrier_.CountDown();
+ barrier_.Wait();
+
+ uint64_t elem = 0;
+ while (true) {
+ auto s = queue_.BlockingGet(&elem);
+ if (!s.ok()) {
+ CHECK(s.IsAborted()) << s.ToString();
+ return;
+ }
+ ++(*counter);
+ }
+ }
+
+ void BlockingPutTask(size_t* counter) {
+ barrier_.CountDown();
+ barrier_.Wait();
+
+ uint64_t elem = 0;
+ while (true) {
+ auto s = queue_.BlockingPut(elem++);
+ if (!s.ok()) {
+ CHECK(s.IsAborted()) << s.ToString();
+ return;
+ }
+ ++(*counter);
+ }
+ }
+
+ void NonBlockingPutTask(size_t* counter) {
+ barrier_.CountDown();
+ barrier_.Wait();
+
+ uint64_t elem = 0;
+ while (true) {
+ auto result = queue_.Put(elem++);
+ if (result == QUEUE_SHUTDOWN) {
+ return;
+ }
+ switch (result) {
+ case QUEUE_SUCCESS:
+ ++(*counter);
+ continue;
+ case QUEUE_FULL:
+ SleepFor(MonoDelta::FromMicroseconds(25));
+ continue;
+ default:
+ LOG(FATAL) << "unexpected queue status: " << result;
+ }
+ }
+ }
+
+ protected:
+ BlockingQueueMultiThreadPerfTest()
+ : num_blocking_writers_(FLAGS_num_blocking_writers),
+ num_non_blocking_writers_(FLAGS_num_non_blocking_writers),
+ num_blocking_readers_(FLAGS_num_blocking_readers),
+ runtime_(MonoDelta::FromSeconds(FLAGS_runtime_sec)),
+ queue_(FLAGS_queue_capacity),
+ barrier_(num_blocking_writers_ +
+ num_non_blocking_writers_ +
+ num_blocking_readers_) {
+ }
+
+ const size_t num_blocking_writers_;
+ const size_t num_non_blocking_writers_;
+ const size_t num_blocking_readers_;
+ const MonoDelta runtime_;
+ BlockingQueue<uint64_t> queue_;
+ vector<thread> threads_;
+ CountDownLatch barrier_;
+};
+
+// This is a test scenario to assess the performance of BlockingQueue in the
+// terms of call rates when multiple concurrent writers and readers are present.
+TEST_F(BlockingQueueMultiThreadPerfTest, RequestRates) {
+ SKIP_IF_SLOW_NOT_ALLOWED();
+
+ vector<size_t> blocking_read_counts(num_blocking_readers_, 0);
+ for (size_t i = 0; i < num_blocking_readers_; ++i) {
+ threads_.emplace_back(&BlockingQueueMultiThreadPerfTest::BlockingGetTask,
+ this, &blocking_read_counts[i]);
+ }
+
+ vector<size_t> blocking_write_counts(num_blocking_writers_, 0);
+ for (size_t i = 0; i < num_blocking_writers_; ++i) {
+ threads_.emplace_back(&BlockingQueueMultiThreadPerfTest::BlockingPutTask,
+ this, &blocking_write_counts[i]);
+ }
+
+ vector<size_t> non_blocking_write_counts(num_non_blocking_writers_, 0);
+ for (size_t i = 0; i < num_non_blocking_writers_; ++i) {
+ threads_.emplace_back(&BlockingQueueMultiThreadPerfTest::NonBlockingPutTask,
+ this, &non_blocking_write_counts[i]);
+ }
+
+ SleepFor(runtime_);
+ queue_.Shutdown();
+
+ for_each(threads_.begin(), threads_.end(), [](thread& t) { t.join(); });
+
+ const auto blocking_reads_num = accumulate(
+ blocking_read_counts.begin(), blocking_read_counts.end(), 0UL);
+ const auto blocking_writes_num = accumulate(
+ blocking_write_counts.begin(), blocking_write_counts.end(), 0UL);
+ const auto non_blocking_writes_num = accumulate(
+ non_blocking_write_counts.begin(), non_blocking_write_counts.end(), 0UL);
+
+ LOG(INFO) << "number of successful BlockingGet() calls: "
+ << blocking_reads_num;
+ LOG(INFO) << "number of successful BlockingPut() calls: "
+ << blocking_writes_num;
+ LOG(INFO) << "number of successful Put() calls: "
+ << non_blocking_writes_num;
+
+ LOG(INFO) << Substitute(
+ "BlockingGet() rate: $0 calls/sec",
+ static_cast<double>(blocking_reads_num) / runtime_.ToSeconds());
+ LOG(INFO) << Substitute(
+ "BlockingPut() rate: $0 calls/sec",
+ static_cast<double>(blocking_writes_num) / runtime_.ToSeconds());
+ LOG(INFO) << Substitute(
+ "Put() (non-blocking) rate: $0 calls/sec",
+ static_cast<double>(non_blocking_writes_num) / runtime_.ToSeconds());
+ LOG(INFO) << Substitute(
+ "total Blocking{Get,Put}() rate: $0 calls/sec",
+ static_cast<double>(blocking_reads_num + blocking_writes_num) / runtime_.ToSeconds());
+ LOG(INFO) << Substitute(
+ "total rate: $0 calls/sec",
+ static_cast<double>(blocking_reads_num +
+ blocking_writes_num +
+ non_blocking_writes_num) / runtime_.ToSeconds());
+}
+
} // namespace kudu