You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by wj...@apache.org on 2023/05/01 14:49:27 UTC

[arrow] branch main updated: GH-35331: [C++][Parquet] Parquet Export Footer metadata SortColumns (#35351)

This is an automated email from the ASF dual-hosted git repository.

wjones127 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new da6dbd4860 GH-35331: [C++][Parquet] Parquet Export Footer metadata SortColumns (#35351)
da6dbd4860 is described below

commit da6dbd48607089d716505054176e345b704570c5
Author: mwish <ma...@gmail.com>
AuthorDate: Mon May 1 22:49:17 2023 +0800

    GH-35331: [C++][Parquet] Parquet Export Footer metadata SortColumns (#35351)
    
    
    
    ### Rationale for this change
    
    Allow reading and setting SortingColumns in C++ Parquet. Note that the writer currently does not check the sort order, so users must ensure
    that the written records do not violate it.
    
    ### What changes are included in this PR?
    
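    Add a `sorting_columns()` accessor to RowGroupMetaData and a `set_sorting_columns()` option to WriterProperties::Builder; the configured sorting columns are written into each row group's metadata.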
    
    ### Are these changes tested?
    
    * [x] tests
    
    ### Are there any user-facing changes?
    
    Yes: users can set sorting columns through WriterProperties and read them back from RowGroupMetaData.
    
    * Closes: #35331
    
    Authored-by: mwish <ma...@gmail.com>
    Signed-off-by: Will Jones <wi...@gmail.com>
---
 cpp/src/parquet/metadata.cc       | 25 +++++++++++++++++++++++
 cpp/src/parquet/metadata.h        |  2 ++
 cpp/src/parquet/metadata_test.cc  | 43 +++++++++++++++++++++++++++++++++++++++
 cpp/src/parquet/properties.h      | 25 +++++++++++++++++++++--
 cpp/src/parquet/thrift_internal.h | 16 +++++++++++++++
 cpp/src/parquet/types.h           | 21 +++++++++++++++++++
 6 files changed, 130 insertions(+), 2 deletions(-)

diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc
index ddb4fb143d..0bbd965807 100644
--- a/cpp/src/parquet/metadata.cc
+++ b/cpp/src/parquet/metadata.cc
@@ -504,6 +504,18 @@ class RowGroupMetaData::RowGroupMetaDataImpl {
                            " columns, requested metadata for column: ", i);
   }
 
+  std::vector<SortingColumn> sorting_columns() const {
+    std::vector<SortingColumn> result;
+    // An unset optional field means no sort order was recorded for this row group.
+    if (row_group_->__isset.sorting_columns) {
+      result.reserve(row_group_->sorting_columns.size());
+      for (const auto& thrift_sorting_column : row_group_->sorting_columns) {
+        result.push_back(FromThrift(thrift_sorting_column));
+      }
+    }
+    return result;
+  }
+
  private:
   const format::RowGroup* row_group_;
   const SchemaDescriptor* schema_;
@@ -571,6 +583,10 @@ bool RowGroupMetaData::can_decompress() const {
   return true;
 }
 
+std::vector<SortingColumn> RowGroupMetaData::sorting_columns() const {
+  return impl_->sorting_columns();  // Empty if the footer records no sort order.
+}
+
 // file metadata
 class FileMetaData::FileMetaDataImpl {
  public:
@@ -1684,6 +1700,15 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
       total_compressed_size += column_builders_[i]->total_compressed_size();
     }
 
+    const auto& sorting_columns = properties_->sorting_columns();
+    if (!sorting_columns.empty()) {
+      std::vector<format::SortingColumn> thrift_sorting_columns(sorting_columns.size());
+      for (size_t i = 0; i < sorting_columns.size(); ++i) {
+        thrift_sorting_columns[i] = ToThrift(sorting_columns[i]);
+      }
+      row_group_->__set_sorting_columns(std::move(thrift_sorting_columns));
+    }
+
     row_group_->__set_file_offset(file_offset);
     row_group_->__set_total_compressed_size(total_compressed_size);
     row_group_->__set_total_byte_size(total_bytes_written);
diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h
index 620bc842f9..e62b2d187a 100644
--- a/cpp/src/parquet/metadata.h
+++ b/cpp/src/parquet/metadata.h
@@ -251,6 +251,8 @@ class PARQUET_EXPORT RowGroupMetaData {
   const SchemaDescriptor* schema() const;
   // Indicate if all of the RowGroup's ColumnChunks can be decompressed.
   bool can_decompress() const;
+  // Returns the sorting columns recorded for this row group; empty if there are none.
+  std::vector<SortingColumn> sorting_columns() const;
 
  private:
   explicit RowGroupMetaData(
diff --git a/cpp/src/parquet/metadata_test.cc b/cpp/src/parquet/metadata_test.cc
index 4375661f5c..fc4dfb3ac8 100644
--- a/cpp/src/parquet/metadata_test.cc
+++ b/cpp/src/parquet/metadata_test.cc
@@ -394,6 +394,49 @@ TEST(Metadata, TestReadPageIndex) {
   }
 }
 
+TEST(Metadata, TestSortingColumns) {
+  schema::NodeVector fields;
+  fields.push_back(schema::Int32("sort_col", Repetition::REQUIRED));
+  fields.push_back(schema::Int32("int_col", Repetition::REQUIRED));
+
+  auto schema = std::static_pointer_cast<schema::GroupNode>(
+      schema::GroupNode::Make("schema", Repetition::REQUIRED, fields));
+
+  // Every field differs between the two entries, so a dropped or swapped field in
+  // the ToThrift/FromThrift round trip makes the comparisons below fail.
+  const std::vector<SortingColumn> sorting_columns = {
+      SortingColumn{/*column_idx=*/0, /*descending=*/true, /*nulls_first=*/false},
+      SortingColumn{/*column_idx=*/1, /*descending=*/false, /*nulls_first=*/true},
+  };
+
+  auto sink = CreateOutputStream();
+  auto writer_props = parquet::WriterProperties::Builder()
+                          .disable_dictionary()
+                          ->set_sorting_columns(sorting_columns)
+                          ->build();
+
+  EXPECT_EQ(sorting_columns, writer_props->sorting_columns());
+
+  auto file_writer = parquet::ParquetFileWriter::Open(sink, schema, writer_props);
+
+  // An empty row group suffices: only the footer metadata is checked.
+  auto row_group_writer = file_writer->AppendBufferedRowGroup();
+  row_group_writer->Close();
+  file_writer->Close();
+
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+  auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
+  auto file_reader = ParquetFileReader::Open(source);
+
+  // Read the footer back and check the row group kept the sorting columns.
+  ASSERT_NE(nullptr, file_reader->metadata());
+  ASSERT_EQ(1, file_reader->metadata()->num_row_groups());
+  auto row_group_reader = file_reader->RowGroup(0);
+  auto* row_group_read_metadata = row_group_reader->metadata();
+  ASSERT_NE(nullptr, row_group_read_metadata);
+  EXPECT_EQ(sorting_columns, row_group_read_metadata->sorting_columns());
+}
+
 TEST(ApplicationVersion, Basics) {
   ApplicationVersion version("parquet-mr version 1.7.9");
   ApplicationVersion version1("parquet-mr version 1.8.0");
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index f38dd17482..4c532235ca 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -466,6 +466,17 @@ class PARQUET_EXPORT WriterProperties {
       return this->enable_statistics(path->ToDotString());
     }
 
+    /// Define the sorting columns recorded in the metadata of each row group.
+    /// Default empty (no sorting columns are recorded).
+    ///
+    /// The writer does not verify the order: if sorting columns are set, the user must
+    /// ensure that records are written sorted by them. Otherwise, the stored data
+    /// will be inconsistent with the sorting_columns metadata.
+    Builder* set_sorting_columns(std::vector<SortingColumn> sorting_columns) {
+      sorting_columns_ = std::move(sorting_columns);
+      return this;
+    }
+
     /// Disable statistics for the column specified by `path`.
     /// Default enabled.
     Builder* disable_statistics(const std::string& path) {
@@ -578,7 +589,8 @@ class PARQUET_EXPORT WriterProperties {
           pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_,
           pagesize_, version_, created_by_, page_checksum_enabled_,
           std::move(file_encryption_properties_), default_column_properties_,
-          column_properties, data_page_version_, store_decimal_as_integer_));
+          column_properties, data_page_version_, store_decimal_as_integer_,
+          std::move(sorting_columns_)));
     }
 
    private:
@@ -595,6 +607,9 @@ class PARQUET_EXPORT WriterProperties {
 
     std::shared_ptr<FileEncryptionProperties> file_encryption_properties_;
 
+    // Sorting columns to record in each row group; empty means there are none.
+    std::vector<SortingColumn> sorting_columns_;
+
     // Settings used for each column unless overridden in any of the maps below
     ColumnProperties default_column_properties_;
     std::unordered_map<std::string, Encoding::type> encodings_;
@@ -666,6 +681,8 @@ class PARQUET_EXPORT WriterProperties {
     return column_properties(path).dictionary_enabled();
   }
 
+  /// Sorting columns recorded in each row group's metadata; empty if none are set.
+  const std::vector<SortingColumn>& sorting_columns() const { return sorting_columns_; }
   bool statistics_enabled(const std::shared_ptr<schema::ColumnPath>& path) const {
     return column_properties(path).statistics_enabled();
   }
@@ -711,7 +728,8 @@ class PARQUET_EXPORT WriterProperties {
       std::shared_ptr<FileEncryptionProperties> file_encryption_properties,
       const ColumnProperties& default_column_properties,
       const std::unordered_map<std::string, ColumnProperties>& column_properties,
-      ParquetDataPageVersion data_page_version, bool store_short_decimal_as_integer)
+      ParquetDataPageVersion data_page_version, bool store_short_decimal_as_integer,
+      std::vector<SortingColumn> sorting_columns)
       : pool_(pool),
         dictionary_pagesize_limit_(dictionary_pagesize_limit),
         write_batch_size_(write_batch_size),
@@ -723,6 +741,7 @@ class PARQUET_EXPORT WriterProperties {
         store_decimal_as_integer_(store_short_decimal_as_integer),
         page_checksum_enabled_(page_write_checksum_enabled),
         file_encryption_properties_(file_encryption_properties),
+        sorting_columns_(std::move(sorting_columns)),
         default_column_properties_(default_column_properties),
         column_properties_(column_properties) {}
 
@@ -739,6 +758,8 @@ class PARQUET_EXPORT WriterProperties {
 
   std::shared_ptr<FileEncryptionProperties> file_encryption_properties_;
 
+  std::vector<SortingColumn> sorting_columns_;
+
   ColumnProperties default_column_properties_;
   std::unordered_map<std::string, ColumnProperties> column_properties_;
 };
diff --git a/cpp/src/parquet/thrift_internal.h b/cpp/src/parquet/thrift_internal.h
index 56e2a67c8a..0df5c9b937 100644
--- a/cpp/src/parquet/thrift_internal.h
+++ b/cpp/src/parquet/thrift_internal.h
@@ -246,6 +246,14 @@ static inline EncryptionAlgorithm FromThrift(format::EncryptionAlgorithm encrypt
   return encryption_algorithm;
 }
 
+static inline SortingColumn FromThrift(format::SortingColumn thrift_sorting_column) {
+  SortingColumn result;  // Copy each field; column_idx is not range-checked here.
+  result.column_idx = thrift_sorting_column.column_idx;
+  result.descending = thrift_sorting_column.descending;
+  result.nulls_first = thrift_sorting_column.nulls_first;
+  return result;
+}
+
 // ----------------------------------------------------------------------
 // Convert Thrift enums from Parquet enums
 
@@ -307,6 +315,14 @@ static inline format::BoundaryOrder::type ToThrift(BoundaryOrder::type type) {
   }
 }
 
+static inline format::SortingColumn ToThrift(SortingColumn sorting_column) {
+  format::SortingColumn thrift_column;  // Filled through the generated setters.
+  thrift_column.__set_column_idx(sorting_column.column_idx);
+  thrift_column.__set_descending(sorting_column.descending);
+  thrift_column.__set_nulls_first(sorting_column.nulls_first);
+  return thrift_column;
+}
+
 static inline format::Statistics ToThrift(const EncodedStatistics& stats) {
   format::Statistics statistics;
   if (stats.has_min) {
diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h
index 6ec6870d3a..c5445facc3 100644
--- a/cpp/src/parquet/types.h
+++ b/cpp/src/parquet/types.h
@@ -543,6 +543,27 @@ struct BoundaryOrder {
   };
 };
 
+/// \brief Public counterpart of format::SortingColumn (one sort key of a row group).
+///
+/// Members have default initializers, so a default-constructed value is well-defined.
+struct PARQUET_EXPORT SortingColumn {
+  /// The column index (in this row group).
+  int32_t column_idx = 0;
+  /// If true, indicates this column is sorted in descending order.
+  bool descending = false;
+  /// If true, nulls come before non-null values; otherwise, nulls go at the end.
+  bool nulls_first = false;
+};
+
+inline bool operator==(const SortingColumn& left, const SortingColumn& right) {
+  return left.column_idx == right.column_idx && left.descending == right.descending &&
+         left.nulls_first == right.nulls_first;  // Memberwise, in declaration order.
+}
+
+inline bool operator!=(const SortingColumn& left, const SortingColumn& right) {
+  return !(left == right);  // Defined via operator== so the two never disagree.
+}
+
 // ----------------------------------------------------------------------
 
 struct ByteArray {