You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by dr...@apache.org on 2018/02/23 23:09:22 UTC

[1/3] kudu git commit: KUDU-2297 (part 3): refactor process-wide stack collection out of /stacks

Repository: kudu
Updated Branches:
  refs/heads/master 416b3018a -> 00815045f


KUDU-2297 (part 3): refactor process-wide stack collection out of /stacks

Previously a bunch of logic to collect all the stacks from the process
was in the /stacks path handler. This logic is relatively generic and
shouldn't be intermingled with the formatting code. In particular I'd
like to use it in the diagnostics log, where a different output format
is desirable.

Change-Id: Ibb7c6edd31254f3d7e0cbef1eaf575bde3570df6
Reviewed-on: http://gerrit.cloudera.org:8080/9329
Tested-by: Todd Lipcon <to...@apache.org>
Reviewed-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/831483b4
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/831483b4
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/831483b4

Branch: refs/heads/master
Commit: 831483b47fd0e2165c3612811fdbf6337e3891b8
Parents: 416b301
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Feb 14 14:43:32 2018 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Fri Feb 23 23:06:30 2018 +0000

----------------------------------------------------------------------
 src/kudu/server/default_path_handlers.cc | 101 +++++---------------------
 src/kudu/server/diagnostics_log.cc       |   4 +-
 src/kudu/util/debug-util-test.cc         |  65 +++++++++--------
 src/kudu/util/debug-util.cc              |  75 +++++++++++++++++++
 src/kudu/util/debug-util.h               |  59 +++++++++++++++
 5 files changed, 191 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/831483b4/src/kudu/server/default_path_handlers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/server/default_path_handlers.cc b/src/kudu/server/default_path_handlers.cc
index b857be5..7574def 100644
--- a/src/kudu/server/default_path_handlers.cc
+++ b/src/kudu/server/default_path_handlers.cc
@@ -18,17 +18,13 @@
 #include "kudu/server/default_path_handlers.h"
 
 #include <sys/stat.h>
-#include <sys/types.h>
 
 #include <cstddef>
 #include <cstdint>
 #include <fstream>
-#include <iterator>
-#include <map>
 #include <memory>
 #include <string>
 #include <unordered_map>
-#include <utility>
 #include <vector>
 
 #include <boost/algorithm/string/predicate.hpp>
@@ -48,13 +44,12 @@
 #include "kudu/gutil/strings/human_readable.h"
 #include "kudu/gutil/strings/numbers.h"
 #include "kudu/gutil/strings/split.h"
-#include "kudu/gutil/strings/strip.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/server/pprof_path_handlers.h"
 #include "kudu/server/webserver.h"
+#include "kudu/util/array_view.h"
 #include "kudu/util/debug-util.h"
 #include "kudu/util/easy_json.h"
-#include "kudu/util/env.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/flags.h"
@@ -157,90 +152,34 @@ static void FlagsHandler(const Webserver::WebRequest& req,
 // Prints out the current stack trace of all threads in the process.
 static void StacksHandler(const Webserver::WebRequest& /*req*/,
                           Webserver::PrerenderedWebResponse* resp) {
-  MonoTime start = MonoTime::Now();
   std::ostringstream* output = resp->output;
-  vector<pid_t> tids;
-  Status s = ListThreads(&tids);
+
+  StackTraceSnapshot snap;
+  auto start = MonoTime::Now();
+  Status s = snap.SnapshotAllStacks();
   if (!s.ok()) {
-    *output << "Failed to list threads: " << s.ToString();
+    *output << "Failed to collect stacks: " << s.ToString();
     return;
   }
-  struct Info {
-    pid_t tid;
-    Status status;
-    string thread_name;
-    StackTraceCollector stc;
-    StackTrace stack;
-  };
-
-  // Initially trigger all the stack traces.
-  vector<Info> infos(tids.size());
-  for (int i = 0; i < tids.size(); i++) {
-    infos[i].tid = tids[i];
-    infos[i].status = infos[i].stc.TriggerAsync(tids[i], &infos[i].stack);
-  }
-
-  // Now collect the thread names while we are waiting on stack trace collection.
-  for (auto& info : infos) {
-    // Get the thread's name by reading proc.
-    // TODO(todd): should we have the dumped thread fill in its own name using
-    // prctl to avoid having to open and read /proc? Or maybe we should use the
-    // Kudu ThreadMgr to get the thread names for the cases where we are using
-    // the kudu::Thread wrapper at least.
-    faststring buf;
-    Status s = ReadFileToString(Env::Default(),
-                                Substitute("/proc/self/task/$0/comm", info.tid),
-                                &buf);
-    if (!s.ok()) {
-      info.thread_name = "<unknown name>";
-    }  else {
-      info.thread_name = buf.ToString();
-      StripTrailingNewline(&info.thread_name);
-    }
-  }
-
-  // Now actually collect all the stacks.
-  MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(1);
-  for (auto& info : infos) {
-    info.status = info.status.AndThen([&] {
-        return info.stc.AwaitCollection(deadline);
-      });
-  }
+  auto dur = MonoTime::Now() - start;
 
-  // And group the threads by their stack trace.
-  std::multimap<string, Info*> grouped_infos;
-  int num_failed = 0;
-  for (auto& info : infos) {
-    if (info.status.ok()) {
-      grouped_infos.emplace(info.stack.ToHexString(), &info);
-    } else {
-      num_failed++;
-    }
-  }
-  MonoDelta dur = MonoTime::Now() - start;
-
-  *output << "Collected stacks from " << grouped_infos.size() << " threads in "
+  *output << "Collected stacks from " << snap.num_threads() << " threads in "
           << dur.ToString() << "\n";
-  if (num_failed) {
-    *output << "Failed to collect stacks from " << num_failed << " threads "
+  if (snap.num_failed()) {
+    *output << "Failed to collect stacks from " << snap.num_failed() << " threads "
             << "(they may have exited while we were iterating over the threads)\n";
   }
   *output << "\n";
-  for (auto it = grouped_infos.begin(); it != grouped_infos.end();) {
-    auto end_group = grouped_infos.equal_range(it->first).second;
-    const auto& stack = it->second->stack;
-    int num_in_group = std::distance(it, end_group);
-    if (num_in_group > 1) {
-      *output << num_in_group << " threads with same stack:\n";
-    }
-
-    while (it != end_group) {
-      const auto& info = it->second;
-      *output << "TID " << info->tid << "(" << info->thread_name << "):\n";
-      ++it;
-    }
-    *output << stack.Symbolize() << "\n\n";
-  }
+  snap.VisitGroups([&](ArrayView<StackTraceSnapshot::ThreadInfo> threads) {
+      if (threads.size() > 1) {
+        *output << threads.size() << " threads with same stack:\n";
+      }
+
+      for (auto& info : threads) {
+        *output << "TID " << info.tid << "(" << info.thread_name << "):\n";
+      }
+      *output << threads[0].stack.Symbolize() << "\n\n";
+    });
 }
 
 // Registered to handle "/memz", and prints out memory allocation statistics.

http://git-wip-us.apache.org/repos/asf/kudu/blob/831483b4/src/kudu/server/diagnostics_log.cc
----------------------------------------------------------------------
diff --git a/src/kudu/server/diagnostics_log.cc b/src/kudu/server/diagnostics_log.cc
index fc91a3a..0246347 100644
--- a/src/kudu/server/diagnostics_log.cc
+++ b/src/kudu/server/diagnostics_log.cc
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "kudu/server/diagnostics_log.h"
+
 #include <cstdint>
 #include <memory>
 #include <ostream>
@@ -26,7 +28,6 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/walltime.h"
-#include "kudu/server/diagnostics_log.h"
 #include "kudu/util/condition_variable.h"
 #include "kudu/util/env.h"
 #include "kudu/util/jsonwriter.h"
@@ -38,7 +39,6 @@
 #include "kudu/util/status.h"
 #include "kudu/util/thread.h"
 
-
 using std::string;
 using std::unique_ptr;
 using strings::Substitute;

http://git-wip-us.apache.org/repos/asf/kudu/blob/831483b4/src/kudu/util/debug-util-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/debug-util-test.cc b/src/kudu/util/debug-util-test.cc
index c51bf0c..a9b841f 100644
--- a/src/kudu/util/debug-util-test.cc
+++ b/src/kudu/util/debug-util-test.cc
@@ -23,7 +23,6 @@
 
 #include <csignal>
 #include <cstddef>
-#include <memory>
 #include <ostream>
 #include <string>
 #include <vector>
@@ -32,6 +31,7 @@
 #include <gtest/gtest.h>
 
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/util/array_view.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/debug-util.h"
 #include "kudu/util/monotime.h"
@@ -149,7 +149,17 @@ TEST_F(DebugUtilTest, TestSignalStackTrace) {
 // Test which dumps all known threads within this process.
 // We don't validate the results in any way -- but this verifies that we can
 // dump library threads such as the libc timer_thread and properly time out.
-TEST_F(DebugUtilTest, TestDumpAllThreads) {
+TEST_F(DebugUtilTest, TestSnapshot) {
+  // The test and runtime environment runs various utility threads (for example,
+  // the kernel stack watchdog, the TSAN runtime thread, the test timeout thread, etc).
+  // Count them before we start any additional threads for this test.
+  int initial_thread_count;
+  {
+    vector<pid_t> threads;
+    ASSERT_OK(ListThreads(&threads));
+    initial_thread_count = threads.size();
+  }
+
   // Start a bunch of sleeping threads.
   const int kNumThreads = 30;
   CountDownLatch l(1);
@@ -166,34 +176,29 @@ TEST_F(DebugUtilTest, TestDumpAllThreads) {
       }
     });
 
-  // Trigger all of the stack traces.
-  vector<pid_t> tids;
-  ASSERT_OK(ListThreads(&tids));
-  vector<StackTraceCollector> collectors(tids.size());
-  vector<StackTrace> traces(tids.size());
-  vector<Status> status(tids.size());
-
-  for (int i = 0; i < tids.size(); i++) {
-    status[i] = collectors[i].TriggerAsync(tids[i], &traces[i]);
-  }
-
-  // Collect them all.
-  MonoTime deadline;
-  #ifdef THREAD_SANITIZER
-  // TSAN runs its own separate background thread which blocks all signals and
-  // thus will cause a timeout here.
-  deadline = MonoTime::Now() + MonoDelta::FromSeconds(3);
-  #else
-  // In normal builds we can expect to get a response from all threads.
-  deadline = MonoTime::Max();
-  #endif
-  for (int i = 0; i < tids.size(); i++) {
-    status[i] = status[i].AndThen([&] {
-        return collectors[i].AwaitCollection(deadline);
-      });
-    LOG(INFO) << "Thread " << tids[i] << ": " << status[i].ToString()
-              << ": " << traces[i].ToHexString();
-  }
+  StackTraceSnapshot snap;
+  ASSERT_OK(snap.SnapshotAllStacks());
+  int count = 0;
+  int groups = 0;
+  snap.VisitGroups([&](ArrayView<StackTraceSnapshot::ThreadInfo> group) {
+      groups++;
+      for (auto& info : group) {
+        count++;
+        LOG(INFO) << info.tid << " " << info.thread_name;
+      }
+      LOG(INFO) << group[0].stack.ToHexString();
+    });
+  int tsan_threads = 0;
+#ifdef THREAD_SANITIZER
+  // TSAN starts an extra thread of its own.
+  tsan_threads++;
+#endif
+  ASSERT_EQ(kNumThreads + initial_thread_count, count);
+  // The threads might not have exactly identical stacks, but
+  // we should have far fewer groups than the total number
+  // of threads.
+  ASSERT_LE(groups, kNumThreads / 2);
+  ASSERT_EQ(tsan_threads, snap.num_failed());
 }
 
 TEST_F(DebugUtilTest, Benchmark) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/831483b4/src/kudu/util/debug-util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/debug-util.cc b/src/kudu/util/debug-util.cc
index 128cbdf..e2654f4 100644
--- a/src/kudu/util/debug-util.cc
+++ b/src/kudu/util/debug-util.cc
@@ -28,11 +28,13 @@
 #endif
 #include <unistd.h>
 
+#include <algorithm>
 #include <atomic>
 #include <cerrno>
 #include <climits>
 #include <csignal>
 #include <ctime>
+#include <iterator>
 #include <memory>
 #include <ostream>
 #include <string>
@@ -53,13 +55,17 @@
 #include "kudu/gutil/spinlock.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/strip.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/array_view.h"
 #include "kudu/util/debug/leak_annotations.h"
 #ifndef __linux__
 #include "kudu/util/debug/sanitizer_scopes.h"
 #endif
 #include "kudu/util/debug/unwind_safeness.h"
+#include "kudu/util/env.h"
 #include "kudu/util/errno.h"
+#include "kudu/util/faststring.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/thread.h"
@@ -694,4 +700,73 @@ uint64_t StackTrace::HashCode() const {
                                sizeof(frames_[0]) * num_frames_);
 }
 
+bool StackTrace::LessThan(const StackTrace& s) const {  // lexicographic order on frame addresses
+  return std::lexicographical_compare(frames_, &frames_[num_frames_],  // this trace's frames
+                                      s.frames_, &s.frames_[s.num_frames_]);  // s's own length
+}
+
+Status StackTraceSnapshot::SnapshotAllStacks() {
+  vector<pid_t> tids;
+  RETURN_NOT_OK_PREPEND(ListThreads(&tids), "could not list threads");
+
+  collectors_.clear();
+  collectors_.resize(tids.size());
+  infos_.clear();
+  infos_.resize(tids.size());
+  for (int i = 0; i < tids.size(); i++) {
+    infos_[i].tid = tids[i];
+    infos_[i].status = collectors_[i].TriggerAsync(tids[i], &infos_[i].stack);
+  }
+
+  // Now collect the thread names while we are waiting on stack trace collection.
+  for (auto& info : infos_) {
+    if (!info.status.ok()) continue;
+
+    // Get the thread's name by reading proc.
+    // TODO(todd): should we have the dumped thread fill in its own name using
+    // prctl to avoid having to open and read /proc? Or maybe we should use the
+    // Kudu ThreadMgr to get the thread names for the cases where we are using
+    // the kudu::Thread wrapper at least.
+    faststring buf;
+    Status s = ReadFileToString(Env::Default(),
+                                strings::Substitute("/proc/self/task/$0/comm", info.tid),
+                                &buf);
+    if (!s.ok()) {
+      info.thread_name = "<unknown name>";
+    }  else {
+      info.thread_name = buf.ToString();
+      StripTrailingNewline(&info.thread_name);
+    }
+  }
+  num_failed_ = 0;
+  MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(1);
+  for (int i = 0; i < infos_.size(); i++) {
+    infos_[i].status = infos_[i].status.AndThen([&] {
+        return collectors_[i].AwaitCollection(deadline);
+      });
+    if (!infos_[i].status.ok()) {
+      num_failed_++;
+      CHECK(!infos_[i].stack.HasCollected()) << infos_[i].status.ToString();
+    }
+  }
+  collectors_.clear();
+
+  std::sort(infos_.begin(), infos_.end(), [](const ThreadInfo& a, const ThreadInfo& b) {
+      return a.stack.LessThan(b.stack);
+    });
+  return Status::OK();
+}
+
+void StackTraceSnapshot::VisitGroups(const StackTraceSnapshot::VisitorFunc& visitor) {
+  auto group_start = infos_.begin();
+  auto group_end = group_start;
+  while (group_end != infos_.end()) {
+    do {
+      ++group_end;
+    } while (group_end != infos_.end() && group_end->stack.Equals(group_start->stack));
+    visitor(ArrayView<ThreadInfo>(&*group_start, std::distance(group_start, group_end)));
+    group_start = group_end;
+  }
+}
+
 }  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/831483b4/src/kudu/util/debug-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/debug-util.h b/src/kudu/util/debug-util.h
index 8ed8f43..4679f36 100644
--- a/src/kudu/util/debug-util.h
+++ b/src/kudu/util/debug-util.h
@@ -21,6 +21,7 @@
 
 #include <cstdint>
 #include <cstring>
+#include <functional>
 #include <string>
 #include <vector>
 
@@ -30,8 +31,10 @@
 
 namespace kudu {
 
+template <typename T> class ArrayView;
 class MonoTime;
 class StackTrace;
+class StackTraceCollector;
 
 namespace stack_trace_internal {
 struct SignalData;
@@ -131,6 +134,9 @@ class StackTrace {
                      num_frames_ * sizeof(frames_[0]));
   }
 
+  // Comparison operator for use in sorting.
+  bool LessThan(const StackTrace& s) const;
+
   // Collect and store the current stack trace. Skips the top 'skip_frames' frames
   // from the stack. For example, a value of '1' will skip whichever function
   // called the 'Collect()' function. The 'Collect' function itself is always skipped.
@@ -182,6 +188,59 @@ class StackTrace {
   void* frames_[kMaxFrames];
 };
 
+// Utility class for gathering a process-wide snapshot of the stack traces
+// of all threads.
+class StackTraceSnapshot {
+ public:
+  // The information about each thread will be gathered in a struct.
+  struct ThreadInfo {
+    // The TID of the thread.
+    int64_t tid;
+
+    // The status of collection. If a thread exits during collection or
+    // was blocking signals, it's possible to have an error here.
+    Status status;
+
+    // The name of the thread.
+    // May be missing if 'status' is not OK.
+    std::string thread_name;
+
+    // The current stack trace of the thread.
+    // Always missing if 'status' is not OK.
+    StackTrace stack;
+  };
+  using VisitorFunc = std::function<void(ArrayView<ThreadInfo> group)>;
+
+  // Snapshot the stack traces of all threads in the process.
+  //
+  // NOTE: this may take some time and should not be called in a latency-sensitive
+  // context.
+  Status SnapshotAllStacks();
+
+  // After having collected stacks, visit them, grouped by shared
+  // stack trace. The visitor function will be called once per group.
+  // Each group is guaranteed to be non-empty.
+  //
+  // Any threads which failed to collect traces are returned as a single group
+  // having empty stack traces.
+  //
+  // REQUIRES: a previous successful call to SnapshotAllStacks().
+  void VisitGroups(const VisitorFunc& visitor);
+
+  // Return the number of threads which were interrogated for a stack trace.
+  //
+  // NOTE: this includes threads which failed to collect.
+  int num_threads() const { return infos_.size(); }
+
+  // Return the number of threads which failed to collect a stack trace.
+  int num_failed() const { return num_failed_; }
+
+ private:
+  std::vector<StackTraceSnapshot::ThreadInfo> infos_;
+  std::vector<StackTraceCollector> collectors_;
+  int num_failed_ = 0;
+};
+
 
 // Class to collect the stack trace of another thread within this process.
 // This allows for more advanced use cases than 'DumpThreadStack(tid)' above.


[3/3] kudu git commit: Don't perform compactions when clean time has not been advanced

Posted by dr...@apache.org.
Don't perform compactions when clean time has not been advanced

Due to KUDU-2233 we might not advance safe time, and thus clean
time, at bootstrap, causing possible corruption or crashes.

This patch adds a check to make sure that clean time has been
advanced at all before performing a compaction.

This is a temporary fix until we have a stricter check like the
one proposed in https://gerrit.cloudera.org/#/c/8887/.

Change-Id: Ia74abdf7d806efc4239dc9cff4a5da28621d331a
Reviewed-on: http://gerrit.cloudera.org:8080/9436
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: David Ribeiro Alves <da...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/00815045
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/00815045
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/00815045

Branch: refs/heads/master
Commit: 00815045fc3295c12023fd7ae7ad220645a10c3a
Parents: 09298f3
Author: David Alves <dr...@apache.org>
Authored: Thu Feb 22 09:38:29 2018 -0800
Committer: David Ribeiro Alves <da...@gmail.com>
Committed: Fri Feb 23 23:06:45 2018 +0000

----------------------------------------------------------------------
 src/kudu/tablet/tablet.cc | 7 +++++++
 1 file changed, 7 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/00815045/src/kudu/tablet/tablet.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 431de32..86182ab 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -1675,6 +1675,13 @@ Status Tablet::Compact(CompactFlags flags) {
 
 void Tablet::UpdateCompactionStats(MaintenanceOpStats* stats) {
 
+  if (mvcc_.GetCleanTimestamp() == Timestamp::kInitialTimestamp) {
+    KLOG_EVERY_N_SECS(WARNING, 30) << LogPrefix() <<  "Can't schedule compaction. Clean time has "
+                                   << "not been advanced past its initial value.";
+    stats->set_runnable(false);
+    return;
+  }
+
   // TODO: use workload statistics here to find out how "hot" the tablet has
   // been in the last 5 minutes, and somehow scale the compaction quality
   // based on that, so we favor hot tablets.


[2/3] kudu git commit: KUDU-2297 (part 4): periodically dump stacks to diagnostics log

Posted by dr...@apache.org.
KUDU-2297 (part 4): periodically dump stacks to diagnostics log

This modifies the diagnostics log to periodically dump stack traces.
This is slightly complicated by the fact that symbolized stack traces
can be relatively large. So, we separate the logging of symbols and
stack traces. When an address first appears in a log file, it is logged
as part of a symbol line. Later logs of the same address do not need
to re-log the symbol.

With this, a typical stack trace dump of an idle tserver is about 4KB
pre-compression, and a 'symbols' dump is about 6KB. So logging stacks
reasonably often should not use much disk space or IO.

Currently this is enabled on the same interval as the metrics log, but
only if a new experimental flag --diagnostics-log-stack-traces is
enabled. I'm planning to move it to a different flag in a later commit,
but wanted to keep this one simple and incremental.

Change-Id: Ic32abf2c48ac6a5f3c384e58838b34671bcaf147
Reviewed-on: http://gerrit.cloudera.org:8080/9330
Tested-by: Todd Lipcon <to...@apache.org>
Reviewed-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/09298f3b
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/09298f3b
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/09298f3b

Branch: refs/heads/master
Commit: 09298f3b6756f10a7d598b4de77676d49c38f117
Parents: 831483b
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Feb 14 16:17:43 2018 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Fri Feb 23 23:06:40 2018 +0000

----------------------------------------------------------------------
 src/kudu/server/diagnostics_log.cc | 152 +++++++++++++++++++++++++++++++-
 src/kudu/server/diagnostics_log.h  |   6 ++
 src/kudu/util/debug-util.cc        |  36 ++++----
 src/kudu/util/debug-util.h         |  19 +++-
 src/kudu/util/rolling_log-test.cc  |  16 ++--
 src/kudu/util/rolling_log.cc       |  13 +--
 src/kudu/util/rolling_log.h        |  36 +++++---
 7 files changed, 236 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/09298f3b/src/kudu/server/diagnostics_log.cc
----------------------------------------------------------------------
diff --git a/src/kudu/server/diagnostics_log.cc b/src/kudu/server/diagnostics_log.cc
index 0246347..ab47d8a 100644
--- a/src/kudu/server/diagnostics_log.cc
+++ b/src/kudu/server/diagnostics_log.cc
@@ -22,14 +22,22 @@
 #include <ostream>
 #include <string>
 #include <utility>
+#include <vector>
 
+#include <gflags/gflags.h>
 #include <glog/logging.h>
+#include <sparsehash/dense_hash_set>
 
+#include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/walltime.h"
+#include "kudu/util/array_view.h"
 #include "kudu/util/condition_variable.h"
+#include "kudu/util/debug-util.h"
 #include "kudu/util/env.h"
+#include "kudu/util/flag_tags.h"
 #include "kudu/util/jsonwriter.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/metrics.h"
@@ -41,17 +49,60 @@
 
 using std::string;
 using std::unique_ptr;
+using std::vector;
 using strings::Substitute;
 
+// GLog already implements symbolization. Just import their hidden symbol.
+namespace google {
+// Symbolizes a program counter.  On success, returns true and writes the
+// symbol name to "out".  The symbol name is demangled if possible
+// (supports symbols generated by GCC 3.x or newer).  Otherwise,
+// returns false.
+bool Symbolize(void *pc, char *out, int out_size);
+}
+
+
+// TODO(todd): this is a placeholder flag. This should actually be an interval.
+// Tagging as experimental and disabling by default while this work is in flux.
+DEFINE_bool(diagnostics_log_stack_traces, false,
+            "Whether to periodically log stack traces to the diagnostics log.");
+TAG_FLAG(diagnostics_log_stack_traces, runtime);
+TAG_FLAG(diagnostics_log_stack_traces, experimental);
+
 namespace kudu {
 namespace server {
 
+// Track which symbols have been emitted to the log already.
+class DiagnosticsLog::SymbolSet {
+ public:
+  SymbolSet() {
+    set_.set_empty_key(nullptr);
+  }
+
+  // Return true if the addr was added, false if it already existed.
+  bool Add(void* addr) {
+    return InsertIfNotPresent(&set_, addr);
+  }
+
+  void ResetIfLogRolled(int roll_count) {
+    if (roll_count_ != roll_count) {
+      roll_count_ = roll_count;
+      set_.clear();
+    }
+  }
+
+ private:
+  int roll_count_ = 0;
+  google::dense_hash_set<void*> set_;
+};
+
 DiagnosticsLog::DiagnosticsLog(string log_dir,
                                MetricRegistry* metric_registry) :
     log_dir_(std::move(log_dir)),
     metric_registry_(metric_registry),
     wake_(&lock_),
-    metrics_log_interval_(MonoDelta::FromSeconds(60)) {
+    metrics_log_interval_(MonoDelta::FromSeconds(60)),
+    symbols_(new SymbolSet()) {
 }
 DiagnosticsLog::~DiagnosticsLog() {
   Stop();
@@ -111,10 +162,109 @@ void DiagnosticsLog::RunThread() {
       } else {
         next_log = MonoTime::Now() + metrics_log_interval_;
       }
+
+      if (FLAGS_diagnostics_log_stack_traces) {
+        s = LogStacks();
+        if (!s.ok()) {
+          WARN_NOT_OK(s, "Unable to collect stacks to diagnostics log");
+          next_log = MonoTime::Now() + kWaitBetweenFailures;
+        } else {
+          next_log = MonoTime::Now() + metrics_log_interval_;
+        }
+      }
     }
   }
 }
 
+Status DiagnosticsLog::LogStacks() {
+  StackTraceSnapshot snap;
+  snap.set_capture_thread_names(false);
+  RETURN_NOT_OK(snap.SnapshotAllStacks());
+
+  std::ostringstream buf;
+  MicrosecondsInt64 now = GetCurrentTimeMicros();
+
+  // Because symbols are potentially long strings, and likely to be
+  // very repetitive, we do a sort of dictionary encoding here. When
+  // we roll a file, we clear our symbol table. Then, within that
+  // file, the first time we see any address, we add it to the table
+  // and make sure it is output in a 'symbols' record. Subsequent
+  // repetitions of the same address do not need to re-output the
+  // symbol.
+  symbols_->ResetIfLogRolled(log_->roll_count());
+  vector<std::pair<void*, string>> new_symbols;
+  snap.VisitGroups([&](ArrayView<StackTraceSnapshot::ThreadInfo> group) {
+      const StackTrace& stack = group[0].stack;
+      for (int i = 0; i < stack.num_frames(); i++) {
+        void* addr = stack.frame(i);
+        if (symbols_->Add(addr)) {
+          char buf[1024];
+          // Subtract 1 from the address before symbolizing, because the
+          // address on the stack is actually the return address of the function
+          // call rather than the address of the call instruction itself.
+          if (google::Symbolize(static_cast<char*>(addr) - 1, buf, sizeof(buf))) {
+            new_symbols.emplace_back(addr, buf);
+          }
+          // If symbolization fails, don't bother adding it. Readers of the log
+          // will just see that it's missing from the symbol map and should handle that
+          // as an unknown symbol.
+        }
+      }
+    });
+  if (!new_symbols.empty()) {
+    buf << "I" << FormatTimestampForLog(now)
+        << " symbols " << now << " ";
+    JsonWriter jw(&buf, JsonWriter::COMPACT);
+    jw.StartObject();
+    for (auto& p : new_symbols) {
+      jw.String(StringPrintf("%p", p.first));
+      jw.String(p.second);
+    }
+    jw.EndObject();
+    buf << "\n";
+  }
+
+  buf << "I" << FormatTimestampForLog(now) << " stacks " << now << " ";
+  JsonWriter jw(&buf, JsonWriter::COMPACT);
+  jw.StartObject();
+  jw.String("reason");
+  jw.String("periodic");
+  jw.String("groups");
+  jw.StartArray();
+  snap.VisitGroups([&](ArrayView<StackTraceSnapshot::ThreadInfo> group) {
+      jw.StartObject();
+
+      jw.String("tids");
+      jw.StartArray();
+      for (auto& t : group) {
+        // TODO(todd): should we also output the thread names, perhaps in
+        // a sort of dictionary fashion? It's more instructive but in
+        // practice the stack traces should usually indicate the work
+        // that's being done, anyway, which is enough to tie back to the
+        // thread. The TID is smaller and useful for correlating against
+        // messages in the normal glog as well.
+        jw.Int64(t.tid);
+      }
+      jw.EndArray();
+
+      jw.String("stack");
+      jw.StartArray();
+      const StackTrace& stack = group[0].stack;
+      for (int i = 0; i < stack.num_frames(); i++) {
+        jw.String(StringPrintf("%p", stack.frame(i)));
+      }
+      jw.EndArray();
+      jw.EndObject();
+    });
+  jw.EndArray(); // array of thread groups
+  jw.EndObject(); // end of top-level object
+  buf << "\n";
+
+  RETURN_NOT_OK(log_->Append(buf.str()));
+
+  return Status::OK();
+}
+
 Status DiagnosticsLog::LogMetrics() {
   MetricJsonOptions opts;
   opts.include_raw_histograms = true;

http://git-wip-us.apache.org/repos/asf/kudu/blob/09298f3b/src/kudu/server/diagnostics_log.h
----------------------------------------------------------------------
diff --git a/src/kudu/server/diagnostics_log.h b/src/kudu/server/diagnostics_log.h
index 75e1a69..258e2be 100644
--- a/src/kudu/server/diagnostics_log.h
+++ b/src/kudu/server/diagnostics_log.h
@@ -46,8 +46,11 @@ class DiagnosticsLog {
   void Stop();
 
  private:
+  class SymbolSet;
+
   void RunThread();
   Status LogMetrics();
+  Status LogStacks();
 
   const std::string log_dir_;
   const MetricRegistry* metric_registry_;
@@ -63,6 +66,9 @@ class DiagnosticsLog {
 
   int64_t metrics_epoch_ = 0;
 
+  // Out-of-line this internal data to keep the header smaller.
+  std::unique_ptr<SymbolSet> symbols_;
+
   DISALLOW_COPY_AND_ASSIGN(DiagnosticsLog);
 };
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/09298f3b/src/kudu/util/debug-util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/debug-util.cc b/src/kudu/util/debug-util.cc
index e2654f4..5c28132 100644
--- a/src/kudu/util/debug-util.cc
+++ b/src/kudu/util/debug-util.cc
@@ -719,23 +719,25 @@ Status StackTraceSnapshot::SnapshotAllStacks() {
   }
 
   // Now collect the thread names while we are waiting on stack trace collection.
-  for (auto& info : infos_) {
-    if (!info.status.ok()) continue;
-
-    // Get the thread's name by reading proc.
-    // TODO(todd): should we have the dumped thread fill in its own name using
-    // prctl to avoid having to open and read /proc? Or maybe we should use the
-    // Kudu ThreadMgr to get the thread names for the cases where we are using
-    // the kudu::Thread wrapper at least.
-    faststring buf;
-    Status s = ReadFileToString(Env::Default(),
-                                strings::Substitute("/proc/self/task/$0/comm", info.tid),
-                                &buf);
-    if (!s.ok()) {
-      info.thread_name = "<unknown name>";
-    }  else {
-      info.thread_name = buf.ToString();
-      StripTrailingNewline(&info.thread_name);
+  if (capture_thread_names_) {
+    for (auto& info : infos_) {
+      if (!info.status.ok()) continue;
+
+      // Get the thread's name by reading proc.
+      // TODO(todd): should we have the dumped thread fill in its own name using
+      // prctl to avoid having to open and read /proc? Or maybe we should use the
+      // Kudu ThreadMgr to get the thread names for the cases where we are using
+      // the kudu::Thread wrapper at least.
+      faststring buf;
+      Status s = ReadFileToString(Env::Default(),
+                                  strings::Substitute("/proc/self/task/$0/comm", info.tid),
+                                  &buf);
+      if (!s.ok()) {
+        info.thread_name = "<unknown name>";
+      }  else {
+        info.thread_name = buf.ToString();
+        StripTrailingNewline(&info.thread_name);
+      }
     }
   }
   num_failed_ = 0;

http://git-wip-us.apache.org/repos/asf/kudu/blob/09298f3b/src/kudu/util/debug-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/debug-util.h b/src/kudu/util/debug-util.h
index 4679f36..193788e 100644
--- a/src/kudu/util/debug-util.h
+++ b/src/kudu/util/debug-util.h
@@ -25,6 +25,8 @@
 #include <string>
 #include <vector>
 
+#include <glog/logging.h>
+
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/strings/fastmem.h"
 #include "kudu/util/status.h"
@@ -144,6 +146,14 @@ class StackTrace {
   // This function is async-safe.
   void Collect(int skip_frames = 0);
 
+  int num_frames() const {
+    return num_frames_;
+  }
+
+  void* frame(int i) const {
+    DCHECK(i >= 0 && i < num_frames_) << "bad frame index " << i;
+    return frames_[i];
+  }
 
   enum Flags {
     // Do not fix up the addresses on the stack to try to point to the 'call'
@@ -202,7 +212,8 @@ class StackTraceSnapshot {
     Status status;
 
     // The name of the thread.
-    // May be missing if 'status' is not OK.
+    // May be missing if 'status' is not OK or if thread name collection was
+    // disabled.
     std::string thread_name;
 
     // The current stack trace of the thread.
@@ -211,6 +222,10 @@ class StackTraceSnapshot {
   };
   using VisitorFunc = std::function<void(ArrayView<ThreadInfo> group)>;
 
+  void set_capture_thread_names(bool c) {
+    capture_thread_names_ = c;
+  }
+
   // Snapshot the stack traces of all threads in the process.
   //
   // NOTE: this may take some time and should not be called in a latency-sensitive
@@ -239,6 +254,8 @@ class StackTraceSnapshot {
   std::vector<StackTraceSnapshot::ThreadInfo> infos_;
   std::vector<StackTraceCollector> collectors_;
   int num_failed_ = 0;
+
+  bool capture_thread_names_ = true;
 };
 
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/09298f3b/src/kudu/util/rolling_log-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/rolling_log-test.cc b/src/kudu/util/rolling_log-test.cc
index 5c4c13b..f4f8186 100644
--- a/src/kudu/util/rolling_log-test.cc
+++ b/src/kudu/util/rolling_log-test.cc
@@ -19,6 +19,7 @@
 
 #include <unistd.h>
 
+#include <algorithm>
 #include <cstdint>
 #include <string>
 #include <vector>
@@ -68,6 +69,7 @@ class RollingLogTest : public KuduTest {
       ASSERT_TRUE(HasSuffixString(child, pid_suffix) ||
                   HasSuffixString(child, pid_suffix + ".gz")) << "bad child: " << child;
     }
+    std::sort(children->begin(), children->end());
     ASSERT_EQ(children->size(), expected_count) << *children;
   }
 
@@ -78,27 +80,29 @@ class RollingLogTest : public KuduTest {
 TEST_F(RollingLogTest, TestLog) {
   RollingLog log(env_, log_dir_, "mylog");
   log.SetCompressionEnabled(false);
-  log.SetSizeLimitBytes(100);
+  log.SetRollThresholdBytes(100);
 
   // Before writing anything, we shouldn't open a log file.
   vector<string> children;
   NO_FATALS(AssertLogCount(0, &children));
 
   // Appending some data should write a new segment.
-  ASSERT_OK(log.Append("Hello world\n"));
+  const string kTestString = "Hello world\n";
+  ASSERT_OK(log.Append(kTestString));
   NO_FATALS(AssertLogCount(1, &children));
 
   for (int i = 0; i < 10; i++) {
-    ASSERT_OK(log.Append("Hello world\n"));
+    ASSERT_OK(log.Append(kTestString));
   }
   NO_FATALS(AssertLogCount(2, &children));
 
   faststring data;
   string path = JoinPathSegments(log_dir_, children[0]);
   ASSERT_OK(ReadFileToString(env_, path, &data));
-  ASSERT_TRUE(HasPrefixString(data.ToString(), "Hello world\n"))
+  ASSERT_TRUE(HasPrefixString(data.ToString(), kTestString))
     << "Data missing";
-  ASSERT_LE(data.size(), 100) << "Size limit not respected";
+  ASSERT_LE(data.size(), 100 + kTestString.length())
+      << "Roll threshold not respected";
 }
 
 // Test with compression on.
@@ -128,7 +132,7 @@ TEST_F(RollingLogTest, TestCompression) {
 TEST_F(RollingLogTest, TestFileCountLimit) {
   RollingLog log(env_, log_dir_, "mylog");
   ASSERT_OK(log.Open());
-  log.SetSizeLimitBytes(100);
+  log.SetRollThresholdBytes(100);
   log.SetMaxNumSegments(3);
 
   for (int i = 0; i < 100; i++) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/09298f3b/src/kudu/util/rolling_log.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/rolling_log.cc b/src/kudu/util/rolling_log.cc
index a508e91..50f9fbd 100644
--- a/src/kudu/util/rolling_log.cc
+++ b/src/kudu/util/rolling_log.cc
@@ -47,7 +47,7 @@ using std::string;
 using std::unique_ptr;
 using strings::Substitute;
 
-static const int kDefaultSizeLimitBytes = 64 * 1024 * 1024; // 64MB
+static const int kDefaultRollThresholdBytes = 64 * 1024 * 1024; // 64MB
 
 DECLARE_int32(max_log_files);
 
@@ -57,7 +57,7 @@ RollingLog::RollingLog(Env* env, string log_dir, string log_name)
     : env_(env),
       log_dir_(std::move(log_dir)),
       log_name_(std::move(log_name)),
-      size_limit_bytes_(kDefaultSizeLimitBytes),
+      roll_threshold_bytes_(kDefaultRollThresholdBytes),
       max_num_segments_(FLAGS_max_log_files),
       compress_after_close_(true) {}
 
@@ -65,9 +65,9 @@ RollingLog::~RollingLog() {
   WARN_NOT_OK(Close(), "Unable to close RollingLog");
 }
 
-void RollingLog::SetSizeLimitBytes(int64_t size) {
+void RollingLog::SetRollThresholdBytes(int64_t size) {
   CHECK_GT(size, 0);
-  size_limit_bytes_ = size;
+  roll_threshold_bytes_ = size;
 }
 
 void RollingLog::SetMaxNumSegments(int num_segments) {
@@ -189,11 +189,12 @@ Status RollingLog::Append(StringPiece s) {
     RETURN_NOT_OK_PREPEND(Open(), "Unable to open log");
   }
 
-  if (file_->Size() + s.size() > size_limit_bytes_) {
+  RETURN_NOT_OK(file_->Append(s));
+  if (file_->Size() > roll_threshold_bytes_) {
     RETURN_NOT_OK_PREPEND(Close(), "Unable to roll log");
+    roll_count_++;
     RETURN_NOT_OK_PREPEND(Open(), "Unable to roll log");
   }
-  RETURN_NOT_OK(file_->Append(s));
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/09298f3b/src/kudu/util/rolling_log.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/rolling_log.h b/src/kudu/util/rolling_log.h
index 897572a..0bb6755 100644
--- a/src/kudu/util/rolling_log.h
+++ b/src/kudu/util/rolling_log.h
@@ -22,6 +22,7 @@
 #include <string>
 
 #include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/stringpiece.h"
 #include "kudu/util/status.h"
 
@@ -63,9 +64,17 @@ class RollingLog {
   // the log as necessary if it is not open.
   Status Open();
 
-  // Set the pre-compression size limit for the current and any future log files.
-  // Note that this is the limit on a single segment of the log, not the total.
-  void SetSizeLimitBytes(int64_t size);
+  // Set the pre-compression size threshold at which the log file will be rolled.
+  // If the log is already open, this applies for the current and any future
+  // log file.
+  //
+  // NOTE: This is the limit on a single segment of the log, not a limit on the total
+  // size of the log.
+  //
+  // NOTE: The threshold is checked _after_ each call to Append(). So, the size of
+  // the log may overshoot this threshold by as much as the size of a single appended
+  // message.
+  void SetRollThresholdBytes(int64_t size);
 
   // Set the total number of log segments to be retained. When the log is rolled,
   // old segments are removed to achieve the targeted number of segments.
@@ -77,17 +86,20 @@ class RollingLog {
 
   // Append the given data to the current log file.
   //
-  // If appending this data would cross the configured file size limit, a new file
-  // is created and the data is appended there.
-  //
-  // Note that this is a synchronous API and causes potentially-blocking IO on the
-  // current thread. However, this does not fsync() or otherwise ensure durability
-  // of the appended data.
-  Status Append(StringPiece data);
+  // If, after appending this data, the file size has crossed the configured roll
+  // threshold, a new empty log file is created. Note that this is a synchronous API and
+  // causes potentially-blocking IO on the current thread. However, this does not fsync()
+  // or otherwise ensure durability of the appended data.
+  Status Append(StringPiece data) WARN_UNUSED_RESULT;
 
   // Close the log.
   Status Close();
 
+  // Return the number of times this log has rolled since it was first opened.
+  int roll_count() const {
+    return roll_count_;
+  }
+
  private:
   std::string GetLogFileName(int sequence) const;
 
@@ -101,12 +113,14 @@ class RollingLog {
   const std::string log_dir_;
   const std::string log_name_;
 
-  int64_t size_limit_bytes_;
+  int64_t roll_threshold_bytes_;
   int max_num_segments_;
 
   std::unique_ptr<WritableFile> file_;
   bool compress_after_close_;
 
+  int roll_count_ = 0;
+
   DISALLOW_COPY_AND_ASSIGN(RollingLog);
 };