You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2017/06/01 22:59:43 UTC

[2/2] kudu git commit: [consensus_queue] fix race in UpdateLagMetrics()

[consensus_queue] fix race in UpdateLagMetrics()

TSAN reports data-race warnings on concurrent reads and writes of the
QueueState::last_idx_appended_to_leader field:

CatalogManagerTskITest.LeadershipChangeOnTskGeneration: WARNING: ThreadSanitizer: data race (pid=2710)
  Write of size 8 at 0x7d500003e480 by thread T33 (mutexes: write M1863, write M1821):
    #0 kudu::consensus::PeerMessageQueue::UpdateLastIndexAppendedToLeader(long) consensus/consensus_queue.cc:607:44
    #1 kudu::consensus::RaftConsensus::UpdateReplica(kudu::consensus::ConsensusRequestPB const*, kudu::consensus::ConsensusResponsePB*) consensus/raft_consensus.cc:1155:13
    #2 kudu::consensus::RaftConsensus::Update(kudu::consensus::ConsensusRequestPB const*, kudu::consensus::ConsensusResponsePB*) consensus/raft_consensus.cc:752:14
    #3 kudu::tserver::ConsensusServiceImpl::UpdateConsensus(kudu::consensus::ConsensusRequestPB const*, kudu::consensus::ConsensusResponsePB*, kudu::rpc::RpcContext*) tserver/tablet_service.cc:861:25
    #4 kudu::consensus::ConsensusServiceIf::ConsensusServiceIf(scoped_refptr<kudu::MetricEntity> const&, scoped_refptr<kudu::rpc::ResultTracker> const&)::$_1::operator()(google::protobuf::Message const*, google::protobuf::Message*, kudu::rpc::RpcContext*) const consensus/consensus.service.cc:100:13
    ... skipped ...

  Previous read of size 8 at 0x7d500003e480 by thread T79 (mutexes: write M1822):
    #0 kudu::consensus::PeerMessageQueue::UpdateLagMetrics() consensus/consensus_queue.cc:602:20
    #1 kudu::consensus::PeerMessageQueue::UpdateMetrics() consensus/consensus_queue.cc:875:3
    #2 kudu::consensus::PeerMessageQueue::ResponseFromPeer(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, kudu::consensus::ConsensusResponsePB const&, bool*) consensus/consensus_queue.cc:828:5
    ... skipped ...

For more details, see
  http://dist-test.cloudera.org:8080/diagnose?key=0784c33a-41b5-11e7-9f6b-0242ac11000f

This patch fixes the above race by acquiring queue_lock_ in
PeerMessageQueue::UpdateLastIndexAppendedToLeader().  It also contains
some extras:
  * The PeerMessageQueue::UpdateLagMetrics() method has been renamed
    to PeerMessageQueue::UpdateLagMetricsUnlocked() and made private.
  * The PeerMessageQueue::UpdateMetrics() method has been renamed
    to PeerMessageQueue::UpdateMetricsUnlocked().
  * Added DCHECK(queue_lock_.is_locked()) to all
    PeerMessageQueue::XxxUnlocked() methods.

Change-Id: I25feb676619cc1f3a94fb8e631bffd8ca02ead49
Reviewed-on: http://gerrit.cloudera.org:8080/7032
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5f8442ff
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5f8442ff
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5f8442ff

Branch: refs/heads/master
Commit: 5f8442ff67fe87b019c71a09f0556bdcb6868428
Parents: a71a443
Author: Alexey Serbin <as...@cloudera.com>
Authored: Wed May 31 11:02:48 2017 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Thu Jun 1 22:58:30 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus_queue.cc | 30 +++++++++++++++++++-----------
 src/kudu/consensus/consensus_queue.h  | 10 +++++-----
 2 files changed, 24 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/5f8442ff/src/kudu/consensus/consensus_queue.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index 8bc5590..6ae73f0 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -193,6 +193,7 @@ void PeerMessageQueue::TrackPeer(const string& uuid) {
 
 void PeerMessageQueue::TrackPeerUnlocked(const string& uuid) {
   CHECK(!uuid.empty()) << "Got request to track peer with empty UUID";
+  DCHECK(queue_lock_.is_locked());
   DCHECK_EQ(queue_state_.state, kQueueOpen);
 
   TrackedPeer* tracked_peer = new TrackedPeer(uuid);
@@ -223,6 +224,7 @@ void PeerMessageQueue::UntrackPeer(const string& uuid) {
 }
 
 void PeerMessageQueue::CheckPeersInActiveConfigIfLeaderUnlocked() const {
+  DCHECK(queue_lock_.is_locked());
   if (queue_state_.mode != LEADER) return;
   unordered_set<string> config_peer_uuids;
   for (const RaftPeerPB& peer_pb : queue_state_.active_config->peers()) {
@@ -312,7 +314,7 @@ Status PeerMessageQueue::AppendOperations(const vector<ReplicateRefPtr>& msgs,
                                                  log_append_callback)));
   lock.lock();
   queue_state_.last_appended = last_id;
-  UpdateMetrics();
+  UpdateMetricsUnlocked();
 
   return Status::OK();
 }
@@ -594,18 +596,14 @@ void PeerMessageQueue::UpdateFollowerWatermarks(int64_t committed_index,
   DCHECK_EQ(queue_state_.mode, NON_LEADER);
   queue_state_.committed_index = committed_index;
   queue_state_.all_replicated_index = all_replicated_index;
-  UpdateMetrics();
-}
-
-void PeerMessageQueue::UpdateLagMetrics() {
-  metrics_.num_ops_behind_leader->set_value(queue_state_.mode == LEADER ? 0 :
-      queue_state_.last_idx_appended_to_leader - queue_state_.last_appended.index());
+  UpdateMetricsUnlocked();
 }
 
 void PeerMessageQueue::UpdateLastIndexAppendedToLeader(int64_t last_idx_appended_to_leader) {
+  std::lock_guard<simple_spinlock> l(queue_lock_);
   DCHECK_EQ(queue_state_.mode, NON_LEADER);
   queue_state_.last_idx_appended_to_leader = last_idx_appended_to_leader;
-  UpdateLagMetrics();
+  UpdateLagMetricsUnlocked();
 }
 
 void PeerMessageQueue::NotifyPeerIsResponsive(const std::string& peer_uuid) {
@@ -825,7 +823,7 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
 
     log_cache_.EvictThroughOp(queue_state_.all_replicated_index);
 
-    UpdateMetrics();
+    UpdateMetricsUnlocked();
   }
 
   if (mode_copy == LEADER && updated_commit_index != boost::none) {
@@ -861,7 +859,8 @@ int64_t PeerMessageQueue::GetMajorityReplicatedIndexForTests() const {
 }
 
 
-void PeerMessageQueue::UpdateMetrics() {
+void PeerMessageQueue::UpdateMetricsUnlocked() {
+  DCHECK(queue_lock_.is_locked());
   // Since operations have consecutive indices we can update the metrics based
   // on simple index math.
   // For non-leaders, majority_done_ops isn't meaningful because followers don't
@@ -872,7 +871,13 @@ void PeerMessageQueue::UpdateMetrics() {
   metrics_.num_in_progress_ops->set_value(
     queue_state_.last_appended.index() - queue_state_.committed_index);
 
-  UpdateLagMetrics();
+  UpdateLagMetricsUnlocked();
+}
+
+void PeerMessageQueue::UpdateLagMetricsUnlocked() {
+  DCHECK(queue_lock_.is_locked());
+  metrics_.num_ops_behind_leader->set_value(queue_state_.mode == LEADER ? 0 :
+      queue_state_.last_idx_appended_to_leader - queue_state_.last_appended.index());
 }
 
 void PeerMessageQueue::DumpToStrings(vector<string>* lines) const {
@@ -881,6 +886,7 @@ void PeerMessageQueue::DumpToStrings(vector<string>* lines) const {
 }
 
 void PeerMessageQueue::DumpToStringsUnlocked(vector<string>* lines) const {
+  DCHECK(queue_lock_.is_locked());
   lines->push_back("Watermarks:");
   for (const PeersMap::value_type& entry : peers_map_) {
     lines->push_back(
@@ -908,6 +914,7 @@ void PeerMessageQueue::DumpToHtml(std::ostream& out) const {
 }
 
 void PeerMessageQueue::ClearUnlocked() {
+  DCHECK(queue_lock_.is_locked());
   STLDeleteValues(&peers_map_);
   queue_state_.state = kQueueClosed;
 }
@@ -930,6 +937,7 @@ string PeerMessageQueue::ToString() const {
 }
 
 string PeerMessageQueue::ToStringUnlocked() const {
+  DCHECK(queue_lock_.is_locked());
   return Substitute("Consensus queue metrics: "
                     "Only Majority Done Ops: $0, In Progress Ops: $1, Cache: $2",
                     metrics_.num_majority_done_ops->value(), metrics_.num_in_progress_ops->value(),

http://git-wip-us.apache.org/repos/asf/kudu/blob/5f8442ff/src/kudu/consensus/consensus_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h
index 23c34b7..f513753 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -244,10 +244,6 @@ class PeerMessageQueue {
   void UpdateFollowerWatermarks(int64_t committed_index,
                                 int64_t all_replicated_index);
 
-  // Update the metric that measures how many ops behind the leader the local
-  // replica believes it is (0 if leader).
-  void UpdateLagMetrics();
-
   // Updates the last op appended to the leader and the corresponding lag metric.
   // This should not be called by a leader.
   void UpdateLastIndexAppendedToLeader(int64_t last_idx_appended_to_leader);
@@ -398,7 +394,11 @@ class PeerMessageQueue {
   void DumpToStringsUnlocked(std::vector<std::string>* lines) const;
 
   // Updates the metrics based on index math.
-  void UpdateMetrics();
+  void UpdateMetricsUnlocked();
+
+  // Update the metric that measures how many ops behind the leader the local
+  // replica believes it is (0 if leader).
+  void UpdateLagMetricsUnlocked();
 
   void ClearUnlocked();