You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2021/12/10 19:40:28 UTC
[kudu] 02/02: [consensus] minor clean-up on LogCache
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit d0243afe2ed93aeb18e318068df1bc02de72ad1a
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Thu Dec 9 19:49:23 2021 -0800
[consensus] minor clean-up on LogCache
Since I was looking a bit into the code of the LogCache cache,
I went ahead and did a minor clean-up here, such as
* removing unused code
* fixing code style
* simplifying the going-over-max_size_bytes condition in ReadOps(),
making sure the regression test for KUDU-1586 passes
* fixing signed/unsigned comparison warning for a Raft op's index and
the index of the corresponding entry in the cache
* other unsorted minor updates
Change-Id: I48f60c44209e269eb6b00278c6e32d4398ef9a55
Reviewed-on: http://gerrit.cloudera.org:8080/18081
Reviewed-by: Andrew Wong <aw...@cloudera.com>
Tested-by: Alexey Serbin <as...@cloudera.com>
---
src/kudu/consensus/log_cache.cc | 84 ++++++++++++++++++-----------------------
src/kudu/consensus/log_cache.h | 20 +++++-----
2 files changed, 46 insertions(+), 58 deletions(-)
diff --git a/src/kudu/consensus/log_cache.cc b/src/kudu/consensus/log_cache.cc
index 67cf6f4..83b06f0 100644
--- a/src/kudu/consensus/log_cache.cc
+++ b/src/kudu/consensus/log_cache.cc
@@ -83,13 +83,12 @@ LogCache::LogCache(const scoped_refptr<MetricEntity>& metric_entity,
scoped_refptr<log::Log> log,
string local_uuid,
string tablet_id)
- : log_(std::move(log)),
- local_uuid_(std::move(local_uuid)),
- tablet_id_(std::move(tablet_id)),
- next_sequential_op_index_(0),
- min_pinned_op_index_(0),
- metrics_(metric_entity) {
-
+ : log_(std::move(log)),
+ local_uuid_(std::move(local_uuid)),
+ tablet_id_(std::move(tablet_id)),
+ next_sequential_op_index_(0),
+ min_pinned_op_index_(0),
+ metrics_(metric_entity) {
const int64_t max_ops_size_bytes = FLAGS_log_cache_size_limit_mb * 1024L * 1024L;
const int64_t global_max_ops_size_bytes = FLAGS_global_log_cache_size_limit_mb * 1024L * 1024L;
@@ -107,9 +106,12 @@ LogCache::LogCache(const scoped_refptr<MetricEntity>& metric_entity,
// Put a fake message at index 0, since this simplifies a lot of our
// code paths elsewhere.
- auto zero_op = new ReplicateMsg();
+ auto zero_op = new ReplicateMsg;
*zero_op->mutable_id() = MinimumOpId();
- InsertOrDie(&cache_, 0, { make_scoped_refptr_replicate(zero_op), zero_op->SpaceUsedLong() });
+ DCHECK_EQ(kZeroOpIdx, zero_op->id().index());
+ InsertOrDie(&cache_,
+ kZeroOpIdx,
+ { make_scoped_refptr_replicate(zero_op), zero_op->SpaceUsedLong() });
}
LogCache::~LogCache() {
@@ -119,8 +121,7 @@ LogCache::~LogCache() {
void LogCache::Init(const OpId& preceding_op) {
std::lock_guard<simple_spinlock> l(lock_);
- CHECK_EQ(cache_.size(), 1)
- << "Cache should have only our special '0' op";
+ CHECK_EQ(1, cache_.size()) << "cache should have only special '0' op";
next_sequential_op_index_ = preceding_op.index() + 1;
min_pinned_op_index_ = next_sequential_op_index_;
}
@@ -162,8 +163,8 @@ Status LogCache::AppendOperations(vector<ReplicateRefPtr> msgs,
entries_to_insert.emplace_back(std::move(e));
}
- int64_t first_idx_in_batch = msgs.front()->get()->id().index();
- int64_t last_idx_in_batch = msgs.back()->get()->id().index();
+ const int64_t first_idx_in_batch = msgs.front()->get()->id().index();
+ const int64_t last_idx_in_batch = msgs.back()->get()->id().index();
std::unique_lock<simple_spinlock> l(lock_);
// If we're not appending a consecutive op we're likely overwriting and
@@ -175,8 +176,8 @@ Status LogCache::AppendOperations(vector<ReplicateRefPtr> msgs,
// Try to consume the memory. If it can't be consumed, we may need to evict.
bool borrowed_memory = false;
if (!tracker_->TryConsume(mem_required)) {
- int spare = tracker_->SpareCapacity();
- int need_to_free = mem_required - spare;
+ auto spare = tracker_->SpareCapacity();
+ auto need_to_free = mem_required - spare;
VLOG_WITH_PREFIX_UNLOCKED(1) << "Memory limit would be exceeded trying to append "
<< HumanReadableNumBytes::ToString(mem_required)
<< " to log cache (available="
@@ -188,8 +189,8 @@ Status LogCache::AppendOperations(vector<ReplicateRefPtr> msgs,
EvictSomeUnlocked(min_pinned_op_index_, need_to_free);
// Force consuming, so that we don't refuse appending data. We might
- // blow past our limit a little bit (as much as the number of tablets times
- // the amount of in-flight data in the log), but until implementing the above TODO,
+ // blow past our limit as much as the number of tablets times the amount
+ // of in-flight data in the log, but until implementing the above TODO,
// it's difficult to solve this issue.
tracker_->Consume(mem_required);
@@ -283,9 +284,9 @@ namespace {
// length delimiting and tagging of the message.
int64_t TotalByteSizeForMessage(const ReplicateMsg& msg) {
int64_t msg_size = google::protobuf::internal::WireFormatLite::LengthDelimitedSize(
- msg.ByteSizeLong());
- msg_size += 1; // for the type tag
- return msg_size;
+ msg.ByteSizeLong());
+ // Add an extra byte for the type tag.
+ return msg_size + 1;
}
} // anonymous namespace
@@ -296,13 +297,12 @@ Status LogCache::ReadOps(int64_t after_op_index,
DCHECK_GE(after_op_index, 0);
RETURN_NOT_OK(LookupOpId(after_op_index, preceding_op));
- std::unique_lock<simple_spinlock> l(lock_);
- int64_t next_index = after_op_index + 1;
-
// Return as many operations as we can, up to the limit
int64_t remaining_space = max_size_bytes;
- while (remaining_space > 0 && next_index < next_sequential_op_index_) {
+ int64_t next_index = after_op_index + 1;
+ std::unique_lock<simple_spinlock> l(lock_);
+ while (remaining_space > 0 && next_index < next_sequential_op_index_) {
// If the messages the peer needs haven't been loaded into the queue yet,
// load them.
MessageCache::const_iterator iter = cache_.lower_bound(next_index);
@@ -332,10 +332,10 @@ Status LogCache::ReadOps(int64_t after_op_index,
for (ReplicateMsg* msg : raw_replicate_ptrs) {
CHECK_EQ(next_index, msg->id().index());
- remaining_space -= TotalByteSizeForMessage(*msg);
- if (remaining_space > 0 || messages->empty()) {
+ if (remaining_space > 0) {
messages->push_back(make_scoped_refptr_replicate(msg));
- next_index++;
+ remaining_space -= TotalByteSizeForMessage(*msg);
+ ++next_index;
} else {
delete msg;
}
@@ -383,7 +383,7 @@ void LogCache::EvictSomeUnlocked(int64_t stop_after_index, int64_t bytes_to_evic
const ReplicateRefPtr& msg = entry.msg;
VLOG_WITH_PREFIX_UNLOCKED(2) << "considering for eviction: " << msg->get()->id();
int64_t msg_index = msg->get()->id().index();
- if (msg_index == 0) {
+ if (msg_index == kZeroOpIdx) {
// Always keep our special '0' op.
++iter;
continue;
@@ -445,31 +445,21 @@ std::string LogCache::ToStringUnlocked() const {
}
std::string LogCache::LogPrefixUnlocked() const {
- return Substitute("T $0 P $1: ",
- tablet_id_,
- local_uuid_);
-}
-
-void LogCache::DumpToLog() const {
- vector<string> strings;
- DumpToStrings(&strings);
- for (const string& s : strings) {
- LOG_WITH_PREFIX_UNLOCKED(INFO) << s;
- }
+ return Substitute("T $0 P $1: ", tablet_id_, local_uuid_);
}
void LogCache::DumpToStrings(vector<string>* lines) const {
std::lock_guard<simple_spinlock> lock(lock_);
- int counter = 0;
+ lines->reserve(cache_.size() + 2);
lines->push_back(ToStringUnlocked());
lines->push_back("Messages:");
+ size_t counter = 0;
for (const auto& entry : cache_) {
- const ReplicateMsg* msg = entry.second.msg->get();
+ const auto* msg = entry.second.msg->get();
lines->push_back(
- Substitute("Message[$0] $1.$2 : REPLICATE. Type: $3, Size: $4",
- counter++, msg->id().term(), msg->id().index(),
- OperationType_Name(msg->op_type()),
- msg->ByteSizeLong()));
+ Substitute("Message[$0] $1.$2 : REPLICATE. Type: $3, Size: $4",
+ counter++, msg->id().term(), msg->id().index(),
+ OperationType_Name(msg->op_type()), msg->ByteSizeLong()));
}
}
@@ -496,8 +486,8 @@ void LogCache::DumpToHtml(std::ostream& out) const {
#define INSTANTIATE_METRIC(x) \
x.Instantiate(metric_entity, 0)
LogCache::Metrics::Metrics(const scoped_refptr<MetricEntity>& metric_entity)
- : log_cache_num_ops(INSTANTIATE_METRIC(METRIC_log_cache_num_ops)),
- log_cache_size(INSTANTIATE_METRIC(METRIC_log_cache_size)) {
+ : log_cache_num_ops(INSTANTIATE_METRIC(METRIC_log_cache_num_ops)),
+ log_cache_size(INSTANTIATE_METRIC(METRIC_log_cache_size)) {
}
#undef INSTANTIATE_METRIC
diff --git a/src/kudu/consensus/log_cache.h b/src/kudu/consensus/log_cache.h
index 9091308..812f50e 100644
--- a/src/kudu/consensus/log_cache.h
+++ b/src/kudu/consensus/log_cache.h
@@ -14,8 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-#ifndef KUDU_CONSENSUS_LOG_CACHE_H
-#define KUDU_CONSENSUS_LOG_CACHE_H
+#pragma once
#include <cstddef>
#include <cstdint>
@@ -80,7 +79,7 @@ class LogCache {
// The OpId which precedes the returned ops is returned in *preceding_op.
// The index of this OpId will match 'after_op_index'.
//
- // If the ops being requested are not available in the log, this will synchronously
+ // If the ops being requested are not available in the cache, this will synchronously
// read these ops from disk. Therefore, this function may take a substantial amount
// of time and should not be called with important locks held, etc.
Status ReadOps(int64_t after_op_index,
@@ -103,7 +102,7 @@ class LogCache {
// Following this, reads of truncated indexes using ReadOps(), LookupOpId(),
// HasOpBeenWritten(), etc, will return as if the operations were never appended.
//
- // NOTE: unless a new operation is appended followig 'index', this truncation does
+ // NOTE: unless a new operation is appended following 'index', this truncation does
// not persist across server restarts.
void TruncateOpsAfter(int64_t index);
@@ -122,9 +121,6 @@ class LogCache {
return metrics_.log_cache_num_ops->value();
}
- // Dump the current contents of the cache to the log.
- void DumpToLog() const;
-
// Dumps the contents of the cache to the provided string vector.
void DumpToStrings(std::vector<std::string>* lines) const;
@@ -150,6 +146,9 @@ class LogCache {
FRIEND_TEST(LogCacheTest, TestTruncation);
friend class LogCacheTest;
+ // Index of the special 'zero-op' entry in the cache.
+ static constexpr const int64_t kZeroOpIdx = 0;
+
// An entry in the cache.
struct CacheEntry {
ReplicateRefPtr msg;
@@ -193,7 +192,7 @@ class LogCache {
// An ordered map that serves as the buffer for the cached messages.
// Maps from log index -> ReplicateMsg
- typedef std::map<uint64_t, CacheEntry> MessageCache;
+ typedef std::map<int64_t, CacheEntry> MessageCache;
MessageCache cache_;
// The next log index to append. Each append operation must either
@@ -223,10 +222,10 @@ class LogCache {
explicit Metrics(const scoped_refptr<MetricEntity>& metric_entity);
// Keeps track of the total number of operations in the cache.
- scoped_refptr<AtomicGauge<int64_t> > log_cache_num_ops;
+ scoped_refptr<AtomicGauge<int64_t>> log_cache_num_ops;
// Keeps track of the memory consumed by the cache, in bytes.
- scoped_refptr<AtomicGauge<int64_t> > log_cache_size;
+ scoped_refptr<AtomicGauge<int64_t>> log_cache_size;
};
Metrics metrics_;
@@ -235,4 +234,3 @@ class LogCache {
} // namespace consensus
} // namespace kudu
-#endif /* KUDU_CONSENSUS_LOG_CACHE_H */