You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2021/12/10 19:40:28 UTC

[kudu] 02/02: [consensus] minor clean-up on LogCache

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit d0243afe2ed93aeb18e318068df1bc02de72ad1a
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Thu Dec 9 19:49:23 2021 -0800

    [consensus] minor clean-up on LogCache
    
    While looking into the LogCache code, I went ahead and did some
    minor clean-up, such as:
    
      * removing unused code
      * fixing code style
      * simplifying the going-over-max_size_bytes condition in ReadOps(),
        making sure the regression test for KUDU-1586 passes
      * fixing signed/unsigned comparison warning for a Raft op's index and
        the index of the corresponding entry in the cache
      * other miscellaneous minor updates
    
    Change-Id: I48f60c44209e269eb6b00278c6e32d4398ef9a55
    Reviewed-on: http://gerrit.cloudera.org:8080/18081
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/consensus/log_cache.cc | 84 ++++++++++++++++++-----------------------
 src/kudu/consensus/log_cache.h  | 20 +++++-----
 2 files changed, 46 insertions(+), 58 deletions(-)

diff --git a/src/kudu/consensus/log_cache.cc b/src/kudu/consensus/log_cache.cc
index 67cf6f4..83b06f0 100644
--- a/src/kudu/consensus/log_cache.cc
+++ b/src/kudu/consensus/log_cache.cc
@@ -83,13 +83,12 @@ LogCache::LogCache(const scoped_refptr<MetricEntity>& metric_entity,
                    scoped_refptr<log::Log> log,
                    string local_uuid,
                    string tablet_id)
-  : log_(std::move(log)),
-    local_uuid_(std::move(local_uuid)),
-    tablet_id_(std::move(tablet_id)),
-    next_sequential_op_index_(0),
-    min_pinned_op_index_(0),
-    metrics_(metric_entity) {
-
+    : log_(std::move(log)),
+      local_uuid_(std::move(local_uuid)),
+      tablet_id_(std::move(tablet_id)),
+      next_sequential_op_index_(0),
+      min_pinned_op_index_(0),
+      metrics_(metric_entity) {
 
   const int64_t max_ops_size_bytes = FLAGS_log_cache_size_limit_mb * 1024L * 1024L;
   const int64_t global_max_ops_size_bytes = FLAGS_global_log_cache_size_limit_mb * 1024L * 1024L;
@@ -107,9 +106,12 @@ LogCache::LogCache(const scoped_refptr<MetricEntity>& metric_entity,
 
   // Put a fake message at index 0, since this simplifies a lot of our
   // code paths elsewhere.
-  auto zero_op = new ReplicateMsg();
+  auto zero_op = new ReplicateMsg;
   *zero_op->mutable_id() = MinimumOpId();
-  InsertOrDie(&cache_, 0, { make_scoped_refptr_replicate(zero_op), zero_op->SpaceUsedLong() });
+  DCHECK_EQ(kZeroOpIdx, zero_op->id().index());
+  InsertOrDie(&cache_,
+              kZeroOpIdx,
+              { make_scoped_refptr_replicate(zero_op), zero_op->SpaceUsedLong() });
 }
 
 LogCache::~LogCache() {
@@ -119,8 +121,7 @@ LogCache::~LogCache() {
 
 void LogCache::Init(const OpId& preceding_op) {
   std::lock_guard<simple_spinlock> l(lock_);
-  CHECK_EQ(cache_.size(), 1)
-    << "Cache should have only our special '0' op";
+  CHECK_EQ(1, cache_.size()) << "cache should have only special '0' op";
   next_sequential_op_index_ = preceding_op.index() + 1;
   min_pinned_op_index_ = next_sequential_op_index_;
 }
@@ -162,8 +163,8 @@ Status LogCache::AppendOperations(vector<ReplicateRefPtr> msgs,
     entries_to_insert.emplace_back(std::move(e));
   }
 
-  int64_t first_idx_in_batch = msgs.front()->get()->id().index();
-  int64_t last_idx_in_batch = msgs.back()->get()->id().index();
+  const int64_t first_idx_in_batch = msgs.front()->get()->id().index();
+  const int64_t last_idx_in_batch = msgs.back()->get()->id().index();
 
   std::unique_lock<simple_spinlock> l(lock_);
   // If we're not appending a consecutive op we're likely overwriting and
@@ -175,8 +176,8 @@ Status LogCache::AppendOperations(vector<ReplicateRefPtr> msgs,
   // Try to consume the memory. If it can't be consumed, we may need to evict.
   bool borrowed_memory = false;
   if (!tracker_->TryConsume(mem_required)) {
-    int spare = tracker_->SpareCapacity();
-    int need_to_free = mem_required - spare;
+    auto spare = tracker_->SpareCapacity();
+    auto need_to_free = mem_required - spare;
     VLOG_WITH_PREFIX_UNLOCKED(1) << "Memory limit would be exceeded trying to append "
                         << HumanReadableNumBytes::ToString(mem_required)
                         << " to log cache (available="
@@ -188,8 +189,8 @@ Status LogCache::AppendOperations(vector<ReplicateRefPtr> msgs,
     EvictSomeUnlocked(min_pinned_op_index_, need_to_free);
 
     // Force consuming, so that we don't refuse appending data. We might
-    // blow past our limit a little bit (as much as the number of tablets times
-    // the amount of in-flight data in the log), but until implementing the above TODO,
+    // blow past our limit as much as the number of tablets times the amount
+    // of in-flight data in the log, but until implementing the above TODO,
     // it's difficult to solve this issue.
     tracker_->Consume(mem_required);
 
@@ -283,9 +284,9 @@ namespace {
 // length delimiting and tagging of the message.
 int64_t TotalByteSizeForMessage(const ReplicateMsg& msg) {
   int64_t msg_size = google::protobuf::internal::WireFormatLite::LengthDelimitedSize(
-    msg.ByteSizeLong());
-  msg_size += 1; // for the type tag
-  return msg_size;
+      msg.ByteSizeLong());
+  // Add an extra byte for the type tag.
+  return msg_size + 1;
 }
 } // anonymous namespace
 
@@ -296,13 +297,12 @@ Status LogCache::ReadOps(int64_t after_op_index,
   DCHECK_GE(after_op_index, 0);
   RETURN_NOT_OK(LookupOpId(after_op_index, preceding_op));
 
-  std::unique_lock<simple_spinlock> l(lock_);
-  int64_t next_index = after_op_index + 1;
-
   // Return as many operations as we can, up to the limit
   int64_t remaining_space = max_size_bytes;
-  while (remaining_space > 0 && next_index < next_sequential_op_index_) {
+  int64_t next_index = after_op_index + 1;
 
+  std::unique_lock<simple_spinlock> l(lock_);
+  while (remaining_space > 0 && next_index < next_sequential_op_index_) {
     // If the messages the peer needs haven't been loaded into the queue yet,
     // load them.
     MessageCache::const_iterator iter = cache_.lower_bound(next_index);
@@ -332,10 +332,10 @@ Status LogCache::ReadOps(int64_t after_op_index,
       for (ReplicateMsg* msg : raw_replicate_ptrs) {
         CHECK_EQ(next_index, msg->id().index());
 
-        remaining_space -= TotalByteSizeForMessage(*msg);
-        if (remaining_space > 0 || messages->empty()) {
+        if (remaining_space > 0) {
           messages->push_back(make_scoped_refptr_replicate(msg));
-          next_index++;
+          remaining_space -= TotalByteSizeForMessage(*msg);
+          ++next_index;
         } else {
           delete msg;
         }
@@ -383,7 +383,7 @@ void LogCache::EvictSomeUnlocked(int64_t stop_after_index, int64_t bytes_to_evic
     const ReplicateRefPtr& msg = entry.msg;
     VLOG_WITH_PREFIX_UNLOCKED(2) << "considering for eviction: " << msg->get()->id();
     int64_t msg_index = msg->get()->id().index();
-    if (msg_index == 0) {
+    if (msg_index == kZeroOpIdx) {
       // Always keep our special '0' op.
       ++iter;
       continue;
@@ -445,31 +445,21 @@ std::string LogCache::ToStringUnlocked() const {
 }
 
 std::string LogCache::LogPrefixUnlocked() const {
-  return Substitute("T $0 P $1: ",
-                    tablet_id_,
-                    local_uuid_);
-}
-
-void LogCache::DumpToLog() const {
-  vector<string> strings;
-  DumpToStrings(&strings);
-  for (const string& s : strings) {
-    LOG_WITH_PREFIX_UNLOCKED(INFO) << s;
-  }
+  return Substitute("T $0 P $1: ", tablet_id_, local_uuid_);
 }
 
 void LogCache::DumpToStrings(vector<string>* lines) const {
   std::lock_guard<simple_spinlock> lock(lock_);
-  int counter = 0;
+  lines->reserve(cache_.size() + 2);
   lines->push_back(ToStringUnlocked());
   lines->push_back("Messages:");
+  size_t counter = 0;
   for (const auto& entry : cache_) {
-    const ReplicateMsg* msg = entry.second.msg->get();
+    const auto* msg = entry.second.msg->get();
     lines->push_back(
-      Substitute("Message[$0] $1.$2 : REPLICATE. Type: $3, Size: $4",
-                 counter++, msg->id().term(), msg->id().index(),
-                 OperationType_Name(msg->op_type()),
-                 msg->ByteSizeLong()));
+        Substitute("Message[$0] $1.$2 : REPLICATE. Type: $3, Size: $4",
+                   counter++, msg->id().term(), msg->id().index(),
+                   OperationType_Name(msg->op_type()), msg->ByteSizeLong()));
   }
 }
 
@@ -496,8 +486,8 @@ void LogCache::DumpToHtml(std::ostream& out) const {
 #define INSTANTIATE_METRIC(x) \
   x.Instantiate(metric_entity, 0)
 LogCache::Metrics::Metrics(const scoped_refptr<MetricEntity>& metric_entity)
-  : log_cache_num_ops(INSTANTIATE_METRIC(METRIC_log_cache_num_ops)),
-    log_cache_size(INSTANTIATE_METRIC(METRIC_log_cache_size)) {
+    : log_cache_num_ops(INSTANTIATE_METRIC(METRIC_log_cache_num_ops)),
+      log_cache_size(INSTANTIATE_METRIC(METRIC_log_cache_size)) {
 }
 #undef INSTANTIATE_METRIC
 
diff --git a/src/kudu/consensus/log_cache.h b/src/kudu/consensus/log_cache.h
index 9091308..812f50e 100644
--- a/src/kudu/consensus/log_cache.h
+++ b/src/kudu/consensus/log_cache.h
@@ -14,8 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#ifndef KUDU_CONSENSUS_LOG_CACHE_H
-#define KUDU_CONSENSUS_LOG_CACHE_H
+#pragma once
 
 #include <cstddef>
 #include <cstdint>
@@ -80,7 +79,7 @@ class LogCache {
   // The OpId which precedes the returned ops is returned in *preceding_op.
   // The index of this OpId will match 'after_op_index'.
   //
-  // If the ops being requested are not available in the log, this will synchronously
+  // If the ops being requested are not available in the cache, this will synchronously
   // read these ops from disk. Therefore, this function may take a substantial amount
   // of time and should not be called with important locks held, etc.
   Status ReadOps(int64_t after_op_index,
@@ -103,7 +102,7 @@ class LogCache {
   // Following this, reads of truncated indexes using ReadOps(), LookupOpId(),
   // HasOpBeenWritten(), etc, will return as if the operations were never appended.
   //
-  // NOTE: unless a new operation is appended followig 'index', this truncation does
+  // NOTE: unless a new operation is appended following 'index', this truncation does
   // not persist across server restarts.
   void TruncateOpsAfter(int64_t index);
 
@@ -122,9 +121,6 @@ class LogCache {
     return metrics_.log_cache_num_ops->value();
   }
 
-  // Dump the current contents of the cache to the log.
-  void DumpToLog() const;
-
   // Dumps the contents of the cache to the provided string vector.
   void DumpToStrings(std::vector<std::string>* lines) const;
 
@@ -150,6 +146,9 @@ class LogCache {
   FRIEND_TEST(LogCacheTest, TestTruncation);
   friend class LogCacheTest;
 
+  // Index of the special 'zero-op' entry in the cache.
+  static constexpr const int64_t kZeroOpIdx = 0;
+
   // An entry in the cache.
   struct CacheEntry {
     ReplicateRefPtr msg;
@@ -193,7 +192,7 @@ class LogCache {
 
   // An ordered map that serves as the buffer for the cached messages.
   // Maps from log index -> ReplicateMsg
-  typedef std::map<uint64_t, CacheEntry> MessageCache;
+  typedef std::map<int64_t, CacheEntry> MessageCache;
   MessageCache cache_;
 
   // The next log index to append. Each append operation must either
@@ -223,10 +222,10 @@ class LogCache {
     explicit Metrics(const scoped_refptr<MetricEntity>& metric_entity);
 
     // Keeps track of the total number of operations in the cache.
-    scoped_refptr<AtomicGauge<int64_t> > log_cache_num_ops;
+    scoped_refptr<AtomicGauge<int64_t>> log_cache_num_ops;
 
     // Keeps track of the memory consumed by the cache, in bytes.
-    scoped_refptr<AtomicGauge<int64_t> > log_cache_size;
+    scoped_refptr<AtomicGauge<int64_t>> log_cache_size;
   };
   Metrics metrics_;
 
@@ -235,4 +234,3 @@ class LogCache {
 
 } // namespace consensus
 } // namespace kudu
-#endif /* KUDU_CONSENSUS_LOG_CACHE_H */