You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/04/18 23:33:46 UTC
[2/4] impala git commit: IMPALA-6793: Fix empty metadata after
statestore restarts
IMPALA-6793: Fix empty metadata after statestore restarts
IMPALA-5990 introduced a bug where restarting the statestore
deterministically clears the metadata, which then never comes back. The
cause of the bug is an incorrect condition used by the catalog to detect
a statestore restart.
A custom cluster regression test is added. The process-restarting
utility function used by the custom cluster tests is changed to use
shell=True in popen.
Change-Id: I332a60e172af84b93b3544373fe363cdced5e8d0
Reviewed-on: http://gerrit.cloudera.org:8080/9921
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Tianyi Wang <tw...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/740fc6b5
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/740fc6b5
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/740fc6b5
Branch: refs/heads/master
Commit: 740fc6b57f074a448bac04ec2e8e05312f141f67
Parents: cfaffc2
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Fri Apr 6 13:06:35 2018 -0700
Committer: Tianyi Wang <tw...@cloudera.com>
Committed: Wed Apr 18 18:26:24 2018 +0000
----------------------------------------------------------------------
be/src/catalog/catalog-server.cc | 13 ++---
be/src/catalog/catalog-server.h | 5 --
tests/common/impala_cluster.py | 2 +-
tests/custom_cluster/test_restart_services.py | 59 ++++++++++++++++++++++
4 files changed, 67 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/740fc6b5/be/src/catalog/catalog-server.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 8a91c25..e645204 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -157,7 +157,7 @@ CatalogServer::CatalogServer(MetricGroup* metrics)
: thrift_iface_(new CatalogServiceThriftIf(this)),
thrift_serializer_(FLAGS_compact_catalog_topic), metrics_(metrics),
topic_updates_ready_(false), last_sent_catalog_version_(0L),
- catalog_objects_min_version_(0L), catalog_objects_max_version_(0L) {
+ catalog_objects_max_version_(0L) {
topic_processing_time_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
CATALOG_SERVER_TOPIC_PROCESSING_TIMES);
}
@@ -228,10 +228,12 @@ void CatalogServer::UpdateCatalogTopicCallback(
const TTopicDelta& delta = topic->second;
- // If not generating a delta update and 'pending_topic_updates_' doesn't already contain
- // the full catalog (beginning with version 0), then force GatherCatalogUpdatesThread()
- // to reload the full catalog.
- if (delta.from_version == 0 && catalog_objects_min_version_ != 0) {
+ // If the statestore restarts, both from_version and to_version would be 0. If catalog
+ // has sent non-empty topic update, pending_topic_updates_ won't be from version 0 and
+ // it should be re-collected.
+ if (delta.from_version == 0 && delta.to_version == 0 &&
+ last_sent_catalog_version_ != 0) {
+ LOG(INFO) << "Statestore restart detected. Collecting a non-delta catalog update.";
last_sent_catalog_version_ = 0L;
} else if (!pending_topic_updates_.empty()) {
// Process the pending topic update.
@@ -284,7 +286,6 @@ void CatalogServer::UpdateCatalogTopicCallback(
if (!status.ok()) {
LOG(ERROR) << status.GetDetail();
} else {
- catalog_objects_min_version_ = last_sent_catalog_version_;
catalog_objects_max_version_ = resp.max_catalog_version;
}
}
http://git-wip-us.apache.org/repos/asf/impala/blob/740fc6b5/be/src/catalog/catalog-server.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index 2fa8ce7..1df83a3 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -119,11 +119,6 @@ class CatalogServer {
/// Set in UpdateCatalogTopicCallback() and protected by the catalog_lock_.
int64_t last_sent_catalog_version_;
- /// The minimum catalog object version in pending_topic_updates_. All items in
- /// pending_topic_updates_ will be greater than this version. Set by the
- /// catalog_update_gathering_thread_ and protected by catalog_lock_.
- int64_t catalog_objects_min_version_;
-
/// The max catalog version in pending_topic_updates_. Set by the
/// catalog_update_gathering_thread_ and protected by catalog_lock_.
int64_t catalog_objects_max_version_;
http://git-wip-us.apache.org/repos/asf/impala/blob/740fc6b5/tests/common/impala_cluster.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index 3fbcacf..276c02b 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -29,7 +29,7 @@ from tests.common.impala_service import (
CatalogdService,
ImpaladService,
StateStoredService)
-from tests.util.shell_util import exec_process_async, exec_process
+from tests.util.shell_util import exec_process, exec_process_async
logging.basicConfig(level=logging.ERROR, format='%(threadName)s: %(message)s')
LOG = logging.getLogger('impala_cluster')
http://git-wip-us.apache.org/repos/asf/impala/blob/740fc6b5/tests/custom_cluster/test_restart_services.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_restart_services.py b/tests/custom_cluster/test_restart_services.py
new file mode 100644
index 0000000..bcfe19d
--- /dev/null
+++ b/tests/custom_cluster/test_restart_services.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from impala.error import HiveServer2Error
+from tests.common.environ import specific_build_type_timeout
+from time import sleep
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+
+
+class TestRestart(CustomClusterTestSuite):
+ @classmethod
+ def get_workload(cls):
+ return 'functional-query'
+
+ @pytest.mark.execute_serially
+ def test_restart_statestore(self, cursor):
+ """ Regression test of IMPALA-6793. After the statestore restarts, the metadata should
+ eventually recover after being cleared by the new statestore.
+ """
+ try:
+ self.cluster.statestored.restart()
+ # We need to wait for the impalad to register to the new statestored and for a
+ # non-empty catalog update from the new statestored. It cannot be expressed with the
+ # existing metrics yet so we wait for some time here.
+ wait_time_s = specific_build_type_timeout(60, slow_build_timeout=100)
+ sleep(wait_time_s)
+ for retry in xrange(wait_time_s):
+ try:
+ cursor.execute("describe database functional")
+ return
+ except HiveServer2Error, e:
+ assert "AnalysisException: Database does not exist: functional" in e.message,\
+ "Unexpected exception: " + e.message
+ sleep(1)
+ assert False, "Coordinator never received non-empty metadata from the restarted " \
+ "statestore after {0} seconds".format(wait_time_s)
+ finally:
+ # Workaround for IMPALA-5695. Restarted process has to be manually killed or it will
+ # block start-impala-cluster.py from killing impala daemons.
+ self.cluster.statestored.kill()
+ self.cluster.statestored.wait()