You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/01 15:03:18 UTC

[2/4] incubator-impala git commit: IMPALA-5009: Clean up test_insert_parquet.py

IMPALA-5009: Clean up test_insert_parquet.py

Replace make_tmp_dir with py.test's own tmpdir

Change-Id: Ia84c78d7ff74cc7fdb3d782060caa5e52d0cd7d2
Reviewed-on: http://gerrit.cloudera.org:8080/7518
Reviewed-by: David Knupp <dk...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b6d400c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b6d400c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b6d400c9

Branch: refs/heads/master
Commit: b6d400c9d80d5e69d827d2a05f2a5fbae6e492c5
Parents: 229f12c
Author: Lars Volker <lv...@cloudera.com>
Authored: Wed Jul 26 16:32:48 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Aug 1 00:41:09 2017 +0000

----------------------------------------------------------------------
 tests/query_test/test_insert_parquet.py | 147 +++++++++++++--------------
 1 file changed, 72 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b6d400c9/tests/query_test/test_insert_parquet.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_insert_parquet.py b/tests/query_test/test_insert_parquet.py
index 7cd23f7..dc73a2d 100644
--- a/tests/query_test/test_insert_parquet.py
+++ b/tests/query_test/test_insert_parquet.py
@@ -24,7 +24,6 @@ from datetime import datetime
 from decimal import Decimal
 from shutil import rmtree
 from subprocess import check_call
-from tempfile import mkdtemp as make_tmp_dir
 from parquet.ttypes import ColumnOrder, SortingColumn, TypeDefinedOrder
 
 from tests.common.environ import impalad_basedir
@@ -97,10 +96,10 @@ class TestInsertParquetQueries(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_dimension(
         ImpalaTestDimension("file_size", *PARQUET_FILE_SIZES))
 
-    cls.ImpalaTestMatrix.add_constraint(lambda v:
-                                        v.get_value('table_format').file_format == 'parquet')
-    cls.ImpalaTestMatrix.add_constraint(lambda v:
-                                        v.get_value('table_format').compression_codec == 'none')
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format == 'parquet')
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').compression_codec == 'none')
 
   @SkipIfLocal.multiple_impalad
   @UniqueDatabase.parametrize(sync_ddl=True)
@@ -127,10 +126,10 @@ class TestInsertParquetInvalidCodec(ImpalaTestSuite):
         sync_ddl=[1]))
     cls.ImpalaTestMatrix.add_dimension(
         ImpalaTestDimension("compression_codec", 'bzip2'))
-    cls.ImpalaTestMatrix.add_constraint(lambda v:
-                                        v.get_value('table_format').file_format == 'parquet')
-    cls.ImpalaTestMatrix.add_constraint(lambda v:
-                                        v.get_value('table_format').compression_codec == 'none')
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format == 'parquet')
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').compression_codec == 'none')
 
   @SkipIfLocal.multiple_impalad
   def test_insert_parquet_invalid_codec(self, vector):
@@ -153,10 +152,10 @@ class TestInsertParquetVerifySize(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
         cluster_sizes=[0], disable_codegen_options=[False], batch_sizes=[0],
         sync_ddl=[1]))
-    cls.ImpalaTestMatrix.add_constraint(lambda v:
-                                        v.get_value('table_format').file_format == 'parquet')
-    cls.ImpalaTestMatrix.add_constraint(lambda v:
-                                        v.get_value('table_format').compression_codec == 'none')
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format == 'parquet')
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').compression_codec == 'none')
     cls.ImpalaTestMatrix.add_dimension(
         ImpalaTestDimension("compression_codec", *PARQUET_CODECS))
 
@@ -204,7 +203,7 @@ class TestHdfsParquetTableWriter(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_constraint(
         lambda v: v.get_value('table_format').file_format == 'parquet')
 
-  def test_def_level_encoding(self, vector, unique_database):
+  def test_def_level_encoding(self, vector, unique_database, tmpdir):
     """IMPALA-3376: Tests that parquet files are written to HDFS correctly by generating a
     parquet table and running the parquet-reader tool on it, which performs sanity
     checking, such as that the correct number of definition levels were encoded.
@@ -214,21 +213,16 @@ class TestHdfsParquetTableWriter(ImpalaTestSuite):
     self.execute_query("create table %s stored as parquet as select l_linenumber from "
                        "tpch_parquet.lineitem limit 180000" % qualified_table_name)
 
-    tmp_dir = make_tmp_dir()
-    try:
-      hdfs_file = get_fs_path('/test-warehouse/%s.db/%s/*.parq'
-                              % (unique_database, table_name))
-      check_call(['hdfs', 'dfs', '-copyToLocal', hdfs_file, tmp_dir])
-
-      for root, subdirs, files in os.walk(tmp_dir):
-        for f in files:
-          if not f.endswith('parq'):
-            continue
-          check_call([os.path.join(impalad_basedir, 'util/parquet-reader'), '--file',
-                      os.path.join(tmp_dir, str(f))])
-    finally:
-      self.execute_query("drop table %s" % qualified_table_name)
-      rmtree(tmp_dir)
+    hdfs_file = get_fs_path('/test-warehouse/%s.db/%s/*.parq'
+                            % (unique_database, table_name))
+    check_call(['hdfs', 'dfs', '-copyToLocal', hdfs_file, tmpdir.strpath])
+
+    for root, subdirs, files in os.walk(tmpdir.strpath):
+      for f in files:
+        if not f.endswith('parq'):
+          continue
+        check_call([os.path.join(impalad_basedir, 'util/parquet-reader'), '--file',
+                    os.path.join(tmpdir.strpath, str(f))])
 
   def test_sorting_columns(self, vector, unique_database, tmpdir):
     """Tests that RowGroup::sorting_columns gets populated when the table has SORT BY
@@ -348,36 +342,35 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
 
     return file_stats
 
-  def _get_row_group_stats_from_hdfs_folder(self, hdfs_path):
-    """Returns a list of statistics for each row group in all parquet files in
-    'hdfs_path'. The result is a two-dimensional list, containing stats by row group and
-    column."""
+  def _get_row_group_stats_from_hdfs_folder(self, hdfs_path, tmp_dir):
+    """Returns a list of statistics for each row group in all parquet files in
+    'hdfs_path'. 'tmp_dir' needs to be supplied by the caller and will be used to store
+    temporary files. The caller is responsible for cleaning up 'tmp_dir'. The result is
+    a two-dimensional list, containing stats by row group and column."""
     row_group_stats = []
 
-    try:
-      tmp_dir = make_tmp_dir()
-      check_call(['hdfs', 'dfs', '-get', hdfs_path, tmp_dir])
+    check_call(['hdfs', 'dfs', '-get', hdfs_path, tmp_dir])
 
-      for root, subdirs, files in os.walk(tmp_dir):
-        for f in files:
-          parquet_file = os.path.join(root, str(f))
-          row_group_stats.extend(self._get_row_group_stats_from_file(parquet_file))
-
-    finally:
-      rmtree(tmp_dir)
+    for root, subdirs, files in os.walk(tmp_dir):
+      for f in files:
+        parquet_file = os.path.join(root, str(f))
+        row_group_stats.extend(self._get_row_group_stats_from_file(parquet_file))
 
     return row_group_stats
 
-  def _validate_parquet_stats(self, hdfs_path, expected_values, skip_col_idxs = None):
+  def _validate_parquet_stats(self, hdfs_path, tmp_dir, expected_values,
+                              skip_col_idxs = None):
     """Validates that 'hdfs_path' contains exactly one parquet file and that the rowgroup
     statistics in that file match the values in 'expected_values'. Columns indexed by
-    'skip_col_idx' are excluded from the verification of the expected values.
+    'skip_col_idxs' are excluded from the verification of the expected values. 'tmp_dir'
+    needs to be supplied by the caller and will be used to store temporary files. The
+    caller is responsible for cleaning up 'tmp_dir'.
     """
     skip_col_idxs = skip_col_idxs or []
     # The caller has to make sure that the table fits into a single row group. We enforce
     # it here to make sure the results are predictable and independent of how the data
     # could get written across multiple files.
-    row_group_stats = self._get_row_group_stats_from_hdfs_folder(hdfs_path)
+    row_group_stats = self._get_row_group_stats_from_hdfs_folder(hdfs_path, tmp_dir)
     assert(len(row_group_stats)) == 1
     table_stats = row_group_stats[0]
 
@@ -392,10 +385,12 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
         continue
       assert stats == expected
 
-  def _ctas_table_and_verify_stats(self, vector, unique_database, source_table,
+  def _ctas_table_and_verify_stats(self, vector, unique_database, tmp_dir, source_table,
                                    expected_values):
     """Copies 'source_table' into a parquet table and makes sure that the row group
-    statistics in the resulting parquet file match those in 'expected_values'.
+    statistics in the resulting parquet file match those in 'expected_values'. 'tmp_dir'
+    needs to be supplied by the caller and will be used to store temporary files. The
+    caller is responsible for cleaning up 'tmp_dir'.
     """
     table_name = "test_hdfs_parquet_table_writer"
     qualified_table_name = "{0}.{1}".format(unique_database, table_name)
@@ -409,9 +404,9 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
         qualified_table_name, source_table)
     vector.get_value('exec_option')['num_nodes'] = 1
     self.execute_query(query, vector.get_value('exec_option'))
-    self._validate_parquet_stats(hdfs_path, expected_values)
+    self._validate_parquet_stats(hdfs_path, tmp_dir, expected_values)
 
-  def test_write_statistics_alltypes(self, vector, unique_database):
+  def test_write_statistics_alltypes(self, vector, unique_database, tmpdir):
     """Test that writing a parquet file populates the rowgroup statistics with the correct
     values.
     """
@@ -433,10 +428,10 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
         ColumnStats('month', 1, 12, 0),
     ]
 
-    self._ctas_table_and_verify_stats(vector, unique_database, "functional.alltypes",
-                                      expected_min_max_values)
+    self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath,
+                                      "functional.alltypes", expected_min_max_values)
 
-  def test_write_statistics_decimal(self, vector, unique_database):
+  def test_write_statistics_decimal(self, vector, unique_database, tmpdir):
     """Test that writing a parquet file populates the rowgroup statistics with the correct
     values for decimal columns.
     """
@@ -450,10 +445,10 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
       ColumnStats('d6', 1, 1, 0)
     ]
 
-    self._ctas_table_and_verify_stats(vector, unique_database, "functional.decimal_tbl",
-      expected_min_max_values)
+    self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath,
+                                      "functional.decimal_tbl", expected_min_max_values)
 
-  def test_write_statistics_multi_page(self, vector, unique_database):
+  def test_write_statistics_multi_page(self, vector, unique_database, tmpdir):
     """Test that writing a parquet file populates the rowgroup statistics with the correct
     values. This test write a single parquet file with several pages per column.
     """
@@ -471,10 +466,10 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
                     'zzle. blithely regular instructions cajol', 0),
     ]
 
-    self._ctas_table_and_verify_stats(vector, unique_database, "tpch_parquet.customer",
-                                      expected_min_max_values)
+    self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath,
+                                      "tpch_parquet.customer", expected_min_max_values)
 
-  def test_write_statistics_null(self, vector, unique_database):
+  def test_write_statistics_null(self, vector, unique_database, tmpdir):
     """Test that we don't write min/max statistics for null columns. Ensure null_count
     is set for columns with null values."""
     expected_min_max_values = [
@@ -487,10 +482,10 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
         ColumnStats('g', '\x00', '\x00', 0)
     ]
 
-    self._ctas_table_and_verify_stats(vector, unique_database, "functional.nulltable",
-                                      expected_min_max_values)
+    self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath,
+                                      "functional.nulltable", expected_min_max_values)
 
-  def test_write_statistics_char_types(self, vector, unique_database):
+  def test_write_statistics_char_types(self, vector, unique_database, tmpdir):
     """Test that Impala correctly writes statistics for char columns."""
     table_name = "test_char_types"
     qualified_table_name = "{0}.{1}".format(unique_database, table_name)
@@ -502,7 +497,8 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
     insert_stmt = """insert into {0} values
         (cast("def" as char(3)), "ghj xyz", "abc xyz"),
         (cast("abc" as char(3)), "def 123 xyz", "lorem ipsum"),
-        (cast("xy" as char(3)), "abc banana", "dolor dis amet")""".format(qualified_table_name)
+        (cast("xy" as char(3)), "abc banana", "dolor dis amet")
+        """.format(qualified_table_name)
     self.execute_query(insert_stmt)
     expected_min_max_values = [
         ColumnStats('c3', 'abc', 'xy', 0),
@@ -510,10 +506,10 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
         ColumnStats('st', 'abc xyz', 'lorem ipsum', 0)
     ]
 
-    self._ctas_table_and_verify_stats(vector, unique_database, qualified_table_name,
-                                      expected_min_max_values)
+    self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath,
+                                      qualified_table_name, expected_min_max_values)
 
-  def test_write_statistics_negative(self, vector, unique_database):
+  def test_write_statistics_negative(self, vector, unique_database, tmpdir):
     """Test that Impala correctly writes statistics for negative values."""
     view_name = "test_negative_view"
     qualified_view_name = "{0}.{1}".format(unique_database, view_name)
@@ -537,10 +533,10 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
         ColumnStats('double_col', RoundFloat(-90.9, 1), RoundFloat(80.8, 1), 0),
     ]
 
-    self._ctas_table_and_verify_stats(vector, unique_database, qualified_view_name,
-                                      expected_min_max_values)
+    self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath,
+                                      qualified_view_name, expected_min_max_values)
 
-  def test_write_statistics_multiple_row_groups(self, vector, unique_database):
+  def test_write_statistics_multiple_row_groups(self, vector, unique_database, tmpdir):
     """Test that writing multiple row groups works as expected. This is done by inserting
     into a table using the SORT BY clause and then making sure that the min and max values
     of row groups don't overlap."""
@@ -563,7 +559,8 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
     self.execute_query(query, vector.get_value('exec_option'))
 
     # Get all stats for the o_orderkey column
-    row_group_stats = self._get_row_group_stats_from_hdfs_folder(hdfs_path)
+    row_group_stats = self._get_row_group_stats_from_hdfs_folder(hdfs_path,
+                                                                 tmpdir.strpath)
     assert len(row_group_stats) > 1
     orderkey_stats = [s[0] for s in row_group_stats]
 
@@ -573,7 +570,7 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
     for l, r in zip(orderkey_stats, orderkey_stats[1:]):
       assert l.max <= r.min
 
-  def test_write_statistics_float_infinity(self, vector, unique_database):
+  def test_write_statistics_float_infinity(self, vector, unique_database, tmpdir):
     """Test that statistics for -inf and inf are written correctly."""
     table_name = "test_float_infinity"
     qualified_table_name = "{0}.{1}".format(unique_database, table_name)
@@ -592,10 +589,10 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
         ColumnStats('d', float('-inf'), float('inf'), 0),
     ]
 
-    self._ctas_table_and_verify_stats(vector, unique_database, qualified_table_name,
-                                      expected_min_max_values)
+    self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath,
+                                      qualified_table_name, expected_min_max_values)
 
-  def test_write_null_count_statistics(self, vector, unique_database):
+  def test_write_null_count_statistics(self, vector, unique_database, tmpdir):
     """Test that writing a parquet file populates the rowgroup statistics with the correct
     null_count. This test ensures that the null_count is correct for a table with multiple
     null values."""
@@ -609,5 +606,5 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
       ColumnStats('income', 0, 189570, 29),
     ]
 
-    self._ctas_table_and_verify_stats(vector, unique_database,
+    self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath,
       "functional_parquet.zipcode_incomes", expected_min_max_values)