Posted to commits@impala.apache.org by lv...@apache.org on 2017/05/09 15:56:06 UTC

[12/13] incubator-impala git commit: IMPALA-4815, IMPALA-4817, IMPALA-4819: Write and Read Parquet Statistics for remaining types

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/92703468/tests/query_test/test_insert_parquet.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_insert_parquet.py b/tests/query_test/test_insert_parquet.py
index 1472bb1..4477786 100644
--- a/tests/query_test/test_insert_parquet.py
+++ b/tests/query_test/test_insert_parquet.py
@@ -20,10 +20,12 @@
 import os
 
 from collections import namedtuple
+from datetime import datetime
+from decimal import Decimal
 from shutil import rmtree
 from subprocess import check_call
 from tempfile import mkdtemp as make_tmp_dir
-from parquet.ttypes import SortingColumn
+from parquet.ttypes import ColumnOrder, SortingColumn, TypeDefinedOrder
 
 from tests.common.environ import impalad_basedir
 from tests.common.impala_test_suite import ImpalaTestSuite
@@ -50,6 +52,21 @@ class RoundFloat():
     """Compares this objects's value to a numeral after rounding it."""
     return round(self.value, self.num_digits) == round(numeral, self.num_digits)
 
+
+class TimeStamp():
+  """Class to construct timestamps with a default format specifier."""
+  def __init__(self, value):
+    # This member must be called 'timetuple'. datetime's __eq__ returns
+    # NotImplemented, and thereby forwards the equality check to this class's
+    # __eq__, only if the other operand has a member called 'timetuple':
+    # https://docs.python.org/2/library/datetime.html#datetime.datetime
+    self.timetuple = datetime.strptime(value, '%Y-%m-%d %H:%M:%S.%f')
+
+  def __eq__(self, other_timetuple):
+    """Compares this objects's value to another timetuple."""
+    return self.timetuple == other_timetuple
+
+
 ColumnStats = namedtuple('ColumnStats', ['name', 'min', 'max'])
 
 # Test a smaller parquet file size as well
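
As background for the 'timetuple' comment in TimeStamp.__init__ above: per the linked
Python 2 docs, datetime's __eq__ returns NotImplemented when the other operand exposes a
'timetuple' attribute, and Python then retries the comparison with that operand's own
__eq__. A minimal standalone sketch of the mechanism (illustrative names only, not part
of the test suite):

    from datetime import datetime

    class HasTimetuple(object):
      """Objects exposing a 'timetuple' attribute get a chance to answer equality
      checks initiated by a datetime instance."""
      def __init__(self, dt):
        self.timetuple = dt  # only the attribute name matters to datetime

      def __eq__(self, other):
        return self.timetuple == other

    dt = datetime(2009, 1, 1)
    # datetime.__eq__ returns NotImplemented, so Python falls back to
    # HasTimetuple.__eq__ and both comparisons succeed.
    assert dt == HasTimetuple(dt)
    assert HasTimetuple(dt) == dt
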
@@ -249,6 +266,34 @@ class TestHdfsParquetTableWriter(ImpalaTestSuite):
     for row_group in row_groups:
       assert row_group.sorting_columns == expected
 
+  def test_set_column_orders(self, vector, unique_database, tmpdir):
+    """Tests that the Parquet writers set FileMetaData::column_orders."""
+    source_table = "functional_parquet.alltypessmall"
+    target_table = "test_set_column_orders"
+    qualified_target_table = "{0}.{1}".format(unique_database, target_table)
+    hdfs_path = get_fs_path("/test-warehouse/{0}.db/{1}/".format(unique_database,
+        target_table))
+
+    # Create table
+    query = "create table {0} like {1} stored as parquet".format(qualified_target_table,
+        source_table)
+    self.execute_query(query)
+
+    # Insert data
+    query = ("insert into {0} partition(year, month) select * from {1}").format(
+        qualified_target_table, source_table)
+    self.execute_query(query)
+
+    # Download hdfs files and verify column orders
+    check_call(['hdfs', 'dfs', '-get', hdfs_path, tmpdir.strpath])
+
+    expected_col_orders = [ColumnOrder(TYPE_ORDER=TypeDefinedOrder())] * 11
+
+    for root, subdirs, files in os.walk(tmpdir.strpath):
+      for f in files:
+        parquet_file = os.path.join(root, str(f))
+        file_meta_data = get_parquet_metadata(parquet_file)
+        assert file_meta_data.column_orders == expected_col_orders
 
 @SkipIfIsilon.hive
 @SkipIfLocal.hive
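
For reference on the assertion above: FileMetaData.column_orders in the Parquet footer
holds one ColumnOrder entry per leaf column, and a ColumnOrder whose TYPE_ORDER field is
set to TypeDefinedOrder declares that min_value/max_value statistics were computed with
the column type's natural ordering. A small hypothetical helper (not part of the test),
assuming the same generated parquet.ttypes bindings imported above:

    from parquet.ttypes import ColumnOrder, TypeDefinedOrder

    def expected_type_defined_orders(num_leaf_columns):
      """Builds the column_orders list a writer is expected to emit when every
      leaf column uses its type-defined sort order."""
      return [ColumnOrder(TYPE_ORDER=TypeDefinedOrder())] * num_leaf_columns

In the test above the count is 11 because alltypessmall has 11 non-partition columns;
the partition columns (year, month) are not stored in the Parquet data files.
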
@@ -275,13 +320,13 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
         decoded.append(None)
         continue
 
-      if stats.min is None and stats.max is None:
+      if stats.min_value is None and stats.max_value is None:
         decoded.append(None)
         continue
 
-      assert stats.min is not None and stats.max is not None
-      min_value = decode_stats_value(schema, stats.min)
-      max_value = decode_stats_value(schema, stats.max)
+      assert stats.min_value is not None and stats.max_value is not None
+      min_value = decode_stats_value(schema, stats.min_value)
+      max_value = decode_stats_value(schema, stats.max_value)
       decoded.append(ColumnStats(schema.name, min_value, max_value))
 
     assert len(decoded) == len(schemas)
@@ -347,32 +392,21 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
       assert stats == expected
 
   def _ctas_table_and_verify_stats(self, vector, unique_database, source_table,
-                                   expected_values, hive_skip_col_idx = None):
+                                   expected_values):
     """Copies 'source_table' into a parquet table and makes sure that the row group
-    statistics in the resulting parquet file match those in 'expected_values'. The
-    comparison is performed against both Hive and Impala. For Hive, columns indexed by
-    'hive_skip_col_idx' are excluded from the verification of the expected values.
+    statistics in the resulting parquet file match those in 'expected_values'.
     """
     table_name = "test_hdfs_parquet_table_writer"
     qualified_table_name = "{0}.{1}".format(unique_database, table_name)
     hdfs_path = get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
                                                                  table_name))
 
-    # Validate against Hive.
+    # Setting num_nodes = 1 ensures that the query is executed on the coordinator,
+    # resulting in a single parquet file being written.
     self.execute_query("drop table if exists {0}".format(qualified_table_name))
-    self.run_stmt_in_hive("create table {0} stored as parquet as select * from "
-                          "{1}".format(qualified_table_name, source_table))
-    self.execute_query("invalidate metadata {0}".format(qualified_table_name))
-    self._validate_min_max_stats(hdfs_path, expected_values, hive_skip_col_idx)
-
-    # Validate against Impala. Setting exec_single_node_rows_threshold and adding a limit
-    # clause ensures that the query is executed on the coordinator, resulting in a single
-    # parquet file being written.
-    num_rows = self.execute_scalar("select count(*) from {0}".format(source_table))
-    self.execute_query("drop table {0}".format(qualified_table_name))
-    query = ("create table {0} stored as parquet as select * from {1} limit "
-             "{2}").format(qualified_table_name, source_table, num_rows)
-    vector.get_value('exec_option')['EXEC_SINGLE_NODE_ROWS_THRESHOLD'] = num_rows
+    query = ("create table {0} stored as parquet as select * from {1}").format(
+        qualified_table_name, source_table)
+    vector.get_value('exec_option')['num_nodes'] = 1
     self.execute_query(query, vector.get_value('exec_option'))
     self._validate_min_max_stats(hdfs_path, expected_values)
 
@@ -390,29 +424,33 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
         ColumnStats('bigint_col', 0, 90),
         ColumnStats('float_col', 0, RoundFloat(9.9, 1)),
         ColumnStats('double_col', 0, RoundFloat(90.9, 1)),
-        None,
-        None,
-        None,
+        ColumnStats('date_string_col', '01/01/09', '12/31/10'),
+        ColumnStats('string_col', '0', '9'),
+        ColumnStats('timestamp_col', TimeStamp('2009-01-01 00:00:00.0'),
+                    TimeStamp('2010-12-31 05:09:13.860000')),
         ColumnStats('year', 2009, 2010),
         ColumnStats('month', 1, 12),
     ]
 
-    # Skip comparison of unsupported columns types with Hive.
-    hive_skip_col_idx = [8, 9, 10]
-
     self._ctas_table_and_verify_stats(vector, unique_database, "functional.alltypes",
-                                      expected_min_max_values, hive_skip_col_idx)
+                                      expected_min_max_values)
 
   def test_write_statistics_decimal(self, vector, unique_database):
-    """Test that Impala does not write statistics for decimal columns."""
+    """Test that writing a parquet file populates the rowgroup statistics with the correct
+    values for decimal columns.
+    """
     # Expected values for functional.decimal_tbl
-    expected_min_max_values = [None, None, None, None, None, None]
-
-    # Skip comparison of unsupported columns types with Hive.
-    hive_skip_col_idx = range(len(expected_min_max_values))
+    expected_min_max_values = [
+      ColumnStats('d1', 1234, 132842),
+      ColumnStats('d2', 111, 2222),
+      ColumnStats('d3', Decimal('1.23456789'), Decimal('12345.6789')),
+      ColumnStats('d4', Decimal('0.123456789'), Decimal('0.123456789')),
+      ColumnStats('d5', Decimal('0.1'), Decimal('12345.789')),
+      ColumnStats('d6', 1, 1)
+    ]
 
     self._ctas_table_and_verify_stats(vector, unique_database, "functional.decimal_tbl",
-                                      expected_min_max_values, hive_skip_col_idx)
+      expected_min_max_values)
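
The decimal expectations above rely on the raw statistics bytes being decoded back into
Decimal values. As a rough illustration only (this is not the project's
decode_stats_value helper): Parquet stores a DECIMAL statistic as the unscaled integer
in big-endian two's complement, so decoding amounts to the following.

    from decimal import Decimal

    def decode_decimal_stat(raw_bytes, scale):
      """Decodes a big-endian two's-complement unscaled integer, as stored in
      Parquet DECIMAL min/max statistics, into a Decimal of the given scale."""
      if not raw_bytes:
        return None
      unscaled = 0
      for byte in bytearray(raw_bytes):
        unscaled = (unscaled << 8) | byte
      num_bits = 8 * len(raw_bytes)
      if unscaled >= 1 << (num_bits - 1):  # sign bit set, value is negative
        unscaled -= 1 << num_bits
      return Decimal(unscaled).scaleb(-scale)

    # e.g. decode_decimal_stat('\x00\x01\xe2\x40', 2) == Decimal('1234.56')
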
 
   def test_write_statistics_multi_page(self, vector, unique_database):
     """Test that writing a parquet file populates the rowgroup statistics with the correct
@@ -421,40 +459,57 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
     # Expected values for tpch_parquet.customer
     expected_min_max_values = [
         ColumnStats('c_custkey', 1, 150000),
-        None,
-        None,
+        ColumnStats('c_name', 'Customer#000000001', 'Customer#000150000'),
+        ColumnStats('c_address', '   2uZwVhQvwA', 'zzxGktzXTMKS1BxZlgQ9nqQ'),
         ColumnStats('c_nationkey', 0, 24),
-        None,
-        None,
-        None,
-        None,
+        ColumnStats('c_phone', '10-100-106-1617', '34-999-618-6881'),
+        ColumnStats('c_acctbal', Decimal('-999.99'), Decimal('9999.99')),
+        ColumnStats('c_mktsegment', 'AUTOMOBILE', 'MACHINERY'),
+        ColumnStats('c_comment', ' Tiresias according to the slyly blithe instructions '
+                    'detect quickly at the slyly express courts. express dinos wake ',
+                    'zzle. blithely regular instructions cajol'),
     ]
 
-    # Skip comparison of unsupported columns types with Hive.
-    hive_skip_col_idx = [1, 2, 4, 5, 6, 7]
-
     self._ctas_table_and_verify_stats(vector, unique_database, "tpch_parquet.customer",
-                                      expected_min_max_values, hive_skip_col_idx)
+                                      expected_min_max_values)
 
   def test_write_statistics_null(self, vector, unique_database):
     """Test that we don't write min/max statistics for null columns."""
-    expected_min_max_values = [None, None, None, None, None, None, None]
-
-    # Skip comparison of unsupported columns types with Hive.
-    hive_skip_col_idx = range(len(expected_min_max_values))
+    expected_min_max_values = [
+        ColumnStats('a', 'a', 'a'),
+        ColumnStats('b', '', ''),
+        None,
+        None,
+        None,
+        ColumnStats('f', 'a\x00b', 'a\x00b'),
+        ColumnStats('g', '\x00', '\x00')
+    ]
 
     self._ctas_table_and_verify_stats(vector, unique_database, "functional.nulltable",
-                                      expected_min_max_values, hive_skip_col_idx)
+                                      expected_min_max_values)
 
   def test_write_statistics_char_types(self, vector, unique_database):
-    """Test that Impala does not write statistics for char columns."""
-    expected_min_max_values = [None, None, None]
+    """Test that Impala correctly writes statistics for char columns."""
+    table_name = "test_char_types"
+    qualified_table_name = "{0}.{1}".format(unique_database, table_name)
 
-    # Skip comparison of unsupported columns types with Hive.
-    hive_skip_col_idx = range(len(expected_min_max_values))
+    create_table_stmt = "create table {0} (c3 char(3), vc varchar, st string);".format(
+        qualified_table_name)
+    self.execute_query(create_table_stmt)
 
-    self._ctas_table_and_verify_stats(vector, unique_database, "functional.chars_formats",
-                                      expected_min_max_values, hive_skip_col_idx)
+    insert_stmt = """insert into {0} values
+        (cast("def" as char(3)), "ghj xyz", "abc xyz"),
+        (cast("abc" as char(3)), "def 123 xyz", "lorem ipsum"),
+        (cast("xy" as char(3)), "abc banana", "dolor dis amet")""".format(qualified_table_name)
+    self.execute_query(insert_stmt)
+    expected_min_max_values = [
+        ColumnStats('c3', 'abc', 'xy'),
+        ColumnStats('vc', 'abc banana', 'ghj xyz'),
+        ColumnStats('st', 'abc xyz', 'lorem ipsum')
+    ]
+
+    self._ctas_table_and_verify_stats(vector, unique_database, qualified_table_name,
+                                      expected_min_max_values)
 
   def test_write_statistics_negative(self, vector, unique_database):
     """Test that Impala correctly writes statistics for negative values."""
@@ -496,14 +551,13 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
     # Insert a large amount of data on a single backend with a limited parquet file size.
     # This will result in several files being written, exercising code that tracks
     # statistics for row groups.
-    num_rows = self.execute_scalar("select count(*) from {0}".format(source_table))
     query = "create table {0} like {1} stored as parquet".format(qualified_target_table,
                                                                  source_table)
     self.execute_query(query, vector.get_value('exec_option'))
-    query = ("insert into {0} /* +sortby(o_orderkey) */ select * from {1} limit"
-             "{2}").format(qualified_target_table, source_table, num_rows)
-    vector.get_value('exec_option')['EXEC_SINGLE_NODE_ROWS_THRESHOLD'] = num_rows
-    vector.get_value('exec_option')['PARQUET_FILE_SIZE'] = 8 * 1024 * 1024
+    query = ("insert into {0} /* +sortby(o_orderkey) */ select * from {1}").format(
+        qualified_target_table, source_table)
+    vector.get_value('exec_option')['num_nodes'] = 1
+    vector.get_value('exec_option')['parquet_file_size'] = 8 * 1024 * 1024
     self.execute_query(query, vector.get_value('exec_option'))
 
     # Get all stats for the o_orderkey column

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/92703468/tests/query_test/test_parquet_stats.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_parquet_stats.py b/tests/query_test/test_parquet_stats.py
index 9b9d6d7..93fc06d 100644
--- a/tests/query_test/test_parquet_stats.py
+++ b/tests/query_test/test_parquet_stats.py
@@ -15,10 +15,14 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import os
 import pytest
+import shlex
+from subprocess import check_call
 
 from tests.common.test_vector import ImpalaTestDimension
 from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.util.filesystem_utils import get_fs_path
 
 MT_DOP_VALUES = [0, 1, 2, 8]
 
@@ -43,3 +47,26 @@ class TestParquetStats(ImpalaTestSuite):
     # skipped inside a fragment, so we ensure that the tests run in a single fragment.
     vector.get_value('exec_option')['num_nodes'] = 1
     self.run_test_case('QueryTest/parquet_stats', vector, use_db=unique_database)
+
+  def test_deprecated_stats(self, vector, unique_database):
+    """Test that reading parquet files with statistics with deprecated 'min'/'max' fields
+    works correctly. The statistics will be used for known-good types (boolean, integral,
+    float) and will be ignored for all other types (string, decimal, timestamp)."""
+    table_name = 'deprecated_stats'
+    # We use CTAS instead of "create table like" to convert the partition columns into
+    # normal table columns.
+    self.client.execute('create table %s.%s stored as parquet as select * from '
+                        'functional.alltypessmall limit 0' %
+                        (unique_database, table_name))
+    table_location = get_fs_path('/test-warehouse/%s.db/%s' %
+                                 (unique_database, table_name))
+    local_file = os.path.join(os.environ['IMPALA_HOME'],
+                              'testdata/data/deprecated_statistics.parquet')
+    assert os.path.isfile(local_file)
+    check_call(['hdfs', 'dfs', '-copyFromLocal', local_file, table_location])
+    self.client.execute('invalidate metadata %s.%s' % (unique_database, table_name))
+    # The test makes assumptions about the number of row groups that are processed and
+    # skipped inside a fragment, so we ensure that the tests run in a single fragment.
+    vector.get_value('exec_option')['num_nodes'] = 1
+    self.run_test_case('QueryTest/parquet-deprecated-stats', vector, unique_database)
+
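
Taken together with the switch from the deprecated 'min'/'max' fields to
'min_value'/'max_value' in the writer tests above, the behaviour exercised here boils
down to a preference order over the Statistics thrift fields: the new
min_value/max_value pair is always usable, while the deprecated min/max pair is only
trusted for types whose legacy sort order was unambiguous (boolean, integral, float).
A hedged sketch of that selection logic, illustrative only and not Impala's actual
reader code:

    def usable_min_max(stats, legacy_order_is_safe):
      """Picks which Statistics fields to use. Prefers the new 'min_value' and
      'max_value' fields and falls back to the deprecated 'min' and 'max' fields
      only when the caller knows the column type's legacy ordering is safe."""
      if stats.min_value is not None and stats.max_value is not None:
        return stats.min_value, stats.max_value
      if legacy_order_is_safe and stats.min is not None and stats.max is not None:
        return stats.min, stats.max
      # e.g. string, decimal or timestamp columns that carry only legacy stats
      return None, None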