You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2017/05/31 21:28:01 UTC
[2/2] kudu git commit: consensus: Get rid of ReplicaState class
consensus: Get rid of ReplicaState class
Merges the logic in ReplicaState into the RaftConsensus class.
ReplicaState adds complexity but doesn't really serve a purpose anymore.
There are no functional changes in this patch.
Change-Id: Ie1e62eff37d3f8655100b364939375608063aa80
Reviewed-on: http://gerrit.cloudera.org:8080/7007
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Mike Percy <mp...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/9e40867c
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/9e40867c
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/9e40867c
Branch: refs/heads/master
Commit: 9e40867cc9a0d9a7cdc912140a6554eaec5a8d1e
Parents: ca2704c
Author: Mike Percy <mp...@apache.org>
Authored: Fri May 26 19:09:33 2017 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Wed May 31 21:27:42 2017 +0000
----------------------------------------------------------------------
src/kudu/consensus/CMakeLists.txt | 1 -
src/kudu/consensus/consensus.h | 4 +-
src/kudu/consensus/raft_consensus.cc | 733 +++++++++++++------
src/kudu/consensus/raft_consensus.h | 221 +++++-
.../consensus/raft_consensus_quorum-test.cc | 25 +-
src/kudu/consensus/raft_consensus_state.cc | 360 ---------
src/kudu/consensus/raft_consensus_state.h | 232 ------
7 files changed, 700 insertions(+), 876 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/9e40867c/src/kudu/consensus/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/CMakeLists.txt b/src/kudu/consensus/CMakeLists.txt
index 929fb16..e93ea94 100644
--- a/src/kudu/consensus/CMakeLists.txt
+++ b/src/kudu/consensus/CMakeLists.txt
@@ -106,7 +106,6 @@ set(CONSENSUS_SRCS
pending_rounds.cc
quorum_util.cc
raft_consensus.cc
- raft_consensus_state.cc
time_manager.cc
)
http://git-wip-us.apache.org/repos/asf/kudu/blob/9e40867c/src/kudu/consensus/consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus.h b/src/kudu/consensus/consensus.h
index 7084aff..59910be 100644
--- a/src/kudu/consensus/consensus.h
+++ b/src/kudu/consensus/consensus.h
@@ -258,10 +258,10 @@ class Consensus : public RefCountedThreadSafe<Consensus> {
virtual RaftPeerPB::Role role() const = 0;
// Returns the uuid of this peer.
- virtual std::string peer_uuid() const = 0;
+ virtual const std::string& peer_uuid() const = 0;
// Returns the id of the tablet whose updates this consensus instance helps coordinate.
- virtual std::string tablet_id() const = 0;
+ virtual const std::string& tablet_id() const = 0;
virtual scoped_refptr<TimeManager> time_manager() const = 0;
http://git-wip-us.apache.org/repos/asf/kudu/blob/9e40867c/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 363478b..c5370c7 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -32,7 +32,6 @@
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/peer_manager.h"
#include "kudu/consensus/quorum_util.h"
-#include "kudu/consensus/raft_consensus_state.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/stringprintf.h"
@@ -161,7 +160,7 @@ using tserver::TabletServerErrorPB;
static const char* const kTimerId = "election-timer";
scoped_refptr<RaftConsensus> RaftConsensus::Create(
- const ConsensusOptions& options,
+ ConsensusOptions options,
unique_ptr<ConsensusMetadata> cmeta,
const RaftPeerPB& local_peer_pb,
const scoped_refptr<MetricEntity>& metric_entity,
@@ -201,7 +200,7 @@ scoped_refptr<RaftConsensus> RaftConsensus::Create(
log));
return make_scoped_refptr(new RaftConsensus(
- options,
+ std::move(options),
std::move(cmeta),
std::move(rpc_factory),
std::move(queue),
@@ -217,27 +216,31 @@ scoped_refptr<RaftConsensus> RaftConsensus::Create(
}
RaftConsensus::RaftConsensus(
- const ConsensusOptions& options,
+ ConsensusOptions options,
unique_ptr<ConsensusMetadata> cmeta,
gscoped_ptr<PeerProxyFactory> peer_proxy_factory,
gscoped_ptr<PeerMessageQueue> queue,
gscoped_ptr<PeerManager> peer_manager,
gscoped_ptr<ThreadPool> thread_pool,
const scoped_refptr<MetricEntity>& metric_entity,
- const std::string& peer_uuid,
+ std::string peer_uuid,
scoped_refptr<TimeManager> time_manager,
ReplicaTransactionFactory* txn_factory,
const scoped_refptr<log::Log>& log,
shared_ptr<MemTracker> parent_mem_tracker,
Callback<void(const std::string& reason)> mark_dirty_clbk)
- : thread_pool_(std::move(thread_pool)),
- log_(log),
- time_manager_(std::move(time_manager)),
- peer_proxy_factory_(std::move(peer_proxy_factory)),
- txn_factory_(txn_factory),
- peer_manager_(std::move(peer_manager)),
- queue_(std::move(queue)),
- pending_(Substitute("T $0 P $1: ", options.tablet_id, peer_uuid), time_manager_),
+ : options_(std::move(options)),
+ peer_uuid_(std::move(peer_uuid)),
+ state_(kInitialized),
+ cmeta_(DCHECK_NOTNULL(std::move(cmeta))),
+ thread_pool_(DCHECK_NOTNULL(std::move(thread_pool))),
+ log_(DCHECK_NOTNULL(log)),
+ time_manager_(DCHECK_NOTNULL(std::move(time_manager))),
+ peer_proxy_factory_(DCHECK_NOTNULL(std::move(peer_proxy_factory))),
+ txn_factory_(DCHECK_NOTNULL(txn_factory)),
+ peer_manager_(DCHECK_NOTNULL(std::move(peer_manager))),
+ queue_(DCHECK_NOTNULL(std::move(queue))),
+ pending_(Substitute("T $0 P $1: ", options_.tablet_id, peer_uuid_), time_manager_),
rng_(GetRandomSeed32()),
failure_monitor_(GetRandomSeed32(), GetFailureMonitorCheckMeanMs(),
GetFailureMonitorCheckStddevMs()),
@@ -253,12 +256,8 @@ RaftConsensus::RaftConsensus(
follower_memory_pressure_rejections_(metric_entity->FindOrCreateCounter(
&METRIC_follower_memory_pressure_rejections)),
term_metric_(metric_entity->FindOrCreateGauge(&METRIC_raft_term,
- cmeta->current_term())),
+ cmeta_->current_term())),
parent_mem_tracker_(std::move(parent_mem_tracker)) {
- DCHECK(log_);
- state_.reset(new ReplicaState(options,
- peer_uuid,
- std::move(cmeta)));
}
RaftConsensus::~RaftConsensus() {
@@ -275,21 +274,31 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
// We still have not enabled failure detection for the leader election timer.
// That happens separately via the helper functions
// EnsureFailureDetector(Enabled/Disabled)Unlocked();
- RETURN_NOT_OK(failure_monitor_.MonitorFailureDetector(state_->GetOptions().tablet_id,
+ RETURN_NOT_OK(failure_monitor_.MonitorFailureDetector(options_.tablet_id,
failure_detector_));
{
- ReplicaState::UniqueLock lock;
- RETURN_NOT_OK(state_->LockForStart(&lock));
- state_->ClearLeaderUnlocked();
+ UniqueLock lock;
+ RETURN_NOT_OK(LockForStart(&lock));
+ ClearLeaderUnlocked();
+
+ // Our last persisted term can be higher than the last persisted operation
+ // (i.e. if we called an election) but reverse should never happen.
+ if (info.last_id.term() > GetCurrentTermUnlocked()) {
+ return Status::Corruption(Substitute("Unable to start RaftConsensus: "
+ "The last op in the WAL with id $0 has a term ($1) that is greater "
+ "than the latest recorded term, which is $2",
+ OpIdToString(info.last_id),
+ info.last_id.term(),
+ GetCurrentTermUnlocked()));
+ }
- RETURN_NOT_OK_PREPEND(state_->StartUnlocked(info.last_id),
- "Unable to start Raft ReplicaState");
+ state_ = kRunning;
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Replica starting. Triggering "
<< info.orphaned_replicates.size()
<< " pending transactions. Active config: "
- << SecureShortDebugString(state_->GetActiveConfigUnlocked());
+ << SecureShortDebugString(GetActiveConfigUnlocked());
for (ReplicateMsg* replicate : info.orphaned_replicates) {
ReplicateRefPtr replicate_ptr = make_scoped_refptr_replicate(new ReplicateMsg(*replicate));
RETURN_NOT_OK(StartReplicaTransactionUnlocked(replicate_ptr));
@@ -301,14 +310,14 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
}
{
- ReplicaState::UniqueLock lock;
- RETURN_NOT_OK(state_->LockForConfigChange(&lock));
+ UniqueLock lock;
+ RETURN_NOT_OK(LockForConfigChange(&lock));
RETURN_NOT_OK(EnsureFailureDetectorEnabledUnlocked());
// If this is the first term expire the FD immediately so that we have a fast first
// election, otherwise we just let the timer expire normally.
- if (state_->GetCurrentTermUnlocked() == 0) {
+ if (GetCurrentTermUnlocked() == 0) {
// Initialize the failure detector timeout to some time in the past so that
// the next time the failure detector monitor runs it triggers an election
// (unless someone else requested a vote from us first, which resets the
@@ -341,21 +350,21 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
}
bool RaftConsensus::IsRunning() const {
- ReplicaState::UniqueLock lock;
- Status s = state_->LockForRead(&lock);
+ UniqueLock lock;
+ Status s = LockForRead(&lock);
if (PREDICT_FALSE(!s.ok())) return false;
- return state_->state() == ReplicaState::kRunning;
+ return state_ == kRunning;
}
Status RaftConsensus::EmulateElection() {
- ReplicaState::UniqueLock lock;
- RETURN_NOT_OK(state_->LockForConfigChange(&lock));
+ UniqueLock lock;
+ RETURN_NOT_OK(LockForConfigChange(&lock));
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Emulating election...";
// Assume leadership of new term.
- RETURN_NOT_OK(HandleTermAdvanceUnlocked(state_->GetCurrentTermUnlocked() + 1));
- SetLeaderUuidUnlocked(state_->GetPeerUuid());
+ RETURN_NOT_OK(HandleTermAdvanceUnlocked(GetCurrentTermUnlocked() + 1));
+ SetLeaderUuidUnlocked(peer_uuid_);
return BecomeLeaderUnlocked();
}
@@ -391,14 +400,14 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
const char* mode_str = ModeString(mode);
TRACE_EVENT2("consensus", "RaftConsensus::StartElection",
- "peer", state_->LogPrefixThreadSafe(),
+ "peer", LogPrefixThreadSafe(),
"mode", mode_str);
scoped_refptr<LeaderElection> election;
{
- ReplicaState::UniqueLock lock;
- RETURN_NOT_OK(state_->LockForConfigChange(&lock));
+ UniqueLock lock;
+ RETURN_NOT_OK(LockForConfigChange(&lock));
- RaftPeerPB::Role active_role = state_->GetActiveRoleUnlocked();
+ RaftPeerPB::Role active_role = GetActiveRoleUnlocked();
if (active_role == RaftPeerPB::LEADER) {
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Not starting " << mode << " -- already leader";
return Status::OK();
@@ -407,11 +416,11 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
SnoozeFailureDetectorUnlocked(); // Avoid excessive election noise while in this state.
return Status::IllegalState("Not starting election: Node is currently "
"a non-participant in the raft config",
- SecureShortDebugString(state_->GetActiveConfigUnlocked()));
+ SecureShortDebugString(GetActiveConfigUnlocked()));
}
LOG_WITH_PREFIX_UNLOCKED(INFO)
<< "Starting " << mode_str
- << " (" << ReasonString(reason, state_->GetLeaderUuidUnlocked()) << ")";
+ << " (" << ReasonString(reason, GetLeaderUuidUnlocked()) << ")";
// Snooze to avoid the election timer firing again as much as possible.
// We do not disable the election timer while running an election, so that
@@ -427,12 +436,12 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
// We skip flushing the term to disk because setting the vote just below also
// flushes to disk, and the double fsync doesn't buy us anything.
- RETURN_NOT_OK(HandleTermAdvanceUnlocked(state_->GetCurrentTermUnlocked() + 1,
- ReplicaState::SKIP_FLUSH_TO_DISK));
- RETURN_NOT_OK(state_->SetVotedForCurrentTermUnlocked(state_->GetPeerUuid()));
+ RETURN_NOT_OK(HandleTermAdvanceUnlocked(GetCurrentTermUnlocked() + 1,
+ SKIP_FLUSH_TO_DISK));
+ RETURN_NOT_OK(SetVotedForCurrentTermUnlocked(peer_uuid_));
}
- const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked();
+ const RaftConfigPB& active_config = GetActiveConfigUnlocked();
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Starting " << mode_str << " with config: "
<< SecureShortDebugString(active_config);
@@ -443,23 +452,23 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
// Vote for ourselves.
bool duplicate;
- RETURN_NOT_OK(counter->RegisterVote(state_->GetPeerUuid(), VOTE_GRANTED, &duplicate));
- CHECK(!duplicate) << state_->LogPrefixUnlocked()
+ RETURN_NOT_OK(counter->RegisterVote(peer_uuid_, VOTE_GRANTED, &duplicate));
+ CHECK(!duplicate) << LogPrefixUnlocked()
<< "Inexplicable duplicate self-vote for term "
- << state_->GetCurrentTermUnlocked();
+ << GetCurrentTermUnlocked();
VoteRequestPB request;
request.set_ignore_live_leader(mode == ELECT_EVEN_IF_LEADER_IS_ALIVE);
- request.set_candidate_uuid(state_->GetPeerUuid());
+ request.set_candidate_uuid(peer_uuid_);
if (mode == PRE_ELECTION) {
// In a pre-election, we haven't bumped our own term yet, so we need to be
// asking for votes for the next term.
request.set_is_pre_election(true);
- request.set_candidate_term(state_->GetCurrentTermUnlocked() + 1);
+ request.set_candidate_term(GetCurrentTermUnlocked() + 1);
} else {
- request.set_candidate_term(state_->GetCurrentTermUnlocked());
+ request.set_candidate_term(GetCurrentTermUnlocked());
}
- request.set_tablet_id(state_->GetOptions().tablet_id);
+ request.set_tablet_id(options_.tablet_id);
*request.mutable_candidate_status()->mutable_last_received() =
queue_->GetLastOpIdInLog();
@@ -480,7 +489,8 @@ Status RaftConsensus::WaitUntilLeaderForTests(const MonoDelta& timeout) {
while (role() != consensus::RaftPeerPB::LEADER) {
if (MonoTime::Now() >= deadline) {
return Status::TimedOut(Substitute("Peer $0 is not leader of tablet $1 after $2. Role: $3",
- peer_uuid(), tablet_id(), timeout.ToString(), role()));
+ peer_uuid_, options_.tablet_id, timeout.ToString(),
+ role()));
}
SleepFor(MonoDelta::FromMilliseconds(10));
}
@@ -489,9 +499,9 @@ Status RaftConsensus::WaitUntilLeaderForTests(const MonoDelta& timeout) {
Status RaftConsensus::StepDown(LeaderStepDownResponsePB* resp) {
TRACE_EVENT0("consensus", "RaftConsensus::StepDown");
- ReplicaState::UniqueLock lock;
- RETURN_NOT_OK(state_->LockForConfigChange(&lock));
- if (state_->GetActiveRoleUnlocked() != RaftPeerPB::LEADER) {
+ UniqueLock lock;
+ RETURN_NOT_OK(LockForConfigChange(&lock));
+ if (GetActiveRoleUnlocked() != RaftPeerPB::LEADER) {
resp->mutable_error()->set_code(TabletServerErrorPB::NOT_THE_LEADER);
StatusToPB(Status::IllegalState("Not currently leader"),
resp->mutable_error()->mutable_status());
@@ -514,9 +524,9 @@ void RaftConsensus::ReportFailureDetected(const std::string& name, const Status&
Status RaftConsensus::BecomeLeaderUnlocked() {
TRACE_EVENT2("consensus", "RaftConsensus::BecomeLeaderUnlocked",
- "peer", peer_uuid(),
- "tablet", tablet_id());
- LOG_WITH_PREFIX_UNLOCKED(INFO) << "Becoming Leader. State: " << state_->ToStringUnlocked();
+ "peer", peer_uuid_,
+ "tablet", options_.tablet_id);
+ LOG_WITH_PREFIX_UNLOCKED(INFO) << "Becoming Leader. State: " << ToStringUnlocked();
// Disable FD while we are leader.
RETURN_NOT_OK(EnsureFailureDetectorDisabledUnlocked());
@@ -547,9 +557,9 @@ Status RaftConsensus::BecomeLeaderUnlocked() {
Status RaftConsensus::BecomeReplicaUnlocked() {
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Becoming Follower/Learner. State: "
- << state_->ToStringUnlocked();
+ << ToStringUnlocked();
- state_->ClearLeaderUnlocked();
+ ClearLeaderUnlocked();
// FD should be running while we are a follower.
RETURN_NOT_OK(EnsureFailureDetectorEnabledUnlocked());
@@ -570,9 +580,9 @@ Status RaftConsensus::Replicate(const scoped_refptr<ConsensusRound>& round) {
std::lock_guard<simple_spinlock> lock(update_lock_);
{
- ReplicaState::UniqueLock lock;
- RETURN_NOT_OK(state_->LockForReplicate(&lock, *round->replicate_msg()));
- RETURN_NOT_OK(round->CheckBoundTerm(state_->GetCurrentTermUnlocked()));
+ UniqueLock lock;
+ RETURN_NOT_OK(LockForReplicate(&lock, *round->replicate_msg()));
+ RETURN_NOT_OK(round->CheckBoundTerm(GetCurrentTermUnlocked()));
RETURN_NOT_OK(AppendNewRoundToQueueUnlocked(round));
}
@@ -581,9 +591,9 @@ Status RaftConsensus::Replicate(const scoped_refptr<ConsensusRound>& round) {
}
Status RaftConsensus::CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRound>& round) {
- ReplicaState::UniqueLock lock;
- RETURN_NOT_OK(state_->LockForReplicate(&lock, *round->replicate_msg()));
- round->BindToTerm(state_->GetCurrentTermUnlocked());
+ UniqueLock lock;
+ RETURN_NOT_OK(LockForReplicate(&lock, *round->replicate_msg()));
+ round->BindToTerm(GetCurrentTermUnlocked());
return Status::OK();
}
@@ -615,7 +625,7 @@ Status RaftConsensus::AddPendingOperationUnlocked(const scoped_refptr<ConsensusR
const RaftConfigPB& new_config = change_record->new_config();
if (!new_config.unsafe_config_change()) {
- Status s = state_->CheckNoConfigChangePendingUnlocked();
+ Status s = CheckNoConfigChangePendingUnlocked();
if (PREDICT_FALSE(!s.ok())) {
s = s.CloneAndAppend(Substitute("\n New config: $0", SecureShortDebugString(new_config)));
LOG_WITH_PREFIX_UNLOCKED(INFO) << s.ToString();
@@ -625,10 +635,10 @@ Status RaftConsensus::AddPendingOperationUnlocked(const scoped_refptr<ConsensusR
// Check if the pending Raft config has an OpId less than the committed
// config. If so, this is a replay at startup in which the COMMIT
// messages were delayed.
- const RaftConfigPB& committed_config = state_->GetCommittedConfigUnlocked();
+ const RaftConfigPB& committed_config = GetCommittedConfigUnlocked();
if (round->replicate_msg()->id().index() > committed_config.opid_index()) {
- RETURN_NOT_OK(state_->SetPendingConfigUnlocked(new_config));
- if (state_->GetActiveRoleUnlocked() == RaftPeerPB::LEADER) {
+ RETURN_NOT_OK(SetPendingConfigUnlocked(new_config));
+ if (GetActiveRoleUnlocked() == RaftPeerPB::LEADER) {
RETURN_NOT_OK(RefreshConsensusQueueAndPeersUnlocked());
}
} else {
@@ -645,10 +655,10 @@ Status RaftConsensus::AddPendingOperationUnlocked(const scoped_refptr<ConsensusR
}
void RaftConsensus::NotifyCommitIndex(int64_t commit_index) {
- ReplicaState::UniqueLock lock;
- Status s = state_->LockForCommit(&lock);
+ UniqueLock lock;
+ Status s = LockForCommit(&lock);
if (PREDICT_FALSE(!s.ok())) {
- LOG(WARNING) << state_->LogPrefixThreadSafe()
+ LOG(WARNING) << LogPrefixThreadSafe()
<< "Unable to take state lock to update committed index: "
<< s.ToString();
return;
@@ -656,16 +666,16 @@ void RaftConsensus::NotifyCommitIndex(int64_t commit_index) {
pending_.AdvanceCommittedIndex(commit_index);
- if (state_->GetActiveRoleUnlocked() == RaftPeerPB::LEADER) {
+ if (GetActiveRoleUnlocked() == RaftPeerPB::LEADER) {
peer_manager_->SignalRequest(false);
}
}
void RaftConsensus::NotifyTermChange(int64_t term) {
- ReplicaState::UniqueLock lock;
- Status s = state_->LockForConfigChange(&lock);
+ UniqueLock lock;
+ Status s = LockForConfigChange(&lock);
if (PREDICT_FALSE(!s.ok())) {
- LOG(WARNING) << state_->LogPrefixThreadSafe() << "Unable to lock ReplicaState for term change"
+ LOG(WARNING) << LogPrefixThreadSafe() << "Unable to lock consensus for term change"
<< " when notified of new term " << term << ": " << s.ToString();
return;
}
@@ -680,22 +690,22 @@ void RaftConsensus::NotifyFailedFollower(const string& uuid,
uuid, term, reason);
if (!FLAGS_evict_failed_followers) {
- LOG(INFO) << state_->LogPrefixThreadSafe() << fail_msg
+ LOG(INFO) << LogPrefixThreadSafe() << fail_msg
<< "Eviction of failed followers is disabled. Doing nothing.";
return;
}
RaftConfigPB committed_config;
{
- ReplicaState::UniqueLock lock;
- Status s = state_->LockForRead(&lock);
+ UniqueLock lock;
+ Status s = LockForRead(&lock);
if (PREDICT_FALSE(!s.ok())) {
- LOG(WARNING) << state_->LogPrefixThreadSafe() << fail_msg
- << "Unable to lock ReplicaState for read: " << s.ToString();
+ LOG(WARNING) << LogPrefixThreadSafe() << fail_msg
+ << "Unable to lock consensus for read: " << s.ToString();
return;
}
- int64_t current_term = state_->GetCurrentTermUnlocked();
+ int64_t current_term = GetCurrentTermUnlocked();
if (current_term != term) {
LOG_WITH_PREFIX_UNLOCKED(INFO) << fail_msg << "Notified about a follower failure in "
<< "previous term " << term << ", but a leader election "
@@ -704,34 +714,34 @@ void RaftConsensus::NotifyFailedFollower(const string& uuid,
return;
}
- if (state_->IsConfigChangePendingUnlocked()) {
+ if (IsConfigChangePendingUnlocked()) {
LOG_WITH_PREFIX_UNLOCKED(INFO) << fail_msg << "There is already a config change operation "
<< "in progress. Unable to evict follower until it completes. "
<< "Doing nothing.";
return;
}
- committed_config = state_->GetCommittedConfigUnlocked();
+ committed_config = GetCommittedConfigUnlocked();
}
- // Run config change on thread pool after dropping ReplicaState lock.
+ // Run config change on thread pool after dropping lock.
WARN_NOT_OK(thread_pool_->SubmitClosure(Bind(&RaftConsensus::TryRemoveFollowerTask,
this, uuid, committed_config, reason)),
- state_->LogPrefixThreadSafe() + "Unable to start RemoteFollowerTask");
+ LogPrefixThreadSafe() + "Unable to start RemoteFollowerTask");
}
void RaftConsensus::TryRemoveFollowerTask(const string& uuid,
const RaftConfigPB& committed_config,
const std::string& reason) {
ChangeConfigRequestPB req;
- req.set_tablet_id(tablet_id());
+ req.set_tablet_id(options_.tablet_id);
req.mutable_server()->set_permanent_uuid(uuid);
req.set_type(REMOVE_SERVER);
req.set_cas_config_opid_index(committed_config.opid_index());
- LOG(INFO) << state_->LogPrefixThreadSafe() << "Attempting to remove follower "
+ LOG(INFO) << LogPrefixThreadSafe() << "Attempting to remove follower "
<< uuid << " from the Raft config. Reason: " << reason;
boost::optional<TabletServerErrorPB::Code> error_code;
WARN_NOT_OK(ChangeConfig(req, Bind(&DoNothingStatusCB), &error_code),
- state_->LogPrefixThreadSafe() + "Unable to remove follower " + uuid);
+ LogPrefixThreadSafe() + "Unable to remove follower " + uuid);
}
Status RaftConsensus::Update(const ConsensusRequestPB* request,
@@ -743,7 +753,7 @@ Status RaftConsensus::Update(const ConsensusRequestPB* request,
"is set to true.");
}
- response->set_responder_uuid(state_->GetPeerUuid());
+ response->set_responder_uuid(peer_uuid_);
VLOG_WITH_PREFIX(2) << "Replica received request: " << SecureShortDebugString(*request);
@@ -753,7 +763,7 @@ Status RaftConsensus::Update(const ConsensusRequestPB* request,
if (PREDICT_FALSE(VLOG_IS_ON(1))) {
if (request->ops_size() == 0) {
VLOG_WITH_PREFIX(1) << "Replica replied to status only request. Replica: "
- << state_->ToString() << ". Response: "
+ << ToString() << ". Response: "
<< SecureShortDebugString(*response);
}
}
@@ -784,10 +794,10 @@ Status RaftConsensus::StartReplicaTransactionUnlocked(const ReplicateRefPtr& msg
}
Status RaftConsensus::IsSingleVoterConfig(bool* single_voter) const {
- ReplicaState::UniqueLock lock;
- RETURN_NOT_OK(state_->LockForRead(&lock));
- const RaftConfigPB& config = state_->GetCommittedConfigUnlocked();
- const string& uuid = state_->GetPeerUuid();
+ UniqueLock lock;
+ RETURN_NOT_OK(LockForRead(&lock));
+ const RaftConfigPB& config = GetCommittedConfigUnlocked();
+ const string& uuid = peer_uuid_;
if (CountVoters(config) == 1 && IsRaftConfigVoter(uuid, config)) {
*single_voter = true;
} else {
@@ -876,16 +886,16 @@ void RaftConsensus::DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req
Status RaftConsensus::HandleLeaderRequestTermUnlocked(const ConsensusRequestPB* request,
ConsensusResponsePB* response) {
// Do term checks first:
- if (PREDICT_FALSE(request->caller_term() != state_->GetCurrentTermUnlocked())) {
+ if (PREDICT_FALSE(request->caller_term() != GetCurrentTermUnlocked())) {
// If less, reject.
- if (request->caller_term() < state_->GetCurrentTermUnlocked()) {
+ if (request->caller_term() < GetCurrentTermUnlocked()) {
string msg = Substitute("Rejecting Update request from peer $0 for earlier term $1. "
"Current term is $2. Ops: $3",
request->caller_uuid(),
request->caller_term(),
- state_->GetCurrentTermUnlocked(),
+ GetCurrentTermUnlocked(),
OpsRangeString(*request));
LOG_WITH_PREFIX_UNLOCKED(INFO) << msg;
FillConsensusResponseError(response,
@@ -1019,13 +1029,13 @@ Status RaftConsensus::CheckLeaderRequestUnlocked(const ConsensusRequestPB* reque
// the effective leader of the configuration. If they are not currently marked as
// the leader locally, mark them as leader now.
const string& caller_uuid = request->caller_uuid();
- if (PREDICT_FALSE(state_->HasLeaderUnlocked() &&
- state_->GetLeaderUuidUnlocked() != caller_uuid)) {
+ if (PREDICT_FALSE(HasLeaderUnlocked() &&
+ GetLeaderUuidUnlocked() != caller_uuid)) {
LOG_WITH_PREFIX_UNLOCKED(FATAL) << "Unexpected new leader in same term! "
- << "Existing leader UUID: " << state_->GetLeaderUuidUnlocked() << ", "
+ << "Existing leader UUID: " << GetLeaderUuidUnlocked() << ", "
<< "new leader UUID: " << caller_uuid;
}
- if (PREDICT_FALSE(!state_->HasLeaderUnlocked())) {
+ if (PREDICT_FALSE(!HasLeaderUnlocked())) {
SetLeaderUuidUnlocked(request->caller_uuid());
}
@@ -1035,8 +1045,8 @@ Status RaftConsensus::CheckLeaderRequestUnlocked(const ConsensusRequestPB* reque
Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
ConsensusResponsePB* response) {
TRACE_EVENT2("consensus", "RaftConsensus::UpdateReplica",
- "peer", peer_uuid(),
- "tablet", tablet_id());
+ "peer", peer_uuid_,
+ "tablet", options_.tablet_id);
Synchronizer log_synchronizer;
StatusCallback sync_status_cb = log_synchronizer.AsStatusCallback();
@@ -1132,8 +1142,8 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
LeaderRequest deduped_req;
{
- ReplicaState::UniqueLock lock;
- RETURN_NOT_OK(state_->LockForUpdate(&lock));
+ UniqueLock lock;
+ RETURN_NOT_OK(LockForUpdate(&lock));
deduped_req.leader_uuid = request->caller_uuid();
@@ -1323,8 +1333,8 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
// If just waiting for our log append to finish lets snooze the timer.
// We don't want to fire leader election because we're waiting on our own log.
if (s.IsTimedOut()) {
- ReplicaState::UniqueLock lock;
- RETURN_NOT_OK(state_->LockForRead(&lock));
+ UniqueLock lock;
+ RETURN_NOT_OK(LockForRead(&lock));
SnoozeFailureDetectorUnlocked();
}
} while (s.IsTimedOut());
@@ -1333,7 +1343,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
TRACE("finished");
}
- VLOG_WITH_PREFIX(2) << "Replica updated. " << state_->ToString()
+ VLOG_WITH_PREFIX(2) << "Replica updated. " << ToString()
<< ". Request: " << SecureShortDebugString(*request);
TRACE("UpdateReplicas() finished");
@@ -1342,7 +1352,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
void RaftConsensus::FillConsensusResponseOKUnlocked(ConsensusResponsePB* response) {
TRACE("Filling consensus response to leader.");
- response->set_responder_term(state_->GetCurrentTermUnlocked());
+ response->set_responder_term(GetCurrentTermUnlocked());
response->mutable_status()->mutable_last_received()->CopyFrom(
queue_->GetLastOpIdInLog());
response->mutable_status()->mutable_last_received_current_leader()->CopyFrom(
@@ -1361,13 +1371,13 @@ void RaftConsensus::FillConsensusResponseError(ConsensusResponsePB* response,
Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB* response) {
TRACE_EVENT2("consensus", "RaftConsensus::RequestVote",
- "peer", peer_uuid(),
- "tablet", tablet_id());
- response->set_responder_uuid(state_->GetPeerUuid());
+ "peer", peer_uuid_,
+ "tablet", options_.tablet_id);
+ response->set_responder_uuid(peer_uuid_);
// We must acquire the update lock in order to ensure that this vote action
// takes place between requests.
- // Lock ordering: The update lock must be acquired before the ReplicaState lock.
+ // Lock ordering: update_lock_ must be acquired before lock_.
std::unique_lock<simple_spinlock> update_guard(update_lock_, std::defer_lock);
if (FLAGS_enable_leader_failure_detection) {
update_guard.try_lock();
@@ -1383,18 +1393,18 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
// timeouts, just vote a quick NO.
//
// We still need to take the state lock in order to respond with term info, etc.
- ReplicaState::UniqueLock state_guard;
- RETURN_NOT_OK(state_->LockForConfigChange(&state_guard));
+ UniqueLock state_guard;
+ RETURN_NOT_OK(LockForConfigChange(&state_guard));
return RequestVoteRespondIsBusy(request, response);
}
// Acquire the replica state lock so we can read / modify the consensus state.
- ReplicaState::UniqueLock state_guard;
- RETURN_NOT_OK(state_->LockForConfigChange(&state_guard));
+ UniqueLock state_guard;
+ RETURN_NOT_OK(LockForConfigChange(&state_guard));
// If the node is not in the configuration, allow the vote (this is required by Raft)
// but log an informational message anyway.
- if (!IsRaftConfigMember(request->candidate_uuid(), state_->GetActiveConfigUnlocked())) {
+ if (!IsRaftConfigMember(request->candidate_uuid(), GetActiveConfigUnlocked())) {
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Handling vote request from an unknown peer "
<< request->candidate_uuid();
}
@@ -1421,16 +1431,16 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
}
// Candidate is running behind.
- if (request->candidate_term() < state_->GetCurrentTermUnlocked()) {
+ if (request->candidate_term() < GetCurrentTermUnlocked()) {
return RequestVoteRespondInvalidTerm(request, response);
}
// We already voted this term.
- if (request->candidate_term() == state_->GetCurrentTermUnlocked() &&
- state_->HasVotedCurrentTermUnlocked()) {
+ if (request->candidate_term() == GetCurrentTermUnlocked() &&
+ HasVotedCurrentTermUnlocked()) {
// Already voted for the same candidate in the current term.
- if (state_->GetVotedForCurrentTermUnlocked() == request->candidate_uuid()) {
+ if (GetVotedForCurrentTermUnlocked() == request->candidate_uuid()) {
return RequestVoteRespondVoteAlreadyGranted(request, response);
}
@@ -1449,14 +1459,14 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
// has actually now successfully become leader of the prior term, in which case
// bumping our term here would disrupt it.
if (!request->is_pre_election() &&
- request->candidate_term() > state_->GetCurrentTermUnlocked()) {
+ request->candidate_term() > GetCurrentTermUnlocked()) {
// If we are going to vote for this peer, then we will flush the consensus metadata
// to disk below when we record the vote, and we can skip flushing the term advancement
// to disk here.
- auto flush = vote_yes ? ReplicaState::SKIP_FLUSH_TO_DISK : ReplicaState::FLUSH_TO_DISK;
+ auto flush = vote_yes ? SKIP_FLUSH_TO_DISK : FLUSH_TO_DISK;
RETURN_NOT_OK_PREPEND(HandleTermAdvanceUnlocked(request->candidate_term(), flush),
Substitute("Could not step down in RequestVote. Current term: $0, candidate term: $1",
- state_->GetCurrentTermUnlocked(), request->candidate_term()));
+ GetCurrentTermUnlocked(), request->candidate_term()));
}
if (!vote_yes) {
@@ -1482,10 +1492,10 @@ Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
ChangeConfigType type = req.type();
const RaftPeerPB& server = req.server();
{
- ReplicaState::UniqueLock lock;
- RETURN_NOT_OK(state_->LockForConfigChange(&lock));
- RETURN_NOT_OK(state_->CheckActiveLeaderUnlocked());
- RETURN_NOT_OK(state_->CheckNoConfigChangePendingUnlocked());
+ UniqueLock lock;
+ RETURN_NOT_OK(LockForConfigChange(&lock));
+ RETURN_NOT_OK(CheckActiveLeaderUnlocked());
+ RETURN_NOT_OK(CheckNoConfigChangePendingUnlocked());
// We are required by Raft to reject config change operations until we have
// committed at least one operation in our current term as leader.
@@ -1498,7 +1508,7 @@ Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
return Status::InvalidArgument("server must have permanent_uuid specified",
SecureShortDebugString(req));
}
- const RaftConfigPB& committed_config = state_->GetCommittedConfigUnlocked();
+ const RaftConfigPB& committed_config = GetCommittedConfigUnlocked();
// Support atomic ChangeConfig requests.
if (req.has_cas_config_opid_index()) {
@@ -1535,13 +1545,13 @@ Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
break;
case REMOVE_SERVER:
- if (server_uuid == peer_uuid()) {
+ if (server_uuid == peer_uuid_) {
return Status::InvalidArgument(
Substitute("Cannot remove peer $0 from the config because it is the leader. "
"Force another leader to be elected to remove this server. "
"Consensus state: $1",
server_uuid,
- SecureShortDebugString(state_->ConsensusStateUnlocked())));
+ SecureShortDebugString(ConsensusStateUnlocked())));
}
if (!RemoveFromRaftConfig(&new_config, server_uuid)) {
return Status::NotFound(
@@ -1586,21 +1596,19 @@ Status RaftConsensus::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req,
int64 last_committed_index;
OpId preceding_opid;
uint64 msg_timestamp;
- string local_peer_uuid;
{
// Take the snapshot of the replica state and queue state so that
// we can stick them in the consensus update request later.
- ReplicaState::UniqueLock lock;
- RETURN_NOT_OK(state_->LockForRead(&lock));
- local_peer_uuid = state_->GetPeerUuid();
- current_term = state_->GetCurrentTermUnlocked();
- committed_config = state_->GetCommittedConfigUnlocked();
- if (state_->IsConfigChangePendingUnlocked()) {
+ UniqueLock lock;
+ RETURN_NOT_OK(LockForRead(&lock));
+ current_term = GetCurrentTermUnlocked();
+ committed_config = GetCommittedConfigUnlocked();
+ if (IsConfigChangePendingUnlocked()) {
LOG_WITH_PREFIX_UNLOCKED(WARNING)
<< "Replica has a pending config, but the new config "
<< "will be unsafely changed anyway. "
<< "Currently pending config on the node: "
- << SecureShortDebugString(state_->GetPendingConfigUnlocked());
+ << SecureShortDebugString(GetPendingConfigUnlocked());
}
all_replicated_index = queue_->GetAllReplicatedIndex();
last_committed_index = queue_->GetCommittedIndex();
@@ -1641,13 +1649,13 @@ Status RaftConsensus::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req,
// Although it is valid for a local replica to not have itself
// in the committed config, it is rare and a replica without itself
// in the latest config is definitely not caught up with the latest leader's log.
- if (!IsRaftConfigVoter(local_peer_uuid, new_config)) {
+ if (!IsRaftConfigVoter(peer_uuid_, new_config)) {
return Status::InvalidArgument(Substitute("Local replica uuid $0 is not "
"a VOTER in the new config, "
"rejecting the unsafe config "
"change request for tablet $1. "
"Rejected config: $2" ,
- local_peer_uuid, req.tablet_id(),
+ peer_uuid_, req.tablet_id(),
SecureShortDebugString(new_config)));
}
new_config.set_unsafe_config_change(true);
@@ -1718,9 +1726,9 @@ void RaftConsensus::Shutdown() {
if (shutdown_.Load(kMemOrderAcquire)) return;
{
- ReplicaState::UniqueLock lock;
+ UniqueLock lock;
// Transition to kShuttingDown state.
- CHECK_OK(state_->LockForShutdown(&lock));
+ CHECK_OK(LockForShutdown(&lock));
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus shutting down.";
}
@@ -1730,13 +1738,12 @@ void RaftConsensus::Shutdown() {
// We must close the queue after we close the peers.
queue_->Close();
-
{
- ReplicaState::UniqueLock lock;
- CHECK_OK(state_->LockForShutdown(&lock));
- CHECK_EQ(ReplicaState::kShuttingDown, state_->state());
+ UniqueLock lock;
+ CHECK_OK(LockForShutdown(&lock));
+ CHECK_EQ(kShuttingDown, state_);
CHECK_OK(pending_.CancelPendingTransactions());
- CHECK_OK(state_->ShutdownUnlocked());
+ state_ = kShutDown;
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus is shut down!";
}
@@ -1772,25 +1779,25 @@ Status RaftConsensus::StartConsensusOnlyRoundUnlocked(const ReplicateRefPtr& msg
}
Status RaftConsensus::AdvanceTermForTests(int64_t new_term) {
- ReplicaState::UniqueLock lock;
- CHECK_OK(state_->LockForConfigChange(&lock));
+ UniqueLock lock;
+ CHECK_OK(LockForConfigChange(&lock));
return HandleTermAdvanceUnlocked(new_term);
}
std::string RaftConsensus::GetRequestVoteLogPrefixUnlocked(const VoteRequestPB& request) const {
return Substitute("$0Leader $1election vote request",
- state_->LogPrefixUnlocked(),
+ LogPrefixUnlocked(),
request.is_pre_election() ? "pre-" : "");
}
void RaftConsensus::FillVoteResponseVoteGranted(VoteResponsePB* response) {
- response->set_responder_term(state_->GetCurrentTermUnlocked());
+ response->set_responder_term(GetCurrentTermUnlocked());
response->set_vote_granted(true);
}
void RaftConsensus::FillVoteResponseVoteDenied(ConsensusErrorPB::Code error_code,
VoteResponsePB* response) {
- response->set_responder_term(state_->GetCurrentTermUnlocked());
+ response->set_responder_term(GetCurrentTermUnlocked());
response->set_vote_granted(false);
response->mutable_consensus_error()->set_code(error_code);
}
@@ -1803,7 +1810,7 @@ Status RaftConsensus::RequestVoteRespondInvalidTerm(const VoteRequestPB* request
GetRequestVoteLogPrefixUnlocked(*request),
request->candidate_uuid(),
request->candidate_term(),
- state_->GetCurrentTermUnlocked());
+ GetCurrentTermUnlocked());
LOG(INFO) << msg;
StatusToPB(Status::InvalidArgument(msg), response->mutable_consensus_error()->mutable_status());
return Status::OK();
@@ -1827,8 +1834,8 @@ Status RaftConsensus::RequestVoteRespondAlreadyVotedForOther(const VoteRequestPB
"Already voted for candidate $3 in this term.",
GetRequestVoteLogPrefixUnlocked(*request),
request->candidate_uuid(),
- state_->GetCurrentTermUnlocked(),
- state_->GetVotedForCurrentTermUnlocked());
+ GetCurrentTermUnlocked(),
+ GetVotedForCurrentTermUnlocked());
LOG(INFO) << msg;
StatusToPB(Status::InvalidArgument(msg), response->mutable_consensus_error()->mutable_status());
return Status::OK();
@@ -1890,7 +1897,7 @@ Status RaftConsensus::RequestVoteRespondVoteGranted(const VoteRequestPB* request
if (!request->is_pre_election()) {
// Persist our vote to disk.
- RETURN_NOT_OK(state_->SetVotedForCurrentTermUnlocked(request->candidate_uuid()));
+ RETURN_NOT_OK(SetVotedForCurrentTermUnlocked(request->candidate_uuid()));
}
FillVoteResponseVoteGranted(response);
@@ -1902,38 +1909,30 @@ Status RaftConsensus::RequestVoteRespondVoteGranted(const VoteRequestPB* request
LOG(INFO) << Substitute("$0: Granting yes vote for candidate $1 in term $2.",
GetRequestVoteLogPrefixUnlocked(*request),
request->candidate_uuid(),
- state_->GetCurrentTermUnlocked());
+ GetCurrentTermUnlocked());
return Status::OK();
}
RaftPeerPB::Role RaftConsensus::role() const {
- ReplicaState::UniqueLock lock;
- CHECK_OK(state_->LockForRead(&lock));
- return state_->GetActiveRoleUnlocked();
-}
-
-std::string RaftConsensus::LogPrefixUnlocked() {
- return state_->LogPrefixUnlocked();
-}
-
-std::string RaftConsensus::LogPrefix() {
- return state_->LogPrefix();
+ UniqueLock lock;
+ CHECK_OK(LockForRead(&lock));
+ return GetActiveRoleUnlocked();
}
void RaftConsensus::SetLeaderUuidUnlocked(const string& uuid) {
+ DCHECK(lock_.is_locked());
failed_elections_since_stable_leader_ = 0;
- state_->SetLeaderUuidUnlocked(uuid);
- MarkDirty("New leader " + uuid);
+ cmeta_->set_leader_uuid(uuid);
+ MarkDirty(Substitute("New leader $0", uuid));
}
-
Status RaftConsensus::ReplicateConfigChangeUnlocked(const RaftConfigPB& old_config,
const RaftConfigPB& new_config,
const StatusCallback& client_cb) {
auto cc_replicate = new ReplicateMsg();
cc_replicate->set_op_type(CHANGE_CONFIG_OP);
ChangeConfigRecordPB* cc_req = cc_replicate->mutable_change_config_record();
- cc_req->set_tablet_id(tablet_id());
+ cc_req->set_tablet_id(options_.tablet_id);
*cc_req->mutable_old_config() = old_config;
*cc_req->mutable_new_config() = new_config;
CHECK_OK(time_manager_->AssignTimestamp(cc_replicate));
@@ -1950,8 +1949,8 @@ Status RaftConsensus::ReplicateConfigChangeUnlocked(const RaftConfigPB& old_conf
}
Status RaftConsensus::RefreshConsensusQueueAndPeersUnlocked() {
- DCHECK_EQ(RaftPeerPB::LEADER, state_->GetActiveRoleUnlocked());
- const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked();
+ DCHECK_EQ(RaftPeerPB::LEADER, GetActiveRoleUnlocked());
+ const RaftConfigPB& active_config = GetActiveConfigUnlocked();
// Change the peers so that we're able to replicate messages remotely and
// locally. The peer manager must be closed before updating the active config
@@ -1961,46 +1960,46 @@ Status RaftConsensus::RefreshConsensusQueueAndPeersUnlocked() {
// TODO(todd): should use queue committed index here? in that case do
// we need to pass it in at all?
queue_->SetLeaderMode(pending_.GetCommittedIndex(),
- state_->GetCurrentTermUnlocked(),
+ GetCurrentTermUnlocked(),
active_config);
RETURN_NOT_OK(peer_manager_->UpdateRaftConfig(active_config));
return Status::OK();
}
-string RaftConsensus::peer_uuid() const {
- return state_->GetPeerUuid();
+const string& RaftConsensus::peer_uuid() const {
+ return peer_uuid_;
}
-string RaftConsensus::tablet_id() const {
- return state_->GetOptions().tablet_id;
+const string& RaftConsensus::tablet_id() const {
+ return options_.tablet_id;
}
ConsensusStatePB RaftConsensus::ConsensusState() const {
- ReplicaState::UniqueLock lock;
- CHECK_OK(state_->LockForRead(&lock));
- return state_->ConsensusStateUnlocked();
+ UniqueLock lock;
+ CHECK_OK(LockForRead(&lock));
+ return ConsensusStateUnlocked();
}
RaftConfigPB RaftConsensus::CommittedConfig() const {
- ReplicaState::UniqueLock lock;
- CHECK_OK(state_->LockForRead(&lock));
- return state_->GetCommittedConfigUnlocked();
+ UniqueLock lock;
+ CHECK_OK(LockForRead(&lock));
+ return GetCommittedConfigUnlocked();
}
void RaftConsensus::DumpStatusHtml(std::ostream& out) const {
out << "<h1>Raft Consensus State</h1>" << std::endl;
out << "<h2>State</h2>" << std::endl;
- out << "<pre>" << EscapeForHtmlToString(state_->ToString()) << "</pre>" << std::endl;
+ out << "<pre>" << EscapeForHtmlToString(ToString()) << "</pre>" << std::endl;
out << "<h2>Queue</h2>" << std::endl;
out << "<pre>" << EscapeForHtmlToString(queue_->ToString()) << "</pre>" << std::endl;
// Dump the queues on a leader.
RaftPeerPB::Role role;
{
- ReplicaState::UniqueLock lock;
- CHECK_OK(state_->LockForRead(&lock));
- role = state_->GetActiveRoleUnlocked();
+ UniqueLock lock;
+ CHECK_OK(LockForRead(&lock));
+ role = GetActiveRoleUnlocked();
}
if (role == RaftPeerPB::LEADER) {
out << "<h2>Queue overview</h2>" << std::endl;
@@ -2011,17 +2010,13 @@ void RaftConsensus::DumpStatusHtml(std::ostream& out) const {
}
}
-ReplicaState* RaftConsensus::GetReplicaStateForTests() {
- return state_.get();
-}
-
void RaftConsensus::ElectionCallback(ElectionReason reason, const ElectionResult& result) {
// The election callback runs on a reactor thread, so we need to defer to our
// threadpool. If the threadpool is already shut down for some reason, it's OK --
// we're OK with the callback never running.
WARN_NOT_OK(thread_pool_->SubmitClosure(Bind(&RaftConsensus::DoElectionCallback,
this, reason, result)),
- state_->LogPrefixThreadSafe() + "Unable to run election callback");
+ LogPrefixThreadSafe() + "Unable to run election callback");
}
void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResult& result) {
@@ -2031,8 +2026,8 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
// Snooze to avoid the election timer firing again as much as possible.
{
- ReplicaState::UniqueLock lock;
- CHECK_OK(state_->LockForRead(&lock));
+ UniqueLock lock;
+ CHECK_OK(LockForRead(&lock));
// We need to snooze when we win and when we lose:
// - When we win because we're about to disable the timer and become leader.
// - When we lose or otherwise we can fall into a cycle, where everyone keeps
@@ -2056,7 +2051,7 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
// because it already voted in term 2. The check below ensures that peer B
// will bump to term 2 when it gets the vote rejection, such that its
// next pre-election (for term 3) would succeed.
- if (result.highest_voter_term > state_->GetCurrentTermUnlocked()) {
+ if (result.highest_voter_term > GetCurrentTermUnlocked()) {
HandleTermAdvanceUnlocked(result.highest_voter_term);
}
@@ -2069,8 +2064,8 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
}
// The vote was granted, become leader.
- ReplicaState::UniqueLock lock;
- Status s = state_->LockForConfigChange(&lock);
+ UniqueLock lock;
+ Status s = LockForConfigChange(&lock);
if (PREDICT_FALSE(!s.ok())) {
LOG_WITH_PREFIX(INFO) << "Received " << election_type << " callback for term "
<< election_term << " while not running: "
@@ -2085,7 +2080,7 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
election_started_in_term--;
}
- if (election_started_in_term != state_->GetCurrentTermUnlocked()) {
+ if (election_started_in_term != GetCurrentTermUnlocked()) {
LOG_WITH_PREFIX_UNLOCKED(INFO)
<< "Leader " << election_type << " decision vote started in "
<< "defunct term " << election_started_in_term << ": "
@@ -2093,8 +2088,8 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
return;
}
- const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked();
- if (!IsRaftConfigVoter(state_->GetPeerUuid(), active_config)) {
+ const RaftConfigPB& active_config = GetActiveConfigUnlocked();
+ if (!IsRaftConfigVoter(peer_uuid_, active_config)) {
LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Leader " << election_type
<< " decision while not in active config. "
<< "Result: Term " << election_term << ": "
@@ -2103,7 +2098,7 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
return;
}
- if (state_->GetActiveRoleUnlocked() == RaftPeerPB::LEADER) {
+ if (GetActiveRoleUnlocked() == RaftPeerPB::LEADER) {
// If this was a pre-election, it's possible to see the following interleaving:
//
// 1. Term N (follower): send a real election for term N
@@ -2132,7 +2127,7 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
"Couldn't start leader election after successful pre-election");
} else {
// We won a real election. Convert role to LEADER.
- SetLeaderUuidUnlocked(state_->GetPeerUuid());
+ SetLeaderUuidUnlocked(peer_uuid_);
// TODO(todd): BecomeLeaderUnlocked() can fail due to state checks during shutdown.
// It races with the above state check.
@@ -2142,8 +2137,8 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
}
Status RaftConsensus::GetLastOpId(OpIdType type, OpId* id) {
- ReplicaState::UniqueLock lock;
- RETURN_NOT_OK(state_->LockForRead(&lock));
+ UniqueLock lock;
+ RETURN_NOT_OK(LockForRead(&lock));
if (type == RECEIVED_OPID) {
*DCHECK_NOTNULL(id) = queue_->GetLastOpIdInLog();
} else if (type == COMMITTED_OPID) {
@@ -2166,7 +2161,7 @@ log::RetentionIndexes RaftConsensus::GetRetentionIndexes() {
void RaftConsensus::MarkDirty(const std::string& reason) {
WARN_NOT_OK(thread_pool_->SubmitClosure(Bind(mark_dirty_clbk_, reason)),
- state_->LogPrefixThreadSafe() + "Unable to run MarkDirty callback");
+ LogPrefixThreadSafe() + "Unable to run MarkDirty callback");
}
void RaftConsensus::MarkDirtyOnSuccess(const string& reason,
@@ -2181,8 +2176,9 @@ void RaftConsensus::MarkDirtyOnSuccess(const string& reason,
void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound* round,
const StatusCallback& client_cb,
const Status& status) {
- // NOTE: the ReplicaState lock is held here because this is triggered by
- // ReplicaState's abort or commit paths.
+ // NOTE: lock_ is held here because this is triggered by
+ // PendingRounds::AbortOpsAfter() and AdvanceCommittedIndex().
+ DCHECK(lock_.is_locked());
OperationType op_type = round->replicate_msg()->op_type();
const string& op_type_str = OperationType_Name(op_type);
CHECK(IsConsensusOnlyOperation(op_type)) << "Unexpected op type: " << op_type_str;
@@ -2192,13 +2188,16 @@ void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound* round,
// Fall through to the generic handling.
}
+ // TODO(mpercy): May need some refactoring to unlock 'lock_' before invoking
+ // the client callback.
+
if (!status.ok()) {
- LOG(INFO) << state_->LogPrefixThreadSafe() << op_type_str << " replication failed: "
+ LOG(INFO) << LogPrefixThreadSafe() << op_type_str << " replication failed: "
<< status.ToString();
client_cb.Run(status);
return;
}
- VLOG(1) << state_->LogPrefixThreadSafe() << "Committing " << op_type_str << " with op id "
+ VLOG(1) << LogPrefixThreadSafe() << "Committing " << op_type_str << " with op id "
<< round->id();
gscoped_ptr<CommitMsg> commit_msg(new CommitMsg);
commit_msg->set_op_type(round->replicate_msg()->op_type());
@@ -2216,11 +2215,11 @@ void RaftConsensus::CompleteConfigChangeRoundUnlocked(ConsensusRound* round, con
if (!status.ok()) {
// If the config change being aborted is the current pending one, abort it.
- if (state_->IsConfigChangePendingUnlocked() &&
- state_->GetPendingConfigUnlocked().opid_index() == op_id.index()) {
+ if (IsConfigChangePendingUnlocked() &&
+ GetPendingConfigUnlocked().opid_index() == op_id.index()) {
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Aborting config change with OpId "
<< op_id << ": " << status.ToString();
- state_->ClearPendingConfigUnlocked();
+ ClearPendingConfigUnlocked();
} else {
LOG_WITH_PREFIX_UNLOCKED(INFO)
<< "Skipping abort of non-pending config change with OpId "
@@ -2250,14 +2249,14 @@ void RaftConsensus::CompleteConfigChangeRoundUnlocked(ConsensusRound* round, con
// Check if the pending Raft config has an OpId less than the committed
// config. If so, this is a replay at startup in which the COMMIT
// messages were delayed.
- const RaftConfigPB& committed_config = state_->GetCommittedConfigUnlocked();
+ const RaftConfigPB& committed_config = GetCommittedConfigUnlocked();
if (new_config.opid_index() > committed_config.opid_index()) {
LOG_WITH_PREFIX_UNLOCKED(INFO)
<< "Committing config change with OpId "
<< op_id << ": "
<< DiffRaftConfigs(old_config, new_config)
<< ". New config: { " << SecureShortDebugString(new_config) << " }";
- CHECK_OK(state_->SetCommittedConfigUnlocked(new_config));
+ CHECK_OK(SetCommittedConfigUnlocked(new_config));
} else {
LOG_WITH_PREFIX_UNLOCKED(INFO)
<< "Ignoring commit of config change with OpId "
@@ -2347,23 +2346,303 @@ MonoDelta RaftConsensus::LeaderElectionExpBackoffDeltaUnlocked() {
}
Status RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term,
- ReplicaState::FlushToDisk flush) {
- if (new_term <= state_->GetCurrentTermUnlocked()) {
+ FlushToDisk flush) {
+ if (new_term <= GetCurrentTermUnlocked()) {
return Status::IllegalState(Substitute("Can't advance term to: $0 current term: $1 is higher.",
- new_term, state_->GetCurrentTermUnlocked()));
+ new_term, GetCurrentTermUnlocked()));
}
- if (state_->GetActiveRoleUnlocked() == RaftPeerPB::LEADER) {
+ if (GetActiveRoleUnlocked() == RaftPeerPB::LEADER) {
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Stepping down as leader of term "
- << state_->GetCurrentTermUnlocked();
+ << GetCurrentTermUnlocked();
RETURN_NOT_OK(BecomeReplicaUnlocked());
}
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Advancing to term " << new_term;
- RETURN_NOT_OK(state_->SetCurrentTermUnlocked(new_term, flush));
+ RETURN_NOT_OK(SetCurrentTermUnlocked(new_term, flush));
term_metric_->set_value(new_term);
last_received_cur_leader_ = MinimumOpId();
return Status::OK();
}
+Status RaftConsensus::LockForStart(UniqueLock* lock) const {
+ ThreadRestrictions::AssertWaitAllowed();
+ UniqueLock l(lock_);
+ CHECK_EQ(state_, kInitialized) << "Illegal state for Start()."
+ << " Replica is not in kInitialized state";
+ lock->swap(l);
+ return Status::OK();
+}
+
+Status RaftConsensus::LockForRead(UniqueLock* lock) const {
+ ThreadRestrictions::AssertWaitAllowed();
+ UniqueLock l(lock_);
+ lock->swap(l);
+ return Status::OK();
+}
+
+Status RaftConsensus::LockForReplicate(UniqueLock* lock, const ReplicateMsg& msg) const {
+ ThreadRestrictions::AssertWaitAllowed();
+ DCHECK(!msg.has_id()) << "Should not have an ID yet: " << SecureShortDebugString(msg);
+ UniqueLock l(lock_);
+ if (PREDICT_FALSE(state_ != kRunning)) {
+ return Status::IllegalState("Replica not in running state");
+ }
+
+ RETURN_NOT_OK(CheckActiveLeaderUnlocked());
+ lock->swap(l);
+ return Status::OK();
+}
+
+Status RaftConsensus::LockForCommit(UniqueLock* lock) const {
+ TRACE_EVENT0("consensus", "RaftConsensus::LockForCommit");
+ ThreadRestrictions::AssertWaitAllowed();
+ UniqueLock l(lock_);
+ if (PREDICT_FALSE(state_ != kRunning && state_ != kShuttingDown)) {
+ return Status::IllegalState("Replica not in running state");
+ }
+ lock->swap(l);
+ return Status::OK();
+}
+
+Status RaftConsensus::LockForUpdate(UniqueLock* lock) const {
+ TRACE_EVENT0("consensus", "RaftConsensus::LockForUpdate");
+ ThreadRestrictions::AssertWaitAllowed();
+ UniqueLock l(lock_);
+ if (PREDICT_FALSE(state_ != kRunning)) {
+ return Status::IllegalState("Replica not in running state");
+ }
+ if (!IsRaftConfigVoter(peer_uuid_, cmeta_->active_config())) {
+ LOG_WITH_PREFIX_UNLOCKED(INFO) << "Allowing update even though not a member of the config";
+ }
+ lock->swap(l);
+ return Status::OK();
+}
+
+Status RaftConsensus::LockForConfigChange(UniqueLock* lock) const {
+ TRACE_EVENT0("consensus", "RaftConsensus::LockForConfigChange");
+
+ ThreadRestrictions::AssertWaitAllowed();
+ UniqueLock l(lock_);
+ // Can only change the config on running replicas.
+ if (PREDICT_FALSE(state_ != kRunning)) {
+ return Status::IllegalState("Unable to lock ReplicaState for config change",
+ Substitute("State = $0", state_));
+ }
+ lock->swap(l);
+ return Status::OK();
+}
+
+Status RaftConsensus::LockForShutdown(UniqueLock* lock) {
+ TRACE_EVENT0("consensus", "RaftConsensus::LockForShutdown");
+ ThreadRestrictions::AssertWaitAllowed();
+ UniqueLock l(lock_);
+ if (state_ != kShuttingDown && state_ != kShutDown) {
+ state_ = kShuttingDown;
+ }
+ lock->swap(l);
+ return Status::OK();
+}
+
+Status RaftConsensus::CheckActiveLeaderUnlocked() const {
+ RaftPeerPB::Role role = GetActiveRoleUnlocked();
+ switch (role) {
+ case RaftPeerPB::LEADER:
+ return Status::OK();
+ default:
+ ConsensusStatePB cstate = ConsensusStateUnlocked();
+ return Status::IllegalState(Substitute("Replica $0 is not leader of this config. Role: $1. "
+ "Consensus state: $2",
+ peer_uuid_,
+ RaftPeerPB::Role_Name(role),
+ SecureShortDebugString(cstate)));
+ }
+}
+
+ConsensusStatePB RaftConsensus::ConsensusStateUnlocked() const {
+ return cmeta_->ToConsensusStatePB();
+}
+
+RaftPeerPB::Role RaftConsensus::GetActiveRoleUnlocked() const {
+ DCHECK(lock_.is_locked());
+ return cmeta_->active_role();
+}
+
+bool RaftConsensus::IsConfigChangePendingUnlocked() const {
+ DCHECK(lock_.is_locked());
+ return cmeta_->has_pending_config();
+}
+
+Status RaftConsensus::CheckNoConfigChangePendingUnlocked() const {
+ DCHECK(lock_.is_locked());
+ if (IsConfigChangePendingUnlocked()) {
+ return Status::IllegalState(
+ Substitute("RaftConfig change currently pending. Only one is allowed at a time.\n"
+ " Committed config: $0.\n Pending config: $1",
+ SecureShortDebugString(GetCommittedConfigUnlocked()),
+ SecureShortDebugString(GetPendingConfigUnlocked())));
+ }
+ return Status::OK();
+}
+
+Status RaftConsensus::SetPendingConfigUnlocked(const RaftConfigPB& new_config) {
+ DCHECK(lock_.is_locked());
+ RETURN_NOT_OK_PREPEND(VerifyRaftConfig(new_config, PENDING_CONFIG),
+ "Invalid config to set as pending");
+ if (!new_config.unsafe_config_change()) {
+ CHECK(!cmeta_->has_pending_config())
+ << "Attempt to set pending config while another is already pending! "
+ << "Existing pending config: " << SecureShortDebugString(cmeta_->pending_config()) << "; "
+ << "Attempted new pending config: " << SecureShortDebugString(new_config);
+ } else if (cmeta_->has_pending_config()) {
+ LOG_WITH_PREFIX_UNLOCKED(INFO)
+ << "Allowing unsafe config change even though there is a pending config! "
+ << "Existing pending config: " << SecureShortDebugString(cmeta_->pending_config()) << "; "
+ << "New pending config: " << SecureShortDebugString(new_config);
+ }
+ cmeta_->set_pending_config(new_config);
+ return Status::OK();
+}
+
+void RaftConsensus::ClearPendingConfigUnlocked() {
+ cmeta_->clear_pending_config();
+}
+
+const RaftConfigPB& RaftConsensus::GetPendingConfigUnlocked() const {
+ DCHECK(lock_.is_locked());
+ CHECK(IsConfigChangePendingUnlocked()) << "No pending config";
+ return cmeta_->pending_config();
+}
+
+Status RaftConsensus::SetCommittedConfigUnlocked(const RaftConfigPB& config_to_commit) {
+ TRACE_EVENT0("consensus", "RaftConsensus::SetCommittedConfigUnlocked");
+ DCHECK(lock_.is_locked());
+ DCHECK(config_to_commit.IsInitialized());
+ RETURN_NOT_OK_PREPEND(VerifyRaftConfig(config_to_commit, COMMITTED_CONFIG),
+ "Invalid config to set as committed");
+
+ // Compare committed with pending configuration, ensure that they are the same.
+ // In the event of an unsafe config change triggered by an administrator,
+ // it is possible that the config being committed may not match the pending config
+ // because unsafe config change allows multiple pending configs to exist.
+ // Therefore we only need to validate that 'config_to_commit' matches the pending config
+ // if the pending config does not have its 'unsafe_config_change' flag set.
+ if (IsConfigChangePendingUnlocked()) {
+ const RaftConfigPB& pending_config = GetPendingConfigUnlocked();
+ if (!pending_config.unsafe_config_change()) {
+ // Quorums must be exactly equal, even w.r.t. peer ordering.
+ CHECK_EQ(GetPendingConfigUnlocked().SerializeAsString(),
+ config_to_commit.SerializeAsString())
+ << Substitute("New committed config must equal pending config, but does not. "
+ "Pending config: $0, committed config: $1",
+ SecureShortDebugString(pending_config),
+ SecureShortDebugString(config_to_commit));
+ }
+ }
+ cmeta_->set_committed_config(config_to_commit);
+ cmeta_->clear_pending_config();
+ CHECK_OK(cmeta_->Flush());
+ return Status::OK();
+}
+
+const RaftConfigPB& RaftConsensus::GetCommittedConfigUnlocked() const {
+ DCHECK(lock_.is_locked());
+ return cmeta_->committed_config();
+}
+
+const RaftConfigPB& RaftConsensus::GetActiveConfigUnlocked() const {
+ DCHECK(lock_.is_locked());
+ return cmeta_->active_config();
+}
+
+Status RaftConsensus::SetCurrentTermUnlocked(int64_t new_term,
+ FlushToDisk flush) {
+ TRACE_EVENT1("consensus", "RaftConsensus::SetCurrentTermUnlocked",
+ "term", new_term);
+ DCHECK(lock_.is_locked());
+ if (PREDICT_FALSE(new_term <= GetCurrentTermUnlocked())) {
+ return Status::IllegalState(
+ Substitute("Cannot change term to a term that is lower than or equal to the current one. "
+ "Current: $0, Proposed: $1", GetCurrentTermUnlocked(), new_term));
+ }
+ cmeta_->set_current_term(new_term);
+ cmeta_->clear_voted_for();
+ if (flush == FLUSH_TO_DISK) {
+ CHECK_OK(cmeta_->Flush());
+ }
+ ClearLeaderUnlocked();
+ return Status::OK();
+}
+
+const int64_t RaftConsensus::GetCurrentTermUnlocked() const {
+ DCHECK(lock_.is_locked());
+ return cmeta_->current_term();
+}
+
+const string& RaftConsensus::GetLeaderUuidUnlocked() const {
+ DCHECK(lock_.is_locked());
+ return cmeta_->leader_uuid();
+}
+
+const bool RaftConsensus::HasVotedCurrentTermUnlocked() const {
+ DCHECK(lock_.is_locked());
+ return cmeta_->has_voted_for();
+}
+
+Status RaftConsensus::SetVotedForCurrentTermUnlocked(const std::string& uuid) {
+ TRACE_EVENT1("consensus", "RaftConsensus::SetVotedForCurrentTermUnlocked",
+ "uuid", uuid);
+ DCHECK(lock_.is_locked());
+ cmeta_->set_voted_for(uuid);
+ CHECK_OK(cmeta_->Flush());
+ return Status::OK();
+}
+
+const std::string& RaftConsensus::GetVotedForCurrentTermUnlocked() const {
+ DCHECK(lock_.is_locked());
+ DCHECK(cmeta_->has_voted_for());
+ return cmeta_->voted_for();
+}
+
+const ConsensusOptions& RaftConsensus::GetOptions() const {
+ return options_;
+}
+
+string RaftConsensus::LogPrefix() const {
+ UniqueLock lock;
+ CHECK_OK(LockForRead(&lock));
+ return LogPrefixUnlocked();
+}
+
+string RaftConsensus::LogPrefixUnlocked() const {
+ DCHECK(lock_.is_locked());
+ return Substitute("T $0 P $1 [term $2 $3]: ",
+ options_.tablet_id,
+ peer_uuid_,
+ GetCurrentTermUnlocked(),
+ RaftPeerPB::Role_Name(GetActiveRoleUnlocked()));
+}
+
+string RaftConsensus::LogPrefixThreadSafe() const {
+ return Substitute("T $0 P $1: ",
+ options_.tablet_id,
+ peer_uuid_);
+}
+
+string RaftConsensus::ToString() const {
+ ThreadRestrictions::AssertWaitAllowed();
+ UniqueLock lock(lock_);
+ return ToStringUnlocked();
+}
+
+string RaftConsensus::ToStringUnlocked() const {
+ DCHECK(lock_.is_locked());
+ return Substitute("Replica: $0, State: $1, Role: $2",
+ peer_uuid_, state_, RaftPeerPB::Role_Name(GetActiveRoleUnlocked()));
+}
+
+ConsensusMetadata* RaftConsensus::consensus_metadata_for_tests() const {
+ return cmeta_.get();
+}
+
} // namespace consensus
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/9e40867c/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index b2186db..b1badde 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -15,8 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#ifndef KUDU_CONSENSUS_RAFT_CONSENSUS_H_
-#define KUDU_CONSENSUS_RAFT_CONSENSUS_H_
+#pragma once
#include <boost/optional/optional_fwd.hpp>
#include <memory>
@@ -30,7 +29,6 @@
#include "kudu/consensus/consensus_meta.h"
#include "kudu/consensus/consensus_queue.h"
#include "kudu/consensus/pending_rounds.h"
-#include "kudu/consensus/raft_consensus_state.h"
#include "kudu/consensus/time_manager.h"
#include "kudu/util/atomic.h"
#include "kudu/util/failure_detector.h"
@@ -64,8 +62,10 @@ struct ElectionResult;
class RaftConsensus : public Consensus,
public PeerMessageQueueObserver {
public:
+ typedef std::unique_lock<simple_spinlock> UniqueLock;
+
static scoped_refptr<RaftConsensus> Create(
- const ConsensusOptions& options,
+ ConsensusOptions options,
std::unique_ptr<ConsensusMetadata> cmeta,
const RaftPeerPB& local_peer_pb,
const scoped_refptr<MetricEntity>& metric_entity,
@@ -76,14 +76,14 @@ class RaftConsensus : public Consensus,
const std::shared_ptr<MemTracker>& parent_mem_tracker,
const Callback<void(const std::string& reason)>& mark_dirty_clbk);
- RaftConsensus(const ConsensusOptions& options,
+ RaftConsensus(ConsensusOptions options,
std::unique_ptr<ConsensusMetadata> cmeta,
gscoped_ptr<PeerProxyFactory> peer_proxy_factory,
gscoped_ptr<PeerMessageQueue> queue,
gscoped_ptr<PeerManager> peer_manager,
gscoped_ptr<ThreadPool> thread_pool,
const scoped_refptr<MetricEntity>& metric_entity,
- const std::string& peer_uuid,
+ std::string peer_uuid,
scoped_refptr<TimeManager> time_manager,
ReplicaTransactionFactory* txn_factory,
const scoped_refptr<log::Log>& log,
@@ -133,9 +133,11 @@ class RaftConsensus : public Consensus,
RaftPeerPB::Role role() const override;
- std::string peer_uuid() const override;
+ // Thread-safe.
+ const std::string& peer_uuid() const override;
- std::string tablet_id() const override;
+ // Thread-safe.
+ const std::string& tablet_id() const override;
scoped_refptr<TimeManager> time_manager() const override { return time_manager_; }
@@ -150,11 +152,6 @@ class RaftConsensus : public Consensus,
// Makes this peer advance it's term (and step down if leader), for tests.
Status AdvanceTermForTests(int64_t new_term);
- // Returns the replica state for tests. This should never be used outside of
- // tests, in particular calling the LockFor* methods on the returned object
- // can cause consensus to deadlock.
- ReplicaState* GetReplicaStateForTests();
-
int update_calls_for_tests() const {
return update_calls_for_tests_.Load();
}
@@ -177,9 +174,29 @@ class RaftConsensus : public Consensus,
log::RetentionIndexes GetRetentionIndexes() override;
private:
- friend class ReplicaState;
friend class RaftConsensusQuorumTest;
+ FRIEND_TEST(RaftConsensusQuorumTest, TestConsensusContinuesIfAMinorityFallsBehind);
+ FRIEND_TEST(RaftConsensusQuorumTest, TestConsensusStopsIfAMajorityFallsBehind);
+ FRIEND_TEST(RaftConsensusQuorumTest, TestLeaderElectionWithQuiescedQuorum);
FRIEND_TEST(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty);
+ FRIEND_TEST(RaftConsensusQuorumTest, TestRequestVote);
+
+ enum State {
+ // State after the replica is built.
+ kInitialized,
+
+ // State signaling the replica accepts requests (from clients
+ // if leader, from leader if follower)
+ kRunning,
+
+ // State signaling that the replica is shutting down and no longer accepting
+ // new transactions or commits.
+ kShuttingDown,
+
+ // State signaling the replica is shut down and does not accept
+ // any more requests.
+ kShutDown,
+ };
// Control whether printing of log messages should be done for a particular
// function call.
@@ -188,6 +205,12 @@ class RaftConsensus : public Consensus,
ALLOW_LOGGING = 1,
};
+ // Enum for the 'flush' argument to SetCurrentTermUnlocked() below.
+ enum FlushToDisk {
+ SKIP_FLUSH_TO_DISK,
+ FLUSH_TO_DISK,
+ };
+
// Helper struct that contains the messages from the leader that we need to
// append to our log, after they've been deduplicated.
struct LeaderRequest {
@@ -201,10 +224,6 @@ class RaftConsensus : public Consensus,
std::string OpsRangeString() const;
};
- std::string LogPrefixUnlocked();
-
- std::string LogPrefix();
-
// Set the leader UUID of the configuration and mark the tablet config dirty for
// reporting to the master.
void SetLeaderUuidUnlocked(const std::string& uuid);
@@ -224,12 +243,12 @@ class RaftConsensus : public Consensus,
// Returns OK once the change config transaction that has this peer as leader
// has been enqueued, the transaction will complete asynchronously.
//
- // The ReplicaState must be locked for configuration change before calling.
+ // 'lock_' must be held for configuration change before calling.
Status BecomeLeaderUnlocked();
// Makes the peer become a replica, i.e. a FOLLOWER or a LEARNER.
//
- // The ReplicaState must be locked for configuration change before calling.
+ // 'lock_' must be held for configuration change before calling.
Status BecomeReplicaUnlocked();
// Updates the state in a replica by storing the received operations in the log
@@ -286,7 +305,7 @@ class RaftConsensus : public Consensus,
// Raft configuration.
Status IsSingleVoterConfig(bool* single_voter) const;
- // Return header string for RequestVote log messages. The ReplicaState lock must be held.
+ // Return header string for RequestVote log messages. 'lock_' must be held.
std::string GetRequestVoteLogPrefixUnlocked(const VoteRequestPB& request) const;
// Fills the response with the current status, if an update was successful.
@@ -386,7 +405,7 @@ class RaftConsensus : public Consensus,
//
// 'flush' may be used to control whether the term change is flushed to disk.
Status HandleTermAdvanceUnlocked(ConsensusTerm new_term,
- ReplicaState::FlushToDisk flush = ReplicaState::FLUSH_TO_DISK);
+ FlushToDisk flush = FLUSH_TO_DISK);
// Asynchronously (on thread_pool_) notify the TabletReplica that the consensus configuration
// has changed, thus reporting it back to the master.
@@ -426,25 +445,162 @@ class RaftConsensus : public Consensus,
// type of message it is.
// The 'client_cb' will be invoked at the end of this execution.
//
- // NOTE: this must be called with the ReplicaState lock held.
+ // NOTE: Must be called while holding 'lock_'.
void NonTxRoundReplicationFinished(ConsensusRound* round,
const StatusCallback& client_cb,
const Status& status);
// As a leader, append a new ConsensusRound to the queue.
- // Only virtual and protected for mocking purposes.
Status AppendNewRoundToQueueUnlocked(const scoped_refptr<ConsensusRound>& round);
// As a follower, start a consensus round not associated with a Transaction.
- // Only virtual and protected for mocking purposes.
Status StartConsensusOnlyRoundUnlocked(const ReplicateRefPtr& msg);
- // Add a new pending operation to the ReplicaState, including the special handling
+ // Add a new pending operation to PendingRounds, including the special handling
// necessary if this round contains a configuration change. These rounds must
// take effect as soon as they are received, rather than waiting for commitment
// (see Diego Ongaro's thesis section 4.1).
Status AddPendingOperationUnlocked(const scoped_refptr<ConsensusRound>& round);
+ // Locks a replica in preparation for StartUnlocked(). Makes
+ // sure the replica is in kInitialized state.
+ Status LockForStart(UniqueLock* lock) const WARN_UNUSED_RESULT;
+
+ // Obtains the lock for a state read, does not check state.
+ Status LockForRead(UniqueLock* lock) const WARN_UNUSED_RESULT;
+
+ // Locks a replica down until the critical section of an append completes,
+ // i.e. until the replicate message has been assigned an id and placed in
+ // the log queue.
+ // This also checks that the replica is in the appropriate
+ // state (role) to replicate the provided operation, that the operation
+ // contains a replicate message and is of the appropriate type, and returns
+ // Status::IllegalState if that is not the case.
+ Status LockForReplicate(UniqueLock* lock, const ReplicateMsg& msg) const WARN_UNUSED_RESULT;
+
+ // Locks a replica down until the critical section of a commit completes.
+ // This succeeds for all states since a replica which has initiated
+ // a Prepare()/Replicate() must eventually commit even if its state
+ // has changed after the initial Append()/Update().
+ Status LockForCommit(UniqueLock* lock) const WARN_UNUSED_RESULT;
+
+ // Locks a replica down until the critical section of an update completes.
+ // Further updates from the same or some other leader will be blocked until
+ // this completes. This also checks that the replica is in the appropriate
+ // state (role) to be updated and returns Status::IllegalState if that
+ // is not the case.
+ Status LockForUpdate(UniqueLock* lock) const WARN_UNUSED_RESULT;
+
+ Status LockForConfigChange(UniqueLock* lock) const WARN_UNUSED_RESULT;
+
+ // Changes the role to non-participant and returns a lock that can be
+ // used to make sure no state updates come in until Shutdown() is
+ // completed.
+ Status LockForShutdown(UniqueLock* lock) WARN_UNUSED_RESULT;
+
+ // Ensure the local peer is the active leader.
+ // Returns OK if leader, IllegalState otherwise.
+ Status CheckActiveLeaderUnlocked() const;
+
+ // Return current consensus state summary.
+ ConsensusStatePB ConsensusStateUnlocked() const;
+
+ // Returns the currently active Raft role.
+ RaftPeerPB::Role GetActiveRoleUnlocked() const;
+
+ // Returns true if there is a configuration change currently in-flight but not yet
+ // committed.
+ bool IsConfigChangePendingUnlocked() const;
+
+ // Inverse of IsConfigChangePendingUnlocked(): returns OK if there is
+ // currently *no* configuration change pending, and IllegalState if there *is* a
+ // configuration change pending.
+ Status CheckNoConfigChangePendingUnlocked() const;
+
+ // Sets the given configuration as pending commit. Does not persist into the peers
+ // metadata. In order to be persisted, SetCommittedConfigUnlocked() must be called.
+ Status SetPendingConfigUnlocked(const RaftConfigPB& new_config) WARN_UNUSED_RESULT;
+
+ // Clear (cancel) the pending configuration.
+ void ClearPendingConfigUnlocked();
+
+ // Return the pending configuration, or crash if one is not set.
+ const RaftConfigPB& GetPendingConfigUnlocked() const;
+
+ // Changes the committed config for this replica. Checks that there is a
+ // pending configuration and that it is equal to this one. Persists changes to disk.
+ // Resets the pending configuration to null.
+ Status SetCommittedConfigUnlocked(const RaftConfigPB& config_to_commit);
+
+ // Return the persisted configuration.
+ const RaftConfigPB& GetCommittedConfigUnlocked() const;
+
+ // Return the "active" configuration - if there is a pending configuration return it;
+ // otherwise return the committed configuration.
+ const RaftConfigPB& GetActiveConfigUnlocked() const;
+
+ // Checks if the term change is legal. If so, sets 'current_term'
+ // to 'new_term' and sets 'has voted' to no for the current term.
+ //
+ // If the caller knows that it will call another method soon after
+ // to flush the change to disk, it may set 'flush' to 'SKIP_FLUSH_TO_DISK'.
+ Status SetCurrentTermUnlocked(int64_t new_term,
+ FlushToDisk flush) WARN_UNUSED_RESULT;
+
+ // Returns the term set in the last config change round.
+ const int64_t GetCurrentTermUnlocked() const;
+
+ // Accessors for the leader of the current term.
+ const std::string& GetLeaderUuidUnlocked() const;
+ bool HasLeaderUnlocked() const { return !GetLeaderUuidUnlocked().empty(); }
+ void ClearLeaderUnlocked() { SetLeaderUuidUnlocked(""); }
+
+ // Return whether this peer has voted in the current term.
+ const bool HasVotedCurrentTermUnlocked() const;
+
+ // Record replica's vote for the current term, then flush the consensus
+ // metadata to disk.
+ Status SetVotedForCurrentTermUnlocked(const std::string& uuid) WARN_UNUSED_RESULT;
+
+ // Return replica's vote for the current term.
+ // The vote must be set; use HasVotedCurrentTermUnlocked() to check.
+ const std::string& GetVotedForCurrentTermUnlocked() const;
+
+ const ConsensusOptions& GetOptions() const;
+
+ std::string LogPrefix() const;
+ std::string LogPrefixUnlocked() const;
+
+ // A variant of LogPrefix which does not take the lock. This is a slightly
+ // less thorough prefix which only includes immutable (and thus thread-safe)
+ // information, but does not require the lock.
+ std::string LogPrefixThreadSafe() const;
+
+ std::string ToString() const;
+ std::string ToStringUnlocked() const;
+
+ ConsensusMetadata* consensus_metadata_for_tests() const;
+
+ const ConsensusOptions options_;
+
+ // The UUID of the local peer.
+ const std::string peer_uuid_;
+
+ // TODO(dralves) hack to serialize updates due to repeated/out-of-order messages
+ // should probably be refactored out.
+ //
+ // Lock ordering note: If both 'update_lock_' and 'lock_' are to be taken,
+ // 'update_lock_' lock must be taken first.
+ mutable simple_spinlock update_lock_;
+
+ // Coarse-grained lock that protects all mutable data members.
+ mutable simple_spinlock lock_;
+
+ State state_;
+
+ // Consensus metadata persistence object.
+ std::unique_ptr<ConsensusMetadata> cmeta_;
+
// Threadpool for constructing requests to peers, handling RPC callbacks,
// etc.
gscoped_ptr<ThreadPool> thread_pool_;
@@ -462,10 +618,8 @@ class RaftConsensus : public Consensus,
// The queue of messages that must be sent to peers.
gscoped_ptr<PeerMessageQueue> queue_;
- gscoped_ptr<ReplicaState> state_;
-
// The currently pending rounds that have not yet been committed by
- // consensus. Protected by the locks inside state_.
+ // consensus. Protected by 'lock_'.
// TODO(todd) these locks will become more fine-grained.
PendingRounds pending_;
@@ -493,13 +647,6 @@ class RaftConsensus : public Consensus,
const Callback<void(const std::string& reason)> mark_dirty_clbk_;
- // TODO(dralves) hack to serialize updates due to repeated/out-of-order messages
- // should probably be refactored out.
- //
- // Lock ordering note: If both this lock and the ReplicaState lock are to be
- // taken, this lock must be taken first.
- mutable simple_spinlock update_lock_;
-
AtomicBool shutdown_;
// The number of times Update() has been called, used for some test assertions.
@@ -515,5 +662,3 @@ class RaftConsensus : public Consensus,
} // namespace consensus
} // namespace kudu
-
-#endif /* KUDU_CONSENSUS_RAFT_CONSENSUS_H_ */
http://git-wip-us.apache.org/repos/asf/kudu/blob/9e40867c/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index a6db53b..bb1575e 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -31,7 +31,6 @@
#include "kudu/consensus/peer_manager.h"
#include "kudu/consensus/quorum_util.h"
#include "kudu/consensus/raft_consensus.h"
-#include "kudu/consensus/raft_consensus_state.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/strcat.h"
#include "kudu/gutil/strings/substitute.h"
@@ -661,9 +660,8 @@ TEST_F(RaftConsensusQuorumTest, TestConsensusContinuesIfAMinorityFallsBehind) {
scoped_refptr<RaftConsensus> follower0;
CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0));
- ReplicaState* follower0_rs = follower0->GetReplicaStateForTests();
- ReplicaState::UniqueLock lock;
- ASSERT_OK(follower0_rs->LockForRead(&lock));
+ RaftConsensus::UniqueLock lock;
+ ASSERT_OK(follower0->LockForRead(&lock));
// If the locked replica would stop consensus we would hang here
// as we wait for operations to be replicated to a majority.
@@ -705,15 +703,13 @@ TEST_F(RaftConsensusQuorumTest, TestConsensusStopsIfAMajorityFallsBehind) {
// and never letting them go.
scoped_refptr<RaftConsensus> follower0;
CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0));
- ReplicaState* follower0_rs = follower0->GetReplicaStateForTests();
- ReplicaState::UniqueLock lock0;
- ASSERT_OK(follower0_rs->LockForRead(&lock0));
+ RaftConsensus::UniqueLock lock0;
+ ASSERT_OK(follower0->LockForRead(&lock0));
scoped_refptr<RaftConsensus> follower1;
CHECK_OK(peers_->GetPeerByIdx(kFollower1Idx, &follower1));
- ReplicaState* follower1_rs = follower1->GetReplicaStateForTests();
- ReplicaState::UniqueLock lock1;
- ASSERT_OK(follower1_rs->LockForRead(&lock1));
+ RaftConsensus::UniqueLock lock1;
+ ASSERT_OK(follower1->LockForRead(&lock1));
// Append a single message to the queue
ASSERT_OK(AppendDummyMessage(kLeaderIdx, &round));
@@ -895,15 +891,13 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderElectionWithQuiescedQuorum) {
// This will force an election in which we expect to make the last
// non-shutdown peer in the list become leader.
- int flush_count_before = new_leader->GetReplicaStateForTests()
- ->consensus_metadata_for_tests()->flush_count_for_tests();
+ int flush_count_before = new_leader->consensus_metadata_for_tests()->flush_count_for_tests();
LOG(INFO) << "Running election for future leader with index " << (current_config_size - 1);
ASSERT_OK(new_leader->StartElection(Consensus::ELECT_EVEN_IF_LEADER_IS_ALIVE,
Consensus::EXTERNAL_REQUEST));
WaitUntilLeaderForTests(new_leader.get());
LOG(INFO) << "Election won";
- int flush_count_after = new_leader->GetReplicaStateForTests()
- ->consensus_metadata_for_tests()->flush_count_for_tests();
+ int flush_count_after = new_leader->consensus_metadata_for_tests()->flush_count_for_tests();
ASSERT_EQ(flush_count_after, flush_count_before + 1)
<< "Expected only one consensus metadata flush for a leader election";
@@ -1021,8 +1015,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
scoped_refptr<RaftConsensus> peer;
CHECK_OK(peers_->GetPeerByIdx(kPeerIndex, &peer));
auto flush_count = [&]() {
- return peer->GetReplicaStateForTests()
- ->consensus_metadata_for_tests()->flush_count_for_tests();
+ return peer->consensus_metadata_for_tests()->flush_count_for_tests();
};
VoteRequestPB request;