You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/05/08 16:11:58 UTC

[2/2] impala git commit: IMPALA-6948: Delete catalog update topic entries upon catalog restart

IMPALA-6948: Delete catalog update topic entries upon catalog restart

This commit fixes an issue where the statestore may end up with stale
entries in the catalog update topic that do not correspond to any
catalog object stored in the catalog. This may occur if the catalog
server restarts and some catalog object (e.g. a table) that was known
to the catalog before the restart no longer exists in the Hive
Metastore after the restart.

Fix:
The first update for the catalog update topic that the catalog server
sends to the statestore sets the new clear_topic_entries flag, which
instructs the statestore to clear any entries it may have for this
topic before applying that update. This guarantees that the statestore
entries are consistent with the catalog objects stored in the catalog
server. Any coordinator that detects the catalog restart will receive
from the statestore a full topic update that reflects the state of the
catalog server.

Testing:
Added a statestore test (test_update_with_clear_entries_flag).

Change-Id: I907509bf92da631ece5efd23c275a613ead00e91

Tmp

Change-Id: I74a8ade8e498ac35cb56d3775d2c67a86988d9b6
Reviewed-on: http://gerrit.cloudera.org:8080/10289
Reviewed-by: Vuk Ercegovac <ve...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e2e7c103
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e2e7c103
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e2e7c103

Branch: refs/heads/master
Commit: e2e7c103a7ac87d848eed3138841ae5d2ec32863
Parents: 18455ec
Author: Dimitris Tsirogiannis <dt...@cloudera.com>
Authored: Sun Apr 29 20:55:24 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue May 8 03:31:24 2018 +0000

----------------------------------------------------------------------
 be/src/catalog/catalog-server.cc                |  6 +++
 be/src/statestore/statestore.cc                 | 26 ++++++++++
 be/src/statestore/statestore.h                  | 10 +++-
 common/thrift/StatestoreService.thrift          |  4 ++
 .../apache/impala/catalog/ImpaladCatalog.java   |  6 +--
 tests/statestore/test_statestore.py             | 53 +++++++++++++++++++-
 6 files changed, 98 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/e2e7c103/be/src/catalog/catalog-server.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index e645204..1c6e894 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -241,6 +241,12 @@ void CatalogServer::UpdateCatalogTopicCallback(
     TTopicDelta& update = subscriber_topic_updates->back();
     update.topic_name = IMPALA_CATALOG_TOPIC;
     update.topic_entries = std::move(pending_topic_updates_);
+    // If this is the first update sent to the statestore, instruct the
+    // statestore to clear any entries it may already have for the catalog
+    // update topic. This is to guarantee that upon catalog restart, the
+    // statestore entries of the catalog update topic are in sync with the
+    // catalog objects stored in the catalog (see IMPALA-6948).
+    update.__set_clear_topic_entries((last_sent_catalog_version_ == 0));
 
     VLOG(1) << "A catalog update with " << update.topic_entries.size()
             << " entries is assembled. Catalog version: "

http://git-wip-us.apache.org/repos/asf/impala/blob/e2e7c103/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 5c1952d..90ea167 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -207,6 +207,23 @@ void Statestore::Topic::DeleteIfVersionsMatch(TopicEntry::Version version,
   }
 }
 
+void Statestore::Topic::ClearAllEntries() {
+  lock_guard<shared_mutex> write_lock(lock_);
+  entries_.clear();
+  topic_update_log_.clear();
+  int64_t key_size_metric_val = key_size_metric_->GetValue();
+  key_size_metric_->SetValue(std::max(static_cast<int64_t>(0),
+      key_size_metric_val - total_key_size_bytes_));
+  int64_t value_size_metric_val = value_size_metric_->GetValue();
+  value_size_metric_->SetValue(std::max(static_cast<int64_t>(0),
+      value_size_metric_val - total_value_size_bytes_));
+  int64_t topic_size_metric_val = topic_size_metric_->GetValue();
+  topic_size_metric_->SetValue(std::max(static_cast<int64_t>(0),
+      topic_size_metric_val - (total_value_size_bytes_ + total_key_size_bytes_)));
+  total_value_size_bytes_ = 0;
+  total_key_size_bytes_ = 0;
+}
+
 void Statestore::Topic::BuildDelta(const SubscriberId& subscriber_id,
     TopicEntry::Version last_processed_version, TTopicDelta* delta) {
   // If the subscriber version is > 0, send this update as a delta. Otherwise, this is
@@ -655,6 +672,15 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, UpdateKind update_kin
       }
 
       Topic& topic = topic_it->second;
+      // Check if the subscriber indicated that the topic entries should be
+      // cleared.
+      if (update.__isset.clear_topic_entries && update.clear_topic_entries) {
+        DCHECK(!update.__isset.from_version);
+        LOG(INFO) << "Received request for clearing the entries of topic: "
+                  << update.topic_name << " from: " << subscriber->id();
+        topic.ClearAllEntries();
+      }
+
       // Update the topic and add transient entries separately to avoid holding both
       // locks at the same time and preventing concurrent topic updates.
       vector<TopicEntry::Version> entry_versions = topic.Put(update.topic_entries);

http://git-wip-us.apache.org/repos/asf/impala/blob/e2e7c103/be/src/statestore/statestore.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index 482e48b..edaef49 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -237,6 +237,14 @@ class Statestore : public CacheLineAligned {
     /// Acquires an exclusive write lock for the topic.
     std::vector<TopicEntry::Version> Put(const std::vector<TTopicItem>& entries);
 
+    /// Deletes all the topic entries and updates the topic metrics. It doesn't
+    /// reset the last_version_ to ensure that versions are monotonically
+    /// increasing.
+    ///
+    /// Safe to call concurrently from multiple threads (for different
+    /// subscribers). Acquires an exclusive lock for the topic.
+    void ClearAllEntries();
+
     /// Utility method to support removing transient entries. We track the version numbers
     /// of entries added by subscribers, and remove entries with the same version number
     /// when that subscriber fails (the same entry may exist, but may have been updated by
@@ -388,7 +396,7 @@ class Statestore : public CacheLineAligned {
     /// processed. Will never decrease.
     TopicEntry::Version LastTopicVersionProcessed(const TopicId& topic_id) const;
 
-    /// Sets the subscriber's last processed version of the topic to the given value.  This
+    /// Sets the subscriber's last processed version of the topic to the given value. This
     /// should only be set when once a subscriber has succesfully processed the given
     /// update corresponding to this version. Should not be called concurrently from
     /// multiple threads for a given 'topic_id'.

http://git-wip-us.apache.org/repos/asf/impala/blob/e2e7c103/common/thrift/StatestoreService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/StatestoreService.thrift b/common/thrift/StatestoreService.thrift
index 13a87c9..3702932 100644
--- a/common/thrift/StatestoreService.thrift
+++ b/common/thrift/StatestoreService.thrift
@@ -122,6 +122,10 @@ struct TTopicDelta {
   // keys with a version < min_subscriber_topic_version. Only used when sending an update
   // from the statestore to a subscriber.
   6: optional i64 min_subscriber_topic_version
+
+  // If set and true the statestore must clear the existing topic entries (if any) before
+  // applying the entries in topic_entries.
+  7: optional bool clear_topic_entries
 }
 
 // Description of a topic to subscribe to as part of a RegisterSubscriber call

http://git-wip-us.apache.org/repos/asf/impala/blob/e2e7c103/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index 6ad4234..369cc9c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -176,10 +176,8 @@ public class ImpaladCatalog extends Catalog {
         LOG.info("Received large catalog object(>100mb): " + key + " is " + len +
             "bytes");
       }
-      if (LOG.isTraceEnabled()) {
-        LOG.trace((update.first ? "Deleting " : "Adding ") + "item: " + key + " version: "
-            + obj.catalog_version + " of size: " + len);
-      }
+      LOG.info((update.first ? "Deleting: " : "Adding: ") + key + " version: "
+          + obj.catalog_version + " size: " + len);
       // For statestore updates, the service ID and updated version is wrapped in a
       // CATALOG catalog object.
       if (obj.type == TCatalogObjectType.CATALOG) {

http://git-wip-us.apache.org/repos/asf/impala/blob/e2e7c103/tests/statestore/test_statestore.py
----------------------------------------------------------------------
diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py
index 737ff9c..2a7c2f9 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -323,13 +323,14 @@ class StatestoreSubscriber(object):
 
 class TestStatestore():
   def make_topic_update(self, topic_name, key_template="foo", value_template="bar",
-                        num_updates=1):
+                        num_updates=1, clear_topic_entries=False):
     topic_entries = [
       Subscriber.TTopicItem(key=key_template + str(x), value=value_template + str(x))
       for x in xrange(num_updates)]
     return Subscriber.TTopicDelta(topic_name=topic_name,
                                   topic_entries=topic_entries,
-                                  is_delta=False)
+                                  is_delta=False,
+                                  clear_topic_entries=clear_topic_entries)
 
   def test_registration_ids_different(self):
     """Test that if a subscriber with the same id registers twice, the registration ID is
@@ -519,6 +520,54 @@ class TestStatestore():
           .wait_for_update(transient_topic_name, 1)
     )
 
+  def test_update_with_clear_entries_flag(self):
+    """Test that the statestore clears all topic entries when a subscriber
+    sets the clear_topic_entries flag in a topic update message (IMPALA-6948)."""
+    topic_name = "test_topic_%s" % str(uuid.uuid1())
+
+    def add_entries(sub, args):
+      updates = []
+      if (topic_name in args.topic_deltas and sub.update_counts[topic_name] == 1):
+        updates.append(self.make_topic_update(topic_name, num_updates=2,
+            key_template="old"))
+
+      if (topic_name in args.topic_deltas and sub.update_counts[topic_name] == 2):
+        updates.append(self.make_topic_update(topic_name, num_updates=1,
+            key_template="new", clear_topic_entries=True))
+
+      if len(updates) > 0:
+        return TUpdateStateResponse(status=STATUS_OK, topic_updates=updates,
+            skipped=False)
+
+      return DEFAULT_UPDATE_STATE_RESPONSE
+
+    def check_entries(sub, args):
+      if (topic_name in args.topic_deltas and sub.update_counts[topic_name] == 1):
+        assert len(args.topic_deltas[topic_name].topic_entries) == 1
+        assert args.topic_deltas[topic_name].topic_entries[0].key == "new0"
+
+      return DEFAULT_UPDATE_STATE_RESPONSE
+
+    reg = [TTopicRegistration(topic_name=topic_name, is_transient=False)]
+    sub1 = StatestoreSubscriber(update_cb=add_entries)
+    (
+      sub1.start()
+        .register(topics=reg)
+        .wait_for_update(topic_name, 1)
+        .kill()
+        .wait_for_failure()
+        .start()
+        .register(topics=reg)
+        .wait_for_update(topic_name, 1)
+    )
+
+    sub2 = StatestoreSubscriber(update_cb=check_entries)
+    (
+      sub2.start()
+        .register(topics=reg)
+        .wait_for_update(topic_name, 2)
+    )
+
   def test_heartbeat_failure_reset(self):
     """Regression test for IMPALA-6785: the heartbeat failure count for the subscriber ID
     should be reset when it resubscribes, not after the first successful heartbeat. Delay