You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/02/25 07:57:22 UTC

[impala] 08/14: IMPALA-6816: minimise calls to GetMinSubscriberTopicVersion()

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/impala.git

commit a29bec1902ffa1ccfb0539a40b874f7650f6b56e
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Mon Jun 11 17:22:35 2018 -0700

    IMPALA-6816: minimise calls to GetMinSubscriberTopicVersion()
    
    min_subscriber_topic_version is expensive to compute (requires iterating
    over all subscribers to compute) but is only used by one
    subscriber/topic pair: Impalads receiving catalog topic updates.
    
    This patch implements a simple fix - only compute it if a subscriber
    asks for it. A more complex alternative would be to maintain
    a priority queue of subscriber versions, but that didn't seem worth
    the complexity and risk of bugs.
    
    Testing:
    Add a statestore test to validate the versions. It looks like we had a
    pre-existing test gap for validating min_subscriber_topic_version so
    the test is mainly focused on adding that coverage.
    
    Ran core tests with DEBUG and ASAN.
    
    Change-Id: I8ee7cb2355ba1049b9081e0df344ac41aa4ebeb1
    Reviewed-on: http://gerrit.cloudera.org:8080/10705
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/catalog/catalog-server.cc           |  2 +-
 be/src/scheduling/admission-controller.cc  |  9 +--
 be/src/scheduling/scheduler.cc             |  2 +-
 be/src/service/impala-server.cc            | 11 ++--
 be/src/statestore/statestore-subscriber.cc |  7 ++-
 be/src/statestore/statestore-subscriber.h  |  6 +-
 be/src/statestore/statestore.cc            | 50 +++++++++-------
 be/src/statestore/statestore.h             |  8 ++-
 common/thrift/StatestoreService.thrift     |  6 ++
 tests/statestore/test_statestore.py        | 93 ++++++++++++++++++++++++++++++
 10 files changed, 160 insertions(+), 34 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index e74db75..a17478f 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -186,7 +186,7 @@ Status CatalogServer::Start() {
 
   StatestoreSubscriber::UpdateCallback cb =
       bind<void>(mem_fn(&CatalogServer::UpdateCatalogTopicCallback), this, _1, _2);
-  status = statestore_subscriber_->AddTopic(IMPALA_CATALOG_TOPIC, false, cb);
+  status = statestore_subscriber_->AddTopic(IMPALA_CATALOG_TOPIC, false, false, cb);
   if (!status.ok()) {
     status.AddDetail("CatalogService failed to start");
     return status;
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index ac176f4..a53d1a0 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -18,7 +18,6 @@
 #include "scheduling/admission-controller.h"
 
 #include <boost/algorithm/string.hpp>
-#include <boost/bind.hpp>
 #include <boost/mem_fn.hpp>
 #include <gutil/strings/substitute.h>
 
@@ -243,9 +242,11 @@ AdmissionController::~AdmissionController() {
 Status AdmissionController::Init() {
   RETURN_IF_ERROR(Thread::Create("scheduling", "admission-thread",
       &AdmissionController::DequeueLoop, this, &dequeue_thread_));
-  StatestoreSubscriber::UpdateCallback cb =
-    bind<void>(mem_fn(&AdmissionController::UpdatePoolStats), this, _1, _2);
-  Status status = subscriber_->AddTopic(Statestore::IMPALA_REQUEST_QUEUE_TOPIC, true, cb);
+  auto cb = [this](
+      const StatestoreSubscriber::TopicDeltaMap& state,
+      vector<TTopicDelta>* topic_updates) { UpdatePoolStats(state, topic_updates); };
+  Status status =
+      subscriber_->AddTopic(Statestore::IMPALA_REQUEST_QUEUE_TOPIC, true, false, cb);
   if (!status.ok()) {
     status.AddDetail("AdmissionController failed to register request queue topic");
   }
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 5a67a74..950d218 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -88,7 +88,7 @@ Status Scheduler::Init(const TNetworkAddress& backend_address,
     StatestoreSubscriber::UpdateCallback cb =
         bind<void>(mem_fn(&Scheduler::UpdateMembership), this, _1, _2);
     Status status = statestore_subscriber_->AddTopic(
-        Statestore::IMPALA_MEMBERSHIP_TOPIC, true, cb);
+        Statestore::IMPALA_MEMBERSHIP_TOPIC, true, false, cb);
     if (!status.ok()) {
       status.AddDetail("Scheduler failed to register membership topic");
       return status;
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index ce6c2c3..c2b8300 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -358,12 +358,12 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
 
   // Register the membership callback if running in a real cluster.
   if (!TestInfo::is_test()) {
-    auto cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state,
-         vector<TTopicDelta>* topic_updates) {
+    auto cb = [this](const StatestoreSubscriber::TopicDeltaMap& state,
+        vector<TTopicDelta>* topic_updates) {
       this->MembershipCallback(state, topic_updates);
     };
-    ABORT_IF_ERROR(
-        exec_env->subscriber()->AddTopic(Statestore::IMPALA_MEMBERSHIP_TOPIC, true, cb));
+    ABORT_IF_ERROR(exec_env->subscriber()->AddTopic(
+        Statestore::IMPALA_MEMBERSHIP_TOPIC, true, false, cb));
 
     if (FLAGS_is_coordinator && !FLAGS_use_local_catalog) {
       auto catalog_cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state,
@@ -371,7 +371,7 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
         this->CatalogUpdateCallback(state, topic_updates);
       };
       ABORT_IF_ERROR(exec_env->subscriber()->AddTopic(
-            CatalogServer::IMPALA_CATALOG_TOPIC, true, catalog_cb));
+          CatalogServer::IMPALA_CATALOG_TOPIC, true, true, catalog_cb));
     }
   }
 
@@ -1483,6 +1483,7 @@ void ImpalaServer::CatalogUpdateCallback(
   // Always update the minimum subscriber version for the catalog topic.
   {
     unique_lock<mutex> unique_lock(catalog_version_lock_);
+    DCHECK(delta.__isset.min_subscriber_topic_version);
     min_subscriber_catalog_topic_version_ = delta.min_subscriber_topic_version;
   }
   catalog_version_update_cv_.NotifyAll();
diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc
index a27d1ae..4351589 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -137,7 +137,8 @@ StatestoreSubscriber::StatestoreSubscriber(const std::string& subscriber_id,
 }
 
 Status StatestoreSubscriber::AddTopic(const Statestore::TopicId& topic_id,
-    bool is_transient, const UpdateCallback& callback) {
+    bool is_transient, bool populate_min_subscriber_topic_version,
+    const UpdateCallback& callback) {
   lock_guard<shared_mutex> exclusive_lock(lock_);
   if (is_registered_) return Status("Subscriber already started, can't add new topic");
   TopicRegistration& registration = topic_registrations_[topic_id];
@@ -150,6 +151,8 @@ Status StatestoreSubscriber::AddTopic(const Statestore::TopicId& topic_id,
     registration.update_interval_timer.Start();
   }
   registration.is_transient = is_transient;
+  registration.populate_min_subscriber_topic_version =
+      populate_min_subscriber_topic_version;
   return Status::OK();
 }
 
@@ -163,6 +166,8 @@ Status StatestoreSubscriber::Register() {
     TTopicRegistration thrift_topic;
     thrift_topic.topic_name = registration.first;
     thrift_topic.is_transient = registration.second.is_transient;
+    thrift_topic.populate_min_subscriber_topic_version =
+        registration.second.populate_min_subscriber_topic_version;
     request.topic_registrations.push_back(thrift_topic);
   }
 
diff --git a/be/src/statestore/statestore-subscriber.h b/be/src/statestore/statestore-subscriber.h
index f102cae..7621592 100644
--- a/be/src/statestore/statestore-subscriber.h
+++ b/be/src/statestore/statestore-subscriber.h
@@ -112,7 +112,7 @@ class StatestoreSubscriber {
   /// Must be called before Start(), in which case it will return
   /// Status::OK. Otherwise an error will be returned.
   Status AddTopic(const Statestore::TopicId& topic_id, bool is_transient,
-      const UpdateCallback& callback);
+      bool populate_min_subscriber_topic_version, const UpdateCallback& callback);
 
   /// Registers this subscriber with the statestore, and starts the
   /// heartbeat service, as well as a thread to check for failure and
@@ -212,6 +212,10 @@ class StatestoreSubscriber {
     /// it makes will be deleted upon failure or disconnection.
     bool is_transient = false;
 
+    /// Whether this subscriber needs the min_subscriber_topic_version field to be filled
+    /// in on updates.
+    bool populate_min_subscriber_topic_version = false;
+
     /// The last version of the topic this subscriber processed.
     /// -1 if no updates have been processed yet.
     int64_t current_topic_version = -1;
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 90ea167..a58aec1 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -300,12 +300,14 @@ void Statestore::Topic::ToJson(Document* document, Value* topic_json) {
 Statestore::Subscriber::Subscriber(const SubscriberId& subscriber_id,
     const RegistrationId& registration_id, const TNetworkAddress& network_address,
     const vector<TTopicRegistration>& subscribed_topics)
-    : subscriber_id_(subscriber_id),
-      registration_id_(registration_id),
-      network_address_(network_address) {
-  for (const TTopicRegistration& topic: subscribed_topics) {
-    GetTopicsMapForId(topic.topic_name)->emplace(piecewise_construct,
-        forward_as_tuple(topic.topic_name), forward_as_tuple(topic.is_transient));
+  : subscriber_id_(subscriber_id),
+    registration_id_(registration_id),
+    network_address_(network_address) {
+  for (const TTopicRegistration& topic : subscribed_topics) {
+    GetTopicsMapForId(topic.topic_name)
+        ->emplace(piecewise_construct, forward_as_tuple(topic.topic_name),
+            forward_as_tuple(
+                topic.is_transient, topic.populate_min_subscriber_topic_version));
   }
 }
 
@@ -697,18 +699,22 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, UpdateKind update_kin
   return Status::OK();
 }
 
-void Statestore::GatherTopicUpdates(const Subscriber& subscriber,
-    UpdateKind update_kind, TUpdateStateRequest* update_state_request) {
+void Statestore::GatherTopicUpdates(const Subscriber& subscriber, UpdateKind update_kind,
+    TUpdateStateRequest* update_state_request) {
+  DCHECK(update_kind == UpdateKind::TOPIC_UPDATE
+      || update_kind == UpdateKind::PRIORITY_TOPIC_UPDATE)
+      << static_cast<int>(update_kind);
+  // Pointers to the deltas in update_state_request->topic_deltas where we need to
+  // populate 'min_subscriber_topic_version'. GetMinSubscriberTopicVersion() is
+  // somewhat expensive so we want to avoid calling it unless necessary.
+  vector<TTopicDelta*> deltas_needing_min_version;
   {
-    DCHECK(update_kind == UpdateKind::TOPIC_UPDATE
-        || update_kind ==  UpdateKind::PRIORITY_TOPIC_UPDATE)
-        << static_cast<int>(update_kind);
     const bool is_priority = update_kind == UpdateKind::PRIORITY_TOPIC_UPDATE;
-    const Subscriber::Topics& subscribed_topics = is_priority
-        ? subscriber.priority_subscribed_topics()
-        : subscriber.non_priority_subscribed_topics();
+    const Subscriber::Topics& subscribed_topics = is_priority ?
+        subscriber.priority_subscribed_topics() :
+        subscriber.non_priority_subscribed_topics();
     shared_lock<shared_mutex> l(topics_map_lock_);
-    for (const auto& subscribed_topic: subscribed_topics) {
+    for (const auto& subscribed_topic : subscribed_topics) {
       auto topic_it = topics_.find(subscribed_topic.first);
       DCHECK(topic_it != topics_.end());
       TopicEntry::Version last_processed_version =
@@ -718,16 +724,20 @@ void Statestore::GatherTopicUpdates(const Subscriber& subscriber,
           update_state_request->topic_deltas[subscribed_topic.first];
       topic_delta.topic_name = subscribed_topic.first;
       topic_it->second.BuildDelta(subscriber.id(), last_processed_version, &topic_delta);
+      if (subscribed_topic.second.populate_min_subscriber_topic_version) {
+        deltas_needing_min_version.push_back(&topic_delta);
+      }
     }
   }
 
   // Fill in the min subscriber topic version. This must be done after releasing
   // topics_map_lock_.
-  lock_guard<mutex> l(subscribers_lock_);
-  typedef map<TopicId, TTopicDelta> TopicDeltaMap;
-  for (TopicDeltaMap::value_type& topic_delta: update_state_request->topic_deltas) {
-    topic_delta.second.__set_min_subscriber_topic_version(
-        GetMinSubscriberTopicVersion(topic_delta.first));
+  if (!deltas_needing_min_version.empty()) {
+    lock_guard<mutex> l(subscribers_lock_);
+    for (TTopicDelta* delta : deltas_needing_min_version) {
+      delta->__set_min_subscriber_topic_version(
+          GetMinSubscriberTopicVersion(delta->topic_name));
+    }
   }
 }
 
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index edaef49..71e1ade 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -331,11 +331,17 @@ class Statestore : public CacheLineAligned {
 
     /// Information about a subscriber's subscription to a specific topic.
     struct TopicSubscription {
-      TopicSubscription(bool is_transient) : is_transient(is_transient) {}
+      TopicSubscription(bool is_transient, bool populate_min_subscriber_topic_version)
+        : is_transient(is_transient),
+          populate_min_subscriber_topic_version(populate_min_subscriber_topic_version) {}
 
       /// Whether entries written by this subscriber should be considered transient.
       const bool is_transient;
 
+      /// Whether min_subscriber_topic_version needs to be filled in for this
+      /// subscription.
+      const bool populate_min_subscriber_topic_version;
+
       /// The last topic entry version successfully processed by this subscriber. Only
       /// written by a single thread at a time but can be read concurrently.
       AtomicInt64 last_version{TOPIC_INITIAL_VERSION};
diff --git a/common/thrift/StatestoreService.thrift b/common/thrift/StatestoreService.thrift
index 4f2dada..783bea7 100644
--- a/common/thrift/StatestoreService.thrift
+++ b/common/thrift/StatestoreService.thrift
@@ -139,6 +139,12 @@ struct TTopicRegistration {
   // True if updates to this topic from this subscriber should be removed upon the
   // subscriber's failure or disconnection
   2: required bool is_transient;
+
+  // If true, min_subscriber_topic_version is computed and set in topic updates sent
+  // to this subscriber. Should only be set to true if this is
+  // actually required - computing the version is relatively expensive compared to
+  // other aspects of preparing topic updates - see IMPALA-6816.
+  3: required bool populate_min_subscriber_topic_version = false;
 }
 
 struct TRegisterSubscriberRequest {
diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py
index 2a7c2f9..698a2a6 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -590,3 +590,96 @@ class TestStatestore():
     sub.register(topics=[reg])
     LOG.info("Re-registered with id {0}, waiting for update".format(sub.subscriber_id))
     sub.wait_for_update(topic_name, target_updates)
+
+  def test_min_subscriber_topic_version(self):
+    self._do_test_min_subscriber_topic_version(False)
+
+  def test_min_subscriber_topic_version_with_straggler(self):
+    self._do_test_min_subscriber_topic_version(True)
+
+  def _do_test_min_subscriber_topic_version(self, simulate_straggler):
+    """Implementation of test that the 'min_subscriber_topic_version' flag is correctly
+    set when requested. This test runs two subscribers concurrently and tracks the
+    minimum version each has processed. If 'simulate_straggler' is true, one subscriber
+    rejects updates so that its version is not advanced."""
+    topic_name = "test_min_subscriber_topic_version_%s" % uuid.uuid1()
+
+    # This lock is held while processing the update to protect last_to_versions.
+    update_lock = threading.Lock()
+    last_to_versions = {}
+    TOTAL_SUBSCRIBERS = 2
+    def callback(sub, args, is_producer, sub_name):
+      """Callback for subscriber to verify min_subscriber_topic_version behaviour.
+      If 'is_producer' is true, this acts as the producer, otherwise it acts as the
+      consumer. 'sub_name' is a name used to index into last_to_versions."""
+      if topic_name not in args.topic_deltas:
+        # The update doesn't contain our topic.
+        pass
+      with update_lock:
+        LOG.info("{0} got update {1}".format(sub_name,
+            repr(args.topic_deltas[topic_name])))
+        LOG.info("Versions: {0}".format(last_to_versions))
+        to_version = args.topic_deltas[topic_name].to_version
+        from_version = args.topic_deltas[topic_name].from_version
+        min_subscriber_topic_version = \
+            args.topic_deltas[topic_name].min_subscriber_topic_version
+
+        if is_producer:
+          assert min_subscriber_topic_version is not None
+          assert (to_version == 0 and min_subscriber_topic_version == 0) or\
+              min_subscriber_topic_version < to_version,\
+              "'to_version' hasn't been created yet by this subscriber."
+          # Only validate version once all subscribers have processed an update.
+          if len(last_to_versions) == TOTAL_SUBSCRIBERS:
+            min_to_version = min(last_to_versions.values())
+            assert min_subscriber_topic_version <= min_to_version,\
+                "The minimum subscriber topic version seen by the producer cannot get " +\
+                "ahead of the minimum version seem by the consumer, by definition."
+            assert min_subscriber_topic_version >= min_to_version - 2,\
+                "The min topic version can be two behind the last version seen by " + \
+                "this subscriber because the updates for both subscribers are " + \
+                "prepared in parallel and because it's possible that the producer " + \
+                "processes two updates in-between consumer updates. This is not " + \
+                "absolute but depends on updates not being delayed a large amount."
+        else:
+          # Consumer did not request topic version.
+          assert min_subscriber_topic_version is None
+
+        # Check the 'to_version' and update 'last_to_versions'.
+        last_to_version = last_to_versions.get(sub_name, 0)
+        if to_version > 0:
+          # Non-empty update.
+          assert from_version == last_to_version
+        # Stragglers should accept the first update then skip later ones.
+        skip_update = simulate_straggler and not is_producer and last_to_version > 0
+        if not skip_update: last_to_versions[sub_name] = to_version
+
+        if is_producer:
+          delta = self.make_topic_update(topic_name)
+          return TUpdateStateResponse(status=STATUS_OK, topic_updates=[delta],
+                                      skipped=False)
+        elif skip_update:
+          return TUpdateStateResponse(status=STATUS_OK, topic_updates=[], skipped=True)
+        else:
+          return DEFAULT_UPDATE_STATE_RESPONSE
+
+    # Two concurrent subscribers: one pushes out updates and checks the minimum
+    # version, the other just consumes the updates.
+    def producer_callback(sub, args): return callback(sub, args, True, "producer")
+    def consumer_callback(sub, args): return callback(sub, args, False, "consumer")
+    consumer_sub = StatestoreSubscriber(update_cb=consumer_callback)
+    consumer_reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
+    producer_sub = StatestoreSubscriber(update_cb=producer_callback)
+    producer_reg = TTopicRegistration(topic_name=topic_name, is_transient=True,
+        populate_min_subscriber_topic_version=True)
+    NUM_UPDATES = 6
+    (
+      consumer_sub.start()
+          .register(topics=[consumer_reg])
+    )
+    (
+      producer_sub.start()
+          .register(topics=[producer_reg])
+          .wait_for_update(topic_name, NUM_UPDATES)
+    )
+    consumer_sub.wait_for_update(topic_name, NUM_UPDATES)