You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/06/12 07:31:00 UTC
[kudu] branch master updated: [util] improved performance of
BlockingQueue
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 15af717 [util] improved performance of BlockingQueue
15af717 is described below
commit 15af717e8c200d4e7a33a25171a6bfe58d70aa65
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Tue Jun 9 14:47:15 2020 -0700
[util] improved performance of BlockingQueue
Updated the interface of the BlockingQueue adding methods with the move
semantics. Switched from std::list to std::deque for the underlying
queue. Other minor cleanup.
This update provides measurable performance boost for BlockingQueue
as per BlockingQueueTest.MultiThreadPerf scenario of the
blocking_queue-test. I ran the following 3 times on the same
machine (RELEASE build):
./bin/blocking_queue-test --gtest_filter='BlockingQueueTest.MultiThreadPerf' --num_blocking_writers=4 --num_blocking_readers=4 --num_non_blocking_writers=0
Before this patch:
BlockingGet() rate: 238495 calls/sec
BlockingPut() rate: 238495 calls/sec
total Blocking{Get,Put}() rate: 476990 calls/sec
BlockingGet() rate: 223278 calls/sec
BlockingPut() rate: 223278 calls/sec
total Blocking{Get,Put}() rate: 446556 calls/sec
BlockingGet() rate: 227804 calls/sec
BlockingPut() rate: 227804 calls/sec
total Blocking{Get,Put}() rate: 455608 calls/sec
After this patch:
BlockingGet() rate: 328264 calls/sec
BlockingPut() rate: 328264 calls/sec
total Blocking{Get,Put}() rate: 656528 calls/sec
BlockingGet() rate: 312607.4 calls/sec
BlockingPut() rate: 312607.4 calls/sec
total Blocking{Get,Put}() rate: 625214.8 calls/sec
BlockingGet() rate: 292966.4 calls/sec
BlockingPut() rate: 292966.4 calls/sec
total Blocking{Get,Put}() rate: 585932.8 calls/sec
========================================================================
The case of non-symmetric writes/reads now looks more performant as well:
./bin/blocking_queue-test --gtest_filter='*BlockingQueueMultiThreadPerfTest*' --num_blocking_writers=2 --num_blocking_readers=1 --num_non_blocking_writers=0
Before this patch:
BlockingGet() rate: 399823.8 calls/sec
BlockingPut() rate: 399823.8 calls/sec
total Blocking{Get,Put}() rate: 799647.6 calls/sec
After this patch:
BlockingGet() rate: 640630.6 calls/sec
BlockingPut() rate: 640630.6 calls/sec
total Blocking{Get,Put}() rate: 1281261.2 calls/sec
========================================================================
In addition, I ran the more synthetic mt-log-test, and the results show
a slight improvement as well (at least, the performance didn't degrade):
./bin/mt-log-test --num_writer_threads=1 --num_batches_per_thread=1000000 --num_reader_threads=0 --num_ops_per_batch_avg=1 --verify_log=false --gtest_filter='MultiThreadedLogTest.TestAppends'
Before this patch:
Time spent inserting 1000000 batches(1 threads, 1000000 per-thread): real 15.309s user 0.000s sys 0.000s
Time spent inserting 1000000 batches(1 threads, 1000000 per-thread): real 15.899s user 0.000s sys 0.000s
Time spent inserting 1000000 batches(1 threads, 1000000 per-thread): real 15.012s user 0.000s sys 0.001s
After this patch:
Time spent inserting 1000000 batches(1 threads, 1000000 per-thread): real 14.964s user 0.001s sys 0.000s
Time spent inserting 1000000 batches(1 threads, 1000000 per-thread): real 14.513s user 0.000s sys 0.000s
Time spent inserting 1000000 batches(1 threads, 1000000 per-thread): real 14.991s user 0.000s sys 0.000s
Change-Id: Ie80620e5e86cd72c29320096dcdcc712eea1b0f2
Reviewed-on: http://gerrit.cloudera.org:8080/16063
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
src/kudu/consensus/log.cc | 33 ++++++-----
src/kudu/consensus/log.h | 7 ++-
src/kudu/subprocess/server.cc | 5 +-
src/kudu/subprocess/server.h | 2 +-
src/kudu/util/blocking_queue-test.cc | 9 +--
src/kudu/util/blocking_queue.h | 110 +++++++++++++++--------------------
6 files changed, 78 insertions(+), 88 deletions(-)
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 1afda83..94166d0 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -23,6 +23,7 @@
#include <memory>
#include <mutex>
#include <ostream>
+#include <type_traits>
#include <utility>
#include <boost/range/adaptor/reversed.hpp>
@@ -245,9 +246,8 @@ class Log::AppendThread {
// a new task was enqueued just as we were trying to go idle.
bool GoIdle();
- // Handle the actual appending of a group of entries. Responsible for deleting the
- // LogEntryBatch* pointers.
- void HandleBatches(vector<LogEntryBatch*> entry_batches);
+ // Handle the actual appending of a group of entries.
+ void HandleBatches(vector<unique_ptr<LogEntryBatch>> entry_batches);
string LogPrefix() const;
@@ -348,7 +348,7 @@ void Log::AppendThread::ProcessQueue() {
while (true) {
MonoTime deadline = MonoTime::Now() +
MonoDelta::FromMilliseconds(FLAGS_log_thread_idle_threshold_ms);
- vector<LogEntryBatch*> entry_batches;
+ vector<unique_ptr<LogEntryBatch>> entry_batches;
Status s = log_->entry_queue()->BlockingDrainTo(&entry_batches, deadline);
if (PREDICT_FALSE(s.IsAborted())) {
break;
@@ -362,7 +362,7 @@ void Log::AppendThread::ProcessQueue() {
VLOG_WITH_PREFIX(2) << "WAL Appender going idle";
}
-void Log::AppendThread::HandleBatches(vector<LogEntryBatch*> entry_batches) {
+void Log::AppendThread::HandleBatches(vector<unique_ptr<LogEntryBatch>> entry_batches) {
if (log_->ctx_.metrics) {
log_->ctx_.metrics->entry_batches_per_group->Increment(entry_batches.size());
}
@@ -371,9 +371,9 @@ void Log::AppendThread::HandleBatches(vector<LogEntryBatch*> entry_batches) {
SCOPED_LATENCY_METRIC(log_->ctx_.metrics, group_commit_latency);
bool is_all_commits = true;
- for (auto* entry_batch : entry_batches) {
- TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch);
- Status s = log_->WriteBatch(entry_batch);
+ for (auto& entry_batch : entry_batches) {
+ TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch.get());
+ Status s = log_->WriteBatch(entry_batch.get());
if (PREDICT_FALSE(!s.ok())) {
LOG_WITH_PREFIX(ERROR) << "Error appending to the log: " << s.ToString();
// TODO(af): If a single op fails to append, should we
@@ -393,7 +393,7 @@ void Log::AppendThread::HandleBatches(vector<LogEntryBatch*> entry_batches) {
}
if (PREDICT_FALSE(!s.ok())) {
LOG_WITH_PREFIX(ERROR) << "Error syncing log: " << s.ToString();
- for (auto* entry_batch : entry_batches) {
+ for (const auto& entry_batch : entry_batches) {
entry_batch->SetAppendError(s);
}
} else {
@@ -401,13 +401,12 @@ void Log::AppendThread::HandleBatches(vector<LogEntryBatch*> entry_batches) {
}
TRACE_EVENT0("log", "Callbacks");
SCOPED_WATCH_STACK(100);
- for (auto* entry_batch : entry_batches) {
+ for (auto& entry_batch : entry_batches) {
entry_batch->RunCallback();
-
// It's important to delete each batch as we see it, because
// deleting it may free up memory from memory trackers, and the
// callback of a later batch may want to use that memory.
- delete entry_batch;
+ entry_batch.reset();
}
}
@@ -840,13 +839,15 @@ unique_ptr<LogEntryBatch> Log::CreateBatchFromPB(
Status Log::AsyncAppend(unique_ptr<LogEntryBatch> entry_batch) {
TRACE_EVENT0("log", "Log::AsyncAppend");
- TRACE_EVENT_FLOW_BEGIN0("log", "Batch", entry_batch.get());
- if (PREDICT_FALSE(!entry_batch_queue_.BlockingPut(entry_batch.get()).ok())) {
- TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch.get());
+ // entry_batch_trace_id is used as the identifier for the trace, where only the
+ // address is stored, the pointer isn't de-referenced.
+ const LogEntryBatch* entry_batch_trace_id = entry_batch.get();
+ TRACE_EVENT_FLOW_BEGIN0("log", "Batch", entry_batch_trace_id);
+ if (PREDICT_FALSE(!entry_batch_queue_.BlockingPut(std::move(entry_batch)).ok())) {
+ TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch_trace_id);
return kLogShutdownStatus;
}
append_thread_->Wake();
- entry_batch.release();
return Status::OK();
}
diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h
index a0358f7..cf8f6a2 100644
--- a/src/kudu/consensus/log.h
+++ b/src/kudu/consensus/log.h
@@ -19,9 +19,9 @@
#include <atomic>
#include <cstddef>
#include <cstdint>
+#include <deque>
#include <functional>
#include <limits>
-#include <list>
#include <map>
#include <memory>
#include <string>
@@ -79,7 +79,8 @@ struct LogContext {
std::string LogPrefix() const;
};
-typedef BlockingQueue<LogEntryBatch*, LogEntryBatchLogicalSize> LogEntryBatchQueue;
+typedef BlockingQueue<std::unique_ptr<LogEntryBatch>, LogEntryBatchLogicalSize>
+ LogEntryBatchQueue;
// State of segment allocation.
enum SegmentAllocationState {
@@ -641,7 +642,7 @@ class LogEntryBatch {
// Used by 'Log::queue_' to determine logical size of a LogEntryBatch.
struct LogEntryBatchLogicalSize {
- static size_t logical_size(const LogEntryBatch* batch) {
+ static size_t logical_size(const std::unique_ptr<LogEntryBatch>& batch) {
return batch->total_size_bytes();
}
};
diff --git a/src/kudu/subprocess/server.cc b/src/kudu/subprocess/server.cc
index 76f3a3d..ca4111f 100644
--- a/src/kudu/subprocess/server.cc
+++ b/src/kudu/subprocess/server.cc
@@ -23,6 +23,7 @@
#include <memory>
#include <ostream>
#include <string>
+#include <type_traits>
#include <utility>
#include <vector>
@@ -175,7 +176,7 @@ Status SubprocessServer::Execute(SubprocessRequestPB* req,
CallAndTimer call_and_timer = {
make_shared<SubprocessCall>(req, resp, &cb, MonoTime::Now() + call_timeout_), {} };
RETURN_NOT_OK_PREPEND(
- outbound_call_queue_.BlockingPut(call_and_timer, call_and_timer.first->deadline()),
+ outbound_call_queue_.BlockingPut(std::move(call_and_timer), call_and_timer.first->deadline()),
"couldn't enqueue call");
return sync.Wait();
}
@@ -255,7 +256,7 @@ void SubprocessServer::ReceiveMessagesThread() {
// Before adding to the queue, record the size of the response queue.
metrics_.server_inbound_queue_size_bytes->Increment(inbound_response_queue_.size());
ResponsePBAndTimer resp_and_timer = { std::move(response), {} };
- if (s.ok() && !inbound_response_queue_.BlockingPut(resp_and_timer).ok()) {
+ if (s.ok() && !inbound_response_queue_.BlockingPut(std::move(resp_and_timer)).ok()) {
// The queue has been shut down and we should shut down too.
DCHECK_EQ(0, closing_.count());
LOG(INFO) << "failed to put response onto inbound queue";
diff --git a/src/kudu/subprocess/server.h b/src/kudu/subprocess/server.h
index 5d46ef7..b5d39a9 100644
--- a/src/kudu/subprocess/server.h
+++ b/src/kudu/subprocess/server.h
@@ -20,8 +20,8 @@
#include <atomic>
#include <cstddef>
#include <cstdint>
+#include <deque>
#include <functional>
-#include <list>
#include <map>
#include <memory>
#include <mutex>
diff --git a/src/kudu/util/blocking_queue-test.cc b/src/kudu/util/blocking_queue-test.cc
index 7fbbba7..11c4c6a 100644
--- a/src/kudu/util/blocking_queue-test.cc
+++ b/src/kudu/util/blocking_queue-test.cc
@@ -20,13 +20,14 @@
#include <algorithm>
#include <cstddef>
#include <cstdint>
-#include <list>
+#include <deque>
#include <map>
#include <memory>
-#include <ostream>
#include <numeric>
+#include <ostream>
#include <string>
#include <thread>
+#include <type_traits>
#include <vector>
#include <gflags/gflags.h>
@@ -264,9 +265,9 @@ TEST(BlockingQueueTest, TestGetFromShutdownQueue) {
}
TEST(BlockingQueueTest, TestUniquePtrMethods) {
- BlockingQueue<int*> test_queue(2);
+ BlockingQueue<unique_ptr<int>> test_queue(2);
unique_ptr<int> input_int(new int(123));
- ASSERT_EQ(test_queue.Put(&input_int), QUEUE_SUCCESS);
+ ASSERT_EQ(QUEUE_SUCCESS, test_queue.Put(std::move(input_int)));
unique_ptr<int> output_int;
ASSERT_OK(test_queue.BlockingGet(&output_int));
ASSERT_EQ(123, *output_int.get());
diff --git a/src/kudu/util/blocking_queue.h b/src/kudu/util/blocking_queue.h
index c9d59a2..8c4a853 100644
--- a/src/kudu/util/blocking_queue.h
+++ b/src/kudu/util/blocking_queue.h
@@ -16,11 +16,14 @@
// under the License.
#pragma once
-#include <list>
+#include <unistd.h>
+
+#include <algorithm>
+#include <deque>
#include <memory>
#include <string>
#include <type_traits>
-#include <unistd.h>
+#include <utility>
#include <vector>
#include "kudu/gutil/basictypes.h"
@@ -55,17 +58,17 @@ class BlockingQueue {
typedef typename std::remove_pointer<T>::type T_VAL;
explicit BlockingQueue(size_t max_size)
- : shutdown_(false),
- size_(0),
- max_size_(max_size),
- not_empty_(&lock_),
- not_full_(&lock_) {
+ : max_size_(max_size),
+ size_(0),
+ shutdown_(false),
+ not_empty_(&lock_),
+ not_full_(&lock_) {
}
// If the queue holds a bare pointer, it must be empty on destruction, since
// it may have ownership of the pointer.
~BlockingQueue() {
- DCHECK(list_.empty() || !std::is_pointer<T>::value)
+ DCHECK(queue_.empty() || !std::is_pointer<T>::value)
<< "BlockingQueue holds bare pointers at destruction time";
}
@@ -79,17 +82,18 @@ class BlockingQueue {
// - OK if successful
// - TimedOut if the deadline passed
// - Aborted if the queue shut down
- Status BlockingGet(T* out, MonoTime deadline = MonoTime()) {
+ Status BlockingGet(T* out, MonoTime deadline = {}) {
MutexLock l(lock_);
while (true) {
- if (!list_.empty()) {
- *out = list_.front();
- list_.pop_front();
+ if (!queue_.empty()) {
+ *out = std::move(queue_.front());
+ queue_.pop_front();
decrement_size_unlocked(*out);
+ l.Unlock();
not_full_.Signal();
return Status::OK();
}
- if (shutdown_) {
+ if (PREDICT_FALSE(shutdown_)) {
return Status::Aborted("");
}
if (!deadline.Initialized()) {
@@ -100,15 +104,6 @@ class BlockingQueue {
}
}
- // Get an element from the queue. Returns false if the queue is empty and
- // we were shut down prior to getting the element.
- Status BlockingGet(std::unique_ptr<T_VAL>* out, MonoTime deadline = MonoTime()) {
- T t = nullptr;
- RETURN_NOT_OK(BlockingGet(&t, deadline));
- out->reset(t);
- return Status::OK();
- }
-
// Get all elements from the queue and append them to a vector.
//
// If 'deadline' passes and no elements have been returned from the
@@ -122,16 +117,17 @@ class BlockingQueue {
// - OK if successful
// - TimedOut if the deadline passed
// - Aborted if the queue shut down
- Status BlockingDrainTo(std::vector<T>* out, MonoTime deadline = MonoTime()) {
+ Status BlockingDrainTo(std::vector<T>* out, MonoTime deadline = {}) {
MutexLock l(lock_);
while (true) {
- if (!list_.empty()) {
- out->reserve(list_.size());
- for (const T& elt : list_) {
- out->push_back(elt);
+ if (!queue_.empty()) {
+ out->reserve(queue_.size());
+ for (const T& elt : queue_) {
decrement_size_unlocked(elt);
}
- list_.clear();
+ std::move(queue_.begin(), queue_.end(), std::back_inserter(*out));
+ queue_.clear();
+ l.Unlock();
not_full_.Signal();
return Status::OK();
}
@@ -151,32 +147,26 @@ class BlockingQueue {
// QUEUE_SUCCESS: if successfully enqueued
// QUEUE_FULL: if the queue has reached max_size
// QUEUE_SHUTDOWN: if someone has already called Shutdown()
- QueueStatus Put(const T& val) {
+ //
+ // The templatized approach is for perfect forwarding while providing both
+ // Put(const T&) and Put(T&&) signatures for the method. See
+ // https://en.cppreference.com/w/cpp/utility/forward for details.
+ template<typename U>
+ QueueStatus Put(U&& val) {
MutexLock l(lock_);
+ if (PREDICT_FALSE(shutdown_)) {
+ return QUEUE_SHUTDOWN;
+ }
if (size_ >= max_size_) {
return QUEUE_FULL;
}
- if (shutdown_) {
- return QUEUE_SHUTDOWN;
- }
- list_.push_back(val);
increment_size_unlocked(val);
+ queue_.emplace_back(std::forward<U>(val));
l.Unlock();
not_empty_.Signal();
return QUEUE_SUCCESS;
}
- // Same as other Put() overload above.
- //
- // If the element was enqueued, the contents of 'val' are released.
- QueueStatus Put(std::unique_ptr<T_VAL>* val) {
- QueueStatus s = Put(val->get());
- if (s == QUEUE_SUCCESS) {
- ignore_result<>(val->release());
- }
- return s;
- }
-
// Puts an element onto the queue; if the queue is full, blocks until space
// becomes available, or until the deadline passes.
//
@@ -188,18 +178,23 @@ class BlockingQueue {
// - OK if successful
// - TimedOut if the deadline passed
// - Aborted if the queue shut down
- Status BlockingPut(const T& val, MonoTime deadline = MonoTime()) {
- if (deadline.Initialized() && MonoTime::Now() > deadline) {
+ //
+ // The templatized approach is for perfect forwarding while providing both
+ // BlockingPut(const T&) and BlockingPut(T&&) signatures for the method. See
+ // https://en.cppreference.com/w/cpp/utility/forward for details.
+ template<typename U>
+ Status BlockingPut(U&& val, MonoTime deadline = {}) {
+ if (PREDICT_FALSE(deadline.Initialized() && MonoTime::Now() > deadline)) {
return Status::TimedOut("");
}
MutexLock l(lock_);
while (true) {
- if (shutdown_) {
+ if (PREDICT_FALSE(shutdown_)) {
return Status::Aborted("");
}
if (size_ < max_size_) {
- list_.push_back(val);
increment_size_unlocked(val);
+ queue_.emplace_back(std::forward<U>(val));
l.Unlock();
not_empty_.Signal();
return Status::OK();
@@ -212,15 +207,6 @@ class BlockingQueue {
}
}
- // Same as other BlockingPut() overload above.
- //
- // If the element was enqueued, the contents of 'val' are released.
- Status BlockingPut(std::unique_ptr<T_VAL>* val, MonoTime deadline = MonoTime()) {
- RETURN_NOT_OK(BlockingPut(val->get(), deadline));
- ignore_result(val->release());
- return Status::OK();
- }
-
// Shuts down the queue.
//
// When a blocking queue is shut down, no more elements can be added to it,
@@ -237,7 +223,7 @@ class BlockingQueue {
bool empty() const {
MutexLock l(lock_);
- return list_.empty();
+ return queue_.empty();
}
size_t max_size() const {
@@ -253,7 +239,7 @@ class BlockingQueue {
std::string ret;
MutexLock l(lock_);
- for (const T& t : list_) {
+ for (const T& t : queue_) {
ret.append(t->ToString());
ret.append("\n");
}
@@ -272,13 +258,13 @@ class BlockingQueue {
size_ -= LOGICAL_SIZE::logical_size(t);
}
- bool shutdown_;
- size_t size_;
const size_t max_size_;
+ size_t size_;
+ bool shutdown_;
mutable Mutex lock_;
ConditionVariable not_empty_;
ConditionVariable not_full_;
- std::list<T> list_;
+ std::deque<T> queue_;
};
} // namespace kudu