You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/04/04 19:04:09 UTC

[5/5] impala git commit: IMPALA-6785: reset failed heartbeat count when re-registering

IMPALA-6785: reset failed heartbeat count when re-registering

When a subscriber re-registers with the same subscriber ID, we need
to reset failure detection info so that we don't erroneously think
that the subscriber has failed.

The bug is a mix-up over whether the subscriber ID or the registration ID
is the key for the failure detector. The symptom was that, if the first
topic update won the race with the first heartbeat, no subsequent topic
updates were delivered. It was introduced when IMPALA-3613 updated some,
but not all, places to use the subscriber ID as the key. It wasn't
detected because we had no tests directly testing this code path, and
it appears that the race almost never happens on smaller clusters.

This patch fixes the problem in two places. Fixing either place is
actually sufficient to fix the bug.

Testing:
Add a regression test that reliably reproduces the issue by delaying the
heartbeat and checking that topic updates continue to be sent. The
test reliably fails before the fix and passes after.

Change-Id: I2ad409e2a8e22d081fce97b085b9469ab046bf07
Reviewed-on: http://gerrit.cloudera.org:8080/9913
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/890bd4c7
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/890bd4c7
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/890bd4c7

Branch: refs/heads/master
Commit: 890bd4c72babef838bf00b71d62ac9605719e897
Parents: 4c1f0ac
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Apr 3 14:13:28 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Apr 4 18:58:42 2018 +0000

----------------------------------------------------------------------
 be/src/statestore/statestore.cc     |  8 +++++---
 be/src/statestore/statestore.h      |  6 +++++-
 tests/statestore/test_statestore.py | 28 +++++++++++++++++++++++++++-
 3 files changed, 37 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/890bd4c7/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 1072dee..02363fe 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -548,8 +548,7 @@ Status Statestore::RegisterSubscriber(const SubscriberId& subscriber_id,
     shared_ptr<Subscriber> current_registration(
         new Subscriber(subscriber_id, *registration_id, location, topic_registrations));
     subscribers_.emplace(subscriber_id, current_registration);
-    failure_detector_->UpdateHeartbeat(
-        PrintId(current_registration->registration_id()), true);
+    failure_detector_->UpdateHeartbeat(subscriber_id, true);
     num_subscribers_metric_->SetValue(subscribers_.size());
     subscriber_set_metric_->Add(subscriber_id);
 
@@ -878,6 +877,9 @@ void Statestore::DoSubscriberUpdate(UpdateKind update_kind, int thread_id,
                   << "or re-registered (last known registration ID: "
                   << update.registration_id << ")";
         UnregisterSubscriber(subscriber.get());
+      } else {
+        LOG(INFO) << "Failure was already detected for subscriber '" << subscriber->id()
+                  << "'. Won't send another " << update_kind_str;
       }
     } else {
       // Schedule the next message.
@@ -907,7 +909,7 @@ void Statestore::UnregisterSubscriber(Subscriber* subscriber) {
   heartbeat_client_cache_->CloseConnections(subscriber->network_address());
 
   // Prevent the failure detector from growing without bound
-  failure_detector_->EvictPeer(PrintId(subscriber->registration_id()));
+  failure_detector_->EvictPeer(subscriber->id());
 
   // Delete all transient entries
   {

http://git-wip-us.apache.org/repos/asf/impala/blob/890bd4c7/be/src/statestore/statestore.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index 3058e94..482e48b 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -517,7 +517,11 @@ class Statestore : public CacheLineAligned {
 
   /// Failure detector for subscribers. If a subscriber misses a configurable number of
   /// consecutive heartbeat messages, it is considered failed and a) its transient topic
-  /// entries are removed and b) its entry in the subscriber map is erased.
+  /// entries are removed and b) its entry in the subscriber map is erased. The
+  /// subscriber ID is used to identify peers for failure detection purposes. Subscriber
+  /// state is evicted from the failure detector when the subscriber is unregistered,
+  /// so old subscribers do not occupy memory and the failure detection state does not
+  /// carry over to any new registrations of the previous subscriber.
   boost::scoped_ptr<MissedHeartbeatFailureDetector> failure_detector_;
 
   /// Metric that track the registered, non-failed subscribers.

http://git-wip-us.apache.org/repos/asf/impala/blob/890bd4c7/tests/statestore/test_statestore.py
----------------------------------------------------------------------
diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py
index ba51c8d..737ff9c 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -17,6 +17,7 @@
 
 from collections import defaultdict
 import json
+import logging
 import socket
 import threading
 import traceback
@@ -39,6 +40,8 @@ from Status.ttypes import TStatus
 
 from tests.common.environ import specific_build_type_timeout
 
+LOG = logging.getLogger('test_statestore')
+
 # Tests for the statestore. The StatestoreSubscriber class is a skeleton implementation of
 # a Python-based statestore subscriber with additional hooks to allow testing. Each
 # StatestoreSubscriber runs its own server so that the statestore may contact it.
@@ -174,6 +177,7 @@ class StatestoreSubscriber(object):
     # Variables to notify for updates on each topic.
     self.update_event = threading.Condition()
     self.heartbeat_cb, self.update_cb = heartbeat_cb, update_cb
+    self.subscriber_id = "python-test-client-%s" % uuid.uuid1()
     self.exception = None
 
   def Heartbeat(self, args):
@@ -250,7 +254,6 @@ class StatestoreSubscriber(object):
 
   def register(self, topics=None):
     """Call the Register() RPC"""
-    self.subscriber_id = "python-test-client-%s" % uuid.uuid1()
     if topics is None: topics = []
     request = Subscriber.TRegisterSubscriberRequest(
       topic_registrations=topics,
@@ -515,3 +518,26 @@ class TestStatestore():
           .wait_for_update(persistent_topic_name, 1)
           .wait_for_update(transient_topic_name, 1)
     )
+
+  def test_heartbeat_failure_reset(self):
+    """Regression test for IMPALA-6785: the heartbeat failure count for the subscriber ID
+    should be reset when it resubscribes, not after the first successful heartbeat. Delay
+    the heartbeat to force the topic update to finish first."""
+
+    sub = StatestoreSubscriber(heartbeat_cb=lambda sub, args: time.sleep(0.5))
+    topic_name = "test_heartbeat_failure_reset"
+    reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
+    sub.start()
+    sub.register(topics=[reg])
+    LOG.info("Registered with id {0}".format(sub.subscriber_id))
+    sub.wait_for_heartbeat(1)
+    sub.kill()
+    LOG.info("Killed, waiting for statestore to detect failure via heartbeats")
+    sub.wait_for_failure()
+    # IMPALA-6785 caused only one topic update to be sent. Wait for multiple updates to
+    # be received to confirm that the subsequent updates are being scheduled repeatedly.
+    target_updates = sub.update_counts[topic_name] + 5
+    sub.start()
+    sub.register(topics=[reg])
+    LOG.info("Re-registered with id {0}, waiting for update".format(sub.subscriber_id))
+    sub.wait_for_update(topic_name, target_updates)