You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/02/25 07:57:22 UTC
[impala] 08/14: IMPALA-6816: minimise calls to
GetMinSubscriberTopicVersion()
This is an automated email from the ASF dual-hosted git repository.
tarmstrong pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/impala.git
commit a29bec1902ffa1ccfb0539a40b874f7650f6b56e
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Mon Jun 11 17:22:35 2018 -0700
IMPALA-6816: minimise calls to GetMinSubscriberTopicVersion()
min_subscriber_topic_version is expensive to compute (it requires iterating
over all subscribers) but is only used by one
subscriber/topic pair: Impalads receiving catalog topic updates.
This patch implements a simple fix - only compute it if a subscriber
asks for it. A more complex alternative would be to maintain
a priority queue of subscriber versions, but that didn't seem worth
the complexity and risk of bugs.
Testing:
Add a statestore test to validate the versions. It looks like we had a
pre-existing test gap for validating min_subscriber_topic_version so
the test is mainly focused on adding that coverage.
Ran core tests with DEBUG and ASAN.
Change-Id: I8ee7cb2355ba1049b9081e0df344ac41aa4ebeb1
Reviewed-on: http://gerrit.cloudera.org:8080/10705
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/catalog/catalog-server.cc | 2 +-
be/src/scheduling/admission-controller.cc | 9 +--
be/src/scheduling/scheduler.cc | 2 +-
be/src/service/impala-server.cc | 11 ++--
be/src/statestore/statestore-subscriber.cc | 7 ++-
be/src/statestore/statestore-subscriber.h | 6 +-
be/src/statestore/statestore.cc | 50 +++++++++-------
be/src/statestore/statestore.h | 8 ++-
common/thrift/StatestoreService.thrift | 6 ++
tests/statestore/test_statestore.py | 93 ++++++++++++++++++++++++++++++
10 files changed, 160 insertions(+), 34 deletions(-)
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index e74db75..a17478f 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -186,7 +186,7 @@ Status CatalogServer::Start() {
StatestoreSubscriber::UpdateCallback cb =
bind<void>(mem_fn(&CatalogServer::UpdateCatalogTopicCallback), this, _1, _2);
- status = statestore_subscriber_->AddTopic(IMPALA_CATALOG_TOPIC, false, cb);
+ status = statestore_subscriber_->AddTopic(IMPALA_CATALOG_TOPIC, false, false, cb);
if (!status.ok()) {
status.AddDetail("CatalogService failed to start");
return status;
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index ac176f4..a53d1a0 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -18,7 +18,6 @@
#include "scheduling/admission-controller.h"
#include <boost/algorithm/string.hpp>
-#include <boost/bind.hpp>
#include <boost/mem_fn.hpp>
#include <gutil/strings/substitute.h>
@@ -243,9 +242,11 @@ AdmissionController::~AdmissionController() {
Status AdmissionController::Init() {
RETURN_IF_ERROR(Thread::Create("scheduling", "admission-thread",
&AdmissionController::DequeueLoop, this, &dequeue_thread_));
- StatestoreSubscriber::UpdateCallback cb =
- bind<void>(mem_fn(&AdmissionController::UpdatePoolStats), this, _1, _2);
- Status status = subscriber_->AddTopic(Statestore::IMPALA_REQUEST_QUEUE_TOPIC, true, cb);
+ auto cb = [this](
+ const StatestoreSubscriber::TopicDeltaMap& state,
+ vector<TTopicDelta>* topic_updates) { UpdatePoolStats(state, topic_updates); };
+ Status status =
+ subscriber_->AddTopic(Statestore::IMPALA_REQUEST_QUEUE_TOPIC, true, false, cb);
if (!status.ok()) {
status.AddDetail("AdmissionController failed to register request queue topic");
}
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 5a67a74..950d218 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -88,7 +88,7 @@ Status Scheduler::Init(const TNetworkAddress& backend_address,
StatestoreSubscriber::UpdateCallback cb =
bind<void>(mem_fn(&Scheduler::UpdateMembership), this, _1, _2);
Status status = statestore_subscriber_->AddTopic(
- Statestore::IMPALA_MEMBERSHIP_TOPIC, true, cb);
+ Statestore::IMPALA_MEMBERSHIP_TOPIC, true, false, cb);
if (!status.ok()) {
status.AddDetail("Scheduler failed to register membership topic");
return status;
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index ce6c2c3..c2b8300 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -358,12 +358,12 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
// Register the membership callback if running in a real cluster.
if (!TestInfo::is_test()) {
- auto cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state,
- vector<TTopicDelta>* topic_updates) {
+ auto cb = [this](const StatestoreSubscriber::TopicDeltaMap& state,
+ vector<TTopicDelta>* topic_updates) {
this->MembershipCallback(state, topic_updates);
};
- ABORT_IF_ERROR(
- exec_env->subscriber()->AddTopic(Statestore::IMPALA_MEMBERSHIP_TOPIC, true, cb));
+ ABORT_IF_ERROR(exec_env->subscriber()->AddTopic(
+ Statestore::IMPALA_MEMBERSHIP_TOPIC, true, false, cb));
if (FLAGS_is_coordinator && !FLAGS_use_local_catalog) {
auto catalog_cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state,
@@ -371,7 +371,7 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
this->CatalogUpdateCallback(state, topic_updates);
};
ABORT_IF_ERROR(exec_env->subscriber()->AddTopic(
- CatalogServer::IMPALA_CATALOG_TOPIC, true, catalog_cb));
+ CatalogServer::IMPALA_CATALOG_TOPIC, true, true, catalog_cb));
}
}
@@ -1483,6 +1483,7 @@ void ImpalaServer::CatalogUpdateCallback(
// Always update the minimum subscriber version for the catalog topic.
{
unique_lock<mutex> unique_lock(catalog_version_lock_);
+ DCHECK(delta.__isset.min_subscriber_topic_version);
min_subscriber_catalog_topic_version_ = delta.min_subscriber_topic_version;
}
catalog_version_update_cv_.NotifyAll();
diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc
index a27d1ae..4351589 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -137,7 +137,8 @@ StatestoreSubscriber::StatestoreSubscriber(const std::string& subscriber_id,
}
Status StatestoreSubscriber::AddTopic(const Statestore::TopicId& topic_id,
- bool is_transient, const UpdateCallback& callback) {
+ bool is_transient, bool populate_min_subscriber_topic_version,
+ const UpdateCallback& callback) {
lock_guard<shared_mutex> exclusive_lock(lock_);
if (is_registered_) return Status("Subscriber already started, can't add new topic");
TopicRegistration& registration = topic_registrations_[topic_id];
@@ -150,6 +151,8 @@ Status StatestoreSubscriber::AddTopic(const Statestore::TopicId& topic_id,
registration.update_interval_timer.Start();
}
registration.is_transient = is_transient;
+ registration.populate_min_subscriber_topic_version =
+ populate_min_subscriber_topic_version;
return Status::OK();
}
@@ -163,6 +166,8 @@ Status StatestoreSubscriber::Register() {
TTopicRegistration thrift_topic;
thrift_topic.topic_name = registration.first;
thrift_topic.is_transient = registration.second.is_transient;
+ thrift_topic.populate_min_subscriber_topic_version =
+ registration.second.populate_min_subscriber_topic_version;
request.topic_registrations.push_back(thrift_topic);
}
diff --git a/be/src/statestore/statestore-subscriber.h b/be/src/statestore/statestore-subscriber.h
index f102cae..7621592 100644
--- a/be/src/statestore/statestore-subscriber.h
+++ b/be/src/statestore/statestore-subscriber.h
@@ -112,7 +112,7 @@ class StatestoreSubscriber {
/// Must be called before Start(), in which case it will return
/// Status::OK. Otherwise an error will be returned.
Status AddTopic(const Statestore::TopicId& topic_id, bool is_transient,
- const UpdateCallback& callback);
+ bool populate_min_subscriber_topic_version, const UpdateCallback& callback);
/// Registers this subscriber with the statestore, and starts the
/// heartbeat service, as well as a thread to check for failure and
@@ -212,6 +212,10 @@ class StatestoreSubscriber {
/// it makes will be deleted upon failure or disconnection.
bool is_transient = false;
+ /// Whether this subscriber needs the min_subscriber_topic_version field to be filled
+ /// in on updates.
+ bool populate_min_subscriber_topic_version = false;
+
/// The last version of the topic this subscriber processed.
/// -1 if no updates have been processed yet.
int64_t current_topic_version = -1;
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 90ea167..a58aec1 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -300,12 +300,14 @@ void Statestore::Topic::ToJson(Document* document, Value* topic_json) {
Statestore::Subscriber::Subscriber(const SubscriberId& subscriber_id,
const RegistrationId& registration_id, const TNetworkAddress& network_address,
const vector<TTopicRegistration>& subscribed_topics)
- : subscriber_id_(subscriber_id),
- registration_id_(registration_id),
- network_address_(network_address) {
- for (const TTopicRegistration& topic: subscribed_topics) {
- GetTopicsMapForId(topic.topic_name)->emplace(piecewise_construct,
- forward_as_tuple(topic.topic_name), forward_as_tuple(topic.is_transient));
+ : subscriber_id_(subscriber_id),
+ registration_id_(registration_id),
+ network_address_(network_address) {
+ for (const TTopicRegistration& topic : subscribed_topics) {
+ GetTopicsMapForId(topic.topic_name)
+ ->emplace(piecewise_construct, forward_as_tuple(topic.topic_name),
+ forward_as_tuple(
+ topic.is_transient, topic.populate_min_subscriber_topic_version));
}
}
@@ -697,18 +699,22 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, UpdateKind update_kin
return Status::OK();
}
-void Statestore::GatherTopicUpdates(const Subscriber& subscriber,
- UpdateKind update_kind, TUpdateStateRequest* update_state_request) {
+void Statestore::GatherTopicUpdates(const Subscriber& subscriber, UpdateKind update_kind,
+ TUpdateStateRequest* update_state_request) {
+ DCHECK(update_kind == UpdateKind::TOPIC_UPDATE
+ || update_kind == UpdateKind::PRIORITY_TOPIC_UPDATE)
+ << static_cast<int>(update_kind);
+ // Indices into update_state_request->topic_deltas where we need to populate
+ // 'min_subscriber_topic_version'. GetMinSubscriberTopicVersion() is somewhat
+ // expensive so we want to avoid calling it unless necessary.
+ vector<TTopicDelta*> deltas_needing_min_version;
{
- DCHECK(update_kind == UpdateKind::TOPIC_UPDATE
- || update_kind == UpdateKind::PRIORITY_TOPIC_UPDATE)
- << static_cast<int>(update_kind);
const bool is_priority = update_kind == UpdateKind::PRIORITY_TOPIC_UPDATE;
- const Subscriber::Topics& subscribed_topics = is_priority
- ? subscriber.priority_subscribed_topics()
- : subscriber.non_priority_subscribed_topics();
+ const Subscriber::Topics& subscribed_topics = is_priority ?
+ subscriber.priority_subscribed_topics() :
+ subscriber.non_priority_subscribed_topics();
shared_lock<shared_mutex> l(topics_map_lock_);
- for (const auto& subscribed_topic: subscribed_topics) {
+ for (const auto& subscribed_topic : subscribed_topics) {
auto topic_it = topics_.find(subscribed_topic.first);
DCHECK(topic_it != topics_.end());
TopicEntry::Version last_processed_version =
@@ -718,16 +724,20 @@ void Statestore::GatherTopicUpdates(const Subscriber& subscriber,
update_state_request->topic_deltas[subscribed_topic.first];
topic_delta.topic_name = subscribed_topic.first;
topic_it->second.BuildDelta(subscriber.id(), last_processed_version, &topic_delta);
+ if (subscribed_topic.second.populate_min_subscriber_topic_version) {
+ deltas_needing_min_version.push_back(&topic_delta);
+ }
}
}
// Fill in the min subscriber topic version. This must be done after releasing
// topics_map_lock_.
- lock_guard<mutex> l(subscribers_lock_);
- typedef map<TopicId, TTopicDelta> TopicDeltaMap;
- for (TopicDeltaMap::value_type& topic_delta: update_state_request->topic_deltas) {
- topic_delta.second.__set_min_subscriber_topic_version(
- GetMinSubscriberTopicVersion(topic_delta.first));
+ if (!deltas_needing_min_version.empty()) {
+ lock_guard<mutex> l(subscribers_lock_);
+ for (TTopicDelta* delta : deltas_needing_min_version) {
+ delta->__set_min_subscriber_topic_version(
+ GetMinSubscriberTopicVersion(delta->topic_name));
+ }
}
}
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index edaef49..71e1ade 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -331,11 +331,17 @@ class Statestore : public CacheLineAligned {
/// Information about a subscriber's subscription to a specific topic.
struct TopicSubscription {
- TopicSubscription(bool is_transient) : is_transient(is_transient) {}
+ TopicSubscription(bool is_transient, bool populate_min_subscriber_topic_version)
+ : is_transient(is_transient),
+ populate_min_subscriber_topic_version(populate_min_subscriber_topic_version) {}
/// Whether entries written by this subscriber should be considered transient.
const bool is_transient;
+ /// Whether min_subscriber_topic_version needs to be filled in for this
+ /// subscription.
+ const bool populate_min_subscriber_topic_version;
+
/// The last topic entry version successfully processed by this subscriber. Only
/// written by a single thread at a time but can be read concurrently.
AtomicInt64 last_version{TOPIC_INITIAL_VERSION};
diff --git a/common/thrift/StatestoreService.thrift b/common/thrift/StatestoreService.thrift
index 4f2dada..783bea7 100644
--- a/common/thrift/StatestoreService.thrift
+++ b/common/thrift/StatestoreService.thrift
@@ -139,6 +139,12 @@ struct TTopicRegistration {
// True if updates to this topic from this subscriber should be removed upon the
// subscriber's failure or disconnection
2: required bool is_transient;
+
+ // If true, min_subscriber_topic_version is computed and set in topic updates sent
+ // to this subscriber. Should only be set to true if this is
+ // actually required - computing the version is relatively expensive compared to
+ // other aspects of preparing topic updates - see IMPALA-6816.
+ 3: required bool populate_min_subscriber_topic_version = false;
}
struct TRegisterSubscriberRequest {
diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py
index 2a7c2f9..698a2a6 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -590,3 +590,96 @@ class TestStatestore():
sub.register(topics=[reg])
LOG.info("Re-registered with id {0}, waiting for update".format(sub.subscriber_id))
sub.wait_for_update(topic_name, target_updates)
+
+ def test_min_subscriber_topic_version(self):
+ self._do_test_min_subscriber_topic_version(False)
+
+ def test_min_subscriber_topic_version_with_straggler(self):
+ self._do_test_min_subscriber_topic_version(True)
+
+ def _do_test_min_subscriber_topic_version(self, simulate_straggler):
+ """Implementation of test that the 'min_subscriber_topic_version' flag is correctly
+ set when requested. This test runs two subscribers concurrently and tracks the
+ minimum version each has processed. If 'simulate_straggler' is true, one subscriber
+ rejects updates so that its version is not advanced."""
+ topic_name = "test_min_subscriber_topic_version_%s" % uuid.uuid1()
+
+ # This lock is held while processing the update to protect last_to_versions.
+ update_lock = threading.Lock()
+ last_to_versions = {}
+ TOTAL_SUBSCRIBERS = 2
+ def callback(sub, args, is_producer, sub_name):
+ """Callback for subscriber to verify min_subscriber_topic_version behaviour.
+ If 'is_producer' is true, this acts as the producer, otherwise it acts as the
+ consumer. 'sub_name' is a name used to index into last_to_versions."""
+ if topic_name not in args.topic_deltas:
+ # The update doesn't contain our topic.
+ pass
+ with update_lock:
+ LOG.info("{0} got update {1}".format(sub_name,
+ repr(args.topic_deltas[topic_name])))
+ LOG.info("Versions: {0}".format(last_to_versions))
+ to_version = args.topic_deltas[topic_name].to_version
+ from_version = args.topic_deltas[topic_name].from_version
+ min_subscriber_topic_version = \
+ args.topic_deltas[topic_name].min_subscriber_topic_version
+
+ if is_producer:
+ assert min_subscriber_topic_version is not None
+ assert (to_version == 0 and min_subscriber_topic_version == 0) or\
+ min_subscriber_topic_version < to_version,\
+ "'to_version' hasn't been created yet by this subscriber."
+ # Only validate version once all subscribers have processed an update.
+ if len(last_to_versions) == TOTAL_SUBSCRIBERS:
+ min_to_version = min(last_to_versions.values())
+ assert min_subscriber_topic_version <= min_to_version,\
+ "The minimum subscriber topic version seen by the producer cannot get " +\
+ "ahead of the minimum version seem by the consumer, by definition."
+ assert min_subscriber_topic_version >= min_to_version - 2,\
+ "The min topic version can be two behind the last version seen by " + \
+ "this subscriber because the updates for both subscribers are " + \
+ "prepared in parallel and because it's possible that the producer " + \
+ "processes two updates in-between consumer updates. This is not " + \
+ "absolute but depends on updates not being delayed a large amount."
+ else:
+ # Consumer did not request topic version.
+ assert min_subscriber_topic_version is None
+
+ # Check the 'to_version' and update 'last_to_versions'.
+ last_to_version = last_to_versions.get(sub_name, 0)
+ if to_version > 0:
+ # Non-empty update.
+ assert from_version == last_to_version
+ # Stragglers should accept the first update then skip later ones.
+ skip_update = simulate_straggler and not is_producer and last_to_version > 0
+ if not skip_update: last_to_versions[sub_name] = to_version
+
+ if is_producer:
+ delta = self.make_topic_update(topic_name)
+ return TUpdateStateResponse(status=STATUS_OK, topic_updates=[delta],
+ skipped=False)
+ elif skip_update:
+ return TUpdateStateResponse(status=STATUS_OK, topic_updates=[], skipped=True)
+ else:
+ return DEFAULT_UPDATE_STATE_RESPONSE
+
+ # Two concurrent subscribers: one which pushes out updates and checks the minimum
+ # version, and another which just consumes the updates.
+ def producer_callback(sub, args): return callback(sub, args, True, "producer")
+ def consumer_callback(sub, args): return callback(sub, args, False, "consumer")
+ consumer_sub = StatestoreSubscriber(update_cb=consumer_callback)
+ consumer_reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
+ producer_sub = StatestoreSubscriber(update_cb=producer_callback)
+ producer_reg = TTopicRegistration(topic_name=topic_name, is_transient=True,
+ populate_min_subscriber_topic_version=True)
+ NUM_UPDATES = 6
+ (
+ consumer_sub.start()
+ .register(topics=[consumer_reg])
+ )
+ (
+ producer_sub.start()
+ .register(topics=[producer_reg])
+ .wait_for_update(topic_name, NUM_UPDATES)
+ )
+ consumer_sub.wait_for_update(topic_name, NUM_UPDATES)