You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2018/05/10 16:25:37 UTC
[5/5] impala git commit: IMPALA-6948,
IMPALA-6962: add end-to-end tests
IMPALA-6948,IMPALA-6962: add end-to-end tests
Adds end-to-end tests that validate that, after
various metadata operations, the catalog state
in catalogd and in the impalads is the same.
For IMPALA-6962, restarting the catalogd process
from tests is fixed.
Change-Id: Ic6c5b39e29b2885cd30fede18833cbf23fb755f5
Reviewed-on: http://gerrit.cloudera.org:8080/10291
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/1c21cd0d
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/1c21cd0d
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/1c21cd0d
Branch: refs/heads/2.x
Commit: 1c21cd0d7cf51d19bede727a11340e59b02f93a9
Parents: ca0b8eb
Author: Vuk Ercegovac <ve...@cloudera.com>
Authored: Tue May 1 18:44:04 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed May 9 23:10:16 2018 +0000
----------------------------------------------------------------------
be/src/catalog/catalog-server.cc | 12 ++-
tests/common/impala_cluster.py | 13 +++
tests/common/impala_service.py | 39 +++++--
tests/custom_cluster/test_metadata_replicas.py | 107 ++++++++++++++++++++
tests/metadata/test_hms_integration.py | 80 ++++-----------
tests/util/hive_utils.py | 59 +++++++++++
6 files changed, 242 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/1c21cd0d/be/src/catalog/catalog-server.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 1c6e894..c2ebf14 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -265,7 +265,7 @@ void CatalogServer::UpdateCatalogTopicCallback(
[[noreturn]] void CatalogServer::GatherCatalogUpdatesThread() {
while (1) {
unique_lock<mutex> unique_lock(catalog_lock_);
- // Protect against spurious wakups by checking the value of topic_updates_ready_.
+ // Protect against spurious wake-ups by checking the value of topic_updates_ready_.
// It is only safe to continue on and update the shared pending_topic_updates_
// when topic_updates_ready_ is false, otherwise we may be in the middle of
// processing a heartbeat.
@@ -311,6 +311,16 @@ void CatalogServer::CatalogUrlCallback(const Webserver::ArgumentMap& args,
document->AddMember("error", error, document->GetAllocator());
return;
}
+ long current_catalog_version;
+ status = catalog_->GetCatalogVersion(&current_catalog_version);
+ if (status.ok()) {
+ Value version_value;
+ version_value.SetInt64(current_catalog_version); // long (i64); SetInt() truncates to 32 bits
+ document->AddMember("version", version_value, document->GetAllocator());
+ } else {
+ Value error(status.GetDetail().c_str(), document->GetAllocator());
+ document->AddMember("versionError", error, document->GetAllocator());
+ }
Value databases(kArrayType);
for (const TDatabase& db: get_dbs_result.dbs) {
Value database(kObjectType);
http://git-wip-us.apache.org/repos/asf/impala/blob/1c21cd0d/tests/common/impala_cluster.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index 315666b..2d1ca58 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -18,6 +18,7 @@
# Basic object model of an Impala cluster (set of Impala processes).
#
import logging
+import os
import psutil
import socket
from getpass import getuser
@@ -35,6 +36,9 @@ logging.basicConfig(level=logging.ERROR, format='%(threadName)s: %(message)s')
LOG = logging.getLogger('impala_cluster')
LOG.setLevel(level=logging.DEBUG)
+IMPALA_HOME = os.environ['IMPALA_HOME']  # raises KeyError at import time if unset
+CATALOGD_PATH = os.path.join(IMPALA_HOME, 'bin/start-catalogd.sh')
+
# Represents a set of Impala processes. Each Impala process must be created with
# a basic set of command line options (beeswax_port, webserver_port, etc)
class ImpalaCluster(object):
@@ -252,3 +256,12 @@ class CatalogdProcess(BaseImpalaProcess):
def __get_port(self, default=None):
return int(self._get_arg_value('catalog_service_port', default))
+
+ def start(self, wait_until_ready=True):
+ """Starts catalogd in the background; if wait_until_ready, waits (30s timeout) for statestore-subscriber.connected == 1."""
+ restart_cmd = [CATALOGD_PATH] + self.cmd[1:] + ["&"]  # argv[0] of self.cmd replaced by the start script
+ LOG.info("Starting Catalogd process: %s" % ' '.join(restart_cmd))
+ os.system(' '.join(restart_cmd))  # shell backgrounds it via "&"; NOTE(review): args are not shell-quoted
+ if wait_until_ready:
+ self.service.wait_for_metric_value('statestore-subscriber.connected',
+ expected_value=1, timeout=30)
http://git-wip-us.apache.org/repos/asf/impala/blob/1c21cd0d/tests/common/impala_service.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index e86528b..bc5b4a3 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -134,6 +134,35 @@ class BaseImpalaService(object):
json.dumps(self.read_debug_webpage('threadz?json')),
json.dumps(self.read_debug_webpage('rpcz?json')))
+ def get_catalog_object_dump(self, object_type, object_name):
+ """ Returns the 'catalog_object' debug page text for 'object_type'/'object_name'."""
+ return self.read_debug_webpage('catalog_object?object_type=%s&object_name=%s' %\
+ (object_type, object_name))  # NOTE(review): parameters are not URL-encoded
+
+ def get_catalog_objects(self, excludes=('_impala_builtins',)):
+ """ Returns a dictionary containing all catalog objects. Each entry's key is the fully
+ qualified object name and the value is a tuple of the form (type, version).
+ Does not return databases whose names are in the 'excludes' collection."""
+ catalog = self.get_debug_webpage_json('catalog')
+ objects = {}
+ for db_desc in catalog["databases"]:
+ db_name = db_desc["name"]
+ if db_name in excludes:
+ continue
+ db = self.get_catalog_object_dump('DATABASE', db_name)
+ objects[db_name] = ('DATABASE', self.extract_catalog_object_version(db))
+ for table_desc in db_desc["tables"]:
+ table_name = table_desc["fqtn"]
+ table = self.get_catalog_object_dump('TABLE', table_name)
+ objects[table_name] = ('TABLE', self.extract_catalog_object_version(table))
+ return objects
+
+ def extract_catalog_object_version(self, thrift_txt):
+ """ Returns the int 'catalog_version (i64)' parsed from 'thrift_txt'; asserts if absent."""
+ result = re.search(r'catalog_version \(i64\) = (\d+)', thrift_txt)
+ assert result, 'Unable to find catalog version in object: ' + thrift_txt
+ return int(result.group(1))
+
# Allows for interacting with an Impalad instance to perform operations such as creating
# new connections or accessing the debug webpage.
class ImpaladService(BaseImpalaService):
@@ -260,10 +289,6 @@ class ImpaladService(BaseImpalaService):
hs2_client = TCLIService.Client(protocol)
return hs2_client
- def get_catalog_object_dump(self, object_type, object_name):
- return self.read_debug_webpage('catalog_objects?object_type=%s&object_name=%s' %\
- (object_type, object_name))
-
# Allows for interacting with the StateStore service to perform operations such as
# accessing the debug webpage.
@@ -283,6 +308,6 @@ class CatalogdService(BaseImpalaService):
super(CatalogdService, self).__init__(hostname, webserver_port)
self.service_port = service_port
- def get_catalog_object_dump(self, object_type, object_name):
- return self.read_debug_webpage('catalog_objects?object_type=%s&object_name=%s' %\
- (object_type, object_name))
+ def get_catalog_version(self):
+ """ Gets catalogd's latest catalog version; KeyError if catalogd reported 'versionError'. """
+ return self.get_debug_webpage_json('catalog')["version"]
http://git-wip-us.apache.org/repos/asf/impala/blob/1c21cd0d/tests/custom_cluster/test_metadata_replicas.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_metadata_replicas.py b/tests/custom_cluster/test_metadata_replicas.py
new file mode 100644
index 0000000..589bece
--- /dev/null
+++ b/tests/custom_cluster/test_metadata_replicas.py
@@ -0,0 +1,107 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+import re
+from time import sleep
+from tests.common.environ import specific_build_type_timeout
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.util.hive_utils import HiveDbWrapper
+
+class TestMetadataReplicas(CustomClusterTestSuite):
+ """ Validates metadata content across catalogd and impalad coordinators."""
+
+ @classmethod
+ def get_workload(cls):
+ return 'functional-query'
+
+ @pytest.mark.execute_serially
+ def test_start(self):
+ """ Baseline to verify that the initial state is identical. No DDL/DML
+ is processed, so no objects are fully loaded."""
+ self.__validate_metadata()
+
+ @pytest.mark.execute_serially
+ def test_catalog_restart(self, testid_checksum):
+ """ IMPALA-6948: reproduces the issue by deleting a table from Hive while the catalogd
+ is down. When catalogd is restarted, if the regression is present, the deleted
+ table will still be present at the impalads."""
+ db_name = "test_catalog_restart_%s" % testid_checksum
+ try:
+ with HiveDbWrapper(self, db_name):
+ # Issue several invalidates to boost the version for the current incarnation of the
+ # catalog. As a result, the table we'll add to Hive will get a version that's easier
+ # to see is higher than the highest version of the restarted catalogd incarnation.
+ for _ in xrange(50):
+ self.client.execute("invalidate metadata functional.alltypes")
+ assert self.cluster.catalogd.service.get_catalog_version() >= 50
+ # Creates a database and table with Hive and makes it visible to Impala.
+ self.run_stmt_in_hive("create table %s.x (a string)" % db_name)
+ self.client.execute("invalidate metadata %s.x" % db_name)
+ assert "x" in self.client.execute("show tables in %s" % db_name).data
+ # Stops the catalog
+ self.cluster.catalogd.kill()
+ # Drops the table from the catalog using Hive.
+ self.run_stmt_in_hive("drop table %s.x" % db_name)
+ # Restarts the catalog
+ self.cluster.catalogd.start()
+ # Refreshes the state of the catalogd process.
+ self.cluster.refresh()
+ # Wait until the impalad catalog versions agree with the catalogd's version.
+ catalogd_version = self.cluster.catalogd.service.get_catalog_version()
+ for impalad in self.cluster.impalads:
+ impalad.service.wait_for_metric_value("catalog.curr-version", catalogd_version)
+
+ self.__validate_metadata()
+ # No 'except' clause: exceptions, including assertion failures, propagate with
+ # their original tracebacks, and the 'finally' clause below still runs.
+ finally:
+ # Hack to work-around IMPALA-5695.
+ self.cluster.catalogd.kill()
+
+ def __validate_metadata(self):
+ """ Computes the pair-wise object version difference between the catalog contents
+ in catalogd and each impalad. Asserts that there are no differences."""
+ c_objects = self.cluster.catalogd.service.get_catalog_objects()
+ i_objects = [proc.service.get_catalog_objects() for proc in self.cluster.impalads]
+
+ for idx in xrange(0, len(i_objects)):
+ i_obj = i_objects[idx]
+ diff = self.__diff_catalog_objects(c_objects, i_obj)
+ assert diff[0] == {},\
+ 'catalogd has objects not in impalad(%d): %s ' % (idx, diff[0])
+ assert diff[1] == {}, 'impalad(%d) has objects not in catalogd: %s' % (idx, diff[1])
+ assert diff[2] == {},\
+ 'impalad(%d) and catalogd version for objects differs: %s' % (idx, diff[2])
+
+ def __diff_catalog_objects(self, a, b):
+ """ Computes the diff between the input 'a' and 'b' dictionaries. The result is a
+ list of length 3 where position 0 holds those entries that are in a, but not b,
+ position 1 those entries that are in b, but not a, and position 2 holds entries
+ where the key is in both a and b but the values differ, mapped to (a[k], b[k])."""
+ # diff[0] : a - b
+ # diff[1] : b - a
+ # diff[2] : a[k] != b[k] for k in both a and b
+ diff = [None, None, {}]
+ diff[0] = dict((k, a[k]) for k in set(a) - set(b))
+ diff[1] = dict((k, b[k]) for k in set(b) - set(a))
+ # Only compare shared keys; keys missing on either side are reported above.
+ for k in set(a) & set(b):
+ v_a, v_b = a[k], b[k]
+ if v_a != v_b:
+ diff[2][k] = (v_a, v_b)
+ return diff
http://git-wip-us.apache.org/repos/asf/impala/blob/1c21cd0d/tests/metadata/test_hms_integration.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_hms_integration.py b/tests/metadata/test_hms_integration.py
index bcdc1e6..0f085c4 100644
--- a/tests/metadata/test_hms_integration.py
+++ b/tests/metadata/test_hms_integration.py
@@ -34,6 +34,7 @@ from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal
from tests.common.test_dimensions import (
create_single_exec_option_dimension,
create_uncompressed_text_dimension)
+from tests.util.hive_utils import HiveDbWrapper, HiveTableWrapper  # formerly nested in TestHmsIntegration
import logging
@@ -147,45 +148,6 @@ class TestHmsIntegration(ImpalaTestSuite):
def __exit__(self, typ, value, traceback):
self.impala.client.execute('drop table if exists %s' % self.table_name)
- class HiveDbWrapper(object):
- """
- A wrapper class for using `with` guards with databases created through Hive
- ensuring deletion even if an exception occurs.
- """
-
- def __init__(self, hive, db_name):
- self.hive = hive
- self.db_name = db_name
-
- def __enter__(self):
- self.hive.run_stmt_in_hive(
- 'create database if not exists ' + self.db_name)
- return self.db_name
-
- def __exit__(self, typ, value, traceback):
- self.hive.run_stmt_in_hive(
- 'drop database if exists %s cascade' % self.db_name)
-
- class HiveTableWrapper(object):
- """
- A wrapper class for using `with` guards with tables created through Hive
- ensuring deletion even if an exception occurs.
- """
-
- def __init__(self, hive, table_name, table_spec):
- self.hive = hive
- self.table_name = table_name
- self.table_spec = table_spec
-
- def __enter__(self):
- self.hive.run_stmt_in_hive(
- 'create table if not exists %s %s' %
- (self.table_name, self.table_spec))
- return self.table_name
-
- def __exit__(self, typ, value, traceback):
- self.hive.run_stmt_in_hive('drop table if exists %s' % self.table_name)
-
def impala_table_stats(self, table):
"""Returns a dictionary of stats for a table according to Impala."""
output = self.client.execute('show table stats %s' % table).get_data()
@@ -288,13 +250,11 @@ class TestHmsIntegration(ImpalaTestSuite):
@pytest.mark.execute_serially
def test_hive_db_hive_table_add_partition(self, vector):
- self.add_hive_partition_helper(vector, self.HiveDbWrapper,
- self.HiveTableWrapper)
+ self.add_hive_partition_helper(vector, HiveDbWrapper, HiveTableWrapper)
@pytest.mark.execute_serially
def test_hive_db_impala_table_add_partition(self, vector):
- self.add_hive_partition_helper(vector, self.HiveDbWrapper,
- self.ImpalaTableWrapper)
+ self.add_hive_partition_helper(vector, HiveDbWrapper, self.ImpalaTableWrapper)
@pytest.mark.execute_serially
def test_impala_db_impala_table_add_partition(self, vector):
@@ -304,7 +264,7 @@ class TestHmsIntegration(ImpalaTestSuite):
@pytest.mark.execute_serially
def test_impala_db_hive_table_add_partition(self, vector):
self.add_hive_partition_helper(vector, self.ImpalaDbWrapper,
- self.HiveTableWrapper)
+ HiveTableWrapper)
@pytest.mark.xfail(run=False, reason="This is a bug: IMPALA-2426")
@pytest.mark.execute_serially
@@ -493,9 +453,9 @@ class TestHmsIntegration(ImpalaTestSuite):
@pytest.mark.execute_serially
def test_compute_stats_get_to_impala(self, vector):
"""Column stats computed in Hive are also visible in Impala."""
- with self.HiveDbWrapper(self, self.unique_string()) as db_name:
- with self.HiveTableWrapper(self, db_name + '.' + self.unique_string(),
- '(x int)') as table_name:
+ with HiveDbWrapper(self, self.unique_string()) as db_name:
+ with HiveTableWrapper(self, db_name + '.' + self.unique_string(),
+ '(x int)') as table_name:
hive_stats = self.hive_column_stats(table_name, 'x')
self.client.execute('invalidate metadata')
self.client.execute('refresh %s' % table_name)
@@ -577,7 +537,7 @@ class TestHmsIntegration(ImpalaTestSuite):
"""
test_db = self.unique_string()
- with self.HiveDbWrapper(self, test_db) as db_name:
+ with HiveDbWrapper(self, test_db) as db_name:
pass
self.assert_sql_error(
self.client.execute,
@@ -596,9 +556,9 @@ class TestHmsIntegration(ImpalaTestSuite):
"""
# TODO: check results of insert, then select * before and after
# storage format change.
- with self.HiveDbWrapper(self, self.unique_string()) as db_name:
- with self.HiveTableWrapper(self, db_name + '.' + self.unique_string(),
- '(x int, y int) stored as parquet') as table_name:
+ with HiveDbWrapper(self, self.unique_string()) as db_name:
+ with HiveTableWrapper(self, db_name + '.' + self.unique_string(),
+ '(x int, y int) stored as parquet') as table_name:
self.client.execute('invalidate metadata')
self.client.execute('invalidate metadata %s' % table_name)
print self.impala_table_stats(table_name)
@@ -612,9 +572,9 @@ class TestHmsIntegration(ImpalaTestSuite):
def test_change_column_type(self, vector):
"""Hive column type changes propagate to Impala."""
- with self.HiveDbWrapper(self, self.unique_string()) as db_name:
- with self.HiveTableWrapper(self, db_name + '.' + self.unique_string(),
- '(x int, y int)') as table_name:
+ with HiveDbWrapper(self, self.unique_string()) as db_name:
+ with HiveTableWrapper(self, db_name + '.' + self.unique_string(),
+ '(x int, y int)') as table_name:
self.run_stmt_in_hive(
'insert into table %s values (33,44)' % table_name)
self.run_stmt_in_hive('alter table %s change y y string' % table_name)
@@ -633,9 +593,9 @@ class TestHmsIntegration(ImpalaTestSuite):
known issue with changing column types in Hive/parquet.
"""
- with self.HiveDbWrapper(self, self.unique_string()) as db_name:
- with self.HiveTableWrapper(self, db_name + '.' + self.unique_string(),
- '(x int, y int) stored as parquet') as table_name:
+ with HiveDbWrapper(self, self.unique_string()) as db_name:
+ with HiveTableWrapper(self, db_name + '.' + self.unique_string(),
+ '(x int, y int) stored as parquet') as table_name:
self.run_stmt_in_hive(
'insert into table %s values (33,44)' % table_name)
assert '33,44' == self.run_stmt_in_hive(
@@ -661,9 +621,9 @@ class TestHmsIntegration(ImpalaTestSuite):
metadata'.
"""
- with self.HiveDbWrapper(self, self.unique_string()) as db_name:
- with self.HiveTableWrapper(self, db_name + '.' + self.unique_string(),
- '(x int, y int)') as table_name:
+ with HiveDbWrapper(self, self.unique_string()) as db_name:
+ with HiveTableWrapper(self, db_name + '.' + self.unique_string(),
+ '(x int, y int)') as table_name:
self.client.execute('invalidate metadata')
int_column = {'type': 'int', 'comment': ''}
expected_columns = {'x': int_column, 'y': int_column}
http://git-wip-us.apache.org/repos/asf/impala/blob/1c21cd0d/tests/util/hive_utils.py
----------------------------------------------------------------------
diff --git a/tests/util/hive_utils.py b/tests/util/hive_utils.py
new file mode 100644
index 0000000..094b816
--- /dev/null
+++ b/tests/util/hive_utils.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Utilities for interacting with Hive.
+
+
+class HiveDbWrapper(object):
+ """
+ Context manager that runs 'create database if not exists <db_name>' through Hive on
+ entry and 'drop database if exists <db_name> cascade' on exit, even on an exception.
+ """
+
+ def __init__(self, hive, db_name):
+ self.hive = hive  # any object with run_stmt_in_hive(stmt), e.g. a test suite
+ self.db_name = db_name
+
+ def __enter__(self):
+ self.hive.run_stmt_in_hive(
+ 'create database if not exists ' + self.db_name)
+ return self.db_name  # value bound by 'with ... as'
+
+ def __exit__(self, typ, value, traceback):  # returns None: exceptions are not suppressed
+ self.hive.run_stmt_in_hive(  # NOTE: also drops a database that existed before entry
+ 'drop database if exists %s cascade' % self.db_name)
+
+
+class HiveTableWrapper(object):
+ """
+ Context manager that runs 'create table if not exists <table_name> <table_spec>'
+ through Hive on entry and 'drop table if exists' on exit, even on an exception.
+ """
+
+ def __init__(self, hive, table_name, table_spec):
+ self.hive = hive  # any object with run_stmt_in_hive(stmt), e.g. a test suite
+ self.table_name = table_name
+ self.table_spec = table_spec  # text appended after the name, e.g. '(x int)'
+
+ def __enter__(self):
+ self.hive.run_stmt_in_hive(
+ 'create table if not exists %s %s' %
+ (self.table_name, self.table_spec))
+ return self.table_name  # value bound by 'with ... as'
+
+ def __exit__(self, typ, value, traceback):  # returns None: exceptions are not suppressed
+ self.hive.run_stmt_in_hive('drop table if exists %s' % self.table_name)  # NOTE: also drops a pre-existing table