You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/12/03 20:43:01 UTC

[kudu] branch master updated: log: some cleanup and modernization

This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new f4fe874  log: some cleanup and modernization
f4fe874 is described below

commit f4fe87430349a3f2f33c539329180104755e622e
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Mon Dec 2 13:05:56 2019 -0800

    log: some cleanup and modernization
    
    - Use pass by move when replacing LogReader segments.
    - Remove WritableLogSegment::writable_file() private accessor. Internal code
      can just as easily use writable_file_ directly.
    - Make some RETURN_NOT_OK_PREPEND messages actually useful.
    - Reduce unnecessary usage of more complex LogReader::Open variant.
    
    Change-Id: I3106ede5243d05b2a43f8d43f581316b6ee7ada5
    Reviewed-on: http://gerrit.cloudera.org:8080/14818
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Adar Dembo <ad...@cloudera.com>
---
 src/kudu/consensus/log.cc                          | 23 ++++++++++----------
 src/kudu/consensus/log.h                           |  4 ++--
 src/kudu/consensus/log_reader.cc                   | 25 +++++++++++-----------
 src/kudu/consensus/log_reader.h                    |  8 +++----
 src/kudu/consensus/log_util.cc                     |  4 ++--
 src/kudu/consensus/log_util.h                      |  4 ----
 .../timestamp_advancement-itest.cc                 |  9 ++++----
 7 files changed, 37 insertions(+), 40 deletions(-)

diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 2db6b9f..5b19017 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -641,10 +641,9 @@ Status SegmentAllocator::AllocateNewSegment() {
   VLOG_WITH_PREFIX(2) << "Creating temp. file for place holder segment, template: " << path_tmpl;
   unique_ptr<WritableFile> segment_file;
   Env* env = ctx_->fs_manager->env();
-  RETURN_NOT_OK_PREPEND(env->NewTempWritableFile(opts,
-                                         path_tmpl,
-                                         &next_segment_path_,
-                                         &segment_file), "ASDF");
+  RETURN_NOT_OK_PREPEND(env->NewTempWritableFile(
+      opts, path_tmpl, &next_segment_path_, &segment_file),
+                        "could not create next WAL segment");
   next_segment_file_.reset(segment_file.release());
   VLOG_WITH_PREFIX(1) << "Created next WAL segment, placeholder path: " << next_segment_path_;
 
@@ -658,7 +657,8 @@ Status SegmentAllocator::AllocateNewSegment() {
                                                       FLAGS_fs_wal_dir_reserved_bytes));
     // TODO (perf) zero the new segments -- this could result in
     // additional performance improvements.
-    RETURN_NOT_OK_PREPEND(next_segment_file_->PreAllocate(max_segment_size_), "E");
+    RETURN_NOT_OK_PREPEND(next_segment_file_->PreAllocate(max_segment_size_),
+                          "could not preallocate next WAL segment");
   }
   return Status::OK();
 }
@@ -670,7 +670,8 @@ Status SegmentAllocator::SwitchToAllocatedSegment() {
   string new_segment_path = ctx_->fs_manager->GetWalSegmentFileName(
       tablet_id, active_segment_sequence_number_);
   Env* env = ctx_->fs_manager->env();
-  RETURN_NOT_OK_PREPEND(env->RenameFile(next_segment_path_, new_segment_path), "rename");
+  RETURN_NOT_OK_PREPEND(env->RenameFile(next_segment_path_, new_segment_path),
+                        "could not rename next WAL segment");
   if (opts_->force_fsync_all) {
     RETURN_NOT_OK(env->SyncDir(ctx_->log_dir));
   }
@@ -710,7 +711,7 @@ Status SegmentAllocator::SwitchToAllocatedSegment() {
     new ReadableLogSegment(new_segment_path,
                            shared_ptr<RandomAccessFile>(readable_file.release())));
   RETURN_NOT_OK(readable_segment->Init(header, new_segment->first_entry_offset()));
-  RETURN_NOT_OK(reader_add_segment_(readable_segment));
+  RETURN_NOT_OK(reader_add_segment_(std::move(readable_segment)));
 
   // Now set 'active_segment_' to the new segment.
   active_segment_ = std::move(new_segment);
@@ -1187,14 +1188,14 @@ Status Log::RemoveRecoveryDirIfExists(FsManager* fs_manager, const string& table
   return Status::OK();
 }
 
-Status Log::AddEmptySegmentInReader(const scoped_refptr<ReadableLogSegment>& segment) {
+Status Log::AddEmptySegmentInReader(scoped_refptr<ReadableLogSegment> segment) {
   std::lock_guard<percpu_rwlock> l(state_lock_);
-  return reader_->AppendEmptySegment(segment);
+  return reader_->AppendEmptySegment(std::move(segment));
 }
 
-Status Log::ReplaceSegmentInReader(const scoped_refptr<ReadableLogSegment>& segment) {
+Status Log::ReplaceSegmentInReader(scoped_refptr<ReadableLogSegment> segment) {
   std::lock_guard<percpu_rwlock> l(state_lock_);
-  return reader_->ReplaceLastSegment(segment);
+  return reader_->ReplaceLastSegment(std::move(segment));
 }
 
 std::string Log::LogPrefix() const { return ctx_.LogPrefix(); }
diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h
index e877bf1..66e43d8 100644
--- a/src/kudu/consensus/log.h
+++ b/src/kudu/consensus/log.h
@@ -457,10 +457,10 @@ class Log : public RefCountedThreadSafe<Log> {
 
   // Replaces the last "empty" segment in 'log_reader_', i.e. the one currently
   // being written to, with the same segment once properly closed.
-  Status ReplaceSegmentInReader(const scoped_refptr<ReadableLogSegment>& segment);
+  Status ReplaceSegmentInReader(scoped_refptr<ReadableLogSegment> segment);
 
   // Adds the given segment to 'log_reader_'.
-  Status AddEmptySegmentInReader(const scoped_refptr<ReadableLogSegment>& segment);
+  Status AddEmptySegmentInReader(scoped_refptr<ReadableLogSegment> segment);
 
   Status Sync();
 
diff --git a/src/kudu/consensus/log_reader.cc b/src/kudu/consensus/log_reader.cc
index 741f908..f3ae935 100644
--- a/src/kudu/consensus/log_reader.cc
+++ b/src/kudu/consensus/log_reader.cc
@@ -139,6 +139,7 @@ Status LogReader::Init(const string& tablet_wal_path) {
                         "Unable to read children from path");
 
   SegmentSequence read_segments;
+  read_segments.reserve(log_files.size()); // Overestimate; will shrink_to_fit later.
 
   // build a log segment from each file
   for (const string &log_file : log_files) {
@@ -165,20 +166,20 @@ Status LogReader::Init(const string& tablet_wal_path) {
         RETURN_NOT_OK(segment->RebuildFooterByScanning());
       }
 
-      read_segments.push_back(segment);
+      read_segments.emplace_back(std::move(segment));
     }
   }
+  read_segments.shrink_to_fit();
 
   // Sort the segments by sequence number.
   std::sort(read_segments.begin(), read_segments.end(), LogSegmentSeqnoComparator());
 
-
   {
     std::lock_guard<simple_spinlock> lock(lock_);
 
     string previous_seg_path;
     int64_t previous_seg_seqno = -1;
-    for (const SegmentSequence::value_type& entry : read_segments) {
+    for (auto& entry : read_segments) {
       VLOG(1) << " Log Reader Indexed: " << SecureShortDebugString(entry->footer());
       // Check that the log segments are in sequence.
       if (previous_seg_seqno != -1 && entry->header().sequence_number() != previous_seg_seqno + 1) {
@@ -189,7 +190,7 @@ Status LogReader::Init(const string& tablet_wal_path) {
       }
       previous_seg_seqno = entry->header().sequence_number();
       previous_seg_path = entry->path();
-      RETURN_NOT_OK(AppendSegmentUnlocked(entry));
+      RETURN_NOT_OK(AppendSegmentUnlocked(std::move(entry)));
     }
 
     state_ = kLogReaderReading;
@@ -392,7 +393,7 @@ void LogReader::UpdateLastSegmentOffset(int64_t readable_to_offset) {
   segment->UpdateReadableToOffset(readable_to_offset);
 }
 
-Status LogReader::ReplaceLastSegment(const scoped_refptr<ReadableLogSegment>& segment) {
+Status LogReader::ReplaceLastSegment(scoped_refptr<ReadableLogSegment> segment) {
   // This is used to replace the last segment once we close it properly so it must
   // have a footer.
   DCHECK(segment->HasFooter());
@@ -402,21 +403,21 @@ Status LogReader::ReplaceLastSegment(const scoped_refptr<ReadableLogSegment>& se
   // Make sure the segment we're replacing has the same sequence number
   CHECK(!segments_.empty());
   CHECK_EQ(segment->header().sequence_number(), segments_.back()->header().sequence_number());
-  segments_[segments_.size() - 1] = segment;
+  segments_[segments_.size() - 1] = std::move(segment);
 
   return Status::OK();
 }
 
-Status LogReader::AppendSegment(const scoped_refptr<ReadableLogSegment>& segment) {
+Status LogReader::AppendSegment(scoped_refptr<ReadableLogSegment> segment) {
   DCHECK(segment->IsInitialized());
   if (PREDICT_FALSE(!segment->HasFooter())) {
     RETURN_NOT_OK(segment->RebuildFooterByScanning());
   }
   std::lock_guard<simple_spinlock> lock(lock_);
-  return AppendSegmentUnlocked(segment);
+  return AppendSegmentUnlocked(std::move(segment));
 }
 
-Status LogReader::AppendSegmentUnlocked(const scoped_refptr<ReadableLogSegment>& segment) {
+Status LogReader::AppendSegmentUnlocked(scoped_refptr<ReadableLogSegment> segment) {
   DCHECK(segment->IsInitialized());
   DCHECK(segment->HasFooter());
 
@@ -424,11 +425,11 @@ Status LogReader::AppendSegmentUnlocked(const scoped_refptr<ReadableLogSegment>&
     CHECK_EQ(segments_.back()->header().sequence_number() + 1,
              segment->header().sequence_number());
   }
-  segments_.push_back(segment);
+  segments_.emplace_back(std::move(segment));
   return Status::OK();
 }
 
-Status LogReader::AppendEmptySegment(const scoped_refptr<ReadableLogSegment>& segment) {
+Status LogReader::AppendEmptySegment(scoped_refptr<ReadableLogSegment> segment) {
   DCHECK(segment->IsInitialized());
   std::lock_guard<simple_spinlock> lock(lock_);
   CHECK_EQ(state_, kLogReaderReading);
@@ -436,7 +437,7 @@ Status LogReader::AppendEmptySegment(const scoped_refptr<ReadableLogSegment>& se
     CHECK_EQ(segments_.back()->header().sequence_number() + 1,
              segment->header().sequence_number());
   }
-  segments_.push_back(segment);
+  segments_.emplace_back(std::move(segment));
   return Status::OK();
 }
 
diff --git a/src/kudu/consensus/log_reader.h b/src/kudu/consensus/log_reader.h
index 7d6ae2f..cbf4c2d 100644
--- a/src/kudu/consensus/log_reader.h
+++ b/src/kudu/consensus/log_reader.h
@@ -134,11 +134,11 @@ class LogReader : public enable_make_shared<LogReader> {
   // Index entries in 'segment's footer will be added to the index.
   // If the segment has no footer it will be scanned so this should not be used
   // for new segments.
-  Status AppendSegment(const scoped_refptr<ReadableLogSegment>& segment);
+  Status AppendSegment(scoped_refptr<ReadableLogSegment> segment);
 
   // Same as above but for segments without any entries.
   // Used by the Log to add "empty" segments.
-  Status AppendEmptySegment(const scoped_refptr<ReadableLogSegment>& segment);
+  Status AppendEmptySegment(scoped_refptr<ReadableLogSegment> segment);
 
   // Removes segments with sequence numbers less than or equal to
   // 'segment_sequence_number' from this reader.
@@ -150,14 +150,14 @@ class LogReader : public enable_make_shared<LogReader> {
   // Requires that the last segment in 'segments_' has the same sequence
   // number as 'segment'.
   // Expects 'segment' to be properly closed and to have footer.
-  Status ReplaceLastSegment(const scoped_refptr<ReadableLogSegment>& segment);
+  Status ReplaceLastSegment(scoped_refptr<ReadableLogSegment> segment);
 
   // Appends 'segment' to the segment sequence.
   // Assumes that the segment was scanned, if no footer was found.
   // To be used only internally, clients of this class with private access (i.e. friends)
   // should use the thread safe version, AppendSegment(), which will also scan the segment
   // if no footer is present.
-  Status AppendSegmentUnlocked(const scoped_refptr<ReadableLogSegment>& segment);
+  Status AppendSegmentUnlocked(scoped_refptr<ReadableLogSegment> segment);
 
   // Used by Log to update its LogReader on how far it is possible to read
   // the current segment. Requires that the reader has at least one segment
diff --git a/src/kudu/consensus/log_util.cc b/src/kudu/consensus/log_util.cc
index 43477db..e2d4949 100644
--- a/src/kudu/consensus/log_util.cc
+++ b/src/kudu/consensus/log_util.cc
@@ -774,7 +774,7 @@ Status WritableLogSegment::WriteHeaderAndOpen(const LogSegmentHeaderPB& new_head
   PutFixed32(&buf, new_header.ByteSize());
   // Then Serialize the PB.
   pb_util::AppendToString(new_header, &buf);
-  RETURN_NOT_OK(writable_file()->Append(Slice(buf)));
+  RETURN_NOT_OK(writable_file_->Append(Slice(buf)));
 
   header_.CopyFrom(new_header);
   first_entry_offset_ = buf.size();
@@ -796,7 +796,7 @@ Status WritableLogSegment::WriteFooterAndClose(const LogSegmentFooterPB& footer)
   buf.append(kLogSegmentFooterMagicString);
   PutFixed32(&buf, footer.ByteSize());
 
-  RETURN_NOT_OK_PREPEND(writable_file()->Append(Slice(buf)), "Could not write the footer");
+  RETURN_NOT_OK_PREPEND(writable_file_->Append(Slice(buf)), "Could not write the footer");
 
   footer_.CopyFrom(footer);
   is_footer_written_ = true;
diff --git a/src/kudu/consensus/log_util.h b/src/kudu/consensus/log_util.h
index 8edec1d..426387e 100644
--- a/src/kudu/consensus/log_util.h
+++ b/src/kudu/consensus/log_util.h
@@ -475,10 +475,6 @@ class WritableLogSegment {
  private:
   FRIEND_TEST(LogTest, TestAutoStopIdleAppendThread);
 
-  const std::shared_ptr<WritableFile>& writable_file() const {
-    return writable_file_;
-  }
-
   // The path to the log file.
   const std::string path_;
 
diff --git a/src/kudu/integration-tests/timestamp_advancement-itest.cc b/src/kudu/integration-tests/timestamp_advancement-itest.cc
index 7cede18..3e6b77d 100644
--- a/src/kudu/integration-tests/timestamp_advancement-itest.cc
+++ b/src/kudu/integration-tests/timestamp_advancement-itest.cc
@@ -41,7 +41,6 @@
 #include "kudu/consensus/log_util.h"
 #include "kudu/consensus/metadata.pb.h"
 #include "kudu/consensus/raft_consensus.h"
-#include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -180,10 +179,10 @@ class TimestampAdvancementITest : public MiniClusterITestBase {
   Status CheckForWriteReplicatesInLog(MiniTabletServer* ts, const string& tablet_id,
                                       bool* has_write_replicates) const {
     shared_ptr<LogReader> reader;
-    RETURN_NOT_OK(LogReader::Open(env_,
-                  ts->server()->fs_manager()->GetTabletWalDir(tablet_id),
-                  scoped_refptr<log::LogIndex>(), tablet_id,
-                  scoped_refptr<MetricEntity>(), &reader));
+    RETURN_NOT_OK(LogReader::Open(
+       ts->server()->fs_manager(),
+       scoped_refptr<log::LogIndex>(), tablet_id,
+       scoped_refptr<MetricEntity>(), &reader));
     log::SegmentSequence segs;
     RETURN_NOT_OK(reader->GetSegmentsSnapshot(&segs));
     unique_ptr<log::LogEntryPB> entry;