You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2019/05/21 21:05:47 UTC

[impala] branch master updated: IMPALA-7936: Enable better control over Parquet writing

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 95be560  IMPALA-7936: Enable better control over Parquet writing
95be560 is described below

commit 95be560e0c43a5f8ea7b00f3ba9f83ec3f734ca2
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Fri May 17 16:31:43 2019 +0200

    IMPALA-7936: Enable better control over Parquet writing
    
    This commit adds two new query options to Impala:
    PARQUET_WRITE_PAGE_INDEX enables or disables Parquet page index
    writing (enabled by default), and PARQUET_PAGE_ROW_COUNT_LIMIT sets a
    row count limit per Parquet page (no limit by default).
    
    It also removes the hidden command-line flag
    enable_parquet_page_index_writing_debug_only, which previously
    controlled whether the page index was written.
    
    Because page index writing has been enabled by default since
    IMPALA-5843, I moved the tests from the "custom cluster" test suite to
    the "query test" test suite. The tests now run faster because the
    Impala daemons no longer need to be restarted.
    
    Testing:
    Added new test cases to test the effect of the query options.
    
    Change-Id: Ib9ec8b16036e1fd35886e887809be8eca52a6982
    Reviewed-on: http://gerrit.cloudera.org:8080/13361
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/common/global-flags.cc                      |   6 --
 be/src/exec/parquet/hdfs-parquet-table-writer.cc   |  33 +++---
 be/src/exec/parquet/hdfs-parquet-table-writer.h    |   5 +
 be/src/service/query-options.cc                    |  22 +++-
 be/src/service/query-options.h                     |   6 +-
 common/thrift/ImpalaInternalService.thrift         |   6 ++
 common/thrift/ImpalaService.thrift                 |   6 ++
 .../test_parquet_page_index.py                     | 117 ++++++++++++++++-----
 8 files changed, 150 insertions(+), 51 deletions(-)

diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 427e777..a28846f 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -263,12 +263,6 @@ DEFINE_double_hidden(invalidate_tables_fraction_on_memory_pressure, 0.1,
     "The fraction of tables to invalidate when CatalogdTableInvalidator considers the "
     "old GC generation to be almost full.");
 
-DEFINE_bool_hidden(enable_parquet_page_index_writing_debug_only, true, "If true, Impala "
-    "will write the Parquet page index. It is not advised to use it in a production "
-    "environment, only for testing and development. This flag is meant to be temporary. "
-    "We plan to remove this flag once Impala is able to read the page index and has "
-    "better test coverage around it.");
-
 DEFINE_bool_hidden(unlock_mt_dop, false,
     "(Experimental) If true, allow specifying mt_dop for all queries.");
 
diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.cc b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
index cf6a0de..0c01395 100644
--- a/be/src/exec/parquet/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
@@ -85,8 +85,6 @@ using namespace apache::thrift;
 // the columns and run that function over row batches.
 // TODO: we need to pass in the compression from the FE/metadata
 
-DECLARE_bool(enable_parquet_page_index_writing_debug_only);
-
 namespace impala {
 
 // Base class for column writers. This contains most of the logic except for
@@ -186,6 +184,7 @@ class HdfsParquetTableWriter::BaseColumnWriter {
     page_index_memory_consumption_ = 0;
     column_index_.null_counts.clear();
     valid_column_index_ = true;
+    write_page_index_ = parent_->state_->query_options().parquet_write_page_index;
   }
 
   // Close this writer. This is only called after Flush() and no more rows will
@@ -209,6 +208,12 @@ class HdfsParquetTableWriter::BaseColumnWriter {
  protected:
   friend class HdfsParquetTableWriter;
 
+  // Returns true if we should start writing a new page because of reaching some limits.
+  bool ShouldStartNewPage() {
+    int32_t num_values = current_page_->header.data_page_header.num_values;
+    return def_levels_->buffer_full() || num_values >= parent_->page_row_count_limit();
+  }
+
   Status AddMemoryConsumptionForPageIndex(int64_t new_memory_allocation) {
     if (UNLIKELY(!table_sink_mem_tracker_->TryConsume(new_memory_allocation))) {
       return table_sink_mem_tracker_->MemLimitExceeded(parent_->state_,
@@ -219,7 +224,7 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   }
 
   Status ReserveOffsetIndex(int64_t capacity) {
-    if (!FLAGS_enable_parquet_page_index_writing_debug_only) return Status::OK();
+    if (!write_page_index_) return Status::OK();
     RETURN_IF_ERROR(
         AddMemoryConsumptionForPageIndex(capacity * sizeof(parquet::PageLocation)));
     offset_index_.page_locations.reserve(capacity);
@@ -227,12 +232,12 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   }
 
   void AddLocationToOffsetIndex(const parquet::PageLocation& location) {
-    if (!FLAGS_enable_parquet_page_index_writing_debug_only) return;
+    if (!write_page_index_) return;
     offset_index_.page_locations.push_back(location);
   }
 
   Status AddPageStatsToColumnIndex() {
-    if (!FLAGS_enable_parquet_page_index_writing_debug_only) return Status::OK();
+    if (!write_page_index_) return Status::OK();
     parquet::Statistics page_stats;
     page_stats_base_->EncodeToThrift(&page_stats);
     // If pages_stats contains min_value and max_value, then append them to min_values_
@@ -251,7 +256,7 @@ class HdfsParquetTableWriter::BaseColumnWriter {
     } else {
       DCHECK(!page_stats.__isset.min_value && !page_stats.__isset.max_value);
       column_index_.null_pages.push_back(true);
-      DCHECK_EQ(page_stats.null_count, num_values_);
+      DCHECK_EQ(page_stats.null_count, current_page_->header.data_page_header.num_values);
     }
     RETURN_IF_ERROR(
         AddMemoryConsumptionForPageIndex(min_val.capacity() + max_val.capacity()));
@@ -384,6 +389,9 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   // Only write ColumnIndex when 'valid_column_index_' is true. We always need to write
   // the OffsetIndex though.
   bool valid_column_index_ = true;
+
+  // True, if we should write the page index.
+  bool write_page_index_;
 };
 
 // Per type column writer.
@@ -638,9 +646,7 @@ inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row)
   void* value = ConvertValue(expr_eval_->GetValue(row));
   if (current_page_ == nullptr) NewPage();
 
-  // Ensure that we have enough space for the definition level, but don't write it yet in
-  // case we don't have enough space for the value.
-  if (def_levels_->buffer_full()) {
+  if (ShouldStartNewPage()) {
     RETURN_IF_ERROR(FinalizeCurrentPage());
     NewPage();
   }
@@ -690,8 +696,8 @@ inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row)
   // Writing the def level will succeed because we ensured there was enough space for it
   // above, and new pages will always have space for at least a single def level.
   DCHECK(ret);
-
   ++current_page_->header.data_page_header.num_values;
+
   return Status::OK();
 }
 
@@ -961,9 +967,12 @@ Status HdfsParquetTableWriter::Init() {
     ss << "Invalid parquet compression codec " << Codec::GetCodecName(codec);
     return Status(ss.str());
   }
-
   VLOG_FILE << "Using compression codec: " << codec;
 
+  if (query_options.__isset.parquet_page_row_count_limit) {
+    page_row_count_limit_ = query_options.parquet_page_row_count_limit;
+  }
+
   int num_cols = table_desc_->num_cols() - table_desc_->num_clustering_cols();
   // When opening files using the hdfsOpenFile() API, the maximum block size is limited to
   // 2GB.
@@ -1342,7 +1351,7 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
 }
 
 Status HdfsParquetTableWriter::WritePageIndex() {
-  if (!FLAGS_enable_parquet_page_index_writing_debug_only) return Status::OK();
+  if (!state_->query_options().parquet_write_page_index) return Status::OK();
 
   // Currently Impala only write Parquet files with a single row group. The current
   // page index logic depends on this behavior as it only keeps one row group's
diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.h b/be/src/exec/parquet/hdfs-parquet-table-writer.h
index a065b9a..708ffa5 100644
--- a/be/src/exec/parquet/hdfs-parquet-table-writer.h
+++ b/be/src/exec/parquet/hdfs-parquet-table-writer.h
@@ -80,6 +80,8 @@ class HdfsParquetTableWriter : public HdfsTableWriter {
 
   virtual std::string file_extension() const override { return "parq"; }
 
+  int32_t page_row_count_limit() const { return page_row_count_limit_; }
+
  private:
   /// Default data page size. In bytes.
   static const int DEFAULT_DATA_PAGE_SIZE = 64 * 1024;
@@ -207,6 +209,9 @@ class HdfsParquetTableWriter : public HdfsTableWriter {
 
   /// For each column, the on disk size written.
   ParquetDmlStatsPB parquet_dml_stats_;
+
+  /// Maximum row count written in a page.
+  int32_t page_row_count_limit_ = std::numeric_limits<int32_t>::max();
 };
 
 }
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 9482f23..e10cb3e 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -724,10 +724,6 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_default_file_format(enum_type);
         break;
       }
-      case TImpalaQueryOptions::PARQUET_READ_PAGE_INDEX: {
-        query_options->__set_parquet_read_page_index(IsTrue(value));
-        break;
-      }
       case TImpalaQueryOptions::PARQUET_TIMESTAMP_TYPE: {
         TParquetTimestampType::type enum_type;
         RETURN_IF_ERROR(GetThriftEnum(value, "Parquet timestamp type",
@@ -735,6 +731,24 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_parquet_timestamp_type(enum_type);
         break;
       }
+      case TImpalaQueryOptions::PARQUET_READ_PAGE_INDEX: {
+        query_options->__set_parquet_read_page_index(IsTrue(value));
+        break;
+      }
+      case TImpalaQueryOptions::PARQUET_WRITE_PAGE_INDEX: {
+        query_options->__set_parquet_write_page_index(IsTrue(value));
+        break;
+      }
+      case TImpalaQueryOptions::PARQUET_PAGE_ROW_COUNT_LIMIT: {
+        StringParser::ParseResult result;
+        const int32_t row_count_limit =
+            StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
+        if (result != StringParser::PARSE_SUCCESS || row_count_limit <= 0) {
+          return Status("Parquet page row count limit must be a positive integer.");
+        }
+        query_options->__set_parquet_page_row_count_limit(row_count_limit);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index cf37550..014e6c8 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -41,7 +41,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::PARQUET_READ_PAGE_INDEX + 1);\
+      TImpalaQueryOptions::PARQUET_PAGE_ROW_COUNT_LIMIT + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -156,6 +156,10 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(parquet_timestamp_type, PARQUET_TIMESTAMP_TYPE,\
       TQueryOptionLevel::DEVELOPMENT)\
   QUERY_OPT_FN(parquet_read_page_index, PARQUET_READ_PAGE_INDEX,\
+      TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(parquet_write_page_index, PARQUET_WRITE_PAGE_INDEX,\
+      TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(parquet_page_row_count_limit, PARQUET_PAGE_ROW_COUNT_LIMIT,\
       TQueryOptionLevel::ADVANCED)
   ;
 
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index dfb6099..ae1c5d4 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -342,6 +342,12 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift.
   81: optional bool parquet_read_page_index = true;
+
+  // See comment in ImpalaService.thrift.
+  82: optional bool parquet_write_page_index = true;
+
+  // See comment in ImpalaService.thrift.
+  83: optional i32 parquet_page_row_count_limit;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index e7dadbb..cf09785 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -390,6 +390,12 @@ enum TImpalaQueryOptions {
   // statistics at page-level granularity. It can be used to skip pages and rows during
   // scanning.
   PARQUET_READ_PAGE_INDEX = 80
+
+  // Enable writing the Parquet page index.
+  PARQUET_WRITE_PAGE_INDEX = 81
+
+  // Maximum number of rows written in a single Parquet data page.
+  PARQUET_PAGE_ROW_COUNT_LIMIT = 82
 }
 
 // The summary of a DML statement.
diff --git a/tests/custom_cluster/test_parquet_page_index.py b/tests/query_test/test_parquet_page_index.py
similarity index 80%
rename from tests/custom_cluster/test_parquet_page_index.py
rename to tests/query_test/test_parquet_page_index.py
index acbe01d..40c7eb4 100644
--- a/tests/custom_cluster/test_parquet_page_index.py
+++ b/tests/query_test/test_parquet_page_index.py
@@ -23,7 +23,7 @@ from collections import namedtuple
 from subprocess import check_call
 from parquet.ttypes import BoundaryOrder, ColumnIndex, OffsetIndex, PageHeader, PageType
 
-from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIfLocal
 from tests.util.filesystem_utils import get_fs_path
 from tests.util.get_parquet_metadata import (
@@ -36,7 +36,7 @@ PAGE_INDEX_MAX_STRING_LENGTH = 64
 
 
 @SkipIfLocal.parquet_file_size
-class TestHdfsParquetTableIndexWriter(CustomClusterTestSuite):
+class TestHdfsParquetTableIndexWriter(ImpalaTestSuite):
   """Since PARQUET-922 page statistics can be written before the footer.
   The tests in this class checks if Impala writes the page indices correctly.
   It is temporarily a custom cluster test suite because we need to set the
@@ -51,7 +51,7 @@ class TestHdfsParquetTableIndexWriter(CustomClusterTestSuite):
 
   @classmethod
   def add_test_dimensions(cls):
-    super(CustomClusterTestSuite, cls).add_test_dimensions()
+    super(TestHdfsParquetTableIndexWriter, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.add_constraint(
         lambda v: v.get_value('table_format').file_format == 'parquet')
 
@@ -243,27 +243,36 @@ class TestHdfsParquetTableIndexWriter(CustomClusterTestSuite):
           e.args += ("Validation failed on column {}.".format(column_info.schema.name),)
           raise
 
-  def _ctas_table_and_verify_index(self, vector, unique_database, source_table,
+  def _ctas_table(self, vector, unique_database, source_db, source_table,
                                    tmpdir, sorting_column=None):
-    """Copies 'source_table' into a parquet table and makes sure that the index
-    in the resulting parquet file is valid.
+    """Copies the contents of 'source_db.source_table' into a parquet table.
     """
-    table_name = "test_hdfs_parquet_table_writer"
+    qualified_source_table = "{0}.{1}".format(source_db, source_table)
+    table_name = source_table
     qualified_table_name = "{0}.{1}".format(unique_database, table_name)
-    hdfs_path = get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
-                                                                 table_name))
     # Setting num_nodes = 1 ensures that the query is executed on the coordinator,
     # resulting in a single parquet file being written.
     vector.get_value('exec_option')['num_nodes'] = 1
     self.execute_query("drop table if exists {0}".format(qualified_table_name))
     if sorting_column is None:
       query = ("create table {0} stored as parquet as select * from {1}").format(
-          qualified_table_name, source_table)
+          qualified_table_name, qualified_source_table)
     else:
       query = ("create table {0} sort by({1}) stored as parquet as select * from {2}"
-               ).format(qualified_table_name, sorting_column, source_table)
+               ).format(qualified_table_name, sorting_column, qualified_source_table)
     self.execute_query(query, vector.get_value('exec_option'))
-    self._validate_parquet_page_index(hdfs_path, tmpdir.join(source_table))
+
+  def _ctas_table_and_verify_index(self, vector, unique_database, source_db, source_table,
+                                   tmpdir, sorting_column=None):
+    """Copies 'source_table' into a parquet table and makes sure that the index
+    in the resulting parquet file is valid.
+    """
+    self._ctas_table(vector, unique_database, source_db, source_table, tmpdir,
+        sorting_column)
+    hdfs_path = get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
+        source_table))
+    qualified_source_table = "{0}.{1}".format(unique_database, source_table)
+    self._validate_parquet_page_index(hdfs_path, tmpdir.join(qualified_source_table))
 
   def _create_string_table_with_values(self, vector, unique_database, table_name,
                                        values_sql):
@@ -282,59 +291,57 @@ class TestHdfsParquetTableIndexWriter(CustomClusterTestSuite):
     return get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
         table_name))
 
-  @CustomClusterTestSuite.with_args("--enable_parquet_page_index_writing_debug_only")
   def test_ctas_tables(self, vector, unique_database, tmpdir):
     """Test different Parquet files created via CTAS statements."""
 
     # Test that writing a parquet file populates the rowgroup indexes with the correct
     # values.
-    self._ctas_table_and_verify_index(vector, unique_database, "functional.alltypes",
+    self._ctas_table_and_verify_index(vector, unique_database, "functional", "alltypes",
         tmpdir)
 
     # Test that writing a parquet file populates the rowgroup indexes with the correct
     # values, using decimal types.
-    self._ctas_table_and_verify_index(vector, unique_database, "functional.decimal_tbl",
-        tmpdir)
+    self._ctas_table_and_verify_index(vector, unique_database, "functional",
+        "decimal_tbl", tmpdir)
 
     # Test that writing a parquet file populates the rowgroup indexes with the correct
     # values, using date types.
-    self._ctas_table_and_verify_index(vector, unique_database, "functional.date_tbl",
+    self._ctas_table_and_verify_index(vector, unique_database, "functional", "date_tbl",
         tmpdir)
 
     # Test that writing a parquet file populates the rowgroup indexes with the correct
     # values, using char types.
-    self._ctas_table_and_verify_index(vector, unique_database, "functional.chars_formats",
-        tmpdir)
+    self._ctas_table_and_verify_index(vector, unique_database, "functional",
+        "chars_formats", tmpdir)
 
     # Test that we don't write min/max values in the index for null columns.
     # Ensure null_count is set for columns with null values.
-    self._ctas_table_and_verify_index(vector, unique_database, "functional.nulltable",
+    self._ctas_table_and_verify_index(vector, unique_database, "functional", "nulltable",
         tmpdir)
 
     # Test that when a ColumnChunk is written across multiple pages, the index is
     # valid.
-    self._ctas_table_and_verify_index(vector, unique_database, "tpch.customer",
+    self._ctas_table_and_verify_index(vector, unique_database, "tpch", "customer",
         tmpdir)
-    self._ctas_table_and_verify_index(vector, unique_database, "tpch.orders",
+    self._ctas_table_and_verify_index(vector, unique_database, "tpch", "orders",
         tmpdir)
 
     # Test that when the schema has a sorting column, the index is valid.
     self._ctas_table_and_verify_index(vector, unique_database,
-        "functional_parquet.zipcode_incomes", tmpdir, "id")
+        "functional_parquet", "zipcode_incomes", tmpdir, "id")
 
     # Test table with wide row.
     self._ctas_table_and_verify_index(vector, unique_database,
-        "functional_parquet.widerow", tmpdir)
+        "functional_parquet", "widerow", tmpdir)
 
     # Test tables with wide rows and many columns.
     self._ctas_table_and_verify_index(vector, unique_database,
-        "functional_parquet.widetable_250_cols", tmpdir)
+        "functional_parquet", "widetable_250_cols", tmpdir)
     self._ctas_table_and_verify_index(vector, unique_database,
-        "functional_parquet.widetable_500_cols", tmpdir)
+        "functional_parquet", "widetable_500_cols", tmpdir)
     self._ctas_table_and_verify_index(vector, unique_database,
-        "functional_parquet.widetable_1000_cols", tmpdir)
+        "functional_parquet", "widetable_1000_cols", tmpdir)
 
-  @CustomClusterTestSuite.with_args("--enable_parquet_page_index_writing_debug_only")
   def test_max_string_values(self, vector, unique_database, tmpdir):
     """Test string values that are all 0xFFs or end with 0xFFs."""
 
@@ -374,3 +381,57 @@ class TestHdfsParquetTableIndexWriter(CustomClusterTestSuite):
     assert len(column.column_index.max_values) == 1
     max_value = column.column_index.max_values[0]
     assert max_value == 'aab'
+
+  def test_row_count_limit(self, vector, unique_database, tmpdir):
+    """Tests that we can set the page row count limit via a query option.
+    """
+    vector.get_value('exec_option')['parquet_page_row_count_limit'] = 20
+    table_name = "alltypessmall"
+    self._ctas_table_and_verify_index(vector, unique_database, "functional", table_name,
+        tmpdir)
+    hdfs_path = get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
+        table_name))
+    row_group_indexes = self._get_row_groups_from_hdfs_folder(hdfs_path,
+        tmpdir.join(table_name))
+    for row_group in row_group_indexes:
+      for column in row_group:
+        for page_header in column.page_headers:
+          if page_header.data_page_header is not None:
+            assert page_header.data_page_header.num_values == 20
+    result_row20 = self.execute_query(
+        "select * from {0}.{1} order by id".format(unique_database, table_name))
+    result_orig = self.execute_query(
+        "select * from functional.alltypessmall order by id")
+    assert result_row20.data == result_orig.data
+
+  def test_row_count_limit_nulls(self, vector, unique_database, tmpdir):
+    """Tests that we can set the page row count limit on a table with null values.
+    """
+    vector.get_value('exec_option')['parquet_page_row_count_limit'] = 2
+    null_tbl = 'null_table'
+    null_tbl_path = self._create_string_table_with_values(vector, unique_database,
+        null_tbl, "(NULL), (NULL), ('foo'), (NULL), (NULL), (NULL)")
+    row_group_indexes = self._get_row_groups_from_hdfs_folder(null_tbl_path,
+        tmpdir.join(null_tbl))
+    column = row_group_indexes[0][0]
+    assert column.column_index.null_pages[0] == True
+    assert column.column_index.null_pages[1] == False
+    assert column.column_index.null_pages[2] == True
+    assert len(column.page_headers) == 3
+    for page_header in column.page_headers:
+      assert page_header.data_page_header.num_values == 2
+
+  def test_disable_page_index_writing(self, vector, unique_database, tmpdir):
+    """Tests that we can disable page index writing via a query option.
+    """
+    vector.get_value('exec_option')['parquet_write_page_index'] = False
+    table_name = "alltypessmall"
+    self._ctas_table(vector, unique_database, "functional", table_name, tmpdir)
+    hdfs_path = get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
+        table_name))
+    row_group_indexes = self._get_row_groups_from_hdfs_folder(hdfs_path,
+        tmpdir.join(table_name))
+    for row_group in row_group_indexes:
+      for column in row_group:
+        assert column.offset_index is None
+        assert column.column_index is None