You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/06/12 07:31:00 UTC

[kudu] branch master updated: [util] improved performance of BlockingQueue

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 15af717  [util] improved performance of BlockingQueue
15af717 is described below

commit 15af717e8c200d4e7a33a25171a6bfe58d70aa65
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Tue Jun 9 14:47:15 2020 -0700

    [util] improved performance of BlockingQueue
    
    Updated the interface of the BlockingQueue, adding methods with move
    semantics.  Switched from std::list to std::deque for the underlying
    queue.  Other minor cleanup.
    
    This update provides a measurable performance boost for BlockingQueue
    as per the BlockingQueueTest.MultiThreadPerf scenario of the
    blocking_queue-test.  I ran the following 3 times on the same
    machine (RELEASE build):
    
      ./bin/blocking_queue-test --gtest_filter='BlockingQueueTest.MultiThreadPerf'  --num_blocking_writers=4 --num_blocking_readers=4 --num_non_blocking_writers=0
    
    Before this patch:
      BlockingGet() rate: 238495 calls/sec
      BlockingPut() rate: 238495 calls/sec
      total Blocking{Get,Put}() rate: 476990 calls/sec
    
      BlockingGet() rate: 223278 calls/sec
      BlockingPut() rate: 223278 calls/sec
      total Blocking{Get,Put}() rate: 446556 calls/sec
    
      BlockingGet() rate: 227804 calls/sec
      BlockingPut() rate: 227804 calls/sec
      total Blocking{Get,Put}() rate: 455608 calls/sec
    
    After this patch:
      BlockingGet() rate: 328264 calls/sec
      BlockingPut() rate: 328264 calls/sec
      total Blocking{Get,Put}() rate: 656528 calls/sec
    
      BlockingGet() rate: 312607.4 calls/sec
      BlockingPut() rate: 312607.4 calls/sec
      total Blocking{Get,Put}() rate: 625214.8 calls/sec
    
      BlockingGet() rate: 292966.4 calls/sec
      BlockingPut() rate: 292966.4 calls/sec
      total Blocking{Get,Put}() rate: 585932.8 calls/sec
    
    ========================================================================
    
    The case of non-symmetric writes/reads now looks more performant as well:
    
      ./bin/blocking_queue-test --gtest_filter='*BlockingQueueMultiThreadPerfTest*'  --num_blocking_writers=2 --num_blocking_readers=1 --num_non_blocking_writers=0
    
    Before this patch:
      BlockingGet() rate: 399823.8 calls/sec
      BlockingPut() rate: 399823.8 calls/sec
      total Blocking{Get,Put}() rate: 799647.6 calls/sec
    
    After this patch:
      BlockingGet() rate: 640630.6 calls/sec
      BlockingPut() rate: 640630.6 calls/sec
      total Blocking{Get,Put}() rate: 1281261.2 calls/sec
    
    ========================================================================
    
    In addition, I ran the more synthetic mt-log-test, and the results show
    a slight improvement as well (at least, the performance didn't degrade):
    
      ./bin/mt-log-test --num_writer_threads=1 --num_batches_per_thread=1000000 --num_reader_threads=0 --num_ops_per_batch_avg=1 --verify_log=false --gtest_filter='MultiThreadedLogTest.TestAppends'
    
    Before this patch:
      Time spent inserting 1000000 batches(1 threads, 1000000 per-thread): real 15.309s  user 0.000s     sys 0.000s
      Time spent inserting 1000000 batches(1 threads, 1000000 per-thread): real 15.899s  user 0.000s     sys 0.000s
      Time spent inserting 1000000 batches(1 threads, 1000000 per-thread): real 15.012s  user 0.000s     sys 0.001s
    
    After this patch:
      Time spent inserting 1000000 batches(1 threads, 1000000 per-thread): real 14.964s  user 0.001s     sys 0.000s
      Time spent inserting 1000000 batches(1 threads, 1000000 per-thread): real 14.513s  user 0.000s     sys 0.000s
      Time spent inserting 1000000 batches(1 threads, 1000000 per-thread): real 14.991s  user 0.000s     sys 0.000s
    
    Change-Id: Ie80620e5e86cd72c29320096dcdcc712eea1b0f2
    Reviewed-on: http://gerrit.cloudera.org:8080/16063
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/consensus/log.cc            |  33 ++++++-----
 src/kudu/consensus/log.h             |   7 ++-
 src/kudu/subprocess/server.cc        |   5 +-
 src/kudu/subprocess/server.h         |   2 +-
 src/kudu/util/blocking_queue-test.cc |   9 +--
 src/kudu/util/blocking_queue.h       | 110 +++++++++++++++--------------------
 6 files changed, 78 insertions(+), 88 deletions(-)

diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 1afda83..94166d0 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -23,6 +23,7 @@
 #include <memory>
 #include <mutex>
 #include <ostream>
+#include <type_traits>
 #include <utility>
 
 #include <boost/range/adaptor/reversed.hpp>
@@ -245,9 +246,8 @@ class Log::AppendThread {
   // a new task was enqueued just as we were trying to go idle.
   bool GoIdle();
 
-  // Handle the actual appending of a group of entries. Responsible for deleting the
-  // LogEntryBatch* pointers.
-  void HandleBatches(vector<LogEntryBatch*> entry_batches);
+  // Handle the actual appending of a group of entries.
+  void HandleBatches(vector<unique_ptr<LogEntryBatch>> entry_batches);
 
   string LogPrefix() const;
 
@@ -348,7 +348,7 @@ void Log::AppendThread::ProcessQueue() {
   while (true) {
     MonoTime deadline = MonoTime::Now() +
         MonoDelta::FromMilliseconds(FLAGS_log_thread_idle_threshold_ms);
-    vector<LogEntryBatch*> entry_batches;
+    vector<unique_ptr<LogEntryBatch>> entry_batches;
     Status s = log_->entry_queue()->BlockingDrainTo(&entry_batches, deadline);
     if (PREDICT_FALSE(s.IsAborted())) {
       break;
@@ -362,7 +362,7 @@ void Log::AppendThread::ProcessQueue() {
   VLOG_WITH_PREFIX(2) << "WAL Appender going idle";
 }
 
-void Log::AppendThread::HandleBatches(vector<LogEntryBatch*> entry_batches) {
+void Log::AppendThread::HandleBatches(vector<unique_ptr<LogEntryBatch>> entry_batches) {
   if (log_->ctx_.metrics) {
     log_->ctx_.metrics->entry_batches_per_group->Increment(entry_batches.size());
   }
@@ -371,9 +371,9 @@ void Log::AppendThread::HandleBatches(vector<LogEntryBatch*> entry_batches) {
   SCOPED_LATENCY_METRIC(log_->ctx_.metrics, group_commit_latency);
 
   bool is_all_commits = true;
-  for (auto* entry_batch : entry_batches) {
-    TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch);
-    Status s = log_->WriteBatch(entry_batch);
+  for (auto& entry_batch : entry_batches) {
+    TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch.get());
+    Status s = log_->WriteBatch(entry_batch.get());
     if (PREDICT_FALSE(!s.ok())) {
       LOG_WITH_PREFIX(ERROR) << "Error appending to the log: " << s.ToString();
       // TODO(af): If a single op fails to append, should we
@@ -393,7 +393,7 @@ void Log::AppendThread::HandleBatches(vector<LogEntryBatch*> entry_batches) {
   }
   if (PREDICT_FALSE(!s.ok())) {
     LOG_WITH_PREFIX(ERROR) << "Error syncing log: " << s.ToString();
-    for (auto* entry_batch : entry_batches) {
+    for (const auto& entry_batch : entry_batches) {
       entry_batch->SetAppendError(s);
     }
   } else {
@@ -401,13 +401,12 @@ void Log::AppendThread::HandleBatches(vector<LogEntryBatch*> entry_batches) {
   }
   TRACE_EVENT0("log", "Callbacks");
   SCOPED_WATCH_STACK(100);
-  for (auto* entry_batch : entry_batches) {
+  for (auto& entry_batch : entry_batches) {
     entry_batch->RunCallback();
-
     // It's important to delete each batch as we see it, because
     // deleting it may free up memory from memory trackers, and the
     // callback of a later batch may want to use that memory.
-    delete entry_batch;
+    entry_batch.reset();
   }
 }
 
@@ -840,13 +839,15 @@ unique_ptr<LogEntryBatch> Log::CreateBatchFromPB(
 
 Status Log::AsyncAppend(unique_ptr<LogEntryBatch> entry_batch) {
   TRACE_EVENT0("log", "Log::AsyncAppend");
-  TRACE_EVENT_FLOW_BEGIN0("log", "Batch", entry_batch.get());
-  if (PREDICT_FALSE(!entry_batch_queue_.BlockingPut(entry_batch.get()).ok())) {
-    TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch.get());
+  // entry_batch_trace_id is used as the identifier for the trace: only the
+  // address is stored, the pointer isn't de-referenced.
+  const LogEntryBatch* entry_batch_trace_id = entry_batch.get();
+  TRACE_EVENT_FLOW_BEGIN0("log", "Batch", entry_batch_trace_id);
+  if (PREDICT_FALSE(!entry_batch_queue_.BlockingPut(std::move(entry_batch)).ok())) {
+    TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch_trace_id);
     return kLogShutdownStatus;
   }
   append_thread_->Wake();
-  entry_batch.release();
   return Status::OK();
 }
 
diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h
index a0358f7..cf8f6a2 100644
--- a/src/kudu/consensus/log.h
+++ b/src/kudu/consensus/log.h
@@ -19,9 +19,9 @@
 #include <atomic>
 #include <cstddef>
 #include <cstdint>
+#include <deque>
 #include <functional>
 #include <limits>
-#include <list>
 #include <map>
 #include <memory>
 #include <string>
@@ -79,7 +79,8 @@ struct LogContext {
   std::string LogPrefix() const;
 };
 
-typedef BlockingQueue<LogEntryBatch*, LogEntryBatchLogicalSize> LogEntryBatchQueue;
+typedef BlockingQueue<std::unique_ptr<LogEntryBatch>, LogEntryBatchLogicalSize>
+    LogEntryBatchQueue;
 
 // State of segment allocation.
 enum SegmentAllocationState {
@@ -641,7 +642,7 @@ class LogEntryBatch {
 
 // Used by 'Log::queue_' to determine logical size of a LogEntryBatch.
 struct LogEntryBatchLogicalSize {
-  static size_t logical_size(const LogEntryBatch* batch) {
+  static size_t logical_size(const std::unique_ptr<LogEntryBatch>& batch) {
     return batch->total_size_bytes();
   }
 };
diff --git a/src/kudu/subprocess/server.cc b/src/kudu/subprocess/server.cc
index 76f3a3d..ca4111f 100644
--- a/src/kudu/subprocess/server.cc
+++ b/src/kudu/subprocess/server.cc
@@ -23,6 +23,7 @@
 #include <memory>
 #include <ostream>
 #include <string>
+#include <type_traits>
 #include <utility>
 #include <vector>
 
@@ -175,7 +176,7 @@ Status SubprocessServer::Execute(SubprocessRequestPB* req,
   CallAndTimer call_and_timer = {
       make_shared<SubprocessCall>(req, resp, &cb, MonoTime::Now() + call_timeout_), {} };
   RETURN_NOT_OK_PREPEND(
-      outbound_call_queue_.BlockingPut(call_and_timer, call_and_timer.first->deadline()),
+      outbound_call_queue_.BlockingPut(std::move(call_and_timer), call_and_timer.first->deadline()),
       "couldn't enqueue call");
   return sync.Wait();
 }
@@ -255,7 +256,7 @@ void SubprocessServer::ReceiveMessagesThread() {
     // Before adding to the queue, record the size of the response queue.
     metrics_.server_inbound_queue_size_bytes->Increment(inbound_response_queue_.size());
     ResponsePBAndTimer resp_and_timer = { std::move(response), {} };
-    if (s.ok() && !inbound_response_queue_.BlockingPut(resp_and_timer).ok()) {
+    if (s.ok() && !inbound_response_queue_.BlockingPut(std::move(resp_and_timer)).ok()) {
       // The queue has been shut down and we should shut down too.
       DCHECK_EQ(0, closing_.count());
       LOG(INFO) << "failed to put response onto inbound queue";
diff --git a/src/kudu/subprocess/server.h b/src/kudu/subprocess/server.h
index 5d46ef7..b5d39a9 100644
--- a/src/kudu/subprocess/server.h
+++ b/src/kudu/subprocess/server.h
@@ -20,8 +20,8 @@
 #include <atomic>
 #include <cstddef>
 #include <cstdint>
+#include <deque>
 #include <functional>
-#include <list>
 #include <map>
 #include <memory>
 #include <mutex>
diff --git a/src/kudu/util/blocking_queue-test.cc b/src/kudu/util/blocking_queue-test.cc
index 7fbbba7..11c4c6a 100644
--- a/src/kudu/util/blocking_queue-test.cc
+++ b/src/kudu/util/blocking_queue-test.cc
@@ -20,13 +20,14 @@
 #include <algorithm>
 #include <cstddef>
 #include <cstdint>
-#include <list>
+#include <deque>
 #include <map>
 #include <memory>
-#include <ostream>
 #include <numeric>
+#include <ostream>
 #include <string>
 #include <thread>
+#include <type_traits>
 #include <vector>
 
 #include <gflags/gflags.h>
@@ -264,9 +265,9 @@ TEST(BlockingQueueTest, TestGetFromShutdownQueue) {
 }
 
 TEST(BlockingQueueTest, TestUniquePtrMethods) {
-  BlockingQueue<int*> test_queue(2);
+  BlockingQueue<unique_ptr<int>> test_queue(2);
   unique_ptr<int> input_int(new int(123));
-  ASSERT_EQ(test_queue.Put(&input_int), QUEUE_SUCCESS);
+  ASSERT_EQ(QUEUE_SUCCESS, test_queue.Put(std::move(input_int)));
   unique_ptr<int> output_int;
   ASSERT_OK(test_queue.BlockingGet(&output_int));
   ASSERT_EQ(123, *output_int.get());
diff --git a/src/kudu/util/blocking_queue.h b/src/kudu/util/blocking_queue.h
index c9d59a2..8c4a853 100644
--- a/src/kudu/util/blocking_queue.h
+++ b/src/kudu/util/blocking_queue.h
@@ -16,11 +16,14 @@
 // under the License.
 #pragma once
 
-#include <list>
+#include <unistd.h>
+
+#include <algorithm>
+#include <deque>
 #include <memory>
 #include <string>
 #include <type_traits>
-#include <unistd.h>
+#include <utility>
 #include <vector>
 
 #include "kudu/gutil/basictypes.h"
@@ -55,17 +58,17 @@ class BlockingQueue {
   typedef typename std::remove_pointer<T>::type T_VAL;
 
   explicit BlockingQueue(size_t max_size)
-    : shutdown_(false),
-      size_(0),
-      max_size_(max_size),
-      not_empty_(&lock_),
-      not_full_(&lock_) {
+      : max_size_(max_size),
+        size_(0),
+        shutdown_(false),
+        not_empty_(&lock_),
+        not_full_(&lock_) {
   }
 
   // If the queue holds a bare pointer, it must be empty on destruction, since
   // it may have ownership of the pointer.
   ~BlockingQueue() {
-    DCHECK(list_.empty() || !std::is_pointer<T>::value)
+    DCHECK(queue_.empty() || !std::is_pointer<T>::value)
         << "BlockingQueue holds bare pointers at destruction time";
   }
 
@@ -79,17 +82,18 @@ class BlockingQueue {
   // - OK if successful
   // - TimedOut if the deadline passed
   // - Aborted if the queue shut down
-  Status BlockingGet(T* out, MonoTime deadline = MonoTime()) {
+  Status BlockingGet(T* out, MonoTime deadline = {}) {
     MutexLock l(lock_);
     while (true) {
-      if (!list_.empty()) {
-        *out = list_.front();
-        list_.pop_front();
+      if (!queue_.empty()) {
+        *out = std::move(queue_.front());
+        queue_.pop_front();
         decrement_size_unlocked(*out);
+        l.Unlock();
         not_full_.Signal();
         return Status::OK();
       }
-      if (shutdown_) {
+      if (PREDICT_FALSE(shutdown_)) {
         return Status::Aborted("");
       }
       if (!deadline.Initialized()) {
@@ -100,15 +104,6 @@ class BlockingQueue {
     }
   }
 
-  // Get an element from the queue.  Returns false if the queue is empty and
-  // we were shut down prior to getting the element.
-  Status BlockingGet(std::unique_ptr<T_VAL>* out, MonoTime deadline = MonoTime()) {
-    T t = nullptr;
-    RETURN_NOT_OK(BlockingGet(&t, deadline));
-    out->reset(t);
-    return Status::OK();
-  }
-
   // Get all elements from the queue and append them to a vector.
   //
   // If 'deadline' passes and no elements have been returned from the
@@ -122,16 +117,17 @@ class BlockingQueue {
   // - OK if successful
   // - TimedOut if the deadline passed
   // - Aborted if the queue shut down
-  Status BlockingDrainTo(std::vector<T>* out, MonoTime deadline = MonoTime()) {
+  Status BlockingDrainTo(std::vector<T>* out, MonoTime deadline = {}) {
     MutexLock l(lock_);
     while (true) {
-      if (!list_.empty()) {
-        out->reserve(list_.size());
-        for (const T& elt : list_) {
-          out->push_back(elt);
+      if (!queue_.empty()) {
+        out->reserve(queue_.size());
+        for (const T& elt : queue_) {
           decrement_size_unlocked(elt);
         }
-        list_.clear();
+        std::move(queue_.begin(), queue_.end(), std::back_inserter(*out));
+        queue_.clear();
+        l.Unlock();
         not_full_.Signal();
         return Status::OK();
       }
@@ -151,32 +147,26 @@ class BlockingQueue {
   //   QUEUE_SUCCESS: if successfully enqueued
   //   QUEUE_FULL: if the queue has reached max_size
   //   QUEUE_SHUTDOWN: if someone has already called Shutdown()
-  QueueStatus Put(const T& val) {
+  //
+  // The templatized approach is for perfect forwarding while providing both
+  // Put(const T&) and Put(T&&) signatures for the method. See
+  // https://en.cppreference.com/w/cpp/utility/forward for details.
+  template<typename U>
+  QueueStatus Put(U&& val) {
     MutexLock l(lock_);
+    if (PREDICT_FALSE(shutdown_)) {
+      return QUEUE_SHUTDOWN;
+    }
     if (size_ >= max_size_) {
       return QUEUE_FULL;
     }
-    if (shutdown_) {
-      return QUEUE_SHUTDOWN;
-    }
-    list_.push_back(val);
     increment_size_unlocked(val);
+    queue_.emplace_back(std::forward<U>(val));
     l.Unlock();
     not_empty_.Signal();
     return QUEUE_SUCCESS;
   }
 
-  // Same as other Put() overload above.
-  //
-  // If the element was enqueued, the contents of 'val' are released.
-  QueueStatus Put(std::unique_ptr<T_VAL>* val) {
-    QueueStatus s = Put(val->get());
-    if (s == QUEUE_SUCCESS) {
-      ignore_result<>(val->release());
-    }
-    return s;
-  }
-
   // Puts an element onto the queue; if the queue is full, blocks until space
   // becomes available, or until the deadline passes.
   //
@@ -188,18 +178,23 @@ class BlockingQueue {
   // - OK if successful
   // - TimedOut if the deadline passed
   // - Aborted if the queue shut down
-  Status BlockingPut(const T& val, MonoTime deadline = MonoTime()) {
-    if (deadline.Initialized() && MonoTime::Now() > deadline) {
+  //
+  // The templatized approach is for perfect forwarding while providing both
+  // BlockingPut(const T&) and BlockingPut(T&&) signatures for the method. See
+  // https://en.cppreference.com/w/cpp/utility/forward for details.
+  template<typename U>
+  Status BlockingPut(U&& val, MonoTime deadline = {}) {
+    if (PREDICT_FALSE(deadline.Initialized() && MonoTime::Now() > deadline)) {
       return Status::TimedOut("");
     }
     MutexLock l(lock_);
     while (true) {
-      if (shutdown_) {
+      if (PREDICT_FALSE(shutdown_)) {
         return Status::Aborted("");
       }
       if (size_ < max_size_) {
-        list_.push_back(val);
         increment_size_unlocked(val);
+        queue_.emplace_back(std::forward<U>(val));
         l.Unlock();
         not_empty_.Signal();
         return Status::OK();
@@ -212,15 +207,6 @@ class BlockingQueue {
     }
   }
 
-  // Same as other BlockingPut() overload above.
-  //
-  // If the element was enqueued, the contents of 'val' are released.
-  Status BlockingPut(std::unique_ptr<T_VAL>* val, MonoTime deadline = MonoTime()) {
-    RETURN_NOT_OK(BlockingPut(val->get(), deadline));
-    ignore_result(val->release());
-    return Status::OK();
-  }
-
   // Shuts down the queue.
   //
   // When a blocking queue is shut down, no more elements can be added to it,
@@ -237,7 +223,7 @@ class BlockingQueue {
 
   bool empty() const {
     MutexLock l(lock_);
-    return list_.empty();
+    return queue_.empty();
   }
 
   size_t max_size() const {
@@ -253,7 +239,7 @@ class BlockingQueue {
     std::string ret;
 
     MutexLock l(lock_);
-    for (const T& t : list_) {
+    for (const T& t : queue_) {
       ret.append(t->ToString());
       ret.append("\n");
     }
@@ -272,13 +258,13 @@ class BlockingQueue {
     size_ -= LOGICAL_SIZE::logical_size(t);
   }
 
-  bool shutdown_;
-  size_t size_;
   const size_t max_size_;
+  size_t size_;
+  bool shutdown_;
   mutable Mutex lock_;
   ConditionVariable not_empty_;
   ConditionVariable not_full_;
-  std::list<T> list_;
+  std::deque<T> queue_;
 };
 
 } // namespace kudu