You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by wj...@apache.org on 2023/04/19 18:36:13 UTC
[arrow] branch main updated: GH-34949: [C++][Parquet] Enable page index by columns (#35230)
This is an automated email from the ASF dual-hosted git repository.
wjones127 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new d2c4c21218 GH-34949: [C++][Parquet] Enable page index by columns (#35230)
d2c4c21218 is described below
commit d2c4c212185037adc8ae5eebef7703d1b41f2383
Author: Gang Wu <us...@gmail.com>
AuthorDate: Thu Apr 20 02:36:05 2023 +0800
GH-34949: [C++][Parquet] Enable page index by columns (#35230)
### Rationale for this change
Currently the Parquet writer only supports enabling the page index for all columns at once. It would be good to enable/disable it at the column level, because a page index may not be useful for some columns yet still incurs a cost to create.
### What changes are included in this PR?
Similar to `WriterProperties::Builder::enable_dictionary/disable_dictionary`, this patch adds per-column overloads of `WriterProperties::Builder::enable_write_page_index/disable_write_page_index` and keeps the existing no-argument forms backward compatible: they still enable/disable the page index for all columns.
### Are these changes tested?
Added `ParquetPageIndexRoundTripTest::EnablePerColumn` to cover the new settings.
### Are there any user-facing changes?
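Yes, users now have the flexibility to enable/disable the page index per column. Note that the `WriterProperties::write_page_index()` accessor is replaced by `WriterProperties::page_index_enabled()`.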
* Closes: #34949
Authored-by: Gang Wu <us...@gmail.com>
Signed-off-by: Will Jones <wi...@gmail.com>
---
cpp/src/parquet/arrow/arrow_reader_writer_test.cc | 38 ++++++++++-
cpp/src/parquet/column_writer.cc | 2 +-
cpp/src/parquet/file_writer.cc | 10 +--
cpp/src/parquet/properties.h | 81 +++++++++++++++++------
4 files changed, 102 insertions(+), 29 deletions(-)
diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index b5456e89c6..831513a94d 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -26,6 +26,7 @@
#include <cstdint>
#include <functional>
+#include <set>
#include <sstream>
#include <vector>
@@ -5235,7 +5236,8 @@ class ParquetPageIndexRoundTripTest : public ::testing::Test {
ASSERT_OK_AND_ASSIGN(buffer_, sink->Finish());
}
- void ReadPageIndexes(int expect_num_row_groups, int expect_num_pages) {
+ void ReadPageIndexes(int expect_num_row_groups, int expect_num_pages,
+ const std::set<int>& expect_columns_without_index = {}) {
auto read_properties = default_arrow_reader_properties();
auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer_));
@@ -5255,7 +5257,12 @@ class ParquetPageIndexRoundTripTest : public ::testing::Test {
column_indexes_.emplace_back(column_index.get());
auto offset_index = row_group_index_reader->GetOffsetIndex(col);
- CheckOffsetIndex(offset_index.get(), expect_num_pages, &offset_lower_bound);
+ if (expect_columns_without_index.find(col) !=
+ expect_columns_without_index.cend()) {
+ ASSERT_EQ(offset_index, nullptr);
+ } else {
+ CheckOffsetIndex(offset_index.get(), expect_num_pages, &offset_lower_bound);
+ }
}
}
}
@@ -5425,5 +5432,32 @@ TEST_F(ParquetPageIndexRoundTripTest, DoubleWithNaNs) {
/* Page with only NaN values does not have column index built */}));
}
+TEST_F(ParquetPageIndexRoundTripTest, EnablePerColumn) {
+ auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()),
+ ::arrow::field("c1", ::arrow::int64()),
+ ::arrow::field("c2", ::arrow::int64())});
+ auto writer_properties =
+ WriterProperties::Builder()
+ .enable_write_page_index() /* enable by default */
+ ->enable_write_page_index("c0") /* enable c0 explicitly */
+ ->disable_write_page_index("c1") /* disable c1 explicitly */
+ ->build();
+ WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([[0, 1, 2]])"}));
+
+ ReadPageIndexes(/*expect_num_row_groups=*/1, /*expect_num_pages=*/1,
+ /*expect_columns_without_index=*/{1});
+
+ EXPECT_THAT(
+ column_indexes_,
+ ::testing::ElementsAre(
+ ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(0)},
+ /*max_values=*/{encode_int64(0)}, BoundaryOrder::Ascending,
+ /*null_counts=*/{0}},
+ ColumnIndexObject{/* page index of c1 is disabled */},
+ ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(2)},
+ /*max_values=*/{encode_int64(2)}, BoundaryOrder::Ascending,
+ /*null_counts=*/{0}}));
+}
+
} // namespace arrow
} // namespace parquet
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 222fc853e3..2892335227 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -1368,7 +1368,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
bool pages_change_on_record_boundaries() const {
return properties_->data_page_version() == ParquetDataPageVersion::V2 ||
- properties_->write_page_index();
+ properties_->page_index_enabled(descr_->path());
}
private:
diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc
index f1098e4a74..57067bc533 100644
--- a/cpp/src/parquet/file_writer.cc
+++ b/cpp/src/parquet/file_writer.cc
@@ -146,10 +146,10 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
auto data_encryptor =
file_encryptor_ ? file_encryptor_->GetColumnDataEncryptor(path->ToDotString())
: nullptr;
- auto ci_builder = page_index_builder_
+ auto ci_builder = page_index_builder_ && properties_->page_index_enabled(path)
? page_index_builder_->GetColumnIndexBuilder(column_ordinal)
: nullptr;
- auto oi_builder = page_index_builder_
+ auto oi_builder = page_index_builder_ && properties_->page_index_enabled(path)
? page_index_builder_->GetOffsetIndexBuilder(column_ordinal)
: nullptr;
std::unique_ptr<PageWriter> pager = PageWriter::Open(
@@ -283,10 +283,10 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
auto data_encryptor =
file_encryptor_ ? file_encryptor_->GetColumnDataEncryptor(path->ToDotString())
: nullptr;
- auto ci_builder = page_index_builder_
+ auto ci_builder = page_index_builder_ && properties_->page_index_enabled(path)
? page_index_builder_->GetColumnIndexBuilder(column_ordinal)
: nullptr;
- auto oi_builder = page_index_builder_
+ auto oi_builder = page_index_builder_ && properties_->page_index_enabled(path)
? page_index_builder_->GetOffsetIndexBuilder(column_ordinal)
: nullptr;
std::unique_ptr<PageWriter> pager = PageWriter::Open(
@@ -505,7 +505,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
}
}
- if (properties_->write_page_index()) {
+ if (properties_->page_index_enabled()) {
page_index_builder_ = PageIndexBuilder::Make(&schema_);
}
}
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index d892788960..f38dd17482 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -138,6 +138,7 @@ static constexpr int64_t DEFAULT_MAX_STATISTICS_SIZE = 4096;
static constexpr Encoding::type DEFAULT_ENCODING = Encoding::PLAIN;
static const char DEFAULT_CREATED_BY[] = CREATED_BY_VERSION;
static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = Compression::UNCOMPRESSED;
+static constexpr bool DEFAULT_IS_PAGE_INDEX_ENABLED = false;
class PARQUET_EXPORT ColumnProperties {
public:
@@ -145,13 +146,15 @@ class PARQUET_EXPORT ColumnProperties {
Compression::type codec = DEFAULT_COMPRESSION_TYPE,
bool dictionary_enabled = DEFAULT_IS_DICTIONARY_ENABLED,
bool statistics_enabled = DEFAULT_ARE_STATISTICS_ENABLED,
- size_t max_stats_size = DEFAULT_MAX_STATISTICS_SIZE)
+ size_t max_stats_size = DEFAULT_MAX_STATISTICS_SIZE,
+ bool page_index_enabled = DEFAULT_IS_PAGE_INDEX_ENABLED)
: encoding_(encoding),
codec_(codec),
dictionary_enabled_(dictionary_enabled),
statistics_enabled_(statistics_enabled),
max_stats_size_(max_stats_size),
- compression_level_(Codec::UseDefaultCompressionLevel()) {}
+ compression_level_(Codec::UseDefaultCompressionLevel()),
+ page_index_enabled_(page_index_enabled) {}
void set_encoding(Encoding::type encoding) { encoding_ = encoding; }
@@ -173,6 +176,10 @@ class PARQUET_EXPORT ColumnProperties {
compression_level_ = compression_level;
}
+ void set_page_index_enabled(bool page_index_enabled) {
+ page_index_enabled_ = page_index_enabled;
+ }
+
Encoding::type encoding() const { return encoding_; }
Compression::type compression() const { return codec_; }
@@ -185,6 +192,8 @@ class PARQUET_EXPORT ColumnProperties {
int compression_level() const { return compression_level_; }
+ bool page_index_enabled() const { return page_index_enabled_; }
+
private:
Encoding::type encoding_;
Compression::type codec_;
@@ -192,6 +201,7 @@ class PARQUET_EXPORT ColumnProperties {
bool statistics_enabled_;
size_t max_stats_size_;
int compression_level_;
+ bool page_index_enabled_;
};
class PARQUET_EXPORT WriterProperties {
@@ -208,8 +218,7 @@ class PARQUET_EXPORT WriterProperties {
data_page_version_(ParquetDataPageVersion::V1),
created_by_(DEFAULT_CREATED_BY),
store_decimal_as_integer_(false),
- page_checksum_enabled_(false),
- write_page_index_(false) {}
+ page_checksum_enabled_(false) {}
virtual ~Builder() {}
/// Specify the memory pool for the writer. Default default_memory_pool.
@@ -502,28 +511,46 @@ class PARQUET_EXPORT WriterProperties {
return this;
}
- /// Enable writing page index.
+ /// Enable writing page index in general for all columns. Default disabled.
///
/// Page index contains statistics for data pages and can be used to skip pages
/// when scanning data in ordered and unordered columns.
///
/// Please check the link below for more details:
/// https://github.com/apache/parquet-format/blob/master/PageIndex.md
- ///
- /// Default disabled.
Builder* enable_write_page_index() {
- write_page_index_ = true;
+ default_column_properties_.set_page_index_enabled(true);
return this;
}
- /// Disable writing page index.
- ///
- /// Default disabled.
+ /// Disable writing page index in general for all columns. Default disabled.
Builder* disable_write_page_index() {
- write_page_index_ = false;
+ default_column_properties_.set_page_index_enabled(false);
+ return this;
+ }
+
+ /// Enable writing page index for the column at dot-string `path`; takes precedence over the all-columns setting. Default disabled.
+ Builder* enable_write_page_index(const std::string& path) {
+ page_index_enabled_[path] = true;
+ return this;
+ }
+
+ /// Enable writing page index for the column at `path` (converted with ToDotString()); takes precedence over the all-columns setting. Default disabled.
+ Builder* enable_write_page_index(const std::shared_ptr<schema::ColumnPath>& path) {
+ return this->enable_write_page_index(path->ToDotString());
+ }
+
+ /// Disable writing page index for the column at dot-string `path`; takes precedence over the all-columns setting. Default disabled.
+ Builder* disable_write_page_index(const std::string& path) {
+ page_index_enabled_[path] = false;
return this;
}
+ /// Disable writing page index for the column at `path` (converted with ToDotString()); takes precedence over the all-columns setting. Default disabled.
+ Builder* disable_write_page_index(const std::shared_ptr<schema::ColumnPath>& path) {
+ return this->disable_write_page_index(path->ToDotString());
+ }
+
/// \brief Build the WriterProperties with the builder parameters.
/// \return The WriterProperties defined by the builder.
std::shared_ptr<WriterProperties> build() {
@@ -544,13 +571,14 @@ class PARQUET_EXPORT WriterProperties {
get(item.first).set_dictionary_enabled(item.second);
for (const auto& item : statistics_enabled_)
get(item.first).set_statistics_enabled(item.second);
+ for (const auto& item : page_index_enabled_)
+ get(item.first).set_page_index_enabled(item.second);
return std::shared_ptr<WriterProperties>(new WriterProperties(
pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_,
pagesize_, version_, created_by_, page_checksum_enabled_,
std::move(file_encryption_properties_), default_column_properties_,
- column_properties, data_page_version_, store_decimal_as_integer_,
- write_page_index_));
+ column_properties, data_page_version_, store_decimal_as_integer_));
}
private:
@@ -564,7 +592,6 @@ class PARQUET_EXPORT WriterProperties {
std::string created_by_;
bool store_decimal_as_integer_;
bool page_checksum_enabled_;
- bool write_page_index_;
std::shared_ptr<FileEncryptionProperties> file_encryption_properties_;
@@ -575,6 +602,7 @@ class PARQUET_EXPORT WriterProperties {
std::unordered_map<std::string, int32_t> codecs_compression_level_;
std::unordered_map<std::string, bool> dictionary_enabled_;
std::unordered_map<std::string, bool> statistics_enabled_;
+ std::unordered_map<std::string, bool> page_index_enabled_;
};
inline MemoryPool* memory_pool() const { return pool_; }
@@ -599,8 +627,6 @@ class PARQUET_EXPORT WriterProperties {
inline bool page_checksum_enabled() const { return page_checksum_enabled_; }
- inline bool write_page_index() const { return write_page_index_; }
-
inline Encoding::type dictionary_index_encoding() const {
if (parquet_version_ == ParquetVersion::PARQUET_1_0) {
return Encoding::PLAIN_DICTIONARY;
@@ -648,6 +674,22 @@ class PARQUET_EXPORT WriterProperties {
return column_properties(path).max_statistics_size();
}
+ bool page_index_enabled(const std::shared_ptr<schema::ColumnPath>& path) const {
+ return column_properties(path).page_index_enabled();
+ }
+
+ bool page_index_enabled() const {
+ if (default_column_properties_.page_index_enabled()) {
+ return true;
+ }
+ for (const auto& item : column_properties_) {
+ if (item.second.page_index_enabled()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
inline FileEncryptionProperties* file_encryption_properties() const {
return file_encryption_properties_.get();
}
@@ -669,8 +711,7 @@ class PARQUET_EXPORT WriterProperties {
std::shared_ptr<FileEncryptionProperties> file_encryption_properties,
const ColumnProperties& default_column_properties,
const std::unordered_map<std::string, ColumnProperties>& column_properties,
- ParquetDataPageVersion data_page_version, bool store_short_decimal_as_integer,
- bool write_page_index)
+ ParquetDataPageVersion data_page_version, bool store_short_decimal_as_integer)
: pool_(pool),
dictionary_pagesize_limit_(dictionary_pagesize_limit),
write_batch_size_(write_batch_size),
@@ -681,7 +722,6 @@ class PARQUET_EXPORT WriterProperties {
parquet_created_by_(created_by),
store_decimal_as_integer_(store_short_decimal_as_integer),
page_checksum_enabled_(page_write_checksum_enabled),
- write_page_index_(write_page_index),
file_encryption_properties_(file_encryption_properties),
default_column_properties_(default_column_properties),
column_properties_(column_properties) {}
@@ -696,7 +736,6 @@ class PARQUET_EXPORT WriterProperties {
std::string parquet_created_by_;
bool store_decimal_as_integer_;
bool page_checksum_enabled_;
- bool write_page_index_;
std::shared_ptr<FileEncryptionProperties> file_encryption_properties_;