You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2022/04/25 15:56:25 UTC
[arrow] branch master updated: ARROW-16131 [C++] support saving and retrieving custom metadata in batches for IPC file
This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 942f77e5c5 ARROW-16131 [C++] support saving and retrieving custom metadata in batches for IPC file
942f77e5c5 is described below
commit 942f77e5c52412694cb78cd4eca96d559475906e
Author: Yue Ni <ni...@gmail.com>
AuthorDate: Mon Apr 25 17:56:14 2022 +0200
ARROW-16131 [C++] support saving and retrieving custom metadata in batches for IPC file
This PR aims to address https://issues.apache.org/jira/projects/ARROW/issues/ARROW-16131
Currently, when writing an IPC file with multiple record batches using the `arrow::ipc::RecordBatchWriter`, the `custom_metadata` for each record batch is not saved and will be discarded silently.
The current writer does provide the `AppendCustomMetadata` API internally, but it is never called. I use this API to add the custom metadata during writing and to retrieve the custom metadata when reading the record batch. I am not completely sure whether this was intentionally ignored or accidentally missed, but from a user's perspective, the metadata can be set via the public API (see the example case in ARROW-16131), and I think it is not very friendly that this piece of data is silently discarded.
Several test cases are added to `read_write_test.cc` to verify this change.
Closes #12812 from niyue/bugfix/ipc-batch-meta
Lead-authored-by: Yue Ni <ni...@gmail.com>
Co-authored-by: Antoine Pitrou <an...@python.org>
Signed-off-by: Antoine Pitrou <an...@python.org>
---
cpp/src/arrow/ipc/read_write_test.cc | 106 +++++++++++++++++++++++++++++++----
cpp/src/arrow/ipc/reader.cc | 69 +++++++++++++++++------
cpp/src/arrow/ipc/reader.h | 7 +++
cpp/src/arrow/ipc/writer.cc | 37 +++++++-----
cpp/src/arrow/ipc/writer.h | 24 ++++++++
cpp/src/arrow/record_batch.h | 9 +++
6 files changed, 209 insertions(+), 43 deletions(-)
diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
index 1a4784fcf5..b27dcee33b 100644
--- a/cpp/src/arrow/ipc/read_write_test.cc
+++ b/cpp/src/arrow/ipc/read_write_test.cc
@@ -66,6 +66,7 @@ namespace ipc {
using internal::FieldPosition;
using internal::IoRecordedRandomAccessFile;
+using MetadataVector = std::vector<std::shared_ptr<KeyValueMetadata>>;
namespace test {
@@ -1018,8 +1019,9 @@ struct FileWriterHelper {
return Status::OK();
}
- Status WriteBatch(const std::shared_ptr<RecordBatch>& batch) {
- RETURN_NOT_OK(writer_->WriteRecordBatch(*batch));
+ Status WriteBatch(const std::shared_ptr<RecordBatch>& batch,
+ const std::shared_ptr<const KeyValueMetadata>& metadata = nullptr) {
+ RETURN_NOT_OK(writer_->WriteRecordBatch(*batch, metadata));
num_batches_written_++;
return Status::OK();
}
@@ -1042,16 +1044,22 @@ struct FileWriterHelper {
virtual Status ReadBatches(const IpcReadOptions& options,
RecordBatchVector* out_batches,
- ReadStats* out_stats = nullptr) {
+ ReadStats* out_stats = nullptr,
+ MetadataVector* out_metadata_list = nullptr) {
auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
ARROW_ASSIGN_OR_RAISE(auto reader, RecordBatchFileReader::Open(
buf_reader.get(), footer_offset_, options));
EXPECT_EQ(num_batches_written_, reader->num_record_batches());
for (int i = 0; i < num_batches_written_; ++i) {
- ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> chunk,
- reader->ReadRecordBatch(i));
+ ARROW_ASSIGN_OR_RAISE(auto chunk_with_metadata,
+ reader->ReadRecordBatchWithCustomMetadata(i));
+ auto chunk = chunk_with_metadata.batch;
out_batches->push_back(chunk);
+ if (out_metadata_list) {
+ auto metadata = chunk_with_metadata.custom_metadata;
+ out_metadata_list->push_back(metadata);
+ }
}
if (out_stats) {
*out_stats = reader->stats();
@@ -1096,7 +1104,8 @@ class NoZeroCopyBufferReader : public io::BufferReader {
template <bool kCoalesce>
struct FileGeneratorWriterHelper : public FileWriterHelper {
Status ReadBatches(const IpcReadOptions& options, RecordBatchVector* out_batches,
- ReadStats* out_stats = nullptr) override {
+ ReadStats* out_stats = nullptr,
+ MetadataVector* out_metadata_list = nullptr) override {
std::shared_ptr<io::RandomAccessFile> buf_reader;
if (kCoalesce) {
// Use a non-zero-copy enabled BufferReader so we can test paths properly
@@ -1145,8 +1154,9 @@ struct StreamWriterHelper {
return Status::OK();
}
- Status WriteBatch(const std::shared_ptr<RecordBatch>& batch) {
- RETURN_NOT_OK(writer_->WriteRecordBatch(*batch));
+ Status WriteBatch(const std::shared_ptr<RecordBatch>& batch,
+ const std::shared_ptr<const KeyValueMetadata>& metadata = nullptr) {
+ RETURN_NOT_OK(writer_->WriteRecordBatch(*batch, metadata));
return Status::OK();
}
@@ -1165,10 +1175,23 @@ struct StreamWriterHelper {
virtual Status ReadBatches(const IpcReadOptions& options,
RecordBatchVector* out_batches,
- ReadStats* out_stats = nullptr) {
+ ReadStats* out_stats = nullptr,
+ MetadataVector* out_metadata_list = nullptr) {
auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
ARROW_ASSIGN_OR_RAISE(auto reader, RecordBatchStreamReader::Open(buf_reader, options))
- ARROW_ASSIGN_OR_RAISE(*out_batches, reader->ToRecordBatches());
+ if (out_metadata_list) {
+ while (true) {
+ ARROW_ASSIGN_OR_RAISE(auto chunk_with_metadata, reader->ReadNext());
+ if (chunk_with_metadata.batch == nullptr) {
+ break;
+ }
+ out_batches->push_back(chunk_with_metadata.batch);
+ out_metadata_list->push_back(chunk_with_metadata.custom_metadata);
+ }
+ } else {
+ ARROW_ASSIGN_OR_RAISE(*out_batches, reader->ToRecordBatches());
+ }
+
if (out_stats) {
*out_stats = reader->stats();
}
@@ -1195,7 +1218,8 @@ struct StreamWriterHelper {
struct StreamDecoderWriterHelper : public StreamWriterHelper {
Status ReadBatches(const IpcReadOptions& options, RecordBatchVector* out_batches,
- ReadStats* out_stats = nullptr) override {
+ ReadStats* out_stats = nullptr,
+ MetadataVector* out_metadata_list = nullptr) override {
auto listener = std::make_shared<CollectListener>();
StreamDecoder decoder(listener, options);
RETURN_NOT_OK(DoConsume(&decoder));
@@ -1420,6 +1444,57 @@ class ReaderWriterMixin : public ExtensionTypesMixin {
ASSERT_TRUE(out_batches[0]->schema()->Equals(*schema));
}
+ void TestWriteBatchWithMetadata() {
+ std::shared_ptr<RecordBatch> batch;
+ ASSERT_OK(MakeIntRecordBatch(&batch));
+
+ WriterHelper writer_helper;
+ ASSERT_OK(writer_helper.Init(batch->schema(), IpcWriteOptions::Defaults()));
+
+ auto metadata = key_value_metadata({"some_key"}, {"some_value"});
+ ASSERT_OK(writer_helper.WriteBatch(batch, metadata));
+ ASSERT_OK(writer_helper.Finish());
+
+ RecordBatchVector out_batches;
+ MetadataVector out_metadata_list;
+ ASSERT_OK(writer_helper.ReadBatches(IpcReadOptions::Defaults(), &out_batches, nullptr,
+ &out_metadata_list));
+ ASSERT_EQ(out_batches.size(), 1);
+ ASSERT_EQ(out_metadata_list.size(), 1);
+ CompareBatch(*out_batches[0], *batch, false /* compare_metadata */);
+ ASSERT_TRUE(out_metadata_list[0]->Equals(*metadata));
+ }
+
+ // write multiple batches and each of them with different metadata
+ void TestWriteDifferentMetadata() {
+ std::shared_ptr<RecordBatch> batch_0;
+ std::shared_ptr<RecordBatch> batch_1;
+ auto metadata_0 = key_value_metadata({"some_key"}, {"0"});
+ auto metadata_1 = key_value_metadata({"some_key"}, {"1"});
+ ASSERT_OK(MakeIntRecordBatch(&batch_0));
+ ASSERT_OK(MakeIntRecordBatch(&batch_1));
+
+ WriterHelper writer_helper;
+ ASSERT_OK(writer_helper.Init(batch_0->schema(), IpcWriteOptions::Defaults()));
+
+ ASSERT_OK(writer_helper.WriteBatch(batch_0, metadata_0));
+
+ // Write a batch with different metadata
+ ASSERT_OK(writer_helper.WriteBatch(batch_1, metadata_1));
+ ASSERT_OK(writer_helper.Finish());
+
+ RecordBatchVector out_batches;
+ MetadataVector out_metadata_list;
+ ASSERT_OK(writer_helper.ReadBatches(IpcReadOptions::Defaults(), &out_batches, nullptr,
+ &out_metadata_list));
+ ASSERT_EQ(out_batches.size(), 2);
+ ASSERT_EQ(out_metadata_list.size(), 2);
+ CompareBatch(*out_batches[0], *batch_0, true /* compare_metadata */);
+ CompareBatch(*out_batches[1], *batch_1, true /* compare_metadata */);
+ ASSERT_TRUE(out_metadata_list[0]->Equals(*metadata_0));
+ ASSERT_TRUE(out_metadata_list[1]->Equals(*metadata_1));
+ }
+
void TestWriteNoRecordBatches() {
// Test writing no batches.
auto schema = arrow::schema({field("a", int32())});
@@ -1800,6 +1875,15 @@ TEST_F(TestFileFormatGeneratorCoalesced, DictionaryRoundTrip) {
}
TEST_F(TestStreamFormat, DifferentSchema) { TestWriteDifferentSchema(); }
+
+TEST_F(TestFileFormat, BatchWithMetadata) { TestWriteBatchWithMetadata(); }
+
+TEST_F(TestStreamFormat, BatchWithMetadata) { TestWriteBatchWithMetadata(); }
+
+TEST_F(TestFileFormat, DifferentMetadataBatches) { TestWriteDifferentMetadata(); }
+
+TEST_F(TestStreamFormat, DifferentMetadataBatches) { TestWriteDifferentMetadata(); }
+
TEST_F(TestFileFormat, DifferentSchema) { TestWriteDifferentSchema(); }
TEST_F(TestFileFormatGenerator, DifferentSchema) { TestWriteDifferentSchema(); }
TEST_F(TestFileFormatGeneratorCoalesced, DifferentSchema) { TestWriteDifferentSchema(); }
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index a5f31d74fe..0b46203795 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -654,7 +654,7 @@ Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(
reader.get());
}
-Result<std::shared_ptr<RecordBatch>> ReadRecordBatchInternal(
+Result<RecordBatchWithMetadata> ReadRecordBatchInternal(
const Buffer& metadata, const std::shared_ptr<Schema>& schema,
const std::vector<bool>& inclusion_mask, IpcReadContext& context,
io::RandomAccessFile* file) {
@@ -676,7 +676,15 @@ Result<std::shared_ptr<RecordBatch>> ReadRecordBatchInternal(
}
context.compression = compression;
context.metadata_version = internal::GetMetadataVersion(message->version());
- return LoadRecordBatch(batch, schema, inclusion_mask, context, file);
+
+ std::shared_ptr<KeyValueMetadata> custom_metadata;
+ if (message->custom_metadata() != nullptr) {
+ RETURN_NOT_OK(
+ internal::GetKeyValueMetadata(message->custom_metadata(), &custom_metadata));
+ }
+ ARROW_ASSIGN_OR_RAISE(auto record_batch,
+ LoadRecordBatch(batch, schema, inclusion_mask, context, file));
+ return RecordBatchWithMetadata{record_batch, custom_metadata};
}
// If we are selecting only certain fields, populate an inclusion mask for fast lookups.
@@ -756,7 +764,10 @@ Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(
IpcReadContext context(const_cast<DictionaryMemo*>(dictionary_memo), options, false);
RETURN_NOT_OK(GetInclusionMaskAndOutSchema(schema, context.options.included_fields,
&inclusion_mask, &out_schema));
- return ReadRecordBatchInternal(metadata, schema, inclusion_mask, context, file);
+ ARROW_ASSIGN_OR_RAISE(
+ auto batch_and_custom_metadata,
+ ReadRecordBatchInternal(metadata, schema, inclusion_mask, context, file));
+ return batch_and_custom_metadata.batch;
}
Status ReadDictionary(const Buffer& metadata, const IpcReadContext& context,
@@ -852,15 +863,21 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader {
}
Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
+ ARROW_ASSIGN_OR_RAISE(auto batch_with_metadata, ReadNext());
+ *batch = std::move(batch_with_metadata.batch);
+ return Status::OK();
+ }
+
+ Result<RecordBatchWithMetadata> ReadNext() override {
if (!have_read_initial_dictionaries_) {
RETURN_NOT_OK(ReadInitialDictionaries());
}
+ RecordBatchWithMetadata batch_with_metadata;
if (empty_stream_) {
// ARROW-6006: Degenerate case where stream contains no data, we do not
// bother trying to read a RecordBatch message from the stream
- *batch = nullptr;
- return Status::OK();
+ return batch_with_metadata;
}
// Continue to read other dictionaries, if any
@@ -874,16 +891,14 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader {
if (message == nullptr) {
// End of stream
- *batch = nullptr;
- return Status::OK();
+ return batch_with_metadata;
}
CHECK_HAS_BODY(*message);
ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
IpcReadContext context(&dictionary_memo_, options_, swap_endian_);
return ReadRecordBatchInternal(*message->metadata(), schema_, field_inclusion_mask_,
- context, reader.get())
- .Value(batch);
+ context, reader.get());
}
std::shared_ptr<Schema> schema() const override { return out_schema_; }
@@ -1158,12 +1173,26 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
}
Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(int i) override {
+ ARROW_ASSIGN_OR_RAISE(auto batch_with_metadata, ReadRecordBatchWithCustomMetadata(i));
+ return batch_with_metadata.batch;
+ }
+
+ Result<RecordBatchWithMetadata> ReadRecordBatchWithCustomMetadata(int i) override {
DCHECK_GE(i, 0);
DCHECK_LT(i, num_record_batches());
auto cached_metadata = cached_metadata_.find(i);
if (cached_metadata != cached_metadata_.end()) {
- return ReadCachedRecordBatch(i, cached_metadata->second).result();
+ auto result = ReadCachedRecordBatch(i, cached_metadata->second).result();
+ ARROW_ASSIGN_OR_RAISE(auto batch, result);
+ ARROW_ASSIGN_OR_RAISE(auto message_obj, cached_metadata->second.result());
+ ARROW_ASSIGN_OR_RAISE(auto message, GetFlatbufMessage(message_obj));
+ std::shared_ptr<KeyValueMetadata> custom_metadata;
+ if (message->custom_metadata() != nullptr) {
+ RETURN_NOT_OK(
+ internal::GetKeyValueMetadata(message->custom_metadata(), &custom_metadata));
+ }
+ return RecordBatchWithMetadata{std::move(batch), std::move(custom_metadata)};
}
RETURN_NOT_OK(WaitForDictionaryReadFinished());
@@ -1185,11 +1214,12 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
CHECK_HAS_BODY(*message);
ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
IpcReadContext context(&dictionary_memo_, options_, swap_endian_);
- ARROW_ASSIGN_OR_RAISE(auto batch, ReadRecordBatchInternal(
- *message->metadata(), schema_,
- field_inclusion_mask_, context, reader.get()));
+ ARROW_ASSIGN_OR_RAISE(
+ auto batch_with_metadata,
+ ReadRecordBatchInternal(*message->metadata(), schema_, field_inclusion_mask_,
+ context, reader.get()));
++stats_.num_record_batches;
- return batch;
+ return batch_with_metadata;
}
Result<int64_t> CountRows() override {
@@ -1832,8 +1862,11 @@ Result<std::shared_ptr<RecordBatch>> WholeIpcFileRecordBatchGenerator::ReadRecor
CHECK_HAS_BODY(*message);
ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
IpcReadContext context(&state->dictionary_memo_, state->options_, state->swap_endian_);
- return ReadRecordBatchInternal(*message->metadata(), state->schema_,
- state->field_inclusion_mask_, context, reader.get());
+ ARROW_ASSIGN_OR_RAISE(
+ auto batch_with_metadata,
+ ReadRecordBatchInternal(*message->metadata(), state->schema_,
+ state->field_inclusion_mask_, context, reader.get()));
+ return batch_with_metadata.batch;
}
Status Listener::OnEOS() { return Status::OK(); }
@@ -1938,11 +1971,11 @@ class StreamDecoder::StreamDecoderImpl : public MessageDecoderListener {
ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
IpcReadContext context(&dictionary_memo_, options_, swap_endian_);
ARROW_ASSIGN_OR_RAISE(
- auto batch,
+ auto batch_with_metadata,
ReadRecordBatchInternal(*message->metadata(), schema_, field_inclusion_mask_,
context, reader.get()));
++stats_.num_record_batches;
- return listener_->OnRecordBatchDecoded(std::move(batch));
+ return listener_->OnRecordBatchDecoded(std::move(batch_with_metadata.batch));
}
}
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
index 4bdbccc509..ad7969b31c 100644
--- a/cpp/src/arrow/ipc/reader.h
+++ b/cpp/src/arrow/ipc/reader.h
@@ -190,6 +190,13 @@ class ARROW_EXPORT RecordBatchFileReader
/// \return the read batch
virtual Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(int i) = 0;
+ /// \brief Read a particular record batch along with its custom metadata from the file.
+ /// Does not copy memory if the input source supports zero-copy.
+ ///
+ /// \param[in] i the index of the record batch to return
+ /// \return a struct containing the read batch and its custom metadata
+ virtual Result<RecordBatchWithMetadata> ReadRecordBatchWithCustomMetadata(int i) = 0;
+
/// \brief Return current read statistics
virtual ReadStats stats() const = 0;
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index cf5a08bf3b..4a7671e158 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -128,9 +128,11 @@ static inline bool NeedTruncate(int64_t offset, const Buffer* buffer,
class RecordBatchSerializer {
public:
- RecordBatchSerializer(int64_t buffer_start_offset, const IpcWriteOptions& options,
- IpcPayload* out)
+ RecordBatchSerializer(int64_t buffer_start_offset,
+ const std::shared_ptr<const KeyValueMetadata>& custom_metadata,
+ const IpcWriteOptions& options, IpcPayload* out)
: out_(out),
+ custom_metadata_(custom_metadata),
options_(options),
max_recursion_depth_(options.max_recursion_depth),
buffer_start_offset_(buffer_start_offset) {
@@ -175,13 +177,6 @@ class RecordBatchSerializer {
field_nodes_, buffer_meta_, options_, &out_->metadata);
}
- void AppendCustomMetadata(const std::string& key, const std::string& value) {
- if (!custom_metadata_) {
- custom_metadata_ = std::make_shared<KeyValueMetadata>();
- }
- custom_metadata_->Append(key, value);
- }
-
Status CompressBuffer(const Buffer& buffer, util::Codec* codec,
std::shared_ptr<Buffer>* out) {
// Convert buffer to uncompressed-length-prefixed compressed buffer
@@ -540,7 +535,7 @@ class RecordBatchSerializer {
// Destination for output buffers
IpcPayload* out_;
- std::shared_ptr<KeyValueMetadata> custom_metadata_;
+ std::shared_ptr<const KeyValueMetadata> custom_metadata_;
std::vector<internal::FieldMetadata> field_nodes_;
std::vector<internal::BufferMetadata> buffer_meta_;
@@ -554,7 +549,7 @@ class DictionarySerializer : public RecordBatchSerializer {
public:
DictionarySerializer(int64_t dictionary_id, bool is_delta, int64_t buffer_start_offset,
const IpcWriteOptions& options, IpcPayload* out)
- : RecordBatchSerializer(buffer_start_offset, options, out),
+ : RecordBatchSerializer(buffer_start_offset, NULLPTR, options, out),
dictionary_id_(dictionary_id),
is_delta_(is_delta) {}
@@ -636,8 +631,16 @@ Status GetDictionaryPayload(int64_t id, bool is_delta,
Status GetRecordBatchPayload(const RecordBatch& batch, const IpcWriteOptions& options,
IpcPayload* out) {
+ return GetRecordBatchPayload(batch, NULLPTR, options, out);
+}
+
+Status GetRecordBatchPayload(
+ const RecordBatch& batch,
+ const std::shared_ptr<const KeyValueMetadata>& custom_metadata,
+ const IpcWriteOptions& options, IpcPayload* out) {
out->type = MessageType::RECORD_BATCH;
- RecordBatchSerializer assembler(/*buffer_start_offset=*/0, options, out);
+ RecordBatchSerializer assembler(/*buffer_start_offset=*/0, custom_metadata, options,
+ out);
return assembler.Assemble(batch);
}
@@ -645,7 +648,7 @@ Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
io::OutputStream* dst, int32_t* metadata_length,
int64_t* body_length, const IpcWriteOptions& options) {
IpcPayload payload;
- RecordBatchSerializer assembler(buffer_start_offset, options, &payload);
+ RecordBatchSerializer assembler(buffer_start_offset, NULLPTR, options, &payload);
RETURN_NOT_OK(assembler.Assemble(batch));
// TODO: it's a rough edge that the metadata and body length here are
@@ -1000,6 +1003,12 @@ class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter {
}
Status WriteRecordBatch(const RecordBatch& batch) override {
+ return WriteRecordBatch(batch, NULLPTR);
+ }
+
+ Status WriteRecordBatch(
+ const RecordBatch& batch,
+ const std::shared_ptr<const KeyValueMetadata>& custom_metadata) override {
if (!batch.schema()->Equals(schema_, false /* check_metadata */)) {
return Status::Invalid("Tried to write record batch with different schema");
}
@@ -1009,7 +1018,7 @@ class ARROW_EXPORT IpcFormatWriter : public RecordBatchWriter {
RETURN_NOT_OK(WriteDictionaries(batch));
IpcPayload payload;
- RETURN_NOT_OK(GetRecordBatchPayload(batch, options_, &payload));
+ RETURN_NOT_OK(GetRecordBatchPayload(batch, custom_metadata, options_, &payload));
RETURN_NOT_OK(WritePayload(payload));
++stats_.num_record_batches;
diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h
index 31937fa69c..eb5827a609 100644
--- a/cpp/src/arrow/ipc/writer.h
+++ b/cpp/src/arrow/ipc/writer.h
@@ -96,6 +96,18 @@ class ARROW_EXPORT RecordBatchWriter {
/// \return Status
virtual Status WriteRecordBatch(const RecordBatch& batch) = 0;
+ /// \brief Write a record batch with custom metadata to the stream
+ ///
+ /// \param[in] batch the record batch to write to the stream
+ /// \param[in] custom_metadata the record batch's custom metadata to write to the stream
+ /// \return Status
+ virtual Status WriteRecordBatch(
+ const RecordBatch& batch,
+ const std::shared_ptr<const KeyValueMetadata>& custom_metadata) {
+ return Status::NotImplemented(
+ "Write record batch with custom metadata not implemented");
+ }
+
/// \brief Write possibly-chunked table by creating sequence of record batches
/// \param[in] table table to write
/// \return Status
@@ -389,6 +401,18 @@ ARROW_EXPORT
Status GetRecordBatchPayload(const RecordBatch& batch, const IpcWriteOptions& options,
IpcPayload* out);
+/// \brief Compute IpcPayload for the given record batch and custom metadata
+/// \param[in] batch the RecordBatch that is being serialized
+/// \param[in] custom_metadata the custom metadata to be serialized with the record batch
+/// \param[in] options options for serialization
+/// \param[out] out the returned IpcPayload
+/// \return Status
+ARROW_EXPORT
+Status GetRecordBatchPayload(
+ const RecordBatch& batch,
+ const std::shared_ptr<const KeyValueMetadata>& custom_metadata,
+ const IpcWriteOptions& options, IpcPayload* out);
+
/// \brief Write an IPC payload to the given stream.
/// \param[in] payload the payload to write
/// \param[in] options options for serialization
diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h
index fb7eea25ee..60aa9ad9c9 100644
--- a/cpp/src/arrow/record_batch.h
+++ b/cpp/src/arrow/record_batch.h
@@ -210,6 +210,11 @@ class ARROW_EXPORT RecordBatch {
ARROW_DISALLOW_COPY_AND_ASSIGN(RecordBatch);
};
+struct ARROW_EXPORT RecordBatchWithMetadata {
+ std::shared_ptr<RecordBatch> batch;
+ std::shared_ptr<KeyValueMetadata> custom_metadata;
+};
+
/// \brief Abstract interface for reading stream of record batches
class ARROW_EXPORT RecordBatchReader {
public:
@@ -227,6 +232,10 @@ class ARROW_EXPORT RecordBatchReader {
/// \return Status
virtual Status ReadNext(std::shared_ptr<RecordBatch>* batch) = 0;
+ virtual Result<RecordBatchWithMetadata> ReadNext() {
+ return Status::NotImplemented("ReadNext with custom metadata");
+ }
+
/// \brief Iterator interface
Result<std::shared_ptr<RecordBatch>> Next() {
std::shared_ptr<RecordBatch> batch;