You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2018/01/24 22:06:55 UTC
kudu git commit: consensus: avoid extra calls to protobuf SpaceUsed in the log cache
Repository: kudu
Updated Branches:
refs/heads/master 60276c54a -> f755f8aa5
consensus: avoid extra calls to protobuf SpaceUsed in the log cache
The SpaceUsed function appears to be somewhat expensive since it walks
the entire protobuf structure. In complex protobufs (like those with
larger schemas) this can cause a lot of cache misses, etc. In fact,
SpaceUsed shows up as one of the top 5 CPU consumers in a YCSB workload
I'm looking at.
This patch adds the cached value of SpaceUsed to the log cache so that
it only needs to be computed upon insertion and not during removal.
Additionally it moves some more work outside the lock to reduce
contention.
Before: Performance counter stats for 'build/latest/bin/full_stack-insert-scan-test --gtest_filter=*MRSOnlyStressTest* --inserts_per_client=200000 --concurrent_inserts=10 --rows_per_batch=1 --skip_scans':
321562.582290 task-clock (msec) # 4.896 CPUs utilized
11,833,623 context-switches # 0.037 M/sec
3,675,101 cpu-migrations # 0.011 M/sec
118,072 page-faults # 0.367 K/sec
1,035,621,247,373 cycles # 3.221 GHz
659,776,225,172 instructions # 0.64 insn per cycle
124,415,953,758 branches # 386.911 M/sec
1,520,148,589 branch-misses # 1.22% of all branches
65.679925745 seconds time elapsed
After: Performance counter stats for 'build/latest/bin/full_stack-insert-scan-test --gtest_filter=*MRSOnlyStressTest* --inserts_per_client=200000 --concurrent_inserts=10 --rows_per_batch=1 --skip_scans':
305878.625093 task-clock (msec) # 5.108 CPUs utilized
12,860,037 context-switches # 0.042 M/sec
3,877,232 cpu-migrations # 0.013 M/sec
114,011 page-faults # 0.373 K/sec
981,876,239,200 cycles # 3.210 GHz
599,697,732,411 instructions # 0.61 insn per cycle
112,309,222,234 branches # 367.169 M/sec
1,427,381,915 branch-misses # 1.27% of all branches
59.886073541 seconds time elapsed
Change-Id: I0439995fb1369a3333b7d5858518d01277b53076
Reviewed-on: http://gerrit.cloudera.org:8080/9093
Reviewed-by: Dan Burkert <da...@cloudera.com>
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <da...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/f755f8aa
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/f755f8aa
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/f755f8aa
Branch: refs/heads/master
Commit: f755f8aa56f4154088bc1644a409df1f3299689d
Parents: 60276c5
Author: Todd Lipcon <to...@apache.org>
Authored: Mon Jan 22 08:40:26 2018 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Jan 24 22:06:42 2018 +0000
----------------------------------------------------------------------
src/kudu/consensus/log_cache.cc | 65 +++++++++++++++++++-----------------
src/kudu/consensus/log_cache.h | 12 +++++--
src/kudu/gutil/map-util.h | 18 ++++++++++
src/kudu/util/map-util-test.cc | 10 ++++++
4 files changed, 73 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/f755f8aa/src/kudu/consensus/log_cache.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_cache.cc b/src/kudu/consensus/log_cache.cc
index 275b9df..4002133 100644
--- a/src/kudu/consensus/log_cache.cc
+++ b/src/kudu/consensus/log_cache.cc
@@ -109,7 +109,7 @@ LogCache::LogCache(const scoped_refptr<MetricEntity>& metric_entity,
// code paths elsewhere.
auto zero_op = new ReplicateMsg();
*zero_op->mutable_id() = MinimumOpId();
- InsertOrDie(&cache_, 0, make_scoped_refptr_replicate(zero_op));
+ InsertOrDie(&cache_, 0, { make_scoped_refptr_replicate(zero_op), zero_op->SpaceUsed() });
}
LogCache::~LogCache() {
@@ -138,9 +138,10 @@ void LogCache::TruncateOpsAfterUnlocked(int64_t index) {
// Now remove the overwritten operations.
for (int64_t i = first_to_truncate; i < next_sequential_op_index_; ++i) {
- ReplicateRefPtr msg = EraseKeyReturnValuePtr(&cache_, i);
- if (msg != nullptr) {
- AccountForMessageRemovalUnlocked(msg);
+ auto it = cache_.find(i);
+ if (it != cache_.end()) {
+ AccountForMessageRemovalUnlocked(it->second);
+ cache_.erase(it);
}
}
next_sequential_op_index_ = index + 1;
@@ -148,26 +149,29 @@ void LogCache::TruncateOpsAfterUnlocked(int64_t index) {
Status LogCache::AppendOperations(const vector<ReplicateRefPtr>& msgs,
const StatusCallback& callback) {
- std::unique_lock<simple_spinlock> l(lock_);
+ CHECK_GT(msgs.size(), 0);
- int size = msgs.size();
- CHECK_GT(size, 0);
+ // SpaceUsed is relatively expensive, so do calculations outside the lock
+ // and cache the result with each message.
+ int64_t mem_required = 0;
+ vector<CacheEntry> entries_to_insert;
+ entries_to_insert.reserve(msgs.size());
+ for (const auto& msg : msgs) {
+ CacheEntry e = { msg, static_cast<int64_t>(msg->get()->SpaceUsedLong()) };
+ mem_required += e.mem_usage;
+ entries_to_insert.emplace_back(std::move(e));
+ }
- // If we're not appending a consecutive op we're likely overwriting and
- // need to replace operations in the cache.
int64_t first_idx_in_batch = msgs.front()->get()->id().index();
int64_t last_idx_in_batch = msgs.back()->get()->id().index();
+ std::unique_lock<simple_spinlock> l(lock_);
+ // If we're not appending a consecutive op we're likely overwriting and
+ // need to replace operations in the cache.
if (first_idx_in_batch != next_sequential_op_index_) {
TruncateOpsAfterUnlocked(first_idx_in_batch - 1);
}
-
- int64_t mem_required = 0;
- for (const auto& msg : msgs) {
- mem_required += msg->get()->SpaceUsed();
- }
-
// Try to consume the memory. If it can't be consumed, we may need to evict.
bool borrowed_memory = false;
if (!tracker_->TryConsume(mem_required)) {
@@ -192,9 +196,9 @@ Status LogCache::AppendOperations(const vector<ReplicateRefPtr>& msgs,
borrowed_memory = parent_tracker_->LimitExceeded();
}
- for (const auto& msg : msgs) {
- auto index = msg->get()->id().index();
- InsertOrDie(&cache_, index, msg);
+ for (auto& e : entries_to_insert) {
+ auto index = e.msg->get()->id().index();
+ EmplaceOrDie(&cache_, index, std::move(e));
next_sequential_op_index_ = index + 1;
}
@@ -267,7 +271,7 @@ Status LogCache::LookupOpId(int64_t op_index, OpId* op_id) const {
}
auto iter = cache_.find(op_index);
if (iter != cache_.end()) {
- *op_id = iter->second->get()->id();
+ *op_id = iter->second.msg->get()->id();
return Status::OK();
}
}
@@ -343,7 +347,7 @@ Status LogCache::ReadOps(int64_t after_op_index,
} else {
// Pull contiguous messages from the cache until the size limit is achieved.
for (; iter != cache_.end(); ++iter) {
- const ReplicateRefPtr& msg = iter->second;
+ const ReplicateRefPtr& msg = iter->second.msg;
int64_t index = msg->get()->id().index();
if (index != next_index) {
continue;
@@ -378,7 +382,8 @@ void LogCache::EvictSomeUnlocked(int64_t stop_after_index, int64_t bytes_to_evic
int64_t bytes_evicted = 0;
for (auto iter = cache_.begin(); iter != cache_.end();) {
- const ReplicateRefPtr& msg = (*iter).second;
+ const CacheEntry& entry = (*iter).second;
+ const ReplicateRefPtr& msg = entry.msg;
VLOG_WITH_PREFIX_UNLOCKED(2) << "considering for eviction: " << msg->get()->id();
int64_t msg_index = msg->get()->id().index();
if (msg_index == 0) {
@@ -399,8 +404,8 @@ void LogCache::EvictSomeUnlocked(int64_t stop_after_index, int64_t bytes_to_evic
}
VLOG_WITH_PREFIX_UNLOCKED(2) << "Evicting cache. Removing: " << msg->get()->id();
- AccountForMessageRemovalUnlocked(msg);
- bytes_evicted += msg->get()->SpaceUsed();
+ AccountForMessageRemovalUnlocked(entry);
+ bytes_evicted += entry.mem_usage;
cache_.erase(iter++);
if (bytes_evicted >= bytes_to_evict) {
@@ -410,9 +415,9 @@ void LogCache::EvictSomeUnlocked(int64_t stop_after_index, int64_t bytes_to_evic
VLOG_WITH_PREFIX_UNLOCKED(1) << "Evicting log cache: after state: " << ToStringUnlocked();
}
-void LogCache::AccountForMessageRemovalUnlocked(const ReplicateRefPtr& msg) {
- tracker_->Release(msg->get()->SpaceUsed());
- metrics_.log_cache_size->DecrementBy(msg->get()->SpaceUsed());
+void LogCache::AccountForMessageRemovalUnlocked(const LogCache::CacheEntry& entry) {
+ tracker_->Release(entry.mem_usage);
+ metrics_.log_cache_size->DecrementBy(entry.mem_usage);
metrics_.log_cache_num_ops->Decrement();
}
@@ -461,8 +466,8 @@ void LogCache::DumpToStrings(vector<string>* lines) const {
int counter = 0;
lines->push_back(ToStringUnlocked());
lines->push_back("Messages:");
- for (const MessageCache::value_type& entry : cache_) {
- const ReplicateMsg* msg = entry.second->get();
+ for (const auto& entry : cache_) {
+ const ReplicateMsg* msg = entry.second.msg->get();
lines->push_back(
Substitute("Message[$0] $1.$2 : REPLICATE. Type: $3, Size: $4",
counter++, msg->id().term(), msg->id().index(),
@@ -480,8 +485,8 @@ void LogCache::DumpToHtml(std::ostream& out) const {
out << "<tr><th>Entry</th><th>OpId</th><th>Type</th><th>Size</th><th>Status</th></tr>" << endl;
int counter = 0;
- for (const MessageCache::value_type& entry : cache_) {
- const ReplicateMsg* msg = entry.second->get();
+ for (const auto& entry : cache_) {
+ const ReplicateMsg* msg = entry.second.msg->get();
out << Substitute("<tr><th>$0</th><th>$1.$2</th><td>REPLICATE $3</td>"
"<td>$4</td><td>$5</td></tr>",
counter++, msg->id().term(), msg->id().index(),
http://git-wip-us.apache.org/repos/asf/kudu/blob/f755f8aa/src/kudu/consensus/log_cache.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_cache.h b/src/kudu/consensus/log_cache.h
index 5ff8399..4daf518 100644
--- a/src/kudu/consensus/log_cache.h
+++ b/src/kudu/consensus/log_cache.h
@@ -149,6 +149,14 @@ class LogCache {
FRIEND_TEST(LogCacheTest, TestTruncation);
friend class LogCacheTest;
+ // An entry in the cache.
+ struct CacheEntry {
+ ReplicateRefPtr msg;
+ // The cached value of msg->SpaceUsedLong(). This method is expensive
+ // to compute, so we compute it only once upon insertion.
+ int64_t mem_usage;
+ };
+
// Try to evict the oldest operations from the queue, stopping either when
// 'bytes_to_evict' bytes have been evicted, or the op with index
// 'stop_after_index' has been evicted, whichever comes first.
@@ -156,7 +164,7 @@ class LogCache {
// Update metrics and MemTracker to account for the removal of the
// given message.
- void AccountForMessageRemovalUnlocked(const ReplicateRefPtr& msg);
+ void AccountForMessageRemovalUnlocked(const CacheEntry& entry);
void TruncateOpsAfterUnlocked(int64_t index);
@@ -184,7 +192,7 @@ class LogCache {
// An ordered map that serves as the buffer for the cached messages.
// Maps from log index -> ReplicateMsg
- typedef std::map<uint64_t, ReplicateRefPtr> MessageCache;
+ typedef std::map<uint64_t, CacheEntry> MessageCache;
MessageCache cache_;
// The next log index to append. Each append operation must either
http://git-wip-us.apache.org/repos/asf/kudu/blob/f755f8aa/src/kudu/gutil/map-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/gutil/map-util.h b/src/kudu/gutil/map-util.h
index 9883c1d..dd4df19 100644
--- a/src/kudu/gutil/map-util.h
+++ b/src/kudu/gutil/map-util.h
@@ -63,6 +63,8 @@
#define UTIL_GTL_MAP_UTIL_H_
#include <stddef.h>
+
+#include <tuple>
#include <utility>
#include <vector>
@@ -437,6 +439,22 @@ typename Collection::mapped_type& InsertKeyOrDie(
}
//
+// Emplace*()
+//
+template <class Collection, class... Args>
+bool EmplaceIfNotPresent(Collection* const collection,
+ Args&&... args) {
+ return collection->emplace(std::forward<Args>(args)...).second;
+}
+
+template <class Collection, class... Args>
+void EmplaceOrDie(Collection* const collection,
+ Args&&... args) {
+ CHECK(EmplaceIfNotPresent(collection, std::forward<Args>(args)...))
+ << "duplicate value";
+}
+
+//
// Lookup*()
//
http://git-wip-us.apache.org/repos/asf/kudu/blob/f755f8aa/src/kudu/util/map-util-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/map-util-test.cc b/src/kudu/util/map-util-test.cc
index afbee94..3aa9448 100644
--- a/src/kudu/util/map-util-test.cc
+++ b/src/kudu/util/map-util-test.cc
@@ -103,4 +103,14 @@ TEST(EraseKeyReturnValuePtrTest, TestRawAndSmartSmartPointers) {
ASSERT_EQ(*value, "hello_world");
}
+TEST(EmplaceTest, TestEmplace) {
+ // Map with move-only value type.
+ map<string, unique_ptr<string>> my_map;
+ unique_ptr<string> val(new string("foo"));
+ ASSERT_TRUE(EmplaceIfNotPresent(&my_map, "k", std::move(val)));
+ ASSERT_TRUE(ContainsKey(my_map, "k"));
+ ASSERT_FALSE(EmplaceIfNotPresent(&my_map, "k", nullptr))
+ << "Should return false for already-present";
+}
+
} // namespace kudu