You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2018/05/10 16:25:37 UTC

[5/5] impala git commit: IMPALA-6948, IMPALA-6962: add end-to-end tests

IMPALA-6948, IMPALA-6962: add end-to-end tests

Adds end-to-end tests to validate that, after
various metadata operations, the catalog state
in catalogd and in the impalads is the same.

For IMPALA-6962, restarting the catalogd process
from tests is fixed.

Change-Id: Ic6c5b39e29b2885cd30fede18833cbf23fb755f5
Reviewed-on: http://gerrit.cloudera.org:8080/10291
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/1c21cd0d
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/1c21cd0d
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/1c21cd0d

Branch: refs/heads/2.x
Commit: 1c21cd0d7cf51d19bede727a11340e59b02f93a9
Parents: ca0b8eb
Author: Vuk Ercegovac <ve...@cloudera.com>
Authored: Tue May 1 18:44:04 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed May 9 23:10:16 2018 +0000

----------------------------------------------------------------------
 be/src/catalog/catalog-server.cc               |  12 ++-
 tests/common/impala_cluster.py                 |  13 +++
 tests/common/impala_service.py                 |  39 +++++--
 tests/custom_cluster/test_metadata_replicas.py | 107 ++++++++++++++++++++
 tests/metadata/test_hms_integration.py         |  80 ++++-----------
 tests/util/hive_utils.py                       |  59 +++++++++++
 6 files changed, 242 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/1c21cd0d/be/src/catalog/catalog-server.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 1c6e894..c2ebf14 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -265,7 +265,7 @@ void CatalogServer::UpdateCatalogTopicCallback(
 [[noreturn]] void CatalogServer::GatherCatalogUpdatesThread() {
   while (1) {
     unique_lock<mutex> unique_lock(catalog_lock_);
-    // Protect against spurious wakups by checking the value of topic_updates_ready_.
+    // Protect against spurious wake-ups by checking the value of topic_updates_ready_.
     // It is only safe to continue on and update the shared pending_topic_updates_
     // when topic_updates_ready_ is false, otherwise we may be in the middle of
     // processing a heartbeat.
@@ -311,6 +311,16 @@ void CatalogServer::CatalogUrlCallback(const Webserver::ArgumentMap& args,
     document->AddMember("error", error, document->GetAllocator());
     return;
   }
+  long current_catalog_version;
+  status = catalog_->GetCatalogVersion(&current_catalog_version);
+  if (status.ok()) {
+    Value version_value;
+    version_value.SetInt(current_catalog_version);
+    document->AddMember("version", version_value, document->GetAllocator());
+  } else {
+    Value error(status.GetDetail().c_str(), document->GetAllocator());
+    document->AddMember("versionError", error, document->GetAllocator());
+  }
   Value databases(kArrayType);
   for (const TDatabase& db: get_dbs_result.dbs) {
     Value database(kObjectType);

http://git-wip-us.apache.org/repos/asf/impala/blob/1c21cd0d/tests/common/impala_cluster.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index 315666b..2d1ca58 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -18,6 +18,7 @@
 # Basic object model of an Impala cluster (set of Impala processes).
 #
 import logging
+import os
 import psutil
 import socket
 from getpass import getuser
@@ -35,6 +36,9 @@ logging.basicConfig(level=logging.ERROR, format='%(threadName)s: %(message)s')
 LOG = logging.getLogger('impala_cluster')
 LOG.setLevel(level=logging.DEBUG)
 
+IMPALA_HOME = os.environ['IMPALA_HOME']
+CATALOGD_PATH = os.path.join(IMPALA_HOME, 'bin/start-catalogd.sh')
+
 # Represents a set of Impala processes. Each Impala process must be created with
 # a basic set of command line options (beeswax_port, webserver_port, etc)
 class ImpalaCluster(object):
@@ -252,3 +256,12 @@ class CatalogdProcess(BaseImpalaProcess):
 
   def __get_port(self, default=None):
     return int(self._get_arg_value('catalog_service_port', default))
+
+  def start(self, wait_until_ready=True):
+    """Starts catalogd and waits until the service is ready to accept connections."""
+    restart_cmd = [CATALOGD_PATH] + self.cmd[1:] + ["&"]
+    LOG.info("Starting Catalogd process: %s" % ' '.join(restart_cmd))
+    os.system(' '.join(restart_cmd))
+    if wait_until_ready:
+      self.service.wait_for_metric_value('statestore-subscriber.connected',
+                                         expected_value=1, timeout=30)

http://git-wip-us.apache.org/repos/asf/impala/blob/1c21cd0d/tests/common/impala_service.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index e86528b..bc5b4a3 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -134,6 +134,35 @@ class BaseImpalaService(object):
                json.dumps(self.read_debug_webpage('threadz?json')),
                json.dumps(self.read_debug_webpage('rpcz?json')))
 
+  def get_catalog_object_dump(self, object_type, object_name):
+    """ Gets the web-page for the given 'object_type' and 'object_name'."""
+    return self.read_debug_webpage('catalog_object?object_type=%s&object_name=%s' %\
+        (object_type, object_name))
+
+  def get_catalog_objects(self, excludes=['_impala_builtins']):
+    """ Returns a dictionary containing all catalog objects. Each entry's key is the fully
+        qualified object name and the value is a tuple of the form (type, version).
+        Does not return databases listed in the 'excludes' list."""
+    catalog = self.get_debug_webpage_json('catalog')
+    objects = {}
+    for db_desc in catalog["databases"]:
+      db_name = db_desc["name"]
+      if db_name in excludes:
+        continue
+      db = self.get_catalog_object_dump('DATABASE', db_name)
+      objects[db_name] = ('DATABASE', self.extract_catalog_object_version(db))
+      for table_desc in db_desc["tables"]:
+        table_name = table_desc["fqtn"]
+        table = self.get_catalog_object_dump('TABLE', table_name)
+        objects[table_name] = ('TABLE', self.extract_catalog_object_version(table))
+    return objects
+
+  def extract_catalog_object_version(self, thrift_txt):
+    """ Extracts and returns the version of the catalog object's 'thrift_txt' representation."""
+    result = re.search(r'catalog_version \(i64\) = (\d+)', thrift_txt)
+    assert result, 'Unable to find catalog version in object: ' + thrift_txt
+    return int(result.group(1))
+
 # Allows for interacting with an Impalad instance to perform operations such as creating
 # new connections or accessing the debug webpage.
 class ImpaladService(BaseImpalaService):
@@ -260,10 +289,6 @@ class ImpaladService(BaseImpalaService):
     hs2_client = TCLIService.Client(protocol)
     return hs2_client
 
-  def get_catalog_object_dump(self, object_type, object_name):
-    return self.read_debug_webpage('catalog_objects?object_type=%s&object_name=%s' %\
-        (object_type, object_name))
-
 
 # Allows for interacting with the StateStore service to perform operations such as
 # accessing the debug webpage.
@@ -283,6 +308,6 @@ class CatalogdService(BaseImpalaService):
     super(CatalogdService, self).__init__(hostname, webserver_port)
     self.service_port = service_port
 
-  def get_catalog_object_dump(self, object_type, object_name):
-    return self.read_debug_webpage('catalog_objects?object_type=%s&object_name=%s' %\
-        (object_type, object_name))
+  def get_catalog_version(self):
+    """ Gets catalogd's latest catalog version. """
+    return self.get_debug_webpage_json('catalog')["version"]

http://git-wip-us.apache.org/repos/asf/impala/blob/1c21cd0d/tests/custom_cluster/test_metadata_replicas.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_metadata_replicas.py b/tests/custom_cluster/test_metadata_replicas.py
new file mode 100644
index 0000000..589bece
--- /dev/null
+++ b/tests/custom_cluster/test_metadata_replicas.py
@@ -0,0 +1,107 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+import re
+from time import sleep
+from tests.common.environ import specific_build_type_timeout
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.util.hive_utils import HiveDbWrapper
+
+class TestMetadataReplicas(CustomClusterTestSuite):
+  """ Validates metadata content across catalogd and impalad coordinators."""
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @pytest.mark.execute_serially
+  def test_start(self):
+    """ Baseline to verify that the initial state is identical. No DDL/DML
+        is processed, so no objects are fully loaded."""
+    self.__validate_metadata()
+
+  @pytest.mark.execute_serially
+  def test_catalog_restart(self, testid_checksum):
+    """ IMPALA-6948: reproduces the issue by deleting a table from Hive while the catalogd
+        is down. When catalogd is restarted, if the regression is present, the deleted
+        table will still be present at the impalads."""
+    db_name = "test_catalog_restart_%s" % testid_checksum
+    try:
+      with HiveDbWrapper(self, db_name):
+        # Issue several invalidates to boost the version for the current incarnation of the
+        # catalog. As a result, the table we'll add to Hive will get a version that's easier
+        # to see is higher than the highest version of the restarted catalogd incarnation.
+        for i in xrange(0, 50):
+          self.client.execute("invalidate metadata functional.alltypes")
+        assert self.cluster.catalogd.service.get_catalog_version() >= 50
+        # Creates a database and table with Hive and makes it visible to Impala.
+        self.run_stmt_in_hive("create table %s.x (a string)" % db_name)
+        self.client.execute("invalidate metadata %s.x" % db_name)
+        assert "x" in self.client.execute("show tables in %s" % db_name).data
+        # Stops the catalog
+        self.cluster.catalogd.kill()
+        # Drops the table from the catalog using Hive.
+        self.run_stmt_in_hive("drop table %s.x" % db_name)
+        # Restarts the catalog
+        self.cluster.catalogd.start()
+        # Refreshes the state of the catalogd process.
+        self.cluster.refresh()
+        # Wait until the impalad catalog versions agree with the catalogd's version.
+        catalogd_version = self.cluster.catalogd.service.get_catalog_version()
+        for impalad in self.cluster.impalads:
+          impalad.service.wait_for_metric_value("catalog.curr-version", catalogd_version)
+
+        self.__validate_metadata()
+    except Exception as e:
+      assert False, "Unexpected exception: " + str(e)
+    finally:
+      # Hack to work-around IMPALA-5695.
+      self.cluster.catalogd.kill()
+
+  def __validate_metadata(self):
+    """ Computes the pair-wise object version difference between the catalog contents
+        in catalogd and each impalad. Asserts that there are no differences."""
+    c_objects = self.cluster.catalogd.service.get_catalog_objects()
+    i_objects = [proc.service.get_catalog_objects() for proc in self.cluster.impalads]
+
+    for idx in xrange(0, len(i_objects)):
+      i_obj = i_objects[idx]
+      diff = self.__diff_catalog_objects(c_objects, i_obj)
+      assert diff[0] == {},\
+          'catalogd has objects not in impalad(%d): %s ' % (idx, diff[0])
+      assert diff[1] == {}, 'impalad(%d) has objects not in catalogd: %s' % (idx, diff[1])
+      assert diff[2] is None,\
+          'impalad(%d) and catalogd version for objects differs: %s' % (idx, diff[2])
+
+  def __diff_catalog_objects(self, a, b):
+    """ Computes the diff between the input 'a' and 'b' dictionaries. The result is a
+        list of length 3 where position 0 holds those entries that are in a, but not b,
+        position 1 those entries that are in b, but not a, and position 2 holds entries
+        where the key is in both a and b, but whose value differs."""
+    # diff[0] : a - b
+    # diff[1] : b - a
+    # diff[2] : a[k] != b[k]
+    diff = [None, None, None]
+    diff[0] = dict((k, a[k]) for k in set(a) - set(b))
+    diff[1] = dict((k, b[k]) for k in set(b) - set(a))
+    for k, v_a in a.items():
+      v_b = b[k]
+      if v_b is not None:
+        if v_b != v_a:
+          diff[2][k] = (v_a, v_b)
+    return diff

http://git-wip-us.apache.org/repos/asf/impala/blob/1c21cd0d/tests/metadata/test_hms_integration.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_hms_integration.py b/tests/metadata/test_hms_integration.py
index bcdc1e6..0f085c4 100644
--- a/tests/metadata/test_hms_integration.py
+++ b/tests/metadata/test_hms_integration.py
@@ -34,6 +34,7 @@ from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal
 from tests.common.test_dimensions import (
     create_single_exec_option_dimension,
     create_uncompressed_text_dimension)
+from tests.util.hive_utils import HiveDbWrapper, HiveTableWrapper
 
 import logging
 
@@ -147,45 +148,6 @@ class TestHmsIntegration(ImpalaTestSuite):
     def __exit__(self, typ, value, traceback):
       self.impala.client.execute('drop table if exists %s' % self.table_name)
 
-  class HiveDbWrapper(object):
-    """
-    A wrapper class for using `with` guards with databases created through Hive
-    ensuring deletion even if an exception occurs.
-    """
-
-    def __init__(self, hive, db_name):
-      self.hive = hive
-      self.db_name = db_name
-
-    def __enter__(self):
-      self.hive.run_stmt_in_hive(
-          'create database if not exists ' + self.db_name)
-      return self.db_name
-
-    def __exit__(self, typ, value, traceback):
-      self.hive.run_stmt_in_hive(
-          'drop database if exists %s cascade' % self.db_name)
-
-  class HiveTableWrapper(object):
-    """
-    A wrapper class for using `with` guards with tables created through Hive
-    ensuring deletion even if an exception occurs.
-    """
-
-    def __init__(self, hive, table_name, table_spec):
-      self.hive = hive
-      self.table_name = table_name
-      self.table_spec = table_spec
-
-    def __enter__(self):
-      self.hive.run_stmt_in_hive(
-          'create table if not exists %s %s' %
-          (self.table_name, self.table_spec))
-      return self.table_name
-
-    def __exit__(self, typ, value, traceback):
-      self.hive.run_stmt_in_hive('drop table if exists %s' % self.table_name)
-
   def impala_table_stats(self, table):
     """Returns a dictionary of stats for a table according to Impala."""
     output = self.client.execute('show table stats %s' % table).get_data()
@@ -288,13 +250,11 @@ class TestHmsIntegration(ImpalaTestSuite):
 
   @pytest.mark.execute_serially
   def test_hive_db_hive_table_add_partition(self, vector):
-    self.add_hive_partition_helper(vector, self.HiveDbWrapper,
-                                   self.HiveTableWrapper)
+    self.add_hive_partition_helper(vector, HiveDbWrapper, HiveTableWrapper)
 
   @pytest.mark.execute_serially
   def test_hive_db_impala_table_add_partition(self, vector):
-    self.add_hive_partition_helper(vector, self.HiveDbWrapper,
-                                   self.ImpalaTableWrapper)
+    self.add_hive_partition_helper(vector, HiveDbWrapper, self.ImpalaTableWrapper)
 
   @pytest.mark.execute_serially
   def test_impala_db_impala_table_add_partition(self, vector):
@@ -304,7 +264,7 @@ class TestHmsIntegration(ImpalaTestSuite):
   @pytest.mark.execute_serially
   def test_impala_db_hive_table_add_partition(self, vector):
     self.add_hive_partition_helper(vector, self.ImpalaDbWrapper,
-                                   self.HiveTableWrapper)
+                                   HiveTableWrapper)
 
   @pytest.mark.xfail(run=False, reason="This is a bug: IMPALA-2426")
   @pytest.mark.execute_serially
@@ -493,9 +453,9 @@ class TestHmsIntegration(ImpalaTestSuite):
   @pytest.mark.execute_serially
   def test_compute_stats_get_to_impala(self, vector):
     """Column stats computed in Hive are also visible in Impala."""
-    with self.HiveDbWrapper(self, self.unique_string()) as db_name:
-      with self.HiveTableWrapper(self, db_name + '.' + self.unique_string(),
-                                 '(x int)') as table_name:
+    with HiveDbWrapper(self, self.unique_string()) as db_name:
+      with HiveTableWrapper(self, db_name + '.' + self.unique_string(),
+                            '(x int)') as table_name:
         hive_stats = self.hive_column_stats(table_name, 'x')
         self.client.execute('invalidate metadata')
         self.client.execute('refresh %s' % table_name)
@@ -577,7 +537,7 @@ class TestHmsIntegration(ImpalaTestSuite):
     """
 
     test_db = self.unique_string()
-    with self.HiveDbWrapper(self, test_db) as db_name:
+    with HiveDbWrapper(self, test_db) as db_name:
       pass
     self.assert_sql_error(
         self.client.execute,
@@ -596,9 +556,9 @@ class TestHmsIntegration(ImpalaTestSuite):
     """
     # TODO: check results of insert, then select * before and after
     # storage format change.
-    with self.HiveDbWrapper(self, self.unique_string()) as db_name:
-      with self.HiveTableWrapper(self, db_name + '.' + self.unique_string(),
-                                 '(x int, y int) stored as parquet') as table_name:
+    with HiveDbWrapper(self, self.unique_string()) as db_name:
+      with HiveTableWrapper(self, db_name + '.' + self.unique_string(),
+                            '(x int, y int) stored as parquet') as table_name:
         self.client.execute('invalidate metadata')
         self.client.execute('invalidate metadata %s' % table_name)
         print self.impala_table_stats(table_name)
@@ -612,9 +572,9 @@ class TestHmsIntegration(ImpalaTestSuite):
   def test_change_column_type(self, vector):
     """Hive column type changes propagate to Impala."""
 
-    with self.HiveDbWrapper(self, self.unique_string()) as db_name:
-      with self.HiveTableWrapper(self, db_name + '.' + self.unique_string(),
-                                 '(x int, y int)') as table_name:
+    with HiveDbWrapper(self, self.unique_string()) as db_name:
+      with HiveTableWrapper(self, db_name + '.' + self.unique_string(),
+                            '(x int, y int)') as table_name:
         self.run_stmt_in_hive(
             'insert into table %s values (33,44)' % table_name)
         self.run_stmt_in_hive('alter table %s change y y string' % table_name)
@@ -633,9 +593,9 @@ class TestHmsIntegration(ImpalaTestSuite):
     known issue with changing column types in Hive/parquet.
     """
 
-    with self.HiveDbWrapper(self, self.unique_string()) as db_name:
-      with self.HiveTableWrapper(self, db_name + '.' + self.unique_string(),
-                                 '(x int, y int) stored as parquet') as table_name:
+    with HiveDbWrapper(self, self.unique_string()) as db_name:
+      with HiveTableWrapper(self, db_name + '.' + self.unique_string(),
+                            '(x int, y int) stored as parquet') as table_name:
         self.run_stmt_in_hive(
             'insert into table %s values (33,44)' % table_name)
         assert '33,44' == self.run_stmt_in_hive(
@@ -661,9 +621,9 @@ class TestHmsIntegration(ImpalaTestSuite):
     metadata'.
     """
 
-    with self.HiveDbWrapper(self, self.unique_string()) as db_name:
-      with self.HiveTableWrapper(self, db_name + '.' + self.unique_string(),
-                                 '(x int, y int)') as table_name:
+    with HiveDbWrapper(self, self.unique_string()) as db_name:
+      with HiveTableWrapper(self, db_name + '.' + self.unique_string(),
+                            '(x int, y int)') as table_name:
         self.client.execute('invalidate metadata')
         int_column = {'type': 'int', 'comment': ''}
         expected_columns = {'x': int_column, 'y': int_column}

http://git-wip-us.apache.org/repos/asf/impala/blob/1c21cd0d/tests/util/hive_utils.py
----------------------------------------------------------------------
diff --git a/tests/util/hive_utils.py b/tests/util/hive_utils.py
new file mode 100644
index 0000000..094b816
--- /dev/null
+++ b/tests/util/hive_utils.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Utilities for interacting with Hive.
+
+
+class HiveDbWrapper(object):
+  """
+  A wrapper class for using `with` guards with databases created through Hive
+  ensuring deletion even if an exception occurs.
+  """
+
+  def __init__(self, hive, db_name):
+    self.hive = hive
+    self.db_name = db_name
+
+  def __enter__(self):
+    self.hive.run_stmt_in_hive(
+        'create database if not exists ' + self.db_name)
+    return self.db_name
+
+  def __exit__(self, typ, value, traceback):
+    self.hive.run_stmt_in_hive(
+        'drop database if exists %s cascade' % self.db_name)
+
+
+class HiveTableWrapper(object):
+  """
+  A wrapper class for using `with` guards with tables created through Hive
+  ensuring deletion even if an exception occurs.
+  """
+
+  def __init__(self, hive, table_name, table_spec):
+    self.hive = hive
+    self.table_name = table_name
+    self.table_spec = table_spec
+
+  def __enter__(self):
+    self.hive.run_stmt_in_hive(
+        'create table if not exists %s %s' %
+        (self.table_name, self.table_spec))
+    return self.table_name
+
+  def __exit__(self, typ, value, traceback):
+    self.hive.run_stmt_in_hive('drop table if exists %s' % self.table_name)