You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/01 15:03:18 UTC
[2/4] incubator-impala git commit: IMPALA-5009: Clean up
test_insert_parquet.py
IMPALA-5009: Clean up test_insert_parquet.py
Replace make_tmp_dir with py.test's own tmpdir
Change-Id: Ia84c78d7ff74cc7fdb3d782060caa5e52d0cd7d2
Reviewed-on: http://gerrit.cloudera.org:8080/7518
Reviewed-by: David Knupp <dk...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b6d400c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b6d400c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b6d400c9
Branch: refs/heads/master
Commit: b6d400c9d80d5e69d827d2a05f2a5fbae6e492c5
Parents: 229f12c
Author: Lars Volker <lv...@cloudera.com>
Authored: Wed Jul 26 16:32:48 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Aug 1 00:41:09 2017 +0000
----------------------------------------------------------------------
tests/query_test/test_insert_parquet.py | 147 +++++++++++++--------------
1 file changed, 72 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b6d400c9/tests/query_test/test_insert_parquet.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_insert_parquet.py b/tests/query_test/test_insert_parquet.py
index 7cd23f7..dc73a2d 100644
--- a/tests/query_test/test_insert_parquet.py
+++ b/tests/query_test/test_insert_parquet.py
@@ -24,7 +24,6 @@ from datetime import datetime
from decimal import Decimal
from shutil import rmtree
from subprocess import check_call
-from tempfile import mkdtemp as make_tmp_dir
from parquet.ttypes import ColumnOrder, SortingColumn, TypeDefinedOrder
from tests.common.environ import impalad_basedir
@@ -97,10 +96,10 @@ class TestInsertParquetQueries(ImpalaTestSuite):
cls.ImpalaTestMatrix.add_dimension(
ImpalaTestDimension("file_size", *PARQUET_FILE_SIZES))
- cls.ImpalaTestMatrix.add_constraint(lambda v:
- v.get_value('table_format').file_format == 'parquet')
- cls.ImpalaTestMatrix.add_constraint(lambda v:
- v.get_value('table_format').compression_codec == 'none')
+ cls.ImpalaTestMatrix.add_constraint(
+ lambda v: v.get_value('table_format').file_format == 'parquet')
+ cls.ImpalaTestMatrix.add_constraint(
+ lambda v: v.get_value('table_format').compression_codec == 'none')
@SkipIfLocal.multiple_impalad
@UniqueDatabase.parametrize(sync_ddl=True)
@@ -127,10 +126,10 @@ class TestInsertParquetInvalidCodec(ImpalaTestSuite):
sync_ddl=[1]))
cls.ImpalaTestMatrix.add_dimension(
ImpalaTestDimension("compression_codec", 'bzip2'))
- cls.ImpalaTestMatrix.add_constraint(lambda v:
- v.get_value('table_format').file_format == 'parquet')
- cls.ImpalaTestMatrix.add_constraint(lambda v:
- v.get_value('table_format').compression_codec == 'none')
+ cls.ImpalaTestMatrix.add_constraint(
+ lambda v: v.get_value('table_format').file_format == 'parquet')
+ cls.ImpalaTestMatrix.add_constraint(
+ lambda v: v.get_value('table_format').compression_codec == 'none')
@SkipIfLocal.multiple_impalad
def test_insert_parquet_invalid_codec(self, vector):
@@ -153,10 +152,10 @@ class TestInsertParquetVerifySize(ImpalaTestSuite):
cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
cluster_sizes=[0], disable_codegen_options=[False], batch_sizes=[0],
sync_ddl=[1]))
- cls.ImpalaTestMatrix.add_constraint(lambda v:
- v.get_value('table_format').file_format == 'parquet')
- cls.ImpalaTestMatrix.add_constraint(lambda v:
- v.get_value('table_format').compression_codec == 'none')
+ cls.ImpalaTestMatrix.add_constraint(
+ lambda v: v.get_value('table_format').file_format == 'parquet')
+ cls.ImpalaTestMatrix.add_constraint(
+ lambda v: v.get_value('table_format').compression_codec == 'none')
cls.ImpalaTestMatrix.add_dimension(
ImpalaTestDimension("compression_codec", *PARQUET_CODECS))
@@ -204,7 +203,7 @@ class TestHdfsParquetTableWriter(ImpalaTestSuite):
cls.ImpalaTestMatrix.add_constraint(
lambda v: v.get_value('table_format').file_format == 'parquet')
- def test_def_level_encoding(self, vector, unique_database):
+ def test_def_level_encoding(self, vector, unique_database, tmpdir):
"""IMPALA-3376: Tests that parquet files are written to HDFS correctly by generating a
parquet table and running the parquet-reader tool on it, which performs sanity
checking, such as that the correct number of definition levels were encoded.
@@ -214,21 +213,16 @@ class TestHdfsParquetTableWriter(ImpalaTestSuite):
self.execute_query("create table %s stored as parquet as select l_linenumber from "
"tpch_parquet.lineitem limit 180000" % qualified_table_name)
- tmp_dir = make_tmp_dir()
- try:
- hdfs_file = get_fs_path('/test-warehouse/%s.db/%s/*.parq'
- % (unique_database, table_name))
- check_call(['hdfs', 'dfs', '-copyToLocal', hdfs_file, tmp_dir])
-
- for root, subdirs, files in os.walk(tmp_dir):
- for f in files:
- if not f.endswith('parq'):
- continue
- check_call([os.path.join(impalad_basedir, 'util/parquet-reader'), '--file',
- os.path.join(tmp_dir, str(f))])
- finally:
- self.execute_query("drop table %s" % qualified_table_name)
- rmtree(tmp_dir)
+ hdfs_file = get_fs_path('/test-warehouse/%s.db/%s/*.parq'
+ % (unique_database, table_name))
+ check_call(['hdfs', 'dfs', '-copyToLocal', hdfs_file, tmpdir.strpath])
+
+ for root, subdirs, files in os.walk(tmpdir.strpath):
+ for f in files:
+ if not f.endswith('parq'):
+ continue
+ check_call([os.path.join(impalad_basedir, 'util/parquet-reader'), '--file',
+ os.path.join(tmpdir.strpath, str(f))])
def test_sorting_columns(self, vector, unique_database, tmpdir):
"""Tests that RowGroup::sorting_columns gets populated when the table has SORT BY
@@ -348,36 +342,35 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
return file_stats
- def _get_row_group_stats_from_hdfs_folder(self, hdfs_path):
- """Returns a list of statistics for each row group in all parquet files in
- 'hdfs_path'. The result is a two-dimensional list, containing stats by row group and
- column."""
+ def _get_row_group_stats_from_hdfs_folder(self, hdfs_path, tmp_dir):
+ """Returns a list of statistics for each row group in all parquet files in 'hdfs_path'.
+ 'tmp_dir' needs to be supplied by the caller and will be used to store temporary
+ files. The caller is responsible for cleaning up 'tmp_dir'. The result is a
+ two-dimensional list, containing stats by row group and column."""
row_group_stats = []
- try:
- tmp_dir = make_tmp_dir()
- check_call(['hdfs', 'dfs', '-get', hdfs_path, tmp_dir])
+ check_call(['hdfs', 'dfs', '-get', hdfs_path, tmp_dir])
- for root, subdirs, files in os.walk(tmp_dir):
- for f in files:
- parquet_file = os.path.join(root, str(f))
- row_group_stats.extend(self._get_row_group_stats_from_file(parquet_file))
-
- finally:
- rmtree(tmp_dir)
+ for root, subdirs, files in os.walk(tmp_dir):
+ for f in files:
+ parquet_file = os.path.join(root, str(f))
+ row_group_stats.extend(self._get_row_group_stats_from_file(parquet_file))
return row_group_stats
- def _validate_parquet_stats(self, hdfs_path, expected_values, skip_col_idxs = None):
+ def _validate_parquet_stats(self, hdfs_path, tmp_dir, expected_values,
+ skip_col_idxs = None):
"""Validates that 'hdfs_path' contains exactly one parquet file and that the rowgroup
statistics in that file match the values in 'expected_values'. Columns indexed by
- 'skip_col_idx' are excluded from the verification of the expected values.
+ 'skip_col_idxs' are excluded from the verification of the expected values. 'tmp_dir'
+ needs to be supplied by the caller and will be used to store temporary files. The
+ caller is responsible for cleaning up 'tmp_dir'.
"""
skip_col_idxs = skip_col_idxs or []
# The caller has to make sure that the table fits into a single row group. We enforce
# it here to make sure the results are predictable and independent of how the data
# could get written across multiple files.
- row_group_stats = self._get_row_group_stats_from_hdfs_folder(hdfs_path)
+ row_group_stats = self._get_row_group_stats_from_hdfs_folder(hdfs_path, tmp_dir)
assert(len(row_group_stats)) == 1
table_stats = row_group_stats[0]
@@ -392,10 +385,12 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
continue
assert stats == expected
- def _ctas_table_and_verify_stats(self, vector, unique_database, source_table,
+ def _ctas_table_and_verify_stats(self, vector, unique_database, tmp_dir, source_table,
expected_values):
"""Copies 'source_table' into a parquet table and makes sure that the row group
- statistics in the resulting parquet file match those in 'expected_values'.
+ statistics in the resulting parquet file match those in 'expected_values'. 'tmp_dir'
+ needs to be supplied by the caller and will be used to store temporary files. The
+ caller is responsible for cleaning up 'tmp_dir'.
"""
table_name = "test_hdfs_parquet_table_writer"
qualified_table_name = "{0}.{1}".format(unique_database, table_name)
@@ -409,9 +404,9 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
qualified_table_name, source_table)
vector.get_value('exec_option')['num_nodes'] = 1
self.execute_query(query, vector.get_value('exec_option'))
- self._validate_parquet_stats(hdfs_path, expected_values)
+ self._validate_parquet_stats(hdfs_path, tmp_dir, expected_values)
- def test_write_statistics_alltypes(self, vector, unique_database):
+ def test_write_statistics_alltypes(self, vector, unique_database, tmpdir):
"""Test that writing a parquet file populates the rowgroup statistics with the correct
values.
"""
@@ -433,10 +428,10 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
ColumnStats('month', 1, 12, 0),
]
- self._ctas_table_and_verify_stats(vector, unique_database, "functional.alltypes",
- expected_min_max_values)
+ self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath,
+ "functional.alltypes", expected_min_max_values)
- def test_write_statistics_decimal(self, vector, unique_database):
+ def test_write_statistics_decimal(self, vector, unique_database, tmpdir):
"""Test that writing a parquet file populates the rowgroup statistics with the correct
values for decimal columns.
"""
@@ -450,10 +445,10 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
ColumnStats('d6', 1, 1, 0)
]
- self._ctas_table_and_verify_stats(vector, unique_database, "functional.decimal_tbl",
- expected_min_max_values)
+ self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath,
+ "functional.decimal_tbl", expected_min_max_values)
- def test_write_statistics_multi_page(self, vector, unique_database):
+ def test_write_statistics_multi_page(self, vector, unique_database, tmpdir):
"""Test that writing a parquet file populates the rowgroup statistics with the correct
values. This test write a single parquet file with several pages per column.
"""
@@ -471,10 +466,10 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
'zzle. blithely regular instructions cajol', 0),
]
- self._ctas_table_and_verify_stats(vector, unique_database, "tpch_parquet.customer",
- expected_min_max_values)
+ self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath,
+ "tpch_parquet.customer", expected_min_max_values)
- def test_write_statistics_null(self, vector, unique_database):
+ def test_write_statistics_null(self, vector, unique_database, tmpdir):
"""Test that we don't write min/max statistics for null columns. Ensure null_count
is set for columns with null values."""
expected_min_max_values = [
@@ -487,10 +482,10 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
ColumnStats('g', '\x00', '\x00', 0)
]
- self._ctas_table_and_verify_stats(vector, unique_database, "functional.nulltable",
- expected_min_max_values)
+ self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath,
+ "functional.nulltable", expected_min_max_values)
- def test_write_statistics_char_types(self, vector, unique_database):
+ def test_write_statistics_char_types(self, vector, unique_database, tmpdir):
"""Test that Impala correctly writes statistics for char columns."""
table_name = "test_char_types"
qualified_table_name = "{0}.{1}".format(unique_database, table_name)
@@ -502,7 +497,8 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
insert_stmt = """insert into {0} values
(cast("def" as char(3)), "ghj xyz", "abc xyz"),
(cast("abc" as char(3)), "def 123 xyz", "lorem ipsum"),
- (cast("xy" as char(3)), "abc banana", "dolor dis amet")""".format(qualified_table_name)
+ (cast("xy" as char(3)), "abc banana", "dolor dis amet")
+ """.format(qualified_table_name)
self.execute_query(insert_stmt)
expected_min_max_values = [
ColumnStats('c3', 'abc', 'xy', 0),
@@ -510,10 +506,10 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
ColumnStats('st', 'abc xyz', 'lorem ipsum', 0)
]
- self._ctas_table_and_verify_stats(vector, unique_database, qualified_table_name,
- expected_min_max_values)
+ self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath,
+ qualified_table_name, expected_min_max_values)
- def test_write_statistics_negative(self, vector, unique_database):
+ def test_write_statistics_negative(self, vector, unique_database, tmpdir):
"""Test that Impala correctly writes statistics for negative values."""
view_name = "test_negative_view"
qualified_view_name = "{0}.{1}".format(unique_database, view_name)
@@ -537,10 +533,10 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
ColumnStats('double_col', RoundFloat(-90.9, 1), RoundFloat(80.8, 1), 0),
]
- self._ctas_table_and_verify_stats(vector, unique_database, qualified_view_name,
- expected_min_max_values)
+ self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath,
+ qualified_view_name, expected_min_max_values)
- def test_write_statistics_multiple_row_groups(self, vector, unique_database):
+ def test_write_statistics_multiple_row_groups(self, vector, unique_database, tmpdir):
"""Test that writing multiple row groups works as expected. This is done by inserting
into a table using the SORT BY clause and then making sure that the min and max values
of row groups don't overlap."""
@@ -563,7 +559,8 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
self.execute_query(query, vector.get_value('exec_option'))
# Get all stats for the o_orderkey column
- row_group_stats = self._get_row_group_stats_from_hdfs_folder(hdfs_path)
+ row_group_stats = self._get_row_group_stats_from_hdfs_folder(hdfs_path,
+ tmpdir.strpath)
assert len(row_group_stats) > 1
orderkey_stats = [s[0] for s in row_group_stats]
@@ -573,7 +570,7 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
for l, r in zip(orderkey_stats, orderkey_stats[1:]):
assert l.max <= r.min
- def test_write_statistics_float_infinity(self, vector, unique_database):
+ def test_write_statistics_float_infinity(self, vector, unique_database, tmpdir):
"""Test that statistics for -inf and inf are written correctly."""
table_name = "test_float_infinity"
qualified_table_name = "{0}.{1}".format(unique_database, table_name)
@@ -592,10 +589,10 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
ColumnStats('d', float('-inf'), float('inf'), 0),
]
- self._ctas_table_and_verify_stats(vector, unique_database, qualified_table_name,
- expected_min_max_values)
+ self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath,
+ qualified_table_name, expected_min_max_values)
- def test_write_null_count_statistics(self, vector, unique_database):
+ def test_write_null_count_statistics(self, vector, unique_database, tmpdir):
"""Test that writing a parquet file populates the rowgroup statistics with the correct
null_count. This test ensures that the null_count is correct for a table with multiple
null values."""
@@ -609,5 +606,5 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
ColumnStats('income', 0, 189570, 29),
]
- self._ctas_table_and_verify_stats(vector, unique_database,
+ self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath,
"functional_parquet.zipcode_incomes", expected_min_max_values)