You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2018/08/22 23:20:22 UTC

[3/7] impala git commit: IMPALA-7457. statestore: allow filtering by key prefix

IMPALA-7457. statestore: allow filtering by key prefix

This adds the ability for a statestore subscriber to specify a key
prefix which acts as a filter. Only topic entries which match the
specified prefix are transmitted to the subscriber.

This patch makes use of the new feature for a small optimization: the
catalogd subscribes to the catalog topic with a key prefix "!" which we
know doesn't match any actual topic items. This avoids the statestore
having to reflect back the catalog contents to the catalogd, since the
catalogd ignored this info anyway.

A later patch will make use of this to publish lightweight catalog
object version numbers in the same topic as the catalog objects
themselves.

The modification to catalogd's topic subscription is covered by existing
tests. A new specific test is added to verify the filtering mechanism.

Change-Id: I6ddcf3bfaf16bc3cd1ba01100e948ff142a67620
Reviewed-on: http://gerrit.cloudera.org:8080/11253
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/da01f29d
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/da01f29d
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/da01f29d

Branch: refs/heads/master
Commit: da01f29d303dca1dbc2be30bc75a72d698a9f4d2
Parents: 0782321
Author: Todd Lipcon <to...@apache.org>
Authored: Thu Aug 16 13:18:28 2018 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Aug 22 16:05:55 2018 +0000

----------------------------------------------------------------------
 be/src/catalog/catalog-server.cc           |  9 ++++-
 be/src/scheduling/admission-controller.cc  |  4 +-
 be/src/scheduling/scheduler.cc             |  4 +-
 be/src/service/impala-server.cc            | 10 +++--
 be/src/statestore/statestore-subscriber.cc |  8 +++-
 be/src/statestore/statestore-subscriber.h  |  6 ++-
 be/src/statestore/statestore.cc            | 13 +++++--
 be/src/statestore/statestore.h             | 22 +++++++++--
 common/thrift/StatestoreService.thrift     |  6 +++
 tests/statestore/test_statestore.py        | 49 +++++++++++++++++++++++++
 10 files changed, 115 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/da01f29d/be/src/catalog/catalog-server.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 96646fb..bd754cf 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -204,7 +204,14 @@ Status CatalogServer::Start() {
 
   StatestoreSubscriber::UpdateCallback cb =
       bind<void>(mem_fn(&CatalogServer::UpdateCatalogTopicCallback), this, _1, _2);
-  status = statestore_subscriber_->AddTopic(IMPALA_CATALOG_TOPIC, false, false, cb);
+  // The catalogd never needs to read any entries from the topic; it only publishes
+  // entries. So, we set the filter prefix to an arbitrary character that we know is
+  // not a prefix of any key. This saves a bit of network communication from the
+  // statestore back to the catalogd.
+  string filter_prefix = "!";
+  status = statestore_subscriber_->AddTopic(IMPALA_CATALOG_TOPIC,
+      /* is_transient=*/ false, /* populate_min_subscriber_topic_version=*/ false,
+      filter_prefix, cb);
   if (!status.ok()) {
     status.AddDetail("CatalogService failed to start");
     return status;

http://git-wip-us.apache.org/repos/asf/impala/blob/da01f29d/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 3960528..2e1f7d9 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -246,7 +246,9 @@ Status AdmissionController::Init() {
       const StatestoreSubscriber::TopicDeltaMap& state,
       vector<TTopicDelta>* topic_updates) { UpdatePoolStats(state, topic_updates); };
   Status status =
-      subscriber_->AddTopic(Statestore::IMPALA_REQUEST_QUEUE_TOPIC, true, false, cb);
+      subscriber_->AddTopic(Statestore::IMPALA_REQUEST_QUEUE_TOPIC,
+          /* is_transient=*/ true, /* populate_min_subscriber_topic_version=*/ false,
+          /* filter_prefix=*/"", cb);
   if (!status.ok()) {
     status.AddDetail("AdmissionController failed to register request queue topic");
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/da01f29d/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 20acc43..ae4f049 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -84,7 +84,9 @@ Status Scheduler::Init(const TNetworkAddress& backend_address,
     StatestoreSubscriber::UpdateCallback cb =
         bind<void>(mem_fn(&Scheduler::UpdateMembership), this, _1, _2);
     Status status = statestore_subscriber_->AddTopic(
-        Statestore::IMPALA_MEMBERSHIP_TOPIC, true, false, cb);
+        Statestore::IMPALA_MEMBERSHIP_TOPIC, /* is_transient=*/ true,
+        /* populate_min_subscriber_topic_version=*/ false,
+        /* filter_prefix= */"", cb);
     if (!status.ok()) {
       status.AddDetail("Scheduler failed to register membership topic");
       return status;

http://git-wip-us.apache.org/repos/asf/impala/blob/da01f29d/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 23a09f5..6b37d49 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -362,15 +362,19 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
       this->MembershipCallback(state, topic_updates);
     };
     ABORT_IF_ERROR(exec_env->subscriber()->AddTopic(
-        Statestore::IMPALA_MEMBERSHIP_TOPIC, true, false, cb));
+        Statestore::IMPALA_MEMBERSHIP_TOPIC, /* is_transient=*/ true,
+        /* populate_min_subscriber_topic_version=*/ false,
+        /* filter_prefix=*/"", cb));
 
-    if (FLAGS_is_coordinator && !FLAGS_use_local_catalog) {
+    if (FLAGS_is_coordinator) {
       auto catalog_cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state,
           vector<TTopicDelta>* topic_updates) {
         this->CatalogUpdateCallback(state, topic_updates);
       };
       ABORT_IF_ERROR(exec_env->subscriber()->AddTopic(
-          CatalogServer::IMPALA_CATALOG_TOPIC, true, true, catalog_cb));
+          CatalogServer::IMPALA_CATALOG_TOPIC, /* is_transient=*/ true,
+          /* populate_min_subscriber_topic_version=*/ true, /* filter_prefix=*/ "",
+          catalog_cb));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/da01f29d/be/src/statestore/statestore-subscriber.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc
index 443a0e5..c83b520 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -43,6 +43,8 @@ using boost::posix_time::seconds;
 using boost::shared_lock;
 using boost::shared_mutex;
 using boost::try_to_lock;
+using std::string;
+
 using namespace apache::thrift;
 using namespace apache::thrift::transport;
 using namespace strings;
@@ -105,7 +107,7 @@ class StatestoreSubscriberThriftIf : public StatestoreSubscriberIf {
   StatestoreSubscriber* subscriber_;
 };
 
-StatestoreSubscriber::StatestoreSubscriber(const std::string& subscriber_id,
+StatestoreSubscriber::StatestoreSubscriber(const string& subscriber_id,
     const TNetworkAddress& heartbeat_address, const TNetworkAddress& statestore_address,
     MetricGroup* metrics)
     : subscriber_id_(subscriber_id),
@@ -139,7 +141,7 @@ StatestoreSubscriber::StatestoreSubscriber(const std::string& subscriber_id,
 
 Status StatestoreSubscriber::AddTopic(const Statestore::TopicId& topic_id,
     bool is_transient, bool populate_min_subscriber_topic_version,
-    const UpdateCallback& callback) {
+    string filter_prefix, const UpdateCallback& callback) {
   lock_guard<shared_mutex> exclusive_lock(lock_);
   if (is_registered_) return Status("Subscriber already started, can't add new topic");
   TopicRegistration& registration = topic_registrations_[topic_id];
@@ -154,6 +156,7 @@ Status StatestoreSubscriber::AddTopic(const Statestore::TopicId& topic_id,
   registration.is_transient = is_transient;
   registration.populate_min_subscriber_topic_version =
       populate_min_subscriber_topic_version;
+  registration.filter_prefix = std::move(filter_prefix);
   return Status::OK();
 }
 
@@ -169,6 +172,7 @@ Status StatestoreSubscriber::Register() {
     thrift_topic.is_transient = registration.second.is_transient;
     thrift_topic.populate_min_subscriber_topic_version =
         registration.second.populate_min_subscriber_topic_version;
+    thrift_topic.__set_filter_prefix(registration.second.filter_prefix);
     request.topic_registrations.push_back(thrift_topic);
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/da01f29d/be/src/statestore/statestore-subscriber.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-subscriber.h b/be/src/statestore/statestore-subscriber.h
index 05d4489..016343f 100644
--- a/be/src/statestore/statestore-subscriber.h
+++ b/be/src/statestore/statestore-subscriber.h
@@ -112,7 +112,8 @@ class StatestoreSubscriber {
   /// Must be called before Start(), in which case it will return
   /// Status::OK. Otherwise an error will be returned.
   Status AddTopic(const Statestore::TopicId& topic_id, bool is_transient,
-      bool populate_min_subscriber_topic_version, const UpdateCallback& callback);
+      bool populate_min_subscriber_topic_version, std::string filter_prefix,
+      const UpdateCallback& callback);
 
   /// Registers this subscriber with the statestore, and starts the
   /// heartbeat service, as well as a thread to check for failure and
@@ -220,6 +221,9 @@ class StatestoreSubscriber {
     /// in on updates.
     bool populate_min_subscriber_topic_version = false;
 
+    /// Only subscribe to keys with the provided prefix.
+    string filter_prefix;
+
     /// The last version of the topic this subscriber processed.
     /// -1 if no updates have been processed yet.
     int64_t current_topic_version = -1;

http://git-wip-us.apache.org/repos/asf/impala/blob/da01f29d/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index a208d97..8749825 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -25,6 +25,7 @@
 #include <boost/thread.hpp>
 #include <thrift/Thrift.h>
 #include <gutil/strings/substitute.h>
+#include <gutil/strings/util.h>
 
 #include "common/status.h"
 #include "gen-cpp/StatestoreService_types.h"
@@ -232,7 +233,8 @@ void Statestore::Topic::ClearAllEntries() {
 }
 
 void Statestore::Topic::BuildDelta(const SubscriberId& subscriber_id,
-    TopicEntry::Version last_processed_version, TTopicDelta* delta) {
+    TopicEntry::Version last_processed_version,
+    const string& filter_prefix, TTopicDelta* delta) {
   // If the subscriber version is > 0, send this update as a delta. Otherwise, this is
   // a new subscriber so send them a non-delta update that includes all entries in the
   // topic.
@@ -253,6 +255,9 @@ void Statestore::Topic::BuildDelta(const SubscriberId& subscriber_id,
       if (!delta->is_delta && topic_entry.is_deleted()) {
         continue;
       }
+      // Skip any entries that don't match the requested prefix.
+      if (!HasPrefixString(itr->first, filter_prefix)) continue;
+
       delta->topic_entries.push_back(TTopicItem());
       TTopicItem& delta_entry = delta->topic_entries.back();
       delta_entry.key = itr->first;
@@ -314,7 +319,8 @@ Statestore::Subscriber::Subscriber(const SubscriberId& subscriber_id,
     GetTopicsMapForId(topic.topic_name)
         ->emplace(piecewise_construct, forward_as_tuple(topic.topic_name),
             forward_as_tuple(
-                topic.is_transient, topic.populate_min_subscriber_topic_version));
+                topic.is_transient, topic.populate_min_subscriber_topic_version,
+                topic.filter_prefix));
   }
 }
 
@@ -752,7 +758,8 @@ void Statestore::GatherTopicUpdates(const Subscriber& subscriber, UpdateKind upd
       TTopicDelta& topic_delta =
           update_state_request->topic_deltas[subscribed_topic.first];
       topic_delta.topic_name = subscribed_topic.first;
-      topic_it->second.BuildDelta(subscriber.id(), last_processed_version, &topic_delta);
+      topic_it->second.BuildDelta(subscriber.id(), last_processed_version,
+          subscribed_topic.second.filter_prefix, &topic_delta);
       if (subscribed_topic.second.populate_min_subscriber_topic_version) {
         deltas_needing_min_version.push_back(&topic_delta);
       }

http://git-wip-us.apache.org/repos/asf/impala/blob/da01f29d/be/src/statestore/statestore.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index 1d7f1a2..9326492 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -97,6 +97,13 @@ typedef TUniqueId RegistrationId;
 /// subscriber, rather than all items in the topic.  For non-delta updates, the statestore
 /// will send an update that includes all values in the topic.
 ///
+/// Subscribers may filter the keys within a subscribed topic by an optional prefix. If
+/// a key filter prefix is specified, only entries matching that prefix will be sent to
+/// the subscriber in updates. Note that this may result in empty updates being sent
+/// to subscribers in the case that all updated keys have been excluded by the filter.
+/// These empty updates are important so that subscribers can keep track of the current
+/// version number and report back their progress in receiving the topic contents.
+///
 /// +================+
 /// | Implementation |
 /// +================+
@@ -262,12 +269,14 @@ class Statestore : public CacheLineAligned {
     void DeleteIfVersionsMatch(TopicEntry::Version version, const TopicEntryKey& key);
 
     /// Build a delta update to send to 'subscriber_id' including the deltas greater
-    /// than 'last_processed_version' (not inclusive).
+    /// than 'last_processed_version' (not inclusive). Only those items whose keys
+    /// start with 'filter_prefix' are included in the update.
     ///
     /// Safe to call concurrently from multiple threads (for different subscribers).
     /// Acquires a shared read lock for the topic.
     void BuildDelta(const SubscriberId& subscriber_id,
-        TopicEntry::Version last_processed_version, TTopicDelta* delta);
+        TopicEntry::Version last_processed_version, const std::string& filter_prefix,
+        TTopicDelta* delta);
 
     /// Adds entries representing the current topic state to 'topic_json'.
     void ToJson(rapidjson::Document* document, rapidjson::Value* topic_json);
@@ -335,9 +344,11 @@ class Statestore : public CacheLineAligned {
 
     /// Information about a subscriber's subscription to a specific topic.
     struct TopicSubscription {
-      TopicSubscription(bool is_transient, bool populate_min_subscriber_topic_version)
+      TopicSubscription(bool is_transient, bool populate_min_subscriber_topic_version,
+          std::string filter_prefix)
         : is_transient(is_transient),
-          populate_min_subscriber_topic_version(populate_min_subscriber_topic_version) {}
+          populate_min_subscriber_topic_version(populate_min_subscriber_topic_version),
+          filter_prefix(std::move(filter_prefix)) {}
 
       /// Whether entries written by this subscriber should be considered transient.
       const bool is_transient;
@@ -346,6 +357,9 @@ class Statestore : public CacheLineAligned {
       /// subscription.
       const bool populate_min_subscriber_topic_version;
 
+      /// The prefix for which the subscriber wants to see updates.
+      const std::string filter_prefix;
+
       /// The last topic entry version successfully processed by this subscriber. Only
       /// written by a single thread at a time but can be read concurrently.
       AtomicInt64 last_version{TOPIC_INITIAL_VERSION};

http://git-wip-us.apache.org/repos/asf/impala/blob/da01f29d/common/thrift/StatestoreService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/StatestoreService.thrift b/common/thrift/StatestoreService.thrift
index 783bea7..1c82170 100644
--- a/common/thrift/StatestoreService.thrift
+++ b/common/thrift/StatestoreService.thrift
@@ -145,6 +145,12 @@ struct TTopicRegistration {
   // actually required - computing the version is relatively expensive compared to
   // other aspects of preparing topic updates - see IMPALA-6816.
   3: required bool populate_min_subscriber_topic_version = false;
+
+  // Restrict the items to receive on this subscription to only those items
+  // starting with the given prefix.
+  //
+  // If this is not specified, all items will be subscribed to.
+  4: optional string filter_prefix
 }
 
 struct TRegisterSubscriberRequest {

http://git-wip-us.apache.org/repos/asf/impala/blob/da01f29d/tests/statestore/test_statestore.py
----------------------------------------------------------------------
diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py
index 682a306..d45deeb 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -378,6 +378,55 @@ class TestStatestore():
          .wait_for_update(topic_name, 3)
     )
 
+  def test_filter_prefix(self):
+    topic_name = "topic_delta_%s" % uuid.uuid1()
+
+    def topic_update_correct(sub, args):
+      foo_delta = self.make_topic_update(topic_name, num_updates=1)
+      bar_delta = self.make_topic_update(topic_name, num_updates=2, key_template='bar')
+
+      update_count = sub.update_counts[topic_name]
+      if topic_name not in args.topic_deltas:
+        # The update doesn't contain our topic.
+        pass
+      elif update_count == 1:
+        # Send some values with both prefixes.
+        return TUpdateStateResponse(status=STATUS_OK,
+                                    topic_updates=[foo_delta, bar_delta],
+                                    skipped=False)
+      elif update_count == 2:
+        # We should only get the 'bar' entries back.
+        assert len(args.topic_deltas) == 1, args.topic_deltas
+        assert args.topic_deltas[topic_name].topic_entries == bar_delta.topic_entries
+        assert args.topic_deltas[topic_name].topic_name == bar_delta.topic_name
+      elif update_count == 3:
+        # Send some more updates that only have 'foo' prefixes.
+        return TUpdateStateResponse(status=STATUS_OK,
+                                    topic_updates=[foo_delta],
+                                    skipped=False)
+      elif update_count == 4:
+        # We shouldn't see any entries from the above update, but we should still see
+        # the version number change due to the new entries in the topic.
+        assert len(args.topic_deltas[topic_name].topic_entries) == 0
+        assert args.topic_deltas[topic_name].from_version == 3
+        assert args.topic_deltas[topic_name].to_version == 4
+      elif update_count == 5:
+        # Now that we have caught up to version 4, the next delta should also be empty.
+        assert len(args.topic_deltas[topic_name].topic_entries) == 0
+        assert args.topic_deltas[topic_name].from_version == 4
+        assert args.topic_deltas[topic_name].to_version == 4
+
+      return DEFAULT_UPDATE_STATE_RESPONSE
+
+    sub = StatestoreSubscriber(update_cb=topic_update_correct)
+    reg = TTopicRegistration(topic_name=topic_name, is_transient=False,
+                             filter_prefix="bar")
+    (
+      sub.start()
+         .register(topics=[reg])
+         .wait_for_update(topic_name, 5)
+    )
+
   def test_update_is_delta(self):
     """Test that the 'is_delta' flag is correctly set. The first update for a topic should
     always not be a delta, and so should all subsequent updates until the subscriber says