You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2018/01/24 22:06:55 UTC

kudu git commit: consensus: avoid extra calls to protobuf SpaceUsed in the log cache

Repository: kudu
Updated Branches:
  refs/heads/master 60276c54a -> f755f8aa5


consensus: avoid extra calls to protobuf SpaceUsed in the log cache

The SpaceUsed function appears to be somewhat expensive since it walks
the entire protobuf structure. In complex protobufs (like those with
larger schemas) this can cause a lot of cache misses, etc. In fact,
SpaceUsed shows up as one of the top 5 CPU consumers in a YCSB workload
I'm looking at.

This patch adds the cached value of SpaceUsed to the log cache so that
it only needs to be computed upon insertion and not during removal.
Additionally it moves some more work outside the lock to reduce
contention.

 Before this patch:

 Performance counter stats for 'build/latest/bin/full_stack-insert-scan-test --gtest_filter=*MRSOnlyStressTest* --inserts_per_client=200000 --concurrent_inserts=10 --rows_per_batch=1 --skip_scans':

     321562.582290      task-clock (msec)         #    4.896 CPUs utilized
        11,833,623      context-switches          #    0.037 M/sec
         3,675,101      cpu-migrations            #    0.011 M/sec
           118,072      page-faults               #    0.367 K/sec
 1,035,621,247,373      cycles                    #    3.221 GHz
   659,776,225,172      instructions              #    0.64  insn per cycle
   124,415,953,758      branches                  #  386.911 M/sec
     1,520,148,589      branch-misses             #    1.22% of all branches

      65.679925745 seconds time elapsed

 After this patch:

 Performance counter stats for 'build/latest/bin/full_stack-insert-scan-test --gtest_filter=*MRSOnlyStressTest* --inserts_per_client=200000 --concurrent_inserts=10 --rows_per_batch=1 --skip_scans':

     305878.625093      task-clock (msec)         #    5.108 CPUs utilized
        12,860,037      context-switches          #    0.042 M/sec
         3,877,232      cpu-migrations            #    0.013 M/sec
           114,011      page-faults               #    0.373 K/sec
   981,876,239,200      cycles                    #    3.210 GHz
   599,697,732,411      instructions              #    0.61  insn per cycle
   112,309,222,234      branches                  #  367.169 M/sec
     1,427,381,915      branch-misses             #    1.27% of all branches

      59.886073541 seconds time elapsed

Change-Id: I0439995fb1369a3333b7d5858518d01277b53076
Reviewed-on: http://gerrit.cloudera.org:8080/9093
Reviewed-by: Dan Burkert <da...@cloudera.com>
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <da...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/f755f8aa
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/f755f8aa
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/f755f8aa

Branch: refs/heads/master
Commit: f755f8aa56f4154088bc1644a409df1f3299689d
Parents: 60276c5
Author: Todd Lipcon <to...@apache.org>
Authored: Mon Jan 22 08:40:26 2018 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Jan 24 22:06:42 2018 +0000

----------------------------------------------------------------------
 src/kudu/consensus/log_cache.cc | 65 +++++++++++++++++++-----------------
 src/kudu/consensus/log_cache.h  | 12 +++++--
 src/kudu/gutil/map-util.h       | 18 ++++++++++
 src/kudu/util/map-util-test.cc  | 10 ++++++
 4 files changed, 73 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/f755f8aa/src/kudu/consensus/log_cache.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_cache.cc b/src/kudu/consensus/log_cache.cc
index 275b9df..4002133 100644
--- a/src/kudu/consensus/log_cache.cc
+++ b/src/kudu/consensus/log_cache.cc
@@ -109,7 +109,7 @@ LogCache::LogCache(const scoped_refptr<MetricEntity>& metric_entity,
   // code paths elsewhere.
   auto zero_op = new ReplicateMsg();
   *zero_op->mutable_id() = MinimumOpId();
-  InsertOrDie(&cache_, 0, make_scoped_refptr_replicate(zero_op));
+  InsertOrDie(&cache_, 0, { make_scoped_refptr_replicate(zero_op), zero_op->SpaceUsed() });
 }
 
 LogCache::~LogCache() {
@@ -138,9 +138,10 @@ void LogCache::TruncateOpsAfterUnlocked(int64_t index) {
 
   // Now remove the overwritten operations.
   for (int64_t i = first_to_truncate; i < next_sequential_op_index_; ++i) {
-    ReplicateRefPtr msg = EraseKeyReturnValuePtr(&cache_, i);
-    if (msg != nullptr) {
-      AccountForMessageRemovalUnlocked(msg);
+    auto it = cache_.find(i);
+    if (it != cache_.end()) {
+      AccountForMessageRemovalUnlocked(it->second);
+      cache_.erase(it);
     }
   }
   next_sequential_op_index_ = index + 1;
@@ -148,26 +149,29 @@ void LogCache::TruncateOpsAfterUnlocked(int64_t index) {
 
 Status LogCache::AppendOperations(const vector<ReplicateRefPtr>& msgs,
                                   const StatusCallback& callback) {
-  std::unique_lock<simple_spinlock> l(lock_);
+  CHECK_GT(msgs.size(), 0);
 
-  int size = msgs.size();
-  CHECK_GT(size, 0);
+  // SpaceUsed is relatively expensive, so do calculations outside the lock
+  // and cache the result with each message.
+  int64_t mem_required = 0;
+  vector<CacheEntry> entries_to_insert;
+  entries_to_insert.reserve(msgs.size());
+  for (const auto& msg : msgs) {
+    CacheEntry e = { msg, static_cast<int64_t>(msg->get()->SpaceUsedLong()) };
+    mem_required += e.mem_usage;
+    entries_to_insert.emplace_back(std::move(e));
+  }
 
-  // If we're not appending a consecutive op we're likely overwriting and
-  // need to replace operations in the cache.
   int64_t first_idx_in_batch = msgs.front()->get()->id().index();
   int64_t last_idx_in_batch = msgs.back()->get()->id().index();
 
+  std::unique_lock<simple_spinlock> l(lock_);
+  // If we're not appending a consecutive op we're likely overwriting and
+  // need to replace operations in the cache.
   if (first_idx_in_batch != next_sequential_op_index_) {
     TruncateOpsAfterUnlocked(first_idx_in_batch - 1);
   }
 
-
-  int64_t mem_required = 0;
-  for (const auto& msg : msgs) {
-    mem_required += msg->get()->SpaceUsed();
-  }
-
   // Try to consume the memory. If it can't be consumed, we may need to evict.
   bool borrowed_memory = false;
   if (!tracker_->TryConsume(mem_required)) {
@@ -192,9 +196,9 @@ Status LogCache::AppendOperations(const vector<ReplicateRefPtr>& msgs,
     borrowed_memory = parent_tracker_->LimitExceeded();
   }
 
-  for (const auto& msg : msgs) {
-    auto index = msg->get()->id().index();
-    InsertOrDie(&cache_, index, msg);
+  for (auto& e : entries_to_insert) {
+    auto index = e.msg->get()->id().index();
+    EmplaceOrDie(&cache_, index, std::move(e));
     next_sequential_op_index_ = index + 1;
   }
 
@@ -267,7 +271,7 @@ Status LogCache::LookupOpId(int64_t op_index, OpId* op_id) const {
     }
     auto iter = cache_.find(op_index);
     if (iter != cache_.end()) {
-      *op_id = iter->second->get()->id();
+      *op_id = iter->second.msg->get()->id();
       return Status::OK();
     }
   }
@@ -343,7 +347,7 @@ Status LogCache::ReadOps(int64_t after_op_index,
     } else {
       // Pull contiguous messages from the cache until the size limit is achieved.
       for (; iter != cache_.end(); ++iter) {
-        const ReplicateRefPtr& msg = iter->second;
+        const ReplicateRefPtr& msg = iter->second.msg;
         int64_t index = msg->get()->id().index();
         if (index != next_index) {
           continue;
@@ -378,7 +382,8 @@ void LogCache::EvictSomeUnlocked(int64_t stop_after_index, int64_t bytes_to_evic
 
   int64_t bytes_evicted = 0;
   for (auto iter = cache_.begin(); iter != cache_.end();) {
-    const ReplicateRefPtr& msg = (*iter).second;
+    const CacheEntry& entry = (*iter).second;
+    const ReplicateRefPtr& msg = entry.msg;
     VLOG_WITH_PREFIX_UNLOCKED(2) << "considering for eviction: " << msg->get()->id();
     int64_t msg_index = msg->get()->id().index();
     if (msg_index == 0) {
@@ -399,8 +404,8 @@ void LogCache::EvictSomeUnlocked(int64_t stop_after_index, int64_t bytes_to_evic
     }
 
     VLOG_WITH_PREFIX_UNLOCKED(2) << "Evicting cache. Removing: " << msg->get()->id();
-    AccountForMessageRemovalUnlocked(msg);
-    bytes_evicted += msg->get()->SpaceUsed();
+    AccountForMessageRemovalUnlocked(entry);
+    bytes_evicted += entry.mem_usage;
     cache_.erase(iter++);
 
     if (bytes_evicted >= bytes_to_evict) {
@@ -410,9 +415,9 @@ void LogCache::EvictSomeUnlocked(int64_t stop_after_index, int64_t bytes_to_evic
   VLOG_WITH_PREFIX_UNLOCKED(1) << "Evicting log cache: after state: " << ToStringUnlocked();
 }
 
-void LogCache::AccountForMessageRemovalUnlocked(const ReplicateRefPtr& msg) {
-  tracker_->Release(msg->get()->SpaceUsed());
-  metrics_.log_cache_size->DecrementBy(msg->get()->SpaceUsed());
+void LogCache::AccountForMessageRemovalUnlocked(const LogCache::CacheEntry& entry) {
+  tracker_->Release(entry.mem_usage);
+  metrics_.log_cache_size->DecrementBy(entry.mem_usage);
   metrics_.log_cache_num_ops->Decrement();
 }
 
@@ -461,8 +466,8 @@ void LogCache::DumpToStrings(vector<string>* lines) const {
   int counter = 0;
   lines->push_back(ToStringUnlocked());
   lines->push_back("Messages:");
-  for (const MessageCache::value_type& entry : cache_) {
-    const ReplicateMsg* msg = entry.second->get();
+  for (const auto& entry : cache_) {
+    const ReplicateMsg* msg = entry.second.msg->get();
     lines->push_back(
       Substitute("Message[$0] $1.$2 : REPLICATE. Type: $3, Size: $4",
                  counter++, msg->id().term(), msg->id().index(),
@@ -480,8 +485,8 @@ void LogCache::DumpToHtml(std::ostream& out) const {
   out << "<tr><th>Entry</th><th>OpId</th><th>Type</th><th>Size</th><th>Status</th></tr>" << endl;
 
   int counter = 0;
-  for (const MessageCache::value_type& entry : cache_) {
-    const ReplicateMsg* msg = entry.second->get();
+  for (const auto& entry : cache_) {
+    const ReplicateMsg* msg = entry.second.msg->get();
     out << Substitute("<tr><th>$0</th><th>$1.$2</th><td>REPLICATE $3</td>"
                       "<td>$4</td><td>$5</td></tr>",
                       counter++, msg->id().term(), msg->id().index(),

http://git-wip-us.apache.org/repos/asf/kudu/blob/f755f8aa/src/kudu/consensus/log_cache.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_cache.h b/src/kudu/consensus/log_cache.h
index 5ff8399..4daf518 100644
--- a/src/kudu/consensus/log_cache.h
+++ b/src/kudu/consensus/log_cache.h
@@ -149,6 +149,14 @@ class LogCache {
   FRIEND_TEST(LogCacheTest, TestTruncation);
   friend class LogCacheTest;
 
+  // An entry in the cache.
+  struct CacheEntry {
+    ReplicateRefPtr msg;
+    // The cached value of msg->SpaceUsedLong(). This method is expensive
+    // to compute, so we compute it only once upon insertion.
+    int64_t mem_usage;
+  };
+
   // Try to evict the oldest operations from the queue, stopping either when
   // 'bytes_to_evict' bytes have been evicted, or the op with index
   // 'stop_after_index' has been evicted, whichever comes first.
@@ -156,7 +164,7 @@ class LogCache {
 
   // Update metrics and MemTracker to account for the removal of the
   // given message.
-  void AccountForMessageRemovalUnlocked(const ReplicateRefPtr& msg);
+  void AccountForMessageRemovalUnlocked(const CacheEntry& entry);
 
   void TruncateOpsAfterUnlocked(int64_t index);
 
@@ -184,7 +192,7 @@ class LogCache {
 
   // An ordered map that serves as the buffer for the cached messages.
   // Maps from log index -> ReplicateMsg
-  typedef std::map<uint64_t, ReplicateRefPtr> MessageCache;
+  typedef std::map<uint64_t, CacheEntry> MessageCache;
   MessageCache cache_;
 
   // The next log index to append. Each append operation must either

http://git-wip-us.apache.org/repos/asf/kudu/blob/f755f8aa/src/kudu/gutil/map-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/gutil/map-util.h b/src/kudu/gutil/map-util.h
index 9883c1d..dd4df19 100644
--- a/src/kudu/gutil/map-util.h
+++ b/src/kudu/gutil/map-util.h
@@ -63,6 +63,8 @@
 #define UTIL_GTL_MAP_UTIL_H_
 
 #include <stddef.h>
+
+#include <tuple>
 #include <utility>
 #include <vector>
 
@@ -437,6 +439,22 @@ typename Collection::mapped_type& InsertKeyOrDie(
 }
 
 //
+// Emplace*()
+//
+template <class Collection, class... Args>
+bool EmplaceIfNotPresent(Collection* const collection,
+                         Args&&... args) {
+  return collection->emplace(std::forward<Args>(args)...).second;
+}
+
+template <class Collection, class... Args>
+void EmplaceOrDie(Collection* const collection,
+                  Args&&... args) {
+  CHECK(EmplaceIfNotPresent(collection, std::forward<Args>(args)...))
+      << "duplicate value";
+}
+
+//
 // Lookup*()
 //
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/f755f8aa/src/kudu/util/map-util-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/map-util-test.cc b/src/kudu/util/map-util-test.cc
index afbee94..3aa9448 100644
--- a/src/kudu/util/map-util-test.cc
+++ b/src/kudu/util/map-util-test.cc
@@ -103,4 +103,14 @@ TEST(EraseKeyReturnValuePtrTest, TestRawAndSmartSmartPointers) {
   ASSERT_EQ(*value, "hello_world");
 }
 
+TEST(EmplaceTest, TestEmplace) {
+  // Map with move-only value type.
+  map<string, unique_ptr<string>> my_map;
+  unique_ptr<string> val(new string("foo"));
+  ASSERT_TRUE(EmplaceIfNotPresent(&my_map, "k", std::move(val)));
+  ASSERT_TRUE(ContainsKey(my_map, "k"));
+  ASSERT_FALSE(EmplaceIfNotPresent(&my_map, "k", nullptr))
+      << "Should return false for already-present";
+}
+
 } // namespace kudu