You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/11/21 06:00:52 UTC

[kudu] branch master updated: log: refactor close and replace last segment

This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


View the commit online:
https://github.com/apache/kudu/commit/0eb0a60f23251afbbe987e9bf88036b0b95f97ab

The following commit(s) were added to refs/heads/master by this push:
     new 0eb0a60  log: refactor close and replace last segment
0eb0a60 is described below

commit 0eb0a60f23251afbbe987e9bf88036b0b95f97ab
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Wed Nov 20 21:17:57 2019 -0800

    log: refactor close and replace last segment
    
    Log::Close and SegmentAllocator::RollOver both tried to do the same thing,
    except Log::Close didn't actually need to replace the last segment, nor did
    it need to close the current segment with state_lock_ held.
    
    I also snuck in a small fix to LogIndex::IndexChunk::Open's use of
    ftruncate, and a visibility fix to a SegmentAllocator member.
    
    Change-Id: I871b43514cbafe9a9b594a551fe653d766298123
    Reviewed-on: http://gerrit.cloudera.org:8080/14767
    Tested-by: Adar Dembo <ad...@cloudera.com>
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/consensus/log.cc       | 72 ++++++++++++++++++++++-------------------
 src/kudu/consensus/log.h        | 19 +++++++++--
 src/kudu/consensus/log_index.cc |  4 +--
 3 files changed, 56 insertions(+), 39 deletions(-)

diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index ae15638..2db6b9f 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -463,7 +463,7 @@ Status SegmentAllocator::Init(uint64_t sequence_number) {
                             "could not instantiate compression codec");
     }
   }
-  active_segment_sequence_number = sequence_number;
+  active_segment_sequence_number_ = sequence_number;
   RETURN_NOT_OK(ThreadPoolBuilder("log-alloc")
       .set_max_threads(1)
       .Build(&allocation_pool_));
@@ -531,7 +531,7 @@ Status SegmentAllocator::Sync() {
   return Status::OK();
 }
 
-Status SegmentAllocator::CloseCurrentSegment() {
+Status SegmentAllocator::CloseCurrentSegment(CloseMode mode) {
   if (hooks_) {
     RETURN_NOT_OK_PREPEND(hooks_->PreClose(), "PreClose hook failed");
   }
@@ -548,6 +548,13 @@ Status SegmentAllocator::CloseCurrentSegment() {
   if (hooks_) {
     RETURN_NOT_OK_PREPEND(hooks_->PostClose(), "PostClose hook failed");
   }
+
+  if (mode == CLOSE_AND_REPLACE_LAST_SEGMENT) {
+    scoped_refptr<ReadableLogSegment> last_segment;
+    RETURN_NOT_OK(GetClosedSegment(&last_segment));
+    return reader_replace_last_segment_(std::move(last_segment));
+  }
+
   return Status::OK();
 }
 
@@ -658,10 +665,10 @@ Status SegmentAllocator::AllocateNewSegment() {
 
 Status SegmentAllocator::SwitchToAllocatedSegment() {
   // Increment "next" log segment seqno.
-  active_segment_sequence_number++;
+  active_segment_sequence_number_++;
   const auto& tablet_id = ctx_->tablet_id;
-  string new_segment_path = ctx_->fs_manager->GetWalSegmentFileName(tablet_id,
-                                                                    active_segment_sequence_number);
+  string new_segment_path = ctx_->fs_manager->GetWalSegmentFileName(
+      tablet_id, active_segment_sequence_number_);
   Env* env = ctx_->fs_manager->env();
   RETURN_NOT_OK_PREPEND(env->RenameFile(next_segment_path_, new_segment_path), "rename");
   if (opts_->force_fsync_all) {
@@ -674,7 +681,7 @@ Status SegmentAllocator::SwitchToAllocatedSegment() {
 
   // Set up the new header and footer.
   LogSegmentHeaderPB header;
-  header.set_sequence_number(active_segment_sequence_number);
+  header.set_sequence_number(active_segment_sequence_number_);
   header.set_tablet_id(tablet_id);
   if (codec_) {
     header.set_compression_codec(codec_->type());
@@ -723,10 +730,7 @@ Status SegmentAllocator::RollOver() {
   // If this isn't the first active segment, close the segment and make it
   // available to the log reader.
   if (active_segment_) {
-    RETURN_NOT_OK(CloseCurrentSegment());
-    scoped_refptr<ReadableLogSegment> readable_segment;
-    RETURN_NOT_OK(GetClosedSegment(&readable_segment));
-    RETURN_NOT_OK(reader_replace_last_segment_(readable_segment));
+    RETURN_NOT_OK(CloseCurrentSegment(CLOSE_AND_REPLACE_LAST_SEGMENT));
   }
   RETURN_NOT_OK(SwitchToAllocatedSegment());
 
@@ -790,14 +794,14 @@ Status Log::Init() {
   // The case where we are continuing an existing log.
   // We must pick up where the previous WAL left off in terms of
   // sequence numbers.
-  uint64_t active_segment_sequence_number = 0;
+  uint64_t active_seg_seq_num = 0;
   if (reader_->num_segments() != 0) {
     VLOG_WITH_PREFIX(1) << "Using existing " << reader_->num_segments()
                         << " segments from path: " << ctx_.fs_manager->GetWalsRootDir();
 
     vector<scoped_refptr<ReadableLogSegment> > segments;
     RETURN_NOT_OK(reader_->GetSegmentsSnapshot(&segments));
-    active_segment_sequence_number = segments.back()->header().sequence_number();
+    active_seg_seq_num = segments.back()->header().sequence_number();
   }
 
   if (options_.force_fsync_all) {
@@ -808,7 +812,7 @@ Status Log::Init() {
   }
 
   // We always create a new segment when the log starts.
-  RETURN_NOT_OK(segment_allocator_.Init(active_segment_sequence_number));
+  RETURN_NOT_OK(segment_allocator_.Init(active_seg_seq_num));
   RETURN_NOT_OK(append_thread_->Init());
   log_state_ = kLogWriting;
   return Status::OK();
@@ -918,7 +922,7 @@ Status Log::UpdateIndexForBatch(const LogEntryBatch& batch,
     LogIndexEntry index_entry;
 
     index_entry.op_id = entry_pb.replicate().id();
-    index_entry.segment_sequence_number = segment_allocator_.active_segment_sequence_number;
+    index_entry.segment_sequence_number = segment_allocator_.active_segment_sequence_number();
     index_entry.offset_in_segment = start_offset;
     RETURN_NOT_OK(log_index_->AddEntry(index_entry));
   }
@@ -1111,28 +1115,28 @@ Status Log::Close() {
   segment_allocator_.StopAllocationThread();
   append_thread_->Shutdown();
 
-  std::lock_guard<percpu_rwlock> l(state_lock_);
-  switch (log_state_) {
-    case kLogWriting: {
-      RETURN_NOT_OK(segment_allocator_.CloseCurrentSegment());
-      scoped_refptr<ReadableLogSegment> last_segment;
-      RETURN_NOT_OK(segment_allocator_.GetClosedSegment(&last_segment));
-      RETURN_NOT_OK(reader_->ReplaceLastSegment(last_segment));
-      log_state_ = kLogClosed;
-      VLOG_WITH_PREFIX(1) << "Log closed";
-
-      // Release FDs held by these objects.
-      log_index_.reset();
-      reader_.reset();
-      return Status::OK();
+  {
+    std::lock_guard<percpu_rwlock> l(state_lock_);
+    switch (log_state_) {
+      case kLogWriting:
+        log_state_ = kLogClosed;
+        break;
+      case kLogClosed:
+        VLOG_WITH_PREFIX(1) << "Log already closed";
+        return Status::OK();
+      default:
+        return Status::IllegalState(Substitute(
+            "Log not open. State: $0", log_state_));
     }
-    case kLogClosed:
-      VLOG_WITH_PREFIX(1) << "Log already closed";
-      return Status::OK();
-
-    default:
-      return Status::IllegalState(Substitute("Log not open. State: $0", log_state_));
   }
+
+  RETURN_NOT_OK(segment_allocator_.CloseCurrentSegment(SegmentAllocator::CLOSE));
+  VLOG_WITH_PREFIX(1) << "Log closed";
+
+  // Release FDs held by these objects.
+  log_index_.reset();
+  reader_.reset();
+  return Status::OK();
 }
 
 bool Log::HasOnDiskData(FsManager* fs_manager, const string& tablet_id) {
diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h
index 06ebc4a..e877bf1 100644
--- a/src/kudu/consensus/log.h
+++ b/src/kudu/consensus/log.h
@@ -127,7 +127,15 @@ class SegmentAllocator {
   Status Sync();
 
   // Syncs the current segment and writes out the footer.
-  Status CloseCurrentSegment();
+  enum CloseMode {
+    // Just close the current segment.
+    CLOSE,
+
+    // Close the current segment and call reader_replace_last_segment_ to
+    // replace the last log segment in the log reader.
+    CLOSE_AND_REPLACE_LAST_SEGMENT,
+  };
+  Status CloseCurrentSegment(CloseMode mode);
 
   // Update the footer based on the written 'batch', e.g. to track the
   // last-written OpId.
@@ -137,10 +145,12 @@ class SegmentAllocator {
   // current active segment.
   void StopAllocationThread();
 
-  uint64_t active_segment_sequence_number = 0;
-
   std::string LogPrefix() const { return ctx_->LogPrefix(); }
 
+  uint64_t active_segment_sequence_number() const {
+    return active_segment_sequence_number_;
+  }
+
  private:
   friend class Log;
   friend class LogTest;
@@ -229,6 +239,9 @@ class SegmentAllocator {
   // Single-threaded threadpool on which to allocate segments.
   std::unique_ptr<ThreadPool> allocation_pool_;
   Promise<Status> allocation_status_;
+
+  // The sequence number of the 'active' log segment.
+  uint64_t active_segment_sequence_number_ = 0;
 };
 
 // Log interface, inspired by Raft's (logcabin) Log. Provides durability to
diff --git a/src/kudu/consensus/log_index.cc b/src/kudu/consensus/log_index.cc
index 587fb7a..bfeea72 100644
--- a/src/kudu/consensus/log_index.cc
+++ b/src/kudu/consensus/log_index.cc
@@ -126,12 +126,12 @@ Status LogIndex::IndexChunk::Open() {
 
   int err;
   RETRY_ON_EINTR(err, ftruncate(fd_, kChunkFileSize));
-  RETURN_NOT_OK(CheckError(fd_, "truncate"));
+  RETURN_NOT_OK(CheckError(err, "truncate"));
 
   mapping_ = static_cast<uint8_t*>(mmap(nullptr, kChunkFileSize, PROT_READ | PROT_WRITE,
                                         MAP_SHARED, fd_, 0));
   if (mapping_ == nullptr) {
-    int err = errno;
+    err = errno;
     return Status::IOError("Unable to mmap()", ErrnoToString(err), err);
   }