You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2017/08/23 01:17:04 UTC

[3/4] kudu git commit: consensus: use periodic timers for failure detection

consensus: use periodic timers for failure detection

This patch replaces the existing failure detection (FD) with a new approach
built using periodic timers. The existing approach had a major drawback:
each failure monitor required a dedicated thread, and there was a monitor
for each replica.

The new approach "schedules" a failure detection into the future using the
server's reactor thread pool, "resetting" the timer whenever leader activity
is detected.
There's an inherent semantic mismatch between dedicated threads that
periodically wake to check for failures and this new approach; I tried to
provide similar semantics as best I could.

Things worth noting:
- Most importantly: some FD periods are now shorter. This is because the
  existing implementation "double counted" failure periods when adding
  backoff (once in LeaderElectionExpBackoffDeltaUnlocked, and once by virtue
  of the failure period comparison made by the failure monitor). This seemed
  accidental to me, so I didn't bother preserving that behavior.
- It's tough to "expire" an FD using timers. Luckily, this only happens in
  RaftConsensus::Start, so by making PeriodicTimer::Start accept an optional
  delta, we can begin FD with an early delta that reflects the desired
  "detect a failure immediately but not too quickly" semantic, similar to
  how the dedicated failure monitor thread operates.
- ReportFailureDetected is now run on a shared reactor thread rather than a
  dedicated failure monitor thread. Since StartElection performs IO, I
  thunked it onto the Raft thread pool.
- Timer operations cannot fail, so I removed the return values from the
  various FD-related functions.
- I also consolidated the two SnoozeFailureDetector variants; I found that
  this made it easier to look at all the call-sites.

Change-Id: I8acdb44e12b975fda4a226aa784db95bc7b4e330
Reviewed-on: http://gerrit.cloudera.org:8080/7735
Reviewed-by: Dan Burkert <da...@apache.org>
Tested-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/21b0f3d5
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/21b0f3d5
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/21b0f3d5

Branch: refs/heads/master
Commit: 21b0f3d5e255760535e281efe5879fe657df1f1c
Parents: c8e0407
Author: Adar Dembo <ad...@cloudera.com>
Authored: Fri Aug 18 17:00:50 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Wed Aug 23 01:15:17 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/raft_consensus.cc            | 210 +++++++------------
 src/kudu/consensus/raft_consensus.h             |  74 +++----
 .../exactly_once_writes-itest.cc                |   4 -
 .../integration-tests/raft_consensus-itest.cc   |   4 -
 4 files changed, 109 insertions(+), 183 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/21b0f3d5/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index c082b89..042c5d1 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -55,9 +55,9 @@
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/stringpiece.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/periodic.h"
 #include "kudu/util/async_util.h"
 #include "kudu/util/debug/trace_event.h"
-#include "kudu/util/failure_detector.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/metrics.h"
@@ -77,19 +77,6 @@ DEFINE_int32(raft_heartbeat_interval_ms, 500,
              "and consider a leader to have failed if it misses several in a row.");
 TAG_FLAG(raft_heartbeat_interval_ms, advanced);
 
-// Defaults to be the same value as the leader heartbeat interval.
-DEFINE_int32(leader_failure_monitor_check_mean_ms, -1,
-             "The mean failure-checking interval of the randomized failure monitor. If this "
-             "is configured to -1 (the default), uses the value of 'raft_heartbeat_interval_ms'.");
-TAG_FLAG(leader_failure_monitor_check_mean_ms, experimental);
-
-// Defaults to half of the mean (above).
-DEFINE_int32(leader_failure_monitor_check_stddev_ms, -1,
-             "The standard deviation of the failure-checking interval of the randomized "
-             "failure monitor. If this is configured to -1 (the default), this is set to "
-             "half of the mean check interval.");
-TAG_FLAG(leader_failure_monitor_check_stddev_ms, experimental);
-
 DEFINE_double(leader_failure_max_missed_heartbeat_periods, 3.0,
              "Maximum heartbeat periods that the leader can fail to heartbeat in before we "
              "consider the leader to be failed. The total failure timeout in milliseconds is "
@@ -147,41 +134,16 @@ METRIC_DEFINE_gauge_int64(tablet, raft_term,
                           "each time a leader election is started.");
 
 using kudu::pb_util::SecureShortDebugString;
+using kudu::rpc::PeriodicTimer;
 using kudu::tserver::TabletServerErrorPB;
 using std::string;
 using std::unique_ptr;
+using std::weak_ptr;
 using strings::Substitute;
 
-namespace  {
-
-// Return the mean interval at which to check for failures of the
-// leader.
-int GetFailureMonitorCheckMeanMs() {
-  int val = FLAGS_leader_failure_monitor_check_mean_ms;
-  if (val < 0) {
-    val = FLAGS_raft_heartbeat_interval_ms;
-  }
-  return val;
-}
-
-// Return the standard deviation for the interval at which to check
-// for failures of the leader.
-int GetFailureMonitorCheckStddevMs() {
-  int val = FLAGS_leader_failure_monitor_check_stddev_ms;
-  if (val < 0) {
-    val = GetFailureMonitorCheckMeanMs() / 2;
-  }
-  return val;
-}
-
-} // anonymous namespace
-
 namespace kudu {
 namespace consensus {
 
-// Special string that represents any known leader to the failure detector.
-static const char* const kTimerId = "election-timer";
-
 RaftConsensus::RaftConsensus(
     ConsensusOptions options,
     RaftPeerPB local_peer_pb,
@@ -269,24 +231,17 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
 
   unique_ptr<PendingRounds> pending(new PendingRounds(LogPrefixThreadSafe(), time_manager_));
 
-  failure_monitor_.reset(new RandomizedFailureMonitor(GetRandomSeed32(),
-                                                      GetFailureMonitorCheckMeanMs(),
-                                                      GetFailureMonitorCheckStddevMs()));
-  failure_detector_.reset(new TimedFailureDetector(MonoDelta::FromMilliseconds(
-          FLAGS_raft_heartbeat_interval_ms *
-          FLAGS_leader_failure_max_missed_heartbeat_periods)));
-
-  // This just starts the monitor thread -- no failure detector is registered yet.
-  if (FLAGS_enable_leader_failure_detection) {
-    RETURN_NOT_OK(failure_monitor_->Start());
-  }
-
-  // Register the failure detector instance with the monitor.
-  // We still have not enabled failure detection for the leader election timer.
-  // That happens separately via the helper functions
-  // EnsureFailureDetector(Enabled/Disabled)Unlocked();
-  RETURN_NOT_OK(failure_monitor_->MonitorFailureDetector(options_.tablet_id,
-                                                         failure_detector_));
+  // Capture a weak_ptr reference into the functor so it can safely handle
+  // outliving the consensus instance.
+  weak_ptr<RaftConsensus> w = shared_from_this();
+  failure_detector_ = PeriodicTimer::Create(
+      peer_proxy_factory_->messenger(),
+      [w]() {
+        if (auto consensus = w.lock()) {
+          consensus->ReportFailureDetected();
+        }
+      },
+      MinimumElectionTimeout());
 
   {
     ThreadRestrictions::AssertWaitAllowed();
@@ -299,7 +254,6 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
     pending_ = std::move(pending);
 
     ClearLeaderUnlocked();
-    RETURN_NOT_OK(EnsureFailureDetectorEnabled());
 
     // Our last persisted term can be higher than the last persisted operation
     // (i.e. if we called an election) but reverse should never happen.
@@ -326,22 +280,26 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
     // appending any uncommitted replicate messages to the queue.
     pending_->SetInitialCommittedOpId(info.last_committed_id);
 
-    // If this is the first term expire the FD immediately so that we have a fast first
-    // election, otherwise we just let the timer expire normally.
+    // If this is the first term expire the FD immediately so that we have a
+    // fast first election, otherwise we just let the timer expire normally.
+    boost::optional<MonoDelta> initial_delta;
     if (GetCurrentTermUnlocked() == 0) {
-      // Initialize the failure detector timeout to some time in the past so that
-      // the next time the failure detector monitor runs it triggers an election
-      // (unless someone else requested a vote from us first, which resets the
-      // election timer). We do it this way instead of immediately running an
-      // election to get a higher likelihood of enough servers being available
-      // when the first one attempts an election to avoid multiple election
-      // cycles on startup, while keeping that "waiting period" random.
+      // The failure detector is initialized to a low value to trigger an early
+      // election (unless someone else requested a vote from us first, which
+      // resets the election timer).
+      //
+      // We do it this way instead of immediately running an election to get a
+      // higher likelihood of enough servers being available when the first one
+      // attempts an election to avoid multiple election cycles on startup,
+      // while keeping that "waiting period" random.
       if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) {
         LOG_WITH_PREFIX_UNLOCKED(INFO) << "Consensus starting up: Expiring failure detector timer "
                                           "to make a prompt election more likely";
+        initial_delta = MonoDelta::FromMilliseconds(
+            rng_.Uniform(FLAGS_raft_heartbeat_interval_ms));
       }
-      RETURN_NOT_OK(ExpireFailureDetector());
     }
+    EnableFailureDetector(initial_delta);
 
     // Now assume "follower" duties.
     RETURN_NOT_OK(BecomeReplicaUnlocked());
@@ -429,7 +387,7 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
       return Status::OK();
     }
     if (PREDICT_FALSE(active_role == RaftPeerPB::NON_PARTICIPANT)) {
-      RETURN_NOT_OK(SnoozeFailureDetector()); // Reduce election noise while in this state.
+      SnoozeFailureDetector(DO_NOT_LOG); // Reduce election noise while in this state.
       return Status::IllegalState("Not starting election: Node is currently "
                                   "a non-participant in the raft config",
                                   SecureShortDebugString(cmeta_->ActiveConfig()));
@@ -441,10 +399,8 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
     // Snooze to avoid the election timer firing again as much as possible.
     // We do not disable the election timer while running an election, so that
     // if the election times out, we will try again.
-    RETURN_NOT_OK(EnsureFailureDetectorEnabled());
-
     MonoDelta timeout = LeaderElectionExpBackoffDeltaUnlocked();
-    RETURN_NOT_OK(SnoozeFailureDetector(timeout, ALLOW_LOGGING));
+    SnoozeFailureDetector(ALLOW_LOGGING, timeout);
 
     // Increment the term and vote for ourselves, unless it's a pre-election.
     if (mode != PRE_ELECTION) {
@@ -532,8 +488,8 @@ Status RaftConsensus::StepDown(LeaderStepDownResponsePB* resp) {
 
   // Snooze the failure detector for an extra leader failure timeout.
   // This should ensure that a different replica is elected leader after this one steps down.
-  WARN_NOT_OK(SnoozeFailureDetector(MinimumElectionTimeout(), ALLOW_LOGGING),
-              "unable to snooze failure detector after stepping down");
+  SnoozeFailureDetector(ALLOW_LOGGING, MonoDelta::FromMilliseconds(
+      2 * MinimumElectionTimeout().ToMilliseconds()));
   return Status::OK();
 }
 
@@ -545,14 +501,17 @@ scoped_refptr<ConsensusRound> RaftConsensus::NewRound(
                                                std::move(replicated_cb)));
 }
 
-void RaftConsensus::ReportFailureDetected(const std::string& name, const Status& /*msg*/) {
-  DCHECK_EQ(name, kTimerId);
-  // Start an election.
-  Status s = StartElection(FLAGS_raft_enable_pre_election ? PRE_ELECTION : NORMAL_ELECTION,
-                           ELECTION_TIMEOUT_EXPIRED);
-  if (PREDICT_FALSE(!s.ok())) {
-    LOG_WITH_PREFIX(WARNING) << "Failed to trigger leader election: " << s.ToString();
-  }
+void RaftConsensus::ReportFailureDetectedTask() {
+  WARN_NOT_OK(StartElection(FLAGS_raft_enable_pre_election ?
+      PRE_ELECTION : NORMAL_ELECTION, ELECTION_TIMEOUT_EXPIRED),
+              LogPrefixThreadSafe() + "failed to trigger leader election");
+}
+
+void RaftConsensus::ReportFailureDetected() {
+  // We're running on a timer thread; start an election on a different thread pool.
+  WARN_NOT_OK(raft_pool_token_->SubmitFunc(std::bind(
+      &RaftConsensus::ReportFailureDetectedTask, shared_from_this())),
+              LogPrefixThreadSafe() + "failed to submit failure detected task");
 }
 
 Status RaftConsensus::BecomeLeaderUnlocked() {
@@ -564,7 +523,7 @@ Status RaftConsensus::BecomeLeaderUnlocked() {
   LOG_WITH_PREFIX_UNLOCKED(INFO) << "Becoming Leader. State: " << ToStringUnlocked();
 
   // Disable FD while we are leader.
-  RETURN_NOT_OK(EnsureFailureDetectorDisabled());
+  DisableFailureDetector();
 
   // Don't vote for anyone if we're a leader.
   withhold_votes_until_ = MonoTime::Max();
@@ -601,7 +560,7 @@ Status RaftConsensus::BecomeReplicaUnlocked() {
   ClearLeaderUnlocked();
 
   // FD should be running while we are a follower.
-  RETURN_NOT_OK(EnsureFailureDetectorEnabled());
+  EnableFailureDetector();
 
   // Now that we're a replica, we can allow voting for other nodes.
   withhold_votes_until_ = MonoTime::Min();
@@ -1219,7 +1178,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
     // Snooze the failure detector as soon as we decide to accept the message.
     // We are guaranteed to be acting as a FOLLOWER at this point by the above
     // sanity check.
-    RETURN_NOT_OK(SnoozeFailureDetector());
+    SnoozeFailureDetector(DO_NOT_LOG);
 
     // We update the lag metrics here in addition to after appending to the queue so the
     // metrics get updated even when the operation is rejected.
@@ -1394,8 +1353,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
       // If just waiting for our log append to finish lets snooze the timer.
       // We don't want to fire leader election because we're waiting on our own log.
       if (s.IsTimedOut()) {
-        WARN_NOT_OK(SnoozeFailureDetector(),
-                    "failed to snooze failure detector");
+        SnoozeFailureDetector(DO_NOT_LOG);
       }
     } while (s.IsTimedOut());
     RETURN_NOT_OK(s);
@@ -1825,7 +1783,7 @@ void RaftConsensus::Shutdown() {
 
   // Shut down things that might acquire locks during destruction.
   if (raft_pool_token_) raft_pool_token_->Shutdown();
-  if (failure_monitor_) failure_monitor_->Shutdown();
+  if (failure_detector_) DisableFailureDetector();
 
   shutdown_.Store(true, kMemOrderRelease);
 }
@@ -1969,8 +1927,8 @@ Status RaftConsensus::RequestVoteRespondVoteGranted(const VoteRequestPB* request
   // We know our vote will be "yes", so avoid triggering an election while we
   // persist our vote to disk. We use an exponential backoff to avoid too much
   // split-vote contention when nodes display high latencies.
-  MonoDelta additional_backoff = LeaderElectionExpBackoffDeltaUnlocked();
-  RETURN_NOT_OK(SnoozeFailureDetector(additional_backoff, ALLOW_LOGGING));
+  MonoDelta backoff = LeaderElectionExpBackoffDeltaUnlocked();
+  SnoozeFailureDetector(ALLOW_LOGGING, backoff);
 
   if (!request->is_pre_election()) {
     // Persist our vote to disk.
@@ -1981,7 +1939,7 @@ Status RaftConsensus::RequestVoteRespondVoteGranted(const VoteRequestPB* request
 
   // Give peer time to become leader. Snooze one more time after persisting our
   // vote. When disk latency is high, this should help reduce churn.
-  RETURN_NOT_OK(SnoozeFailureDetector(additional_backoff, DO_NOT_LOG));
+  SnoozeFailureDetector(DO_NOT_LOG, backoff);
 
   LOG(INFO) << Substitute("$0: Granting yes vote for candidate $1 in term $2.",
                           GetRequestVoteLogPrefixUnlocked(*request),
@@ -2135,10 +2093,7 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
     // - When we lose or otherwise we can fall into a cycle, where everyone keeps
     //   triggering elections but no election ever completes because by the time they
     //   finish another one is triggered already.
-    // We ignore the status as we don't want to fail if we the timer is
-    // disabled.
-    ignore_result(SnoozeFailureDetector(LeaderElectionExpBackoffDeltaUnlocked(),
-                                        ALLOW_LOGGING));
+    SnoozeFailureDetector(ALLOW_LOGGING, LeaderElectionExpBackoffDeltaUnlocked());
 
     if (result.decision == VOTE_DENIED) {
       failed_elections_since_stable_leader_++;
@@ -2381,57 +2336,32 @@ void RaftConsensus::CompleteConfigChangeRoundUnlocked(ConsensusRound* round, con
 }
 
 
-Status RaftConsensus::EnsureFailureDetectorEnabled() {
-  if (PREDICT_FALSE(!FLAGS_enable_leader_failure_detection)) {
-    return Status::OK();
-  }
-  if (failure_detector_->IsTracking(kTimerId)) {
-    return Status::OK();
+void RaftConsensus::EnableFailureDetector(boost::optional<MonoDelta> delta) {
+  if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) {
+    failure_detector_->Start(std::move(delta));
   }
-  return failure_detector_->Track(kTimerId,
-                                  MonoTime::Now(),
-                                  // Unretained to avoid a circular ref.
-                                  Bind(&RaftConsensus::ReportFailureDetected, Unretained(this)));
 }
 
-Status RaftConsensus::EnsureFailureDetectorDisabled() {
-  if (PREDICT_FALSE(!FLAGS_enable_leader_failure_detection)) {
-    return Status::OK();
+void RaftConsensus::DisableFailureDetector() {
+  if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) {
+    failure_detector_->Stop();
   }
-
-  if (!failure_detector_->IsTracking(kTimerId)) {
-    return Status::OK();
-  }
-  return failure_detector_->UnTrack(kTimerId);
 }
 
-Status RaftConsensus::ExpireFailureDetector() {
-  if (PREDICT_FALSE(!FLAGS_enable_leader_failure_detection)) {
-    return Status::OK();
-  }
-
-  return failure_detector_->MessageFrom(kTimerId, MonoTime::Min());
-}
-
-Status RaftConsensus::SnoozeFailureDetector() {
-  return SnoozeFailureDetector(MonoDelta::FromMicroseconds(0), DO_NOT_LOG);
-}
-
-Status RaftConsensus::SnoozeFailureDetector(const MonoDelta& additional_delta,
-                                            AllowLogging allow_logging) {
-  if (PREDICT_FALSE(!FLAGS_enable_leader_failure_detection)) {
-    return Status::OK();
-  }
-
-  MonoTime time = MonoTime::Now() + additional_delta;
+void RaftConsensus::SnoozeFailureDetector(AllowLogging allow_logging,
+                                          boost::optional<MonoDelta> delta) {
+  if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) {
+    if (allow_logging == ALLOW_LOGGING) {
+      LOG(INFO) << LogPrefixThreadSafe()
+                << Substitute("Snoozing failure detection for $0",
+                              delta ? delta->ToString() : "election timeout");
+    }
 
-  if (allow_logging == ALLOW_LOGGING) {
-    LOG(INFO) << LogPrefixThreadSafe()
-              << "Snoozing failure detection for election timeout plus an additional "
-              << additional_delta.ToString();
+    if (!delta) {
+      delta = MinimumElectionTimeout();
+    }
+    failure_detector_->Snooze(std::move(delta));
   }
-
-  return failure_detector_->MessageFrom(kTimerId, time);
 }
 
 MonoDelta RaftConsensus::MinimumElectionTimeout() const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/21b0f3d5/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index e80b8b0..efcc06e 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -56,8 +56,6 @@ class optional;
 
 namespace kudu {
 
-class FailureDetector;
-class RandomizedFailureMonitor;
 
 typedef std::lock_guard<simple_spinlock> Lock;
 typedef gscoped_ptr<Lock> ScopedLock;
@@ -69,6 +67,10 @@ class Status;
 template <typename Sig>
 class Callback;
 
+namespace rpc {
+class PeriodicTimer;
+}
+
 namespace consensus {
 
 class ConsensusMetadataManager;
@@ -173,10 +175,6 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
       gscoped_ptr<ReplicateMsg> replicate_msg,
       ConsensusReplicatedCallback replicated_cb);
 
-  // Call StartElection(), log a warning if the call fails (usually due to
-  // being shut down).
-  void ReportFailureDetected(const std::string& name, const Status& msg);
-
   // Called by a Leader to replicate an entry to the state machine.
   //
   // From the leader instance perspective execution proceeds as follows:
@@ -519,40 +517,42 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
 
   // Start tracking the leader for failures. This typically occurs at startup
   // and when the local peer steps down as leader.
+  //
+  // If 'delta' is set, it is used as the initial failure period. Otherwise,
+  // the minimum election timeout is used.
+  //
   // If the failure detector is already registered, has no effect.
-  Status EnsureFailureDetectorEnabled();
+  void EnableFailureDetector(boost::optional<MonoDelta> delta = boost::none);
 
-  // Untrack the current leader from failure detector.
-  // This typically happens when the local peer becomes leader.
-  // If the failure detector is already unregistered, has no effect.
-  Status EnsureFailureDetectorDisabled();
-
-  // Set the failure detector to an "expired" state, so that the next time
-  // the failure monitor runs it triggers an election.
-  // This is primarily intended to be used at startup time.
-  Status ExpireFailureDetector();
+  // Stop tracking the current leader for failures. This typically occurs when
+  // the local peer becomes leader.
+  //
+  // If the failure detector is already disabled, has no effect.
+  void DisableFailureDetector();
 
   // "Reset" the failure detector to indicate leader activity.
-  // The failure detector must currently be enabled.
-  // When this is called a failure is guaranteed not to be detected
-  // before 'FLAGS_leader_failure_max_missed_heartbeat_periods' *
-  // 'FLAGS_raft_heartbeat_interval_ms' has elapsed.
-  Status SnoozeFailureDetector() WARN_UNUSED_RESULT;
-
-  // Like the above but adds 'additional_delta' to the default timeout
-  // period. If allow_logging is set to ALLOW_LOGGING, then this method
-  // will print a log message when called.
-  Status SnoozeFailureDetector(const MonoDelta& additional_delta,
-                               AllowLogging allow_logging) WARN_UNUSED_RESULT;
+  //
+  // When this is called a failure is guaranteed not to be detected before
+  // 'FLAGS_leader_failure_max_missed_heartbeat_periods' *
+  // 'FLAGS_raft_heartbeat_interval_ms' has elapsed, unless 'delta' is set, in
+  // which case its value is used as the next failure period.
+  //
+  // If 'allow_logging' is set to ALLOW_LOGGING, then this method will print a
+  // log message when called.
+  //
+  // If the failure detector is unregistered, has no effect.
+  void SnoozeFailureDetector(AllowLogging allow_logging,
+                             boost::optional<MonoDelta> delta = boost::none);
 
   // Return the minimum election timeout. Due to backoff and random
   // jitter, election timeouts may be longer than this.
   MonoDelta MinimumElectionTimeout() const;
 
-  // Calculates an additional snooze delta for leader election.
-  // The additional delta increases exponentially with the difference
-  // between the current term and the term of the last committed
-  // operation.
+  // Calculates a snooze delta for leader election.
+  //
+  // The delta increases exponentially with the difference between the current
+  // term and the term of the last committed operation.
+  //
   // The maximum delta is capped by 'FLAGS_leader_failure_exp_backoff_max_delta_ms'.
   MonoDelta LeaderElectionExpBackoffDeltaUnlocked();
 
@@ -584,6 +584,13 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
                              const RaftConfigPB& committed_config,
                              const std::string& reason);
 
+  // Called when the failure detector expires.
+  // Submits ReportFailureDetectedTask() to a thread pool.
+  void ReportFailureDetected();
+
+  // Call StartElection(), log a warning if the call fails (usually due to
+  // being shut down).
+  void ReportFailureDetectedTask();
 
   // Handle the completion of replication of a config change operation.
   // If 'status' is OK, this takes care of persisting the new configuration
@@ -735,10 +742,7 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
 
   Random rng_;
 
-  // TODO(mpercy): Plumb this from ServerBase.
-  std::unique_ptr<RandomizedFailureMonitor> failure_monitor_;
-
-  scoped_refptr<FailureDetector> failure_detector_;
+  std::shared_ptr<rpc::PeriodicTimer> failure_detector_;
 
   // If any RequestVote() RPC arrives before this timestamp,
   // the request will be ignored. This prevents abandoned or partitioned

http://git-wip-us.apache.org/repos/asf/kudu/blob/21b0f3d5/src/kudu/integration-tests/exactly_once_writes-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/exactly_once_writes-itest.cc b/src/kudu/integration-tests/exactly_once_writes-itest.cc
index 8a57675..e7a3d72 100644
--- a/src/kudu/integration-tests/exactly_once_writes-itest.cc
+++ b/src/kudu/integration-tests/exactly_once_writes-itest.cc
@@ -286,8 +286,6 @@ TEST_F(ExactlyOnceSemanticsITest, TestWritesWithExactlyOnceSemanticsWithCrashyNo
 
   // Make leader elections faster so we get through more cycles of leaders.
   ts_flags.emplace_back("--raft_heartbeat_interval_ms=200");
-  ts_flags.emplace_back("--leader_failure_monitor_check_mean_ms=100");
-  ts_flags.emplace_back("--leader_failure_monitor_check_stddev_ms=50");
 
   // Avoid preallocating segments since bootstrap is a little bit
   // faster if it doesn't have to scan forward through the preallocated
@@ -318,8 +316,6 @@ TEST_F(ExactlyOnceSemanticsITest, TestWritesWithExactlyOnceSemanticsWithChurnyEl
 #else
   ts_flags.emplace_back("--raft_heartbeat_interval_ms=2");
 #endif
-  ts_flags.emplace_back("--leader_failure_monitor_check_mean_ms=2");
-  ts_flags.emplace_back("--leader_failure_monitor_check_stddev_ms=1");
 
   int num_batches = 200;
   if (AllowSlowTests()) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/21b0f3d5/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index bc7f849..5c0918f 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -897,8 +897,6 @@ void RaftConsensusITest::CreateClusterForCrashyNodesTests() {
 
   // Make leader elections faster so we get through more cycles of leaders.
   ts_flags.emplace_back("--raft_heartbeat_interval_ms=100");
-  ts_flags.emplace_back("--leader_failure_monitor_check_mean_ms=50");
-  ts_flags.emplace_back("--leader_failure_monitor_check_stddev_ms=25");
 
   // Avoid preallocating segments since bootstrap is a little bit
   // faster if it doesn't have to scan forward through the preallocated
@@ -1003,8 +1001,6 @@ void RaftConsensusITest::CreateClusterForChurnyElectionsTests(
 #else
   ts_flags.emplace_back("--raft_heartbeat_interval_ms=1");
 #endif
-  ts_flags.emplace_back("--leader_failure_monitor_check_mean_ms=1");
-  ts_flags.emplace_back("--leader_failure_monitor_check_stddev_ms=1");
 
   ts_flags.insert(ts_flags.end(), extra_ts_flags.cbegin(), extra_ts_flags.cend());