You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2017/08/23 01:17:03 UTC
[2/4] kudu git commit: consensus_peers: replace bespoke Raft heartbeat logic with periodic timers
consensus_peers: replace bespoke Raft heartbeat logic with periodic timers
Building on the generic periodic timer implementation, this patch replaces
the one-off Raft heartbeating code with periodic timers.
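There's only one semantic change, but it's an important one: the heartbeat
jitter range has changed from [P/2, P] to [3P/4, 5P/4], where P is the
configured heartbeat interval (raft_heartbeat_interval_ms). When I wrote
commit 1070e76 I was nervous about making such a change, but since it reduces
overall heartbeat load (the typical gap between heartbeats grows from roughly
3P/4 to roughly P), I think it makes sense to do it.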
Change-Id: I5f7e1761d9f36dc6a25bd8e3e7d7a3b5c402afbf
Reviewed-on: http://gerrit.cloudera.org:8080/7734
Reviewed-by: Dan Burkert <da...@apache.org>
Tested-by: Kudu Jenkins
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/c8e04077
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/c8e04077
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/c8e04077
Branch: refs/heads/master
Commit: c8e0407770bd395cd4bca5096c4cad56ab575cd6
Parents: 780a392
Author: Adar Dembo <ad...@cloudera.com>
Authored: Fri Aug 18 17:56:29 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Wed Aug 23 01:15:14 2017 +0000
----------------------------------------------------------------------
src/kudu/consensus/consensus_peers.cc | 90 +++++---------------
src/kudu/consensus/consensus_peers.h | 22 +----
.../consensus/raft_consensus_quorum-test.cc | 2 +-
3 files changed, 24 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/c8e04077/src/kudu/consensus/consensus_peers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc
index 788b37d..66d155c 100644
--- a/src/kudu/consensus/consensus_peers.cc
+++ b/src/kudu/consensus/consensus_peers.cc
@@ -39,6 +39,7 @@
#include "kudu/gutil/move.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/periodic.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/fault_injection.h"
#include "kudu/util/flag_tags.h"
@@ -47,8 +48,6 @@
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/pb_util.h"
-#include "kudu/util/random.h"
-#include "kudu/util/random_util.h"
#include "kudu/util/threadpool.h"
DEFINE_int32(consensus_rpc_timeout_ms, 1000,
@@ -82,6 +81,7 @@ DECLARE_int32(raft_heartbeat_interval_ms);
using kudu::pb_util::SecureShortDebugString;
using kudu::rpc::Messenger;
+using kudu::rpc::PeriodicTimer;
using kudu::rpc::RpcController;
using kudu::tserver::TabletServerErrorPB;
using std::shared_ptr;
@@ -129,19 +129,27 @@ Peer::Peer(RaftPeerPB peer_pb,
queue_(queue),
failed_attempts_(0),
messenger_(std::move(messenger)),
- raft_pool_token_(raft_pool_token),
- rng_(GetRandomSeed32()) {
+ raft_pool_token_(raft_pool_token) {
}
Status Peer::Init() {
{
std::lock_guard<simple_spinlock> l(peer_lock_);
queue_->TrackPeer(peer_pb_.permanent_uuid());
- UpdateNextHeartbeatTimeUnlocked();
}
- // Schedule the first heartbeat.
- ScheduleNextHeartbeatAndMaybeSignalRequest();
+ // Capture a weak_ptr reference into the functor so it can safely handle
+ // outliving the peer.
+ weak_ptr<Peer> w = shared_from_this();
+ heartbeater_ = PeriodicTimer::Create(
+ messenger_,
+ [w]() {
+ if (auto p = w.lock()) {
+ p->SignalRequest(true);
+ }
+ },
+ MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms));
+ heartbeater_->Start();
return Status::OK();
}
@@ -244,7 +252,7 @@ void Peer::SendNextRequest(bool even_if_queue_empty) {
if (req_has_ops) {
// If we're actually sending ops there's no need to heartbeat for a while.
- UpdateNextHeartbeatTimeUnlocked();
+ heartbeater_->Snooze();
}
MAYBE_FAULT(FLAGS_fault_crash_on_leader_request_fraction);
@@ -419,70 +427,12 @@ void Peer::Close() {
Peer::~Peer() {
Close();
- // We don't own the ops (the queue does).
- request_.mutable_ops()->ExtractSubrange(0, request_.ops_size(), nullptr);
-}
-
-void Peer::ScheduleNextHeartbeatAndMaybeSignalRequest() {
- // We must schedule the next callback with the lowest possible jittered
- // time as the delay so that if SendNextRequest() resets the next heartbeat
- // time with a very low value, the scheduled callback can still honor it.
- //
- // In effect, this means the callback period and the heartbeat period are
- // decoupled from one another.
- MonoDelta schedule_delay = GetHeartbeatJitterLowerBound();
- bool signal_now = false;
- {
- std::lock_guard<simple_spinlock> l(peer_lock_);
- if (closed_) {
- // The peer was closed.
- return;
- }
-
- MonoTime now = MonoTime::Now();
- if (now < next_hb_time_) {
- // It's not yet time to heartbeat. Reduce the scheduled delay if enough
- // time has elapsed, but don't increase it.
- schedule_delay = std::min(schedule_delay, next_hb_time_ - now);
- } else {
- // It's time to heartbeat. Although the next heartbeat time is set now,
- // it may be set again when we get to SendNextRequest().
- signal_now = true;
- UpdateNextHeartbeatTimeUnlocked();
- }
- }
-
- if (signal_now) {
- SignalRequest(true);
+ if (heartbeater_) {
+ heartbeater_->Stop();
}
- // Capture a weak_ptr reference into the submitted functor so that we can
- // safely handle the functor outliving its peer.
- weak_ptr<Peer> w_this = shared_from_this();
- messenger_->ScheduleOnReactor([w_this](const Status& s) {
- if (!s.ok()) {
- // The reactor was shut down.
- return;
- }
- if (auto p = w_this.lock()) {
- p->ScheduleNextHeartbeatAndMaybeSignalRequest();
- }
- }, schedule_delay);
-}
-
-MonoDelta Peer::GetHeartbeatJitterLowerBound() {
- return MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms / 2);
-}
-
-void Peer::UpdateNextHeartbeatTimeUnlocked() {
- DCHECK(peer_lock_.is_locked());
-
- // We randomize the next heartbeat time between period/2 and period so
- // multiple tablets on the same TS don't end up heartbeating in lockstep.
- int64_t half_period_ms = GetHeartbeatJitterLowerBound().ToMilliseconds();
- int64_t additional_jitter_ms = rng_.NextDoubleFraction() * half_period_ms;
- next_hb_time_ = MonoTime::Now() + MonoDelta::FromMilliseconds(
- half_period_ms + additional_jitter_ms);
+ // We don't own the ops (the queue does).
+ request_.mutable_ops()->ExtractSubrange(0, request_.ops_size(), nullptr);
}
RpcPeerProxy::RpcPeerProxy(gscoped_ptr<HostPort> hostport,
http://git-wip-us.apache.org/repos/asf/kudu/blob/c8e04077/src/kudu/consensus/consensus_peers.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers.h b/src/kudu/consensus/consensus_peers.h
index 38018ea..78f0a4b 100644
--- a/src/kudu/consensus/consensus_peers.h
+++ b/src/kudu/consensus/consensus_peers.h
@@ -44,6 +44,7 @@ class ThreadPoolToken;
namespace rpc {
class Messenger;
+class PeriodicTimer;
}
namespace consensus {
@@ -111,9 +112,6 @@ class Peer : public std::enable_shared_from_this<Peer> {
std::shared_ptr<Peer>* peer);
private:
- // Returns the lowest possible jitter-induced heartbeat time delay.
- static MonoDelta GetHeartbeatJitterLowerBound();
-
Peer(RaftPeerPB peer_pb,
std::string tablet_id,
std::string leader_uuid,
@@ -149,17 +147,6 @@ class Peer : public std::enable_shared_from_this<Peer> {
std::string LogPrefixUnlocked() const;
- // Schedules the next heartbeat for this peer, optionally sending a heartbeat
- // now if it makes sense to do so. Initially called from Init() to schedule
- // the first heartbeat, and subsequently as a callback running on a reactor thread.
- void ScheduleNextHeartbeatAndMaybeSignalRequest();
-
- // Resets the next time that we should heartbeat to this peer. Does not
- // perform any actual scheduling.
- //
- // Must be called with 'peer_lock_' held.
- void UpdateNextHeartbeatTimeUnlocked();
-
const std::string& tablet_id() const { return tablet_id_; }
const std::string tablet_id_;
@@ -195,11 +182,8 @@ class Peer : public std::enable_shared_from_this<Peer> {
// RaftConsensus owns this shared token and is responsible for destroying it.
ThreadPoolToken* raft_pool_token_;
- // The next time that a heartbeat should be sent to this peer.
- MonoTime next_hb_time_;
-
- // A PRNG, used to generate jittered heartbeat time.
- Random rng_;
+ // Repeating timer responsible for scheduling heartbeats to this peer.
+ std::shared_ptr<rpc::PeriodicTimer> heartbeater_;
// lock that protects Peer state changes, initialization, etc.
mutable simple_spinlock peer_lock_;
http://git-wip-us.apache.org/repos/asf/kudu/blob/c8e04077/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index 5439cd8..45d5cec 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -865,7 +865,7 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderHeartbeats) {
// should have increased between 3 to 8 times.
//
// Why the variance? Heartbeat timing is jittered such that the period
- // between heartbeats can be anywhere from half the interval to the full interval.
+ // between heartbeats can be anywhere from 3/4 to 5/4 the interval.
SleepFor(MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms * 4LL));
int repl0_final_count = follower0->update_calls_for_tests();