You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/09/07 00:22:07 UTC

[8/8] kudu git commit: KUDU-1513. consensus: improve log messages for lagging or tablet-copying peers

KUDU-1513. consensus: improve log messages for lagging or tablet-copying peers

This patch improves the log throttling utility code so that it supports
a user-defined "throttler" instance. It then uses a throttler for each
consensus peer in order to improve the logging:

- we used to spew "Successfully read <N> ops from disk" messages out of
  log_cache.cc when we were reading messages for a lagging peer. This
  message was not very useful, since it didn't say which peer was
  lagging. It is now replaced by a specific (and throttled) message
  letting us know which peer is lagging and how far behind it is.

  Unfortunately the "how far behind" is only in terms of op indexes and
  not time, since we don't attach any wall time to ReplicateMsgs. Still,
  this is better than what we had before.

- we used to spew "Sending request to start Tablet Copy" repeatedly while
  a peer was already in the process of copying. This log message has been
  relocated and is now throttled to once every few seconds.

Change-Id: I4dd560309841ba738b031ea87b12f8f612e6c674
Reviewed-on: http://gerrit.cloudera.org:8080/4184
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/6a12ba3f
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/6a12ba3f
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/6a12ba3f

Branch: refs/heads/master
Commit: 6a12ba3f7d66dcf748e8864aae8139813c1c4746
Parents: c4e3ff6
Author: Todd Lipcon <to...@apache.org>
Authored: Tue Aug 30 23:10:14 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Sep 7 00:17:55 2016 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus_peers.cc |  1 -
 src/kudu/consensus/consensus_queue.cc | 17 ++++++-
 src/kudu/consensus/consensus_queue.h  |  6 ++-
 src/kudu/consensus/log_cache.cc       |  2 +-
 src/kudu/util/logging-test.cc         | 33 ++++++++++++++
 src/kudu/util/logging.h               | 71 +++++++++++++++++++++++++-----
 src/kudu/util/logging_test_util.h     |  2 +-
 7 files changed, 116 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/6a12ba3f/src/kudu/consensus/consensus_peers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc
index 75cdd2d..29e8524 100644
--- a/src/kudu/consensus/consensus_peers.cc
+++ b/src/kudu/consensus/consensus_peers.cc
@@ -300,7 +300,6 @@ Status Peer::SendTabletCopyRequest() {
     return Status::NotSupported("Tablet Copy is disabled");
   }
 
-  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Sending request to start Tablet Copy";
   RETURN_NOT_OK(queue_->GetTabletCopyRequestForPeer(peer_pb_.permanent_uuid(), &tc_request_));
   controller_.Reset();
   proxy_->StartTabletCopy(&tc_request_, &tc_response_, &controller_,

http://git-wip-us.apache.org/repos/asf/kudu/blob/6a12ba3f/src/kudu/consensus/consensus_queue.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index d684bcd..a62a7e2 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -66,7 +66,6 @@ TAG_FLAG(consensus_inject_latency_ms_in_notifications, unsafe);
 namespace kudu {
 namespace consensus {
 
-using log::AsyncLogReader;
 using log::Log;
 using rpc::Messenger;
 using strings::Substitute;
@@ -314,7 +313,8 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
   }
 
   if (PREDICT_FALSE(peer->needs_tablet_copy)) {
-    VLOG_WITH_PREFIX_UNLOCKED(1) << "Peer needs tablet copy: " << peer->ToString();
+    KLOG_EVERY_N_SECS_THROTTLER(INFO, 3, peer->status_log_throttler, "tablet copy")
+        << LogPrefixUnlocked() << "Peer " << uuid << " needs tablet copy" << THROTTLE_MSG;
     *needs_tablet_copy = true;
     return Status::OK();
   }
@@ -372,6 +372,19 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
   DCHECK(preceding_id.IsInitialized());
   request->mutable_preceding_id()->CopyFrom(preceding_id);
 
+  // If we are sending ops to the follower, but the batch doesn't reach the current
+  // committed index, we can consider the follower lagging, and it's worth
+  // logging this fact periodically.
+  if (request->ops_size() > 0) {
+    int64_t last_op_sent = request->ops(request->ops_size() - 1).id().index();
+    if (last_op_sent < request->committed_index().index()) {
+      KLOG_EVERY_N_SECS_THROTTLER(INFO, 3, peer->status_log_throttler, "lagging")
+          << LogPrefixUnlocked() << "Peer " << uuid << " is lagging by at least "
+          << (request->committed_index().index() - last_op_sent)
+          << " ops behind the committed index " << THROTTLE_MSG;
+    }
+  }
+
   if (PREDICT_FALSE(VLOG_IS_ON(2))) {
     if (request->ops_size() > 0) {
       VLOG_WITH_PREFIX_UNLOCKED(2) << "Sending request with operations to Peer: " << uuid

http://git-wip-us.apache.org/repos/asf/kudu/blob/6a12ba3f/src/kudu/consensus/consensus_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h
index cccee1b..6dbb477 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -32,6 +32,7 @@
 #include "kudu/consensus/ref_counted_replicate.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/util/locks.h"
+#include "kudu/util/logging.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
@@ -43,7 +44,6 @@ class ThreadPool;
 
 namespace log {
 class Log;
-class AsyncLogReader;
 }
 
 namespace consensus {
@@ -115,6 +115,10 @@ class PeerMessageQueue {
     // Whether the follower was detected to need tablet copy.
     bool needs_tablet_copy;
 
+    // Throttler for how often we will log status messages pertaining to this
+    // peer (eg when it is lagging, etc).
+    logging::LogThrottler status_log_throttler;
+
    private:
     // The last term we saw from a given peer.
     // This is only used for sanity checking that a peer doesn't

http://git-wip-us.apache.org/repos/asf/kudu/blob/6a12ba3f/src/kudu/consensus/log_cache.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_cache.cc b/src/kudu/consensus/log_cache.cc
index b0162aa..6619a22 100644
--- a/src/kudu/consensus/log_cache.cc
+++ b/src/kudu/consensus/log_cache.cc
@@ -304,7 +304,7 @@ Status LogCache::ReadOps(int64_t after_op_index,
           next_index, up_to, remaining_space, &raw_replicate_ptrs),
         Substitute("Failed to read ops $0..$1", next_index, up_to));
       l.lock();
-      LOG_WITH_PREFIX_UNLOCKED(INFO)
+      VLOG_WITH_PREFIX_UNLOCKED(2)
           << "Successfully read " << raw_replicate_ptrs.size() << " ops "
           << "from disk (" << next_index << ".."
           << (next_index + raw_replicate_ptrs.size() - 1) << ")";

http://git-wip-us.apache.org/repos/asf/kudu/blob/6a12ba3f/src/kudu/util/logging-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/logging-test.cc b/src/kudu/util/logging-test.cc
index 82ce90d..85ad3cd 100644
--- a/src/kudu/util/logging-test.cc
+++ b/src/kudu/util/logging-test.cc
@@ -48,4 +48,37 @@ TEST(LoggingTest, TestThrottledLogging) {
   EXPECT_THAT(msgs[1], testing::ContainsRegex("\\[suppressed [0-9]{3,} similar messages\\]"));
 }
 
+TEST(LoggingTest, TestAdvancedThrottling) {
+  StringVectorSink sink;
+  ScopedRegisterSink srs(&sink);
+
+  logging::LogThrottler throttle_a;
+
+  // First, log only using a single tag and throttler.
+  for (int i = 0; i < 100000; i++) {
+    KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, throttle_a, "tag_a") << "test" << THROTTLE_MSG;
+    SleepFor(MonoDelta::FromMilliseconds(1));
+    if (sink.logged_msgs().size() >= 2) break;
+  }
+  auto& msgs = sink.logged_msgs();
+  ASSERT_GE(msgs.size(), 2);
+
+  // The first log line shouldn't have a suppression count.
+  EXPECT_THAT(msgs[0], testing::ContainsRegex("test$"));
+  // The second one should have suppressed at least three digits worth of log messages.
+  EXPECT_THAT(msgs[1], testing::ContainsRegex("\\[suppressed [0-9]{3,} similar messages\\]"));
+  msgs.clear();
+
+  // Now, try logging using different tags in rapid succession. Switching tags
+  // always logs; only the immediately repeated "tag_b" message is throttled.
+  KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, throttle_a, "tag_b") << "test b" << THROTTLE_MSG;
+  KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, throttle_a, "tag_b") << "test b" << THROTTLE_MSG;
+  KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, throttle_a, "tag_c") << "test c" << THROTTLE_MSG;
+  KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, throttle_a, "tag_b") << "test b" << THROTTLE_MSG;
+  ASSERT_EQ(msgs.size(), 3);
+  EXPECT_THAT(msgs[0], testing::ContainsRegex("test b$"));
+  EXPECT_THAT(msgs[1], testing::ContainsRegex("test c$"));
+  EXPECT_THAT(msgs[2], testing::ContainsRegex("test b$"));
+}
+
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/6a12ba3f/src/kudu/util/logging.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/logging.h b/src/kudu/util/logging.h
index c2858f8..78fdf0b 100644
--- a/src/kudu/util/logging.h
+++ b/src/kudu/util/logging.h
@@ -22,6 +22,7 @@
 
 #include "kudu/gutil/atomicops.h"
 #include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/macros.h"
 #include "kudu/gutil/walltime.h"
 #include "kudu/util/logging_callback.h"
 
@@ -37,14 +38,46 @@
 //
 // Example usage:
 //   KLOG_EVERY_N_SECS(WARNING, 1) << "server is low on memory" << THROTTLE_MSG;
-#define KLOG_EVERY_N_SECS(severity, n_secs) \
-  static logging_internal::LogThrottler LOG_THROTTLER;  \
-  int num_suppressed = 0; \
-  if (LOG_THROTTLER.ShouldLog(n_secs, &num_suppressed)) \
+//
+//
+// Advanced per-instance throttling
+// -----------------------------------
+// For cases where the throttling should be scoped to a given class instance,
+// you may define a logging::LogThrottler object and pass it to the
+// KLOG_EVERY_N_SECS_THROTTLER(...) macro. In addition, you must pass a "tag".
+// Only consecutive messages whose tags are equal by pointer (not by string content) are throttled.
+// For example:
+//
+//    struct MyThing {
+//      string name;
+//      LogThrottler throttler;
+//    };
+//
+//    if (...) {
+//      KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, my_thing->throttler, "coffee") <<
+//        my_thing->name << " needs coffee!";
+//    } else {
+//      KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, my_thing->throttler, "wine") <<
+//        my_thing->name << " needs wine!";
+//    }
+//
+// In this example, the "coffee"-related message will be collapsed into other
+// such messages within the prior one second; however, if the state alternates
+// between the "coffee" message and the "wine" message, then each such alternation
+// will yield a message.
+
+#define KLOG_EVERY_N_SECS_THROTTLER(severity, n_secs, throttler, tag) \
+  int VARNAME_LINENUM(num_suppressed) = 0;                            \
+  if (throttler.ShouldLog(n_secs, tag, &VARNAME_LINENUM(num_suppressed)))  \
     google::LogMessage( \
-      __FILE__, __LINE__, google::GLOG_ ## severity, num_suppressed, \
+      __FILE__, __LINE__, google::GLOG_ ## severity, VARNAME_LINENUM(num_suppressed), \
       &google::LogMessage::SendToLog).stream()
 
+#define KLOG_EVERY_N_SECS(severity, n_secs) \
+  static logging::LogThrottler LOG_THROTTLER;  \
+  KLOG_EVERY_N_SECS_THROTTLER(severity, n_secs, LOG_THROTTLER, "no-tag")
+
+
 namespace kudu {
 enum PRIVATE_ThrottleMsg {THROTTLE_MSG};
 } // namespace kudu
@@ -179,16 +212,33 @@ void ShutdownLoggingSafe();
 // Writes all command-line flags to the log at level INFO.
 void LogCommandLineFlags();
 
-namespace logging_internal {
-// Internal implementation class used for throttling log messages.
+namespace logging {
+
+// A LogThrottler instance tracks the throttling state for a particular
+// log message.
+//
+// This is used internally by KLOG_EVERY_N_SECS, but can also be used
+// explicitly in conjunction with KLOG_EVERY_N_SECS_THROTTLER. See the
+// macro descriptions above for details.
 class LogThrottler {
  public:
-  LogThrottler() : num_suppressed_(0), last_ts_(0) {
+  LogThrottler() : num_suppressed_(0), last_ts_(0), last_tag_(nullptr) {
     ANNOTATE_BENIGN_RACE(&last_ts_, "OK to be sloppy with log throttling");
   }
 
-  bool ShouldLog(int n_secs, int* num_suppressed) {
+  bool ShouldLog(int n_secs, const char* tag, int* num_suppressed) {
     MicrosecondsInt64 ts = GetMonoTimeMicros();
+
+    // When we switch tags, we should not show the "suppressed" messages, because
+    // in fact it's a different message that we skipped. So, reset it to zero,
+    // and always log the new message.
+    if (tag != last_tag_) {
+      *num_suppressed = num_suppressed_ = 0;
+      last_tag_ = tag;
+      last_ts_ = ts;
+      return true;
+    }
+
     if (ts - last_ts_ < n_secs * 1e6) {
       *num_suppressed = base::subtle::NoBarrier_AtomicIncrement(&num_suppressed_, 1);
       return false;
@@ -200,8 +250,9 @@ class LogThrottler {
  private:
   Atomic32 num_suppressed_;
   uint64_t last_ts_;
+  const char* last_tag_;
 };
-} // namespace logging_internal
+} // namespace logging
 
 std::ostream& operator<<(std::ostream &os, const PRIVATE_ThrottleMsg&);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/6a12ba3f/src/kudu/util/logging_test_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/logging_test_util.h b/src/kudu/util/logging_test_util.h
index dfa3486..8102375 100644
--- a/src/kudu/util/logging_test_util.h
+++ b/src/kudu/util/logging_test_util.h
@@ -35,7 +35,7 @@ class StringVectorSink : public google::LogSink {
                                     tm_time, message, message_len));
   }
 
-  const std::vector<std::string>& logged_msgs() const {
+  std::vector<std::string>& logged_msgs() {
     return logged_msgs_;
   }