You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by em...@apache.org on 2023/01/12 23:11:22 UTC
[arrow] branch master updated: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback (#14603)
This is an automated email from the ASF dual-hosted git repository.
emkornfield pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 97998d835f PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback (#14603)
97998d835f is described below
commit 97998d835f404dd4876a2691a93b973fc022ffd3
Author: Fatemah Panahi <fa...@users.noreply.github.com>
AuthorDate: Thu Jan 12 15:11:12 2023 -0800
PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback (#14603)
Currently, the page header metadata is not exposed, so it cannot be used to skip pages. I propose exposing the metadata through a callback that lets the caller decide, based on that metadata, whether to read or skip each page.
Authored-by: Fatemah Panahi <pa...@google.com>
Signed-off-by: Micah Kornfield <mi...@google.com>
---
cpp/src/parquet/column_reader.cc | 99 ++++++---
cpp/src/parquet/column_reader.h | 39 ++++
cpp/src/parquet/file_deserialize_test.cc | 333 ++++++++++++++++++++++++++++++-
cpp/src/parquet/metadata.cc | 1 -
cpp/src/parquet/statistics.cc | 11 +
cpp/src/parquet/statistics.h | 8 +
6 files changed, 457 insertions(+), 34 deletions(-)
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index f881651737..3670af49fb 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -226,6 +226,12 @@ EncodedStatistics ExtractStatsFromHeader(const H& header) {
return page_statistics;
}
+void CheckNumValuesInHeader(int num_values) {  // Validates the value count read from a page header.
+ if (num_values < 0) {  // A negative count is treated as an invalid header.
+ throw ParquetException("Invalid page header (negative number of values)");
+ }
+}
+
// ----------------------------------------------------------------------
// SerializedPageReader deserializes Thrift metadata and pages that have been
// assembled in a serialized stream for storing in a Parquet files
@@ -269,6 +275,11 @@ class SerializedPageReader : public PageReader {
int compressed_len, int uncompressed_len,
int levels_byte_len = 0);
+ // Returns true for pages that are neither data nor dictionary pages, and for data
+ // pages that data_page_filter_ asks to skip. Throws ParquetException on negative
+ // counts in the page header. Fills in data_page_statistics for data pages.
+ bool ShouldSkipPage(EncodedStatistics* data_page_statistics);
+
const ReaderProperties properties_;
std::shared_ptr<ArrowInputStream> stream_;
@@ -342,6 +353,55 @@ void SerializedPageReader::UpdateDecryption(const std::shared_ptr<Decryptor>& de
}
}
+// Decides whether the page described by current_page_header_ should be skipped
+// instead of being read.
+//
+// DATA_PAGE and DATA_PAGE_V2 headers are validated, their statistics are copied
+// into *data_page_statistics, and their value count is added to
+// seen_num_values_. After that, data_page_filter_ (if set) is asked whether the
+// page should be skipped. DICTIONARY_PAGE headers are only validated and are
+// never skipped. Every other page type is skipped.
+//
+// Returns true if the caller should skip the page, false if it should read it.
+// Throws ParquetException if the header carries a negative number of values, a
+// negative number of rows (V2) or a negative levels byte length (V2).
+bool SerializedPageReader::ShouldSkipPage(EncodedStatistics* data_page_statistics) {
+  const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);
+  if (page_type == PageType::DATA_PAGE) {
+    const format::DataPageHeader& header = current_page_header_.data_page_header;
+    CheckNumValuesInHeader(header.num_values);
+    *data_page_statistics = ExtractStatsFromHeader(header);
+    // Counted before the filter runs, so filtered-out pages are included too.
+    seen_num_values_ += header.num_values;
+    if (data_page_filter_) {
+      // The filter sees nullptr when the header carries no statistics.
+      const EncodedStatistics* filter_statistics =
+          data_page_statistics->is_set() ? data_page_statistics : nullptr;
+      // The row count is not read from V1 headers, so it is reported as absent.
+      DataPageStats data_page_stats(filter_statistics, header.num_values,
+                                    /*num_rows=*/std::nullopt);
+      if (data_page_filter_(data_page_stats)) {
+        return true;
+      }
+    }
+  } else if (page_type == PageType::DATA_PAGE_V2) {
+    const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
+    CheckNumValuesInHeader(header.num_values);
+    if (header.num_rows < 0) {
+      throw ParquetException("Invalid page header (negative number of rows)");
+    }
+    if (header.definition_levels_byte_length < 0 ||
+        header.repetition_levels_byte_length < 0) {
+      throw ParquetException("Invalid page header (negative levels byte length)");
+    }
+    *data_page_statistics = ExtractStatsFromHeader(header);
+    // Counted before the filter runs, so filtered-out pages are included too.
+    seen_num_values_ += header.num_values;
+    if (data_page_filter_) {
+      // The filter sees nullptr when the header carries no statistics.
+      const EncodedStatistics* filter_statistics =
+          data_page_statistics->is_set() ? data_page_statistics : nullptr;
+      DataPageStats data_page_stats(filter_statistics, header.num_values,
+                                    header.num_rows);
+      if (data_page_filter_(data_page_stats)) {
+        return true;
+      }
+    }
+  } else if (page_type == PageType::DICTIONARY_PAGE) {
+    // Dictionary pages are validated but never offered to the filter.
+    const format::DictionaryPageHeader& dict_header =
+        current_page_header_.dictionary_page_header;
+    CheckNumValuesInHeader(dict_header.num_values);
+  } else {
+    // We don't know what this page type is. We're allowed to skip non-data
+    // pages.
+    return true;
+  }
+  return false;
+}
+
std::shared_ptr<Page> SerializedPageReader::NextPage() {
ThriftDeserializer deserializer(properties_);
@@ -391,6 +451,12 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
throw ParquetException("Invalid page header");
}
+ EncodedStatistics data_page_statistics;
+ if (ShouldSkipPage(&data_page_statistics)) {
+ PARQUET_THROW_NOT_OK(stream_->Advance(compressed_len));
+ continue;
+ }
+
if (crypto_ctx_.data_decryptor != nullptr) {
UpdateDecryption(crypto_ctx_.data_decryptor, encryption::kDictionaryPage,
&data_page_aad_);
@@ -416,19 +482,14 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
page_buffer = decryption_buffer_;
}
+ // Uncompress and construct the pages to return.
const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);
-
if (page_type == PageType::DICTIONARY_PAGE) {
crypto_ctx_.start_decrypt_with_dictionary_page = false;
const format::DictionaryPageHeader& dict_header =
current_page_header_.dictionary_page_header;
-
bool is_sorted = dict_header.__isset.is_sorted ? dict_header.is_sorted : false;
- if (dict_header.num_values < 0) {
- throw ParquetException("Invalid page header (negative number of values)");
- }
- // Uncompress if needed
page_buffer =
DecompressIfNeeded(std::move(page_buffer), compressed_len, uncompressed_len);
@@ -438,14 +499,6 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
} else if (page_type == PageType::DATA_PAGE) {
++page_ordinal_;
const format::DataPageHeader& header = current_page_header_.data_page_header;
-
- if (header.num_values < 0) {
- throw ParquetException("Invalid page header (negative number of values)");
- }
- EncodedStatistics page_statistics = ExtractStatsFromHeader(header);
- seen_num_values_ += header.num_values;
-
- // Uncompress if needed
page_buffer =
DecompressIfNeeded(std::move(page_buffer), compressed_len, uncompressed_len);
@@ -453,24 +506,15 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
LoadEnumSafe(&header.encoding),
LoadEnumSafe(&header.definition_level_encoding),
LoadEnumSafe(&header.repetition_level_encoding),
- uncompressed_len, page_statistics);
+ uncompressed_len, data_page_statistics);
} else if (page_type == PageType::DATA_PAGE_V2) {
++page_ordinal_;
const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
- if (header.num_values < 0) {
- throw ParquetException("Invalid page header (negative number of values)");
- }
- if (header.definition_levels_byte_length < 0 ||
- header.repetition_levels_byte_length < 0) {
- throw ParquetException("Invalid page header (negative levels byte length)");
- }
// Arrow prior to 3.0.0 set is_compressed to false but still compressed.
bool is_compressed =
(header.__isset.is_compressed ? header.is_compressed : false) ||
always_compressed_;
- EncodedStatistics page_statistics = ExtractStatsFromHeader(header);
- seen_num_values_ += header.num_values;
// Uncompress if needed
int levels_byte_len;
@@ -489,11 +533,10 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
page_buffer, header.num_values, header.num_nulls, header.num_rows,
LoadEnumSafe(&header.encoding), header.definition_levels_byte_length,
header.repetition_levels_byte_length, uncompressed_len, is_compressed,
- page_statistics);
+ data_page_statistics);
} else {
- // We don't know what this page type is. We're allowed to skip non-data
- // pages.
- continue;
+ throw ParquetException(
+ "Internal error, we have already skipped non-data pages in ShouldSkipPage()");
}
}
return std::shared_ptr<Page>(nullptr);
diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h
index b06266de38..b5f96f8fc4 100644
--- a/cpp/src/parquet/column_reader.h
+++ b/cpp/src/parquet/column_reader.h
@@ -24,6 +24,7 @@
#include "parquet/exception.h"
#include "parquet/level_conversion.h"
+#include "parquet/metadata.h"
#include "parquet/platform.h"
#include "parquet/properties.h"
#include "parquet/schema.h"
@@ -55,6 +56,26 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
// 16 KB is the default expected page header size
static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
+/// \brief DataPageStats stores encoded statistics and number of values/rows for
+/// a page.
+struct PARQUET_EXPORT DataPageStats {
+ DataPageStats(const EncodedStatistics* encoded_statistics, int32_t num_values,
+ std::optional<int32_t> num_rows)
+ : encoded_statistics(encoded_statistics),  // Only the pointer is stored; the statistics are not copied.
+ num_values(num_values),
+ num_rows(num_rows) {}
+
+ // Encoded statistics extracted from the page header.
+ // Nullptr if there are no statistics in the page header.
+ const EncodedStatistics* encoded_statistics;
+ // Number of values stored in the page. Filled for both V1 and V2 data pages.
+ // For repeated fields, this can be greater than number of rows. For
+ // non-repeated fields, this will be the same as the number of rows.
+ int32_t num_values;
+ // Number of rows stored in the page. std::nullopt if not available.
+ std::optional<int32_t> num_rows;
+};
+
class PARQUET_EXPORT LevelDecoder {
public:
LevelDecoder();
@@ -100,6 +121,8 @@ struct CryptoContext {
// Abstract page iterator interface. This way, we can feed column pages to the
// ColumnReader through whatever mechanism we choose
class PARQUET_EXPORT PageReader {
+ using DataPageFilter = std::function<bool(const DataPageStats&)>;
+
public:
virtual ~PageReader() = default;
@@ -115,11 +138,27 @@ class PARQUET_EXPORT PageReader {
bool always_compressed = false,
const CryptoContext* ctx = NULLPTR);
+ // Sets the callback used to skip data pages. If data_page_filter is present
+ // (not null), NextPage() will call the callback function exactly once per
+ // data page in the order the pages appear in the column. If the callback
+ // returns true the page will be skipped. The callback will be called only if
+ // the page type is DATA_PAGE or DATA_PAGE_V2. Dictionary pages will not be skipped.
+ // Caller is responsible for checking that statistics are correct using
+ // ApplicationVersion::HasCorrectStatistics().
+ // \note API EXPERIMENTAL
+ void set_data_page_filter(DataPageFilter data_page_filter) {
+ data_page_filter_ = std::move(data_page_filter);  // Replaces any previously set filter.
+ }
+
// @returns: shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page>
// containing new Page otherwise
virtual std::shared_ptr<Page> NextPage() = 0;
virtual void set_max_page_header_size(uint32_t size) = 0;
+
+ protected:
+ // Callback that decides if we should skip a page or not.
+ DataPageFilter data_page_filter_;
};
class PARQUET_EXPORT ColumnReader {
diff --git a/cpp/src/parquet/file_deserialize_test.cc b/cpp/src/parquet/file_deserialize_test.cc
index d0d333256f..76f34a1eec 100644
--- a/cpp/src/parquet/file_deserialize_test.cc
+++ b/cpp/src/parquet/file_deserialize_test.cc
@@ -21,23 +21,24 @@
#include <cstring>
#include <memory>
+#include "arrow/io/memory.h"
+#include "arrow/status.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/compression.h"
#include "parquet/column_page.h"
#include "parquet/column_reader.h"
#include "parquet/exception.h"
#include "parquet/file_reader.h"
+#include "parquet/metadata.h"
#include "parquet/platform.h"
#include "parquet/test_util.h"
#include "parquet/thrift_internal.h"
#include "parquet/types.h"
-#include "arrow/io/memory.h"
-#include "arrow/status.h"
-#include "arrow/testing/gtest_util.h"
-#include "arrow/util/compression.h"
-
namespace parquet {
using ::arrow::io::BufferReader;
+using ::parquet::DataPageStats;
// Adds page statistics occupying a certain amount of bytes (for testing very
// large page headers)
@@ -123,6 +124,27 @@ class TestPageSerde : public ::testing::Test {
ASSERT_NO_THROW(serializer.Serialize(&page_header_, out_stream_.get()));
}
+ void WriteDictionaryPageHeader(int32_t uncompressed_size = 0,  // Serializes a DICTIONARY_PAGE header to out_stream_.
+ int32_t compressed_size = 0) {
+ page_header_.__set_dictionary_page_header(dictionary_page_header_);
+ page_header_.uncompressed_page_size = uncompressed_size;
+ page_header_.compressed_page_size = compressed_size;
+ page_header_.type = format::PageType::DICTIONARY_PAGE;
+ // Only the header is serialized here; no page payload is written.
+ ThriftSerializer serializer;
+ ASSERT_NO_THROW(serializer.Serialize(&page_header_, out_stream_.get()));
+ }
+
+ void WriteIndexPageHeader(int32_t uncompressed_size = 0, int32_t compressed_size = 0) {  // Serializes an INDEX_PAGE header to out_stream_.
+ page_header_.__set_index_page_header(index_page_header_);
+ page_header_.uncompressed_page_size = uncompressed_size;
+ page_header_.compressed_page_size = compressed_size;
+ page_header_.type = format::PageType::INDEX_PAGE;
+ // Only the header is serialized here; no page payload is written.
+ ThriftSerializer serializer;
+ ASSERT_NO_THROW(serializer.Serialize(&page_header_, out_stream_.get()));
+ }
+
void ResetStream() { out_stream_ = CreateOutputStream(); }
void EndStream() { PARQUET_ASSIGN_OR_THROW(out_buffer_, out_stream_->Finish()); }
@@ -135,6 +157,8 @@ class TestPageSerde : public ::testing::Test {
format::PageHeader page_header_;
format::DataPageHeader data_page_header_;
format::DataPageHeaderV2 data_page_header_v2_;
+ format::IndexPageHeader index_page_header_;
+ format::DictionaryPageHeader dictionary_page_header_;
};
void CheckDataPageHeader(const format::DataPageHeader expected, const Page* page) {
@@ -177,6 +201,305 @@ TEST_F(TestPageSerde, DataPageV1) {
ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_, current_page.get()));
}
+// Templated test fixture for page filtering, instantiated for both
+// format::DataPageHeader (V1) and format::DataPageHeaderV2 (V2).
+template <typename T>
+class PageFilterTest : public TestPageSerde {
+ public:
+ const int kNumPages = 10;  // Number of data pages written by WriteStream().
+ void WriteStream();  // Writes kNumPages pages with statistics, then ends the stream.
+ void WritePageWithoutStats();  // Writes one page without setting statistics, then ends the stream.
+ void CheckNumRows(std::optional<int32_t> num_rows, const T& header);  // Checks the num_rows value the filter received.
+
+ protected:
+ std::vector<T> data_page_headers_;  // Headers in the order they were written.
+ int total_rows_ = 0;  // Running sum of num_rows over all written pages.
+};
+
+template <>
+void PageFilterTest<format::DataPageHeader>::WriteStream() {  // Writes kNumPages V1 data pages, each followed by a payload.
+ for (int i = 0; i < kNumPages; ++i) {
+ // Vary the number of rows to produce different headers.
+ int32_t num_rows = i + 100;
+ total_rows_ += num_rows;
+ int data_size = i + 1024;  // Payload size also varies per page.
+ this->data_page_header_.__set_num_values(num_rows);
+ this->data_page_header_.statistics.__set_min_value("A" + std::to_string(i));
+ this->data_page_header_.statistics.__set_max_value("Z" + std::to_string(i));
+ this->data_page_header_.statistics.__set_null_count(0);
+ this->data_page_header_.statistics.__set_distinct_count(num_rows);
+ this->data_page_header_.__isset.statistics = true;
+ ASSERT_NO_FATAL_FAILURE(
+ this->WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+ data_page_headers_.push_back(this->data_page_header_);  // Kept so tests can compare against what was written.
+ // Also write data, to make sure we skip the data correctly.
+ std::vector<uint8_t> faux_data(data_size);
+ ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+ }
+ this->EndStream();
+}
+
+template <>
+void PageFilterTest<format::DataPageHeaderV2>::WriteStream() {  // Writes kNumPages V2 data pages, each followed by a payload.
+ for (int i = 0; i < kNumPages; ++i) {
+ // Vary the number of rows to produce different headers.
+ int32_t num_rows = i + 100;
+ total_rows_ += num_rows;
+ int data_size = i + 1024;  // Payload size also varies per page.
+ this->data_page_header_v2_.__set_num_values(num_rows);
+ this->data_page_header_v2_.__set_num_rows(num_rows);  // V2 headers carry an explicit row count.
+ this->data_page_header_v2_.statistics.__set_min_value("A" + std::to_string(i));
+ this->data_page_header_v2_.statistics.__set_max_value("Z" + std::to_string(i));
+ this->data_page_header_v2_.statistics.__set_null_count(0);
+ this->data_page_header_v2_.statistics.__set_distinct_count(num_rows);
+ this->data_page_header_v2_.__isset.statistics = true;
+ ASSERT_NO_FATAL_FAILURE(
+ this->WriteDataPageHeaderV2(/*max_serialized_len=*/1024, data_size, data_size));
+ data_page_headers_.push_back(this->data_page_header_v2_);  // Kept so tests can compare against what was written.
+ // Also write data, to make sure we skip the data correctly.
+ std::vector<uint8_t> faux_data(data_size);
+ ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+ }
+ this->EndStream();
+}
+
+template <>
+void PageFilterTest<format::DataPageHeader>::WritePageWithoutStats() {  // Writes a single V1 data page without setting statistics.
+ int32_t num_rows = 100;
+ total_rows_ += num_rows;
+ int data_size = 1024;
+ this->data_page_header_.__set_num_values(num_rows);  // No statistics fields are set on the header.
+ ASSERT_NO_FATAL_FAILURE(
+ this->WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+ data_page_headers_.push_back(this->data_page_header_);
+ std::vector<uint8_t> faux_data(data_size);  // Zero-filled payload following the header.
+ ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+ this->EndStream();
+}
+
+template <>
+void PageFilterTest<format::DataPageHeaderV2>::WritePageWithoutStats() {  // Writes a single V2 data page without setting statistics.
+ int32_t num_rows = 100;
+ total_rows_ += num_rows;
+ int data_size = 1024;
+ this->data_page_header_v2_.__set_num_values(num_rows);  // No statistics fields are set on the header.
+ this->data_page_header_v2_.__set_num_rows(num_rows);
+ ASSERT_NO_FATAL_FAILURE(
+ this->WriteDataPageHeaderV2(/*max_serialized_len=*/1024, data_size, data_size));
+ data_page_headers_.push_back(this->data_page_header_v2_);
+ std::vector<uint8_t> faux_data(data_size);  // Zero-filled payload following the header.
+ ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+ this->EndStream();
+}
+
+template <>
+void PageFilterTest<format::DataPageHeader>::CheckNumRows(  // V1: the filter must not receive a row count.
+ std::optional<int32_t> num_rows, const format::DataPageHeader& header) {
+ ASSERT_EQ(num_rows, std::nullopt);  // header is unused in this specialization.
+}
+
+template <>
+void PageFilterTest<format::DataPageHeaderV2>::CheckNumRows(  // V2: the filter must receive the header's row count.
+ std::optional<int32_t> num_rows, const format::DataPageHeaderV2& header) {
+ ASSERT_EQ(*num_rows, header.num_rows);  // NOTE(review): num_rows is dereferenced without checking has_value().
+}
+
+using DataPageHeaderTypes =
+ ::testing::Types<format::DataPageHeader, format::DataPageHeaderV2>;
+TYPED_TEST_SUITE(PageFilterTest, DataPageHeaderTypes);
+
+// Test that the encoded_statistics passed to the filter callback is nullptr
+// when there are no statistics in the page header.
+TYPED_TEST(PageFilterTest, TestPageWithoutStatistics) {
+ this->WritePageWithoutStats();
+
+ auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+ this->page_reader_ =
+ PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+ int num_pages = 0;  // Number of times the filter callback was invoked.
+ bool is_stats_null = false;
+ auto read_all_pages = [&](const DataPageStats& stats) -> bool {
+ is_stats_null = stats.encoded_statistics == nullptr;
+ ++num_pages;
+ return false;  // Never skip: every page is read.
+ };
+
+ this->page_reader_->set_data_page_filter(read_all_pages);
+ std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+ ASSERT_EQ(num_pages, 1);
+ ASSERT_EQ(is_stats_null, true);
+ ASSERT_EQ(this->page_reader_->NextPage(), nullptr);  // Only one page was written.
+}
+
+// Creates a number of pages and skips some of them with the page filter callback.
+TYPED_TEST(PageFilterTest, TestPageFilterCallback) {
+ this->WriteStream();
+
+ { // Read all pages.
+ // Also check that the encoded statistics passed to the callback function
+ // are right.
+ auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+ this->page_reader_ =
+ PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+ std::vector<EncodedStatistics> read_stats;  // What the callback saw, in call order.
+ std::vector<int64_t> read_num_values;
+ std::vector<std::optional<int32_t>> read_num_rows;
+ auto read_all_pages = [&](const DataPageStats& stats) -> bool {
+ DCHECK_NE(stats.encoded_statistics, nullptr);  // WriteStream() sets statistics on every page.
+ read_stats.push_back(*stats.encoded_statistics);
+ read_num_values.push_back(stats.num_values);
+ read_num_rows.push_back(stats.num_rows);
+ return false;  // Never skip.
+ };
+
+ this->page_reader_->set_data_page_filter(read_all_pages);
+ for (int i = 0; i < this->kNumPages; ++i) {
+ std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+ ASSERT_NE(current_page, nullptr);
+ ASSERT_NO_FATAL_FAILURE(
+ CheckDataPageHeader(this->data_page_headers_[i], current_page.get()));
+ auto data_page = static_cast<const DataPage*>(current_page.get());
+ const EncodedStatistics encoded_statistics = data_page->statistics();
+ ASSERT_EQ(read_stats[i].max(), encoded_statistics.max());  // Callback stats must match the returned page's stats.
+ ASSERT_EQ(read_stats[i].min(), encoded_statistics.min());
+ ASSERT_EQ(read_stats[i].null_count, encoded_statistics.null_count);
+ ASSERT_EQ(read_stats[i].distinct_count, encoded_statistics.distinct_count);
+ ASSERT_EQ(read_num_values[i], this->data_page_headers_[i].num_values);
+ this->CheckNumRows(read_num_rows[i], this->data_page_headers_[i]);
+ }
+ ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+ }
+
+ { // Skip all pages.
+ auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+ this->page_reader_ =
+ PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+ auto skip_all_pages = [](const DataPageStats& stats) -> bool { return true; };
+
+ this->page_reader_->set_data_page_filter(skip_all_pages);
+ std::shared_ptr<Page> current_page = this->page_reader_->NextPage();  // NOTE(review): current_page itself is never asserted to be nullptr.
+ ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+ }
+
+ { // Skip every other page.
+ auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+ this->page_reader_ =
+ PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+ // Skip pages with even number of values.
+ auto skip_even_pages = [](const DataPageStats& stats) -> bool {
+ if (stats.num_values % 2 == 0) return true;
+ return false;
+ };
+
+ this->page_reader_->set_data_page_filter(skip_even_pages);
+
+ for (int i = 0; i < this->kNumPages; ++i) {
+ // Only pages with odd number of values are read.
+ if (i % 2 != 0) {  // Page i has i + 100 values, so odd i means an odd value count.
+ std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+ ASSERT_NE(current_page, nullptr);
+ ASSERT_NO_FATAL_FAILURE(
+ CheckDataPageHeader(this->data_page_headers_[i], current_page.get()));
+ }
+ }
+ // We should have exhausted reading the pages by reading the odd pages only.
+ ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+ }
+}
+
+// Set the page filter more than once. The new filter should take effect
+// on the next NextPage() call.
+TYPED_TEST(PageFilterTest, TestChangingPageFilter) {
+ this->WriteStream();
+
+ auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+ this->page_reader_ =
+ PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+ // This callback will always return false.
+ auto read_all_pages = [](const DataPageStats& stats) -> bool { return false; };
+ this->page_reader_->set_data_page_filter(read_all_pages);
+ std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+ ASSERT_NE(current_page, nullptr);
+ ASSERT_NO_FATAL_FAILURE(
+ CheckDataPageHeader(this->data_page_headers_[0], current_page.get()));  // The first page is returned unfiltered.
+
+ // This callback will skip all pages.
+ auto skip_all_pages = [](const DataPageStats& stats) -> bool { return true; };
+ this->page_reader_->set_data_page_filter(skip_all_pages);
+ ASSERT_EQ(this->page_reader_->NextPage(), nullptr);  // All remaining pages are skipped by the new filter.
+}
+
+// Test that we do not skip dictionary pages.
+TEST_F(TestPageSerde, DoesNotFilterDictionaryPages) {
+ int data_size = 1024;
+ std::vector<uint8_t> faux_data(data_size);  // Zero-filled payload reused for every page.
+ // Stream layout: data page, dictionary page, data page.
+ ASSERT_NO_FATAL_FAILURE(
+ WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+ ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+
+ ASSERT_NO_FATAL_FAILURE(WriteDictionaryPageHeader(data_size, data_size));
+ ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+
+ ASSERT_NO_FATAL_FAILURE(
+ WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+ ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+ EndStream();
+
+ // Try to read it back while asking for all data pages to be skipped.
+ auto stream = std::make_shared<::arrow::io::BufferReader>(out_buffer_);
+ page_reader_ = PageReader::Open(stream, /*num_rows=*/100, Compression::UNCOMPRESSED);
+
+ auto skip_all_pages = [](const DataPageStats& stats) -> bool { return true; };
+
+ page_reader_->set_data_page_filter(skip_all_pages);
+ // The first data page is skipped, so we are now at the dictionary page.
+ std::shared_ptr<Page> current_page = page_reader_->NextPage();
+ ASSERT_NE(current_page, nullptr);
+ ASSERT_EQ(current_page->type(), PageType::DICTIONARY_PAGE);
+ // The data page after dictionary page is skipped.
+ ASSERT_EQ(page_reader_->NextPage(), nullptr);
+}
+
+// Tests that we successfully skip non-data pages.
+TEST_F(TestPageSerde, SkipsNonDataPages) {  // Stream layout: index, data, index, index, data, index.
+ int data_size = 1024;
+ std::vector<uint8_t> faux_data(data_size);  // Zero-filled payload reused for every page.
+ ASSERT_NO_FATAL_FAILURE(WriteIndexPageHeader(data_size, data_size));
+ ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+
+ ASSERT_NO_FATAL_FAILURE(
+ WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+ ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+
+ ASSERT_NO_FATAL_FAILURE(WriteIndexPageHeader(data_size, data_size));
+ ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+ ASSERT_NO_FATAL_FAILURE(WriteIndexPageHeader(data_size, data_size));
+ ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+
+ ASSERT_NO_FATAL_FAILURE(
+ WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+ ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+ ASSERT_NO_FATAL_FAILURE(WriteIndexPageHeader(data_size, data_size));
+ ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+ EndStream();
+
+ auto stream = std::make_shared<::arrow::io::BufferReader>(out_buffer_);
+ page_reader_ = PageReader::Open(stream, /*num_rows=*/100, Compression::UNCOMPRESSED);
+ // No data page filter is set in this test.
+ // Only the two data pages are returned.
+ std::shared_ptr<Page> current_page = page_reader_->NextPage();  // NOTE(review): dereferenced below without a nullptr check.
+ ASSERT_EQ(current_page->type(), PageType::DATA_PAGE);
+ current_page = page_reader_->NextPage();
+ ASSERT_EQ(current_page->type(), PageType::DATA_PAGE);
+ ASSERT_EQ(page_reader_->NextPage(), nullptr);
+}
+
TEST_F(TestPageSerde, DataPageV2) {
int stats_size = 512;
const int32_t num_rows = 4444;
diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc
index fd7067b8a7..9e8412217d 100644
--- a/cpp/src/parquet/metadata.cc
+++ b/cpp/src/parquet/metadata.cc
@@ -33,7 +33,6 @@
#include "parquet/exception.h"
#include "parquet/schema.h"
#include "parquet/schema_internal.h"
-#include "parquet/statistics.h"
#include "parquet/thrift_internal.h"
namespace parquet {
diff --git a/cpp/src/parquet/statistics.cc b/cpp/src/parquet/statistics.cc
index 13d20fcb33..deba7ad3c2 100644
--- a/cpp/src/parquet/statistics.cc
+++ b/cpp/src/parquet/statistics.cc
@@ -858,6 +858,17 @@ std::shared_ptr<Statistics> Statistics::Make(Type::type physical_type, const voi
return nullptr;
}
+std::shared_ptr<Statistics> Statistics::Make(const ColumnDescriptor* descr,  // Builds Statistics from EncodedStatistics via another Make overload.
+ const EncodedStatistics* encoded_stats,
+ int64_t num_values,
+ ::arrow::MemoryPool* pool) {
+ DCHECK(encoded_stats != nullptr);  // Callers must pass a non-null pointer.
+ return Make(descr, encoded_stats->min(), encoded_stats->max(), num_values,
+ encoded_stats->null_count, encoded_stats->distinct_count,
+ encoded_stats->has_min && encoded_stats->has_max,  // min/max count as present only when both are set.
+ encoded_stats->has_null_count, encoded_stats->has_distinct_count, pool);
+}
+
std::shared_ptr<Statistics> Statistics::Make(const ColumnDescriptor* descr,
const std::string& encoded_min,
const std::string& encoded_max,
diff --git a/cpp/src/parquet/statistics.h b/cpp/src/parquet/statistics.h
index b4e9fb382a..71d9b662ba 100644
--- a/cpp/src/parquet/statistics.h
+++ b/cpp/src/parquet/statistics.h
@@ -216,6 +216,14 @@ class PARQUET_EXPORT Statistics {
bool has_distinct_count,
::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+ /// \brief Helper function to convert EncodedStatistics to Statistics.
+ /// EncodedStatistics does not contain the number of non-null values, so it can be
+ /// passed using the num_values parameter. encoded_statistics must not be null.
+ static std::shared_ptr<Statistics> Make(
+ const ColumnDescriptor* descr, const EncodedStatistics* encoded_statistics,
+ int64_t num_values = -1,
+ ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+
/// \brief Return true if the count of null values is set
virtual bool HasNullCount() const = 0;