You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/14 23:21:17 UTC
[5/6] impala git commit: IMPALA-4953,
IMPALA-6437: separate AC/scheduler from catalog topic updates
http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/be/src/statestore/statestore.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index deeb5aa..3058e94 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -20,13 +20,16 @@
#include <cstdint>
#include <map>
+#include <memory>
#include <string>
#include <vector>
+#include <boost/thread/shared_mutex.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/unordered_map.hpp>
#include <boost/uuid/uuid_generators.hpp>
+#include "common/atomic.h"
#include "common/status.h"
#include "gen-cpp/StatestoreService.h"
#include "gen-cpp/StatestoreSubscriber.h"
@@ -51,38 +54,60 @@ typedef TUniqueId RegistrationId;
/// The Statestore is a soft-state key-value store that maintains a set of Topics, which
/// are maps from string keys to byte array values.
-//
+///
/// Topics are subscribed to by subscribers, which are remote clients of the statestore
/// which express an interest in some set of Topics. The statestore sends topic updates to
/// subscribers via periodic 'update' messages, and also sends periodic 'heartbeat'
-/// messages, which are used to detect the liveness of a subscriber.
-//
+/// messages, which are used to detect the liveness of a subscriber. Updates for each
+/// topic are delivered sequentially to each subscriber per subscription. E.g. if a
+/// subscriber is subscribed to a topic "foo", the statestore will not deliver topic
+/// updates for "foo" out-of-order or concurrently, but the updates may be sent
+/// concurrently or out-of-order with "bar".
+///
/// In response to 'update' messages, subscribers, send topic updates to the statestore to
/// merge with the current topic. These updates are then sent to all other subscribers in
-/// their next update message. The next message is scheduled for
-/// FLAGS_statestore_update_frequency_ms in the future, unless the subscriber indicated
-/// that it skipped processing an update, in which case the statestore will back off
-/// slightly before re-sending the same update.
-//
+/// their next update message. The next message is scheduled update_frequency in the
+/// future, unless the subscriber indicated that it skipped processing an update, in which
+/// case the statestore will back off slightly before re-sending a new update. The
+/// update frequency is determined by FLAGS_statestore_update_frequency_ms or
+/// FLAGS_statestore_priority_update_frequency_ms, depending on whether the topic is a
+/// prioritized topic.
+///
+/// Prioritized topics are topics that are small but important to deliver in a timely
+/// manner. Handling those topics in a separate threadpool prevents large updates of other
+/// topics slowing or blocking dissemination of updates to prioritized topics.
+///
/// Topic entries usually have human-readable keys, and values which are some serialised
/// representation of a data structure, e.g. a Thrift struct. The contents of a value's
/// byte string is opaque to the statestore, which maintains no information about how to
/// deserialise it. Subscribers must use convention to interpret each other's updates.
-//
+///
/// A subscriber may have marked some updates that it made as 'transient', which implies
/// that those entries should be deleted once the subscriber is no longer connected (this
/// is judged by the statestore's failure-detector, which will mark a subscriber as failed
/// when it has not responded to a number of successive heartbeat messages). Transience
/// is tracked per-topic-per-subscriber, so two different subscribers may treat the same
/// topic differently wrt to the transience of their updates.
-//
+///
/// The statestore tracks the history of updates to each topic, with each topic update
/// getting a sequentially increasing version number that is unique across the topic.
-//
+///
/// Subscribers also track the max version of each topic which they have have successfully
/// processed. The statestore can use this information to send a delta of updates to a
/// subscriber, rather than all items in the topic. For non-delta updates, the statestore
/// will send an update that includes all values in the topic.
+///
+/// +================+
+/// | Implementation |
+/// +================+
+///
+/// Locking:
+/// --------
+/// The lock acquisition order is:
+/// 1. 'subscribers_lock_'
+/// 2. 'topics_map_lock_'
+/// 3. Subscriber::transient_entry_lock_
+/// 4. Topic::lock_ (terminal)
class Statestore : public CacheLineAligned {
public:
/// A SubscriberId uniquely identifies a single subscriber, and is
@@ -125,10 +150,14 @@ class Statestore : public CacheLineAligned {
return thrift_iface_;
}
- /// Tells the Statestore to shut down. Does not wait for the processing loop to exit
- /// before returning.
- void SetExitFlag();
-
+ /// Names of prioritized topics that are handled in a separate threadpool. The topic
+ /// names are hardcoded here for expediency. Ideally we would have a more generic
+ /// interface for specifying prioritized topics, but for now we only have a small
+ /// fixed set of topics.
+ /// Topic tracking the set of live Impala daemon instances.
+ static const std::string IMPALA_MEMBERSHIP_TOPIC;
+ /// Topic tracking the state of admission control on all coordinators.
+ static const std::string IMPALA_REQUEST_QUEUE_TOPIC;
private:
/// A TopicEntry is a single entry in a topic, and logically is a <string, byte string>
/// pair.
@@ -140,7 +169,7 @@ class Statestore : public CacheLineAligned {
/// A version is a monotonically increasing counter. Each update to a topic has its own
/// unique version with the guarantee that sequentially later updates have larger
/// version numbers.
- typedef uint64_t Version;
+ typedef int64_t Version;
/// The Version value used to initialize a new TopicEntry.
static const Version TOPIC_ENTRY_INITIAL_VERSION = 1L;
@@ -199,14 +228,14 @@ class Statestore : public CacheLineAligned {
total_value_size_bytes_(0L), key_size_metric_(key_size_metric),
value_size_metric_(value_size_metric), topic_size_metric_(topic_size_metric) { }
- /// Adds an entry with the given key and value (bytes). If is_deleted is
- /// true the entry is considered deleted, and may be garbage collected in the future.
- /// The entry is assigned a new version number by the Topic, and that version number
- /// is returned.
- //
- /// Must be called holding the topic lock
- TopicEntry::Version Put(const TopicEntryKey& key, const TopicEntry::Value& bytes,
- bool is_deleted);
+ /// Add entries with the given keys and values. If is_deleted is true for an entry,
+ /// it is considered deleted, and may be garbage collected in the future. Each entry
+ /// is assigned a new version number by the Topic, and the version numbers are
+ /// returned.
+ ///
+ /// Safe to call concurrently from multiple threads (for different subscribers).
+ /// Acquires an exclusive write lock for the topic.
+ std::vector<TopicEntry::Version> Put(const std::vector<TTopicItem>& entries);
/// Utility method to support removing transient entries. We track the version numbers
/// of entries added by subscribers, and remove entries with the same version number
@@ -215,26 +244,37 @@ class Statestore : public CacheLineAligned {
//
/// Deletion means marking the entry as deleted and incrementing its version
/// number.
- //
- /// Must be called holding the topic lock
+ ///
+ /// Safe to call concurrently from multiple threads (for different subscribers).
+ /// Acquires an exclusive write lock for the topic.
void DeleteIfVersionsMatch(TopicEntry::Version version, const TopicEntryKey& key);
- const TopicId& id() const { return topic_id_; }
- const TopicEntryMap& entries() const { return entries_; }
- TopicEntry::Version last_version() const { return last_version_; }
- const TopicUpdateLog& topic_update_log() const { return topic_update_log_; }
- int64_t total_key_size_bytes() const { return total_key_size_bytes_; }
- int64_t total_value_size_bytes() const { return total_value_size_bytes_; }
+ /// Build a delta update to send to 'subscriber_id' including the deltas greater
+ /// than 'last_processed_version' (not inclusive).
+ ///
+ /// Safe to call concurrently from multiple threads (for different subscribers).
+ /// Acquires a shared read lock for the topic.
+ void BuildDelta(const SubscriberId& subscriber_id,
+ TopicEntry::Version last_processed_version, TTopicDelta* delta);
+ /// Adds entries representing the current topic state to 'topic_json'.
+ void ToJson(rapidjson::Document* document, rapidjson::Value* topic_json);
private:
- /// Map from topic entry key to topic entry.
- TopicEntryMap entries_;
-
/// Unique identifier for this topic. Should be human-readable.
const TopicId topic_id_;
- /// Tracks the last version that was assigned to an entry in this Topic. Incremented on
- /// every Put() so each TopicEntry is tagged with a unique version value.
+ /// Reader-writer lock to protect state below. This is a terminal lock - no
+ /// other locks should be acquired while holding this one. boost::shared_mutex
+ /// gives writers priority over readers in acquiring the lock, which prevents
+ /// starvation.
+ boost::shared_mutex lock_;
+
+ /// Map from topic entry key to topic entry.
+ TopicEntryMap entries_;
+
+ /// Tracks the last version that was assigned to an entry in this Topic. Incremented
+ /// every time an entry is added in Put() so each TopicEntry is tagged with a unique
+ /// version value.
TopicEntry::Version last_version_;
/// Contains a history of updates to this Topic, with each key being a Version and the
@@ -259,18 +299,12 @@ class Statestore : public CacheLineAligned {
IntGauge* topic_size_metric_;
};
- /// Note on locking: Subscribers and Topics should be accessed under their own coarse
- /// locks, and worker threads will use worker_lock_ to ensure safe access to the
- /// subscriber work queue.
-
- /// Protects access to exit_flag_, but is used mostly to ensure visibility of updates
- /// between threads..
- boost::mutex exit_flag_lock_;
-
- bool exit_flag_;
-
- /// Controls access to topics_. Cannot take subscribers_lock_ after acquiring this lock.
- boost::mutex topic_lock_;
+ /// Protects the 'topics_' map. Should be held shared when reading or holding a
+ /// reference to entries in the map and exclusively when modifying the map.
+ /// See the class comment for the lock acquisition order. boost::shared_mutex
+ /// gives writers priority over readers in acquiring the lock, which prevents
+ /// starvation.
+ boost::shared_mutex topics_map_lock_;
/// The entire set of topics tracked by the statestore
typedef boost::unordered_map<TopicId, Topic> TopicMap;
@@ -287,40 +321,68 @@ class Statestore : public CacheLineAligned {
const TNetworkAddress& network_address,
const std::vector<TTopicRegistration>& subscribed_topics);
- /// The TopicState contains information on whether entries written by this subscriber
- /// should be considered transient, as well as the last topic entry version
- /// successfully processed by this subscriber.
- struct TopicState {
- bool is_transient;
- TopicEntry::Version last_version;
+ /// Information about a subscriber's subscription to a specific topic.
+ struct TopicSubscription {
+ TopicSubscription(bool is_transient) : is_transient(is_transient) {}
+
+ /// Whether entries written by this subscriber should be considered transient.
+ const bool is_transient;
+
+ /// The last topic entry version successfully processed by this subscriber. Only
+ /// written by a single thread at a time but can be read concurrently.
+ AtomicInt64 last_version{TOPIC_INITIAL_VERSION};
+
+ /// Map from the key to the version of a transient update made by this subscriber.
+ /// Protected by Subscriber::transient_entry_lock_.
+ boost::unordered_map<TopicEntryKey, TopicEntry::Version> transient_entries_;
};
/// The set of topics subscribed to, and current state (as seen by this subscriber) of
/// the topic.
- typedef boost::unordered_map<TopicId, TopicState> Topics;
+ typedef boost::unordered_map<TopicId, TopicSubscription> Topics;
/// The Version value used to initialize new Topic subscriptions for this Subscriber.
static const TopicEntry::Version TOPIC_INITIAL_VERSION;
- const Topics& subscribed_topics() const { return subscribed_topics_; }
+ const Topics& non_priority_subscribed_topics() const {
+ return non_priority_subscribed_topics_;
+ }
+ const Topics& priority_subscribed_topics() const { return priority_subscribed_topics_; }
const TNetworkAddress& network_address() const { return network_address_; }
const SubscriberId& id() const { return subscriber_id_; }
const RegistrationId& registration_id() const { return registration_id_; }
- /// Records the fact that an update to this topic is owned by this subscriber. The
- /// version number of the update is saved so that only those updates which are made
- /// most recently by this subscriber - and not overwritten by another subscriber - are
- /// deleted on failure. If the topic the entry belongs to is not marked as transient,
- /// no update will be recorded.
- void AddTransientUpdate(const TopicId& topic_id, const TopicEntryKey& topic_key,
- TopicEntry::Version version);
-
- /// Map from the topic / key pair to the version of a transient update made by this
- /// subscriber.
- typedef boost::unordered_map<std::pair<TopicId, TopicEntryKey>, TopicEntry::Version>
- TransientEntryMap;
-
- const TransientEntryMap& transient_entries() const { return transient_entries_; }
+ /// Get the Topics map that would be used to store 'topic_id'.
+ const Topics& GetTopicsMapForId(const TopicId& topic_id) const {
+ return IsPrioritizedTopic(topic_id) ? priority_subscribed_topics_
+ : non_priority_subscribed_topics_;
+ }
+ Topics* GetTopicsMapForId(const TopicId& topic_id) {
+ return IsPrioritizedTopic(topic_id) ? &priority_subscribed_topics_
+ : &non_priority_subscribed_topics_;
+ }
+
+ /// Records the fact that updates to this topic are owned by this subscriber. The
+ /// version number of each update (which must be at the corresponding index in
+ /// 'entry_versions') is saved so that only those updates which are made most recently by
+ /// this subscriber - and not overwritten by another subscriber - are deleted on
+ /// failure. If the topic each entry belongs to is not marked as transient, no update
+ /// will be recorded. Should not be called concurrently from multiple threads for a
+ /// given 'topic_id'.
+ ///
+ /// Returns false if DeleteAllTransientEntries() was called and 'topic_id' entries
+ /// are transient, in which case the caller should delete the entries themselves.
+ bool AddTransientEntries(const TopicId& topic_id,
+ const std::vector<TTopicItem>& entries,
+ const std::vector<TopicEntry::Version>& entry_versions) WARN_UNUSED_RESULT;
+
+ /// Delete all transient topic entries for this subscriber from 'global_topics'.
+ ///
+ /// Statestore::topics_map_lock_ (in shared mode) must be held by the caller.
+ void DeleteAllTransientEntries(TopicMap* global_topics);
+
+ /// Returns the number of transient entries.
+ int64_t NumTransientEntries();
/// Returns the last version of the topic which this subscriber has successfully
/// processed. Will never decrease.
@@ -328,7 +390,8 @@ class Statestore : public CacheLineAligned {
/// Sets the subscriber's last processed version of the topic to the given value. This
/// should only be set when once a subscriber has succesfully processed the given
- /// update corresponding to this version.
+ /// update corresponding to this version. Should not be called concurrently from
+ /// multiple threads for a given 'topic_id'.
void SetLastTopicVersionProcessed(const TopicId& topic_id,
TopicEntry::Version version);
@@ -346,19 +409,25 @@ class Statestore : public CacheLineAligned {
/// The location of the subscriber service that this subscriber runs.
const TNetworkAddress network_address_;
- /// Map of topic subscriptions to current TopicState. The the state describes whether
- /// updates on the topic are 'transient' (i.e., to be deleted upon subscriber failure)
- /// or not and contains the version number of the last update processed by this
- /// Subscriber on the topic.
- Topics subscribed_topics_;
-
- /// List of updates made by this subscriber so that transient entries may be deleted on
- /// failure.
- TransientEntryMap transient_entries_;
+ /// Maps of topic subscriptions to current TopicSubscription, with separate maps for
+ /// priority and non-priority topics. The state describes whether updates on the
+ /// topic are 'transient' (i.e., to be deleted upon subscriber failure) or not
+ /// and contains the version number of the last update processed by this Subscriber
+ /// on the topic. The set of keys is not modified after construction.
+ Topics priority_subscribed_topics_;
+ Topics non_priority_subscribed_topics_;
+
+ /// Lock held when adding or deleting transient entries. See class comment for lock
+ /// acquisition order.
+ boost::mutex transient_entry_lock_;
+
+ /// True once DeleteAllTransientEntries() has been called during subscriber
+ /// unregistration. Protected by 'transient_entry_lock_'.
+ bool unregistered_ = false;
};
- /// Protects access to subscribers_ and subscriber_uuid_generator_. Must be taken before
- /// topic_lock_.
+ /// Protects access to subscribers_ and subscriber_uuid_generator_. See the class
+ /// comment for the lock acquisition order.
boost::mutex subscribers_lock_;
/// Map of subscribers currently connected; upon failure their entry is removed from this
@@ -393,11 +462,12 @@ class Statestore : public CacheLineAligned {
registration_id(r_id) {}
};
- /// The statestore has two pools of threads that send messages to subscribers
+ /// The statestore has three pools of threads that send messages to subscribers
/// one-by-one. One pool deals with 'heartbeat' messages that update failure detection
- /// state, and the other pool sends 'topic update' messages which contain the
- /// actual topic data that a subscriber does not yet have.
- //
+ /// state, and the remaining pools send 'topic update' messages that contain the
+ /// actual topic data that a subscriber does not yet have, with one pool dedicated to
+ /// a set of special "prioritized" topics.
+ ///
/// Each message is scheduled for some time in the future and each worker thread
/// will sleep until that time has passed to rate-limit messages. Subscribers are
/// placed back into the queue once they have been processed. A subscriber may have many
@@ -405,24 +475,31 @@ class Statestore : public CacheLineAligned {
/// subscriber. Since at most one registration is considered 'live' per subscriber, this
/// guarantees that subscribers_.size() - 1 'live' subscribers ahead of any subscriber in
/// the queue.
- //
+ ///
/// Messages may be delayed for any number of reasons, including scheduler
/// interference, lock unfairness when submitting to the thread pool and head-of-line
/// blocking when threads are occupied sending messages to slow subscribers
/// (subscribers are not guaranteed to be in the queue in next-update order).
- //
+ ///
/// Delays for heartbeat messages can result in the subscriber that is kept waiting
/// assuming that the statestore has failed. Correct configuration of heartbeat message
/// frequency and subscriber timeout is therefore very important, and depends upon the
/// cluster size. See --statestore_heartbeat_frequency_ms and
/// --statestore_subscriber_timeout_seconds. We expect that the provided defaults will
/// work up to clusters of several hundred nodes.
- //
+ ///
/// Subscribers are therefore not processed in lock-step, and one subscriber may have
/// seen many more messages than another during the same interval (if the second
/// subscriber runs slow for any reason).
+ enum class UpdateKind {
+ TOPIC_UPDATE,
+ PRIORITY_TOPIC_UPDATE,
+ HEARTBEAT
+ };
ThreadPool<ScheduledSubscriberUpdate> subscriber_topic_update_threadpool_;
+ ThreadPool<ScheduledSubscriberUpdate> subscriber_priority_topic_update_threadpool_;
+
ThreadPool<ScheduledSubscriberUpdate> subscriber_heartbeat_threadpool_;
/// Cache of subscriber clients used for UpdateState() RPCs. Only one client per
@@ -452,10 +529,11 @@ class Statestore : public CacheLineAligned {
IntGauge* value_size_metric_;
IntGauge* topic_size_metric_;
- /// Tracks the distribution of topic-update durations - precisely the time spent in
- /// calling the UpdateState() RPC which allows us to measure the network transmission
- /// cost as well as the subscriber-side processing time.
+ /// Tracks the distribution of topic-update durations for regular and prioritized topic
+ /// updates. This measures the time spent in calling the UpdateState() RPC which
+ /// includes network transmission cost and subscriber-side processing time.
StatsMetric<double>* topic_update_duration_metric_;
+ StatsMetric<double>* priority_topic_update_duration_metric_;
/// Same as above, but for SendHeartbeat() RPCs.
StatsMetric<double>* heartbeat_duration_metric_;
@@ -466,23 +544,25 @@ class Statestore : public CacheLineAligned {
ThreadPool<ScheduledSubscriberUpdate>* thread_pool) WARN_UNUSED_RESULT;
/// Sends either a heartbeat or topic update message to the subscriber in 'update' at
- /// the closest possible time to the first member of 'update'. If is_heartbeat is true,
- /// sends a heartbeat update, otherwise the set of pending topic updates is sent. Once
- /// complete, the next update is scheduled and added to the appropriate queue.
- void DoSubscriberUpdate(bool is_heartbeat, int thread_id,
+ /// the closest possible time to the first member of 'update'. If 'update_kind' is
+ /// HEARTBEAT, sends a heartbeat update, otherwise the set of priority/non-priority
+ /// pending topic updates is sent. Once complete, the next update is scheduled and
+ /// added to the appropriate queue.
+ void DoSubscriberUpdate(UpdateKind update_kind, int thread_id,
const ScheduledSubscriberUpdate& update);
/// Does the work of updating a single subscriber, by calling UpdateState() on the client
/// to send a list of topic deltas to the subscriber. If that call fails (either because
/// the RPC could not be completed, or the subscriber indicated an error), this method
/// returns a non-OK status immediately without further processing.
- //
+ ///
/// The subscriber may indicated that it skipped processing the message, either because
/// it was not ready to do so or because it was busy. In that case, the UpdateState() RPC
/// will return OK (since there was no error) and the output parameter update_skipped is
/// set to true. Otherwise, any updates returned by the subscriber are applied to their
/// target topics.
- Status SendTopicUpdate(Subscriber* subscriber, bool* update_skipped) WARN_UNUSED_RESULT;
+ Status SendTopicUpdate(Subscriber* subscriber, UpdateKind update_kind,
+ bool* update_skipped) WARN_UNUSED_RESULT;
/// Sends a heartbeat message to subscriber. Returns false if there was some error
/// performing the RPC.
@@ -501,9 +581,10 @@ class Statestore : public CacheLineAligned {
void UnregisterSubscriber(Subscriber* subscriber);
/// Populates a TUpdateStateRequest with the update state for this subscriber. Iterates
- /// over all updates in all subscribed topics, populating the given TUpdateStateRequest
- /// object. Takes the topic_lock_ and subscribers_lock_.
- void GatherTopicUpdates(const Subscriber& subscriber,
+ /// over all updates in all priority or non-priority subscribed topics, based on
+ /// 'update_kind'. The given TUpdateStateRequest object is populated with the
+ /// changes to the subscribed topics. Takes the topics_map_lock_ and subscribers_lock_.
+ void GatherTopicUpdates(const Subscriber& subscriber, UpdateKind update_kind,
TUpdateStateRequest* update_state_request);
/// Returns the minimum last processed topic version across all subscribers for the given
@@ -523,8 +604,14 @@ class Statestore : public CacheLineAligned {
TopicEntry::Version GetMinSubscriberTopicVersion(
const TopicId& topic_id, SubscriberId* subscriber_id = NULL);
- /// True if the shutdown flag has been set true, false otherwise.
- bool ShouldExit();
+ /// Returns true if this topic should be handled by the priority pool.
+ static bool IsPrioritizedTopic(const std::string& topic);
+
+ /// Return human-readable name for 'kind'.
+ static const char* GetUpdateKindName(UpdateKind kind);
+
+ /// Return the thread pool to process updates of 'kind'.
+ ThreadPool<ScheduledSubscriberUpdate>* GetThreadPool(UpdateKind kind);
/// Webpage handler: upon return, 'document' will contain a list of topics as follows:
/// "topics": [
http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/common/thrift/metrics.json
----------------------------------------------------------------------
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index df78a0b..b457741 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -972,7 +972,18 @@
"key": "statestore-subscriber.topic-$0.processing-time-s"
},
{
- "description": "The time (sec) taken to process Statestore subcriber topic updates.",
+ "description": "Interval between topic updates for Topic $0",
+ "contexts": [
+ "CATALOGSERVER",
+ "IMPALAD"
+ ],
+ "label": "Statestore Subscriber Topic $0 Update Interval",
+ "units": "TIME_S",
+ "kind": "STATS",
+ "key": "statestore-subscriber.topic-$0.update-interval"
+ },
+ {
+ "description": "The time (sec) taken to process Statestore subscriber topic updates.",
"contexts": [
"CATALOGSERVER",
"IMPALAD"
@@ -1024,7 +1035,7 @@
"key": "statestore.live-backends.list"
},
{
- "description": "The time (sec) spent sending topic update RPCs. Includes subscriber-side processing time and network transmission time.",
+ "description": "The time (sec) spent sending non-priority topic update RPCs. Includes subscriber-side processing time and network transmission time.",
"contexts": [
"STATESTORE"
],
@@ -1034,6 +1045,16 @@
"key": "statestore.topic-update-durations"
},
{
+ "description": "The time (sec) spent sending priority topic update RPCs. Includes subscriber-side processing time and network transmission time.",
+ "contexts": [
+ "STATESTORE"
+ ],
+ "label": "Statestore Priority Topic Update Durations",
+ "units": "TIME_S",
+ "kind": "STATS",
+ "key": "statestore.priority-topic-update-durations"
+ },
+ {
"description": "The sum of the size of all keys for all topics tracked by the StateStore.",
"contexts": [
"STATESTORE"
http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/tests/custom_cluster/test_admission_controller.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index 69cabd8..bed6994 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -120,9 +120,13 @@ MAX_NUM_QUEUED_QUERIES = 10
# Mem limit (bytes) used in the mem limit test
MEM_TEST_LIMIT = 12 * 1024 * 1024 * 1024
-_STATESTORED_ARGS = "-statestore_heartbeat_frequency_ms=%s "\
- "-statestore_update_frequency_ms=%s" %\
- (STATESTORE_RPC_FREQUENCY_MS, STATESTORE_RPC_FREQUENCY_MS)
+_STATESTORED_ARGS = ("-statestore_heartbeat_frequency_ms={freq_ms} "
+ "-statestore_priority_update_frequency_ms={freq_ms}").format(
+ freq_ms=STATESTORE_RPC_FREQUENCY_MS)
+
+# Name of the subscriber metric tracking the admission control update interval.
+REQUEST_QUEUE_UPDATE_INTERVAL =\
+ 'statestore-subscriber.topic-impala-request-queue.update-interval'
# Key in the query profile for the query options.
PROFILE_QUERY_OPTIONS_KEY = "Query Options (set by configuration): "
@@ -551,14 +555,14 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
sleep(1)
def wait_for_statestore_updates(self, heartbeats):
- """Waits for a number of statestore heartbeats from all impalads."""
+ """Waits for a number of admission control statestore updates from all impalads."""
start_time = time()
num_impalads = len(self.impalads)
init = dict()
curr = dict()
for impalad in self.impalads:
- init[impalad] = impalad.service.get_metric_value(\
- 'statestore-subscriber.topic-update-interval-time')['count']
+ init[impalad] = impalad.service.get_metric_value(
+ REQUEST_QUEUE_UPDATE_INTERVAL)['count']
curr[impalad] = init[impalad]
while True:
@@ -566,8 +570,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
init.values(), [curr[i] - init[i] for i in self.impalads])
if all([curr[i] - init[i] >= heartbeats for i in self.impalads]): break
for impalad in self.impalads:
- curr[impalad] = impalad.service.get_metric_value(\
- 'statestore-subscriber.topic-update-interval-time')['count']
+ curr[impalad] = impalad.service.get_metric_value(
+ REQUEST_QUEUE_UPDATE_INTERVAL)['count']
assert (time() - start_time < STRESS_TIMEOUT),\
"Timed out waiting %s seconds for heartbeats" % (STRESS_TIMEOUT,)
sleep(STATESTORE_RPC_FREQUENCY_MS / float(1000))
http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/tests/statestore/test_statestore.py
----------------------------------------------------------------------
diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py
index 1003dc7..ba51c8d 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -15,9 +15,11 @@
# specific language governing permissions and limitations
# under the License.
+from collections import defaultdict
import json
import socket
import threading
+import traceback
import time
import urllib2
import uuid
@@ -167,7 +169,10 @@ class StatestoreSubscriber(object):
more readable."""
def __init__(self, heartbeat_cb=None, update_cb=None):
self.heartbeat_event, self.heartbeat_count = threading.Condition(), 0
- self.update_event, self.update_count = threading.Condition(), 0
+ # Track the number of updates received per topic.
+ self.update_counts = defaultdict(lambda : 0)
+ # Condition variable notified whenever an update for any topic is received.
+ self.update_event = threading.Condition()
self.heartbeat_cb, self.update_cb = heartbeat_cb, update_cb
self.exception = None
@@ -191,12 +196,14 @@ class StatestoreSubscriber(object):
"""UpdateState RPC handler. Calls update callback if one exists."""
self.update_event.acquire()
try:
- self.update_count += 1
+ for topic_name in args.topic_deltas: self.update_counts[topic_name] += 1
response = DEFAULT_UPDATE_STATE_RESPONSE
if self.update_cb is not None and self.exception is None:
try:
response = self.update_cb(self, args)
except Exception, e:
+ # Print the original backtrace so it doesn't get lost.
+ traceback.print_exc()
self.exception = e
self.update_event.notify()
finally:
@@ -278,21 +285,23 @@ class StatestoreSubscriber(object):
finally:
self.heartbeat_event.release()
- def wait_for_update(self, count=None):
- """Waits for some number of updates. If 'count' is provided, waits until the number
- of updates seen by this subscriber exceeds count, otherwise waits for one further
- update."""
+ def wait_for_update(self, topic_name, count=None):
+ """Waits for some number of updates of 'topic_name'. If 'count' is provided, waits
+ until the number of updates seen by this subscriber reaches 'count', otherwise waits
+ for one further update."""
self.update_event.acquire()
+ start_time = time.time()
try:
- if count is not None and self.update_count >= count: return self
- if count is None: count = self.update_count + 1
- while count > self.update_count:
+ if count is not None and self.update_counts[topic_name] >= count: return self
+ if count is None: count = self.update_counts[topic_name] + 1
+ while count > self.update_counts[topic_name]:
self.check_thread_exceptions()
- last_count = self.update_count
+ last_count = self.update_counts[topic_name]
self.update_event.wait(10)
- if last_count == self.update_count:
- raise Exception("Update not received within 10s (update count: %s)" %
- self.update_count)
+ if (time.time() > start_time + 10 and
+ last_count == self.update_counts[topic_name]):
+ raise Exception("Update not received for %s within 10s (update count: %s)" %
+ (topic_name, last_count))
self.check_thread_exceptions()
return self
finally:
@@ -340,14 +349,18 @@ class TestStatestore():
def topic_update_correct(sub, args):
delta = self.make_topic_update(topic_name)
- if sub.update_count == 1:
+ update_count = sub.update_counts[topic_name]
+ if topic_name not in args.topic_deltas:
+ # The update doesn't contain our topic.
+ pass
+ elif update_count == 1:
return TUpdateStateResponse(status=STATUS_OK, topic_updates=[delta],
skipped=False)
- elif sub.update_count == 2:
- assert len(args.topic_deltas) == 1
+ elif update_count == 2:
+ assert len(args.topic_deltas) == 1, args.topic_deltas
assert args.topic_deltas[topic_name].topic_entries == delta.topic_entries
assert args.topic_deltas[topic_name].topic_name == delta.topic_name
- elif sub.update_count == 3:
+ elif update_count == 3:
# After the content-bearing update was processed, the next delta should be empty
assert len(args.topic_deltas[topic_name].topic_entries) == 0
@@ -358,7 +371,7 @@ class TestStatestore():
(
sub.start()
.register(topics=[reg])
- .wait_for_update(3)
+ .wait_for_update(topic_name, 3)
)
def test_update_is_delta(self):
@@ -368,14 +381,18 @@ class TestStatestore():
topic_name = "test_update_is_delta_%s" % uuid.uuid1()
def check_delta(sub, args):
- if sub.update_count == 1:
+ update_count = sub.update_counts[topic_name]
+ if topic_name not in args.topic_deltas:
+ # The update doesn't contain our topic.
+ pass
+ elif update_count == 1:
assert args.topic_deltas[topic_name].is_delta == False
delta = self.make_topic_update(topic_name)
return TUpdateStateResponse(status=STATUS_OK, topic_updates=[delta],
skipped=False)
- elif sub.update_count == 2:
+ elif update_count == 2:
assert args.topic_deltas[topic_name].is_delta == False
- elif sub.update_count == 3:
+ elif update_count == 3:
assert args.topic_deltas[topic_name].is_delta == True
assert len(args.topic_deltas[topic_name].topic_entries) == 0
assert args.topic_deltas[topic_name].to_version == 1
@@ -387,7 +404,7 @@ class TestStatestore():
(
sub.start()
.register(topics=[reg])
- .wait_for_update(3)
+ .wait_for_update(topic_name, 3)
)
def test_skipped(self):
@@ -395,7 +412,10 @@ class TestStatestore():
topic_name = "test_skipped_%s" % uuid.uuid1()
def check_skipped(sub, args):
- if sub.update_count == 1:
+ # Ignore responses that don't contain our topic.
+ if topic_name not in args.topic_deltas: return DEFAULT_UPDATE_STATE_RESPONSE
+ update_count = sub.update_counts[topic_name]
+ if update_count == 1:
update = self.make_topic_update(topic_name)
return TUpdateStateResponse(status=STATUS_OK, topic_updates=[update],
skipped=False)
@@ -410,15 +430,17 @@ class TestStatestore():
(
sub.start()
.register(topics=[reg])
- .wait_for_update(3)
+ .wait_for_update(topic_name, 3)
)
def test_failure_detected(self):
sub = StatestoreSubscriber()
+ topic_name = "test_failure_detected"
+ reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
(
sub.start()
- .register()
- .wait_for_update(1)
+ .register(topics=[reg])
+ .wait_for_update(topic_name, 1)
.kill()
.wait_for_failure()
)
@@ -428,10 +450,12 @@ class TestStatestore():
minutes) the statestore should time them out every 3s and then eventually fail after
40s (10 times (3 + 1), where the 1 is the inter-heartbeat delay)"""
sub = StatestoreSubscriber(heartbeat_cb=lambda sub, args: time.sleep(300))
+ topic_name = "test_hung_heartbeat"
+ reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
(
sub.start()
- .register()
- .wait_for_update(1)
+ .register(topics=[reg])
+ .wait_for_update(topic_name, 1)
.wait_for_failure(timeout=60)
)
@@ -443,21 +467,32 @@ class TestStatestore():
transient_topic_name = "test_topic_persistence_transient_%s" % topic_id
def add_entries(sub, args):
- if sub.update_count == 1:
- updates = [self.make_topic_update(persistent_topic_name),
- self.make_topic_update(transient_topic_name)]
+ # None of, one or both of the topics may be in the update.
+ updates = []
+ if (persistent_topic_name in args.topic_deltas and
+ sub.update_counts[persistent_topic_name] == 1):
+ updates.append(self.make_topic_update(persistent_topic_name))
+
+ if (transient_topic_name in args.topic_deltas and
+ sub.update_counts[transient_topic_name] == 1):
+ updates.append(self.make_topic_update(transient_topic_name))
+
+ if len(updates) > 0:
return TUpdateStateResponse(status=STATUS_OK, topic_updates=updates,
skipped=False)
-
return DEFAULT_UPDATE_STATE_RESPONSE
def check_entries(sub, args):
- if sub.update_count == 1:
- assert len(args.topic_deltas[transient_topic_name].topic_entries) == 0
+ # Either topic may be absent from an update; check each on its own first update.
+ if (persistent_topic_name in args.topic_deltas and
+ sub.update_counts[persistent_topic_name] == 1):
assert len(args.topic_deltas[persistent_topic_name].topic_entries) == 1
# Statestore should not send deletions when the update is not a delta, see
# IMPALA-1891
assert args.topic_deltas[persistent_topic_name].topic_entries[0].deleted == False
+ if (transient_topic_name in args.topic_deltas and
+ sub.update_counts[transient_topic_name] == 1):
+ assert len(args.topic_deltas[transient_topic_name].topic_entries) == 0
return DEFAULT_UPDATE_STATE_RESPONSE
reg = [TTopicRegistration(topic_name=persistent_topic_name, is_transient=False),
@@ -467,7 +502,8 @@ class TestStatestore():
(
sub.start()
.register(topics=reg)
- .wait_for_update(2)
+ .wait_for_update(persistent_topic_name, 2)
+ .wait_for_update(transient_topic_name, 2)
.kill()
.wait_for_failure()
)
@@ -476,5 +512,6 @@ class TestStatestore():
(
sub2.start()
.register(topics=reg)
- .wait_for_update(1)
+ .wait_for_update(persistent_topic_name, 1)
+ .wait_for_update(transient_topic_name, 1)
)
http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/www/statestore_subscribers.tmpl
----------------------------------------------------------------------
diff --git a/www/statestore_subscribers.tmpl b/www/statestore_subscribers.tmpl
index f117002..f57b4f6 100644
--- a/www/statestore_subscribers.tmpl
+++ b/www/statestore_subscribers.tmpl
@@ -25,6 +25,7 @@ under the License.
<th>Id</th>
<th>Address</th>
<th>Subscribed topics</th>
+ <th>Subscribed priority topics</th>
<th>Transient entries</th>
<th>Registration ID</th>
</tr>
@@ -34,6 +35,7 @@ under the License.
<td>{{id}}</td>
<td>{{address}}</td>
<td>{{num_topics}}</td>
+ <td>{{num_priority_topics}}</td>
<td>{{num_transient}}</td>
<td>{{registration_id}}</td>
</tr>
http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/www/statestore_topics.tmpl
----------------------------------------------------------------------
diff --git a/www/statestore_topics.tmpl b/www/statestore_topics.tmpl
index 99f0dfe..56568f7 100644
--- a/www/statestore_topics.tmpl
+++ b/www/statestore_topics.tmpl
@@ -25,6 +25,7 @@ under the License.
<th>Topic Id</th>
<th>Number of entries</th>
<th>Version</th>
+ <th>Prioritized</th>
<th>Oldest subscriber version</th>
<th>Oldest subscriber ID</th>
<th>Size (keys / values / total)</th>
@@ -35,6 +36,7 @@ under the License.
<td>{{topic_id}}</td>
<td>{{num_entries}}</td>
<td>{{version}}</td>
+ <td>{{prioritized}}</td>
<td>{{oldest_version}}</td>
<td>{{oldest_id}}</td>
<td>{{key_size}} / {{value_size}} / {{total_size}}</td>