You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by wj...@apache.org on 2023/05/01 14:49:27 UTC
[arrow] branch main updated: GH-35331: [C++][Parquet] Parquet Export Footer metadata SortColumns (#35351)
This is an automated email from the ASF dual-hosted git repository.
wjones127 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new da6dbd4860 GH-35331: [C++][Parquet] Parquet Export Footer metadata SortColumns (#35351)
da6dbd4860 is described below
commit da6dbd48607089d716505054176e345b704570c5
Author: mwish <ma...@gmail.com>
AuthorDate: Mon May 1 22:49:17 2023 +0800
GH-35331: [C++][Parquet] Parquet Export Footer metadata SortColumns (#35351)
### Rationale for this change
Allow reading and setting SortingColumns in C++ Parquet. Note that the sorting columns are currently not validated on write,
so the user must ensure that the records do not violate the declared order.
### What changes are included in this PR?
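For RowGroupMetaData, add a `sorting_columns()` accessor; for WriterProperties, add a `set_sorting_columns()` builder option so the sorting columns are written to the row group metadata.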
### Are these changes tested?
* [x] tests
### Are there any user-facing changes?
Yes: users can set sorting columns through WriterProperties when writing, and read them back from RowGroupMetaData.
* Closes: #35331
Authored-by: mwish <ma...@gmail.com>
Signed-off-by: Will Jones <wi...@gmail.com>
---
cpp/src/parquet/metadata.cc | 25 +++++++++++++++++++++++
cpp/src/parquet/metadata.h | 2 ++
cpp/src/parquet/metadata_test.cc | 43 +++++++++++++++++++++++++++++++++++++++
cpp/src/parquet/properties.h | 25 +++++++++++++++++++++--
cpp/src/parquet/thrift_internal.h | 16 +++++++++++++++
cpp/src/parquet/types.h | 21 +++++++++++++++++++
6 files changed, 130 insertions(+), 2 deletions(-)
diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc
index ddb4fb143d..0bbd965807 100644
--- a/cpp/src/parquet/metadata.cc
+++ b/cpp/src/parquet/metadata.cc
@@ -504,6 +504,18 @@ class RowGroupMetaData::RowGroupMetaDataImpl {
" columns, requested metadata for column: ", i);
}
+ std::vector<SortingColumn> sorting_columns() const {
+ std::vector<SortingColumn> sorting_columns;
+ if (!row_group_->__isset.sorting_columns) {
+ return sorting_columns;
+ }
+ sorting_columns.resize(row_group_->sorting_columns.size());
+ for (size_t i = 0; i < sorting_columns.size(); ++i) {
+ sorting_columns[i] = FromThrift(row_group_->sorting_columns[i]);
+ }
+ return sorting_columns;
+ }
+
private:
const format::RowGroup* row_group_;
const SchemaDescriptor* schema_;
@@ -571,6 +583,10 @@ bool RowGroupMetaData::can_decompress() const {
return true;
}
+std::vector<SortingColumn> RowGroupMetaData::sorting_columns() const {
+ return impl_->sorting_columns();
+}
+
// file metadata
class FileMetaData::FileMetaDataImpl {
public:
@@ -1684,6 +1700,15 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
total_compressed_size += column_builders_[i]->total_compressed_size();
}
+ const auto& sorting_columns = properties_->sorting_columns();
+ if (!sorting_columns.empty()) {
+ std::vector<format::SortingColumn> thrift_sorting_columns(sorting_columns.size());
+ for (size_t i = 0; i < sorting_columns.size(); ++i) {
+ thrift_sorting_columns[i] = ToThrift(sorting_columns[i]);
+ }
+ row_group_->__set_sorting_columns(std::move(thrift_sorting_columns));
+ }
+
row_group_->__set_file_offset(file_offset);
row_group_->__set_total_compressed_size(total_compressed_size);
row_group_->__set_total_byte_size(total_bytes_written);
diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h
index 620bc842f9..e62b2d187a 100644
--- a/cpp/src/parquet/metadata.h
+++ b/cpp/src/parquet/metadata.h
@@ -251,6 +251,8 @@ class PARQUET_EXPORT RowGroupMetaData {
const SchemaDescriptor* schema() const;
// Indicate if all of the RowGroup's ColumnChunks can be decompressed.
bool can_decompress() const;
+ // Sorting columns of the row group if any.
+ std::vector<SortingColumn> sorting_columns() const;
private:
explicit RowGroupMetaData(
diff --git a/cpp/src/parquet/metadata_test.cc b/cpp/src/parquet/metadata_test.cc
index 4375661f5c..fc4dfb3ac8 100644
--- a/cpp/src/parquet/metadata_test.cc
+++ b/cpp/src/parquet/metadata_test.cc
@@ -394,6 +394,49 @@ TEST(Metadata, TestReadPageIndex) {
}
}
+TEST(Metadata, TestSortingColumns) {
+ schema::NodeVector fields;
+ fields.push_back(schema::Int32("sort_col", Repetition::REQUIRED));
+ fields.push_back(schema::Int32("int_col", Repetition::REQUIRED));
+
+ auto schema = std::static_pointer_cast<schema::GroupNode>(
+ schema::GroupNode::Make("schema", Repetition::REQUIRED, fields));
+
+ std::vector<SortingColumn> sorting_columns;
+ {
+ SortingColumn sorting_column;
+ sorting_column.column_idx = 0;
+ sorting_column.descending = false;
+ sorting_column.nulls_first = false;
+ sorting_columns.push_back(sorting_column);
+ }
+
+ auto sink = CreateOutputStream();
+ auto writer_props = parquet::WriterProperties::Builder()
+ .disable_dictionary()
+ ->set_sorting_columns(sorting_columns)
+ ->build();
+
+ EXPECT_EQ(sorting_columns, writer_props->sorting_columns());
+
+ auto file_writer = parquet::ParquetFileWriter::Open(sink, schema, writer_props);
+
+ auto row_group_writer = file_writer->AppendBufferedRowGroup();
+ row_group_writer->Close();
+ file_writer->Close();
+
+ PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+ auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
+ auto file_reader = ParquetFileReader::Open(source);
+
+ ASSERT_NE(nullptr, file_reader->metadata());
+ ASSERT_EQ(1, file_reader->metadata()->num_row_groups());
+ auto row_group_reader = file_reader->RowGroup(0);
+ auto* row_group_read_metadata = row_group_reader->metadata();
+ ASSERT_NE(nullptr, row_group_read_metadata);
+ EXPECT_EQ(sorting_columns, row_group_read_metadata->sorting_columns());
+}
+
TEST(ApplicationVersion, Basics) {
ApplicationVersion version("parquet-mr version 1.7.9");
ApplicationVersion version1("parquet-mr version 1.8.0");
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index f38dd17482..4c532235ca 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -466,6 +466,17 @@ class PARQUET_EXPORT WriterProperties {
return this->enable_statistics(path->ToDotString());
}
+ /// Define the sorting columns.
+ /// Default empty.
+ ///
+ /// If sorting columns are set, the user must ensure that records
+ /// are sorted by the sorting columns. Otherwise, the stored data
+ /// will be inconsistent with the sorting_columns metadata.
+ Builder* set_sorting_columns(std::vector<SortingColumn> sorting_columns) {
+ sorting_columns_ = std::move(sorting_columns);
+ return this;
+ }
+
/// Disable statistics for the column specified by `path`.
/// Default enabled.
Builder* disable_statistics(const std::string& path) {
@@ -578,7 +589,8 @@ class PARQUET_EXPORT WriterProperties {
pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_,
pagesize_, version_, created_by_, page_checksum_enabled_,
std::move(file_encryption_properties_), default_column_properties_,
- column_properties, data_page_version_, store_decimal_as_integer_));
+ column_properties, data_page_version_, store_decimal_as_integer_,
+ std::move(sorting_columns_)));
}
private:
@@ -595,6 +607,9 @@ class PARQUET_EXPORT WriterProperties {
std::shared_ptr<FileEncryptionProperties> file_encryption_properties_;
+ // If empty, there are no sorting columns.
+ std::vector<SortingColumn> sorting_columns_;
+
// Settings used for each column unless overridden in any of the maps below
ColumnProperties default_column_properties_;
std::unordered_map<std::string, Encoding::type> encodings_;
@@ -666,6 +681,8 @@ class PARQUET_EXPORT WriterProperties {
return column_properties(path).dictionary_enabled();
}
+ const std::vector<SortingColumn>& sorting_columns() const { return sorting_columns_; }
+
bool statistics_enabled(const std::shared_ptr<schema::ColumnPath>& path) const {
return column_properties(path).statistics_enabled();
}
@@ -711,7 +728,8 @@ class PARQUET_EXPORT WriterProperties {
std::shared_ptr<FileEncryptionProperties> file_encryption_properties,
const ColumnProperties& default_column_properties,
const std::unordered_map<std::string, ColumnProperties>& column_properties,
- ParquetDataPageVersion data_page_version, bool store_short_decimal_as_integer)
+ ParquetDataPageVersion data_page_version, bool store_short_decimal_as_integer,
+ std::vector<SortingColumn> sorting_columns)
: pool_(pool),
dictionary_pagesize_limit_(dictionary_pagesize_limit),
write_batch_size_(write_batch_size),
@@ -723,6 +741,7 @@ class PARQUET_EXPORT WriterProperties {
store_decimal_as_integer_(store_short_decimal_as_integer),
page_checksum_enabled_(page_write_checksum_enabled),
file_encryption_properties_(file_encryption_properties),
+ sorting_columns_(std::move(sorting_columns)),
default_column_properties_(default_column_properties),
column_properties_(column_properties) {}
@@ -739,6 +758,8 @@ class PARQUET_EXPORT WriterProperties {
std::shared_ptr<FileEncryptionProperties> file_encryption_properties_;
+ std::vector<SortingColumn> sorting_columns_;
+
ColumnProperties default_column_properties_;
std::unordered_map<std::string, ColumnProperties> column_properties_;
};
diff --git a/cpp/src/parquet/thrift_internal.h b/cpp/src/parquet/thrift_internal.h
index 56e2a67c8a..0df5c9b937 100644
--- a/cpp/src/parquet/thrift_internal.h
+++ b/cpp/src/parquet/thrift_internal.h
@@ -246,6 +246,14 @@ static inline EncryptionAlgorithm FromThrift(format::EncryptionAlgorithm encrypt
return encryption_algorithm;
}
+static inline SortingColumn FromThrift(format::SortingColumn thrift_sorting_column) {
+ SortingColumn sorting_column;
+ sorting_column.column_idx = thrift_sorting_column.column_idx;
+ sorting_column.nulls_first = thrift_sorting_column.nulls_first;
+ sorting_column.descending = thrift_sorting_column.descending;
+ return sorting_column;
+}
+
// ----------------------------------------------------------------------
// Convert Thrift enums from Parquet enums
@@ -307,6 +315,14 @@ static inline format::BoundaryOrder::type ToThrift(BoundaryOrder::type type) {
}
}
+static inline format::SortingColumn ToThrift(SortingColumn sorting_column) {
+ format::SortingColumn thrift_sorting_column;
+ thrift_sorting_column.column_idx = sorting_column.column_idx;
+ thrift_sorting_column.descending = sorting_column.descending;
+ thrift_sorting_column.nulls_first = sorting_column.nulls_first;
+ return thrift_sorting_column;
+}
+
static inline format::Statistics ToThrift(const EncodedStatistics& stats) {
format::Statistics statistics;
if (stats.has_min) {
diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h
index 6ec6870d3a..c5445facc3 100644
--- a/cpp/src/parquet/types.h
+++ b/cpp/src/parquet/types.h
@@ -543,6 +543,27 @@ struct BoundaryOrder {
};
};
+/// \brief SortingColumn is a proxy around format::SortingColumn.
+struct PARQUET_EXPORT SortingColumn {
+ // The column index (in this row group)
+ int32_t column_idx;
+
+ // If true, indicates this column is sorted in descending order.
+ bool descending;
+
+ // If true, nulls will come before non-null values, otherwise, nulls go at the end.
+ bool nulls_first;
+};
+
+inline bool operator==(const SortingColumn& left, const SortingColumn& right) {
+ return left.nulls_first == right.nulls_first && left.descending == right.descending &&
+ left.column_idx == right.column_idx;
+}
+
+inline bool operator!=(const SortingColumn& left, const SortingColumn& right) {
+ return !(left == right);
+}
+
// ----------------------------------------------------------------------
struct ByteArray {