You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/12/03 20:43:01 UTC
[kudu] branch master updated: log: some cleanup and modernization
This is an automated email from the ASF dual-hosted git repository.
adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new f4fe874 log: some cleanup and modernization
f4fe874 is described below
commit f4fe87430349a3f2f33c539329180104755e622e
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Mon Dec 2 13:05:56 2019 -0800
log: some cleanup and modernization
- Use pass by move when replacing LogReader segments.
- Remove WritableLogSegment::writable_file() private accessor. Internal code
can just as easily use writable_file_ directly.
- Make some RETURN_NOT_OK_PREPEND messages actually useful.
- Reduce unnecessary usage of more complex LogReader::Open variant.
Change-Id: I3106ede5243d05b2a43f8d43f581316b6ee7ada5
Reviewed-on: http://gerrit.cloudera.org:8080/14818
Reviewed-by: Andrew Wong <aw...@cloudera.com>
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Adar Dembo <ad...@cloudera.com>
---
src/kudu/consensus/log.cc | 23 ++++++++++----------
src/kudu/consensus/log.h | 4 ++--
src/kudu/consensus/log_reader.cc | 25 +++++++++++-----------
src/kudu/consensus/log_reader.h | 8 +++----
src/kudu/consensus/log_util.cc | 4 ++--
src/kudu/consensus/log_util.h | 4 ----
.../timestamp_advancement-itest.cc | 9 ++++----
7 files changed, 37 insertions(+), 40 deletions(-)
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 2db6b9f..5b19017 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -641,10 +641,9 @@ Status SegmentAllocator::AllocateNewSegment() {
VLOG_WITH_PREFIX(2) << "Creating temp. file for place holder segment, template: " << path_tmpl;
unique_ptr<WritableFile> segment_file;
Env* env = ctx_->fs_manager->env();
- RETURN_NOT_OK_PREPEND(env->NewTempWritableFile(opts,
- path_tmpl,
- &next_segment_path_,
- &segment_file), "ASDF");
+ RETURN_NOT_OK_PREPEND(env->NewTempWritableFile(
+ opts, path_tmpl, &next_segment_path_, &segment_file),
+ "could not create next WAL segment");
next_segment_file_.reset(segment_file.release());
VLOG_WITH_PREFIX(1) << "Created next WAL segment, placeholder path: " << next_segment_path_;
@@ -658,7 +657,8 @@ Status SegmentAllocator::AllocateNewSegment() {
FLAGS_fs_wal_dir_reserved_bytes));
// TODO (perf) zero the new segments -- this could result in
// additional performance improvements.
- RETURN_NOT_OK_PREPEND(next_segment_file_->PreAllocate(max_segment_size_), "E");
+ RETURN_NOT_OK_PREPEND(next_segment_file_->PreAllocate(max_segment_size_),
+ "could not preallocate next WAL segment");
}
return Status::OK();
}
@@ -670,7 +670,8 @@ Status SegmentAllocator::SwitchToAllocatedSegment() {
string new_segment_path = ctx_->fs_manager->GetWalSegmentFileName(
tablet_id, active_segment_sequence_number_);
Env* env = ctx_->fs_manager->env();
- RETURN_NOT_OK_PREPEND(env->RenameFile(next_segment_path_, new_segment_path), "rename");
+ RETURN_NOT_OK_PREPEND(env->RenameFile(next_segment_path_, new_segment_path),
+ "could not rename next WAL segment");
if (opts_->force_fsync_all) {
RETURN_NOT_OK(env->SyncDir(ctx_->log_dir));
}
@@ -710,7 +711,7 @@ Status SegmentAllocator::SwitchToAllocatedSegment() {
new ReadableLogSegment(new_segment_path,
shared_ptr<RandomAccessFile>(readable_file.release())));
RETURN_NOT_OK(readable_segment->Init(header, new_segment->first_entry_offset()));
- RETURN_NOT_OK(reader_add_segment_(readable_segment));
+ RETURN_NOT_OK(reader_add_segment_(std::move(readable_segment)));
// Now set 'active_segment_' to the new segment.
active_segment_ = std::move(new_segment);
@@ -1187,14 +1188,14 @@ Status Log::RemoveRecoveryDirIfExists(FsManager* fs_manager, const string& table
return Status::OK();
}
-Status Log::AddEmptySegmentInReader(const scoped_refptr<ReadableLogSegment>& segment) {
+Status Log::AddEmptySegmentInReader(scoped_refptr<ReadableLogSegment> segment) {
std::lock_guard<percpu_rwlock> l(state_lock_);
- return reader_->AppendEmptySegment(segment);
+ return reader_->AppendEmptySegment(std::move(segment));
}
-Status Log::ReplaceSegmentInReader(const scoped_refptr<ReadableLogSegment>& segment) {
+Status Log::ReplaceSegmentInReader(scoped_refptr<ReadableLogSegment> segment) {
std::lock_guard<percpu_rwlock> l(state_lock_);
- return reader_->ReplaceLastSegment(segment);
+ return reader_->ReplaceLastSegment(std::move(segment));
}
std::string Log::LogPrefix() const { return ctx_.LogPrefix(); }
diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h
index e877bf1..66e43d8 100644
--- a/src/kudu/consensus/log.h
+++ b/src/kudu/consensus/log.h
@@ -457,10 +457,10 @@ class Log : public RefCountedThreadSafe<Log> {
// Replaces the last "empty" segment in 'log_reader_', i.e. the one currently
// being written to, with the same segment once properly closed.
- Status ReplaceSegmentInReader(const scoped_refptr<ReadableLogSegment>& segment);
+ Status ReplaceSegmentInReader(scoped_refptr<ReadableLogSegment> segment);
// Adds the given segment to 'log_reader_'.
- Status AddEmptySegmentInReader(const scoped_refptr<ReadableLogSegment>& segment);
+ Status AddEmptySegmentInReader(scoped_refptr<ReadableLogSegment> segment);
Status Sync();
diff --git a/src/kudu/consensus/log_reader.cc b/src/kudu/consensus/log_reader.cc
index 741f908..f3ae935 100644
--- a/src/kudu/consensus/log_reader.cc
+++ b/src/kudu/consensus/log_reader.cc
@@ -139,6 +139,7 @@ Status LogReader::Init(const string& tablet_wal_path) {
"Unable to read children from path");
SegmentSequence read_segments;
+ read_segments.reserve(log_files.size()); // Overestimate; will shrink_to_fit later.
// build a log segment from each file
for (const string &log_file : log_files) {
@@ -165,20 +166,20 @@ Status LogReader::Init(const string& tablet_wal_path) {
RETURN_NOT_OK(segment->RebuildFooterByScanning());
}
- read_segments.push_back(segment);
+ read_segments.emplace_back(std::move(segment));
}
}
+ read_segments.shrink_to_fit();
// Sort the segments by sequence number.
std::sort(read_segments.begin(), read_segments.end(), LogSegmentSeqnoComparator());
-
{
std::lock_guard<simple_spinlock> lock(lock_);
string previous_seg_path;
int64_t previous_seg_seqno = -1;
- for (const SegmentSequence::value_type& entry : read_segments) {
+ for (auto& entry : read_segments) {
VLOG(1) << " Log Reader Indexed: " << SecureShortDebugString(entry->footer());
// Check that the log segments are in sequence.
if (previous_seg_seqno != -1 && entry->header().sequence_number() != previous_seg_seqno + 1) {
@@ -189,7 +190,7 @@ Status LogReader::Init(const string& tablet_wal_path) {
}
previous_seg_seqno = entry->header().sequence_number();
previous_seg_path = entry->path();
- RETURN_NOT_OK(AppendSegmentUnlocked(entry));
+ RETURN_NOT_OK(AppendSegmentUnlocked(std::move(entry)));
}
state_ = kLogReaderReading;
@@ -392,7 +393,7 @@ void LogReader::UpdateLastSegmentOffset(int64_t readable_to_offset) {
segment->UpdateReadableToOffset(readable_to_offset);
}
-Status LogReader::ReplaceLastSegment(const scoped_refptr<ReadableLogSegment>& segment) {
+Status LogReader::ReplaceLastSegment(scoped_refptr<ReadableLogSegment> segment) {
// This is used to replace the last segment once we close it properly so it must
// have a footer.
DCHECK(segment->HasFooter());
@@ -402,21 +403,21 @@ Status LogReader::ReplaceLastSegment(const scoped_refptr<ReadableLogSegment>& se
// Make sure the segment we're replacing has the same sequence number
CHECK(!segments_.empty());
CHECK_EQ(segment->header().sequence_number(), segments_.back()->header().sequence_number());
- segments_[segments_.size() - 1] = segment;
+ segments_[segments_.size() - 1] = std::move(segment);
return Status::OK();
}
-Status LogReader::AppendSegment(const scoped_refptr<ReadableLogSegment>& segment) {
+Status LogReader::AppendSegment(scoped_refptr<ReadableLogSegment> segment) {
DCHECK(segment->IsInitialized());
if (PREDICT_FALSE(!segment->HasFooter())) {
RETURN_NOT_OK(segment->RebuildFooterByScanning());
}
std::lock_guard<simple_spinlock> lock(lock_);
- return AppendSegmentUnlocked(segment);
+ return AppendSegmentUnlocked(std::move(segment));
}
-Status LogReader::AppendSegmentUnlocked(const scoped_refptr<ReadableLogSegment>& segment) {
+Status LogReader::AppendSegmentUnlocked(scoped_refptr<ReadableLogSegment> segment) {
DCHECK(segment->IsInitialized());
DCHECK(segment->HasFooter());
@@ -424,11 +425,11 @@ Status LogReader::AppendSegmentUnlocked(const scoped_refptr<ReadableLogSegment>&
CHECK_EQ(segments_.back()->header().sequence_number() + 1,
segment->header().sequence_number());
}
- segments_.push_back(segment);
+ segments_.emplace_back(std::move(segment));
return Status::OK();
}
-Status LogReader::AppendEmptySegment(const scoped_refptr<ReadableLogSegment>& segment) {
+Status LogReader::AppendEmptySegment(scoped_refptr<ReadableLogSegment> segment) {
DCHECK(segment->IsInitialized());
std::lock_guard<simple_spinlock> lock(lock_);
CHECK_EQ(state_, kLogReaderReading);
@@ -436,7 +437,7 @@ Status LogReader::AppendEmptySegment(const scoped_refptr<ReadableLogSegment>& se
CHECK_EQ(segments_.back()->header().sequence_number() + 1,
segment->header().sequence_number());
}
- segments_.push_back(segment);
+ segments_.emplace_back(std::move(segment));
return Status::OK();
}
diff --git a/src/kudu/consensus/log_reader.h b/src/kudu/consensus/log_reader.h
index 7d6ae2f..cbf4c2d 100644
--- a/src/kudu/consensus/log_reader.h
+++ b/src/kudu/consensus/log_reader.h
@@ -134,11 +134,11 @@ class LogReader : public enable_make_shared<LogReader> {
// Index entries in 'segment's footer will be added to the index.
// If the segment has no footer it will be scanned so this should not be used
// for new segments.
- Status AppendSegment(const scoped_refptr<ReadableLogSegment>& segment);
+ Status AppendSegment(scoped_refptr<ReadableLogSegment> segment);
// Same as above but for segments without any entries.
// Used by the Log to add "empty" segments.
- Status AppendEmptySegment(const scoped_refptr<ReadableLogSegment>& segment);
+ Status AppendEmptySegment(scoped_refptr<ReadableLogSegment> segment);
// Removes segments with sequence numbers less than or equal to
// 'segment_sequence_number' from this reader.
@@ -150,14 +150,14 @@ class LogReader : public enable_make_shared<LogReader> {
// Requires that the last segment in 'segments_' has the same sequence
// number as 'segment'.
// Expects 'segment' to be properly closed and to have footer.
- Status ReplaceLastSegment(const scoped_refptr<ReadableLogSegment>& segment);
+ Status ReplaceLastSegment(scoped_refptr<ReadableLogSegment> segment);
// Appends 'segment' to the segment sequence.
// Assumes that the segment was scanned, if no footer was found.
// To be used only internally, clients of this class with private access (i.e. friends)
// should use the thread safe version, AppendSegment(), which will also scan the segment
// if no footer is present.
- Status AppendSegmentUnlocked(const scoped_refptr<ReadableLogSegment>& segment);
+ Status AppendSegmentUnlocked(scoped_refptr<ReadableLogSegment> segment);
// Used by Log to update its LogReader on how far it is possible to read
// the current segment. Requires that the reader has at least one segment
diff --git a/src/kudu/consensus/log_util.cc b/src/kudu/consensus/log_util.cc
index 43477db..e2d4949 100644
--- a/src/kudu/consensus/log_util.cc
+++ b/src/kudu/consensus/log_util.cc
@@ -774,7 +774,7 @@ Status WritableLogSegment::WriteHeaderAndOpen(const LogSegmentHeaderPB& new_head
PutFixed32(&buf, new_header.ByteSize());
// Then Serialize the PB.
pb_util::AppendToString(new_header, &buf);
- RETURN_NOT_OK(writable_file()->Append(Slice(buf)));
+ RETURN_NOT_OK(writable_file_->Append(Slice(buf)));
header_.CopyFrom(new_header);
first_entry_offset_ = buf.size();
@@ -796,7 +796,7 @@ Status WritableLogSegment::WriteFooterAndClose(const LogSegmentFooterPB& footer)
buf.append(kLogSegmentFooterMagicString);
PutFixed32(&buf, footer.ByteSize());
- RETURN_NOT_OK_PREPEND(writable_file()->Append(Slice(buf)), "Could not write the footer");
+ RETURN_NOT_OK_PREPEND(writable_file_->Append(Slice(buf)), "Could not write the footer");
footer_.CopyFrom(footer);
is_footer_written_ = true;
diff --git a/src/kudu/consensus/log_util.h b/src/kudu/consensus/log_util.h
index 8edec1d..426387e 100644
--- a/src/kudu/consensus/log_util.h
+++ b/src/kudu/consensus/log_util.h
@@ -475,10 +475,6 @@ class WritableLogSegment {
private:
FRIEND_TEST(LogTest, TestAutoStopIdleAppendThread);
- const std::shared_ptr<WritableFile>& writable_file() const {
- return writable_file_;
- }
-
// The path to the log file.
const std::string path_;
diff --git a/src/kudu/integration-tests/timestamp_advancement-itest.cc b/src/kudu/integration-tests/timestamp_advancement-itest.cc
index 7cede18..3e6b77d 100644
--- a/src/kudu/integration-tests/timestamp_advancement-itest.cc
+++ b/src/kudu/integration-tests/timestamp_advancement-itest.cc
@@ -41,7 +41,6 @@
#include "kudu/consensus/log_util.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/raft_consensus.h"
-#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
@@ -180,10 +179,10 @@ class TimestampAdvancementITest : public MiniClusterITestBase {
Status CheckForWriteReplicatesInLog(MiniTabletServer* ts, const string& tablet_id,
bool* has_write_replicates) const {
shared_ptr<LogReader> reader;
- RETURN_NOT_OK(LogReader::Open(env_,
- ts->server()->fs_manager()->GetTabletWalDir(tablet_id),
- scoped_refptr<log::LogIndex>(), tablet_id,
- scoped_refptr<MetricEntity>(), &reader));
+ RETURN_NOT_OK(LogReader::Open(
+ ts->server()->fs_manager(),
+ scoped_refptr<log::LogIndex>(), tablet_id,
+ scoped_refptr<MetricEntity>(), &reader));
log::SegmentSequence segs;
RETURN_NOT_OK(reader->GetSegmentsSnapshot(&segs));
unique_ptr<log::LogEntryPB> entry;