You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@kudu.apache.org by ad...@apache.org on 2017/08/23 01:17:03 UTC

[2/4] kudu git commit: consensus_peers: replace bespoke Raft heartbeat logic with periodic timers

consensus_peers: replace bespoke Raft heartbeat logic with periodic timers

Building on the generic periodic timer implementation, this patch replaces
the one-off Raft heartbeating code with periodic timers.

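There's only one semantic change, but it's an important one: the jittering
range has changed from [P/2, P] to [3P/4, 5P/4], where P is the Raft
heartbeat interval (--raft_heartbeat_interval_ms). When I wrote commit 1070e76
I was nervous about making such a change, but because it raises the average
interval between heartbeats from 3P/4 to P, it reduces overall heartbeat
load, so I think it makes sense to do it.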

Change-Id: I5f7e1761d9f36dc6a25bd8e3e7d7a3b5c402afbf
Reviewed-on: http://gerrit.cloudera.org:8080/7734
Reviewed-by: Dan Burkert <da...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/c8e04077
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/c8e04077
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/c8e04077

Branch: refs/heads/master
Commit: c8e0407770bd395cd4bca5096c4cad56ab575cd6
Parents: 780a392
Author: Adar Dembo <ad...@cloudera.com>
Authored: Fri Aug 18 17:56:29 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Wed Aug 23 01:15:14 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus_peers.cc           | 90 +++++---------------
 src/kudu/consensus/consensus_peers.h            | 22 +----
 .../consensus/raft_consensus_quorum-test.cc     |  2 +-
 3 files changed, 24 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/c8e04077/src/kudu/consensus/consensus_peers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc
index 788b37d..66d155c 100644
--- a/src/kudu/consensus/consensus_peers.cc
+++ b/src/kudu/consensus/consensus_peers.cc
@@ -39,6 +39,7 @@
 #include "kudu/gutil/move.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/messenger.h"
+#include "kudu/rpc/periodic.h"
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/fault_injection.h"
 #include "kudu/util/flag_tags.h"
@@ -47,8 +48,6 @@
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/pb_util.h"
-#include "kudu/util/random.h"
-#include "kudu/util/random_util.h"
 #include "kudu/util/threadpool.h"
 
 DEFINE_int32(consensus_rpc_timeout_ms, 1000,
@@ -82,6 +81,7 @@ DECLARE_int32(raft_heartbeat_interval_ms);
 
 using kudu::pb_util::SecureShortDebugString;
 using kudu::rpc::Messenger;
+using kudu::rpc::PeriodicTimer;
 using kudu::rpc::RpcController;
 using kudu::tserver::TabletServerErrorPB;
 using std::shared_ptr;
@@ -129,19 +129,27 @@ Peer::Peer(RaftPeerPB peer_pb,
       queue_(queue),
       failed_attempts_(0),
       messenger_(std::move(messenger)),
-      raft_pool_token_(raft_pool_token),
-      rng_(GetRandomSeed32()) {
+      raft_pool_token_(raft_pool_token) {
 }
 
 Status Peer::Init() {
   {
     std::lock_guard<simple_spinlock> l(peer_lock_);
     queue_->TrackPeer(peer_pb_.permanent_uuid());
-    UpdateNextHeartbeatTimeUnlocked();
   }
 
-  // Schedule the first heartbeat.
-  ScheduleNextHeartbeatAndMaybeSignalRequest();
+  // Capture a weak_ptr reference into the functor so it can safely handle
+  // outliving the peer.
+  weak_ptr<Peer> w = shared_from_this();
+  heartbeater_ = PeriodicTimer::Create(
+      messenger_,
+      [w]() {
+        if (auto p = w.lock()) {
+          p->SignalRequest(true);
+        }
+      },
+      MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms));
+  heartbeater_->Start();
   return Status::OK();
 }
 
@@ -244,7 +252,7 @@ void Peer::SendNextRequest(bool even_if_queue_empty) {
 
   if (req_has_ops) {
     // If we're actually sending ops there's no need to heartbeat for a while.
-    UpdateNextHeartbeatTimeUnlocked();
+    heartbeater_->Snooze();
   }
 
   MAYBE_FAULT(FLAGS_fault_crash_on_leader_request_fraction);
@@ -419,70 +427,12 @@ void Peer::Close() {
 
 Peer::~Peer() {
   Close();
-  // We don't own the ops (the queue does).
-  request_.mutable_ops()->ExtractSubrange(0, request_.ops_size(), nullptr);
-}
-
-void Peer::ScheduleNextHeartbeatAndMaybeSignalRequest() {
-  // We must schedule the next callback with the lowest possible jittered
-  // time as the delay so that if SendNextRequest() resets the next heartbeat
-  // time with a very low value, the scheduled callback can still honor it.
-  //
-  // In effect, this means the callback period and the heartbeat period are
-  // decoupled from one another.
-  MonoDelta schedule_delay = GetHeartbeatJitterLowerBound();
-  bool signal_now = false;
-  {
-    std::lock_guard<simple_spinlock> l(peer_lock_);
-    if (closed_) {
-      // The peer was closed.
-      return;
-    }
-
-    MonoTime now = MonoTime::Now();
-    if (now < next_hb_time_) {
-      // It's not yet time to heartbeat. Reduce the scheduled delay if enough
-      // time has elapsed, but don't increase it.
-      schedule_delay = std::min(schedule_delay, next_hb_time_ - now);
-    } else {
-      // It's time to heartbeat. Although the next heartbeat time is set now,
-      // it may be set again when we get to SendNextRequest().
-      signal_now = true;
-      UpdateNextHeartbeatTimeUnlocked();
-    }
-  }
-
-  if (signal_now) {
-    SignalRequest(true);
+  if (heartbeater_) {
+    heartbeater_->Stop();
   }
 
-  // Capture a weak_ptr reference into the submitted functor so that we can
-  // safely handle the functor outliving its peer.
-  weak_ptr<Peer> w_this = shared_from_this();
-  messenger_->ScheduleOnReactor([w_this](const Status& s) {
-    if (!s.ok()) {
-      // The reactor was shut down.
-      return;
-    }
-    if (auto p = w_this.lock()) {
-      p->ScheduleNextHeartbeatAndMaybeSignalRequest();
-    }
-  }, schedule_delay);
-}
-
-MonoDelta Peer::GetHeartbeatJitterLowerBound() {
-  return MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms / 2);
-}
-
-void Peer::UpdateNextHeartbeatTimeUnlocked() {
-  DCHECK(peer_lock_.is_locked());
-
-  // We randomize the next heartbeat time between period/2 and period so
-  // multiple tablets on the same TS don't end up heartbeating in lockstep.
-  int64_t half_period_ms = GetHeartbeatJitterLowerBound().ToMilliseconds();
-  int64_t additional_jitter_ms = rng_.NextDoubleFraction() * half_period_ms;
-  next_hb_time_ =  MonoTime::Now() + MonoDelta::FromMilliseconds(
-      half_period_ms + additional_jitter_ms);
+  // We don't own the ops (the queue does).
+  request_.mutable_ops()->ExtractSubrange(0, request_.ops_size(), nullptr);
 }
 
 RpcPeerProxy::RpcPeerProxy(gscoped_ptr<HostPort> hostport,

http://git-wip-us.apache.org/repos/asf/kudu/blob/c8e04077/src/kudu/consensus/consensus_peers.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers.h b/src/kudu/consensus/consensus_peers.h
index 38018ea..78f0a4b 100644
--- a/src/kudu/consensus/consensus_peers.h
+++ b/src/kudu/consensus/consensus_peers.h
@@ -44,6 +44,7 @@ class ThreadPoolToken;
 
 namespace rpc {
 class Messenger;
+class PeriodicTimer;
 }
 
 namespace consensus {
@@ -111,9 +112,6 @@ class Peer : public std::enable_shared_from_this<Peer> {
                               std::shared_ptr<Peer>* peer);
 
  private:
-  // Returns the lowest possible jitter-induced heartbeat time delay.
-  static MonoDelta GetHeartbeatJitterLowerBound();
-
   Peer(RaftPeerPB peer_pb,
        std::string tablet_id,
        std::string leader_uuid,
@@ -149,17 +147,6 @@ class Peer : public std::enable_shared_from_this<Peer> {
 
   std::string LogPrefixUnlocked() const;
 
-  // Schedules the next heartbeat for this peer, optionally sending a heartbeat
-  // now if it makes sense to do so. Initially called from Init() to schedule
-  // the first heartbeat, and subsequently as a callback running on a reactor thread.
-  void ScheduleNextHeartbeatAndMaybeSignalRequest();
-
-  // Resets the next time that we should heartbeat to this peer. Does not
-  // perform any actual scheduling.
-  //
-  // Must be called with 'peer_lock_' held.
-  void UpdateNextHeartbeatTimeUnlocked();
-
   const std::string& tablet_id() const { return tablet_id_; }
 
   const std::string tablet_id_;
@@ -195,11 +182,8 @@ class Peer : public std::enable_shared_from_this<Peer> {
   // RaftConsensus owns this shared token and is responsible for destroying it.
   ThreadPoolToken* raft_pool_token_;
 
-  // The next time that a heartbeat should be sent to this peer.
-  MonoTime next_hb_time_;
-
-  // A PRNG, used to generate jittered heartbeat time.
-  Random rng_;
+  // Repeating timer responsible for scheduling heartbeats to this peer.
+  std::shared_ptr<rpc::PeriodicTimer> heartbeater_;
 
   // lock that protects Peer state changes, initialization, etc.
   mutable simple_spinlock peer_lock_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/c8e04077/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index 5439cd8..45d5cec 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -865,7 +865,7 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderHeartbeats) {
   // should have increased between 3 to 8 times.
   //
   // Why the variance? Heartbeat timing is jittered such that the period
-  // between heartbeats can be anywhere from half the interval to the full interval.
+  // between heartbeats can be anywhere from 3/4 to 5/4 the interval.
   SleepFor(MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms * 4LL));
 
   int repl0_final_count = follower0->update_calls_for_tests();