You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/11/19 23:05:31 UTC
impala git commit: IMPALA-7857: log more information about statestore
failure detection
Repository: impala
Updated Branches:
refs/heads/master e1c9cbd07 -> 93a0ce857
IMPALA-7857: log more information about statestore failure detection
This adds a couple of log messages for state transitions in the
statestore's failure detector.
Testing:
Ran test_statestore.py and checked for presence of new log messages.
Added a new test to test_statestore that exercises handling of
intermittent heartbeat failures (required to produce one of the new log
messages).
Change-Id: Ie6ff85bee117000e4434dcffd3d1680a79905f14
Reviewed-on: http://gerrit.cloudera.org:8080/11937
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/93a0ce85
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/93a0ce85
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/93a0ce85
Branch: refs/heads/master
Commit: 93a0ce857f181f2fe4248252428fc2adfdf1bdb7
Parents: e1c9cbd
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Nov 15 14:42:26 2018 -0800
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Mon Nov 19 22:55:40 2018 +0000
----------------------------------------------------------------------
be/src/statestore/failure-detector.cc | 28 +++++++++++++++++++---------
be/src/statestore/failure-detector.h | 3 +++
tests/statestore/test_statestore.py | 22 ++++++++++++++++++++++
3 files changed, 44 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/93a0ce85/be/src/statestore/failure-detector.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/failure-detector.cc b/be/src/statestore/failure-detector.cc
index 9aeaff8..8043bed 100644
--- a/be/src/statestore/failure-detector.cc
+++ b/be/src/statestore/failure-detector.cc
@@ -76,30 +76,40 @@ FailureDetector::PeerState MissedHeartbeatFailureDetector::UpdateHeartbeat(
const string& peer, bool seen) {
{
lock_guard<mutex> l(lock_);
+ int32_t* missed_heartbeat_count = &missed_heartbeat_counts_[peer];
if (seen) {
- missed_heartbeat_counts_[peer] = 0;
+ if (*missed_heartbeat_count != 0) {
+ LOG(INFO) << "Heartbeat for '" << peer << "' succeeded after "
+ << *missed_heartbeat_count << " missed heartbeats. "
+ << "Resetting missed heartbeat count.";
+ *missed_heartbeat_count = 0;
+ }
return OK;
} else {
- ++missed_heartbeat_counts_[peer];
+ ++(*missed_heartbeat_count);
+ LOG(INFO) << *missed_heartbeat_count << " consecutive heartbeats failed for "
+ << "'" << peer << "'. State is "
+ << PeerStateToString(ComputePeerState(*missed_heartbeat_count));
}
}
-
return GetPeerState(peer);
}
FailureDetector::PeerState MissedHeartbeatFailureDetector::GetPeerState(
const string& peer) {
lock_guard<mutex> l(lock_);
- map<string, int32_t>::iterator heartbeat_record = missed_heartbeat_counts_.find(peer);
+ auto it = missed_heartbeat_counts_.find(peer);
+ if (it == missed_heartbeat_counts_.end()) return UNKNOWN;
+ return ComputePeerState(it->second);
+}
- if (heartbeat_record == missed_heartbeat_counts_.end()) {
- return UNKNOWN;
- } else if (heartbeat_record->second > max_missed_heartbeats_) {
+FailureDetector::PeerState MissedHeartbeatFailureDetector::ComputePeerState(
+ int32_t missed_heatbeat_count) {
+ if (missed_heatbeat_count > max_missed_heartbeats_) {
return FAILED;
- } else if (heartbeat_record->second > suspect_missed_heartbeats_) {
+ } else if (missed_heatbeat_count > suspect_missed_heartbeats_) {
return SUSPECTED;
}
-
return OK;
}
http://git-wip-us.apache.org/repos/asf/impala/blob/93a0ce85/be/src/statestore/failure-detector.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/failure-detector.h b/be/src/statestore/failure-detector.h
index c3f504e..5962d06 100644
--- a/be/src/statestore/failure-detector.h
+++ b/be/src/statestore/failure-detector.h
@@ -126,6 +126,9 @@ class MissedHeartbeatFailureDetector : public FailureDetector {
virtual void EvictPeer(const std::string& peer);
private:
+ /// Computes the PeerState from the missed heartbeat count.
+ PeerState ComputePeerState(int32_t missed_heatbeat_count);
+
/// Protects all members
boost::mutex lock_;
http://git-wip-us.apache.org/repos/asf/impala/blob/93a0ce85/tests/statestore/test_statestore.py
----------------------------------------------------------------------
diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py
index a951414..f9e61cf 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -532,6 +532,28 @@ class TestStatestore():
.wait_for_failure(timeout=60)
)
+ def test_intermittent_hung_heartbeats(self):
+ """Heartbeats that occasionally time out should not cause a failure to be detected."""
+ heartbeat_count = [0] # Use array to allow mutating from inside callback.
+
+ def heartbeat_cb(sub, args):
+ heartbeat_count[0] += 1
+ # Delay every second heartbeat.
+ if (heartbeat_count[0] % 2 == 1):
+ time.sleep(4)
+ return Subscriber.THeartbeatResponse()
+
+ with StatestoreSubscriber(heartbeat_cb=heartbeat_cb) as sub:
+ topic_name = "test_intermittent_hung_heartbeats"
+ reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
+ (
+ sub.start()
+ .register(topics=[reg])
+ .wait_for_update(topic_name, 30)
+ .kill()
+ .wait_for_failure()
+ )
+
def test_slow_subscriber(self):
"""Test for IMPALA-6644: This test kills a healthy subscriber and sleeps for a random
interval between 1 and 9 seconds, this lets the heartbeats fail without removing the