You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2017/06/01 20:51:00 UTC
[2/3] kudu git commit: consensus: Get rid of LockFor*() methods
consensus: Get rid of LockFor*() methods
Simplify the locking logic by removing layers of abstraction.
Also add State_Name() helper for state-related error messages.
Change-Id: I6858752f4fbeb70b09eb4375c52e4aeaa1bb8e71
Reviewed-on: http://gerrit.cloudera.org:8080/7012
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/3846861a
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/3846861a
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/3846861a
Branch: refs/heads/master
Commit: 3846861ab258a0ac0497893865875b2138964fe3
Parents: 30682fd
Author: Mike Percy <mp...@apache.org>
Authored: Tue May 30 14:00:53 2017 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Thu Jun 1 20:44:45 2017 +0000
----------------------------------------------------------------------
src/kudu/consensus/raft_consensus.cc | 298 +++++++++----------
src/kudu/consensus/raft_consensus.h | 68 ++---
.../consensus/raft_consensus_quorum-test.cc | 9 +-
3 files changed, 169 insertions(+), 206 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/3846861a/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index c5370c7..6a2c39f 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -278,8 +278,11 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
failure_detector_));
{
- UniqueLock lock;
- RETURN_NOT_OK(LockForStart(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ CHECK_EQ(kInitialized, state_) << LogPrefixUnlocked() << "Illegal state for Start(): "
+ << State_Name(state_);
+
ClearLeaderUnlocked();
// Our last persisted term can be higher than the last persisted operation
@@ -310,8 +313,9 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
}
{
- UniqueLock lock;
- RETURN_NOT_OK(LockForConfigChange(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ RETURN_NOT_OK(CheckRunningUnlocked());
RETURN_NOT_OK(EnsureFailureDetectorEnabledUnlocked());
@@ -350,15 +354,19 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
}
bool RaftConsensus::IsRunning() const {
- UniqueLock lock;
- Status s = LockForRead(&lock);
- if (PREDICT_FALSE(!s.ok())) return false;
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
return state_ == kRunning;
}
Status RaftConsensus::EmulateElection() {
- UniqueLock lock;
- RETURN_NOT_OK(LockForConfigChange(&lock));
+ TRACE_EVENT2("consensus", "RaftConsensus::EmulateElection",
+ "peer", peer_uuid_,
+ "tablet", options_.tablet_id);
+
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ RETURN_NOT_OK(CheckRunningUnlocked());
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Emulating election...";
@@ -404,8 +412,9 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
"mode", mode_str);
scoped_refptr<LeaderElection> election;
{
- UniqueLock lock;
- RETURN_NOT_OK(LockForConfigChange(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ RETURN_NOT_OK(CheckRunningUnlocked());
RaftPeerPB::Role active_role = GetActiveRoleUnlocked();
if (active_role == RaftPeerPB::LEADER) {
@@ -413,7 +422,7 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
return Status::OK();
}
if (PREDICT_FALSE(active_role == RaftPeerPB::NON_PARTICIPANT)) {
- SnoozeFailureDetectorUnlocked(); // Avoid excessive election noise while in this state.
+ RETURN_NOT_OK(SnoozeFailureDetectorUnlocked()); // Reduce election noise while in this state.
return Status::IllegalState("Not starting election: Node is currently "
"a non-participant in the raft config",
SecureShortDebugString(GetActiveConfigUnlocked()));
@@ -499,8 +508,9 @@ Status RaftConsensus::WaitUntilLeaderForTests(const MonoDelta& timeout) {
Status RaftConsensus::StepDown(LeaderStepDownResponsePB* resp) {
TRACE_EVENT0("consensus", "RaftConsensus::StepDown");
- UniqueLock lock;
- RETURN_NOT_OK(LockForConfigChange(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ RETURN_NOT_OK(CheckRunningUnlocked());
if (GetActiveRoleUnlocked() != RaftPeerPB::LEADER) {
resp->mutable_error()->set_code(TabletServerErrorPB::NOT_THE_LEADER);
StatusToPB(Status::IllegalState("Not currently leader"),
@@ -580,8 +590,9 @@ Status RaftConsensus::Replicate(const scoped_refptr<ConsensusRound>& round) {
std::lock_guard<simple_spinlock> lock(update_lock_);
{
- UniqueLock lock;
- RETURN_NOT_OK(LockForReplicate(&lock, *round->replicate_msg()));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ RETURN_NOT_OK(CheckSafeToReplicateUnlocked(*round->replicate_msg()));
RETURN_NOT_OK(round->CheckBoundTerm(GetCurrentTermUnlocked()));
RETURN_NOT_OK(AppendNewRoundToQueueUnlocked(round));
}
@@ -591,8 +602,9 @@ Status RaftConsensus::Replicate(const scoped_refptr<ConsensusRound>& round) {
}
Status RaftConsensus::CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRound>& round) {
- UniqueLock lock;
- RETURN_NOT_OK(LockForReplicate(&lock, *round->replicate_msg()));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ RETURN_NOT_OK(CheckSafeToReplicateUnlocked(*round->replicate_msg()));
round->BindToTerm(GetCurrentTermUnlocked());
return Status::OK();
}
@@ -655,12 +667,19 @@ Status RaftConsensus::AddPendingOperationUnlocked(const scoped_refptr<ConsensusR
}
void RaftConsensus::NotifyCommitIndex(int64_t commit_index) {
- UniqueLock lock;
- Status s = LockForCommit(&lock);
- if (PREDICT_FALSE(!s.ok())) {
- LOG(WARNING) << LogPrefixThreadSafe()
- << "Unable to take state lock to update committed index: "
- << s.ToString();
+ TRACE_EVENT2("consensus", "RaftConsensus::NotifyCommitIndex",
+ "tablet", options_.tablet_id,
+ "commit_index", commit_index);
+
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ // We will process commit notifications while shutting down because a replica
+ // which has initiated a Prepare() / Replicate() may eventually commit even if
+ // its state has changed after the initial Append() / Update().
+ if (PREDICT_FALSE(state_ != kRunning && state_ != kShuttingDown)) {
+ LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to update committed index: "
+ << "Replica not in running state: "
+ << State_Name(state_);
return;
}
@@ -672,11 +691,16 @@ void RaftConsensus::NotifyCommitIndex(int64_t commit_index) {
}
void RaftConsensus::NotifyTermChange(int64_t term) {
- UniqueLock lock;
- Status s = LockForConfigChange(&lock);
+ TRACE_EVENT2("consensus", "RaftConsensus::NotifyTermChange",
+ "tablet", options_.tablet_id,
+ "term", term);
+
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ Status s = CheckRunningUnlocked();
if (PREDICT_FALSE(!s.ok())) {
- LOG(WARNING) << LogPrefixThreadSafe() << "Unable to lock consensus for term change"
- << " when notified of new term " << term << ": " << s.ToString();
+ LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to handle notification of new term "
+ << "(" << term << "): " << s.ToString();
return;
}
WARN_NOT_OK(HandleTermAdvanceUnlocked(term), "Couldn't advance consensus term.");
@@ -697,14 +721,8 @@ void RaftConsensus::NotifyFailedFollower(const string& uuid,
RaftConfigPB committed_config;
{
- UniqueLock lock;
- Status s = LockForRead(&lock);
- if (PREDICT_FALSE(!s.ok())) {
- LOG(WARNING) << LogPrefixThreadSafe() << fail_msg
- << "Unable to lock consensus for read: " << s.ToString();
- return;
- }
-
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
int64_t current_term = GetCurrentTermUnlocked();
if (current_term != term) {
LOG_WITH_PREFIX_UNLOCKED(INFO) << fail_msg << "Notified about a follower failure in "
@@ -794,8 +812,8 @@ Status RaftConsensus::StartReplicaTransactionUnlocked(const ReplicateRefPtr& msg
}
Status RaftConsensus::IsSingleVoterConfig(bool* single_voter) const {
- UniqueLock lock;
- RETURN_NOT_OK(LockForRead(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
const RaftConfigPB& config = GetCommittedConfigUnlocked();
const string& uuid = peer_uuid_;
if (CountVoters(config) == 1 && IsRaftConfigVoter(uuid, config)) {
@@ -1142,8 +1160,12 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
LeaderRequest deduped_req;
{
- UniqueLock lock;
- RETURN_NOT_OK(LockForUpdate(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ RETURN_NOT_OK(CheckRunningUnlocked());
+ if (!IsRaftConfigVoter(peer_uuid_, cmeta_->active_config())) {
+ LOG_WITH_PREFIX_UNLOCKED(INFO) << "Allowing update even though not a member of the config";
+ }
deduped_req.leader_uuid = request->caller_uuid();
@@ -1333,9 +1355,9 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
// If just waiting for our log append to finish lets snooze the timer.
// We don't want to fire leader election because we're waiting on our own log.
if (s.IsTimedOut()) {
- UniqueLock lock;
- RETURN_NOT_OK(LockForRead(&lock));
- SnoozeFailureDetectorUnlocked();
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ RETURN_NOT_OK(SnoozeFailureDetectorUnlocked());
}
} while (s.IsTimedOut());
RETURN_NOT_OK(s);
@@ -1393,14 +1415,16 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
// timeouts, just vote a quick NO.
//
// We still need to take the state lock in order to respond with term info, etc.
- UniqueLock state_guard;
- RETURN_NOT_OK(LockForConfigChange(&state_guard));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ RETURN_NOT_OK(CheckRunningUnlocked());
return RequestVoteRespondIsBusy(request, response);
}
// Acquire the replica state lock so we can read / modify the consensus state.
- UniqueLock state_guard;
- RETURN_NOT_OK(LockForConfigChange(&state_guard));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ RETURN_NOT_OK(CheckRunningUnlocked());
// If the node is not in the configuration, allow the vote (this is required by Raft)
// but log an informational message anyway.
@@ -1480,6 +1504,10 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
const StatusCallback& client_cb,
boost::optional<TabletServerErrorPB::Code>* error_code) {
+ TRACE_EVENT2("consensus", "RaftConsensus::ChangeConfig",
+ "peer", peer_uuid_,
+ "tablet", options_.tablet_id);
+
if (PREDICT_FALSE(!req.has_type())) {
return Status::InvalidArgument("Must specify 'type' argument to ChangeConfig()",
SecureShortDebugString(req));
@@ -1492,8 +1520,9 @@ Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
ChangeConfigType type = req.type();
const RaftPeerPB& server = req.server();
{
- UniqueLock lock;
- RETURN_NOT_OK(LockForConfigChange(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ RETURN_NOT_OK(CheckRunningUnlocked());
RETURN_NOT_OK(CheckActiveLeaderUnlocked());
RETURN_NOT_OK(CheckNoConfigChangePendingUnlocked());
@@ -1599,8 +1628,8 @@ Status RaftConsensus::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req,
{
// Take the snapshot of the replica state and queue state so that
// we can stick them in the consensus update request later.
- UniqueLock lock;
- RETURN_NOT_OK(LockForRead(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
current_term = GetCurrentTermUnlocked();
committed_config = GetCommittedConfigUnlocked();
if (IsConfigChangePendingUnlocked()) {
@@ -1719,6 +1748,10 @@ Status RaftConsensus::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req,
}
void RaftConsensus::Shutdown() {
+ TRACE_EVENT2("consensus", "RaftConsensus::Shutdown",
+ "peer", peer_uuid_,
+ "tablet", options_.tablet_id);
+
// Avoid taking locks if already shut down so we don't violate
// ThreadRestrictions assertions in the case where the RaftConsensus
// destructor runs on the reactor thread due to an election callback being
@@ -1726,9 +1759,11 @@ void RaftConsensus::Shutdown() {
if (shutdown_.Load(kMemOrderAcquire)) return;
{
- UniqueLock lock;
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
// Transition to kShuttingDown state.
- CHECK_OK(LockForShutdown(&lock));
+ CHECK_NE(kShutDown, state_) << State_Name(state_); // We are protected here by 'shutdown_'.
+ state_ = kShuttingDown;
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus shutting down.";
}
@@ -1739,10 +1774,10 @@ void RaftConsensus::Shutdown() {
queue_->Close();
{
- UniqueLock lock;
- CHECK_OK(LockForShutdown(&lock));
- CHECK_EQ(kShuttingDown, state_);
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
CHECK_OK(pending_.CancelPendingTransactions());
+ CHECK_EQ(kShuttingDown, state_) << State_Name(state_);
state_ = kShutDown;
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus is shut down!";
}
@@ -1779,8 +1814,9 @@ Status RaftConsensus::StartConsensusOnlyRoundUnlocked(const ReplicateRefPtr& msg
}
Status RaftConsensus::AdvanceTermForTests(int64_t new_term) {
- UniqueLock lock;
- CHECK_OK(LockForConfigChange(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ CHECK_OK(CheckRunningUnlocked());
return HandleTermAdvanceUnlocked(new_term);
}
@@ -1914,11 +1950,27 @@ Status RaftConsensus::RequestVoteRespondVoteGranted(const VoteRequestPB* request
}
RaftPeerPB::Role RaftConsensus::role() const {
- UniqueLock lock;
- CHECK_OK(LockForRead(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
return GetActiveRoleUnlocked();
}
+const char* RaftConsensus::State_Name(State state) {
+ switch (state) {
+ case kInitialized:
+ return "Initialized";
+ case kRunning:
+ return "Running";
+ case kShuttingDown:
+ return "Shutting down";
+ case kShutDown:
+ return "Shut down";
+ default:
+ LOG(DFATAL) << "Unknown State value: " << state;
+ return "Unknown";
+ }
+}
+
void RaftConsensus::SetLeaderUuidUnlocked(const string& uuid) {
DCHECK(lock_.is_locked());
failed_elections_since_stable_leader_ = 0;
@@ -1975,14 +2027,14 @@ const string& RaftConsensus::tablet_id() const {
}
ConsensusStatePB RaftConsensus::ConsensusState() const {
- UniqueLock lock;
- CHECK_OK(LockForRead(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
return ConsensusStateUnlocked();
}
RaftConfigPB RaftConsensus::CommittedConfig() const {
- UniqueLock lock;
- CHECK_OK(LockForRead(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
return GetCommittedConfigUnlocked();
}
@@ -1997,8 +2049,8 @@ void RaftConsensus::DumpStatusHtml(std::ostream& out) const {
// Dump the queues on a leader.
RaftPeerPB::Role role;
{
- UniqueLock lock;
- CHECK_OK(LockForRead(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
role = GetActiveRoleUnlocked();
}
if (role == RaftPeerPB::LEADER) {
@@ -2026,8 +2078,8 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
// Snooze to avoid the election timer firing again as much as possible.
{
- UniqueLock lock;
- CHECK_OK(LockForRead(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
// We need to snooze when we win and when we lose:
// - When we win because we're about to disable the timer and become leader.
// - When we lose or otherwise we can fall into a cycle, where everyone keeps
@@ -2064,12 +2116,13 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
}
// The vote was granted, become leader.
- UniqueLock lock;
- Status s = LockForConfigChange(&lock);
+ ThreadRestrictions::AssertWaitAllowed();
+ UniqueLock lock(lock_);
+ Status s = CheckRunningUnlocked();
if (PREDICT_FALSE(!s.ok())) {
- LOG_WITH_PREFIX(INFO) << "Received " << election_type << " callback for term "
- << election_term << " while not running: "
- << s.ToString();
+ LOG_WITH_PREFIX_UNLOCKED(INFO) << "Received " << election_type << " callback for term "
+ << election_term << " while not running: "
+ << s.ToString();
return;
}
@@ -2137,8 +2190,8 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
}
Status RaftConsensus::GetLastOpId(OpIdType type, OpId* id) {
- UniqueLock lock;
- RETURN_NOT_OK(LockForRead(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
if (type == RECEIVED_OPID) {
*DCHECK_NOTNULL(id) = queue_->GetLastOpIdInLog();
} else if (type == COMMITTED_OPID) {
@@ -2192,13 +2245,13 @@ void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound* round,
// the client callback.
if (!status.ok()) {
- LOG(INFO) << LogPrefixThreadSafe() << op_type_str << " replication failed: "
- << status.ToString();
+ LOG_WITH_PREFIX_UNLOCKED(INFO) << op_type_str << " replication failed: "
+ << status.ToString();
client_cb.Run(status);
return;
}
- VLOG(1) << LogPrefixThreadSafe() << "Committing " << op_type_str << " with op id "
- << round->id();
+ VLOG_WITH_PREFIX_UNLOCKED(1) << "Committing " << op_type_str << " with op id "
+ << round->id();
gscoped_ptr<CommitMsg> commit_msg(new CommitMsg);
commit_msg->set_op_type(round->replicate_msg()->op_type());
*commit_msg->mutable_commited_op_id() = round->id();
@@ -2364,82 +2417,19 @@ Status RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term,
return Status::OK();
}
-Status RaftConsensus::LockForStart(UniqueLock* lock) const {
- ThreadRestrictions::AssertWaitAllowed();
- UniqueLock l(lock_);
- CHECK_EQ(state_, kInitialized) << "Illegal state for Start()."
- << " Replica is not in kInitialized state";
- lock->swap(l);
- return Status::OK();
-}
-
-Status RaftConsensus::LockForRead(UniqueLock* lock) const {
- ThreadRestrictions::AssertWaitAllowed();
- UniqueLock l(lock_);
- lock->swap(l);
- return Status::OK();
-}
-
-Status RaftConsensus::LockForReplicate(UniqueLock* lock, const ReplicateMsg& msg) const {
- ThreadRestrictions::AssertWaitAllowed();
+Status RaftConsensus::CheckSafeToReplicateUnlocked(const ReplicateMsg& msg) const {
+ DCHECK(lock_.is_locked());
DCHECK(!msg.has_id()) << "Should not have an ID yet: " << SecureShortDebugString(msg);
- UniqueLock l(lock_);
- if (PREDICT_FALSE(state_ != kRunning)) {
- return Status::IllegalState("Replica not in running state");
- }
-
- RETURN_NOT_OK(CheckActiveLeaderUnlocked());
- lock->swap(l);
- return Status::OK();
-}
-
-Status RaftConsensus::LockForCommit(UniqueLock* lock) const {
- TRACE_EVENT0("consensus", "RaftConsensus::LockForCommit");
- ThreadRestrictions::AssertWaitAllowed();
- UniqueLock l(lock_);
- if (PREDICT_FALSE(state_ != kRunning && state_ != kShuttingDown)) {
- return Status::IllegalState("Replica not in running state");
- }
- lock->swap(l);
- return Status::OK();
-}
-
-Status RaftConsensus::LockForUpdate(UniqueLock* lock) const {
- TRACE_EVENT0("consensus", "RaftConsensus::LockForUpdate");
- ThreadRestrictions::AssertWaitAllowed();
- UniqueLock l(lock_);
- if (PREDICT_FALSE(state_ != kRunning)) {
- return Status::IllegalState("Replica not in running state");
- }
- if (!IsRaftConfigVoter(peer_uuid_, cmeta_->active_config())) {
- LOG_WITH_PREFIX_UNLOCKED(INFO) << "Allowing update even though not a member of the config";
- }
- lock->swap(l);
- return Status::OK();
+ RETURN_NOT_OK(CheckRunningUnlocked());
+ return CheckActiveLeaderUnlocked();
}
-Status RaftConsensus::LockForConfigChange(UniqueLock* lock) const {
- TRACE_EVENT0("consensus", "RaftConsensus::LockForConfigChange");
-
- ThreadRestrictions::AssertWaitAllowed();
- UniqueLock l(lock_);
- // Can only change the config on running replicas.
+Status RaftConsensus::CheckRunningUnlocked() const {
+ DCHECK(lock_.is_locked());
if (PREDICT_FALSE(state_ != kRunning)) {
- return Status::IllegalState("Unable to lock ReplicaState for config change",
- Substitute("State = $0", state_));
+ return Status::IllegalState("RaftConsensus is not running",
+ Substitute("State = $0", State_Name(state_)));
}
- lock->swap(l);
- return Status::OK();
-}
-
-Status RaftConsensus::LockForShutdown(UniqueLock* lock) {
- TRACE_EVENT0("consensus", "RaftConsensus::LockForShutdown");
- ThreadRestrictions::AssertWaitAllowed();
- UniqueLock l(lock_);
- if (state_ != kShuttingDown && state_ != kShutDown) {
- state_ = kShuttingDown;
- }
- lock->swap(l);
return Status::OK();
}
@@ -2608,8 +2598,8 @@ const ConsensusOptions& RaftConsensus::GetOptions() const {
}
string RaftConsensus::LogPrefix() const {
- UniqueLock lock;
- CHECK_OK(LockForRead(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
return LogPrefixUnlocked();
}
@@ -2630,14 +2620,14 @@ string RaftConsensus::LogPrefixThreadSafe() const {
string RaftConsensus::ToString() const {
ThreadRestrictions::AssertWaitAllowed();
- UniqueLock lock(lock_);
+ LockGuard l(lock_);
return ToStringUnlocked();
}
string RaftConsensus::ToStringUnlocked() const {
DCHECK(lock_.is_locked());
return Substitute("Replica: $0, State: $1, Role: $2",
- peer_uuid_, state_, RaftPeerPB::Role_Name(GetActiveRoleUnlocked()));
+ peer_uuid_, State_Name(state_), RaftPeerPB::Role_Name(GetActiveRoleUnlocked()));
}
ConsensusMetadata* RaftConsensus::consensus_metadata_for_tests() const {
http://git-wip-us.apache.org/repos/asf/kudu/blob/3846861a/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index b1badde..b9348c2 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -35,9 +35,6 @@
namespace kudu {
-typedef std::lock_guard<simple_spinlock> Lock;
-typedef gscoped_ptr<Lock> ScopedLock;
-
class Counter;
class FailureDetector;
class HostPort;
@@ -62,8 +59,6 @@ struct ElectionResult;
class RaftConsensus : public Consensus,
public PeerMessageQueueObserver {
public:
- typedef std::unique_lock<simple_spinlock> UniqueLock;
-
static scoped_refptr<RaftConsensus> Create(
ConsensusOptions options,
std::unique_ptr<ConsensusMetadata> cmeta,
@@ -181,6 +176,8 @@ class RaftConsensus : public Consensus,
FRIEND_TEST(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty);
FRIEND_TEST(RaftConsensusQuorumTest, TestRequestVote);
+ // NOTE: When adding / changing values in this enum, add the corresponding
+ // values to State_Name().
enum State {
// State after the replica is built.
kInitialized,
@@ -224,6 +221,12 @@ class RaftConsensus : public Consensus,
std::string OpsRangeString() const;
};
+ using LockGuard = std::lock_guard<simple_spinlock>;
+ using UniqueLock = std::unique_lock<simple_spinlock>;
+
+ // Returns string description for State enum value.
+ static const char* State_Name(State state);
+
// Set the leader UUID of the configuration and mark the tablet config dirty for
// reporting to the master.
void SetLeaderUuidUnlocked(const std::string& uuid);
@@ -276,7 +279,8 @@ class RaftConsensus : public Consensus,
// pending operations, we proactively abort those pending operations after and including
// the preceding op in 'req' to avoid a pointless cache miss in the leader's log cache.
Status EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequest& req,
- ConsensusResponsePB* response);
+ ConsensusResponsePB* response)
+ WARN_UNUSED_RESULT;
// Check a request received from a leader, making sure:
// - The request is in the right term
@@ -288,7 +292,7 @@ class RaftConsensus : public Consensus,
// the messages to add to our state machine.
Status CheckLeaderRequestUnlocked(const ConsensusRequestPB* request,
ConsensusResponsePB* response,
- LeaderRequest* deduped_req);
+ LeaderRequest* deduped_req) WARN_UNUSED_RESULT;
// Abort any pending operations after the given op index,
// and also truncate the LogCache accordingly.
@@ -382,13 +386,13 @@ class RaftConsensus : public Consensus,
// When this is called a failure is guaranteed not to be detected
// before 'FLAGS_leader_failure_max_missed_heartbeat_periods' *
// 'FLAGS_raft_heartbeat_interval_ms' has elapsed.
- Status SnoozeFailureDetectorUnlocked();
+ Status SnoozeFailureDetectorUnlocked() WARN_UNUSED_RESULT;
// Like the above but adds 'additional_delta' to the default timeout
// period. If allow_logging is set to ALLOW_LOGGING, then this method
// will print a log message when called.
Status SnoozeFailureDetectorUnlocked(const MonoDelta& additional_delta,
- AllowLogging allow_logging);
+ AllowLogging allow_logging) WARN_UNUSED_RESULT;
// Return the minimum election timeout. Due to backoff and random
// jitter, election timeouts may be longer than this.
@@ -462,45 +466,17 @@ class RaftConsensus : public Consensus,
// (see Diego Ongaro's thesis section 4.1).
Status AddPendingOperationUnlocked(const scoped_refptr<ConsensusRound>& round);
- // Locks a replica in preparation for StartUnlocked(). Makes
- // sure the replica is in kInitialized state.
- Status LockForStart(UniqueLock* lock) const WARN_UNUSED_RESULT;
-
- // Obtains the lock for a state read, does not check state.
- Status LockForRead(UniqueLock* lock) const WARN_UNUSED_RESULT;
-
- // Locks a replica down until the critical section of an append completes,
- // i.e. until the replicate message has been assigned an id and placed in
- // the log queue.
- // This also checks that the replica is in the appropriate
- // state (role) to replicate the provided operation, that the operation
- // contains a replicate message and is of the appropriate type, and returns
- // Status::IllegalState if that is not the case.
- Status LockForReplicate(UniqueLock* lock, const ReplicateMsg& msg) const WARN_UNUSED_RESULT;
-
- // Locks a replica down until the critical section of a commit completes.
- // This succeeds for all states since a replica which has initiated
- // a Prepare()/Replicate() must eventually commit even if it's state
- // has changed after the initial Append()/Update().
- Status LockForCommit(UniqueLock* lock) const WARN_UNUSED_RESULT;
-
- // Locks a replica down until an the critical section of an update completes.
- // Further updates from the same or some other leader will be blocked until
- // this completes. This also checks that the replica is in the appropriate
- // state (role) to be updated and returns Status::IllegalState if that
- // is not the case.
- Status LockForUpdate(UniqueLock* lock) const WARN_UNUSED_RESULT;
-
- Status LockForConfigChange(UniqueLock* lock) const WARN_UNUSED_RESULT;
-
- // Changes the role to non-participant and returns a lock that can be
- // used to make sure no state updates come in until Shutdown() is
- // completed.
- Status LockForShutdown(UniqueLock* lock) WARN_UNUSED_RESULT;
+ // Checks that the replica is in the appropriate state and role to replicate
+ // the provided operation and that the replicate message does not yet have an
+ // OpId assigned.
+ Status CheckSafeToReplicateUnlocked(const ReplicateMsg& msg) const WARN_UNUSED_RESULT;
+
+ // Return Status::IllegalState if 'state_' != kRunning, OK otherwise.
+ Status CheckRunningUnlocked() const WARN_UNUSED_RESULT;
// Ensure the local peer is the active leader.
// Returns OK if leader, IllegalState otherwise.
- Status CheckActiveLeaderUnlocked() const;
+ Status CheckActiveLeaderUnlocked() const WARN_UNUSED_RESULT;
// Return current consensus state summary.
ConsensusStatePB ConsensusStateUnlocked() const;
@@ -515,7 +491,7 @@ class RaftConsensus : public Consensus,
// Inverse of IsConfigChangePendingUnlocked(): returns OK if there is
// currently *no* configuration change pending, and IllegalState is there *is* a
// configuration change pending.
- Status CheckNoConfigChangePendingUnlocked() const;
+ Status CheckNoConfigChangePendingUnlocked() const WARN_UNUSED_RESULT;
// Sets the given configuration as pending commit. Does not persist into the peers
// metadata. In order to be persisted, SetCommittedConfigUnlocked() must be called.
http://git-wip-us.apache.org/repos/asf/kudu/blob/3846861a/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index bb1575e..a0933c8 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -660,8 +660,7 @@ TEST_F(RaftConsensusQuorumTest, TestConsensusContinuesIfAMinorityFallsBehind) {
scoped_refptr<RaftConsensus> follower0;
CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0));
- RaftConsensus::UniqueLock lock;
- ASSERT_OK(follower0->LockForRead(&lock));
+ RaftConsensus::LockGuard l(follower0->lock_);
// If the locked replica would stop consensus we would hang here
// as we wait for operations to be replicated to a majority.
@@ -703,13 +702,11 @@ TEST_F(RaftConsensusQuorumTest, TestConsensusStopsIfAMajorityFallsBehind) {
// and never letting them go.
scoped_refptr<RaftConsensus> follower0;
CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0));
- RaftConsensus::UniqueLock lock0;
- ASSERT_OK(follower0->LockForRead(&lock0));
+ RaftConsensus::LockGuard l_0(follower0->lock_);
scoped_refptr<RaftConsensus> follower1;
CHECK_OK(peers_->GetPeerByIdx(kFollower1Idx, &follower1));
- RaftConsensus::UniqueLock lock1;
- ASSERT_OK(follower1->LockForRead(&lock1));
+ RaftConsensus::LockGuard l_1(follower1->lock_);
// Append a single message to the queue
ASSERT_OK(AppendDummyMessage(kLeaderIdx, &round));