You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by kw...@apache.org on 2018/08/30 06:18:18 UTC

[4/7] impala git commit: IMPALA-6644: Add last heartbeat timestamp into Statestore metric

IMPALA-6644: Add last heartbeat timestamp into Statestore metric

After this patch, the statestore keeps track of the time since the
last heartbeat for each subscriber. It is exposed as a subscriber
metric on the statestore debug page. The patch also adds a monitoring
thread that periodically checks the last heartbeat timestamp of all
subscribers and logs the IDs of those that have not successfully
completed a heartbeat since the previous check.

Testing: Added an end to end test to verify the 'secs_since_heartbeat'
metric of a slow subscriber.

Change-Id: I754adccc4569e8219d5d01500cccdfc8782953f7
Reviewed-on: http://gerrit.cloudera.org:8080/11052
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/8692bfbe
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/8692bfbe
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/8692bfbe

Branch: refs/heads/master
Commit: 8692bfbef657fe95da68e9dcaca9b49de331ccc3
Parents: 8848588
Author: poojanilangekar <po...@cloudera.com>
Authored: Tue Jul 24 18:01:21 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed Aug 29 22:06:00 2018 +0000

----------------------------------------------------------------------
 be/src/statestore/statestore.cc     | 55 ++++++++++++++++++++++++++++++--
 be/src/statestore/statestore.h      | 30 ++++++++++++++++-
 tests/statestore/test_statestore.py | 20 ++++++++++++
 www/statestore_subscribers.tmpl     |  2 ++
 4 files changed, 104 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/8692bfbe/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 8749825..4e63dad 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -75,6 +75,8 @@ DEFINE_int32(statestore_num_heartbeat_threads, 10, "(Advanced) Number of threads
     " send heartbeats in parallel to all registered subscribers.");
 DEFINE_int32(statestore_heartbeat_frequency_ms, 1000, "(Advanced) Frequency (in ms) with"
     " which the statestore sends heartbeat heartbeats to subscribers.");
+DEFINE_double_hidden(heartbeat_monitoring_frequency_ms, 60000, "(Advanced) Frequency "
+    "(in ms) with which the statestore monitors heartbeats from a subscriber.");
 
 DEFINE_int32(state_store_port, 24000, "port where StatestoreService is running");
 
@@ -315,6 +317,7 @@ Statestore::Subscriber::Subscriber(const SubscriberId& subscriber_id,
   : subscriber_id_(subscriber_id),
     registration_id_(registration_id),
     network_address_(network_address) {
+  RefreshLastHeartbeatTimestamp();
   for (const TTopicRegistration& topic : subscribed_topics) {
     GetTopicsMapForId(topic.topic_name)
         ->emplace(piecewise_construct, forward_as_tuple(topic.topic_name),
@@ -388,6 +391,11 @@ void Statestore::Subscriber::SetLastTopicVersionProcessed(const TopicId& topic_i
   topic_it->second.last_version.Store(version);
 }
 
+void Statestore::Subscriber::RefreshLastHeartbeatTimestamp() {
+  DCHECK_LE(last_heartbeat_ts_.Load(), MonotonicMillis());
+  last_heartbeat_ts_.Store(MonotonicMillis());
+}
+
 Statestore::Statestore(MetricGroup* metrics)
   : subscriber_topic_update_threadpool_("statestore-update",
         "subscriber-update-worker",
@@ -419,7 +427,6 @@ Statestore::Statestore(MetricGroup* metrics)
     failure_detector_(new MissedHeartbeatFailureDetector(
         FLAGS_statestore_max_missed_heartbeats,
         FLAGS_statestore_max_missed_heartbeats / 2)) {
-
   DCHECK(metrics != NULL);
   metrics_ = metrics;
   num_subscribers_metric_ = metrics->AddGauge(STATESTORE_LIVE_SUBSCRIBERS, 0);
@@ -440,6 +447,10 @@ Statestore::Statestore(MetricGroup* metrics)
   heartbeat_client_cache_->InitMetrics(metrics, "subscriber-heartbeat");
 }
 
+Statestore::~Statestore() {
+  CHECK(!initialized_) << "Cannot shutdown Statestore once initialized.";
+}
+
 Status Statestore::Init(int32_t state_store_port) {
   boost::shared_ptr<TProcessor> processor(new StatestoreServiceProcessor(thrift_iface()));
   boost::shared_ptr<TProcessorEventHandler> event_handler(
@@ -464,6 +475,9 @@ Status Statestore::Init(int32_t state_store_port) {
   RETURN_IF_ERROR(subscriber_topic_update_threadpool_.Init());
   RETURN_IF_ERROR(subscriber_priority_topic_update_threadpool_.Init());
   RETURN_IF_ERROR(subscriber_heartbeat_threadpool_.Init());
+  RETURN_IF_ERROR(Thread::Create("statestore-heartbeat", "heartbeat-monitoring-thread",
+      &Statestore::MonitorSubscriberHeartbeat, this, &heartbeat_monitoring_thread_));
+  initialized_ = true;
 
   return Status::OK();
 }
@@ -536,6 +550,12 @@ void Statestore::SubscribersHandler(const Webserver::ArgumentMap& args,
         document->GetAllocator());
     sub_json.AddMember("registration_id", registration_id, document->GetAllocator());
 
+    Value secs_since_heartbeat(
+        StringPrintf("%.3f", subscriber.second->SecondsSinceHeartbeat()).c_str(),
+        document->GetAllocator());
+    sub_json.AddMember(
+        "secs_since_heartbeat", secs_since_heartbeat, document->GetAllocator());
+
     subscribers.PushBack(sub_json, document->GetAllocator());
   }
   document->AddMember("subscribers", subscribers, document->GetAllocator());
@@ -898,7 +918,9 @@ void Statestore::DoSubscriberUpdate(UpdateKind update_kind, int thread_id,
   Status status;
   if (is_heartbeat) {
     status = SendHeartbeat(subscriber.get());
-    if (status.code() == TErrorCode::RPC_RECV_TIMEOUT) {
+    if (status.ok()) {
+      subscriber->RefreshLastHeartbeatTimestamp();
+    } else if (status.code() == TErrorCode::RPC_RECV_TIMEOUT) {
       // Add details to status to make it more useful, while preserving the stack
       status.AddDetail(Substitute(
           "Subscriber $0 timed-out during heartbeat RPC. Timeout is $1s.",
@@ -968,6 +990,35 @@ void Statestore::DoSubscriberUpdate(UpdateKind update_kind, int thread_id,
   }
 }
 
+[[noreturn]] void Statestore::MonitorSubscriberHeartbeat() {
+  const double threshold_secs = FLAGS_heartbeat_monitoring_frequency_ms / 1000.0;
+  while (1) {
+    int num_subscribers;
+    vector<SubscriberId> inactive_subscribers;
+    SleepForMs(FLAGS_heartbeat_monitoring_frequency_ms);
+    {
+      lock_guard<mutex> l(subscribers_lock_);
+      num_subscribers = subscribers_.size();
+      for (const auto& subscriber : subscribers_) {
+        if (subscriber.second->SecondsSinceHeartbeat() > threshold_secs) {
+          inactive_subscribers.push_back(subscriber.second->id());
+        }
+      }
+    }
+    if (inactive_subscribers.empty()) {
+      LOG(INFO) << "All " << num_subscribers
+                << " subscribers successfully heartbeat in the last "
+                << FLAGS_heartbeat_monitoring_frequency_ms << "ms.";
+    } else {
+      int num_active_subscribers = num_subscribers - inactive_subscribers.size();
+      LOG(WARNING) << num_active_subscribers << "/" << num_subscribers
+                   << " subscribers successfully heartbeat in the last "
+                   << FLAGS_heartbeat_monitoring_frequency_ms << "ms."
+                   << " Slow subscribers: " << boost::join(inactive_subscribers, ", ");
+    }
+  }
+}
+
 void Statestore::UnregisterSubscriber(Subscriber* subscriber) {
   SubscriberMap::const_iterator it = subscribers_.find(subscriber->id());
   if (it == subscribers_.end() ||

http://git-wip-us.apache.org/repos/asf/impala/blob/8692bfbe/be/src/statestore/statestore.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index 9326492..52f7d68 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -18,6 +18,7 @@
 #ifndef STATESTORE_STATESTORE_H
 #define STATESTORE_STATESTORE_H
 
+#include <atomic>
 #include <cstdint>
 #include <map>
 #include <memory>
@@ -130,6 +131,9 @@ class Statestore : public CacheLineAligned {
   /// The only constructor; initialises member variables only.
   Statestore(MetricGroup* metrics);
 
+  /// Destructor, should not be called once the Statestore is initialized.
+  ~Statestore();
+
   /// Initialize and start the backing ThriftServer with port 'state_store_port'.
   /// Initialize the ThreadPools used for updates and heartbeats. Returns an error if
   /// any of the above initialization fails.
@@ -150,7 +154,7 @@ class Statestore : public CacheLineAligned {
 
   void RegisterWebpages(Webserver* webserver);
 
-  /// The main processing loop. Blocks until the exit flag is set.
+  /// The main processing loop. Runs infinitely.
   void MainLoop();
 
   /// Returns the Thrift API interface that proxies requests onto the local Statestore.
@@ -384,6 +388,12 @@ class Statestore : public CacheLineAligned {
     const SubscriberId& id() const { return subscriber_id_; }
     const RegistrationId& registration_id() const { return registration_id_; }
 
+    /// Elapsed time, in seconds, since last_heartbeat_ts_ was last refreshed.
+    double SecondsSinceHeartbeat() const {
+      const int64_t elapsed_ms = MonotonicMillis() - last_heartbeat_ts_.Load();
+      return static_cast<double>(elapsed_ms) / 1000.0;
+    }
+
     /// Get the Topics map that would be used to store 'topic_id'.
     const Topics& GetTopicsMapForId(const TopicId& topic_id) const {
       return IsPrioritizedTopic(topic_id) ? priority_subscribed_topics_
@@ -427,6 +437,9 @@ class Statestore : public CacheLineAligned {
     void SetLastTopicVersionProcessed(const TopicId& topic_id,
         TopicEntry::Version version);
 
+    /// Refresh the subscriber's last heartbeat timestamp to the current monotonic time.
+    void RefreshLastHeartbeatTimestamp();
+
    private:
     /// Unique human-readable identifier for this subscriber, set by the subscriber itself
     /// on a Register call.
@@ -449,6 +462,10 @@ class Statestore : public CacheLineAligned {
     Topics priority_subscribed_topics_;
     Topics non_priority_subscribed_topics_;
 
+    /// MonotonicMillis() at the last successful heartbeat, or at construction. A value
+    /// much older than the heartbeat frequency implies an unresponsive subscriber.
+    AtomicInt64 last_heartbeat_ts_{0};
+
     /// Lock held when adding or deleting transient entries. See class comment for lock
     /// acquisition order.
     boost::mutex transient_entry_lock_;
@@ -534,6 +551,12 @@ class Statestore : public CacheLineAligned {
 
   ThreadPool<ScheduledSubscriberUpdate> subscriber_heartbeat_threadpool_;
 
+  /// Thread that monitors the heartbeats of all subscribers.
+  std::unique_ptr<Thread> heartbeat_monitoring_thread_;
+
+  /// Flag to indicate that the statestore has been initialized.
+  bool initialized_ = false;
+
   /// Cache of subscriber clients used for UpdateState() RPCs. Only one client per
   /// subscriber should be used, but the cache helps with the client lifecycle on failure.
   boost::scoped_ptr<StatestoreSubscriberClientCache> update_state_client_cache_;
@@ -683,6 +706,11 @@ class Statestore : public CacheLineAligned {
   void SubscribersHandler(const Webserver::ArgumentMap& args,
       rapidjson::Document* document);
 
+  /// Monitors the heartbeats of all subscribers every
+  /// FLAGS_heartbeat_monitoring_frequency_ms milliseconds. If a subscriber's
+  /// last_heartbeat_ts_ has not been updated in that interval, it logs the subscriber's
+  /// id.
+  [[noreturn]] void MonitorSubscriberHeartbeat();
 };
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/8692bfbe/tests/statestore/test_statestore.py
----------------------------------------------------------------------
diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py
index 8f26b63..23f8aa8 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -18,6 +18,7 @@
 from collections import defaultdict
 import json
 import logging
+from random import randint
 import socket
 import threading
 import traceback
@@ -524,6 +525,25 @@ class TestStatestore():
            .wait_for_failure(timeout=60)
        )
 
+  def test_slow_subscriber(self):
+    """Test for IMPALA-6644: This test kills a healthy subscriber and sleeps for a random
+    interval between 1 and 9 seconds, this lets the heartbeats fail without removing the
+    subscriber from the set of active subscribers. It then checks the subscribers page
+    of the statestore to ensure that the 'secs_since_heartbeat' field is updated with an
+    acceptable value. Since the statestore heartbeats at 1 second intervals, an acceptable
+    value would be between ((sleep_time-1.0), (sleep_time+1.0))."""
+    sub = StatestoreSubscriber()
+    sub.start().register().wait_for_heartbeat(1)
+    sub.kill()
+    sleep_time = randint(1, 9)
+    time.sleep(sleep_time)
+    subscribers = get_statestore_subscribers()["subscribers"]
+    matches = [s for s in subscribers if str(s["id"]) == sub.subscriber_id]
+    assert len(matches) == 1, "Subscriber %s not found" % sub.subscriber_id
+    secs_since_heartbeat = float(matches[0]["secs_since_heartbeat"])
+    assert (secs_since_heartbeat > float(sleep_time - 1.0))
+    assert (secs_since_heartbeat < float(sleep_time + 1.0))
+
   def test_topic_persistence(self):
     """Test that persistent topic entries survive subscriber failure, but transent topic
     entries are erased when the associated subscriber fails"""

http://git-wip-us.apache.org/repos/asf/impala/blob/8692bfbe/www/statestore_subscribers.tmpl
----------------------------------------------------------------------
diff --git a/www/statestore_subscribers.tmpl b/www/statestore_subscribers.tmpl
index f57b4f6..77b07dc 100644
--- a/www/statestore_subscribers.tmpl
+++ b/www/statestore_subscribers.tmpl
@@ -28,6 +28,7 @@ under the License.
   <th>Subscribed priority topics</th>
   <th>Transient entries</th>
   <th>Registration ID</th>
+  <th>Seconds since last heartbeat</th>
 </tr>
 
 {{#subscribers}}
@@ -38,6 +39,7 @@ under the License.
   <td>{{num_priority_topics}}</td>
   <td>{{num_transient}}</td>
   <td>{{registration_id}}</td>
+  <td>{{secs_since_heartbeat}}</td>
 </tr>
 {{/subscribers}}