Posted to commits@impala.apache.org by jo...@apache.org on 2018/10/13 03:27:50 UTC

[5/5] impala git commit: IMPALA-7704: Revert "IMPALA-7644: Hide Parquet page index writing with feature flag"

IMPALA-7704: Revert "IMPALA-7644: Hide Parquet page index writing with feature flag"

The fix for IMPALA-7644 introduced ASAN issues detailed in
IMPALA-7704. Reverting for now.

This reverts commit 843683ed6c2ef41c7c25e9fa4af68801dbdd1a78.

Change-Id: Icf0a64d6ec747275e3ecd6e801e054f81095591a
Reviewed-on: http://gerrit.cloudera.org:8080/11671
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Michael Ho <kw...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/af76186e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/af76186e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/af76186e

Branch: refs/heads/master
Commit: af76186e013607cb64baf151c039e4f6aaab4350
Parents: 97f0282
Author: Joe McDonnell <jo...@cloudera.com>
Authored: Fri Oct 12 11:15:07 2018 -0700
Committer: Joe McDonnell <jo...@cloudera.com>
Committed: Sat Oct 13 03:26:03 2018 +0000

----------------------------------------------------------------------
 be/src/common/global-flags.cc                   |   6 -
 be/src/exec/hdfs-parquet-table-writer.cc        | 100 ++---
 .../queries/QueryTest/stats-extrapolation.test  |  14 +-
 tests/custom_cluster/test_parquet_page_index.py | 371 ------------------
 tests/query_test/test_parquet_page_index.py     | 372 +++++++++++++++++++
 5 files changed, 417 insertions(+), 446 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/af76186e/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index ac76b53..2ea1ca5 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -239,12 +239,6 @@ DEFINE_double_hidden(invalidate_tables_fraction_on_memory_pressure, 0.1,
     "The fraction of tables to invalidate when CatalogdTableInvalidator considers the "
     "old GC generation to be almost full.");
 
-DEFINE_bool_hidden(enable_parquet_page_index_writing_debug_only, false, "If true, Impala "
-    "will write the Parquet page index. It is not advised to use it in a production "
-    "environment, only for testing and development. This flag is meant to be temporary. "
-    "We plan to remove this flag once Impala is able to read the page index and has "
-    "better test coverage around it.");
-
 // ++========================++
 // || Startup flag graveyard ||
 // ++========================++
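
For context on the mechanism this hunk removes: the backend defines feature flags once in global-flags.cc and imports them in other translation units via DECLARE_bool. A minimal sketch of the pattern, using the names from this commit (Status and the surrounding class are elided, so this is illustrative rather than compilable on its own):

    // global-flags.cc: define the hidden flag and its default value.
    DEFINE_bool_hidden(enable_parquet_page_index_writing_debug_only, false,
        "If true, Impala will write the Parquet page index.");

    // hdfs-parquet-table-writer.cc: import the flag defined elsewhere...
    DECLARE_bool(enable_parquet_page_index_writing_debug_only);

    // ...and use it to short-circuit the feature at runtime.
    Status HdfsParquetTableWriter::WritePageIndex() {
      if (!FLAGS_enable_parquet_page_index_writing_debug_only) return Status::OK();
      // ... serialize the column and offset indexes ...
      return Status::OK();
    }

With the revert, both the definition and the gating checks go away, so the page index is written unconditionally again.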

http://git-wip-us.apache.org/repos/asf/impala/blob/af76186e/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index 13137e5..8aa4f7a 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -83,8 +83,6 @@ using namespace apache::thrift;
 // the columns and run that function over row batches.
 // TODO: we need to pass in the compression from the FE/metadata
 
-DECLARE_bool(enable_parquet_page_index_writing_debug_only);
-
 namespace impala {
 
 // Base class for column writers. This contains most of the logic except for
@@ -207,58 +205,6 @@ class HdfsParquetTableWriter::BaseColumnWriter {
  protected:
   friend class HdfsParquetTableWriter;
 
-  Status AddMemoryConsumptionForPageIndex(int64_t new_memory_allocation) {
-    if (UNLIKELY(!table_sink_mem_tracker_->TryConsume(new_memory_allocation))) {
-      return table_sink_mem_tracker_->MemLimitExceeded(parent_->state_,
-          "Failed to allocate memory for Parquet page index.", new_memory_allocation);
-    }
-    page_index_memory_consumption_ += new_memory_allocation;
-    return Status::OK();
-  }
-
-  Status ReserveOffsetIndex(int64_t capacity) {
-    if (!FLAGS_enable_parquet_page_index_writing_debug_only) return Status::OK();
-    RETURN_IF_ERROR(
-        AddMemoryConsumptionForPageIndex(capacity * sizeof(parquet::PageLocation)));
-    offset_index_.page_locations.reserve(capacity);
-    return Status::OK();
-  }
-
-  void AddLocationToOffsetIndex(const parquet::PageLocation& location) {
-    if (!FLAGS_enable_parquet_page_index_writing_debug_only) return;
-    offset_index_.page_locations.push_back(location);
-  }
-
-  Status AddPageStatsToColumnIndex() {
-    if (!FLAGS_enable_parquet_page_index_writing_debug_only) return Status::OK();
-    parquet::Statistics page_stats;
-    page_stats_base_->EncodeToThrift(&page_stats);
-    // If page_stats contains min_value and max_value, then append them to min_values_
-    // and max_values_ and also mark the page as not null. In case min and max values are
-    // not set, push empty strings to maintain the consistency of the index and mark the
-    // page as null. Always push the null_count.
-    string min_val;
-    string max_val;
-    if ((page_stats.__isset.min_value) && (page_stats.__isset.max_value)) {
-      Status s_min = TruncateDown(page_stats.min_value, PAGE_INDEX_MAX_STRING_LENGTH,
-          &min_val);
-      Status s_max = TruncateUp(page_stats.max_value, PAGE_INDEX_MAX_STRING_LENGTH,
-          &max_val);
-      if (!s_min.ok() || !s_max.ok()) valid_column_index_ = false;
-      column_index_.null_pages.push_back(false);
-    } else {
-      DCHECK(!page_stats.__isset.min_value && !page_stats.__isset.max_value);
-      column_index_.null_pages.push_back(true);
-      DCHECK_EQ(page_stats.null_count, num_values_);
-    }
-    RETURN_IF_ERROR(
-        AddMemoryConsumptionForPageIndex(min_val.capacity() + max_val.capacity()));
-    column_index_.min_values.emplace_back(std::move(min_val));
-    column_index_.max_values.emplace_back(std::move(max_val));
-    column_index_.null_counts.push_back(page_stats.null_count);
-    return Status::OK();
-  }
-
   // Encodes value into the current page output buffer and updates the column statistics
   // aggregates. Returns true if the value was appended successfully to the current page.
   // Returns false if the value was not appended to the current page and the caller can
@@ -699,10 +645,11 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
 
   *first_data_page = *file_pos;
   int64_t current_row_group_index = 0;
-  RETURN_IF_ERROR(ReserveOffsetIndex(num_data_pages_));
+  offset_index_.page_locations.resize(num_data_pages_);
 
   // Write data pages
-  for (const DataPage& page : pages_) {
+  for (int i = 0; i < num_data_pages_; ++i) {
+    DataPage& page = pages_[i];
     parquet::PageLocation location;
 
     if (page.header.data_page_header.num_values == 0) {
@@ -710,7 +657,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
       location.offset = -1;
       location.compressed_page_size = 0;
       location.first_row_index = -1;
-      AddLocationToOffsetIndex(location);
+      offset_index_.page_locations[i] = location;
       continue;
     }
 
@@ -730,7 +677,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
     // its name suggests. On the other hand, parquet::PageLocation::compressed_page_size
     // also includes the size of the page header.
     location.compressed_page_size = page.header.compressed_page_size + len;
-    AddLocationToOffsetIndex(location);
+    offset_index_.page_locations[i] = location;
 
     // Write the page data
     RETURN_IF_ERROR(parent_->Write(page.data, page.header.compressed_page_size));
@@ -807,7 +754,37 @@ Status HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
   }
 
   DCHECK(page_stats_base_ != nullptr);
-  RETURN_IF_ERROR(AddPageStatsToColumnIndex());
+  parquet::Statistics page_stats;
+  page_stats_base_->EncodeToThrift(&page_stats);
+  {
+    // If page_stats contains min_value and max_value, then append them to min_values_
+    // and max_values_ and also mark the page as not null. In case min and max values are
+    // not set, push empty strings to maintain the consistency of the index and mark the
+    // page as null. Always push the null_count.
+    string min_val;
+    string max_val;
+    if ((page_stats.__isset.min_value) && (page_stats.__isset.max_value)) {
+      Status s_min = TruncateDown(page_stats.min_value, PAGE_INDEX_MAX_STRING_LENGTH,
+          &min_val);
+      Status s_max = TruncateUp(page_stats.max_value, PAGE_INDEX_MAX_STRING_LENGTH,
+          &max_val);
+      if (!s_min.ok() || !s_max.ok()) valid_column_index_ = false;
+      column_index_.null_pages.push_back(false);
+    } else {
+      DCHECK(!page_stats.__isset.min_value && !page_stats.__isset.max_value);
+      column_index_.null_pages.push_back(true);
+      DCHECK_EQ(page_stats.null_count, num_values_);
+    }
+    int64_t new_memory_allocation = min_val.capacity() + max_val.capacity();
+    if (UNLIKELY(!table_sink_mem_tracker_->TryConsume(new_memory_allocation))) {
+      return table_sink_mem_tracker_->MemLimitExceeded(parent_->state_,
+          "Failed to allocate memory for Parquet page index.", new_memory_allocation);
+    }
+    page_index_memory_consumption_ += new_memory_allocation;
+    column_index_.min_values.emplace_back(std::move(min_val));
+    column_index_.max_values.emplace_back(std::move(max_val));
+    column_index_.null_counts.push_back(page_stats.null_count);
+  }
 
   // Update row group statistics from page statistics.
   DCHECK(row_group_stats_base_ != nullptr);
@@ -1160,7 +1137,6 @@ Status HdfsParquetTableWriter::Finalize() {
 
   RETURN_IF_ERROR(FlushCurrentRowGroup());
   RETURN_IF_ERROR(WritePageIndex());
-  for (auto& column : columns_) column->Reset();
   RETURN_IF_ERROR(WriteFileFooter());
   stats_.__set_parquet_stats(parquet_insert_stats_);
   COUNTER_ADD(parent_->rows_inserted_counter(), row_count_);
@@ -1273,8 +1249,6 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
 }
 
 Status HdfsParquetTableWriter::WritePageIndex() {
-  if (!FLAGS_enable_parquet_page_index_writing_debug_only) return Status::OK();
-
  // Currently Impala only writes Parquet files with a single row group. The current
   // page index logic depends on this behavior as it only keeps one row group's
   // statistics in memory.
@@ -1310,6 +1284,8 @@ Status HdfsParquetTableWriter::WritePageIndex() {
     row_group->columns[i].__set_offset_index_length(len);
     file_pos_ += len;
   }
+  // Reset column writers.
+  for (auto& column : columns_) column->Reset();
   return Status::OK();
 }
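
Taken together, the hunks in this file restore the unconditional page-index path: Flush() sizes offset_index_.page_locations up front and fills one parquet::PageLocation per data page, FinalizeCurrentPage() appends per-page min/max statistics to the column index, and the column writers are now reset at the end of WritePageIndex() instead of in Finalize(). A condensed sketch of the per-page bookkeeping after this revert (header_len stands in for the serialized header length, called len in the hunks above; error handling and the empty-page case are elided):

    // One location per data page. Note that PageLocation::compressed_page_size
    // deliberately includes the serialized page header, unlike
    // parquet::PageHeader::compressed_page_size.
    offset_index_.page_locations.resize(num_data_pages_);
    for (int i = 0; i < num_data_pages_; ++i) {
      parquet::PageLocation location;
      location.offset = *file_pos;                         // start of the page header
      location.first_row_index = current_row_group_index;  // first row in this page
      location.compressed_page_size =
          pages_[i].header.compressed_page_size + header_len;
      offset_index_.page_locations[i] = location;
      // ... write the header and page data, advance *file_pos ...
    }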
 

http://git-wip-us.apache.org/repos/asf/impala/blob/af76186e/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
index 3b25427..8e95168 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
@@ -33,17 +33,17 @@ show table stats alltypes
 YEAR, MONTH, #ROWS, EXTRAP #ROWS, #FILES, SIZE, BYTES CACHED, CACHE REPLICATION, FORMAT, INCREMENTAL STATS, LOCATION
 ---- RESULTS
 '2009','1',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=1'
-'2009','2',-1,288,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=2'
-'2009','3',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=3'
+'2009','2',-1,289,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=2'
+'2009','3',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=3'
 '2009','4',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=4'
-'2009','5',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=5'
+'2009','5',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=5'
 '2009','6',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=6'
-'2009','7',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=7'
-'2009','8',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=8'
+'2009','7',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=7'
+'2009','8',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=8'
 '2009','9',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=9'
-'2009','10',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=10'
+'2009','10',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=10'
 '2009','11',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=11'
-'2009','12',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=12'
+'2009','12',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=12'
 'Total','',3650,3650,12,regex:.*B,'0B','','','',''
 ---- TYPES
 STRING,STRING,BIGINT,BIGINT,BIGINT,STRING,STRING,STRING,STRING,STRING,STRING
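
The expected values above shift by one row in several partitions because stats extrapolation estimates per-partition row counts from file sizes, and writing the page index unconditionally again changes each file's size by the size of its serialized indexes. As a rough worked form of the estimate (a simplification; the planner's exact rounding may differ):

    extrapolated_rows(partition) ~= total_rows * partition_bytes / total_bytes

With total_rows = 3650 spread across 12 files of similar size, a small change in one partition's share of total_bytes is enough to move its estimate from 308 to 307, or from 288 to 289, as reflected in the updated results.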

http://git-wip-us.apache.org/repos/asf/impala/blob/af76186e/tests/custom_cluster/test_parquet_page_index.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_parquet_page_index.py b/tests/custom_cluster/test_parquet_page_index.py
deleted file mode 100644
index 0d2a750..0000000
--- a/tests/custom_cluster/test_parquet_page_index.py
+++ /dev/null
@@ -1,371 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Targeted Impala insert tests
-
-import os
-
-from collections import namedtuple
-from subprocess import check_call
-from parquet.ttypes import BoundaryOrder, ColumnIndex, OffsetIndex, PageHeader, PageType
-
-from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
-from tests.common.skip import SkipIfLocal
-from tests.util.filesystem_utils import get_fs_path
-from tests.util.get_parquet_metadata import (
-    decode_stats_value,
-    get_parquet_metadata,
-    read_serialized_object
-)
-
-PAGE_INDEX_MAX_STRING_LENGTH = 64
-
-
-@SkipIfLocal.parquet_file_size
-class TestHdfsParquetTableIndexWriter(CustomClusterTestSuite):
-  """Since PARQUET-922 page statistics can be written before the footer.
-  The tests in this class checks if Impala writes the page indices correctly.
-  It is temporarily a custom cluster test suite because we need to set the
-  enable_parquet_page_index_writing command-line flag for the Impala daemon
-  in order to make it write the page index.
-  TODO: IMPALA-5843 Once Impala is able to read the page index and also write it by
-  default, this test suite should be moved back to query tests.
-  """
-  @classmethod
-  def get_workload(cls):
-    return 'functional-query'
-
-  @classmethod
-  def add_test_dimensions(cls):
-    super(CustomClusterTestSuite, cls).add_test_dimensions()
-    cls.ImpalaTestMatrix.add_constraint(
-        lambda v: v.get_value('table_format').file_format == 'parquet')
-
-  def _get_row_group_from_file(self, parquet_file):
-    """Returns namedtuples that contain the schema, stats, offset_index, column_index,
-    and page_headers for each column in the first row group in file 'parquet_file'. Fails
-    if the file contains multiple row groups.
-    """
-    ColumnInfo = namedtuple('ColumnInfo', ['schema', 'stats', 'offset_index',
-        'column_index', 'page_headers'])
-
-    file_meta_data = get_parquet_metadata(parquet_file)
-    assert len(file_meta_data.row_groups) == 1
-    # We only support flat schemas; the additional element is the root element.
-    schemas = file_meta_data.schema[1:]
-    row_group = file_meta_data.row_groups[0]
-    assert len(schemas) == len(row_group.columns)
-    row_group_index = []
-    with open(parquet_file) as file_handle:
-      for column, schema in zip(row_group.columns, schemas):
-        column_index_offset = column.column_index_offset
-        column_index_length = column.column_index_length
-        column_index = None
-        if column_index_offset and column_index_length:
-          column_index = read_serialized_object(ColumnIndex, file_handle,
-                                                column_index_offset, column_index_length)
-        column_meta_data = column.meta_data
-        stats = None
-        if column_meta_data:
-          stats = column_meta_data.statistics
-
-        offset_index_offset = column.offset_index_offset
-        offset_index_length = column.offset_index_length
-        offset_index = None
-        page_headers = []
-        if offset_index_offset and offset_index_length:
-          offset_index = read_serialized_object(OffsetIndex, file_handle,
-                                                offset_index_offset, offset_index_length)
-          for page_loc in offset_index.page_locations:
-            page_header = read_serialized_object(PageHeader, file_handle, page_loc.offset,
-                                                 page_loc.compressed_page_size)
-            page_headers.append(page_header)
-
-        column_info = ColumnInfo(schema, stats, offset_index, column_index, page_headers)
-        row_group_index.append(column_info)
-    return row_group_index
-
-  def _get_row_groups_from_hdfs_folder(self, hdfs_path, tmpdir):
-    """Returns a list of column infos (containing the schema, stats, offset_index,
-    column_index, and page_headers) for the first row group in all parquet files in
-    'hdfs_path'.
-    """
-    row_group_indexes = []
-    check_call(['hdfs', 'dfs', '-get', hdfs_path, tmpdir.strpath])
-    for root, subdirs, files in os.walk(tmpdir.strpath):
-      for f in files:
-        parquet_file = os.path.join(root, str(f))
-        row_group_indexes.append(self._get_row_group_from_file(parquet_file))
-    return row_group_indexes
-
-  def _validate_page_locations(self, page_locations):
-    """Validate that the page locations are in order."""
-    for previous_loc, current_loc in zip(page_locations[:-1], page_locations[1:]):
-      assert previous_loc.offset < current_loc.offset
-      assert previous_loc.first_row_index < current_loc.first_row_index
-
-  def _validate_null_stats(self, index_size, column_info):
-    """Validates the statistics stored in null_pages and null_counts."""
-    column_index = column_info.column_index
-    column_stats = column_info.stats
-    assert column_index.null_pages is not None
-    assert len(column_index.null_pages) == index_size
-    assert column_index.null_counts is not None
-    assert len(column_index.null_counts) == index_size
-
-    for page_is_null, null_count, page_header in zip(column_index.null_pages,
-        column_index.null_counts, column_info.page_headers):
-      assert page_header.type == PageType.DATA_PAGE
-      num_values = page_header.data_page_header.num_values
-      assert not page_is_null or null_count == num_values
-
-    if column_stats:
-      assert column_stats.null_count == sum(column_index.null_counts)
-
-  def _validate_min_max_values(self, index_size, column_info):
-    """Validate min/max values of the pages in a column chunk."""
-    column_index = column_info.column_index
-    min_values = column_info.column_index.min_values
-    assert len(min_values) == index_size
-    max_values = column_info.column_index.max_values
-    assert len(max_values) == index_size
-
-    if not column_info.stats:
-      return
-
-    column_min_value_str = column_info.stats.min_value
-    column_max_value_str = column_info.stats.max_value
-    if column_min_value_str is None or column_max_value_str is None:
-      # If either is None, then both need to be None.
-      assert column_min_value_str is None and column_max_value_str is None
-      # No min and max value, all pages need to be null
-      for idx, null_page in enumerate(column_index.null_pages):
-        assert null_page, "Page {} of column {} is not null, \
-            but doesn't have min and max values!".format(idx, column_index.schema.name)
-      # Everything is None, no further checks needed.
-      return
-
-    column_min_value = decode_stats_value(column_info.schema, column_min_value_str)
-    for null_page, page_min_str in zip(column_index.null_pages, min_values):
-      if not null_page:
-        page_min_value = decode_stats_value(column_info.schema, page_min_str)
-        # If type is str, page_min_value might have been truncated.
-        if isinstance(page_min_value, basestring):
-          assert page_min_value >= column_min_value[:len(page_min_value)]
-        else:
-          assert page_min_value >= column_min_value
-
-    column_max_value = decode_stats_value(column_info.schema, column_max_value_str)
-    for null_page, page_max_str in zip(column_index.null_pages, max_values):
-      if not null_page:
-        page_max_value = decode_stats_value(column_info.schema, page_max_str)
-        # If type is str, page_max_value might have been truncated and incremented.
-        if (isinstance(page_max_value, basestring) and
-            len(page_max_value) == PAGE_INDEX_MAX_STRING_LENGTH):
-          max_val_prefix = page_max_value.rstrip('\0')
-          assert max_val_prefix[:-1] <= column_max_value
-        else:
-          assert page_max_value <= column_max_value
-
-  def _validate_ordering(self, ordering, schema, null_pages, min_values, max_values):
-    """Check if the ordering of the values reflects the value of 'ordering'."""
-
-    def is_sorted(l, reverse=False):
-      if not reverse:
-        return all(a <= b for a, b in zip(l, l[1:]))
-      else:
-        return all(a >= b for a, b in zip(l, l[1:]))
-
-    # Filter out null pages and decode the actual min/max values.
-    actual_min_values = [decode_stats_value(schema, min_val)
-                         for min_val, is_null in zip(min_values, null_pages)
-                         if not is_null]
-    actual_max_values = [decode_stats_value(schema, max_val)
-                         for max_val, is_null in zip(max_values, null_pages)
-                         if not is_null]
-
-    # For ASCENDING and DESCENDING, both min and max values need to be sorted.
-    if ordering == BoundaryOrder.ASCENDING:
-      assert is_sorted(actual_min_values)
-      assert is_sorted(actual_max_values)
-    elif ordering == BoundaryOrder.DESCENDING:
-      assert is_sorted(actual_min_values, reverse=True)
-      assert is_sorted(actual_max_values, reverse=True)
-    else:
-      assert ordering == BoundaryOrder.UNORDERED
-      # For UNORDERED, the min and max values cannot both be sorted.
-      assert not is_sorted(actual_min_values) or not is_sorted(actual_max_values)
-      assert (not is_sorted(actual_min_values, reverse=True) or
-              not is_sorted(actual_max_values, reverse=True))
-
-  def _validate_boundary_order(self, column_info):
-    """Validate that min/max values are really in the order specified by
-    boundary order.
-    """
-    column_index = column_info.column_index
-    self._validate_ordering(column_index.boundary_order, column_info.schema,
-        column_index.null_pages, column_index.min_values, column_index.max_values)
-
-  def _validate_parquet_page_index(self, hdfs_path, tmpdir):
-    """Validates that 'hdfs_path' contains exactly one parquet file and that the rowgroup
-    index in that file is in the valid format.
-    """
-    row_group_indexes = self._get_row_groups_from_hdfs_folder(hdfs_path, tmpdir)
-    for columns in row_group_indexes:
-      for column_info in columns:
-        try:
-          index_size = len(column_info.offset_index.page_locations)
-          assert index_size > 0
-          self._validate_page_locations(column_info.offset_index.page_locations)
-          # IMPALA-7304: Impala doesn't write column index for floating-point columns
-          # until PARQUET-1222 is resolved.
-          if column_info.schema.type in [4, 5]:
-            assert column_info.column_index is None
-            continue
-          self._validate_null_stats(index_size, column_info)
-          self._validate_min_max_values(index_size, column_info)
-          self._validate_boundary_order(column_info)
-        except AssertionError as e:
-          e.args += ("Validation failed on column {}.".format(column_info.schema.name),)
-          raise
-
-  def _ctas_table_and_verify_index(self, vector, unique_database, source_table,
-                                   tmpdir, sorting_column=None):
-    """Copies 'source_table' into a parquet table and makes sure that the index
-    in the resulting parquet file is valid.
-    """
-    table_name = "test_hdfs_parquet_table_writer"
-    qualified_table_name = "{0}.{1}".format(unique_database, table_name)
-    hdfs_path = get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
-                                                                 table_name))
-    # Setting num_nodes = 1 ensures that the query is executed on the coordinator,
-    # resulting in a single parquet file being written.
-    vector.get_value('exec_option')['num_nodes'] = 1
-    self.execute_query("drop table if exists {0}".format(qualified_table_name))
-    if sorting_column is None:
-      query = ("create table {0} stored as parquet as select * from {1}").format(
-          qualified_table_name, source_table)
-    else:
-      query = ("create table {0} sort by({1}) stored as parquet as select * from {2}"
-               ).format(qualified_table_name, sorting_column, source_table)
-    self.execute_query(query, vector.get_value('exec_option'))
-    self._validate_parquet_page_index(hdfs_path, tmpdir.join(source_table))
-
-  def _create_string_table_with_values(self, vector, unique_database, table_name,
-                                       values_sql):
-    """Creates a parquet table that has a single string column, then invokes an insert
-    statement on it with the 'values_sql' parameter. E.g. 'values_sql' is "('asdf')".
-    It returns the HDFS path for the table.
-    """
-    qualified_table_name = "{0}.{1}".format(unique_database, table_name)
-    self.execute_query("drop table if exists {0}".format(qualified_table_name))
-    vector.get_value('exec_option')['num_nodes'] = 1
-    query = ("create table {0} (str string) stored as parquet").format(
-        qualified_table_name)
-    self.execute_query(query, vector.get_value('exec_option'))
-    self.execute_query("insert into {0} values {1}".format(qualified_table_name,
-        values_sql), vector.get_value('exec_option'))
-    return get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
-        table_name))
-
-  @CustomClusterTestSuite.with_args("--enable_parquet_page_index_writing_debug_only")
-  def test_ctas_tables(self, vector, unique_database, tmpdir):
-    """Test different Parquet files created via CTAS statements."""
-
-    # Test that writing a parquet file populates the rowgroup indexes with the correct
-    # values.
-    self._ctas_table_and_verify_index(vector, unique_database, "functional.alltypes",
-        tmpdir)
-
-    # Test that writing a parquet file populates the rowgroup indexes with the correct
-    # values, using decimal types.
-    self._ctas_table_and_verify_index(vector, unique_database, "functional.decimal_tbl",
-        tmpdir)
-
-    # Test that writing a parquet file populates the rowgroup indexes with the correct
-    # values, using char types.
-    self._ctas_table_and_verify_index(vector, unique_database, "functional.chars_formats",
-        tmpdir)
-
-    # Test that we don't write min/max values in the index for null columns.
-    # Ensure null_count is set for columns with null values.
-    self._ctas_table_and_verify_index(vector, unique_database, "functional.nulltable",
-        tmpdir)
-
-    # Test that when a ColumnChunk is written across multiple pages, the index is
-    # valid.
-    self._ctas_table_and_verify_index(vector, unique_database, "tpch.customer",
-        tmpdir)
-    self._ctas_table_and_verify_index(vector, unique_database, "tpch.orders",
-        tmpdir)
-
-    # Test that when the schema has a sorting column, the index is valid.
-    self._ctas_table_and_verify_index(vector, unique_database,
-        "functional_parquet.zipcode_incomes", tmpdir, "id")
-
-    # Test table with wide row.
-    self._ctas_table_and_verify_index(vector, unique_database,
-        "functional_parquet.widerow", tmpdir)
-
-    # Test tables with wide rows and many columns.
-    self._ctas_table_and_verify_index(vector, unique_database,
-        "functional_parquet.widetable_250_cols", tmpdir)
-    self._ctas_table_and_verify_index(vector, unique_database,
-        "functional_parquet.widetable_500_cols", tmpdir)
-    self._ctas_table_and_verify_index(vector, unique_database,
-        "functional_parquet.widetable_1000_cols", tmpdir)
-
-  @CustomClusterTestSuite.with_args("--enable_parquet_page_index_writing_debug_only")
-  def test_max_string_values(self, vector, unique_database, tmpdir):
-    """Test string values that are all 0xFFs or end with 0xFFs."""
-
-    # String value is all 0xFFs, but its length is less than PAGE_INDEX_MAX_STRING_LENGTH.
-    short_tbl = "short_tbl"
-    short_hdfs_path = self._create_string_table_with_values(vector, unique_database,
-        short_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH - 1))
-    self._validate_parquet_page_index(short_hdfs_path, tmpdir.join(short_tbl))
-
-    # String value is all 0xFFs and its length is exactly PAGE_INDEX_MAX_STRING_LENGTH.
-    fit_tbl = "fit_tbl"
-    fit_hdfs_path = self._create_string_table_with_values(vector, unique_database,
-        fit_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH))
-    self._validate_parquet_page_index(fit_hdfs_path, tmpdir.join(fit_tbl))
-
-    # All bytes are 0xFFs and the string is longer than PAGE_INDEX_MAX_STRING_LENGTH, so we
-    # should not write page statistics.
-    too_long_tbl = "too_long_tbl"
-    too_long_hdfs_path = self._create_string_table_with_values(vector, unique_database,
-        too_long_tbl, "(rpad('', {0}, chr(255)))".format(
-            PAGE_INDEX_MAX_STRING_LENGTH + 1))
-    row_group_indexes = self._get_row_groups_from_hdfs_folder(too_long_hdfs_path,
-        tmpdir.join(too_long_tbl))
-    column = row_group_indexes[0][0]
-    assert column.column_index is None
-    # We always write the offset index
-    assert column.offset_index is not None
-
-    # Test a string value that starts with 'aaa' followed by 0xFFs, whose length is
-    # greater than PAGE_INDEX_MAX_STRING_LENGTH. The max value should be 'aab'.
-    aaa_tbl = "aaa_tbl"
-    aaa_hdfs_path = self._create_string_table_with_values(vector, unique_database,
-        aaa_tbl, "(rpad('aaa', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH + 1))
-    row_group_indexes = self._get_row_groups_from_hdfs_folder(aaa_hdfs_path,
-        tmpdir.join(aaa_tbl))
-    column = row_group_indexes[0][0]
-    assert len(column.column_index.max_values) == 1
-    max_value = column.column_index.max_values[0]
-    assert max_value == 'aab'

http://git-wip-us.apache.org/repos/asf/impala/blob/af76186e/tests/query_test/test_parquet_page_index.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_parquet_page_index.py b/tests/query_test/test_parquet_page_index.py
new file mode 100644
index 0000000..6235819
--- /dev/null
+++ b/tests/query_test/test_parquet_page_index.py
@@ -0,0 +1,372 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Targeted Impala insert tests
+
+import os
+
+from collections import namedtuple
+from subprocess import check_call
+from parquet.ttypes import BoundaryOrder, ColumnIndex, OffsetIndex, PageHeader, PageType
+
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.skip import SkipIfLocal
+from tests.util.filesystem_utils import get_fs_path
+from tests.util.get_parquet_metadata import (
+    decode_stats_value,
+    get_parquet_metadata,
+    read_serialized_object
+)
+
+PAGE_INDEX_MAX_STRING_LENGTH = 64
+
+
+@SkipIfLocal.parquet_file_size
+class TestHdfsParquetTableIndexWriter(ImpalaTestSuite):
+  """Since PARQUET-922 page statistics can be written before the footer.
+  The tests in this class checks if Impala writes the page indices correctly.
+  """
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestHdfsParquetTableIndexWriter, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format == 'parquet')
+
+  def _get_row_group_from_file(self, parquet_file):
+    """Returns namedtuples that contain the schema, stats, offset_index, column_index,
+    and page_headers for each column in the first row group in file 'parquet_file'. Fails
+    if the file contains multiple row groups.
+    """
+    ColumnInfo = namedtuple('ColumnInfo', ['schema', 'stats', 'offset_index',
+        'column_index', 'page_headers'])
+
+    file_meta_data = get_parquet_metadata(parquet_file)
+    assert len(file_meta_data.row_groups) == 1
+    # We only support flat schemas; the additional element is the root element.
+    schemas = file_meta_data.schema[1:]
+    row_group = file_meta_data.row_groups[0]
+    assert len(schemas) == len(row_group.columns)
+    row_group_index = []
+    with open(parquet_file) as file_handle:
+      for column, schema in zip(row_group.columns, schemas):
+        column_index_offset = column.column_index_offset
+        column_index_length = column.column_index_length
+        column_index = None
+        if column_index_offset and column_index_length:
+          column_index = read_serialized_object(ColumnIndex, file_handle,
+                                                column_index_offset, column_index_length)
+        column_meta_data = column.meta_data
+        stats = None
+        if column_meta_data:
+          stats = column_meta_data.statistics
+
+        offset_index_offset = column.offset_index_offset
+        offset_index_length = column.offset_index_length
+        offset_index = None
+        page_headers = []
+        if offset_index_offset and offset_index_length:
+          offset_index = read_serialized_object(OffsetIndex, file_handle,
+                                                offset_index_offset, offset_index_length)
+          for page_loc in offset_index.page_locations:
+            page_header = read_serialized_object(PageHeader, file_handle, page_loc.offset,
+                                                 page_loc.compressed_page_size)
+            page_headers.append(page_header)
+
+        column_info = ColumnInfo(schema, stats, offset_index, column_index, page_headers)
+        row_group_index.append(column_info)
+    return row_group_index
+
+  def _get_row_groups_from_hdfs_folder(self, hdfs_path, tmpdir):
+    """Returns a list of column infos (containing the schema, stats, offset_index,
+    column_index, and page_headers) for the first row group in all parquet files in
+    'hdfs_path'.
+    """
+    row_group_indexes = []
+    check_call(['hdfs', 'dfs', '-get', hdfs_path, tmpdir.strpath])
+    for root, subdirs, files in os.walk(tmpdir.strpath):
+      for f in files:
+        parquet_file = os.path.join(root, str(f))
+        row_group_indexes.append(self._get_row_group_from_file(parquet_file))
+    return row_group_indexes
+
+  def _validate_page_locations(self, page_locations):
+    """Validate that the page locations are in order."""
+    for previous_loc, current_loc in zip(page_locations[:-1], page_locations[1:]):
+      assert previous_loc.offset < current_loc.offset
+      assert previous_loc.first_row_index < current_loc.first_row_index
+
+  def _validate_null_stats(self, index_size, column_info):
+    """Validates the statistics stored in null_pages and null_counts."""
+    column_index = column_info.column_index
+    column_stats = column_info.stats
+    assert column_index.null_pages is not None
+    assert len(column_index.null_pages) == index_size
+    assert column_index.null_counts is not None
+    assert len(column_index.null_counts) == index_size
+
+    for page_is_null, null_count, page_header in zip(column_index.null_pages,
+        column_index.null_counts, column_info.page_headers):
+      assert page_header.type == PageType.DATA_PAGE
+      num_values = page_header.data_page_header.num_values
+      assert not page_is_null or null_count == num_values
+
+    if column_stats:
+      assert column_stats.null_count == sum(column_index.null_counts)
+
+  def _validate_min_max_values(self, index_size, column_info):
+    """Validate min/max values of the pages in a column chunk."""
+    column_index = column_info.column_index
+    min_values = column_info.column_index.min_values
+    assert len(min_values) == index_size
+    max_values = column_info.column_index.max_values
+    assert len(max_values) == index_size
+
+    if not column_info.stats:
+      return
+
+    column_min_value_str = column_info.stats.min_value
+    column_max_value_str = column_info.stats.max_value
+    if column_min_value_str is None or column_max_value_str is None:
+      # If either is None, then both need to be None.
+      assert column_min_value_str is None and column_max_value_str is None
+      # No min and max value, all pages need to be null
+      for idx, null_page in enumerate(column_index.null_pages):
+        assert null_page, "Page {} of column {} is not null, \
+            but doesn't have min and max values!".format(idx, column_index.schema.name)
+      # Everything is None, no further checks needed.
+      return
+
+    column_min_value = decode_stats_value(column_info.schema, column_min_value_str)
+    for null_page, page_min_str in zip(column_index.null_pages, min_values):
+      if not null_page:
+        page_min_value = decode_stats_value(column_info.schema, page_min_str)
+        # If type is str, page_min_value might have been truncated.
+        if isinstance(page_min_value, basestring):
+          assert page_min_value >= column_min_value[:len(page_min_value)]
+        else:
+          assert page_min_value >= column_min_value
+
+    column_max_value = decode_stats_value(column_info.schema, column_max_value_str)
+    for null_page, page_max_str in zip(column_index.null_pages, max_values):
+      if not null_page:
+        page_max_value = decode_stats_value(column_info.schema, page_max_str)
+        # If type is str, page_max_value might have been truncated and incremented.
+        if (isinstance(page_max_value, basestring) and
+            len(page_max_value) == PAGE_INDEX_MAX_STRING_LENGTH):
+          max_val_prefix = page_max_value.rstrip('\0')
+          assert max_val_prefix[:-1] <= column_max_value
+        else:
+          assert page_max_value <= column_max_value
+
+  def _validate_ordering(self, ordering, schema, null_pages, min_values, max_values):
+    """Check if the ordering of the values reflects the value of 'ordering'."""
+
+    def is_sorted(l, reverse=False):
+      if not reverse:
+        return all(a <= b for a, b in zip(l, l[1:]))
+      else:
+        return all(a >= b for a, b in zip(l, l[1:]))
+
+    # Filter out null pages and decode the actual min/max values.
+    actual_min_values = [decode_stats_value(schema, min_val)
+                         for min_val, is_null in zip(min_values, null_pages)
+                         if not is_null]
+    actual_max_values = [decode_stats_value(schema, max_val)
+                         for max_val, is_null in zip(max_values, null_pages)
+                         if not is_null]
+
+    # For ASCENDING and DESCENDING, both min and max values need to be sorted.
+    if ordering == BoundaryOrder.ASCENDING:
+      assert is_sorted(actual_min_values)
+      assert is_sorted(actual_max_values)
+    elif ordering == BoundaryOrder.DESCENDING:
+      assert is_sorted(actual_min_values, reverse=True)
+      assert is_sorted(actual_max_values, reverse=True)
+    else:
+      assert ordering == BoundaryOrder.UNORDERED
+      # For UNORDERED, the min and max values cannot both be sorted.
+      assert not is_sorted(actual_min_values) or not is_sorted(actual_max_values)
+      assert (not is_sorted(actual_min_values, reverse=True) or
+              not is_sorted(actual_max_values, reverse=True))
+
+  def _validate_boundary_order(self, column_info):
+    """Validate that min/max values are really in the order specified by
+    boundary order.
+    """
+    column_index = column_info.column_index
+    self._validate_ordering(column_index.boundary_order, column_info.schema,
+        column_index.null_pages, column_index.min_values, column_index.max_values)
+
+  def _validate_parquet_page_index(self, hdfs_path, tmpdir):
+    """Validates that 'hdfs_path' contains exactly one parquet file and that the rowgroup
+    index in that file is in the valid format.
+    """
+    row_group_indexes = self._get_row_groups_from_hdfs_folder(hdfs_path, tmpdir)
+    for columns in row_group_indexes:
+      for column_info in columns:
+        try:
+          index_size = len(column_info.offset_index.page_locations)
+          assert index_size > 0
+          self._validate_page_locations(column_info.offset_index.page_locations)
+          # IMPALA-7304: Impala doesn't write column index for floating-point columns
+          # until PARQUET-1222 is resolved.
+          if column_info.schema.type in [4, 5]:
+            assert column_info.column_index is None
+            continue
+          self._validate_null_stats(index_size, column_info)
+          self._validate_min_max_values(index_size, column_info)
+          self._validate_boundary_order(column_info)
+        except AssertionError as e:
+          e.args += ("Validation failed on column {}.".format(column_info.schema.name),)
+          raise
+
+  def _ctas_table_and_verify_index(self, vector, unique_database, source_table,
+                                   tmpdir, sorting_column=None):
+    """Copies 'source_table' into a parquet table and makes sure that the index
+    in the resulting parquet file is valid.
+    """
+    table_name = "test_hdfs_parquet_table_writer"
+    qualified_table_name = "{0}.{1}".format(unique_database, table_name)
+    hdfs_path = get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
+                                                                 table_name))
+    # Setting num_nodes = 1 ensures that the query is executed on the coordinator,
+    # resulting in a single parquet file being written.
+    vector.get_value('exec_option')['num_nodes'] = 1
+    self.execute_query("drop table if exists {0}".format(qualified_table_name))
+    if sorting_column is None:
+      query = ("create table {0} stored as parquet as select * from {1}").format(
+          qualified_table_name, source_table)
+    else:
+      query = ("create table {0} sort by({1}) stored as parquet as select * from {2}"
+               ).format(qualified_table_name, sorting_column, source_table)
+    self.execute_query(query, vector.get_value('exec_option'))
+    self._validate_parquet_page_index(hdfs_path, tmpdir.join(source_table))
+
+  def _create_string_table_with_values(self, vector, unique_database, table_name,
+                                       values_sql):
+    """Creates a parquet table that has a single string column, then invokes an insert
+    statement on it with the 'values_sql' parameter. E.g. 'values_sql' is "('asdf')".
+    It returns the HDFS path for the table.
+    """
+    qualified_table_name = "{0}.{1}".format(unique_database, table_name)
+    self.execute_query("drop table if exists {0}".format(qualified_table_name))
+    vector.get_value('exec_option')['num_nodes'] = 1
+    query = ("create table {0} (str string) stored as parquet").format(qualified_table_name)
+    self.execute_query(query, vector.get_value('exec_option'))
+    self.execute_query("insert into {0} values {1}".format(qualified_table_name,
+        values_sql), vector.get_value('exec_option'))
+    return get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
+        table_name))
+
+  def test_write_index_alltypes(self, vector, unique_database, tmpdir):
+    """Test that writing a parquet file populates the rowgroup indexes with the correct
+    values.
+    """
+    self._ctas_table_and_verify_index(vector, unique_database, "functional.alltypes",
+        tmpdir)
+
+  def test_write_index_decimals(self, vector, unique_database, tmpdir):
+    """Test that writing a parquet file populates the rowgroup indexes with the correct
+    values, using decimal types.
+    """
+    self._ctas_table_and_verify_index(vector, unique_database, "functional.decimal_tbl",
+        tmpdir)
+
+  def test_write_index_chars(self, vector, unique_database, tmpdir):
+    """Test that writing a parquet file populates the rowgroup indexes with the correct
+    values, using char types.
+    """
+    self._ctas_table_and_verify_index(vector, unique_database, "functional.chars_formats",
+        tmpdir)
+
+  def test_write_index_null(self, vector, unique_database, tmpdir):
+    """Test that we don't write min/max values in the index for null columns.
+    Ensure null_count is set for columns with null values.
+    """
+    self._ctas_table_and_verify_index(vector, unique_database, "functional.nulltable",
+        tmpdir)
+
+  def test_write_index_multi_page(self, vector, unique_database, tmpdir):
+    """Test that when a ColumnChunk is written across multiple pages, the index is
+    valid.
+    """
+    self._ctas_table_and_verify_index(vector, unique_database, "tpch.customer",
+        tmpdir)
+    self._ctas_table_and_verify_index(vector, unique_database, "tpch.orders",
+        tmpdir)
+
+  def test_write_index_sorting_column(self, vector, unique_database, tmpdir):
+    """Test that when the schema has a sorting column, the index is valid."""
+    self._ctas_table_and_verify_index(vector, unique_database,
+        "functional_parquet.zipcode_incomes", tmpdir, "id")
+
+  def test_write_index_wide_table(self, vector, unique_database, tmpdir):
+    """Test table with wide row."""
+    self._ctas_table_and_verify_index(vector, unique_database,
+        "functional_parquet.widerow", tmpdir)
+
+  def test_write_index_many_columns_tables(self, vector, unique_database, tmpdir):
+    """Test tables with wide rows and many columns."""
+    self._ctas_table_and_verify_index(vector, unique_database,
+        "functional_parquet.widetable_250_cols", tmpdir)
+    self._ctas_table_and_verify_index(vector, unique_database,
+        "functional_parquet.widetable_500_cols", tmpdir)
+    self._ctas_table_and_verify_index(vector, unique_database,
+        "functional_parquet.widetable_1000_cols", tmpdir)
+
+  def test_max_string_values(self, vector, unique_database, tmpdir):
+    """Test string values that are all 0xFFs or end with 0xFFs."""
+
+    # String value is all 0xFFs, but its length is less than PAGE_INDEX_MAX_STRING_LENGTH.
+    short_tbl = "short_tbl"
+    short_hdfs_path = self._create_string_table_with_values(vector, unique_database,
+        short_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH - 1))
+    self._validate_parquet_page_index(short_hdfs_path, tmpdir.join(short_tbl))
+
+    # String value is all 0xFFs and its length is exactly PAGE_INDEX_MAX_STRING_LENGTH.
+    fit_tbl = "fit_tbl"
+    fit_hdfs_path = self._create_string_table_with_values(vector, unique_database,
+        fit_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH))
+    self._validate_parquet_page_index(fit_hdfs_path, tmpdir.join(fit_tbl))
+
+    # All bytes are 0xFFs and the string is longer than PAGE_INDEX_MAX_STRING_LENGTH, so we
+    # should not write page statistics.
+    too_long_tbl = "too_long_tbl"
+    too_long_hdfs_path = self._create_string_table_with_values(vector, unique_database,
+        too_long_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH + 1))
+    row_group_indexes = self._get_row_groups_from_hdfs_folder(too_long_hdfs_path,
+        tmpdir.join(too_long_tbl))
+    column = row_group_indexes[0][0]
+    assert column.column_index is None
+    # We always write the offset index
+    assert column.offset_index is not None
+
+    # Test a string value that starts with 'aaa' followed by 0xFFs, whose length is
+    # greater than PAGE_INDEX_MAX_STRING_LENGTH. The max value should be 'aab'.
+    aaa_tbl = "aaa_tbl"
+    aaa_hdfs_path = self._create_string_table_with_values(vector, unique_database,
+        aaa_tbl, "(rpad('aaa', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH + 1))
+    row_group_indexes = self._get_row_groups_from_hdfs_folder(aaa_hdfs_path,
+        tmpdir.join(aaa_tbl))
+    column = row_group_indexes[0][0]
+    assert len(column.column_index.max_values) == 1
+    max_value = column.column_index.max_values[0]
+    assert max_value == 'aab'