You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2017/01/31 20:44:04 UTC
[2/3] kudu git commit: wal: Include standard prefix on glog lines
wal: Include standard prefix on glog lines
Change-Id: I724e85001beb68adbb806212ea3cb63292d0de56
Reviewed-on: http://gerrit.cloudera.org:8080/5827
Tested-by: Mike Percy <mp...@apache.org>
Reviewed-by: Mike Percy <mp...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/19e88d57
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/19e88d57
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/19e88d57
Branch: refs/heads/master
Commit: 19e88d57d5c686a88fbc063f96e2965bc74fe703
Parents: 99fcc31
Author: Mike Percy <mp...@apache.org>
Authored: Sun Jan 29 00:33:06 2017 -0800
Committer: Mike Percy <mp...@apache.org>
Committed: Tue Jan 31 20:43:28 2017 +0000
----------------------------------------------------------------------
src/kudu/consensus/log.cc | 79 ++++++++++++++++++++++++------------------
src/kudu/consensus/log.h | 2 ++
2 files changed, 47 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/19e88d57/src/kudu/consensus/log.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 2eb1c7d..716baa0 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -164,6 +164,8 @@ class Log::AppendThread {
private:
void RunThread();
+ string LogPrefix() const;
+
Log* const log_;
// Lock to protect access to thread_ during shutdown.
@@ -178,7 +180,7 @@ Log::AppendThread::AppendThread(Log *log)
Status Log::AppendThread::Init() {
DCHECK(!thread_) << "Already initialized";
- VLOG(1) << "Starting log append thread for tablet " << log_->tablet_id();
+ VLOG_WITH_PREFIX(1) << "Starting log append thread";
RETURN_NOT_OK(kudu::Thread::Create("log", "appender",
&AppendThread::RunThread, this, &thread_));
return Status::OK();
@@ -212,9 +214,9 @@ void Log::AppendThread::RunThread() {
TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch);
Status s = log_->DoAppend(entry_batch);
if (PREDICT_FALSE(!s.ok())) {
- LOG(ERROR) << "Error appending to the log: " << s.ToString();
+ LOG_WITH_PREFIX(ERROR) << "Error appending to the log: " << s.ToString();
entry_batch->set_failed_to_append();
- // TODO If a single transaction fails to append, should we
+ // TODO(af): If a single transaction fails to append, should we
// abort all subsequent transactions in this batch or allow
// them to be appended? What about transactions in future
// batches?
@@ -232,7 +234,7 @@ void Log::AppendThread::RunThread() {
s = log_->Sync();
}
if (PREDICT_FALSE(!s.ok())) {
- LOG(ERROR) << "Error syncing log" << s.ToString();
+ LOG_WITH_PREFIX(ERROR) << "Error syncing log: " << s.ToString();
for (LogEntryBatch* entry_batch : entry_batches) {
if (!entry_batch->callback().is_null()) {
entry_batch->callback().Run(s);
@@ -240,7 +242,7 @@ void Log::AppendThread::RunThread() {
}
} else {
TRACE_EVENT0("log", "Callbacks");
- VLOG(2) << "Synchronized " << entry_batches.size() << " entry batches";
+ VLOG_WITH_PREFIX(2) << "Synchronized " << entry_batches.size() << " entry batches";
SCOPED_WATCH_STACK(100);
for (LogEntryBatch* entry_batch : entry_batches) {
if (PREDICT_TRUE(!entry_batch->failed_to_append()
@@ -255,20 +257,24 @@ void Log::AppendThread::RunThread() {
entry_batches.clear();
}
}
- VLOG(1) << "Exiting AppendThread for tablet " << log_->tablet_id();
+ VLOG_WITH_PREFIX(1) << "Exiting AppendThread";
}
void Log::AppendThread::Shutdown() {
log_->entry_queue()->Shutdown();
std::lock_guard<std::mutex> lock_guard(lock_);
if (thread_) {
- VLOG(1) << "Shutting down log append thread for tablet " << log_->tablet_id();
+ VLOG_WITH_PREFIX(1) << "Shutting down log append thread";
CHECK_OK(ThreadJoiner(thread_.get()).Join());
- VLOG(1) << "Log append thread for tablet " << log_->tablet_id() << " is shut down";
+ VLOG_WITH_PREFIX(1) << "Log append thread is shut down";
thread_.reset();
}
}
+string Log::AppendThread::LogPrefix() const {
+ return log_->LogPrefix();
+}
+
// This task is submitted to allocation_pool_ in order to
// asynchronously pre-allocate new log segments.
void Log::SegmentAllocationTask() {
@@ -355,8 +361,8 @@ Status Log::Init() {
// We must pick up where the previous WAL left off in terms of
// sequence numbers.
if (reader_->num_segments() != 0) {
- VLOG(1) << "Using existing " << reader_->num_segments()
- << " segments from path: " << fs_manager_->GetWalsRootDir();
+ VLOG_WITH_PREFIX(1) << "Using existing " << reader_->num_segments()
+ << " segments from path: " << fs_manager_->GetWalsRootDir();
vector<scoped_refptr<ReadableLogSegment> > segments;
RETURN_NOT_OK(reader_->GetSegmentsSnapshot(&segments));
@@ -364,9 +370,10 @@ Status Log::Init() {
}
if (force_sync_all_) {
- KLOG_FIRST_N(INFO, 1) << "Log is configured to fsync() on all Append() calls";
+ KLOG_FIRST_N(INFO, 1) << LogPrefix() << "Log is configured to fsync() on all Append() calls";
} else {
- KLOG_FIRST_N(INFO, 1) << "Log is configured to *not* fsync() on all Append() calls";
+ KLOG_FIRST_N(INFO, 1) << LogPrefix()
+ << "Log is configured to *not* fsync() on all Append() calls";
}
// We always create a new segment when the log starts.
@@ -391,11 +398,11 @@ Status Log::AsyncAllocateSegment() {
Status Log::CloseCurrentSegment() {
if (!footer_builder_.has_min_replicate_index()) {
- VLOG(1) << "Writing a segment without any REPLICATE message. "
- "Segment: " << active_segment_->path();
+ VLOG_WITH_PREFIX(1) << "Writing a segment without any REPLICATE message. Segment: "
+ << active_segment_->path();
}
- VLOG(2) << "Segment footer for " << active_segment_->path()
- << ": " << SecureShortDebugString(footer_builder_);
+ VLOG_WITH_PREFIX(2) << "Segment footer for " << active_segment_->path()
+ << ": " << SecureShortDebugString(footer_builder_);
footer_builder_.set_close_timestamp_micros(GetCurrentTimeMicros());
RETURN_NOT_OK(active_segment_->WriteFooterAndClose(footer_builder_));
@@ -416,7 +423,7 @@ Status Log::RollOver() {
RETURN_NOT_OK(SwitchToAllocatedSegment());
- LOG(INFO) << "Rolled over to a new segment: " << active_segment_->path();
+ LOG_WITH_PREFIX(INFO) << "Rolled over to a new log segment at " << active_segment_->path();
return Status::OK();
}
@@ -531,25 +538,25 @@ Status Log::DoAppend(LogEntryBatch* entry_batch) {
// if the size of this entry overflows the current segment, get a new one
if (allocation_state() == kAllocationNotStarted) {
if ((active_segment_->Size() + entry_batch_bytes + 4) > max_segment_size_) {
- LOG(INFO) << "Max segment size reached. Starting new segment allocation. ";
+ LOG_WITH_PREFIX(INFO) << "Max segment size reached. Starting new segment allocation";
RETURN_NOT_OK(AsyncAllocateSegment());
if (!options_.async_preallocate_segments) {
- LOG_SLOW_EXECUTION(WARNING, 50, "Log roll took a long time") {
+ LOG_SLOW_EXECUTION(WARNING, 50, Substitute("$0Log roll took a long time", LogPrefix())) {
RETURN_NOT_OK(RollOver());
}
}
}
} else if (allocation_state() == kAllocationFinished) {
- LOG_SLOW_EXECUTION(WARNING, 50, "Log roll took a long time") {
+ LOG_SLOW_EXECUTION(WARNING, 50, Substitute("$0Log roll took a long time", LogPrefix())) {
RETURN_NOT_OK(RollOver());
}
} else {
- VLOG(1) << "Segment allocation already in progress...";
+ VLOG_WITH_PREFIX(1) << "Segment allocation already in progress...";
}
int64_t start_offset = active_segment_->written_offset();
- LOG_SLOW_EXECUTION(WARNING, 50, "Append to log took a long time") {
+ LOG_SLOW_EXECUTION(WARNING, 50, Substitute("$0Append to log took a long time", LogPrefix())) {
SCOPED_LATENCY_METRIC(metrics_, append_latency);
SCOPED_WATCH_STACK(500);
@@ -624,14 +631,13 @@ Status Log::Sync() {
int sleep_ms = r.Normal(FLAGS_log_inject_latency_ms_mean,
FLAGS_log_inject_latency_ms_stddev);
if (sleep_ms > 0) {
- LOG(INFO) << "T " << tablet_id_ << ": Injecting "
- << sleep_ms << "ms of latency in Log::Sync()";
+ LOG_WITH_PREFIX(WARNING) << "Injecting " << sleep_ms << "ms of latency in Log::Sync()";
SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
}
}
if (force_sync_all_ && !sync_disabled_) {
- LOG_SLOW_EXECUTION(WARNING, 50, "Fsync log took a long time") {
+ LOG_SLOW_EXECUTION(WARNING, 50, Substitute("$0Fsync log took a long time", LogPrefix())) {
RETURN_NOT_OK(active_segment_->Sync());
if (log_hooks_) {
@@ -722,10 +728,10 @@ void Log::GetLatestEntryOpId(consensus::OpId* op_id) const {
Status Log::GC(RetentionIndexes retention_indexes, int32_t* num_gced) {
CHECK_GE(retention_indexes.for_durability, 0);
- VLOG(1) << "Running Log GC on " << log_dir_ << ": retaining "
+ VLOG_WITH_PREFIX(1) << "Running Log GC on " << log_dir_ << ": retaining "
"ops >= " << retention_indexes.for_durability << " for durability, "
"ops >= " << retention_indexes.for_peers << " for peers";
- VLOG_TIMING(1, "Log GC") {
+ VLOG_TIMING(1, Substitute("$0Log GC", LogPrefix())) {
SegmentSequence segments_to_delete;
{
@@ -735,7 +741,7 @@ Status Log::GC(RetentionIndexes retention_indexes, int32_t* num_gced) {
RETURN_NOT_OK(GetSegmentsToGCUnlocked(retention_indexes, &segments_to_delete));
if (segments_to_delete.empty()) {
- VLOG(1) << "No segments to delete.";
+ VLOG_WITH_PREFIX(1) << "No segments to delete.";
*num_gced = 0;
return Status::OK();
}
@@ -755,7 +761,7 @@ Status Log::GC(RetentionIndexes retention_indexes, int32_t* num_gced) {
segment->footer().min_replicate_index(),
segment->footer().max_replicate_index());
}
- LOG(INFO) << "Deleting log segment in path: " << segment->path() << ops_str;
+ LOG_WITH_PREFIX(INFO) << "Deleting log segment in path: " << segment->path() << ops_str;
RETURN_NOT_OK(fs_manager_->env()->DeleteFile(segment->path()));
(*num_gced)++;
}
@@ -829,7 +835,7 @@ Status Log::Close() {
RETURN_NOT_OK(CloseCurrentSegment());
RETURN_NOT_OK(ReplaceSegmentInReaderUnlocked());
log_state_ = kLogClosed;
- VLOG(1) << "Log closed";
+ VLOG_WITH_PREFIX(1) << "Log closed";
// Release FDs held by these objects.
log_index_.reset();
@@ -842,7 +848,7 @@ Status Log::Close() {
return Status::OK();
case kLogClosed:
- VLOG(1) << "Log already closed";
+ VLOG_WITH_PREFIX(1) << "Log already closed";
return Status::OK();
default:
@@ -861,7 +867,8 @@ Status Log::DeleteOnDiskData(FsManager* fs_manager, const string& tablet_id) {
if (!env->FileExists(wal_dir)) {
return Status::OK();
}
- LOG(INFO) << "T " << tablet_id << " Deleting WAL directory";
+ LOG(INFO) << Substitute("T $0 P $1: Deleting WAL directory at $2",
+ tablet_id, fs_manager->uuid(), wal_dir);
RETURN_NOT_OK_PREPEND(env->DeleteRecursively(wal_dir),
"Unable to recursively delete WAL dir for tablet " + tablet_id);
return Status::OK();
@@ -989,17 +996,21 @@ Status Log::CreatePlaceholderSegment(const WritableFileOptions& opts,
shared_ptr<WritableFile>* out) {
string tmp_suffix = strings::Substitute("$0$1", kTmpInfix, ".newsegmentXXXXXX");
string path_tmpl = JoinPathSegments(log_dir_, tmp_suffix);
- VLOG(2) << "Creating temp. file for place holder segment, template: " << path_tmpl;
+ VLOG_WITH_PREFIX(2) << "Creating temp. file for place holder segment, template: " << path_tmpl;
unique_ptr<WritableFile> segment_file;
RETURN_NOT_OK(fs_manager_->env()->NewTempWritableFile(opts,
path_tmpl,
result_path,
&segment_file));
- VLOG(1) << "Created next WAL segment, placeholder path: " << *result_path;
+ VLOG_WITH_PREFIX(1) << "Created next WAL segment, placeholder path: " << *result_path;
out->reset(segment_file.release());
return Status::OK();
}
+std::string Log::LogPrefix() const {
+ return Substitute("T $0 P $1: ", tablet_id_, fs_manager_->uuid());
+}
+
Log::~Log() {
WARN_NOT_OK(Close(), "Error closing log");
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/19e88d57/src/kudu/consensus/log.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h
index 8a5767f..5caa347 100644
--- a/src/kudu/consensus/log.h
+++ b/src/kudu/consensus/log.h
@@ -333,6 +333,8 @@ class Log : public RefCountedThreadSafe<Log> {
return allocation_state_;
}
+ std::string LogPrefix() const;
+
LogOptions options_;
FsManager *fs_manager_;
std::string log_dir_;