You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by em...@apache.org on 2023/01/12 23:11:22 UTC

[arrow] branch master updated: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback (#14603)

This is an automated email from the ASF dual-hosted git repository.

emkornfield pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 97998d835f PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback (#14603)
97998d835f is described below

commit 97998d835f404dd4876a2691a93b973fc022ffd3
Author: Fatemah Panahi <fa...@users.noreply.github.com>
AuthorDate: Thu Jan 12 15:11:12 2023 -0800

    PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback (#14603)
    
    Currently, we do not expose the page header metadata, so it cannot be used for skipping pages. I propose exposing the metadata through a callback that allows the caller to decide whether to read or skip each page based on that metadata.
    
    Authored-by: Fatemah Panahi <pa...@google.com>
    Signed-off-by: Micah Kornfield <mi...@google.com>
---
 cpp/src/parquet/column_reader.cc         |  99 ++++++---
 cpp/src/parquet/column_reader.h          |  39 ++++
 cpp/src/parquet/file_deserialize_test.cc | 333 ++++++++++++++++++++++++++++++-
 cpp/src/parquet/metadata.cc              |   1 -
 cpp/src/parquet/statistics.cc            |  11 +
 cpp/src/parquet/statistics.h             |   8 +
 6 files changed, 457 insertions(+), 34 deletions(-)

diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index f881651737..3670af49fb 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -226,6 +226,12 @@ EncodedStatistics ExtractStatsFromHeader(const H& header) {
   return page_statistics;
 }
 
+void CheckNumValuesInHeader(int num_values) {
+  if (num_values < 0) {
+    throw ParquetException("Invalid page header (negative number of values)");
+  }
+}
+
 // ----------------------------------------------------------------------
 // SerializedPageReader deserializes Thrift metadata and pages that have been
 // assembled in a serialized stream for storing in a Parquet files
@@ -269,6 +275,11 @@ class SerializedPageReader : public PageReader {
                                              int compressed_len, int uncompressed_len,
                                              int levels_byte_len = 0);
 
+  // Returns true if the page should be skipped: its type is neither data nor
+  // dictionary, or data_page_filter_ asked to skip it. Checks values in the
+  // page header and, for data pages, fills in data_page_statistics.
+  bool ShouldSkipPage(EncodedStatistics* data_page_statistics);
+
   const ReaderProperties properties_;
   std::shared_ptr<ArrowInputStream> stream_;
 
@@ -342,6 +353,55 @@ void SerializedPageReader::UpdateDecryption(const std::shared_ptr<Decryptor>& de
   }
 }
 
+bool SerializedPageReader::ShouldSkipPage(EncodedStatistics* data_page_statistics) {
+  const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);
+  if (page_type == PageType::DATA_PAGE) {
+    const format::DataPageHeader& header = current_page_header_.data_page_header;
+    CheckNumValuesInHeader(header.num_values);
+    *data_page_statistics = ExtractStatsFromHeader(header);
+    seen_num_values_ += header.num_values;
+    if (data_page_filter_) {
+      const EncodedStatistics* filter_statistics =
+          data_page_statistics->is_set() ? data_page_statistics : nullptr;
+      DataPageStats data_page_stats(filter_statistics, header.num_values,
+                                    /*num_rows=*/std::nullopt);
+      if (data_page_filter_(data_page_stats)) {
+        return true;
+      }
+    }
+  } else if (page_type == PageType::DATA_PAGE_V2) {
+    const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
+    CheckNumValuesInHeader(header.num_values);
+    if (header.num_rows < 0) {
+      throw ParquetException("Invalid page header (negative number of rows)");
+    }
+    if (header.definition_levels_byte_length < 0 ||
+        header.repetition_levels_byte_length < 0) {
+      throw ParquetException("Invalid page header (negative levels byte length)");
+    }
+    *data_page_statistics = ExtractStatsFromHeader(header);
+    seen_num_values_ += header.num_values;
+    if (data_page_filter_) {
+      const EncodedStatistics* filter_statistics =
+          data_page_statistics->is_set() ? data_page_statistics : nullptr;
+      DataPageStats data_page_stats(filter_statistics, header.num_values,
+                                    header.num_rows);
+      if (data_page_filter_(data_page_stats)) {
+        return true;
+      }
+    }
+  } else if (page_type == PageType::DICTIONARY_PAGE) {
+    const format::DictionaryPageHeader& dict_header =
+        current_page_header_.dictionary_page_header;
+    CheckNumValuesInHeader(dict_header.num_values);
+  } else {
+    // We don't know what this page type is. We're allowed to skip non-data
+    // pages.
+    return true;
+  }
+  return false;
+}
+
 std::shared_ptr<Page> SerializedPageReader::NextPage() {
   ThriftDeserializer deserializer(properties_);
 
@@ -391,6 +451,12 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
       throw ParquetException("Invalid page header");
     }
 
+    EncodedStatistics data_page_statistics;
+    if (ShouldSkipPage(&data_page_statistics)) {
+      PARQUET_THROW_NOT_OK(stream_->Advance(compressed_len));
+      continue;
+    }
+
     if (crypto_ctx_.data_decryptor != nullptr) {
       UpdateDecryption(crypto_ctx_.data_decryptor, encryption::kDictionaryPage,
                        &data_page_aad_);
@@ -416,19 +482,14 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
       page_buffer = decryption_buffer_;
     }
 
+    // Uncompress and construct the pages to return.
     const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);
-
     if (page_type == PageType::DICTIONARY_PAGE) {
       crypto_ctx_.start_decrypt_with_dictionary_page = false;
       const format::DictionaryPageHeader& dict_header =
           current_page_header_.dictionary_page_header;
-
       bool is_sorted = dict_header.__isset.is_sorted ? dict_header.is_sorted : false;
-      if (dict_header.num_values < 0) {
-        throw ParquetException("Invalid page header (negative number of values)");
-      }
 
-      // Uncompress if needed
       page_buffer =
           DecompressIfNeeded(std::move(page_buffer), compressed_len, uncompressed_len);
 
@@ -438,14 +499,6 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
     } else if (page_type == PageType::DATA_PAGE) {
       ++page_ordinal_;
       const format::DataPageHeader& header = current_page_header_.data_page_header;
-
-      if (header.num_values < 0) {
-        throw ParquetException("Invalid page header (negative number of values)");
-      }
-      EncodedStatistics page_statistics = ExtractStatsFromHeader(header);
-      seen_num_values_ += header.num_values;
-
-      // Uncompress if needed
       page_buffer =
           DecompressIfNeeded(std::move(page_buffer), compressed_len, uncompressed_len);
 
@@ -453,24 +506,15 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
                                           LoadEnumSafe(&header.encoding),
                                           LoadEnumSafe(&header.definition_level_encoding),
                                           LoadEnumSafe(&header.repetition_level_encoding),
-                                          uncompressed_len, page_statistics);
+                                          uncompressed_len, data_page_statistics);
     } else if (page_type == PageType::DATA_PAGE_V2) {
       ++page_ordinal_;
       const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
 
-      if (header.num_values < 0) {
-        throw ParquetException("Invalid page header (negative number of values)");
-      }
-      if (header.definition_levels_byte_length < 0 ||
-          header.repetition_levels_byte_length < 0) {
-        throw ParquetException("Invalid page header (negative levels byte length)");
-      }
       // Arrow prior to 3.0.0 set is_compressed to false but still compressed.
       bool is_compressed =
           (header.__isset.is_compressed ? header.is_compressed : false) ||
           always_compressed_;
-      EncodedStatistics page_statistics = ExtractStatsFromHeader(header);
-      seen_num_values_ += header.num_values;
 
       // Uncompress if needed
       int levels_byte_len;
@@ -489,11 +533,10 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
           page_buffer, header.num_values, header.num_nulls, header.num_rows,
           LoadEnumSafe(&header.encoding), header.definition_levels_byte_length,
           header.repetition_levels_byte_length, uncompressed_len, is_compressed,
-          page_statistics);
+          data_page_statistics);
     } else {
-      // We don't know what this page type is. We're allowed to skip non-data
-      // pages.
-      continue;
+      throw ParquetException(
+          "Internal error, we have already skipped non-data pages in ShouldSkipPage()");
     }
   }
   return std::shared_ptr<Page>(nullptr);
diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h
index b06266de38..b5f96f8fc4 100644
--- a/cpp/src/parquet/column_reader.h
+++ b/cpp/src/parquet/column_reader.h
@@ -24,6 +24,7 @@
 
 #include "parquet/exception.h"
 #include "parquet/level_conversion.h"
+#include "parquet/metadata.h"
 #include "parquet/platform.h"
 #include "parquet/properties.h"
 #include "parquet/schema.h"
@@ -55,6 +56,26 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
 // 16 KB is the default expected page header size
 static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
 
+// \brief DataPageStats stores encoded statistics and number of values/rows for
+// a page.
+struct PARQUET_EXPORT DataPageStats {
+  DataPageStats(const EncodedStatistics* encoded_statistics, int32_t num_values,
+                std::optional<int32_t> num_rows)
+      : encoded_statistics(encoded_statistics),
+        num_values(num_values),
+        num_rows(num_rows) {}
+
+  // Encoded statistics extracted from the page header.
+  // Nullptr if there are no statistics in the page header.
+  const EncodedStatistics* encoded_statistics;
+  // Number of values stored in the page. Filled for both V1 and V2 data pages.
+  // For repeated fields, this can be greater than number of rows. For
+  // non-repeated fields, this will be the same as the number of rows.
+  int32_t num_values;
+  // Number of rows stored in the page. std::nullopt if not available.
+  std::optional<int32_t> num_rows;
+};
+
 class PARQUET_EXPORT LevelDecoder {
  public:
   LevelDecoder();
@@ -100,6 +121,8 @@ struct CryptoContext {
 // Abstract page iterator interface. This way, we can feed column pages to the
 // ColumnReader through whatever mechanism we choose
 class PARQUET_EXPORT PageReader {
+  using DataPageFilter = std::function<bool(const DataPageStats&)>;
+
  public:
   virtual ~PageReader() = default;
 
@@ -115,11 +138,27 @@ class PARQUET_EXPORT PageReader {
                                           bool always_compressed = false,
                                           const CryptoContext* ctx = NULLPTR);
 
+  // If data_page_filter is present (not null), NextPage() will call the
+  // callback function exactly once per data page (DATA_PAGE or DATA_PAGE_V2),
+  // in the order the pages appear in the column. If the callback function
+  // returns true, the page will be skipped. The callback is never called for
+  // other page types; in particular, dictionary pages will not be skipped.
+  // Caller is responsible for checking that statistics are correct using
+  // ApplicationVersion::HasCorrectStatistics().
+  // \note API EXPERIMENTAL
+  void set_data_page_filter(DataPageFilter data_page_filter) {
+    data_page_filter_ = std::move(data_page_filter);
+  }
+
   // @returns: shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page>
   // containing new Page otherwise
   virtual std::shared_ptr<Page> NextPage() = 0;
 
   virtual void set_max_page_header_size(uint32_t size) = 0;
+
+ protected:
+  // Callback that decides if we should skip a page or not.
+  DataPageFilter data_page_filter_;
 };
 
 class PARQUET_EXPORT ColumnReader {
diff --git a/cpp/src/parquet/file_deserialize_test.cc b/cpp/src/parquet/file_deserialize_test.cc
index d0d333256f..76f34a1eec 100644
--- a/cpp/src/parquet/file_deserialize_test.cc
+++ b/cpp/src/parquet/file_deserialize_test.cc
@@ -21,23 +21,24 @@
 #include <cstring>
 #include <memory>
 
+#include "arrow/io/memory.h"
+#include "arrow/status.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/compression.h"
 #include "parquet/column_page.h"
 #include "parquet/column_reader.h"
 #include "parquet/exception.h"
 #include "parquet/file_reader.h"
+#include "parquet/metadata.h"
 #include "parquet/platform.h"
 #include "parquet/test_util.h"
 #include "parquet/thrift_internal.h"
 #include "parquet/types.h"
 
-#include "arrow/io/memory.h"
-#include "arrow/status.h"
-#include "arrow/testing/gtest_util.h"
-#include "arrow/util/compression.h"
-
 namespace parquet {
 
 using ::arrow::io::BufferReader;
+using ::parquet::DataPageStats;
 
 // Adds page statistics occupying a certain amount of bytes (for testing very
 // large page headers)
@@ -123,6 +124,27 @@ class TestPageSerde : public ::testing::Test {
     ASSERT_NO_THROW(serializer.Serialize(&page_header_, out_stream_.get()));
   }
 
+  void WriteDictionaryPageHeader(int32_t uncompressed_size = 0,
+                                 int32_t compressed_size = 0) {
+    page_header_.__set_dictionary_page_header(dictionary_page_header_);
+    page_header_.uncompressed_page_size = uncompressed_size;
+    page_header_.compressed_page_size = compressed_size;
+    page_header_.type = format::PageType::DICTIONARY_PAGE;
+
+    ThriftSerializer serializer;
+    ASSERT_NO_THROW(serializer.Serialize(&page_header_, out_stream_.get()));
+  }
+
+  void WriteIndexPageHeader(int32_t uncompressed_size = 0, int32_t compressed_size = 0) {
+    page_header_.__set_index_page_header(index_page_header_);
+    page_header_.uncompressed_page_size = uncompressed_size;
+    page_header_.compressed_page_size = compressed_size;
+    page_header_.type = format::PageType::INDEX_PAGE;
+
+    ThriftSerializer serializer;
+    ASSERT_NO_THROW(serializer.Serialize(&page_header_, out_stream_.get()));
+  }
+
   void ResetStream() { out_stream_ = CreateOutputStream(); }
 
   void EndStream() { PARQUET_ASSIGN_OR_THROW(out_buffer_, out_stream_->Finish()); }
@@ -135,6 +157,8 @@ class TestPageSerde : public ::testing::Test {
   format::PageHeader page_header_;
   format::DataPageHeader data_page_header_;
   format::DataPageHeaderV2 data_page_header_v2_;
+  format::IndexPageHeader index_page_header_;
+  format::DictionaryPageHeader dictionary_page_header_;
 };
 
 void CheckDataPageHeader(const format::DataPageHeader expected, const Page* page) {
@@ -177,6 +201,305 @@ TEST_F(TestPageSerde, DataPageV1) {
   ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_, current_page.get()));
 }
 
+// Templated test class to test page filtering for both format::DataPageHeader
+// and format::DataPageHeaderV2.
+template <typename T>
+class PageFilterTest : public TestPageSerde {
+ public:
+  const int kNumPages = 10;
+  void WriteStream();
+  void WritePageWithoutStats();
+  void CheckNumRows(std::optional<int32_t> num_rows, const T& header);
+
+ protected:
+  std::vector<T> data_page_headers_;
+  int total_rows_ = 0;
+};
+
+template <>
+void PageFilterTest<format::DataPageHeader>::WriteStream() {
+  for (int i = 0; i < kNumPages; ++i) {
+    // Vary the number of rows to produce different headers.
+    int32_t num_rows = i + 100;
+    total_rows_ += num_rows;
+    int data_size = i + 1024;
+    this->data_page_header_.__set_num_values(num_rows);
+    this->data_page_header_.statistics.__set_min_value("A" + std::to_string(i));
+    this->data_page_header_.statistics.__set_max_value("Z" + std::to_string(i));
+    this->data_page_header_.statistics.__set_null_count(0);
+    this->data_page_header_.statistics.__set_distinct_count(num_rows);
+    this->data_page_header_.__isset.statistics = true;
+    ASSERT_NO_FATAL_FAILURE(
+        this->WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+    data_page_headers_.push_back(this->data_page_header_);
+    // Also write data, to make sure we skip the data correctly.
+    std::vector<uint8_t> faux_data(data_size);
+    ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+  }
+  this->EndStream();
+}
+
+template <>
+void PageFilterTest<format::DataPageHeaderV2>::WriteStream() {
+  for (int i = 0; i < kNumPages; ++i) {
+    // Vary the number of rows to produce different headers.
+    int32_t num_rows = i + 100;
+    total_rows_ += num_rows;
+    int data_size = i + 1024;
+    this->data_page_header_v2_.__set_num_values(num_rows);
+    this->data_page_header_v2_.__set_num_rows(num_rows);
+    this->data_page_header_v2_.statistics.__set_min_value("A" + std::to_string(i));
+    this->data_page_header_v2_.statistics.__set_max_value("Z" + std::to_string(i));
+    this->data_page_header_v2_.statistics.__set_null_count(0);
+    this->data_page_header_v2_.statistics.__set_distinct_count(num_rows);
+    this->data_page_header_v2_.__isset.statistics = true;
+    ASSERT_NO_FATAL_FAILURE(
+        this->WriteDataPageHeaderV2(/*max_serialized_len=*/1024, data_size, data_size));
+    data_page_headers_.push_back(this->data_page_header_v2_);
+    // Also write data, to make sure we skip the data correctly.
+    std::vector<uint8_t> faux_data(data_size);
+    ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+  }
+  this->EndStream();
+}
+
+template <>
+void PageFilterTest<format::DataPageHeader>::WritePageWithoutStats() {
+  int32_t num_rows = 100;
+  total_rows_ += num_rows;
+  int data_size = 1024;
+  this->data_page_header_.__set_num_values(num_rows);
+  ASSERT_NO_FATAL_FAILURE(
+      this->WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+  data_page_headers_.push_back(this->data_page_header_);
+  std::vector<uint8_t> faux_data(data_size);
+  ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+  this->EndStream();
+}
+
+template <>
+void PageFilterTest<format::DataPageHeaderV2>::WritePageWithoutStats() {
+  int32_t num_rows = 100;
+  total_rows_ += num_rows;
+  int data_size = 1024;
+  this->data_page_header_v2_.__set_num_values(num_rows);
+  this->data_page_header_v2_.__set_num_rows(num_rows);
+  ASSERT_NO_FATAL_FAILURE(
+      this->WriteDataPageHeaderV2(/*max_serialized_len=*/1024, data_size, data_size));
+  data_page_headers_.push_back(this->data_page_header_v2_);
+  std::vector<uint8_t> faux_data(data_size);
+  ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+  this->EndStream();
+}
+
+template <>
+void PageFilterTest<format::DataPageHeader>::CheckNumRows(
+    std::optional<int32_t> num_rows, const format::DataPageHeader& header) {
+  ASSERT_EQ(num_rows, std::nullopt);
+}
+
+template <>
+void PageFilterTest<format::DataPageHeaderV2>::CheckNumRows(
+    std::optional<int32_t> num_rows, const format::DataPageHeaderV2& header) {
+  ASSERT_EQ(num_rows, header.num_rows);  // Also fails cleanly on std::nullopt.
+}
+
+using DataPageHeaderTypes =
+    ::testing::Types<format::DataPageHeader, format::DataPageHeaderV2>;
+TYPED_TEST_SUITE(PageFilterTest, DataPageHeaderTypes);
+
+// Test that the returned encoded_statistics is nullptr when there are no
+// statistics in the page header.
+TYPED_TEST(PageFilterTest, TestPageWithoutStatistics) {
+  this->WritePageWithoutStats();
+
+  auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+  this->page_reader_ =
+      PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+  int num_pages = 0;
+  bool is_stats_null = false;
+  auto read_all_pages = [&](const DataPageStats& stats) -> bool {
+    is_stats_null = stats.encoded_statistics == nullptr;
+    ++num_pages;
+    return false;
+  };
+
+  this->page_reader_->set_data_page_filter(read_all_pages);
+  std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+  ASSERT_EQ(num_pages, 1);
+  ASSERT_EQ(is_stats_null, true);
+  ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+}
+
+// Creates a number of pages and skips some of them with the page filter callback.
+TYPED_TEST(PageFilterTest, TestPageFilterCallback) {
+  this->WriteStream();
+
+  {  // Read all pages.
+     // Also check that the encoded statistics passed to the callback function
+     // are right.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+    this->page_reader_ =
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+    std::vector<EncodedStatistics> read_stats;
+    std::vector<int64_t> read_num_values;
+    std::vector<std::optional<int32_t>> read_num_rows;
+    auto read_all_pages = [&](const DataPageStats& stats) -> bool {
+      DCHECK_NE(stats.encoded_statistics, nullptr);
+      read_stats.push_back(*stats.encoded_statistics);
+      read_num_values.push_back(stats.num_values);
+      read_num_rows.push_back(stats.num_rows);
+      return false;
+    };
+
+    this->page_reader_->set_data_page_filter(read_all_pages);
+    for (int i = 0; i < this->kNumPages; ++i) {
+      std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+      ASSERT_NE(current_page, nullptr);
+      ASSERT_NO_FATAL_FAILURE(
+          CheckDataPageHeader(this->data_page_headers_[i], current_page.get()));
+      auto data_page = static_cast<const DataPage*>(current_page.get());
+      const EncodedStatistics encoded_statistics = data_page->statistics();
+      ASSERT_EQ(read_stats[i].max(), encoded_statistics.max());
+      ASSERT_EQ(read_stats[i].min(), encoded_statistics.min());
+      ASSERT_EQ(read_stats[i].null_count, encoded_statistics.null_count);
+      ASSERT_EQ(read_stats[i].distinct_count, encoded_statistics.distinct_count);
+      ASSERT_EQ(read_num_values[i], this->data_page_headers_[i].num_values);
+      this->CheckNumRows(read_num_rows[i], this->data_page_headers_[i]);
+    }
+    ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+  }
+
+  {  // Skip all pages.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+    this->page_reader_ =
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+    auto skip_all_pages = [](const DataPageStats& stats) -> bool { return true; };
+
+    this->page_reader_->set_data_page_filter(skip_all_pages);
+    std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+    ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+  }
+
+  {  // Skip every other page.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+    this->page_reader_ =
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+    // Skip pages with even number of values.
+    auto skip_even_pages = [](const DataPageStats& stats) -> bool {
+      if (stats.num_values % 2 == 0) return true;
+      return false;
+    };
+
+    this->page_reader_->set_data_page_filter(skip_even_pages);
+
+    for (int i = 0; i < this->kNumPages; ++i) {
+      // Only pages with odd number of values are read.
+      if (i % 2 != 0) {
+        std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+        ASSERT_NE(current_page, nullptr);
+        ASSERT_NO_FATAL_FAILURE(
+            CheckDataPageHeader(this->data_page_headers_[i], current_page.get()));
+      }
+    }
+    // We should have exhausted reading the pages by reading the odd pages only.
+    ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+  }
+}
+
+// Set the page filter more than once. The new filter should be effective
+// on the next NextPage() call.
+TYPED_TEST(PageFilterTest, TestChangingPageFilter) {
+  this->WriteStream();
+
+  auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+  this->page_reader_ =
+      PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+  // This callback will always return false.
+  auto read_all_pages = [](const DataPageStats& stats) -> bool { return false; };
+  this->page_reader_->set_data_page_filter(read_all_pages);
+  std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+  ASSERT_NE(current_page, nullptr);
+  ASSERT_NO_FATAL_FAILURE(
+      CheckDataPageHeader(this->data_page_headers_[0], current_page.get()));
+
+  // This callback will skip all pages.
+  auto skip_all_pages = [](const DataPageStats& stats) -> bool { return true; };
+  this->page_reader_->set_data_page_filter(skip_all_pages);
+  ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+}
+
+// Test that we do not skip dictionary pages.
+TEST_F(TestPageSerde, DoesNotFilterDictionaryPages) {
+  int data_size = 1024;
+  std::vector<uint8_t> faux_data(data_size);
+
+  ASSERT_NO_FATAL_FAILURE(
+      WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+  ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+
+  ASSERT_NO_FATAL_FAILURE(WriteDictionaryPageHeader(data_size, data_size));
+  ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+
+  ASSERT_NO_FATAL_FAILURE(
+      WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+  ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+  EndStream();
+
+  // Try to read it back while asking for all data pages to be skipped.
+  auto stream = std::make_shared<::arrow::io::BufferReader>(out_buffer_);
+  page_reader_ = PageReader::Open(stream, /*num_rows=*/100, Compression::UNCOMPRESSED);
+
+  auto skip_all_pages = [](const DataPageStats& stats) -> bool { return true; };
+
+  page_reader_->set_data_page_filter(skip_all_pages);
+  // The first data page is skipped, so we are now at the dictionary page.
+  std::shared_ptr<Page> current_page = page_reader_->NextPage();
+  ASSERT_NE(current_page, nullptr);
+  ASSERT_EQ(current_page->type(), PageType::DICTIONARY_PAGE);
+  // The data page after dictionary page is skipped.
+  ASSERT_EQ(page_reader_->NextPage(), nullptr);
+}
+
+// Tests that we successfully skip non-data pages.
+TEST_F(TestPageSerde, SkipsNonDataPages) {
+  int data_size = 1024;
+  std::vector<uint8_t> faux_data(data_size);
+  ASSERT_NO_FATAL_FAILURE(WriteIndexPageHeader(data_size, data_size));
+  ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+
+  ASSERT_NO_FATAL_FAILURE(
+      WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+  ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+
+  ASSERT_NO_FATAL_FAILURE(WriteIndexPageHeader(data_size, data_size));
+  ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+  ASSERT_NO_FATAL_FAILURE(WriteIndexPageHeader(data_size, data_size));
+  ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+
+  ASSERT_NO_FATAL_FAILURE(
+      WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+  ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+  ASSERT_NO_FATAL_FAILURE(WriteIndexPageHeader(data_size, data_size));
+  ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+  EndStream();
+
+  auto stream = std::make_shared<::arrow::io::BufferReader>(out_buffer_);
+  page_reader_ = PageReader::Open(stream, /*num_rows=*/100, Compression::UNCOMPRESSED);
+
+  // Only the two data pages are returned.
+  std::shared_ptr<Page> current_page = page_reader_->NextPage();
+  ASSERT_EQ(current_page->type(), PageType::DATA_PAGE);
+  current_page = page_reader_->NextPage();
+  ASSERT_EQ(current_page->type(), PageType::DATA_PAGE);
+  ASSERT_EQ(page_reader_->NextPage(), nullptr);
+}
+
 TEST_F(TestPageSerde, DataPageV2) {
   int stats_size = 512;
   const int32_t num_rows = 4444;
diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc
index fd7067b8a7..9e8412217d 100644
--- a/cpp/src/parquet/metadata.cc
+++ b/cpp/src/parquet/metadata.cc
@@ -33,7 +33,6 @@
 #include "parquet/exception.h"
 #include "parquet/schema.h"
 #include "parquet/schema_internal.h"
-#include "parquet/statistics.h"
 #include "parquet/thrift_internal.h"
 
 namespace parquet {
diff --git a/cpp/src/parquet/statistics.cc b/cpp/src/parquet/statistics.cc
index 13d20fcb33..deba7ad3c2 100644
--- a/cpp/src/parquet/statistics.cc
+++ b/cpp/src/parquet/statistics.cc
@@ -858,6 +858,17 @@ std::shared_ptr<Statistics> Statistics::Make(Type::type physical_type, const voi
   return nullptr;
 }
 
+std::shared_ptr<Statistics> Statistics::Make(const ColumnDescriptor* descr,
+                                             const EncodedStatistics* encoded_stats,
+                                             int64_t num_values,
+                                             ::arrow::MemoryPool* pool) {
+  DCHECK(encoded_stats != nullptr);
+  return Make(descr, encoded_stats->min(), encoded_stats->max(), num_values,
+              encoded_stats->null_count, encoded_stats->distinct_count,
+              encoded_stats->has_min && encoded_stats->has_max,
+              encoded_stats->has_null_count, encoded_stats->has_distinct_count, pool);
+}
+
 std::shared_ptr<Statistics> Statistics::Make(const ColumnDescriptor* descr,
                                              const std::string& encoded_min,
                                              const std::string& encoded_max,
diff --git a/cpp/src/parquet/statistics.h b/cpp/src/parquet/statistics.h
index b4e9fb382a..71d9b662ba 100644
--- a/cpp/src/parquet/statistics.h
+++ b/cpp/src/parquet/statistics.h
@@ -216,6 +216,14 @@ class PARQUET_EXPORT Statistics {
       bool has_distinct_count,
       ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
 
+  // Helper function to convert EncodedStatistics to Statistics.
+  // EncodedStatistics does not contain the number of non-null values, so it
+  // can be supplied separately through the num_values parameter.
+  static std::shared_ptr<Statistics> Make(
+      const ColumnDescriptor* descr, const EncodedStatistics* encoded_statistics,
+      int64_t num_values = -1,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+
   /// \brief Return true if the count of null values is set
   virtual bool HasNullCount() const = 0;