You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/11/21 06:00:52 UTC
[kudu] branch master updated: log: refactor close and replace last
segment
This is an automated email from the ASF dual-hosted git repository.
adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
View the commit online:
https://github.com/apache/kudu/commit/0eb0a60f23251afbbe987e9bf88036b0b95f97ab
The following commit(s) were added to refs/heads/master by this push:
new 0eb0a60 log: refactor close and replace last segment
0eb0a60 is described below
commit 0eb0a60f23251afbbe987e9bf88036b0b95f97ab
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Wed Nov 20 21:17:57 2019 -0800
log: refactor close and replace last segment
Log::Close and SegmentAllocator::RollOver both tried to do the same thing,
except Log::Close didn't actually need to replace the last segment, nor did
it need to close the current segment with state_lock_ held.
I also snuck in a small fix to LogIndex::IndexChunk::Open's use of
ftruncate, and a visibility fix to a SegmentAllocator member.
Change-Id: I871b43514cbafe9a9b594a551fe653d766298123
Reviewed-on: http://gerrit.cloudera.org:8080/14767
Tested-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
src/kudu/consensus/log.cc | 72 ++++++++++++++++++++++-------------------
src/kudu/consensus/log.h | 19 +++++++++--
src/kudu/consensus/log_index.cc | 4 +--
3 files changed, 56 insertions(+), 39 deletions(-)
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index ae15638..2db6b9f 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -463,7 +463,7 @@ Status SegmentAllocator::Init(uint64_t sequence_number) {
"could not instantiate compression codec");
}
}
- active_segment_sequence_number = sequence_number;
+ active_segment_sequence_number_ = sequence_number;
RETURN_NOT_OK(ThreadPoolBuilder("log-alloc")
.set_max_threads(1)
.Build(&allocation_pool_));
@@ -531,7 +531,7 @@ Status SegmentAllocator::Sync() {
return Status::OK();
}
-Status SegmentAllocator::CloseCurrentSegment() {
+Status SegmentAllocator::CloseCurrentSegment(CloseMode mode) {
if (hooks_) {
RETURN_NOT_OK_PREPEND(hooks_->PreClose(), "PreClose hook failed");
}
@@ -548,6 +548,13 @@ Status SegmentAllocator::CloseCurrentSegment() {
if (hooks_) {
RETURN_NOT_OK_PREPEND(hooks_->PostClose(), "PostClose hook failed");
}
+
+ if (mode == CLOSE_AND_REPLACE_LAST_SEGMENT) {
+ scoped_refptr<ReadableLogSegment> last_segment;
+ RETURN_NOT_OK(GetClosedSegment(&last_segment));
+ return reader_replace_last_segment_(std::move(last_segment));
+ }
+
return Status::OK();
}
@@ -658,10 +665,10 @@ Status SegmentAllocator::AllocateNewSegment() {
Status SegmentAllocator::SwitchToAllocatedSegment() {
// Increment "next" log segment seqno.
- active_segment_sequence_number++;
+ active_segment_sequence_number_++;
const auto& tablet_id = ctx_->tablet_id;
- string new_segment_path = ctx_->fs_manager->GetWalSegmentFileName(tablet_id,
- active_segment_sequence_number);
+ string new_segment_path = ctx_->fs_manager->GetWalSegmentFileName(
+ tablet_id, active_segment_sequence_number_);
Env* env = ctx_->fs_manager->env();
RETURN_NOT_OK_PREPEND(env->RenameFile(next_segment_path_, new_segment_path), "rename");
if (opts_->force_fsync_all) {
@@ -674,7 +681,7 @@ Status SegmentAllocator::SwitchToAllocatedSegment() {
// Set up the new header and footer.
LogSegmentHeaderPB header;
- header.set_sequence_number(active_segment_sequence_number);
+ header.set_sequence_number(active_segment_sequence_number_);
header.set_tablet_id(tablet_id);
if (codec_) {
header.set_compression_codec(codec_->type());
@@ -723,10 +730,7 @@ Status SegmentAllocator::RollOver() {
// If this isn't the first active segment, close the segment and make it
// available to the log reader.
if (active_segment_) {
- RETURN_NOT_OK(CloseCurrentSegment());
- scoped_refptr<ReadableLogSegment> readable_segment;
- RETURN_NOT_OK(GetClosedSegment(&readable_segment));
- RETURN_NOT_OK(reader_replace_last_segment_(readable_segment));
+ RETURN_NOT_OK(CloseCurrentSegment(CLOSE_AND_REPLACE_LAST_SEGMENT));
}
RETURN_NOT_OK(SwitchToAllocatedSegment());
@@ -790,14 +794,14 @@ Status Log::Init() {
// The case where we are continuing an existing log.
// We must pick up where the previous WAL left off in terms of
// sequence numbers.
- uint64_t active_segment_sequence_number = 0;
+ uint64_t active_seg_seq_num = 0;
if (reader_->num_segments() != 0) {
VLOG_WITH_PREFIX(1) << "Using existing " << reader_->num_segments()
<< " segments from path: " << ctx_.fs_manager->GetWalsRootDir();
vector<scoped_refptr<ReadableLogSegment> > segments;
RETURN_NOT_OK(reader_->GetSegmentsSnapshot(&segments));
- active_segment_sequence_number = segments.back()->header().sequence_number();
+ active_seg_seq_num = segments.back()->header().sequence_number();
}
if (options_.force_fsync_all) {
@@ -808,7 +812,7 @@ Status Log::Init() {
}
// We always create a new segment when the log starts.
- RETURN_NOT_OK(segment_allocator_.Init(active_segment_sequence_number));
+ RETURN_NOT_OK(segment_allocator_.Init(active_seg_seq_num));
RETURN_NOT_OK(append_thread_->Init());
log_state_ = kLogWriting;
return Status::OK();
@@ -918,7 +922,7 @@ Status Log::UpdateIndexForBatch(const LogEntryBatch& batch,
LogIndexEntry index_entry;
index_entry.op_id = entry_pb.replicate().id();
- index_entry.segment_sequence_number = segment_allocator_.active_segment_sequence_number;
+ index_entry.segment_sequence_number = segment_allocator_.active_segment_sequence_number();
index_entry.offset_in_segment = start_offset;
RETURN_NOT_OK(log_index_->AddEntry(index_entry));
}
@@ -1111,28 +1115,28 @@ Status Log::Close() {
segment_allocator_.StopAllocationThread();
append_thread_->Shutdown();
- std::lock_guard<percpu_rwlock> l(state_lock_);
- switch (log_state_) {
- case kLogWriting: {
- RETURN_NOT_OK(segment_allocator_.CloseCurrentSegment());
- scoped_refptr<ReadableLogSegment> last_segment;
- RETURN_NOT_OK(segment_allocator_.GetClosedSegment(&last_segment));
- RETURN_NOT_OK(reader_->ReplaceLastSegment(last_segment));
- log_state_ = kLogClosed;
- VLOG_WITH_PREFIX(1) << "Log closed";
-
- // Release FDs held by these objects.
- log_index_.reset();
- reader_.reset();
- return Status::OK();
+ {
+ std::lock_guard<percpu_rwlock> l(state_lock_);
+ switch (log_state_) {
+ case kLogWriting:
+ log_state_ = kLogClosed;
+ break;
+ case kLogClosed:
+ VLOG_WITH_PREFIX(1) << "Log already closed";
+ return Status::OK();
+ default:
+ return Status::IllegalState(Substitute(
+ "Log not open. State: $0", log_state_));
}
- case kLogClosed:
- VLOG_WITH_PREFIX(1) << "Log already closed";
- return Status::OK();
-
- default:
- return Status::IllegalState(Substitute("Log not open. State: $0", log_state_));
}
+
+ RETURN_NOT_OK(segment_allocator_.CloseCurrentSegment(SegmentAllocator::CLOSE));
+ VLOG_WITH_PREFIX(1) << "Log closed";
+
+ // Release FDs held by these objects.
+ log_index_.reset();
+ reader_.reset();
+ return Status::OK();
}
bool Log::HasOnDiskData(FsManager* fs_manager, const string& tablet_id) {
diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h
index 06ebc4a..e877bf1 100644
--- a/src/kudu/consensus/log.h
+++ b/src/kudu/consensus/log.h
@@ -127,7 +127,15 @@ class SegmentAllocator {
Status Sync();
// Syncs the current segment and writes out the footer.
- Status CloseCurrentSegment();
+ enum CloseMode {
+ // Just close the current semgent.
+ CLOSE,
+
+ // Close the current segment and call reader_replace_last_segment_ to
+ // replace the last log segment in the log reader.
+ CLOSE_AND_REPLACE_LAST_SEGMENT,
+ };
+ Status CloseCurrentSegment(CloseMode mode);
// Update the footer based on the written 'batch', e.g. to track the
// last-written OpId.
@@ -137,10 +145,12 @@ class SegmentAllocator {
// current active segment.
void StopAllocationThread();
- uint64_t active_segment_sequence_number = 0;
-
std::string LogPrefix() const { return ctx_->LogPrefix(); }
+ uint64_t active_segment_sequence_number() const {
+ return active_segment_sequence_number_;
+ }
+
private:
friend class Log;
friend class LogTest;
@@ -229,6 +239,9 @@ class SegmentAllocator {
// Single-threaded threadpool on which to allocate segments.
std::unique_ptr<ThreadPool> allocation_pool_;
Promise<Status> allocation_status_;
+
+ // The sequence number of the 'active' log segment.
+ uint64_t active_segment_sequence_number_ = 0;
};
// Log interface, inspired by Raft's (logcabin) Log. Provides durability to
diff --git a/src/kudu/consensus/log_index.cc b/src/kudu/consensus/log_index.cc
index 587fb7a..bfeea72 100644
--- a/src/kudu/consensus/log_index.cc
+++ b/src/kudu/consensus/log_index.cc
@@ -126,12 +126,12 @@ Status LogIndex::IndexChunk::Open() {
int err;
RETRY_ON_EINTR(err, ftruncate(fd_, kChunkFileSize));
- RETURN_NOT_OK(CheckError(fd_, "truncate"));
+ RETURN_NOT_OK(CheckError(err, "truncate"));
mapping_ = static_cast<uint8_t*>(mmap(nullptr, kChunkFileSize, PROT_READ | PROT_WRITE,
MAP_SHARED, fd_, 0));
if (mapping_ == nullptr) {
- int err = errno;
+ err = errno;
return Status::IOError("Unable to mmap()", ErrnoToString(err), err);
}