You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/11/19 23:05:31 UTC

impala git commit: IMPALA-7857: log more information about statestore failure detection

Repository: impala
Updated Branches:
  refs/heads/master e1c9cbd07 -> 93a0ce857


IMPALA-7857: log more information about statestore failure detection

This adds a couple of log messages for state transitions in the
statestore's failure detector.

Testing:
Ran test_statestore.py and checked for presence of new log messages.

Added a new test to test_statestore that exercises handling of
intermittent heartbeat failures (required to produce one of the new log
messages).

Change-Id: Ie6ff85bee117000e4434dcffd3d1680a79905f14
Reviewed-on: http://gerrit.cloudera.org:8080/11937
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/93a0ce85
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/93a0ce85
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/93a0ce85

Branch: refs/heads/master
Commit: 93a0ce857f181f2fe4248252428fc2adfdf1bdb7
Parents: e1c9cbd
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Nov 15 14:42:26 2018 -0800
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Mon Nov 19 22:55:40 2018 +0000

----------------------------------------------------------------------
 be/src/statestore/failure-detector.cc | 28 +++++++++++++++++++---------
 be/src/statestore/failure-detector.h  |  3 +++
 tests/statestore/test_statestore.py   | 22 ++++++++++++++++++++++
 3 files changed, 44 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/93a0ce85/be/src/statestore/failure-detector.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/failure-detector.cc b/be/src/statestore/failure-detector.cc
index 9aeaff8..8043bed 100644
--- a/be/src/statestore/failure-detector.cc
+++ b/be/src/statestore/failure-detector.cc
@@ -76,30 +76,40 @@ FailureDetector::PeerState MissedHeartbeatFailureDetector::UpdateHeartbeat(
     const string& peer, bool seen) {
   {
     lock_guard<mutex> l(lock_);
+    int32_t* missed_heartbeat_count = &missed_heartbeat_counts_[peer];
     if (seen) {
-      missed_heartbeat_counts_[peer] = 0;
+      if (*missed_heartbeat_count != 0) {
+        LOG(INFO) << "Heartbeat for '" << peer << "' succeeded after "
+                  << *missed_heartbeat_count << " missed heartbeats. "
+                  << "Resetting missed heartbeat count.";
+        *missed_heartbeat_count = 0;
+      }
       return OK;
     } else {
-      ++missed_heartbeat_counts_[peer];
+      ++(*missed_heartbeat_count);
+      LOG(INFO) << *missed_heartbeat_count << " consecutive heartbeats failed for "
+                << "'" << peer << "'. State is "
+                << PeerStateToString(ComputePeerState(*missed_heartbeat_count));
     }
   }
-
   return GetPeerState(peer);
 }
 
 FailureDetector::PeerState MissedHeartbeatFailureDetector::GetPeerState(
     const string& peer) {
   lock_guard<mutex> l(lock_);
-  map<string, int32_t>::iterator heartbeat_record = missed_heartbeat_counts_.find(peer);
+  auto it = missed_heartbeat_counts_.find(peer);
+  if (it == missed_heartbeat_counts_.end()) return UNKNOWN;
+  return ComputePeerState(it->second);
+}
 
-  if (heartbeat_record == missed_heartbeat_counts_.end()) {
-    return UNKNOWN;
-  } else if (heartbeat_record->second > max_missed_heartbeats_) {
+FailureDetector::PeerState MissedHeartbeatFailureDetector::ComputePeerState(
+    int32_t missed_heatbeat_count) {
+  if (missed_heatbeat_count > max_missed_heartbeats_) {
     return FAILED;
-  } else if (heartbeat_record->second > suspect_missed_heartbeats_) {
+  } else if (missed_heatbeat_count > suspect_missed_heartbeats_) {
     return SUSPECTED;
   }
-
   return OK;
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/93a0ce85/be/src/statestore/failure-detector.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/failure-detector.h b/be/src/statestore/failure-detector.h
index c3f504e..5962d06 100644
--- a/be/src/statestore/failure-detector.h
+++ b/be/src/statestore/failure-detector.h
@@ -126,6 +126,9 @@ class MissedHeartbeatFailureDetector : public FailureDetector {
   virtual void EvictPeer(const std::string& peer);
 
  private:
+  /// Computes the PeerState from the missed heartbeat count.
+  PeerState ComputePeerState(int32_t missed_heatbeat_count);
+
   /// Protects all members
   boost::mutex lock_;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/93a0ce85/tests/statestore/test_statestore.py
----------------------------------------------------------------------
diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py
index a951414..f9e61cf 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -532,6 +532,28 @@ class TestStatestore():
            .wait_for_failure(timeout=60)
        )
 
+  def test_intermittent_hung_heartbeats(self):
+    """Heartbeats that occasionally time out should not cause a failure to be detected."""
+    heartbeat_count = [0]  # Use array to allow mutating from inside callback.
+
+    def heartbeat_cb(sub, args):
+      heartbeat_count[0] += 1
+      # Delay every second heartbeat.
+      if (heartbeat_count[0] % 2 == 1):
+        time.sleep(4)
+      return Subscriber.THeartbeatResponse()
+
+    with StatestoreSubscriber(heartbeat_cb=heartbeat_cb) as sub:
+      topic_name = "test_intermittent_hung_heartbeats"
+      reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
+      (
+        sub.start()
+           .register(topics=[reg])
+           .wait_for_update(topic_name, 30)
+           .kill()
+           .wait_for_failure()
+       )
+
   def test_slow_subscriber(self):
     """Test for IMPALA-6644: This test kills a healthy subscriber and sleeps for a random
     interval between 1 and 9 seconds, this lets the heartbeats fail without removing the