You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ra...@apache.org on 2023/04/17 12:58:35 UTC

[arrow] 01/11: GH-34888: [C++][Parquet] Writer supports adding extra kv meta (#34889)

This is an automated email from the ASF dual-hosted git repository.

raulcd pushed a commit to branch maint-12.0.0
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 6dd95ee81b0d292b504590e0328dc205f14eb1ec
Author: Gang Wu <us...@gmail.com>
AuthorDate: Thu Apr 13 08:41:56 2023 +0800

    GH-34888: [C++][Parquet] Writer supports adding extra kv meta (#34889)
    
    ### Rationale for this change
    
    The Parquet format supports storing user-provided key-value metadata. However, the parquet-cpp writer can only set it via ParquetFileWriter::Open(). Sometimes users may want to add extra information while writing, so it is useful to support adding extra key-value metadata at any time before the file writer is closed.
    
    ### What changes are included in this PR?
    
    Add a new method `void AddKeyValueMetadata(const std::shared_ptr<const KeyValueMetadata>& key_value_metadata)` to the `ParquetFileWriter` class. Users can now add more key-value metadata to the file as long as the writer has not been closed; entries whose keys already exist are overwritten.
    
    ### Are these changes tested?
    
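    Added a new `Metadata.TestAddKeyValueMetadata` test to verify that key-value metadata added before closing the writer is preserved in the written file, and that adding metadata after the writer is closed throws a `ParquetException`.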
    
    ### Are there any user-facing changes?
    
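    Yes, users can add custom key-value metadata at any time before the writer is closed.
    In addition, the `FileMetaDataBuilder::Make` overload that takes a `KeyValueMetadata` argument is deprecated; pass the metadata to `FileMetaDataBuilder::Finish()` instead.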
    * Closes: #34888
    
    Lead-authored-by: Gang Wu <us...@gmail.com>
    Co-authored-by: Will Jones <wi...@gmail.com>
    Signed-off-by: Will Jones <wi...@gmail.com>
---
 cpp/src/parquet/file_writer.cc   | 29 +++++++++++++++++++----
 cpp/src/parquet/file_writer.h    | 10 ++++++++
 cpp/src/parquet/metadata.cc      | 24 +++++++++++++++----
 cpp/src/parquet/metadata.h       | 11 ++++++---
 cpp/src/parquet/metadata_test.cc | 51 ++++++++++++++++++++++++++++++++++++++--
 5 files changed, 110 insertions(+), 15 deletions(-)

diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc
index 481d5b6d30..f1098e4a74 100644
--- a/cpp/src/parquet/file_writer.cc
+++ b/cpp/src/parquet/file_writer.cc
@@ -17,13 +17,14 @@
 
 #include "parquet/file_writer.h"
 
-#include <cstddef>
 #include <memory>
 #include <ostream>
 #include <string>
 #include <utility>
 #include <vector>
 
+#include "arrow/util/key_value_metadata.h"
+#include "arrow/util/logging.h"
 #include "parquet/column_writer.h"
 #include "parquet/encryption/encryption_internal.h"
 #include "parquet/encryption/internal_file_encryptor.h"
@@ -338,7 +339,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
       auto file_encryption_properties = properties_->file_encryption_properties();
 
       if (file_encryption_properties == nullptr) {  // Non encrypted file.
-        file_metadata_ = metadata_->Finish();
+        file_metadata_ = metadata_->Finish(key_value_metadata_);
         WriteFileMetaData(*file_metadata_, sink_.get());
       } else {  // Encrypted file
         CloseEncryptedFile(file_encryption_properties);
@@ -376,6 +377,15 @@ class FileSerializer : public ParquetFileWriter::Contents {
 
   RowGroupWriter* AppendBufferedRowGroup() override { return AppendRowGroup(true); }
 
+  void AddKeyValueMetadata(
+      const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) override {
+    if (key_value_metadata_ == nullptr) {
+      key_value_metadata_ = std::move(key_value_metadata);
+    } else if (key_value_metadata != nullptr) {
+      key_value_metadata_ = key_value_metadata_->Merge(*key_value_metadata);
+    }
+  }
+
   ~FileSerializer() override {
     try {
       Close();
@@ -394,7 +404,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
         properties_(std::move(properties)),
         num_row_groups_(0),
         num_rows_(0),
-        metadata_(FileMetaDataBuilder::Make(&schema_, properties_, key_value_metadata_)) {
+        metadata_(FileMetaDataBuilder::Make(&schema_, properties_)) {
     PARQUET_ASSIGN_OR_THROW(int64_t position, sink_->Tell());
     if (position == 0) {
       StartFile();
@@ -407,7 +417,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
     // Encrypted file with encrypted footer
     if (file_encryption_properties->encrypted_footer()) {
       // encrypted footer
-      file_metadata_ = metadata_->Finish();
+      file_metadata_ = metadata_->Finish(key_value_metadata_);
 
       PARQUET_ASSIGN_OR_THROW(int64_t position, sink_->Tell());
       uint64_t metadata_start = static_cast<uint64_t>(position);
@@ -422,7 +432,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
           sink_->Write(reinterpret_cast<uint8_t*>(&footer_and_crypto_len), 4));
       PARQUET_THROW_NOT_OK(sink_->Write(kParquetEMagic, 4));
     } else {  // Encrypted file with plaintext footer
-      file_metadata_ = metadata_->Finish();
+      file_metadata_ = metadata_->Finish(key_value_metadata_);
       auto footer_signing_encryptor = file_encryptor_->GetFooterSigningEncryptor();
       WriteEncryptedFileMetadata(*file_metadata_, sink_.get(), footer_signing_encryptor,
                                  false);
@@ -613,6 +623,15 @@ RowGroupWriter* ParquetFileWriter::AppendRowGroup(int64_t num_rows) {
   return AppendRowGroup();
 }
 
+void ParquetFileWriter::AddKeyValueMetadata(
+    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
+  if (contents_) {
+    contents_->AddKeyValueMetadata(key_value_metadata);
+  } else {
+    throw ParquetException("Cannot add key-value metadata to closed file");
+  }
+}
+
 const std::shared_ptr<WriterProperties>& ParquetFileWriter::properties() const {
   return contents_->properties();
 }
diff --git a/cpp/src/parquet/file_writer.h b/cpp/src/parquet/file_writer.h
index f2888599fb..3bda1e535c 100644
--- a/cpp/src/parquet/file_writer.h
+++ b/cpp/src/parquet/file_writer.h
@@ -163,6 +163,9 @@ class PARQUET_EXPORT ParquetFileWriter {
       return key_value_metadata_;
     }
 
+    virtual void AddKeyValueMetadata(
+        const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) = 0;
+
     // Return const-pointer to make it clear that this object is not to be copied
     const SchemaDescriptor* schema() const { return &schema_; }
 
@@ -209,6 +212,13 @@ class PARQUET_EXPORT ParquetFileWriter {
   /// until the next call to AppendRowGroup or AppendBufferedRowGroup or Close.
   RowGroupWriter* AppendBufferedRowGroup();
 
+  /// \brief Add key-value metadata to the file.
+  /// \param[in] key_value_metadata the metadata to add.
+  /// \note This will overwrite any existing metadata with the same key.
+  /// \throw ParquetException if Close() has been called.
+  void AddKeyValueMetadata(
+      const std::shared_ptr<const KeyValueMetadata>& key_value_metadata);
+
   /// Number of columns.
   ///
   /// This number is fixed during the lifetime of the writer as it is determined via
diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc
index b6c240115f..ddb4fb143d 100644
--- a/cpp/src/parquet/metadata.cc
+++ b/cpp/src/parquet/metadata.cc
@@ -1740,7 +1740,6 @@ void RowGroupMetaDataBuilder::Finish(int64_t total_bytes_written,
 }
 
 // file metadata
-// TODO(PARQUET-595) Support key_value_metadata
 class FileMetaDataBuilder::FileMetaDataBuilderImpl {
  public:
   explicit FileMetaDataBuilderImpl(
@@ -1797,7 +1796,8 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
     }
   }
 
-  std::unique_ptr<FileMetaData> Finish() {
+  std::unique_ptr<FileMetaData> Finish(
+      const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
     int64_t total_rows = 0;
     for (auto row_group : row_groups_) {
       total_rows += row_group.num_rows;
@@ -1805,7 +1805,12 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
     metadata_->__set_num_rows(total_rows);
     metadata_->__set_row_groups(row_groups_);
 
-    if (key_value_metadata_) {
+    if (key_value_metadata_ || key_value_metadata) {
+      if (!key_value_metadata_) {
+        key_value_metadata_ = key_value_metadata;
+      } else if (key_value_metadata) {
+        key_value_metadata_ = key_value_metadata_->Merge(*key_value_metadata);
+      }
       metadata_->key_value_metadata.clear();
       metadata_->key_value_metadata.reserve(key_value_metadata_->size());
       for (int64_t i = 0; i < key_value_metadata_->size(); ++i) {
@@ -1829,7 +1834,7 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
     metadata_->__set_version(file_version);
     metadata_->__set_created_by(properties_->created_by());
 
-    // Users cannot set the `ColumnOrder` since we donot not have user defined sort order
+    // Users cannot set the `ColumnOrder` since we do not have user defined sort order
     // in the spec yet.
     // We always default to `TYPE_DEFINED_ORDER`. We can expose it in
     // the API once we have user defined sort orders in the Parquet format.
@@ -1910,6 +1915,12 @@ std::unique_ptr<FileMetaDataBuilder> FileMetaDataBuilder::Make(
       new FileMetaDataBuilder(schema, std::move(props), std::move(key_value_metadata)));
 }
 
+std::unique_ptr<FileMetaDataBuilder> FileMetaDataBuilder::Make(
+    const SchemaDescriptor* schema, std::shared_ptr<WriterProperties> props) {
+  return std::unique_ptr<FileMetaDataBuilder>(
+      new FileMetaDataBuilder(schema, std::move(props)));
+}
+
 FileMetaDataBuilder::FileMetaDataBuilder(
     const SchemaDescriptor* schema, std::shared_ptr<WriterProperties> props,
     std::shared_ptr<const KeyValueMetadata> key_value_metadata)
@@ -1926,7 +1937,10 @@ void FileMetaDataBuilder::SetPageIndexLocation(const PageIndexLocation& location
   impl_->SetPageIndexLocation(location);
 }
 
-std::unique_ptr<FileMetaData> FileMetaDataBuilder::Finish() { return impl_->Finish(); }
+std::unique_ptr<FileMetaData> FileMetaDataBuilder::Finish(
+    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
+  return impl_->Finish(key_value_metadata);
+}
 
 std::unique_ptr<FileCryptoMetaData> FileMetaDataBuilder::GetCryptoMetaData() {
   return impl_->BuildFileCryptoMetaData();
diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h
index efcb17be04..620bc842f9 100644
--- a/cpp/src/parquet/metadata.h
+++ b/cpp/src/parquet/metadata.h
@@ -523,10 +523,14 @@ struct PageIndexLocation {
 
 class PARQUET_EXPORT FileMetaDataBuilder {
  public:
-  // API convenience to get a MetaData reader
+  ARROW_DEPRECATED("Deprecated in 12.0.0. Use overload without KeyValueMetadata instead.")
   static std::unique_ptr<FileMetaDataBuilder> Make(
       const SchemaDescriptor* schema, std::shared_ptr<WriterProperties> props,
-      std::shared_ptr<const KeyValueMetadata> key_value_metadata = NULLPTR);
+      std::shared_ptr<const KeyValueMetadata> key_value_metadata);
+
+  // API convenience to get a MetaData builder
+  static std::unique_ptr<FileMetaDataBuilder> Make(
+      const SchemaDescriptor* schema, std::shared_ptr<WriterProperties> props);
 
   ~FileMetaDataBuilder();
 
@@ -537,7 +541,8 @@ class PARQUET_EXPORT FileMetaDataBuilder {
   void SetPageIndexLocation(const PageIndexLocation& location);
 
   // Complete the Thrift structure
-  std::unique_ptr<FileMetaData> Finish();
+  std::unique_ptr<FileMetaData> Finish(
+      const std::shared_ptr<const KeyValueMetadata>& key_value_metadata = NULLPTR);
 
   // crypto metadata
   std::unique_ptr<FileCryptoMetaData> GetCryptoMetaData();
diff --git a/cpp/src/parquet/metadata_test.cc b/cpp/src/parquet/metadata_test.cc
index a7c7b7b00e..4375661f5c 100644
--- a/cpp/src/parquet/metadata_test.cc
+++ b/cpp/src/parquet/metadata_test.cc
@@ -21,6 +21,7 @@
 
 #include "arrow/util/key_value_metadata.h"
 #include "parquet/file_reader.h"
+#include "parquet/file_writer.h"
 #include "parquet/schema.h"
 #include "parquet/statistics.h"
 #include "parquet/test_util.h"
@@ -284,16 +285,62 @@ TEST(Metadata, TestKeyValueMetadata) {
   auto kvmeta = std::make_shared<KeyValueMetadata>();
   kvmeta->Append("test_key", "test_value");
 
-  auto f_builder = FileMetaDataBuilder::Make(&schema, props, kvmeta);
+  auto f_builder = FileMetaDataBuilder::Make(&schema, props);
 
   // Read the metadata
-  auto f_accessor = f_builder->Finish();
+  auto f_accessor = f_builder->Finish(kvmeta);
 
   // Key value metadata
   ASSERT_TRUE(f_accessor->key_value_metadata());
   EXPECT_TRUE(f_accessor->key_value_metadata()->Equals(*kvmeta));
 }
 
+TEST(Metadata, TestAddKeyValueMetadata) {
+  schema::NodeVector fields;
+  fields.push_back(schema::Int32("int_col", Repetition::REQUIRED));
+  auto schema = std::static_pointer_cast<schema::GroupNode>(
+      schema::GroupNode::Make("schema", Repetition::REQUIRED, fields));
+
+  auto kv_meta = std::make_shared<KeyValueMetadata>();
+  kv_meta->Append("test_key_1", "test_value_1");
+  kv_meta->Append("test_key_2", "test_value_2_");
+
+  auto sink = CreateOutputStream();
+  auto writer_props = parquet::WriterProperties::Builder().disable_dictionary()->build();
+  auto file_writer =
+      parquet::ParquetFileWriter::Open(sink, schema, writer_props, kv_meta);
+
+  // Key value metadata that will be added to the file.
+  auto kv_meta_added = std::make_shared<KeyValueMetadata>();
+  kv_meta_added->Append("test_key_2", "test_value_2");
+  kv_meta_added->Append("test_key_3", "test_value_3");
+
+  file_writer->AddKeyValueMetadata(kv_meta_added);
+  file_writer->Close();
+
+  // Throw if appending key value metadata to closed file.
+  auto kv_meta_ignored = std::make_shared<KeyValueMetadata>();
+  kv_meta_ignored->Append("test_key_4", "test_value_4");
+  EXPECT_THROW(file_writer->AddKeyValueMetadata(kv_meta_ignored), ParquetException);
+
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+  auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
+  auto file_reader = ParquetFileReader::Open(source);
+
+  ASSERT_NE(nullptr, file_reader->metadata());
+  ASSERT_NE(nullptr, file_reader->metadata()->key_value_metadata());
+  auto read_kv_meta = file_reader->metadata()->key_value_metadata();
+
+  // Verify keys that were added before file writer was closed are present.
+  for (int i = 1; i <= 3; ++i) {
+    auto index = std::to_string(i);
+    PARQUET_ASSIGN_OR_THROW(auto value, read_kv_meta->Get("test_key_" + index));
+    EXPECT_EQ("test_value_" + index, value);
+  }
+  // Verify keys that were added after file writer was closed are not present.
+  EXPECT_FALSE(read_kv_meta->Contains("test_key_4"));
+}
+
 TEST(Metadata, TestHasBloomFilter) {
   std::string dir_string(parquet::test::get_data_dir());
   std::string path = dir_string + "/data_index_bloom_encoding_stats.parquet";