You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this rendering.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/01/06 18:27:32 UTC

[kudu] branch master updated (c767bd1 -> b6ac4c7)

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from c767bd1  KUDU-3032 Prevent selecting unnecessarily columns after scan optimization
     new a98d1fd  [consensus] LeaderElection cleanup
     new b6ac4c7  [consensus] respond lock-free to RequestVote() if busy

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/kudu/consensus/leader_election-test.cc |  19 +++---
 src/kudu/consensus/leader_election.cc      | 104 ++++++++++++++---------------
 src/kudu/consensus/leader_election.h       |  20 +++---
 src/kudu/consensus/raft_consensus.cc       |  86 +++++++++++++-----------
 src/kudu/consensus/raft_consensus.h        |  36 +++++++---
 5 files changed, 144 insertions(+), 121 deletions(-)


[kudu] 01/02: [consensus] LeaderElection cleanup

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit a98d1fda0b148787eabbf25b71eb931e64314491
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Mon Dec 23 13:40:11 2019 -0800

    [consensus] LeaderElection cleanup
    
    This patch cleans up the LeaderElection-related code.
    
    Change-Id: Ic3163ff5c7a628431a145c11133c54e26b7165cd
    Reviewed-on: http://gerrit.cloudera.org:8080/14947
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
 src/kudu/consensus/leader_election-test.cc |  19 +++---
 src/kudu/consensus/leader_election.cc      | 104 ++++++++++++++---------------
 src/kudu/consensus/leader_election.h       |  20 +++---
 src/kudu/consensus/raft_consensus.cc       |  16 ++---
 src/kudu/consensus/raft_consensus.h        |  14 ++--
 5 files changed, 85 insertions(+), 88 deletions(-)

diff --git a/src/kudu/consensus/leader_election-test.cc b/src/kudu/consensus/leader_election-test.cc
index 94facbc..e6e601c 100644
--- a/src/kudu/consensus/leader_election-test.cc
+++ b/src/kudu/consensus/leader_election-test.cc
@@ -21,7 +21,6 @@
 #include <memory>
 #include <ostream>
 #include <string>
-#include <type_traits>
 #include <unordered_map>
 #include <utility>
 #include <vector>
@@ -127,7 +126,7 @@ class LeaderElectionTest : public KuduTest {
   void InitUUIDs(int num_voters);
   void InitNoOpPeerProxies();
   void InitDelayableMockedProxies(bool enable_delay);
-  gscoped_ptr<VoteCounter> InitVoteCounter(int num_voters, int majority_size);
+  VoteCounter InitVoteCounter(int num_voters, int majority_size);
 
   // Voter 0 is the high-term voter.
   scoped_refptr<LeaderElection> SetUpElectionWithHighTermVoter(ConsensusTerm election_term);
@@ -192,12 +191,12 @@ void LeaderElectionTest::InitDelayableMockedProxies(bool enable_delay) {
   }
 }
 
-gscoped_ptr<VoteCounter> LeaderElectionTest::InitVoteCounter(int num_voters, int majority_size) {
-  gscoped_ptr<VoteCounter> counter(new VoteCounter(num_voters, majority_size));
+VoteCounter LeaderElectionTest::InitVoteCounter(int num_voters, int majority_size) {
+  VoteCounter counter(num_voters, majority_size);
   bool duplicate;
-  CHECK_OK(counter->RegisterVote(candidate_uuid_, VOTE_GRANTED, &duplicate));
+  CHECK_OK(counter.RegisterVote(candidate_uuid_, VOTE_GRANTED, &duplicate));
   CHECK(!duplicate);
-  return std::move(counter);
+  return counter;
 }
 
 scoped_refptr<LeaderElection> LeaderElectionTest::SetUpElectionWithHighTermVoter(
@@ -207,7 +206,7 @@ scoped_refptr<LeaderElection> LeaderElectionTest::SetUpElectionWithHighTermVoter
 
   InitUUIDs(kNumVoters);
   InitDelayableMockedProxies(true);
-  gscoped_ptr<VoteCounter> counter = InitVoteCounter(kNumVoters, kMajoritySize);
+  VoteCounter counter = InitVoteCounter(kNumVoters, kMajoritySize);
 
   VoteResponsePB response;
   response.set_responder_uuid(voter_uuids_[0]);
@@ -250,7 +249,7 @@ scoped_refptr<LeaderElection> LeaderElectionTest::SetUpElectionWithGrantDenyErro
 
   InitUUIDs(kNumVoters);
   InitDelayableMockedProxies(false); // Don't delay the vote responses.
-  gscoped_ptr<VoteCounter> counter = InitVoteCounter(kNumVoters, kMajoritySize);
+  VoteCounter counter = InitVoteCounter(kNumVoters, kMajoritySize);
   int num_grant_followers = num_grant - 1;
 
   // Set up mocked responses based on the params specified in the method arguments.
@@ -310,7 +309,7 @@ TEST_F(LeaderElectionTest, TestPerfectElection) {
 
     InitUUIDs(num_voters);
     InitNoOpPeerProxies();
-    gscoped_ptr<VoteCounter> counter = InitVoteCounter(num_voters, majority_size);
+    VoteCounter counter = InitVoteCounter(num_voters, majority_size);
 
     VoteRequestPB request;
     request.set_candidate_uuid(candidate_uuid_);
@@ -446,7 +445,7 @@ TEST_F(LeaderElectionTest, TestFailToCreateProxy) {
   request.set_candidate_term(kElectionTerm);
   request.set_tablet_id(tablet_id_);
 
-  gscoped_ptr<VoteCounter> counter = InitVoteCounter(kNumVoters, kMajoritySize);
+  VoteCounter counter = InitVoteCounter(kNumVoters, kMajoritySize);
   scoped_refptr<LeaderElection> election(
       new LeaderElection(config_, proxy_factory_.get(),
                          std::move(request), std::move(counter),
diff --git a/src/kudu/consensus/leader_election.cc b/src/kudu/consensus/leader_election.cc
index 46e85eb..917d03e 100644
--- a/src/kudu/consensus/leader_election.cc
+++ b/src/kudu/consensus/leader_election.cc
@@ -18,9 +18,9 @@
 #include "kudu/consensus/leader_election.h"
 
 #include <algorithm>
+#include <memory>
 #include <mutex>
 #include <ostream>
-#include <type_traits>
 #include <utility>
 #include <vector>
 
@@ -34,7 +34,6 @@
 #include "kudu/gutil/callback.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
-#include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/rpc_controller.h"
@@ -47,6 +46,7 @@ namespace kudu {
 namespace consensus {
 
 using std::string;
+using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
 
@@ -55,16 +55,16 @@ using strings::Substitute;
 ///////////////////////////////////////////////////
 
 VoteCounter::VoteCounter(int num_voters, int majority_size)
-  : num_voters_(num_voters),
-    majority_size_(majority_size),
-    yes_votes_(0),
-    no_votes_(0) {
+    : num_voters_(num_voters),
+      majority_size_(majority_size),
+      yes_votes_(0),
+      no_votes_(0) {
   CHECK_LE(majority_size, num_voters);
   CHECK_GT(num_voters_, 0);
   CHECK_GT(majority_size_, 0);
 }
 
-Status VoteCounter::RegisterVote(const std::string& voter_uuid, ElectionVote vote,
+Status VoteCounter::RegisterVote(const string& voter_uuid, ElectionVote vote,
                                  bool* is_duplicate) {
   // Handle repeated votes.
   if (PREDICT_FALSE(ContainsKey(votes_, voter_uuid))) {
@@ -160,25 +160,23 @@ string VoteCounter::GetElectionSummary() const {
 // ElectionResult
 ///////////////////////////////////////////////////
 
-// Suppress false positive about 'decision' used while uninitialized.
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
-ElectionResult::ElectionResult(VoteRequestPB vote_request, ElectionVote decision,
-                               ConsensusTerm highest_voter_term, const std::string& message)
-  : vote_request(std::move(vote_request)),
-    decision(decision),
-    highest_voter_term(highest_voter_term),
-    message(message) {
+ElectionResult::ElectionResult(VoteRequestPB request,
+                               ElectionVote election_decision,
+                               ConsensusTerm highest_term,
+                               string msg)
+    : vote_request(std::move(request)),
+      decision(election_decision),
+      highest_voter_term(highest_term),
+      message(std::move(msg)) {
   DCHECK(!message.empty());
 }
-#pragma GCC diagnostic pop
 
 ///////////////////////////////////////////////////
 // LeaderElection::VoterState
 ///////////////////////////////////////////////////
 
 string LeaderElection::VoterState::PeerInfo() const {
-  std::string info = peer_uuid;
+  string info = peer_uuid;
   if (proxy) {
     strings::SubstituteAndAppend(&info, " ($0)", proxy->PeerName());
   }
@@ -192,7 +190,7 @@ string LeaderElection::VoterState::PeerInfo() const {
 LeaderElection::LeaderElection(RaftConfigPB config,
                                PeerProxyFactory* proxy_factory,
                                VoteRequestPB request,
-                               gscoped_ptr<VoteCounter> vote_counter,
+                               VoteCounter vote_counter,
                                MonoDelta timeout,
                                ElectionDecisionCallback decision_callback)
     : has_responded_(false),
@@ -208,7 +206,6 @@ LeaderElection::LeaderElection(RaftConfigPB config,
 LeaderElection::~LeaderElection() {
   std::lock_guard<Lock> guard(lock_);
   DCHECK(has_responded_); // We must always call the callback exactly once.
-  STLDeleteValues(&voter_state_);
 }
 
 void LeaderElection::Run() {
@@ -217,9 +214,9 @@ void LeaderElection::Run() {
   // Initialize voter state tracking.
   vector<string> other_voter_uuids;
   voter_state_.clear();
-  for (const RaftPeerPB& peer : config_.peers()) {
+  for (const auto& peer : config_.peers()) {
     if (request_.candidate_uuid() == peer.permanent_uuid()) {
-      DCHECK_EQ(peer.member_type(), RaftPeerPB::VOTER)
+      DCHECK_EQ(RaftPeerPB::VOTER, peer.member_type())
           << Substitute("non-voter member $0 tried to start an election; "
                         "Raft config {$1}",
                         peer.permanent_uuid(),
@@ -231,18 +228,18 @@ void LeaderElection::Run() {
     }
     other_voter_uuids.emplace_back(peer.permanent_uuid());
 
-    gscoped_ptr<VoterState> state(new VoterState());
+    unique_ptr<VoterState> state(new VoterState);
     state->peer_uuid = peer.permanent_uuid();
     state->proxy_status = proxy_factory_->NewProxy(peer, &state->proxy);
-    InsertOrDie(&voter_state_, peer.permanent_uuid(), state.release());
+    EmplaceOrDie(&voter_state_, peer.permanent_uuid(), std::move(state));
   }
 
   // Ensure that the candidate has already voted for itself.
-  CHECK_EQ(1, vote_counter_->GetTotalVotesCounted()) << "Candidate must vote for itself first";
+  CHECK_EQ(1, vote_counter_.GetTotalVotesCounted()) << "Candidate must vote for itself first";
 
   // Ensure that existing votes + future votes add up to the expected total.
-  CHECK_EQ(vote_counter_->GetTotalVotesCounted() + other_voter_uuids.size(),
-           vote_counter_->GetTotalExpectedVotes())
+  CHECK_EQ(vote_counter_.GetTotalVotesCounted() + other_voter_uuids.size(),
+           vote_counter_.GetTotalExpectedVotes())
       << "Expected different number of voters. Voter UUIDs: ["
       << JoinStringsIterator(other_voter_uuids.begin(), other_voter_uuids.end(), ", ")
       << "]; RaftConfig: {" << pb_util::SecureShortDebugString(config_) << "}";
@@ -258,7 +255,7 @@ void LeaderElection::Run() {
     VoterState* state = nullptr;
     {
       std::lock_guard<Lock> guard(lock_);
-      state = FindOrDie(voter_state_, voter_uuid);
+      state = FindOrDie(voter_state_, voter_uuid).get();
       // Safe to drop the lock because voter_state_ is not mutated outside of
       // the constructor / destructor. We do this to avoid deadlocks below.
     }
@@ -303,17 +300,18 @@ void LeaderElection::CheckForDecision() {
   {
     std::lock_guard<Lock> guard(lock_);
     // Check if the vote has been newly decided.
-    if (!result_ && vote_counter_->IsDecided()) {
+    if (!result_ && vote_counter_.IsDecided()) {
       ElectionVote decision;
-      CHECK_OK(vote_counter_->GetDecision(&decision));
+      CHECK_OK(vote_counter_.GetDecision(&decision));
       const auto election_won = decision == VOTE_GRANTED;
       LOG_WITH_PREFIX(INFO) << Substitute("Election decided. Result: candidate $0. "
                                           "Election summary: $1",
                                           election_won ? "won" : "lost",
-                                          vote_counter_->GetElectionSummary());
+                                          vote_counter_.GetElectionSummary());
       string msg = election_won ?
           "achieved majority votes" : "could not achieve majority";
-      result_.reset(new ElectionResult(request_, decision, highest_voter_term_, msg));
+      result_.reset(new ElectionResult(
+          request_, decision, highest_voter_term_, std::move(msg)));
     }
     // Check whether to respond. This can happen as a result of either getting
     // a majority vote or of something invalidating the election, like
@@ -331,10 +329,10 @@ void LeaderElection::CheckForDecision() {
   }
 }
 
-void LeaderElection::VoteResponseRpcCallback(const std::string& voter_uuid) {
+void LeaderElection::VoteResponseRpcCallback(const string& voter_uuid) {
   {
     std::lock_guard<Lock> guard(lock_);
-    VoterState* state = FindOrDie(voter_state_, voter_uuid);
+    VoterState* state = FindOrDie(voter_state_, voter_uuid).get();
 
     // Check for RPC errors.
     if (!state->rpc.status().ok()) {
@@ -353,17 +351,16 @@ void LeaderElection::VoteResponseRpcCallback(const std::string& voter_uuid) {
     // If the peer changed their IP address, we shouldn't count this vote since
     // our knowledge of the configuration is in an inconsistent state.
     } else if (PREDICT_FALSE(voter_uuid != state->response.responder_uuid())) {
-      LOG_WITH_PREFIX(DFATAL) << "Received vote response from peer "
-                              << state->PeerInfo() << ": "
-                              << "we thought peer had UUID " << voter_uuid
-                              << " but its actual UUID is "
-                              << state->response.responder_uuid();
+      LOG_WITH_PREFIX(DFATAL) << Substitute(
+          "$0: peer UUID mismatch from VoteRequest(): expected $1; actual $2",
+          state->PeerInfo(), voter_uuid, state->response.responder_uuid());
       RecordVoteUnlocked(*state, VOTE_DENIED);
-
     } else {
       // No error: count actual votes.
-
-      highest_voter_term_ = std::max(highest_voter_term_, state->response.responder_term());
+      if (state->response.has_responder_term()) {
+        highest_voter_term_ = std::max(highest_voter_term_,
+                                       state->response.responder_term());
+      }
       if (state->response.vote_granted()) {
         HandleVoteGrantedUnlocked(*state);
       } else {
@@ -380,8 +377,8 @@ void LeaderElection::RecordVoteUnlocked(const VoterState& state, ElectionVote vo
   DCHECK(lock_.is_locked());
 
   // Record the vote.
-  bool duplicate;
-  Status s = vote_counter_->RegisterVote(state.peer_uuid, vote, &duplicate);
+  bool duplicate = false;
+  const auto s = vote_counter_.RegisterVote(state.peer_uuid, vote, &duplicate);
   if (!s.ok()) {
     LOG_WITH_PREFIX(WARNING) << "Error registering vote for peer "
                              << state.PeerInfo() << ": " << s.ToString();
@@ -397,27 +394,27 @@ void LeaderElection::RecordVoteUnlocked(const VoterState& state, ElectionVote vo
 
 void LeaderElection::HandleHigherTermUnlocked(const VoterState& state) {
   DCHECK(lock_.is_locked());
+  DCHECK(state.response.has_responder_term());
   DCHECK_GT(state.response.responder_term(), election_term());
 
   string msg = Substitute("Vote denied by peer $0 with higher term. Message: $1",
                           state.PeerInfo(),
                           StatusFromPB(state.response.consensus_error().status()).ToString());
-  LOG_WITH_PREFIX(WARNING) << msg;
+  LOG_WITH_PREFIX(INFO) << msg;
 
   if (!result_) {
     LOG_WITH_PREFIX(INFO) << "Cancelling election due to peer responding with higher term";
-    result_.reset(new ElectionResult(request_, VOTE_DENIED,
-                                     state.response.responder_term(), msg));
+    result_.reset(new ElectionResult(
+        request_, VOTE_DENIED, state.response.responder_term(), std::move(msg)));
   }
 }
 
 void LeaderElection::HandleVoteGrantedUnlocked(const VoterState& state) {
   DCHECK(lock_.is_locked());
-  if (!request_.is_pre_election()) {
-    DCHECK_EQ(state.response.responder_term(), election_term());
-  }
   DCHECK(state.response.vote_granted());
-
+  DCHECK(state.response.has_responder_term());
+  DCHECK(request_.is_pre_election() ||
+         state.response.responder_term() == election_term());
   VLOG_WITH_PREFIX(1) << "Vote granted by peer " << state.PeerInfo();
   RecordVoteUnlocked(state, VOTE_GRANTED);
 }
@@ -428,7 +425,8 @@ void LeaderElection::HandleVoteDeniedUnlocked(const VoterState& state) {
 
   // If one of the voters responds with a greater term than our own, and we
   // have not yet triggered the decision callback, it cancels the election.
-  if (state.response.responder_term() > election_term()) {
+  if (state.response.has_responder_term() &&
+      state.response.responder_term() > election_term()) {
     return HandleHigherTermUnlocked(state);
   }
 
@@ -439,7 +437,7 @@ void LeaderElection::HandleVoteDeniedUnlocked(const VoterState& state) {
   RecordVoteUnlocked(state, VOTE_DENIED);
 }
 
-std::string LeaderElection::LogPrefix() const {
+string LeaderElection::LogPrefix() const {
   return Substitute("T $0 P $1 [CANDIDATE]: Term $2 $3election: ",
                     request_.tablet_id(),
                     request_.candidate_uuid(),
diff --git a/src/kudu/consensus/leader_election.h b/src/kudu/consensus/leader_election.h
index 8770906..c42e7c6 100644
--- a/src/kudu/consensus/leader_election.h
+++ b/src/kudu/consensus/leader_election.h
@@ -15,12 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef KUDU_CONSENSUS_LEADER_ELECTION_H
-#define KUDU_CONSENSUS_LEADER_ELECTION_H
+#pragma once
 
 #include <cstdint>
 #include <functional>
 #include <map>
+#include <memory>
 #include <string>
 #include <unordered_map>
 
@@ -51,6 +51,7 @@ class VoteCounter {
  public:
   // Create new VoteCounter with the given majority size.
   VoteCounter(int num_voters, int majority_size);
+  VoteCounter(VoteCounter&&) = default;
 
   // Register a peer's vote.
   //
@@ -98,8 +99,8 @@ class VoteCounter {
 // The result of a leader election.
 struct ElectionResult {
  public:
-  ElectionResult(VoteRequestPB vote_request, ElectionVote decision,
-                 ConsensusTerm highest_term, const std::string& message);
+  ElectionResult(VoteRequestPB request, ElectionVote election_decision,
+                 ConsensusTerm highest_term, std::string msg);
 
   // The vote request that was sent to the voters for this election.
   const VoteRequestPB vote_request;
@@ -149,7 +150,7 @@ class LeaderElection : public RefCountedThreadSafe<LeaderElection> {
   LeaderElection(RaftConfigPB config,
                  PeerProxyFactory* proxy_factory,
                  VoteRequestPB request,
-                 gscoped_ptr<VoteCounter> vote_counter,
+                 VoteCounter vote_counter,
                  MonoDelta timeout,
                  ElectionDecisionCallback decision_callback);
 
@@ -174,7 +175,8 @@ class LeaderElection : public RefCountedThreadSafe<LeaderElection> {
     std::string PeerInfo() const;
   };
 
-  typedef std::unordered_map<std::string, VoterState*> VoterStateMap;
+  typedef std::unordered_map<std::string, std::unique_ptr<VoterState>>
+      VoterStateMap;
   typedef simple_spinlock Lock;
 
   // This class is refcounted.
@@ -211,7 +213,7 @@ class LeaderElection : public RefCountedThreadSafe<LeaderElection> {
 
   // The result returned by the ElectionDecisionCallback.
   // NULL if not yet known.
-  gscoped_ptr<ElectionResult> result_;
+  std::unique_ptr<ElectionResult> result_;
 
   // Whether we have responded via the callback yet.
   bool has_responded_;
@@ -226,7 +228,7 @@ class LeaderElection : public RefCountedThreadSafe<LeaderElection> {
   const VoteRequestPB request_;
 
   // Object to count the votes.
-  const gscoped_ptr<VoteCounter> vote_counter_;
+  VoteCounter vote_counter_;
 
   // Timeout for sending RPCs.
   const MonoDelta timeout_;
@@ -243,5 +245,3 @@ class LeaderElection : public RefCountedThreadSafe<LeaderElection> {
 
 } // namespace consensus
 } // namespace kudu
-
-#endif /* KUDU_CONSENSUS_LEADER_ELECTION_H */
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 77966e4..69c5d9f 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -497,11 +497,11 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
     // Initialize the VoteCounter.
     int num_voters = CountVoters(active_config);
     int majority_size = MajoritySize(num_voters);
-    gscoped_ptr<VoteCounter> counter(new VoteCounter(num_voters, majority_size));
+    VoteCounter counter(num_voters, majority_size);
 
     // Vote for ourselves.
     bool duplicate;
-    RETURN_NOT_OK(counter->RegisterVote(peer_uuid(), VOTE_GRANTED, &duplicate));
+    RETURN_NOT_OK(counter.RegisterVote(peer_uuid(), VOTE_GRANTED, &duplicate));
     CHECK(!duplicate) << LogPrefixUnlocked()
                       << "Inexplicable duplicate self-vote for term "
                       << CurrentTermUnlocked();
@@ -2421,6 +2421,12 @@ const char* RaftConsensus::State_Name(State state) {
   }
 }
 
+MonoDelta RaftConsensus::MinimumElectionTimeout() {
+  int32_t failure_timeout = FLAGS_leader_failure_max_missed_heartbeat_periods *
+      FLAGS_raft_heartbeat_interval_ms;
+  return MonoDelta::FromMilliseconds(failure_timeout);
+}
+
 void RaftConsensus::SetLeaderUuidUnlocked(const string& uuid) {
   DCHECK(lock_.is_locked());
   failed_elections_since_stable_leader_ = 0;
@@ -2866,12 +2872,6 @@ void RaftConsensus::WithholdVotesUnlocked() {
                                    MonoTime::Now() + MinimumElectionTimeout());
 }
 
-MonoDelta RaftConsensus::MinimumElectionTimeout() const {
-  int32_t failure_timeout = FLAGS_leader_failure_max_missed_heartbeat_periods *
-      FLAGS_raft_heartbeat_interval_ms;
-  return MonoDelta::FromMilliseconds(failure_timeout);
-}
-
 MonoDelta RaftConsensus::LeaderElectionExpBackoffDeltaUnlocked() {
   DCHECK(lock_.is_locked());
   // Compute a backoff factor based on how many leader elections have
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index a631c5a..95b3120 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -457,6 +457,13 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
   using LockGuard = std::lock_guard<simple_spinlock>;
   using UniqueLock = std::unique_lock<simple_spinlock>;
 
+  // Returns string description for State enum value.
+  static const char* State_Name(State state);
+
+  // Return the minimum election timeout. Due to backoff and random
+  // jitter, election timeouts may be longer than this.
+  static MonoDelta MinimumElectionTimeout();
+
   // Initializes the RaftConsensus object, including loading the consensus
   // metadata.
   Status Init();
@@ -465,9 +472,6 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
   // enum documents legal state transitions.
   void SetStateUnlocked(State new_state);
 
-  // Returns string description for State enum value.
-  static const char* State_Name(State state);
-
   // Set the leader UUID of the configuration and mark the tablet config dirty for
   // reporting to the master.
   void SetLeaderUuidUnlocked(const std::string& uuid);
@@ -649,10 +653,6 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
   // This method is safe to call even it's a leader replica.
   void WithholdVotesUnlocked();
 
-  // Return the minimum election timeout. Due to backoff and random
-  // jitter, election timeouts may be longer than this.
-  MonoDelta MinimumElectionTimeout() const;
-
   // Calculates a snooze delta for leader election.
   //
   // The delta increases exponentially with the difference between the current


[kudu] 02/02: [consensus] respond lock-free to RequestVote() if busy

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit b6ac4c7aa3e2bdc04d85135b8eac06ec215cbbcc
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Mon Dec 23 13:40:11 2019 -0800

    [consensus] respond lock-free to RequestVote() if busy
    
    I saw cases of contention on a replica's RaftConsensus lock when the
    filesystem was slow to update the Raft metadata files recording a vote
    that had just been granted by the replica.  As it turned out, the fast
    path that responds 'busy' was also acquiring the RaftConsensus object
    lock, but that is easy to avoid.
    
    This patch updates the code so that it does not acquire the lock in such
    cases.  It should help a bit to prevent the RaftConsensus RPC queue from
    overflowing.
    
    In addition, I turned the LOG(INFO) message about this event into
    VLOG(1) since it's not important to report such events on the
    responder side: its state doesn't change upon responding with
    ServiceUnavailable anyway.  Also includes other minor clean-up.
    
    Change-Id: I95d5cbe455fefc4cdc540ee1e7b69e1f21b6ebc0
    Reviewed-on: http://gerrit.cloudera.org:8080/14943
    Tested-by: Alexey Serbin <as...@cloudera.com>
    Reviewed-by: Bankim Bhavsar <ba...@cloudera.com>
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
 src/kudu/consensus/raft_consensus.cc | 70 ++++++++++++++++++++----------------
 src/kudu/consensus/raft_consensus.h  | 22 ++++++++++--
 2 files changed, 59 insertions(+), 33 deletions(-)

diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 69c5d9f..d4b7d86 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -228,7 +228,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
                             scoped_refptr<TimeManager> time_manager,
                             ConsensusRoundHandler* round_handler,
                             const scoped_refptr<MetricEntity>& metric_entity,
-                            Callback<void(const std::string& reason)> mark_dirty_clbk) {
+                            Callback<void(const string& reason)> mark_dirty_clbk) {
   DCHECK(metric_entity);
 
   peer_proxy_factory_ = DCHECK_NOTNULL(std::move(peer_proxy_factory));
@@ -849,7 +849,7 @@ void RaftConsensus::NotifyTermChange(int64_t term) {
 
 void RaftConsensus::NotifyFailedFollower(const string& uuid,
                                          int64_t term,
-                                         const std::string& reason) {
+                                         const string& reason) {
   // Common info used in all of the log messages within this method.
   string fail_msg = Substitute("Processing failure of peer $0 in term $1 ($2): ",
                                uuid, term, reason);
@@ -891,7 +891,7 @@ void RaftConsensus::NotifyFailedFollower(const string& uuid,
               LogPrefixThreadSafe() + "Unable to start TryRemoveFollowerTask");
 }
 
-void RaftConsensus::NotifyPeerToPromote(const std::string& peer_uuid) {
+void RaftConsensus::NotifyPeerToPromote(const string& peer_uuid) {
   // Run the config change on the raft thread pool.
   WARN_NOT_OK(raft_pool_token_->SubmitFunc(std::bind(&RaftConsensus::TryPromoteNonVoterTask,
                                                      shared_from_this(),
@@ -913,7 +913,7 @@ void RaftConsensus::NotifyPeerHealthChange() {
 
 void RaftConsensus::TryRemoveFollowerTask(const string& uuid,
                                           const RaftConfigPB& committed_config,
-                                          const std::string& reason) {
+                                          const string& reason) {
   ChangeConfigRequestPB req;
   req.set_tablet_id(options_.tablet_id);
   req.mutable_server()->set_permanent_uuid(uuid);
@@ -926,7 +926,7 @@ void RaftConsensus::TryRemoveFollowerTask(const string& uuid,
               LogPrefixThreadSafe() + "Unable to remove follower " + uuid);
 }
 
-void RaftConsensus::TryPromoteNonVoterTask(const std::string& peer_uuid) {
+void RaftConsensus::TryPromoteNonVoterTask(const string& peer_uuid) {
   string msg = Substitute("attempt to promote peer $0: ", peer_uuid);
   int64_t current_committed_config_index;
   {
@@ -1052,8 +1052,8 @@ bool RaftConsensus::IsSingleVoterConfig() const {
          cmeta_->IsVoterInConfig(peer_uuid(), COMMITTED_CONFIG);
 }
 
-std::string RaftConsensus::LeaderRequest::OpsRangeString() const {
-  std::string ret;
+string RaftConsensus::LeaderRequest::OpsRangeString() const {
+  string ret;
   ret.reserve(100);
   ret.push_back('[');
   if (!messages.empty()) {
@@ -1651,7 +1651,7 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request,
   // takes place between requests.
   // Lock ordering: update_lock_ must be acquired before lock_.
   std::unique_lock<simple_spinlock> update_guard(update_lock_, std::defer_lock);
-  if (FLAGS_enable_leader_failure_detection) {
+  if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) {
     update_guard.try_lock();
   } else {
     // If failure detection is not enabled, then we can't just reject the vote,
@@ -1663,10 +1663,6 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request,
     // other request is likely to reset the timer, and we'll end up just voting
     // "NO" after waiting. To avoid starving RPC handlers and causing cascading
     // timeouts, just vote a quick NO.
-    //
-    // We still need to take the state lock in order to respond with term info, etc.
-    ThreadRestrictions::AssertWaitAllowed();
-    LockGuard l(lock_);
     return RequestVoteRespondIsBusy(request, response);
   }
 
@@ -2236,7 +2232,13 @@ Status RaftConsensus::AdvanceTermForTests(int64_t new_term) {
   return HandleTermAdvanceUnlocked(new_term);
 }
 
-std::string RaftConsensus::GetRequestVoteLogPrefixUnlocked(const VoteRequestPB& request) const {
+string RaftConsensus::GetRequestVoteLogPrefixThreadSafe(const VoteRequestPB& request) const {
+  return Substitute("$0Leader $1election vote request",
+                    LogPrefixThreadSafe(),
+                    request.is_pre_election() ? "pre-" : "");
+}
+
+string RaftConsensus::GetRequestVoteLogPrefixUnlocked(const VoteRequestPB& request) const {
   DCHECK(lock_.is_locked());
   return Substitute("$0Leader $1election vote request",
                     LogPrefixUnlocked(),
@@ -2248,11 +2250,15 @@ void RaftConsensus::FillVoteResponseVoteGranted(VoteResponsePB* response) {
   response->set_vote_granted(true);
 }
 
-void RaftConsensus::FillVoteResponseVoteDenied(ConsensusErrorPB::Code error_code,
-                                               VoteResponsePB* response) {
-  response->set_responder_term(CurrentTermUnlocked());
+void RaftConsensus::FillVoteResponseVoteDenied(
+    ConsensusErrorPB::Code error_code,
+    VoteResponsePB* response,
+    ResponderTermPolicy responder_term_policy) {
   response->set_vote_granted(false);
   response->mutable_consensus_error()->set_code(error_code);
+  if (responder_term_policy == ResponderTermPolicy::SET) {
+    response->set_responder_term(CurrentTermUnlocked());
+  }
 }
 
 Status RaftConsensus::RequestVoteRespondInvalidTerm(const VoteRequestPB* request,
@@ -2321,20 +2327,24 @@ Status RaftConsensus::RequestVoteRespondLeaderIsAlive(const VoteRequestPB* reque
                           request->candidate_uuid(),
                           request->candidate_term());
   LOG(INFO) << msg;
-  StatusToPB(Status::InvalidArgument(msg), response->mutable_consensus_error()->mutable_status());
+  StatusToPB(Status::InvalidArgument(msg),
+             response->mutable_consensus_error()->mutable_status());
   return Status::OK();
 }
 
-Status RaftConsensus::RequestVoteRespondIsBusy(const VoteRequestPB* request,
-                                               VoteResponsePB* response) {
-  FillVoteResponseVoteDenied(ConsensusErrorPB::CONSENSUS_BUSY, response);
-  string msg = Substitute("$0: Denying vote to candidate $1 for term $2 because "
-                          "replica is already servicing an update from a current leader "
-                          "or another vote.",
-                          GetRequestVoteLogPrefixUnlocked(*request),
-                          request->candidate_uuid(),
-                          request->candidate_term());
-  LOG(INFO) << msg;
+Status RaftConsensus::RequestVoteRespondIsBusy(
+    const VoteRequestPB* request, VoteResponsePB* response) {
+  // Don't set the term in the response: the requestor doesn't need it
+  // to process the NO vote response in this case.
+  FillVoteResponseVoteDenied(ConsensusErrorPB::CONSENSUS_BUSY, response,
+                             ResponderTermPolicy::DO_NOT_SET);
+  auto msg = Substitute("$0: Denying vote to candidate $1 for term $2 because "
+                        "replica is already servicing an update from "
+                        "a current leader or another vote",
+                        GetRequestVoteLogPrefixThreadSafe(*request),
+                        request->candidate_uuid(),
+                        request->candidate_term());
+  VLOG(1) << msg;
   StatusToPB(Status::ServiceUnavailable(msg),
              response->mutable_consensus_error()->mutable_status());
   return Status::OK();
@@ -2709,7 +2719,7 @@ log::RetentionIndexes RaftConsensus::GetRetentionIndexes() {
                                queue_->GetAllReplicatedIndex()); // for peers
 }
 
-void RaftConsensus::MarkDirty(const std::string& reason) {
+void RaftConsensus::MarkDirty(const string& reason) {
   WARN_NOT_OK(raft_pool_token_->SubmitClosure(Bind(mark_dirty_clbk_, reason)),
               LogPrefixThreadSafe() + "Unable to run MarkDirty callback");
 }
@@ -3062,7 +3072,7 @@ const bool RaftConsensus::HasVotedCurrentTermUnlocked() const {
   return cmeta_->has_voted_for();
 }
 
-Status RaftConsensus::SetVotedForCurrentTermUnlocked(const std::string& uuid) {
+Status RaftConsensus::SetVotedForCurrentTermUnlocked(const string& uuid) {
   TRACE_EVENT1("consensus", "RaftConsensus::SetVotedForCurrentTermUnlocked",
                "uuid", uuid);
   DCHECK(lock_.is_locked());
@@ -3071,7 +3081,7 @@ Status RaftConsensus::SetVotedForCurrentTermUnlocked(const std::string& uuid) {
   return Status::OK();
 }
 
-const std::string& RaftConsensus::GetVotedForCurrentTermUnlocked() const {
+const string& RaftConsensus::GetVotedForCurrentTermUnlocked() const {
   DCHECK(lock_.is_locked());
   DCHECK(cmeta_->has_voted_for());
   return cmeta_->voted_for();
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index 95b3120..ddf5020 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -552,7 +552,11 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
   // Returns true if this node is the only voter in the Raft configuration.
   bool IsSingleVoterConfig() const;
 
-  // Return header string for RequestVote log messages. 'lock_' must be held.
+  // Return header string for RequestVote log messages; 'lock_' need not be held.
+  std::string GetRequestVoteLogPrefixThreadSafe(const VoteRequestPB& request) const;
+
+  // Similar to the method above, but includes more detailed information about
+  // the metadata of this RaftConsensus object. 'lock_' must be held.
   std::string GetRequestVoteLogPrefixUnlocked(const VoteRequestPB& request) const;
 
   // Fills the response with the current status, if an update was successful.
@@ -568,11 +572,23 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
   // - Set vote_granted to true.
   void FillVoteResponseVoteGranted(VoteResponsePB* response);
 
+  // Enum for the 'responder_term_policy' parameter of the
+  // FillVoteResponseVoteDenied() method below. Controls whether to populate
+  // the 'responder_term' field in the 'response' output parameter.
+  enum class ResponderTermPolicy {
+    DO_NOT_SET,  // don't set the field
+    SET,         // populate/set the field
+  };
+
   // Fill VoteResponsePB with the following information:
-  // - Update responder_term to current local term.
   // - Set vote_granted to false.
   // - Set consensus_error.code to the given code.
-  void FillVoteResponseVoteDenied(ConsensusErrorPB::Code error_code, VoteResponsePB* response);
+  // - Set responder_term to the current local term or leave it unset, as
+  //   prescribed by the 'responder_term_policy' parameter.
+  void FillVoteResponseVoteDenied(
+      ConsensusErrorPB::Code error_code,
+      VoteResponsePB* response,
+      ResponderTermPolicy responder_term_policy = ResponderTermPolicy::SET);
 
   // Respond to VoteRequest that the candidate has an old term.
   Status RequestVoteRespondInvalidTerm(const VoteRequestPB* request, VoteResponsePB* response);