You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by kw...@apache.org on 2018/08/30 06:18:18 UTC
[4/7] impala git commit: IMPALA-6644: Add last heartbeat timestamp
into Statestore metric
IMPALA-6644: Add last heartbeat timestamp into Statestore metric
After this patch, the statestore keeps track of the time since the
last successful heartbeat for each subscriber. It is exposed as a
per-subscriber metric on the statestore debug page. The patch also adds a
monitoring thread that periodically checks the last heartbeat timestamp of
all subscribers and logs the IDs of those whose timestamp has not been
updated since the previous periodic check.
Testing: Added an end-to-end test to verify the 'secs_since_heartbeat'
metric of a slow subscriber.
Change-Id: I754adccc4569e8219d5d01500cccdfc8782953f7
Reviewed-on: http://gerrit.cloudera.org:8080/11052
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/8692bfbe
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/8692bfbe
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/8692bfbe
Branch: refs/heads/master
Commit: 8692bfbef657fe95da68e9dcaca9b49de331ccc3
Parents: 8848588
Author: poojanilangekar <po...@cloudera.com>
Authored: Tue Jul 24 18:01:21 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed Aug 29 22:06:00 2018 +0000
----------------------------------------------------------------------
be/src/statestore/statestore.cc | 55 ++++++++++++++++++++++++++++++--
be/src/statestore/statestore.h | 30 ++++++++++++++++-
tests/statestore/test_statestore.py | 20 ++++++++++++
www/statestore_subscribers.tmpl | 2 ++
4 files changed, 104 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/8692bfbe/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 8749825..4e63dad 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -75,6 +75,8 @@ DEFINE_int32(statestore_num_heartbeat_threads, 10, "(Advanced) Number of threads
" send heartbeats in parallel to all registered subscribers.");
DEFINE_int32(statestore_heartbeat_frequency_ms, 1000, "(Advanced) Frequency (in ms) with"
" which the statestore sends heartbeat heartbeats to subscribers.");
+// Period, in milliseconds, at which the heartbeat-monitoring thread wakes up and checks
+// every subscriber's last successful heartbeat timestamp.
+DEFINE_double_hidden(heartbeat_monitoring_frequency_ms, 60000,
+    "(Advanced) Frequency (in ms) with which the statestore monitors "
+    "heartbeats from a subscriber.");
DEFINE_int32(state_store_port, 24000, "port where StatestoreService is running");
@@ -315,6 +317,7 @@ Statestore::Subscriber::Subscriber(const SubscriberId& subscriber_id,
: subscriber_id_(subscriber_id),
registration_id_(registration_id),
network_address_(network_address) {
+ RefreshLastHeartbeatTimestamp();
for (const TTopicRegistration& topic : subscribed_topics) {
GetTopicsMapForId(topic.topic_name)
->emplace(piecewise_construct, forward_as_tuple(topic.topic_name),
@@ -388,6 +391,11 @@ void Statestore::Subscriber::SetLastTopicVersionProcessed(const TopicId& topic_i
topic_it->second.last_version.Store(version);
}
+// Records the current monotonic time (ms) as this subscriber's last heartbeat timestamp.
+// The clock is sampled exactly once so that the monotonicity DCHECK validates the very
+// value that is stored, rather than a second, unchecked reading.
+void Statestore::Subscriber::RefreshLastHeartbeatTimestamp() {
+  const int64_t now_ms = MonotonicMillis();
+  DCHECK_GE(now_ms, last_heartbeat_ts_.Load());
+  last_heartbeat_ts_.Store(now_ms);
+}
+
Statestore::Statestore(MetricGroup* metrics)
: subscriber_topic_update_threadpool_("statestore-update",
"subscriber-update-worker",
@@ -419,7 +427,6 @@ Statestore::Statestore(MetricGroup* metrics)
failure_detector_(new MissedHeartbeatFailureDetector(
FLAGS_statestore_max_missed_heartbeats,
FLAGS_statestore_max_missed_heartbeats / 2)) {
-
DCHECK(metrics != NULL);
metrics_ = metrics;
num_subscribers_metric_ = metrics->AddGauge(STATESTORE_LIVE_SUBSCRIBERS, 0);
@@ -440,6 +447,10 @@ Statestore::Statestore(MetricGroup* metrics)
heartbeat_client_cache_->InitMetrics(metrics, "subscriber-heartbeat");
}
+// Init() starts a heartbeat-monitoring thread that never returns and runs with a pointer
+// to this object, so an initialized Statestore must never be destroyed. Destroying one
+// that was never initialized (e.g. Init() failed) is allowed.
+Statestore::~Statestore() {
+  CHECK(!initialized_) << "Cannot shutdown Statestore once initialized.";
+}
+
Status Statestore::Init(int32_t state_store_port) {
boost::shared_ptr<TProcessor> processor(new StatestoreServiceProcessor(thrift_iface()));
boost::shared_ptr<TProcessorEventHandler> event_handler(
@@ -464,6 +475,9 @@ Status Statestore::Init(int32_t state_store_port) {
RETURN_IF_ERROR(subscriber_topic_update_threadpool_.Init());
RETURN_IF_ERROR(subscriber_priority_topic_update_threadpool_.Init());
RETURN_IF_ERROR(subscriber_heartbeat_threadpool_.Init());
+ RETURN_IF_ERROR(Thread::Create("statestore-heartbeat", "heartbeat-monitoring-thread",
+ &Statestore::MonitorSubscriberHeartbeat, this, &heartbeat_monitoring_thread_));
+ initialized_ = true;
return Status::OK();
}
@@ -536,6 +550,12 @@ void Statestore::SubscribersHandler(const Webserver::ArgumentMap& args,
document->GetAllocator());
sub_json.AddMember("registration_id", registration_id, document->GetAllocator());
+ Value secs_since_heartbeat(
+ StringPrintf("%.3f", subscriber.second->SecondsSinceHeartbeat()).c_str(),
+ document->GetAllocator());
+ sub_json.AddMember(
+ "secs_since_heartbeat", secs_since_heartbeat, document->GetAllocator());
+
subscribers.PushBack(sub_json, document->GetAllocator());
}
document->AddMember("subscribers", subscribers, document->GetAllocator());
@@ -898,7 +918,9 @@ void Statestore::DoSubscriberUpdate(UpdateKind update_kind, int thread_id,
Status status;
if (is_heartbeat) {
status = SendHeartbeat(subscriber.get());
- if (status.code() == TErrorCode::RPC_RECV_TIMEOUT) {
+ if (status.ok()) {
+ subscriber->RefreshLastHeartbeatTimestamp();
+ } else if (status.code() == TErrorCode::RPC_RECV_TIMEOUT) {
// Add details to status to make it more useful, while preserving the stack
status.AddDetail(Substitute(
"Subscriber $0 timed-out during heartbeat RPC. Timeout is $1s.",
@@ -968,6 +990,35 @@ void Statestore::DoSubscriberUpdate(UpdateKind update_kind, int thread_id,
}
}
+// Body of the heartbeat-monitoring thread started by Init(); never returns. Every
+// FLAGS_heartbeat_monitoring_frequency_ms it scans all registered subscribers under
+// subscribers_lock_ and logs the IDs of those whose last successful heartbeat is older
+// than that interval (or a summary line if every subscriber is up to date).
+[[noreturn]] void Statestore::MonitorSubscriberHeartbeat() {
+  while (true) {
+    SleepForMs(FLAGS_heartbeat_monitoring_frequency_ms);
+    size_t num_subscribers = 0;
+    vector<SubscriberId> inactive_subscribers;
+    {
+      lock_guard<mutex> l(subscribers_lock_);
+      num_subscribers = subscribers_.size();
+      for (const auto& subscriber : subscribers_) {
+        // SecondsSinceHeartbeat() is in seconds but the flag is in milliseconds; convert
+        // before comparing, otherwise a subscriber is only reported after ~1000x the
+        // configured interval.
+        const double ms_since_heartbeat =
+            subscriber.second->SecondsSinceHeartbeat() * 1000.0;
+        if (ms_since_heartbeat > FLAGS_heartbeat_monitoring_frequency_ms) {
+          inactive_subscribers.push_back(subscriber.second->id());
+        }
+      }
+    }
+    // Logging happens outside the lock to keep the critical section short.
+    if (inactive_subscribers.empty()) {
+      LOG(INFO) << "All " << num_subscribers
+                << " subscribers successfully heartbeat in the last "
+                << FLAGS_heartbeat_monitoring_frequency_ms << "ms.";
+    } else {
+      // inactive_subscribers is a subset of subscribers_, so this cannot underflow.
+      const size_t num_active_subscribers =
+          num_subscribers - inactive_subscribers.size();
+      LOG(WARNING) << num_active_subscribers << "/" << num_subscribers
+                   << " subscribers successfully heartbeat in the last "
+                   << FLAGS_heartbeat_monitoring_frequency_ms << "ms."
+                   << " Slow subscribers: " << boost::join(inactive_subscribers, ", ");
+    }
+  }
+}
+
void Statestore::UnregisterSubscriber(Subscriber* subscriber) {
SubscriberMap::const_iterator it = subscribers_.find(subscriber->id());
if (it == subscribers_.end() ||
http://git-wip-us.apache.org/repos/asf/impala/blob/8692bfbe/be/src/statestore/statestore.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index 9326492..52f7d68 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -18,6 +18,7 @@
#ifndef STATESTORE_STATESTORE_H
#define STATESTORE_STATESTORE_H
+#include <atomic>
#include <cstdint>
#include <map>
#include <memory>
@@ -130,6 +131,9 @@ class Statestore : public CacheLineAligned {
/// The only constructor; initialises member variables only.
Statestore(MetricGroup* metrics);
+ /// Destructor, should not be called once the Statestore is initialized.
+ ~Statestore();
+
/// Initialize and start the backing ThriftServer with port 'state_store_port'.
/// Initialize the ThreadPools used for updates and heartbeats. Returns an error if
/// any of the above initialization fails.
@@ -150,7 +154,7 @@ class Statestore : public CacheLineAligned {
void RegisterWebpages(Webserver* webserver);
- /// The main processing loop. Blocks until the exit flag is set.
+ /// The main processing loop. Runs infinitely.
void MainLoop();
/// Returns the Thrift API interface that proxies requests onto the local Statestore.
@@ -384,6 +388,12 @@ class Statestore : public CacheLineAligned {
const SubscriberId& id() const { return subscriber_id_; }
const RegistrationId& registration_id() const { return registration_id_; }
+    /// Returns the number of seconds (fractional, from millisecond timestamps) elapsed on
+    /// the monotonic clock since last_heartbeat_ts_ was last refreshed, i.e. since the
+    /// last successful heartbeat or, if none has succeeded yet, since construction.
+    double SecondsSinceHeartbeat() const {
+      const auto elapsed_ms = MonotonicMillis() - last_heartbeat_ts_.Load();
+      return elapsed_ms / 1000.0;
+    }
+
/// Get the Topics map that would be used to store 'topic_id'.
const Topics& GetTopicsMapForId(const TopicId& topic_id) const {
return IsPrioritizedTopic(topic_id) ? priority_subscribed_topics_
@@ -427,6 +437,9 @@ class Statestore : public CacheLineAligned {
void SetLastTopicVersionProcessed(const TopicId& topic_id,
TopicEntry::Version version);
+ /// Refresh the subscriber's last heartbeat timestamp to the current monotonic time.
+ void RefreshLastHeartbeatTimestamp();
+
private:
/// Unique human-readable identifier for this subscriber, set by the subscriber itself
/// on a Register call.
@@ -449,6 +462,10 @@ class Statestore : public CacheLineAligned {
Topics priority_subscribed_topics_;
Topics non_priority_subscribed_topics_;
+ /// The timestamp of the last successful heartbeat in milliseconds. A timestamp much
+ /// older than the heartbeat frequency implies an unresponsive subscriber.
+ AtomicInt64 last_heartbeat_ts_{0};
+
/// Lock held when adding or deleting transient entries. See class comment for lock
/// acquisition order.
boost::mutex transient_entry_lock_;
@@ -534,6 +551,12 @@ class Statestore : public CacheLineAligned {
ThreadPool<ScheduledSubscriberUpdate> subscriber_heartbeat_threadpool_;
+ /// Thread that monitors the heartbeats of all subscribers.
+ std::unique_ptr<Thread> heartbeat_monitoring_thread_;
+
+ /// Flag to indicate that the statestore has been initialized.
+ bool initialized_ = false;
+
/// Cache of subscriber clients used for UpdateState() RPCs. Only one client per
/// subscriber should be used, but the cache helps with the client lifecycle on failure.
boost::scoped_ptr<StatestoreSubscriberClientCache> update_state_client_cache_;
@@ -683,6 +706,11 @@ class Statestore : public CacheLineAligned {
void SubscribersHandler(const Webserver::ArgumentMap& args,
rapidjson::Document* document);
+ /// Monitors the heartbeats of all subscribers every
+ /// FLAGS_heartbeat_monitoring_frequency_ms milliseconds. If a subscriber's
+ /// last_heartbeat_ts_ has not been updated in that interval, it logs the subscriber's
+ /// id.
+ [[noreturn]] void MonitorSubscriberHeartbeat();
};
}
http://git-wip-us.apache.org/repos/asf/impala/blob/8692bfbe/tests/statestore/test_statestore.py
----------------------------------------------------------------------
diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py
index 8f26b63..23f8aa8 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -18,6 +18,7 @@
from collections import defaultdict
import json
import logging
+from random import randint
import socket
import threading
import traceback
@@ -524,6 +525,25 @@ class TestStatestore():
.wait_for_failure(timeout=60)
)
+  def test_slow_subscriber(self):
+    """Test for IMPALA-6644: This test kills a healthy subscriber and sleeps for a random
+    interval between 1 and 9 seconds, which lets the heartbeats fail without removing the
+    subscriber from the set of active subscribers. It then checks the subscribers page
+    of the statestore to ensure that the 'secs_since_heartbeat' field is updated with an
+    acceptable value. Since the statestore heartbeats at 1 second intervals, an acceptable
+    value would be between ((sleep_time-1.0), (sleep_time+1.0))."""
+    sub = StatestoreSubscriber()
+    sub.start().register().wait_for_heartbeat(1)
+    sub.kill()
+    sleep_time = randint(1, 9)
+    time.sleep(sleep_time)
+    subscribers = get_statestore_subscribers()["subscribers"]
+    matching = [s for s in subscribers if str(s["id"]) == sub.subscriber_id]
+    # Fail loudly if the subscriber is missing; otherwise the bound checks below would
+    # be skipped and the test would pass without verifying anything.
+    assert len(matching) == 1, \
+        "Subscriber %s not found on the statestore subscribers page" % sub.subscriber_id
+    secs_since_heartbeat = float(matching[0]["secs_since_heartbeat"])
+    assert secs_since_heartbeat > float(sleep_time - 1.0)
+    assert secs_since_heartbeat < float(sleep_time + 1.0)
+
def test_topic_persistence(self):
"""Test that persistent topic entries survive subscriber failure, but transent topic
entries are erased when the associated subscriber fails"""
http://git-wip-us.apache.org/repos/asf/impala/blob/8692bfbe/www/statestore_subscribers.tmpl
----------------------------------------------------------------------
diff --git a/www/statestore_subscribers.tmpl b/www/statestore_subscribers.tmpl
index f57b4f6..77b07dc 100644
--- a/www/statestore_subscribers.tmpl
+++ b/www/statestore_subscribers.tmpl
@@ -28,6 +28,7 @@ under the License.
<th>Subscribed priority topics</th>
<th>Transient entries</th>
<th>Registration ID</th>
+ <th>Seconds since last heartbeat</th>
</tr>
{{#subscribers}}
@@ -38,6 +39,7 @@ under the License.
<td>{{num_priority_topics}}</td>
<td>{{num_transient}}</td>
<td>{{registration_id}}</td>
+ <td>{{secs_since_heartbeat}}</td>
</tr>
{{/subscribers}}