You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ra...@apache.org on 2023/04/17 12:58:35 UTC
[arrow] 01/11: GH-34888: [C++][Parquet] Writer supports adding extra kv meta (#34889)
This is an automated email from the ASF dual-hosted git repository.
raulcd pushed a commit to branch maint-12.0.0
in repository https://gitbox.apache.org/repos/asf/arrow.git
commit 6dd95ee81b0d292b504590e0328dc205f14eb1ec
Author: Gang Wu <us...@gmail.com>
AuthorDate: Thu Apr 13 08:41:56 2023 +0800
GH-34888: [C++][Parquet] Writer supports adding extra kv meta (#34889)
### Rationale for this change
The Parquet specification supports storing user-provided key-value metadata. However, the parquet-cpp writer can only set it via ParquetFileWriter::Open(). Sometimes a user may want to add extra information to it while writing, so it is useful to support adding extra key-value metadata at any time before the file writer is closed.
### What changes are included in this PR?
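Add a new interface `void AddKeyValueMetadata(const std::shared_ptr<const KeyValueMetadata>& key_value_metadata)` to the `ParquetFileWriter` class. Users can now add more key-value metadata to the file as long as the writer has not been closed. To support this, `FileMetaDataBuilder::Finish()` gains an optional key-value metadata argument, and the `FileMetaDataBuilder::Make()` overload that takes key-value metadata is deprecated in favor of a new overload without it.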
### Are these changes tested?
Added a new `Metadata.TestAddKeyValueMetadata` test to verify that key-value metadata added before closing the writer is preserved.
### Are there any user-facing changes?
Yes, users can add custom key-value metadata at any time before the writer is closed.
* Closes: #34888
Lead-authored-by: Gang Wu <us...@gmail.com>
Co-authored-by: Will Jones <wi...@gmail.com>
Signed-off-by: Will Jones <wi...@gmail.com>
---
cpp/src/parquet/file_writer.cc | 29 +++++++++++++++++++----
cpp/src/parquet/file_writer.h | 10 ++++++++
cpp/src/parquet/metadata.cc | 24 +++++++++++++++----
cpp/src/parquet/metadata.h | 11 ++++++---
cpp/src/parquet/metadata_test.cc | 51 ++++++++++++++++++++++++++++++++++++++--
5 files changed, 110 insertions(+), 15 deletions(-)
diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc
index 481d5b6d30..f1098e4a74 100644
--- a/cpp/src/parquet/file_writer.cc
+++ b/cpp/src/parquet/file_writer.cc
@@ -17,13 +17,14 @@
#include "parquet/file_writer.h"
-#include <cstddef>
#include <memory>
#include <ostream>
#include <string>
#include <utility>
#include <vector>
+#include "arrow/util/key_value_metadata.h"
+#include "arrow/util/logging.h"
#include "parquet/column_writer.h"
#include "parquet/encryption/encryption_internal.h"
#include "parquet/encryption/internal_file_encryptor.h"
@@ -338,7 +339,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
auto file_encryption_properties = properties_->file_encryption_properties();
if (file_encryption_properties == nullptr) { // Non encrypted file.
- file_metadata_ = metadata_->Finish();
+ file_metadata_ = metadata_->Finish(key_value_metadata_);
WriteFileMetaData(*file_metadata_, sink_.get());
} else { // Encrypted file
CloseEncryptedFile(file_encryption_properties);
@@ -376,6 +377,15 @@ class FileSerializer : public ParquetFileWriter::Contents {
RowGroupWriter* AppendBufferedRowGroup() override { return AppendRowGroup(true); }
+ void AddKeyValueMetadata(
+ const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) override {
+ if (key_value_metadata_ == nullptr) {
+ key_value_metadata_ = std::move(key_value_metadata);
+ } else if (key_value_metadata != nullptr) {
+ key_value_metadata_ = key_value_metadata_->Merge(*key_value_metadata);
+ }
+ }
+
~FileSerializer() override {
try {
Close();
@@ -394,7 +404,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
properties_(std::move(properties)),
num_row_groups_(0),
num_rows_(0),
- metadata_(FileMetaDataBuilder::Make(&schema_, properties_, key_value_metadata_)) {
+ metadata_(FileMetaDataBuilder::Make(&schema_, properties_)) {
PARQUET_ASSIGN_OR_THROW(int64_t position, sink_->Tell());
if (position == 0) {
StartFile();
@@ -407,7 +417,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
// Encrypted file with encrypted footer
if (file_encryption_properties->encrypted_footer()) {
// encrypted footer
- file_metadata_ = metadata_->Finish();
+ file_metadata_ = metadata_->Finish(key_value_metadata_);
PARQUET_ASSIGN_OR_THROW(int64_t position, sink_->Tell());
uint64_t metadata_start = static_cast<uint64_t>(position);
@@ -422,7 +432,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
sink_->Write(reinterpret_cast<uint8_t*>(&footer_and_crypto_len), 4));
PARQUET_THROW_NOT_OK(sink_->Write(kParquetEMagic, 4));
} else { // Encrypted file with plaintext footer
- file_metadata_ = metadata_->Finish();
+ file_metadata_ = metadata_->Finish(key_value_metadata_);
auto footer_signing_encryptor = file_encryptor_->GetFooterSigningEncryptor();
WriteEncryptedFileMetadata(*file_metadata_, sink_.get(), footer_signing_encryptor,
false);
@@ -613,6 +623,15 @@ RowGroupWriter* ParquetFileWriter::AppendRowGroup(int64_t num_rows) {
return AppendRowGroup();
}
+void ParquetFileWriter::AddKeyValueMetadata(
+ const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
+ if (contents_) {
+ contents_->AddKeyValueMetadata(key_value_metadata);
+ } else {
+ throw ParquetException("Cannot add key-value metadata to closed file");
+ }
+}
+
const std::shared_ptr<WriterProperties>& ParquetFileWriter::properties() const {
return contents_->properties();
}
diff --git a/cpp/src/parquet/file_writer.h b/cpp/src/parquet/file_writer.h
index f2888599fb..3bda1e535c 100644
--- a/cpp/src/parquet/file_writer.h
+++ b/cpp/src/parquet/file_writer.h
@@ -163,6 +163,9 @@ class PARQUET_EXPORT ParquetFileWriter {
return key_value_metadata_;
}
+ virtual void AddKeyValueMetadata(
+ const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) = 0;
+
// Return const-pointer to make it clear that this object is not to be copied
const SchemaDescriptor* schema() const { return &schema_; }
@@ -209,6 +212,13 @@ class PARQUET_EXPORT ParquetFileWriter {
/// until the next call to AppendRowGroup or AppendBufferedRowGroup or Close.
RowGroupWriter* AppendBufferedRowGroup();
+ /// \brief Add key-value metadata to the file.
+ /// \param[in] key_value_metadata the metadata to add.
+ /// \note This will overwrite any existing metadata with the same key.
+ /// \throw ParquetException if Close() has been called.
+ void AddKeyValueMetadata(
+ const std::shared_ptr<const KeyValueMetadata>& key_value_metadata);
+
/// Number of columns.
///
/// This number is fixed during the lifetime of the writer as it is determined via
diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc
index b6c240115f..ddb4fb143d 100644
--- a/cpp/src/parquet/metadata.cc
+++ b/cpp/src/parquet/metadata.cc
@@ -1740,7 +1740,6 @@ void RowGroupMetaDataBuilder::Finish(int64_t total_bytes_written,
}
// file metadata
-// TODO(PARQUET-595) Support key_value_metadata
class FileMetaDataBuilder::FileMetaDataBuilderImpl {
public:
explicit FileMetaDataBuilderImpl(
@@ -1797,7 +1796,8 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
}
}
- std::unique_ptr<FileMetaData> Finish() {
+ std::unique_ptr<FileMetaData> Finish(
+ const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
int64_t total_rows = 0;
for (auto row_group : row_groups_) {
total_rows += row_group.num_rows;
@@ -1805,7 +1805,12 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
metadata_->__set_num_rows(total_rows);
metadata_->__set_row_groups(row_groups_);
- if (key_value_metadata_) {
+ if (key_value_metadata_ || key_value_metadata) {
+ if (!key_value_metadata_) {
+ key_value_metadata_ = key_value_metadata;
+ } else if (key_value_metadata) {
+ key_value_metadata_ = key_value_metadata_->Merge(*key_value_metadata);
+ }
metadata_->key_value_metadata.clear();
metadata_->key_value_metadata.reserve(key_value_metadata_->size());
for (int64_t i = 0; i < key_value_metadata_->size(); ++i) {
@@ -1829,7 +1834,7 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
metadata_->__set_version(file_version);
metadata_->__set_created_by(properties_->created_by());
- // Users cannot set the `ColumnOrder` since we donot not have user defined sort order
+ // Users cannot set the `ColumnOrder` since we do not have user defined sort order
// in the spec yet.
// We always default to `TYPE_DEFINED_ORDER`. We can expose it in
// the API once we have user defined sort orders in the Parquet format.
@@ -1910,6 +1915,12 @@ std::unique_ptr<FileMetaDataBuilder> FileMetaDataBuilder::Make(
new FileMetaDataBuilder(schema, std::move(props), std::move(key_value_metadata)));
}
+std::unique_ptr<FileMetaDataBuilder> FileMetaDataBuilder::Make(
+ const SchemaDescriptor* schema, std::shared_ptr<WriterProperties> props) {
+ return std::unique_ptr<FileMetaDataBuilder>(
+ new FileMetaDataBuilder(schema, std::move(props)));
+}
+
FileMetaDataBuilder::FileMetaDataBuilder(
const SchemaDescriptor* schema, std::shared_ptr<WriterProperties> props,
std::shared_ptr<const KeyValueMetadata> key_value_metadata)
@@ -1926,7 +1937,10 @@ void FileMetaDataBuilder::SetPageIndexLocation(const PageIndexLocation& location
impl_->SetPageIndexLocation(location);
}
-std::unique_ptr<FileMetaData> FileMetaDataBuilder::Finish() { return impl_->Finish(); }
+std::unique_ptr<FileMetaData> FileMetaDataBuilder::Finish(
+ const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
+ return impl_->Finish(key_value_metadata);
+}
std::unique_ptr<FileCryptoMetaData> FileMetaDataBuilder::GetCryptoMetaData() {
return impl_->BuildFileCryptoMetaData();
diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h
index efcb17be04..620bc842f9 100644
--- a/cpp/src/parquet/metadata.h
+++ b/cpp/src/parquet/metadata.h
@@ -523,10 +523,14 @@ struct PageIndexLocation {
class PARQUET_EXPORT FileMetaDataBuilder {
public:
- // API convenience to get a MetaData reader
+ ARROW_DEPRECATED("Deprecated in 12.0.0. Use overload without KeyValueMetadata instead.")
static std::unique_ptr<FileMetaDataBuilder> Make(
const SchemaDescriptor* schema, std::shared_ptr<WriterProperties> props,
- std::shared_ptr<const KeyValueMetadata> key_value_metadata = NULLPTR);
+ std::shared_ptr<const KeyValueMetadata> key_value_metadata);
+
+ // API convenience to get a MetaData builder
+ static std::unique_ptr<FileMetaDataBuilder> Make(
+ const SchemaDescriptor* schema, std::shared_ptr<WriterProperties> props);
~FileMetaDataBuilder();
@@ -537,7 +541,8 @@ class PARQUET_EXPORT FileMetaDataBuilder {
void SetPageIndexLocation(const PageIndexLocation& location);
// Complete the Thrift structure
- std::unique_ptr<FileMetaData> Finish();
+ std::unique_ptr<FileMetaData> Finish(
+ const std::shared_ptr<const KeyValueMetadata>& key_value_metadata = NULLPTR);
// crypto metadata
std::unique_ptr<FileCryptoMetaData> GetCryptoMetaData();
diff --git a/cpp/src/parquet/metadata_test.cc b/cpp/src/parquet/metadata_test.cc
index a7c7b7b00e..4375661f5c 100644
--- a/cpp/src/parquet/metadata_test.cc
+++ b/cpp/src/parquet/metadata_test.cc
@@ -21,6 +21,7 @@
#include "arrow/util/key_value_metadata.h"
#include "parquet/file_reader.h"
+#include "parquet/file_writer.h"
#include "parquet/schema.h"
#include "parquet/statistics.h"
#include "parquet/test_util.h"
@@ -284,16 +285,62 @@ TEST(Metadata, TestKeyValueMetadata) {
auto kvmeta = std::make_shared<KeyValueMetadata>();
kvmeta->Append("test_key", "test_value");
- auto f_builder = FileMetaDataBuilder::Make(&schema, props, kvmeta);
+ auto f_builder = FileMetaDataBuilder::Make(&schema, props);
// Read the metadata
- auto f_accessor = f_builder->Finish();
+ auto f_accessor = f_builder->Finish(kvmeta);
// Key value metadata
ASSERT_TRUE(f_accessor->key_value_metadata());
EXPECT_TRUE(f_accessor->key_value_metadata()->Equals(*kvmeta));
}
+TEST(Metadata, TestAddKeyValueMetadata) {
+ schema::NodeVector fields;
+ fields.push_back(schema::Int32("int_col", Repetition::REQUIRED));
+ auto schema = std::static_pointer_cast<schema::GroupNode>(
+ schema::GroupNode::Make("schema", Repetition::REQUIRED, fields));
+
+ auto kv_meta = std::make_shared<KeyValueMetadata>();
+ kv_meta->Append("test_key_1", "test_value_1");
+ kv_meta->Append("test_key_2", "test_value_2_");
+
+ auto sink = CreateOutputStream();
+ auto writer_props = parquet::WriterProperties::Builder().disable_dictionary()->build();
+ auto file_writer =
+ parquet::ParquetFileWriter::Open(sink, schema, writer_props, kv_meta);
+
+ // Key value metadata that will be added to the file.
+ auto kv_meta_added = std::make_shared<KeyValueMetadata>();
+ kv_meta_added->Append("test_key_2", "test_value_2");
+ kv_meta_added->Append("test_key_3", "test_value_3");
+
+ file_writer->AddKeyValueMetadata(kv_meta_added);
+ file_writer->Close();
+
+ // Throw if appending key value metadata to closed file.
+ auto kv_meta_ignored = std::make_shared<KeyValueMetadata>();
+ kv_meta_ignored->Append("test_key_4", "test_value_4");
+ EXPECT_THROW(file_writer->AddKeyValueMetadata(kv_meta_ignored), ParquetException);
+
+ PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+ auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
+ auto file_reader = ParquetFileReader::Open(source);
+
+ ASSERT_NE(nullptr, file_reader->metadata());
+ ASSERT_NE(nullptr, file_reader->metadata()->key_value_metadata());
+ auto read_kv_meta = file_reader->metadata()->key_value_metadata();
+
+ // Verify keys that were added before file writer was closed are present.
+ for (int i = 1; i <= 3; ++i) {
+ auto index = std::to_string(i);
+ PARQUET_ASSIGN_OR_THROW(auto value, read_kv_meta->Get("test_key_" + index));
+ EXPECT_EQ("test_value_" + index, value);
+ }
+ // Verify keys that were added after file writer was closed are not present.
+ EXPECT_FALSE(read_kv_meta->Contains("test_key_4"));
+}
+
TEST(Metadata, TestHasBloomFilter) {
std::string dir_string(parquet::test::get_data_dir());
std::string path = dir_string + "/data_index_bloom_encoding_stats.parquet";