You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/04/18 23:33:46 UTC

[2/4] impala git commit: IMPALA-6793: Fix empty metadata after statestore restarts

IMPALA-6793: Fix empty metadata after statestore restarts

IMPALA-5990 introduced a bug where restarting the statestore
deterministically clears the metadata, which then never comes back. The
cause of the bug is a wrong condition used by the catalog to detect a
statestore restart.

A custom cluster regression test is added. The process-restarting
utility function used by the custom cluster tests is changed to use
shell=True in popen.

Change-Id: I332a60e172af84b93b3544373fe363cdced5e8d0
Reviewed-on: http://gerrit.cloudera.org:8080/9921
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Tianyi Wang <tw...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/740fc6b5
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/740fc6b5
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/740fc6b5

Branch: refs/heads/master
Commit: 740fc6b57f074a448bac04ec2e8e05312f141f67
Parents: cfaffc2
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Fri Apr 6 13:06:35 2018 -0700
Committer: Tianyi Wang <tw...@cloudera.com>
Committed: Wed Apr 18 18:26:24 2018 +0000

----------------------------------------------------------------------
 be/src/catalog/catalog-server.cc              | 13 ++---
 be/src/catalog/catalog-server.h               |  5 --
 tests/common/impala_cluster.py                |  2 +-
 tests/custom_cluster/test_restart_services.py | 59 ++++++++++++++++++++++
 4 files changed, 67 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/740fc6b5/be/src/catalog/catalog-server.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 8a91c25..e645204 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -157,7 +157,7 @@ CatalogServer::CatalogServer(MetricGroup* metrics)
   : thrift_iface_(new CatalogServiceThriftIf(this)),
     thrift_serializer_(FLAGS_compact_catalog_topic), metrics_(metrics),
     topic_updates_ready_(false), last_sent_catalog_version_(0L),
-    catalog_objects_min_version_(0L), catalog_objects_max_version_(0L) {
+    catalog_objects_max_version_(0L) {
   topic_processing_time_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
       CATALOG_SERVER_TOPIC_PROCESSING_TIMES);
 }
@@ -228,10 +228,12 @@ void CatalogServer::UpdateCatalogTopicCallback(
 
   const TTopicDelta& delta = topic->second;
 
-  // If not generating a delta update and 'pending_topic_updates_' doesn't already contain
-  // the full catalog (beginning with version 0), then force GatherCatalogUpdatesThread()
-  // to reload the full catalog.
-  if (delta.from_version == 0 && catalog_objects_min_version_ != 0) {
+  // If the statestore restarts, both from_version and to_version would be 0. If catalog
+  // has sent non-empty topic udpate, pending_topic_updates_ won't be from version 0 and
+  // it should be re-collected.
+  if (delta.from_version == 0 && delta.to_version == 0 &&
+      last_sent_catalog_version_ != 0) {
+    LOG(INFO) << "Statestore restart detected. Collecting a non-delta catalog update.";
     last_sent_catalog_version_ = 0L;
   } else if (!pending_topic_updates_.empty()) {
     // Process the pending topic update.
@@ -284,7 +286,6 @@ void CatalogServer::UpdateCatalogTopicCallback(
       if (!status.ok()) {
         LOG(ERROR) << status.GetDetail();
       } else {
-        catalog_objects_min_version_ = last_sent_catalog_version_;
         catalog_objects_max_version_ = resp.max_catalog_version;
       }
     }

http://git-wip-us.apache.org/repos/asf/impala/blob/740fc6b5/be/src/catalog/catalog-server.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index 2fa8ce7..1df83a3 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -119,11 +119,6 @@ class CatalogServer {
   /// Set in UpdateCatalogTopicCallback() and protected by the catalog_lock_.
   int64_t last_sent_catalog_version_;
 
-  /// The minimum catalog object version in pending_topic_updates_. All items in
-  /// pending_topic_updates_ will be greater than this version. Set by the
-  /// catalog_update_gathering_thread_ and protected by catalog_lock_.
-  int64_t catalog_objects_min_version_;
-
   /// The max catalog version in pending_topic_updates_. Set by the
   /// catalog_update_gathering_thread_ and protected by catalog_lock_.
   int64_t catalog_objects_max_version_;

http://git-wip-us.apache.org/repos/asf/impala/blob/740fc6b5/tests/common/impala_cluster.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index 3fbcacf..276c02b 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -29,7 +29,7 @@ from tests.common.impala_service import (
     CatalogdService,
     ImpaladService,
     StateStoredService)
-from tests.util.shell_util import exec_process_async, exec_process
+from tests.util.shell_util import exec_process, exec_process_async
 
 logging.basicConfig(level=logging.ERROR, format='%(threadName)s: %(message)s')
 LOG = logging.getLogger('impala_cluster')

http://git-wip-us.apache.org/repos/asf/impala/blob/740fc6b5/tests/custom_cluster/test_restart_services.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_restart_services.py b/tests/custom_cluster/test_restart_services.py
new file mode 100644
index 0000000..bcfe19d
--- /dev/null
+++ b/tests/custom_cluster/test_restart_services.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from impala.error import HiveServer2Error
+from tests.common.environ import specific_build_type_timeout
+from time import sleep
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+
+
+class TestRestart(CustomClusterTestSuite):
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @pytest.mark.execute_serially
+  def test_restart_statestore(self, cursor):
+    """ Regression test of IMPALA-6793. After the statestore restarts, the metadata should
+        eventually recover after being cleared by the new statestore.
+    """
+    try:
+      self.cluster.statestored.restart()
+      # We need to wait for the impalad to register to the new statestored and for a
+      # non-empty catalog update from the new statestored. It cannot be expressed with the
+      # existing metrics yet so we wait for some time here.
+      wait_time_s = specific_build_type_timeout(60, slow_build_timeout=100)
+      sleep(wait_time_s)
+      # Then poll roughly once per second for up to another wait_time_s attempts. Until the
+      # catalog update arrives, the only acceptable failure is the missing database.
+      for _ in xrange(wait_time_s):
+        try:
+          cursor.execute("describe database functional")
+          return
+        except HiveServer2Error, e:
+          assert "AnalysisException: Database does not exist: functional" in e.message,\
+              "Unexpected exception: " + e.message
+          sleep(1)
+      # Total wait is the initial sleep plus the polling loop, i.e. at least
+      # 2 * wait_time_s seconds.
+      assert False, "Coordinator never received non-empty metadata from the restarted " \
+          "statestore after {0} seconds".format(2 * wait_time_s)
+    finally:
+      # Workaround for IMPALA-5695. Restarted process has to be manually killed or it will
+      # block start-impala-cluster.py from killing impala daemons.
+      self.cluster.statestored.kill()
+      self.cluster.statestored.wait()