You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2020/06/28 22:20:35 UTC
[arrow] branch master updated: ARROW-8671: [C++] Use new
BodyCompression Flatbuffers member for IPC compression metadata
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new f589370 ARROW-8671: [C++] Use new BodyCompression Flatbuffers member for IPC compression metadata
f589370 is described below
commit f589370bef55042ddaef097a1dbf976d580b446a
Author: Wes McKinney <we...@apache.org>
AuthorDate: Sun Jun 28 17:19:37 2020 -0500
ARROW-8671: [C++] Use new BodyCompression Flatbuffers member for IPC compression metadata
If the message uses V4 metadata, then we also look for the "ARROW:experimental_compression" field in Message::custom_metadata so that IPC messages written with 0.17.x can be read in 1.0.0 and beyond.
Closes #7571 from wesm/ARROW-8671
Authored-by: Wes McKinney <we...@apache.org>
Signed-off-by: Wes McKinney <we...@apache.org>
---
cpp/src/arrow/ipc/CMakeLists.txt | 8 ------
cpp/src/arrow/ipc/metadata_internal.cc | 41 +++++++++++++++++++++++------
cpp/src/arrow/ipc/metadata_internal.h | 4 +--
cpp/src/arrow/ipc/options.h | 5 ++++
cpp/src/arrow/ipc/read_write_test.cc | 7 ++---
cpp/src/arrow/ipc/reader.cc | 48 +++++++++++++++++++++++++++++-----
cpp/src/arrow/ipc/writer.cc | 8 ++----
7 files changed, 88 insertions(+), 33 deletions(-)
diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt
index 10294d9..8d9f039 100644
--- a/cpp/src/arrow/ipc/CMakeLists.txt
+++ b/cpp/src/arrow/ipc/CMakeLists.txt
@@ -36,19 +36,11 @@ function(ADD_ARROW_IPC_TEST REL_TEST_NAME)
set(PREFIX "arrow-ipc")
endif()
- if(ARG_LABELS)
- set(LABELS ${ARG_LABELS})
- else()
- set(LABELS "arrow_ipc")
- endif()
-
add_arrow_test(${REL_TEST_NAME}
EXTRA_LINK_LIBS
${ARROW_DATASET_TEST_LINK_LIBS}
PREFIX
${PREFIX}
- LABELS
- ${LABELS}
${ARG_UNPARSED_ARGUMENTS})
endfunction()
diff --git a/cpp/src/arrow/ipc/metadata_internal.cc b/cpp/src/arrow/ipc/metadata_internal.cc
index 25f52c0..30214ac 100644
--- a/cpp/src/arrow/ipc/metadata_internal.cc
+++ b/cpp/src/arrow/ipc/metadata_internal.cc
@@ -29,6 +29,7 @@
#include "arrow/io/interfaces.h"
#include "arrow/ipc/dictionary.h"
#include "arrow/ipc/message.h"
+#include "arrow/ipc/options.h"
#include "arrow/ipc/util.h"
#include "arrow/sparse_tensor.h"
#include "arrow/status.h"
@@ -839,6 +840,7 @@ Result<std::shared_ptr<Buffer>> WriteFBMessage(
using FieldNodeVector =
flatbuffers::Offset<flatbuffers::Vector<const flatbuf::FieldNode*>>;
using BufferVector = flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Buffer*>>;
+using BodyCompressionOffset = flatbuffers::Offset<flatbuf::BodyCompression>;
static Status WriteFieldNodes(FBB& fbb, const std::vector<FieldMetadata>& nodes,
FieldNodeVector* out) {
@@ -870,17 +872,38 @@ static Status WriteBuffers(FBB& fbb, const std::vector<BufferMetadata>& buffers,
return Status::OK();
}
+static Status GetBodyCompression(FBB& fbb, const IpcWriteOptions& options,
+ BodyCompressionOffset* out) {
+ if (options.compression != Compression::UNCOMPRESSED) {
+ flatbuf::CompressionType codec;
+ if (options.compression == Compression::LZ4_FRAME) {
+ codec = flatbuf::CompressionType::LZ4_FRAME;
+ } else if (options.compression == Compression::ZSTD) {
+ codec = flatbuf::CompressionType::ZSTD;
+ } else {
+ return Status::Invalid("Unsupported IPC compression codec: ",
+ util::Codec::GetCodecAsString(options.compression));
+ }
+ *out = flatbuf::CreateBodyCompression(fbb, codec,
+ flatbuf::BodyCompressionMethod::BUFFER);
+ }
+ return Status::OK();
+}
+
static Status MakeRecordBatch(FBB& fbb, int64_t length, int64_t body_length,
const std::vector<FieldMetadata>& nodes,
const std::vector<BufferMetadata>& buffers,
- RecordBatchOffset* offset) {
+ const IpcWriteOptions& options, RecordBatchOffset* offset) {
FieldNodeVector fb_nodes;
- BufferVector fb_buffers;
-
RETURN_NOT_OK(WriteFieldNodes(fbb, nodes, &fb_nodes));
+
+ BufferVector fb_buffers;
RETURN_NOT_OK(WriteBuffers(fbb, buffers, &fb_buffers));
- *offset = flatbuf::CreateRecordBatch(fbb, length, fb_nodes, fb_buffers);
+ BodyCompressionOffset fb_compression;
+ RETURN_NOT_OK(GetBodyCompression(fbb, options, &fb_compression));
+
+ *offset = flatbuf::CreateRecordBatch(fbb, length, fb_nodes, fb_buffers, fb_compression);
return Status::OK();
}
@@ -1125,10 +1148,11 @@ Status WriteRecordBatchMessage(
int64_t length, int64_t body_length,
const std::shared_ptr<const KeyValueMetadata>& custom_metadata,
const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
- std::shared_ptr<Buffer>* out) {
+ const IpcWriteOptions& options, std::shared_ptr<Buffer>* out) {
FBB fbb;
RecordBatchOffset record_batch;
- RETURN_NOT_OK(MakeRecordBatch(fbb, length, body_length, nodes, buffers, &record_batch));
+ RETURN_NOT_OK(
+ MakeRecordBatch(fbb, length, body_length, nodes, buffers, options, &record_batch));
return WriteFBMessage(fbb, flatbuf::MessageHeader::RecordBatch, record_batch.Union(),
body_length, custom_metadata)
.Value(out);
@@ -1183,10 +1207,11 @@ Status WriteDictionaryMessage(
int64_t id, int64_t length, int64_t body_length,
const std::shared_ptr<const KeyValueMetadata>& custom_metadata,
const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
- std::shared_ptr<Buffer>* out) {
+ const IpcWriteOptions& options, std::shared_ptr<Buffer>* out) {
FBB fbb;
RecordBatchOffset record_batch;
- RETURN_NOT_OK(MakeRecordBatch(fbb, length, body_length, nodes, buffers, &record_batch));
+ RETURN_NOT_OK(
+ MakeRecordBatch(fbb, length, body_length, nodes, buffers, options, &record_batch));
auto dictionary_batch = flatbuf::CreateDictionaryBatch(fbb, id, record_batch).Union();
return WriteFBMessage(fbb, flatbuf::MessageHeader::DictionaryBatch, dictionary_batch,
body_length, custom_metadata)
diff --git a/cpp/src/arrow/ipc/metadata_internal.h b/cpp/src/arrow/ipc/metadata_internal.h
index e94c027..b0da188 100644
--- a/cpp/src/arrow/ipc/metadata_internal.h
+++ b/cpp/src/arrow/ipc/metadata_internal.h
@@ -180,7 +180,7 @@ Status WriteRecordBatchMessage(
const int64_t length, const int64_t body_length,
const std::shared_ptr<const KeyValueMetadata>& custom_metadata,
const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
- std::shared_ptr<Buffer>* out);
+ const IpcWriteOptions& options, std::shared_ptr<Buffer>* out);
Result<std::shared_ptr<Buffer>> WriteTensorMessage(const Tensor& tensor,
const int64_t buffer_start_offset);
@@ -198,7 +198,7 @@ Status WriteDictionaryMessage(
const int64_t id, const int64_t length, const int64_t body_length,
const std::shared_ptr<const KeyValueMetadata>& custom_metadata,
const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
- std::shared_ptr<Buffer>* out);
+ const IpcWriteOptions& options, std::shared_ptr<Buffer>* out);
static inline Result<std::shared_ptr<Buffer>> WriteFlatbufferBuilder(
flatbuffers::FlatBufferBuilder& fbb) {
diff --git a/cpp/src/arrow/ipc/options.h b/cpp/src/arrow/ipc/options.h
index 3cb42ba..385fc68 100644
--- a/cpp/src/arrow/ipc/options.h
+++ b/cpp/src/arrow/ipc/options.h
@@ -20,6 +20,7 @@
#include <cstdint>
#include <vector>
+#include "arrow/ipc/type_fwd.h"
#include "arrow/status.h"
#include "arrow/type_fwd.h"
#include "arrow/util/compression.h"
@@ -66,6 +67,10 @@ struct ARROW_EXPORT IpcWriteOptions {
/// like compression
bool use_threads = true;
+ /// \brief Format version to use for IPC messages and their
+ /// metadata. Presently using V4 version (readable by v0.8.0 and later).
+ MetadataVersion metadata_version = MetadataVersion::V4;
+
static IpcWriteOptions Defaults();
};
diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
index 9236708..2c1bf1c 100644
--- a/cpp/src/arrow/ipc/read_write_test.cc
+++ b/cpp/src/arrow/ipc/read_write_test.cc
@@ -126,9 +126,10 @@ TEST(TestMessage, SerializeCustomMetadata) {
key_value_metadata({"foo", "bar"}, {"fizz", "buzz"})};
for (auto metadata : cases) {
std::shared_ptr<Buffer> serialized;
- ASSERT_OK(internal::WriteRecordBatchMessage(/*length=*/0, /*body_length=*/0, metadata,
- /*nodes=*/{},
- /*buffers=*/{}, &serialized));
+ ASSERT_OK(internal::WriteRecordBatchMessage(
+ /*length=*/0, /*body_length=*/0, metadata,
+ /*nodes=*/{},
+ /*buffers=*/{}, IpcWriteOptions::Defaults(), &serialized));
ASSERT_OK_AND_ASSIGN(std::unique_ptr<Message> message,
Message::Open(serialized, /*body=*/nullptr));
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index f266f60..877ab69 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -478,7 +478,29 @@ Result<std::shared_ptr<RecordBatch>> LoadRecordBatch(
// ----------------------------------------------------------------------
// Array loading
-Status GetCompression(const flatbuf::Message* message, Compression::type* out) {
+Status GetCompression(const flatbuf::RecordBatch* batch, Compression::type* out) {
+ *out = Compression::UNCOMPRESSED;
+ const flatbuf::BodyCompression* compression = batch->compression();
+ if (compression != nullptr) {
+ if (compression->method() != flatbuf::BodyCompressionMethod::BUFFER) {
+ // Forward compatibility
+ return Status::Invalid("This library only supports BUFFER compression method");
+ }
+
+ if (compression->codec() == flatbuf::CompressionType::LZ4_FRAME) {
+ *out = Compression::LZ4_FRAME;
+ } else if (compression->codec() == flatbuf::CompressionType::ZSTD) {
+ *out = Compression::ZSTD;
+ } else {
+ return Status::Invalid("Unsupported codec in RecordBatch::compression metadata");
+ }
+ return Status::OK();
+ }
+ return Status::OK();
+}
+
+Status GetCompressionExperimental(const flatbuf::Message* message,
+ Compression::type* out) {
*out = Compression::UNCOMPRESSED;
if (message->custom_metadata() != nullptr) {
// TODO: Ensure this deserialization only ever happens once
@@ -489,7 +511,7 @@ Status GetCompression(const flatbuf::Message* message, Compression::type* out) {
ARROW_ASSIGN_OR_RAISE(*out,
util::Codec::GetCompressionType(metadata->value(index)));
}
- RETURN_NOT_OK(internal::CheckCompressionSupported(*out));
+ return internal::CheckCompressionSupported(*out);
}
return Status::OK();
}
@@ -535,8 +557,15 @@ Result<std::shared_ptr<RecordBatch>> ReadRecordBatchInternal(
return Status::IOError(
"Header-type of flatbuffer-encoded Message is not RecordBatch.");
}
+
Compression::type compression;
- RETURN_NOT_OK(GetCompression(message, &compression));
+ RETURN_NOT_OK(GetCompression(batch, &compression));
+ if (compression == Compression::UNCOMPRESSED &&
+ message->version() == flatbuf::MetadataVersion::V4) {
+ // Possibly obtain codec information from experimental serialization format
+ // in 0.17.x
+ RETURN_NOT_OK(GetCompressionExperimental(message, &compression));
+ }
return LoadRecordBatch(batch, schema, inclusion_mask, dictionary_memo, options,
compression, file);
}
@@ -623,8 +652,17 @@ Status ReadDictionary(const Buffer& metadata, DictionaryMemo* dictionary_memo,
"Header-type of flatbuffer-encoded Message is not DictionaryBatch.");
}
+ // The dictionary is embedded in a record batch with a single column
+ auto batch_meta = dictionary_batch->data();
+
Compression::type compression;
- RETURN_NOT_OK(GetCompression(message, &compression));
+ RETURN_NOT_OK(GetCompression(batch_meta, &compression));
+ if (compression == Compression::UNCOMPRESSED &&
+ message->version() == flatbuf::MetadataVersion::V4) {
+ // Possibly obtain codec information from experimental serialization format
+ // in 0.17.x
+ RETURN_NOT_OK(GetCompressionExperimental(message, &compression));
+ }
int64_t id = dictionary_batch->id();
@@ -635,8 +673,6 @@ Status ReadDictionary(const Buffer& metadata, DictionaryMemo* dictionary_memo,
auto value_field = ::arrow::field("dummy", value_type);
- // The dictionary is embedded in a record batch with a single column
- auto batch_meta = dictionary_batch->data();
CHECK_FLATBUFFERS_NOT_NULL(batch_meta, "DictionaryBatch.data");
std::shared_ptr<RecordBatch> batch;
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index d0d7deb..3587490 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -155,7 +155,7 @@ class RecordBatchSerializer {
// Override this for writing dictionary metadata
virtual Status SerializeMetadata(int64_t num_rows) {
return WriteRecordBatchMessage(num_rows, out_->body_length, custom_metadata_,
- field_nodes_, buffer_meta_, &out_->metadata);
+ field_nodes_, buffer_meta_, options_, &out_->metadata);
}
void AppendCustomMetadata(const std::string& key, const std::string& value) {
@@ -186,10 +186,6 @@ class RecordBatchSerializer {
RETURN_NOT_OK(internal::CheckCompressionSupported(options_.compression));
- // TODO check allowed values for compression?
- AppendCustomMetadata("ARROW:experimental_compression",
- util::Codec::GetCodecAsString(options_.compression));
-
ARROW_ASSIGN_OR_RAISE(
codec, util::Codec::Create(options_.compression, options_.compression_level));
@@ -543,7 +539,7 @@ class DictionarySerializer : public RecordBatchSerializer {
Status SerializeMetadata(int64_t num_rows) override {
return WriteDictionaryMessage(dictionary_id_, num_rows, out_->body_length,
- custom_metadata_, field_nodes_, buffer_meta_,
+ custom_metadata_, field_nodes_, buffer_meta_, options_,
&out_->metadata);
}