You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by lv...@apache.org on 2017/05/09 15:56:06 UTC
[12/13] incubator-impala git commit: IMPALA-4815, IMPALA-4817,
IMPALA-4819: Write and Read Parquet Statistics for remaining types
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/92703468/tests/query_test/test_insert_parquet.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_insert_parquet.py b/tests/query_test/test_insert_parquet.py
index 1472bb1..4477786 100644
--- a/tests/query_test/test_insert_parquet.py
+++ b/tests/query_test/test_insert_parquet.py
@@ -20,10 +20,12 @@
import os
from collections import namedtuple
+from datetime import datetime
+from decimal import Decimal
from shutil import rmtree
from subprocess import check_call
from tempfile import mkdtemp as make_tmp_dir
-from parquet.ttypes import SortingColumn
+from parquet.ttypes import ColumnOrder, SortingColumn, TypeDefinedOrder
from tests.common.environ import impalad_basedir
from tests.common.impala_test_suite import ImpalaTestSuite
@@ -50,6 +52,21 @@ class RoundFloat():
"""Compares this objects's value to a numeral after rounding it."""
return round(self.value, self.num_digits) == round(numeral, self.num_digits)
+
+class TimeStamp():
+ """Class to construct timestamps with a default format specifier."""
+ def __init__(self, value):
+ # This member must be called 'timetuple'. Only if this class has a member called
+ # 'timetuple' will the datetime __eq__ function forward an unknown equality check to
+ # this method by returning NotImplemented:
+ # https://docs.python.org/2/library/datetime.html#datetime.datetime
+ self.timetuple = datetime.strptime(value, '%Y-%m-%d %H:%M:%S.%f')
+
+ def __eq__(self, other_timetuple):
+ """Compares this object's value to another timetuple."""
+ return self.timetuple == other_timetuple
+
+
ColumnStats = namedtuple('ColumnStats', ['name', 'min', 'max'])
# Test a smaller parquet file size as well
@@ -249,6 +266,34 @@ class TestHdfsParquetTableWriter(ImpalaTestSuite):
for row_group in row_groups:
assert row_group.sorting_columns == expected
+ def test_set_column_orders(self, vector, unique_database, tmpdir):
+ """Tests that the Parquet writers set FileMetaData::column_orders."""
+ source_table = "functional_parquet.alltypessmall"
+ target_table = "test_set_column_orders"
+ qualified_target_table = "{0}.{1}".format(unique_database, target_table)
+ hdfs_path = get_fs_path("/test-warehouse/{0}.db/{1}/".format(unique_database,
+ target_table))
+
+ # Create table
+ query = "create table {0} like {1} stored as parquet".format(qualified_target_table,
+ source_table)
+ self.execute_query(query)
+
+ # Insert data
+ query = ("insert into {0} partition(year, month) select * from {1}").format(
+ qualified_target_table, source_table)
+ self.execute_query(query)
+
+ # Download hdfs files and verify column orders
+ check_call(['hdfs', 'dfs', '-get', hdfs_path, tmpdir.strpath])
+
+ expected_col_orders = [ColumnOrder(TYPE_ORDER=TypeDefinedOrder())] * 11
+
+ for root, subdirs, files in os.walk(tmpdir.strpath):
+ for f in files:
+ parquet_file = os.path.join(root, str(f))
+ file_meta_data = get_parquet_metadata(parquet_file)
+ assert file_meta_data.column_orders == expected_col_orders
@SkipIfIsilon.hive
@SkipIfLocal.hive
@@ -275,13 +320,13 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
decoded.append(None)
continue
- if stats.min is None and stats.max is None:
+ if stats.min_value is None and stats.max_value is None:
decoded.append(None)
continue
- assert stats.min is not None and stats.max is not None
- min_value = decode_stats_value(schema, stats.min)
- max_value = decode_stats_value(schema, stats.max)
+ assert stats.min_value is not None and stats.max_value is not None
+ min_value = decode_stats_value(schema, stats.min_value)
+ max_value = decode_stats_value(schema, stats.max_value)
decoded.append(ColumnStats(schema.name, min_value, max_value))
assert len(decoded) == len(schemas)
@@ -347,32 +392,21 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
assert stats == expected
def _ctas_table_and_verify_stats(self, vector, unique_database, source_table,
- expected_values, hive_skip_col_idx = None):
+ expected_values):
"""Copies 'source_table' into a parquet table and makes sure that the row group
- statistics in the resulting parquet file match those in 'expected_values'. The
- comparison is performed against both Hive and Impala. For Hive, columns indexed by
- 'hive_skip_col_idx' are excluded from the verification of the expected values.
+ statistics in the resulting parquet file match those in 'expected_values'.
"""
table_name = "test_hdfs_parquet_table_writer"
qualified_table_name = "{0}.{1}".format(unique_database, table_name)
hdfs_path = get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
table_name))
- # Validate against Hive.
+ # Setting num_nodes = 1 ensures that the query is executed on the coordinator,
+ # resulting in a single parquet file being written.
self.execute_query("drop table if exists {0}".format(qualified_table_name))
- self.run_stmt_in_hive("create table {0} stored as parquet as select * from "
- "{1}".format(qualified_table_name, source_table))
- self.execute_query("invalidate metadata {0}".format(qualified_table_name))
- self._validate_min_max_stats(hdfs_path, expected_values, hive_skip_col_idx)
-
- # Validate against Impala. Setting exec_single_node_rows_threshold and adding a limit
- # clause ensures that the query is executed on the coordinator, resulting in a single
- # parquet file being written.
- num_rows = self.execute_scalar("select count(*) from {0}".format(source_table))
- self.execute_query("drop table {0}".format(qualified_table_name))
- query = ("create table {0} stored as parquet as select * from {1} limit "
- "{2}").format(qualified_table_name, source_table, num_rows)
- vector.get_value('exec_option')['EXEC_SINGLE_NODE_ROWS_THRESHOLD'] = num_rows
+ query = ("create table {0} stored as parquet as select * from {1}").format(
+ qualified_table_name, source_table)
+ vector.get_value('exec_option')['num_nodes'] = 1
self.execute_query(query, vector.get_value('exec_option'))
self._validate_min_max_stats(hdfs_path, expected_values)
@@ -390,29 +424,33 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
ColumnStats('bigint_col', 0, 90),
ColumnStats('float_col', 0, RoundFloat(9.9, 1)),
ColumnStats('double_col', 0, RoundFloat(90.9, 1)),
- None,
- None,
- None,
+ ColumnStats('date_string_col', '01/01/09', '12/31/10'),
+ ColumnStats('string_col', '0', '9'),
+ ColumnStats('timestamp_col', TimeStamp('2009-01-01 00:00:00.0'),
+ TimeStamp('2010-12-31 05:09:13.860000')),
ColumnStats('year', 2009, 2010),
ColumnStats('month', 1, 12),
]
- # Skip comparison of unsupported columns types with Hive.
- hive_skip_col_idx = [8, 9, 10]
-
self._ctas_table_and_verify_stats(vector, unique_database, "functional.alltypes",
- expected_min_max_values, hive_skip_col_idx)
+ expected_min_max_values)
def test_write_statistics_decimal(self, vector, unique_database):
- """Test that Impala does not write statistics for decimal columns."""
+ """Test that writing a parquet file populates the rowgroup statistics with the correct
+ values for decimal columns.
+ """
# Expected values for functional.decimal_tbl
- expected_min_max_values = [None, None, None, None, None, None]
-
- # Skip comparison of unsupported columns types with Hive.
- hive_skip_col_idx = range(len(expected_min_max_values))
+ expected_min_max_values = [
+ ColumnStats('d1', 1234, 132842),
+ ColumnStats('d2', 111, 2222),
+ ColumnStats('d3', Decimal('1.23456789'), Decimal('12345.6789')),
+ ColumnStats('d4', Decimal('0.123456789'), Decimal('0.123456789')),
+ ColumnStats('d5', Decimal('0.1'), Decimal('12345.789')),
+ ColumnStats('d6', 1, 1)
+ ]
self._ctas_table_and_verify_stats(vector, unique_database, "functional.decimal_tbl",
- expected_min_max_values, hive_skip_col_idx)
+ expected_min_max_values)
def test_write_statistics_multi_page(self, vector, unique_database):
"""Test that writing a parquet file populates the rowgroup statistics with the correct
@@ -421,40 +459,57 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
# Expected values for tpch_parquet.customer
expected_min_max_values = [
ColumnStats('c_custkey', 1, 150000),
- None,
- None,
+ ColumnStats('c_name', 'Customer#000000001', 'Customer#000150000'),
+ ColumnStats('c_address', ' 2uZwVhQvwA', 'zzxGktzXTMKS1BxZlgQ9nqQ'),
ColumnStats('c_nationkey', 0, 24),
- None,
- None,
- None,
- None,
+ ColumnStats('c_phone', '10-100-106-1617', '34-999-618-6881'),
+ ColumnStats('c_acctbal', Decimal('-999.99'), Decimal('9999.99')),
+ ColumnStats('c_mktsegment', 'AUTOMOBILE', 'MACHINERY'),
+ ColumnStats('c_comment', ' Tiresias according to the slyly blithe instructions '
+ 'detect quickly at the slyly express courts. express dinos wake ',
+ 'zzle. blithely regular instructions cajol'),
]
- # Skip comparison of unsupported columns types with Hive.
- hive_skip_col_idx = [1, 2, 4, 5, 6, 7]
-
self._ctas_table_and_verify_stats(vector, unique_database, "tpch_parquet.customer",
- expected_min_max_values, hive_skip_col_idx)
+ expected_min_max_values)
def test_write_statistics_null(self, vector, unique_database):
"""Test that we don't write min/max statistics for null columns."""
- expected_min_max_values = [None, None, None, None, None, None, None]
-
- # Skip comparison of unsupported columns types with Hive.
- hive_skip_col_idx = range(len(expected_min_max_values))
+ expected_min_max_values = [
+ ColumnStats('a', 'a', 'a'),
+ ColumnStats('b', '', ''),
+ None,
+ None,
+ None,
+ ColumnStats('f', 'a\x00b', 'a\x00b'),
+ ColumnStats('g', '\x00', '\x00')
+ ]
self._ctas_table_and_verify_stats(vector, unique_database, "functional.nulltable",
- expected_min_max_values, hive_skip_col_idx)
+ expected_min_max_values)
def test_write_statistics_char_types(self, vector, unique_database):
- """Test that Impala does not write statistics for char columns."""
- expected_min_max_values = [None, None, None]
+ """Test that Impala correctly writes statistics for char columns."""
+ table_name = "test_char_types"
+ qualified_table_name = "{0}.{1}".format(unique_database, table_name)
- # Skip comparison of unsupported columns types with Hive.
- hive_skip_col_idx = range(len(expected_min_max_values))
+ create_table_stmt = "create table {0} (c3 char(3), vc varchar, st string);".format(
+ qualified_table_name)
+ self.execute_query(create_table_stmt)
- self._ctas_table_and_verify_stats(vector, unique_database, "functional.chars_formats",
- expected_min_max_values, hive_skip_col_idx)
+ insert_stmt = """insert into {0} values
+ (cast("def" as char(3)), "ghj xyz", "abc xyz"),
+ (cast("abc" as char(3)), "def 123 xyz", "lorem ipsum"),
+ (cast("xy" as char(3)), "abc banana", "dolor dis amet")""".format(qualified_table_name)
+ self.execute_query(insert_stmt)
+ expected_min_max_values = [
+ ColumnStats('c3', 'abc', 'xy'),
+ ColumnStats('vc', 'abc banana', 'ghj xyz'),
+ ColumnStats('st', 'abc xyz', 'lorem ipsum')
+ ]
+
+ self._ctas_table_and_verify_stats(vector, unique_database, qualified_table_name,
+ expected_min_max_values)
def test_write_statistics_negative(self, vector, unique_database):
"""Test that Impala correctly writes statistics for negative values."""
@@ -496,14 +551,13 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
# Insert a large amount of data on a single backend with a limited parquet file size.
# This will result in several files being written, exercising code that tracks
# statistics for row groups.
- num_rows = self.execute_scalar("select count(*) from {0}".format(source_table))
query = "create table {0} like {1} stored as parquet".format(qualified_target_table,
source_table)
self.execute_query(query, vector.get_value('exec_option'))
- query = ("insert into {0} /* +sortby(o_orderkey) */ select * from {1} limit"
- "{2}").format(qualified_target_table, source_table, num_rows)
- vector.get_value('exec_option')['EXEC_SINGLE_NODE_ROWS_THRESHOLD'] = num_rows
- vector.get_value('exec_option')['PARQUET_FILE_SIZE'] = 8 * 1024 * 1024
+ query = ("insert into {0} /* +sortby(o_orderkey) */ select * from {1}").format(
+ qualified_target_table, source_table)
+ vector.get_value('exec_option')['num_nodes'] = 1
+ vector.get_value('exec_option')['parquet_file_size'] = 8 * 1024 * 1024
self.execute_query(query, vector.get_value('exec_option'))
# Get all stats for the o_orderkey column
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/92703468/tests/query_test/test_parquet_stats.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_parquet_stats.py b/tests/query_test/test_parquet_stats.py
index 9b9d6d7..93fc06d 100644
--- a/tests/query_test/test_parquet_stats.py
+++ b/tests/query_test/test_parquet_stats.py
@@ -15,10 +15,14 @@
# specific language governing permissions and limitations
# under the License.
+import os
import pytest
+import shlex
+from subprocess import check_call
from tests.common.test_vector import ImpalaTestDimension
from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.util.filesystem_utils import get_fs_path
MT_DOP_VALUES = [0, 1, 2, 8]
@@ -43,3 +47,26 @@ class TestParquetStats(ImpalaTestSuite):
# skipped inside a fragment, so we ensure that the tests run in a single fragment.
vector.get_value('exec_option')['num_nodes'] = 1
self.run_test_case('QueryTest/parquet_stats', vector, use_db=unique_database)
+
+ def test_deprecated_stats(self, vector, unique_database):
+ """Test that reading parquet files whose statistics use deprecated 'min'/'max' fields
+ works correctly. The statistics will be used for known-good types (boolean, integral,
+ float) and will be ignored for all other types (string, decimal, timestamp)."""
+ table_name = 'deprecated_stats'
+ # We use CTAS instead of "create table like" to convert the partition columns into
+ # normal table columns.
+ self.client.execute('create table %s.%s stored as parquet as select * from '
+ 'functional.alltypessmall limit 0' %
+ (unique_database, table_name))
+ table_location = get_fs_path('/test-warehouse/%s.db/%s' %
+ (unique_database, table_name))
+ local_file = os.path.join(os.environ['IMPALA_HOME'],
+ 'testdata/data/deprecated_statistics.parquet')
+ assert os.path.isfile(local_file)
+ check_call(['hdfs', 'dfs', '-copyFromLocal', local_file, table_location])
+ self.client.execute('invalidate metadata %s.%s' % (unique_database, table_name))
+ # The test makes assumptions about the number of row groups that are processed and
+ # skipped inside a fragment, so we ensure that the tests run in a single fragment.
+ vector.get_value('exec_option')['num_nodes'] = 1
+ self.run_test_case('QueryTest/parquet-deprecated-stats', vector, unique_database)
+