You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by cs...@apache.org on 2019/01/17 15:06:07 UTC

[impala] 02/03: IMPALA-7889: Write new logical types in Parquet

This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 0906e0817ce2301d7e20f355b334861f0232f16f
Author: Csaba Ringhofer <cs...@cloudera.com>
AuthorDate: Wed Nov 28 21:01:04 2018 +0100

    IMPALA-7889: Write new logical types in Parquet
    
    Fill the LogicalType field in Parquet schemas for columns
    that have an associated logical type. ConvertedType still
    has to be filled to remain compatible with older readers.
    
    Testing:
    - added new tests to check both logical and converted types
      to test_insert_parquet.py
    
    Change-Id: I6f377950845683ab9c6dea79f4c54db0359d0b91
    Reviewed-on: http://gerrit.cloudera.org:8080/12004
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/parquet/hdfs-parquet-table-writer.cc   |  35 +---
 be/src/exec/parquet/parquet-metadata-utils.cc      | 112 +++++++++++--
 be/src/exec/parquet/parquet-metadata-utils.h       |   6 +
 .../queries/QueryTest/stats-extrapolation.test     |   2 +-
 tests/query_test/test_insert_parquet.py            | 183 ++++++++++++++++-----
 tests/util/get_parquet_metadata.py                 |  17 ++
 6 files changed, 272 insertions(+), 83 deletions(-)

diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.cc b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
index 80e340b..94f2b9e 100644
--- a/be/src/exec/parquet/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
@@ -22,6 +22,7 @@
 #include "common/version.h"
 #include "exec/hdfs-table-sink.h"
 #include "exec/parquet/parquet-column-stats.inline.h"
+#include "exec/parquet/parquet-metadata-utils.h"
 #include "exprs/scalar-expr-evaluator.h"
 #include "exprs/scalar-expr.h"
 #include "rpc/thrift-util.h"
@@ -969,35 +970,11 @@ Status HdfsParquetTableWriter::CreateSchema() {
   file_metadata_.schema[0].name = "schema";
 
   for (int i = 0; i < columns_.size(); ++i) {
-    parquet::SchemaElement& node = file_metadata_.schema[i + 1];
-    const ColumnType& type = output_expr_evals_[i]->root().type();
-    node.name = table_desc_->col_descs()[i + num_clustering_cols].name();
-    node.__set_type(ConvertInternalToParquetType(type.type));
-    node.__set_repetition_type(parquet::FieldRepetitionType::OPTIONAL);
-    if (type.type == TYPE_DECIMAL) {
-      // This column is type decimal. Update the file metadata to include the
-      // additional fields:
-      //  1) converted_type: indicate this is really a decimal column.
-      //  2) type_length: the number of bytes used per decimal value in the data
-      //  3) precision/scale
-      node.__set_converted_type(parquet::ConvertedType::DECIMAL);
-      node.__set_type_length(
-          ParquetPlainEncoder::DecimalSize(output_expr_evals_[i]->root().type()));
-      node.__set_scale(output_expr_evals_[i]->root().type().scale);
-      node.__set_precision(output_expr_evals_[i]->root().type().precision);
-    } else if (type.type == TYPE_VARCHAR || type.type == TYPE_CHAR ||
-        (type.type == TYPE_STRING &&
-         state_->query_options().parquet_annotate_strings_utf8)) {
-      node.__set_converted_type(parquet::ConvertedType::UTF8);
-    } else if (type.type == TYPE_TINYINT) {
-      node.__set_converted_type(parquet::ConvertedType::INT_8);
-    } else if (type.type == TYPE_SMALLINT) {
-      node.__set_converted_type(parquet::ConvertedType::INT_16);
-    } else if (type.type == TYPE_INT) {
-      node.__set_converted_type(parquet::ConvertedType::INT_32);
-    } else if (type.type == TYPE_BIGINT) {
-      node.__set_converted_type(parquet::ConvertedType::INT_64);
-    }
+    parquet::SchemaElement& col_schema = file_metadata_.schema[i + 1];
+    const ColumnType& col_type = output_expr_evals_[i]->root().type();
+    col_schema.name = table_desc_->col_descs()[i + num_clustering_cols].name();
+    ParquetMetadataUtils::FillSchemaElement(col_type, state_->query_options(),
+                                            &col_schema);
   }
 
   return Status::OK();
diff --git a/be/src/exec/parquet/parquet-metadata-utils.cc b/be/src/exec/parquet/parquet-metadata-utils.cc
index 597b187..3e369af 100644
--- a/be/src/exec/parquet/parquet-metadata-utils.cc
+++ b/be/src/exec/parquet/parquet-metadata-utils.cc
@@ -85,8 +85,59 @@ bool IsSupportedType(PrimitiveType impala_type,
   parquet::Type::type parquet_type = element.type;
   return encodings->second.find(parquet_type) != encodings->second.end();
 }
+
+/// Returns true if encoding 'e' is supported by Impala, false otherwise.
+static bool IsEncodingSupported(parquet::Encoding::type e) {
+  switch (e) {
+    case parquet::Encoding::PLAIN:
+    case parquet::Encoding::PLAIN_DICTIONARY:
+    case parquet::Encoding::BIT_PACKED:
+    case parquet::Encoding::RLE:
+      return true;
+    default:
+      return false;
+  }
 }
 
+/// Sets logical and converted types in 'col_schema' to signed 'bitwidth' bit integer.
+void SetIntLogicalType(int bitwidth, parquet::SchemaElement* col_schema) {
+  parquet::IntType int_type;
+  int_type.__set_bitWidth(bitwidth);
+  int_type.__set_isSigned(true);
+  parquet::LogicalType logical_type;
+  logical_type.__set_INTEGER(int_type);
+  col_schema->__set_logicalType(logical_type);
+}
+
+/// Sets logical and converted types in 'col_schema' to UTF8.
+void SetUtf8ConvertedAndLogicalType(parquet::SchemaElement* col_schema) {
+  col_schema->__set_converted_type(parquet::ConvertedType::UTF8);
+  parquet::LogicalType logical_type;
+  logical_type.__set_STRING(parquet::StringType());
+  col_schema->__set_logicalType(logical_type);
+}
+
+/// Sets logical and converted types in 'col_schema' to DECIMAL.
+/// Precision and scale are set according to 'col_type'.
+void SetDecimalConvertedAndLogicalType(
+    const ColumnType& col_type, parquet::SchemaElement* col_schema) {
+  DCHECK_EQ(col_type.type, TYPE_DECIMAL);
+
+  col_schema->__set_type_length(ParquetPlainEncoder::DecimalSize(col_type));
+  col_schema->__set_scale(col_type.scale);
+  col_schema->__set_precision(col_type.precision);
+  col_schema->__set_converted_type(parquet::ConvertedType::DECIMAL);
+
+  parquet::DecimalType decimal_type;
+  decimal_type.__set_scale(col_type.scale);
+  decimal_type.__set_precision(col_type.precision);
+  parquet::LogicalType logical_type;
+  logical_type.__set_DECIMAL(decimal_type);
+  col_schema->__set_logicalType(logical_type);
+}
+
+} // anonymous namespace
+
 // Needs to be in sync with the order of enum values declared in TParquetArrayResolution.
 const std::vector<ParquetSchemaResolver::ArrayEncoding>
     ParquetSchemaResolver::ORDERED_ARRAY_ENCODINGS[] =
@@ -145,18 +196,6 @@ Status ParquetMetadataUtils::ValidateOffsetInFile(const string& filename, int co
   return Status::OK();;
 }
 
-static bool IsEncodingSupported(parquet::Encoding::type e) {
-  switch (e) {
-    case parquet::Encoding::PLAIN:
-    case parquet::Encoding::PLAIN_DICTIONARY:
-    case parquet::Encoding::BIT_PACKED:
-    case parquet::Encoding::RLE:
-      return true;
-    default:
-      return false;
-  }
-}
-
 Status ParquetMetadataUtils::ValidateRowGroupColumn(
     const parquet::FileMetaData& file_metadata, const char* filename, int row_group_idx,
     int col_idx, const parquet::SchemaElement& schema_element, RuntimeState* state) {
@@ -275,6 +314,55 @@ Status ParquetMetadataUtils::ValidateColumn(const char* filename,
   return Status::OK();
 }
 
+void ParquetMetadataUtils::FillSchemaElement(const ColumnType& col_type,
+    const TQueryOptions& query_options, parquet::SchemaElement* col_schema) {
+  col_schema->__set_type(ConvertInternalToParquetType(col_type.type));
+  col_schema->__set_repetition_type(parquet::FieldRepetitionType::OPTIONAL);
+
+  switch (col_type.type) {
+    case TYPE_DECIMAL:
+      SetDecimalConvertedAndLogicalType(col_type, col_schema);
+      break;
+    case TYPE_VARCHAR:
+    case TYPE_CHAR:
+      SetUtf8ConvertedAndLogicalType(col_schema);
+      break;
+    case TYPE_STRING:
+      // By default STRING has no logical type, see IMPALA-5982.
+      // VARCHAR and CHAR are always set to UTF8.
+      if (query_options.parquet_annotate_strings_utf8) {
+        SetUtf8ConvertedAndLogicalType(col_schema);
+      }
+      break;
+    case TYPE_TINYINT:
+      col_schema->__set_converted_type(parquet::ConvertedType::INT_8);
+      SetIntLogicalType(8, col_schema);
+      break;
+    case TYPE_SMALLINT:
+      col_schema->__set_converted_type(parquet::ConvertedType::INT_16);
+      SetIntLogicalType(16, col_schema);
+      break;
+    case TYPE_INT:
+      col_schema->__set_converted_type(parquet::ConvertedType::INT_32);
+      SetIntLogicalType(32, col_schema);
+      break;
+    case TYPE_BIGINT:
+      col_schema->__set_converted_type(parquet::ConvertedType::INT_64);
+      SetIntLogicalType(64, col_schema);
+      break;
+    case TYPE_TIMESTAMP:
+    case TYPE_BOOLEAN:
+    case TYPE_FLOAT:
+    case TYPE_DOUBLE:
+      // boolean/float/double/INT96 encoded timestamp have no logical or converted types.
+      // INT64 encoded timestamp will have logical and converted types (IMPALA-5051).
+      break;
+    default:
+      DCHECK(false);
+      break;
+  }
+}
+
 ParquetFileVersion::ParquetFileVersion(const string& created_by) {
   string created_by_lower = created_by;
   std::transform(created_by_lower.begin(), created_by_lower.end(),
diff --git a/be/src/exec/parquet/parquet-metadata-utils.h b/be/src/exec/parquet/parquet-metadata-utils.h
index f3a144d..e9e95f2 100644
--- a/be/src/exec/parquet/parquet-metadata-utils.h
+++ b/be/src/exec/parquet/parquet-metadata-utils.h
@@ -26,6 +26,7 @@
 namespace impala {
 
 class RuntimeState;
+class TQueryOptions;
 
 class ParquetMetadataUtils {
  public:
@@ -55,6 +56,11 @@ class ParquetMetadataUtils {
   static Status ValidateColumn(const char* filename,
       const parquet::SchemaElement& schema_element, const SlotDescriptor* slot_desc,
       RuntimeState* state);
+
+  /// Sets type related fields in a SchemaElement based on the column's internal type
+  /// and query options.
+  static void FillSchemaElement(const ColumnType& col_type,
+      const TQueryOptions& query_options, parquet::SchemaElement* col_schema);
 };
 
 struct ParquetFileVersion {
diff --git a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
index 0c6deb4..1398988 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
@@ -33,7 +33,7 @@ show table stats alltypes
 YEAR, MONTH, #ROWS, EXTRAP #ROWS, #FILES, SIZE, BYTES CACHED, CACHE REPLICATION, FORMAT, INCREMENTAL STATS, LOCATION
 ---- RESULTS
 '2009','1',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=1'
-'2009','2',-1,288,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=2'
+'2009','2',-1,289,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=2'
 '2009','3',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=3'
 '2009','4',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=4'
 '2009','5',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=5'
diff --git a/tests/query_test/test_insert_parquet.py b/tests/query_test/test_insert_parquet.py
index 6924d26..5790859 100644
--- a/tests/query_test/test_insert_parquet.py
+++ b/tests/query_test/test_insert_parquet.py
@@ -23,17 +23,18 @@ from collections import namedtuple
 from datetime import datetime
 from decimal import Decimal
 from subprocess import check_call
-from parquet.ttypes import ColumnOrder, SortingColumn, TypeDefinedOrder
+from parquet.ttypes import ColumnOrder, SortingColumn, TypeDefinedOrder, ConvertedType
 
 from tests.common.environ import impalad_basedir
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.parametrize import UniqueDatabase
-from tests.common.skip import SkipIfEC, SkipIfIsilon, SkipIfLocal, SkipIfS3, SkipIfABFS, \
-    SkipIfADLS
+from tests.common.skip import (SkipIfEC, SkipIfIsilon, SkipIfLocal, SkipIfS3, SkipIfABFS,
+    SkipIfADLS)
 from tests.common.test_dimensions import create_exec_option_dimension
 from tests.common.test_vector import ImpalaTestDimension
 from tests.util.filesystem_utils import get_fs_path
-from tests.util.get_parquet_metadata import get_parquet_metadata, decode_stats_value
+from tests.util.get_parquet_metadata import (decode_stats_value,
+    get_parquet_metadata_from_hdfs_folder)
 
 PARQUET_CODECS = ['none', 'snappy', 'gzip']
 
@@ -246,14 +247,11 @@ class TestHdfsParquetTableWriter(ImpalaTestSuite):
     self.execute_query(query)
 
     # Download hdfs files and extract rowgroup metadata
+    file_metadata_list = get_parquet_metadata_from_hdfs_folder(hdfs_path, tmpdir.strpath)
     row_groups = []
-    check_call(['hdfs', 'dfs', '-get', hdfs_path, tmpdir.strpath])
 
-    for root, subdirs, files in os.walk(tmpdir.strpath):
-      for f in files:
-        parquet_file = os.path.join(root, str(f))
-        file_meta_data = get_parquet_metadata(parquet_file)
-        row_groups.extend(file_meta_data.row_groups)
+    for file_metadata in file_metadata_list:
+      row_groups.extend(file_metadata.row_groups)
 
     # Verify that the files have the sorted_columns set
     expected = [SortingColumn(4, False, False), SortingColumn(0, False, False)]
@@ -279,21 +277,18 @@ class TestHdfsParquetTableWriter(ImpalaTestSuite):
     self.execute_query(query)
 
     # Download hdfs files and verify column orders
-    check_call(['hdfs', 'dfs', '-get', hdfs_path, tmpdir.strpath])
+    file_metadata_list = get_parquet_metadata_from_hdfs_folder(hdfs_path, tmpdir.strpath)
 
     expected_col_orders = [ColumnOrder(TYPE_ORDER=TypeDefinedOrder())] * 11
 
-    for root, subdirs, files in os.walk(tmpdir.strpath):
-      for f in files:
-        parquet_file = os.path.join(root, str(f))
-        file_meta_data = get_parquet_metadata(parquet_file)
-        assert file_meta_data.column_orders == expected_col_orders
+    for file_metadata in file_metadata_list:
+      assert file_metadata.column_orders == expected_col_orders
 
-  def test_read_write_logical_types(self, vector, unique_database, tmpdir):
+  def test_read_write_integer_logical_types(self, vector, unique_database, tmpdir):
     """IMPALA-5052: Read and write signed integer parquet logical types
     This test creates a src_tbl like a parquet file. The parquet file was generated
     to have columns with different signed integer logical types. The test verifies
-    that parquet file written by the hdfs parquet table writer using the genererated
+    that parquet file written by the hdfs parquet table writer using the generated
     file has the same column type metadata as the generated one."""
     hdfs_path = (os.environ['DEFAULT_FS'] + "/test-warehouse/{0}.db/"
                  "signed_integer_logical_types.parquet").format(unique_database)
@@ -358,6 +353,115 @@ class TestHdfsParquetTableWriter(ImpalaTestSuite):
             % dst_tbl)
     assert result_src.data == result_dst.data
 
+  def _ctas_and_get_metadata(self, vector, unique_database, tmp_dir, source_table):
+    """CTAS 'source_table' into a Parquet table and returns its Parquet metadata."""
+    table_name = "test_hdfs_parquet_table_writer"
+    qualified_table_name = "{0}.{1}".format(unique_database, table_name)
+    hdfs_path = get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
+                                                                 table_name))
+
+    # Setting num_nodes = 1 ensures that the query is executed on the coordinator,
+    # resulting in a single parquet file being written.
+    query = ("create table {0} stored as parquet as select * from {1}").format(
+      qualified_table_name, source_table)
+    vector.get_value('exec_option')['num_nodes'] = 1
+    self.execute_query_expect_success(self.client, query,
+                                      vector.get_value('exec_option'))
+
+    file_metadata_list = get_parquet_metadata_from_hdfs_folder(hdfs_path, tmp_dir)
+    assert len(file_metadata_list) == 1
+    assert file_metadata_list[0] is not None
+    return file_metadata_list[0]
+
+  @staticmethod
+  def _get_schema(schemas, column_name):
+    """Searches 'schemas' for a schema with name 'column_name'. Asserts if non is found.
+    """
+    for schema in schemas:
+      if schema.name == column_name:
+        return schema
+    assert False, "schema element %s not found" % column_name
+
+  @staticmethod
+  def _check_only_one_member_var_is_set(obj, var_name):
+    """Checks that 'var_name' is the only member of 'obj' that is not None. Useful to
+    check Thrift unions."""
+    keys = [k for k, v in vars(obj).iteritems() if v is not None]
+    assert keys == [var_name]
+
+  def _check_no_logical_type(self, schemas, column_name):
+    """Checks that the schema with name 'column_name' has no logical or converted type."""
+    schema = self._get_schema(schemas, column_name)
+    assert schema.converted_type is None
+    assert schema.logicalType is None
+
+  def _check_int_logical_type(self, schemas, column_name, bit_width):
+    """Checks that the schema with name 'column_name' has logical and converted type that
+    describe a signed integer with 'bit_width' bits."""
+    schema = self._get_schema(schemas, column_name)
+    bit_width_to_converted_type_map = {
+        8: ConvertedType.INT_8,
+        16: ConvertedType.INT_16,
+        32: ConvertedType.INT_32,
+        64: ConvertedType.INT_64
+    }
+    assert schema.converted_type == bit_width_to_converted_type_map[bit_width]
+    assert schema.logicalType is not None
+    self._check_only_one_member_var_is_set(schema.logicalType, "INTEGER")
+    assert schema.logicalType.INTEGER is not None
+    assert schema.logicalType.INTEGER.bitWidth == bit_width
+    assert schema.logicalType.INTEGER.isSigned
+
+  def _check_decimal_logical_type(self, schemas, column_name, precision, scale):
+    """Checks that the schema with name 'column_name' has logical and converted type that
+    describe a decimal with given 'precision' and 'scale'."""
+    schema = self._get_schema(schemas, column_name)
+    assert schema.converted_type == ConvertedType.DECIMAL
+    assert schema.precision == precision
+    assert schema.scale == scale
+    assert schema.logicalType is not None
+    self._check_only_one_member_var_is_set(schema.logicalType, "DECIMAL")
+    assert schema.logicalType.DECIMAL.precision == precision
+    assert schema.logicalType.DECIMAL.scale == scale
+
+  def test_logical_types(self, vector, unique_database, tmpdir):
+    """Tests that the Parquet writers set logical type and converted type correctly
+    for all types except DECIMAL"""
+    source = "functional.alltypestiny"
+    file_metadata = \
+        self._ctas_and_get_metadata(vector, unique_database, tmpdir.strpath, source)
+    schemas = file_metadata.schema
+
+    self._check_int_logical_type(schemas, "tinyint_col", 8)
+    self._check_int_logical_type(schemas, "smallint_col", 16)
+    self._check_int_logical_type(schemas, "int_col", 32)
+    self._check_int_logical_type(schemas, "bigint_col", 64)
+
+    self._check_no_logical_type(schemas, "bool_col")
+    self._check_no_logical_type(schemas, "float_col")
+    self._check_no_logical_type(schemas, "double_col")
+
+    # By default STRING has no logical type, see IMPALA-5982.
+    self._check_no_logical_type(schemas, "string_col")
+
+    # Currently TIMESTAMP is written as INT96 and has no logical type.
+    # This test will break once INT64 becomes the default Parquet type for TIMESTAMP
+    # columns in the future (IMPALA-5049).
+    self._check_no_logical_type(schemas, "timestamp_col")
+
+  def test_decimal_logical_types(self, vector, unique_database, tmpdir):
+    """Tests that the Parquet writers set logical type and converted type correctly
+       for DECIMAL type."""
+    source = "functional.decimal_tiny"
+    file_metadata = \
+        self._ctas_and_get_metadata(vector, unique_database, tmpdir.strpath, source)
+    schemas = file_metadata.schema
+
+    self._check_decimal_logical_type(schemas, "c1", 10, 4)
+    self._check_decimal_logical_type(schemas, "c2", 15, 5)
+    self._check_decimal_logical_type(schemas, "c3", 1, 1)
+
+
 @SkipIfIsilon.hive
 @SkipIfLocal.hive
 @SkipIfS3.hive
@@ -399,14 +503,14 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
     assert len(decoded) == len(schemas)
     return decoded
 
-  def _get_row_group_stats_from_file(self, parquet_file):
-    """Returns a list of statistics for each row group in file 'parquet_file'. The result
-    is a two-dimensional list, containing stats by row group and column."""
-    file_meta_data = get_parquet_metadata(parquet_file)
+  def _get_row_group_stats_from_file_metadata(self, file_metadata):
+    """Returns a list of statistics for each row group in Parquet file metadata
+    'file_metadata'. The result is a two-dimensional list, containing stats by
+    row group and column."""
     # We only support flat schemas, the additional element is the root element.
-    schemas = file_meta_data.schema[1:]
+    schemas = file_metadata.schema[1:]
     file_stats = []
-    for row_group in file_meta_data.row_groups:
+    for row_group in file_metadata.row_groups:
       num_columns = len(row_group.columns)
       assert num_columns == len(schemas)
       column_stats = [c.meta_data.statistics for c in row_group.columns]
@@ -421,12 +525,9 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
     two-dimensional list, containing stats by row group and column."""
     row_group_stats = []
 
-    check_call(['hdfs', 'dfs', '-get', hdfs_path, tmp_dir])
-
-    for root, subdirs, files in os.walk(tmp_dir):
-      for f in files:
-        parquet_file = os.path.join(root, str(f))
-        row_group_stats.extend(self._get_row_group_stats_from_file(parquet_file))
+    file_metadata_list = get_parquet_metadata_from_hdfs_folder(hdfs_path, tmp_dir)
+    for file_metadata in file_metadata_list:
+      row_group_stats.extend(self._get_row_group_stats_from_file_metadata(file_metadata))
 
     return row_group_stats
 
@@ -509,12 +610,12 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
     """
     # Expected values for functional.decimal_tbl
     expected_min_max_values = [
-      ColumnStats('d1', 1234, 132842, 0),
-      ColumnStats('d2', 111, 2222, 0),
-      ColumnStats('d3', Decimal('1.23456789'), Decimal('12345.6789'), 0),
-      ColumnStats('d4', Decimal('0.123456789'), Decimal('0.123456789'), 0),
-      ColumnStats('d5', Decimal('0.1'), Decimal('12345.789'), 0),
-      ColumnStats('d6', 1, 1, 0)
+        ColumnStats('d1', 1234, 132842, 0),
+        ColumnStats('d2', 111, 2222, 0),
+        ColumnStats('d3', Decimal('1.23456789'), Decimal('12345.6789'), 0),
+        ColumnStats('d4', Decimal('0.123456789'), Decimal('0.123456789'), 0),
+        ColumnStats('d5', Decimal('0.1'), Decimal('12345.789'), 0),
+        ColumnStats('d6', 1, 1, 0)
     ]
 
     self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath,
@@ -670,11 +771,11 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
 
     # Expected values for tpch_parquet.customer
     expected_min_max_values = [
-      ColumnStats('id', '8600000US00601', '8600000US999XX', 0),
-      ColumnStats('zip', '00601', '999XX', 0),
-      ColumnStats('description1', '\"00601 5-Digit ZCTA', '\"999XX 5-Digit ZCTA', 0),
-      ColumnStats('description2', ' 006 3-Digit ZCTA\"', ' 999 3-Digit ZCTA\"', 0),
-      ColumnStats('income', 0, 189570, 29),
+        ColumnStats('id', '8600000US00601', '8600000US999XX', 0),
+        ColumnStats('zip', '00601', '999XX', 0),
+        ColumnStats('description1', '\"00601 5-Digit ZCTA', '\"999XX 5-Digit ZCTA', 0),
+        ColumnStats('description2', ' 006 3-Digit ZCTA\"', ' 999 3-Digit ZCTA\"', 0),
+        ColumnStats('income', 0, 189570, 29),
     ]
 
     self._ctas_table_and_verify_stats(vector, unique_database, tmpdir.strpath,
diff --git a/tests/util/get_parquet_metadata.py b/tests/util/get_parquet_metadata.py
index f6a0e59..56281f8 100644
--- a/tests/util/get_parquet_metadata.py
+++ b/tests/util/get_parquet_metadata.py
@@ -21,6 +21,7 @@ import struct
 from datetime import date, datetime, time, timedelta
 from decimal import Decimal
 from parquet.ttypes import ColumnIndex, FileMetaData, OffsetIndex, PageHeader, Type
+from subprocess import check_call
 from thrift.protocol import TCompactProtocol
 from thrift.transport import TTransport
 
@@ -168,3 +169,19 @@ def get_parquet_metadata(filename):
 
     # Return deserialized FileMetaData object
     return read_serialized_object(FileMetaData, f, metadata_pos, metadata_len)
+
+
+def get_parquet_metadata_from_hdfs_folder(hdfs_path, tmp_dir):
+  """Returns a list with the FileMetaData of every file in 'hdfs_path' and its
+  subdirectories. The hdfs folder is copied into 'tmp_dir' before processing.
+  """
+  check_call(['hdfs', 'dfs', '-get', hdfs_path, tmp_dir])
+  result = []
+  for root, subdirs, files in os.walk(tmp_dir):
+    for f in files:
+      if not f.endswith('parq'):
+        continue
+      parquet_file = os.path.join(root, str(f))
+      file_meta_data = get_parquet_metadata(parquet_file)
+      result.append(file_meta_data)
+  return result