You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2017/01/31 20:44:04 UTC

[2/3] kudu git commit: wal: Include standard prefix on glog lines

wal: Include standard prefix on glog lines

Change-Id: I724e85001beb68adbb806212ea3cb63292d0de56
Reviewed-on: http://gerrit.cloudera.org:8080/5827
Tested-by: Mike Percy <mp...@apache.org>
Reviewed-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/19e88d57
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/19e88d57
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/19e88d57

Branch: refs/heads/master
Commit: 19e88d57d5c686a88fbc063f96e2965bc74fe703
Parents: 99fcc31
Author: Mike Percy <mp...@apache.org>
Authored: Sun Jan 29 00:33:06 2017 -0800
Committer: Mike Percy <mp...@apache.org>
Committed: Tue Jan 31 20:43:28 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/log.cc | 79 ++++++++++++++++++++++++------------------
 src/kudu/consensus/log.h  |  2 ++
 2 files changed, 47 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/19e88d57/src/kudu/consensus/log.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 2eb1c7d..716baa0 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -164,6 +164,8 @@ class Log::AppendThread {
  private:
   void RunThread();
 
+  string LogPrefix() const;
+
   Log* const log_;
 
   // Lock to protect access to thread_ during shutdown.
@@ -178,7 +180,7 @@ Log::AppendThread::AppendThread(Log *log)
 
 Status Log::AppendThread::Init() {
   DCHECK(!thread_) << "Already initialized";
-  VLOG(1) << "Starting log append thread for tablet " << log_->tablet_id();
+  VLOG_WITH_PREFIX(1) << "Starting log append thread";
   RETURN_NOT_OK(kudu::Thread::Create("log", "appender",
       &AppendThread::RunThread, this, &thread_));
   return Status::OK();
@@ -212,9 +214,9 @@ void Log::AppendThread::RunThread() {
       TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch);
       Status s = log_->DoAppend(entry_batch);
       if (PREDICT_FALSE(!s.ok())) {
-        LOG(ERROR) << "Error appending to the log: " << s.ToString();
+        LOG_WITH_PREFIX(ERROR) << "Error appending to the log: " << s.ToString();
         entry_batch->set_failed_to_append();
-        // TODO If a single transaction fails to append, should we
+        // TODO(af): If a single transaction fails to append, should we
         // abort all subsequent transactions in this batch or allow
         // them to be appended? What about transactions in future
         // batches?
@@ -232,7 +234,7 @@ void Log::AppendThread::RunThread() {
       s = log_->Sync();
     }
     if (PREDICT_FALSE(!s.ok())) {
-      LOG(ERROR) << "Error syncing log" << s.ToString();
+      LOG_WITH_PREFIX(ERROR) << "Error syncing log: " << s.ToString();
       for (LogEntryBatch* entry_batch : entry_batches) {
         if (!entry_batch->callback().is_null()) {
           entry_batch->callback().Run(s);
@@ -240,7 +242,7 @@ void Log::AppendThread::RunThread() {
       }
     } else {
       TRACE_EVENT0("log", "Callbacks");
-      VLOG(2) << "Synchronized " << entry_batches.size() << " entry batches";
+      VLOG_WITH_PREFIX(2) << "Synchronized " << entry_batches.size() << " entry batches";
       SCOPED_WATCH_STACK(100);
       for (LogEntryBatch* entry_batch : entry_batches) {
         if (PREDICT_TRUE(!entry_batch->failed_to_append()
@@ -255,20 +257,24 @@ void Log::AppendThread::RunThread() {
       entry_batches.clear();
     }
   }
-  VLOG(1) << "Exiting AppendThread for tablet " << log_->tablet_id();
+  VLOG_WITH_PREFIX(1) << "Exiting AppendThread";
 }
 
 void Log::AppendThread::Shutdown() {
   log_->entry_queue()->Shutdown();
   std::lock_guard<std::mutex> lock_guard(lock_);
   if (thread_) {
-    VLOG(1) << "Shutting down log append thread for tablet " << log_->tablet_id();
+    VLOG_WITH_PREFIX(1) << "Shutting down log append thread";
     CHECK_OK(ThreadJoiner(thread_.get()).Join());
-    VLOG(1) << "Log append thread for tablet " << log_->tablet_id() << " is shut down";
+    VLOG_WITH_PREFIX(1) << "Log append thread is shut down";
     thread_.reset();
   }
 }
 
+string Log::AppendThread::LogPrefix() const {
+  return log_->LogPrefix();
+}
+
 // This task is submitted to allocation_pool_ in order to
 // asynchronously pre-allocate new log segments.
 void Log::SegmentAllocationTask() {
@@ -355,8 +361,8 @@ Status Log::Init() {
   // We must pick up where the previous WAL left off in terms of
   // sequence numbers.
   if (reader_->num_segments() != 0) {
-    VLOG(1) << "Using existing " << reader_->num_segments()
-            << " segments from path: " << fs_manager_->GetWalsRootDir();
+    VLOG_WITH_PREFIX(1) << "Using existing " << reader_->num_segments()
+                        << " segments from path: " << fs_manager_->GetWalsRootDir();
 
     vector<scoped_refptr<ReadableLogSegment> > segments;
     RETURN_NOT_OK(reader_->GetSegmentsSnapshot(&segments));
@@ -364,9 +370,10 @@ Status Log::Init() {
   }
 
   if (force_sync_all_) {
-    KLOG_FIRST_N(INFO, 1) << "Log is configured to fsync() on all Append() calls";
+    KLOG_FIRST_N(INFO, 1) << LogPrefix() << "Log is configured to fsync() on all Append() calls";
   } else {
-    KLOG_FIRST_N(INFO, 1) << "Log is configured to *not* fsync() on all Append() calls";
+    KLOG_FIRST_N(INFO, 1) << LogPrefix()
+                          << "Log is configured to *not* fsync() on all Append() calls";
   }
 
   // We always create a new segment when the log starts.
@@ -391,11 +398,11 @@ Status Log::AsyncAllocateSegment() {
 
 Status Log::CloseCurrentSegment() {
   if (!footer_builder_.has_min_replicate_index()) {
-    VLOG(1) << "Writing a segment without any REPLICATE message. "
-        "Segment: " << active_segment_->path();
+    VLOG_WITH_PREFIX(1) << "Writing a segment without any REPLICATE message. Segment: "
+                        << active_segment_->path();
   }
-  VLOG(2) << "Segment footer for " << active_segment_->path()
-          << ": " << SecureShortDebugString(footer_builder_);
+  VLOG_WITH_PREFIX(2) << "Segment footer for " << active_segment_->path()
+                      << ": " << SecureShortDebugString(footer_builder_);
 
   footer_builder_.set_close_timestamp_micros(GetCurrentTimeMicros());
   RETURN_NOT_OK(active_segment_->WriteFooterAndClose(footer_builder_));
@@ -416,7 +423,7 @@ Status Log::RollOver() {
 
   RETURN_NOT_OK(SwitchToAllocatedSegment());
 
-  LOG(INFO) << "Rolled over to a new segment: " << active_segment_->path();
+  LOG_WITH_PREFIX(INFO) << "Rolled over to a new log segment at " << active_segment_->path();
   return Status::OK();
 }
 
@@ -531,25 +538,25 @@ Status Log::DoAppend(LogEntryBatch* entry_batch) {
   // if the size of this entry overflows the current segment, get a new one
   if (allocation_state() == kAllocationNotStarted) {
     if ((active_segment_->Size() + entry_batch_bytes + 4) > max_segment_size_) {
-      LOG(INFO) << "Max segment size reached. Starting new segment allocation. ";
+      LOG_WITH_PREFIX(INFO) << "Max segment size reached. Starting new segment allocation";
       RETURN_NOT_OK(AsyncAllocateSegment());
       if (!options_.async_preallocate_segments) {
-        LOG_SLOW_EXECUTION(WARNING, 50, "Log roll took a long time") {
+        LOG_SLOW_EXECUTION(WARNING, 50, Substitute("$0Log roll took a long time", LogPrefix())) {
           RETURN_NOT_OK(RollOver());
         }
       }
     }
   } else if (allocation_state() == kAllocationFinished) {
-    LOG_SLOW_EXECUTION(WARNING, 50, "Log roll took a long time") {
+    LOG_SLOW_EXECUTION(WARNING, 50, Substitute("$0Log roll took a long time", LogPrefix())) {
       RETURN_NOT_OK(RollOver());
     }
   } else {
-    VLOG(1) << "Segment allocation already in progress...";
+    VLOG_WITH_PREFIX(1) << "Segment allocation already in progress...";
   }
 
   int64_t start_offset = active_segment_->written_offset();
 
-  LOG_SLOW_EXECUTION(WARNING, 50, "Append to log took a long time") {
+  LOG_SLOW_EXECUTION(WARNING, 50, Substitute("$0Append to log took a long time", LogPrefix())) {
     SCOPED_LATENCY_METRIC(metrics_, append_latency);
     SCOPED_WATCH_STACK(500);
 
@@ -624,14 +631,13 @@ Status Log::Sync() {
     int sleep_ms = r.Normal(FLAGS_log_inject_latency_ms_mean,
                             FLAGS_log_inject_latency_ms_stddev);
     if (sleep_ms > 0) {
-      LOG(INFO) << "T " << tablet_id_ << ": Injecting "
-                << sleep_ms << "ms of latency in Log::Sync()";
+      LOG_WITH_PREFIX(WARNING) << "Injecting " << sleep_ms << "ms of latency in Log::Sync()";
       SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
     }
   }
 
   if (force_sync_all_ && !sync_disabled_) {
-    LOG_SLOW_EXECUTION(WARNING, 50, "Fsync log took a long time") {
+    LOG_SLOW_EXECUTION(WARNING, 50, Substitute("$0Fsync log took a long time", LogPrefix())) {
       RETURN_NOT_OK(active_segment_->Sync());
 
       if (log_hooks_) {
@@ -722,10 +728,10 @@ void Log::GetLatestEntryOpId(consensus::OpId* op_id) const {
 Status Log::GC(RetentionIndexes retention_indexes, int32_t* num_gced) {
   CHECK_GE(retention_indexes.for_durability, 0);
 
-  VLOG(1) << "Running Log GC on " << log_dir_ << ": retaining "
+  VLOG_WITH_PREFIX(1) << "Running Log GC on " << log_dir_ << ": retaining "
       "ops >= " << retention_indexes.for_durability << " for durability, "
       "ops >= " << retention_indexes.for_peers << " for peers";
-  VLOG_TIMING(1, "Log GC") {
+  VLOG_TIMING(1, Substitute("$0Log GC", LogPrefix())) {
     SegmentSequence segments_to_delete;
 
     {
@@ -735,7 +741,7 @@ Status Log::GC(RetentionIndexes retention_indexes, int32_t* num_gced) {
       RETURN_NOT_OK(GetSegmentsToGCUnlocked(retention_indexes, &segments_to_delete));
 
       if (segments_to_delete.empty()) {
-        VLOG(1) << "No segments to delete.";
+        VLOG_WITH_PREFIX(1) << "No segments to delete.";
         *num_gced = 0;
         return Status::OK();
       }
@@ -755,7 +761,7 @@ Status Log::GC(RetentionIndexes retention_indexes, int32_t* num_gced) {
                              segment->footer().min_replicate_index(),
                              segment->footer().max_replicate_index());
       }
-      LOG(INFO) << "Deleting log segment in path: " << segment->path() << ops_str;
+      LOG_WITH_PREFIX(INFO) << "Deleting log segment in path: " << segment->path() << ops_str;
       RETURN_NOT_OK(fs_manager_->env()->DeleteFile(segment->path()));
       (*num_gced)++;
     }
@@ -829,7 +835,7 @@ Status Log::Close() {
       RETURN_NOT_OK(CloseCurrentSegment());
       RETURN_NOT_OK(ReplaceSegmentInReaderUnlocked());
       log_state_ = kLogClosed;
-      VLOG(1) << "Log closed";
+      VLOG_WITH_PREFIX(1) << "Log closed";
 
       // Release FDs held by these objects.
       log_index_.reset();
@@ -842,7 +848,7 @@ Status Log::Close() {
       return Status::OK();
 
     case kLogClosed:
-      VLOG(1) << "Log already closed";
+      VLOG_WITH_PREFIX(1) << "Log already closed";
       return Status::OK();
 
     default:
@@ -861,7 +867,8 @@ Status Log::DeleteOnDiskData(FsManager* fs_manager, const string& tablet_id) {
   if (!env->FileExists(wal_dir)) {
     return Status::OK();
   }
-  LOG(INFO) << "T " << tablet_id << " Deleting WAL directory";
+  LOG(INFO) << Substitute("T $0 P $1: Deleting WAL directory at $2",
+                          tablet_id, fs_manager->uuid(), wal_dir);
   RETURN_NOT_OK_PREPEND(env->DeleteRecursively(wal_dir),
                         "Unable to recursively delete WAL dir for tablet " + tablet_id);
   return Status::OK();
@@ -989,17 +996,21 @@ Status Log::CreatePlaceholderSegment(const WritableFileOptions& opts,
                                      shared_ptr<WritableFile>* out) {
   string tmp_suffix = strings::Substitute("$0$1", kTmpInfix, ".newsegmentXXXXXX");
   string path_tmpl = JoinPathSegments(log_dir_, tmp_suffix);
-  VLOG(2) << "Creating temp. file for place holder segment, template: " << path_tmpl;
+  VLOG_WITH_PREFIX(2) << "Creating temp. file for place holder segment, template: " << path_tmpl;
   unique_ptr<WritableFile> segment_file;
   RETURN_NOT_OK(fs_manager_->env()->NewTempWritableFile(opts,
                                                         path_tmpl,
                                                         result_path,
                                                         &segment_file));
-  VLOG(1) << "Created next WAL segment, placeholder path: " << *result_path;
+  VLOG_WITH_PREFIX(1) << "Created next WAL segment, placeholder path: " << *result_path;
   out->reset(segment_file.release());
   return Status::OK();
 }
 
+std::string Log::LogPrefix() const {
+  return Substitute("T $0 P $1: ", tablet_id_, fs_manager_->uuid());
+}
+
 Log::~Log() {
   WARN_NOT_OK(Close(), "Error closing log");
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/19e88d57/src/kudu/consensus/log.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h
index 8a5767f..5caa347 100644
--- a/src/kudu/consensus/log.h
+++ b/src/kudu/consensus/log.h
@@ -333,6 +333,8 @@ class Log : public RefCountedThreadSafe<Log> {
     return allocation_state_;
   }
 
+  std::string LogPrefix() const;
+
   LogOptions options_;
   FsManager *fs_manager_;
   std::string log_dir_;