You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2020/06/28 22:20:35 UTC

[arrow] branch master updated: ARROW-8671: [C++] Use new BodyCompression Flatbuffers member for IPC compression metadata

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new f589370  ARROW-8671: [C++] Use new BodyCompression Flatbuffers member for IPC compression metadata
f589370 is described below

commit f589370bef55042ddaef097a1dbf976d580b446a
Author: Wes McKinney <we...@apache.org>
AuthorDate: Sun Jun 28 17:19:37 2020 -0500

    ARROW-8671: [C++] Use new BodyCompression Flatbuffers member for IPC compression metadata
    
    If the message uses V4 metadata then we also look for the "ARROW:experimental_compression" field in Message::custom_metadata so that IPC messages written with 0.17.x can be read in 1.0.0 and beyond.
    
    Closes #7571 from wesm/ARROW-8671
    
    Authored-by: Wes McKinney <we...@apache.org>
    Signed-off-by: Wes McKinney <we...@apache.org>
---
 cpp/src/arrow/ipc/CMakeLists.txt       |  8 ------
 cpp/src/arrow/ipc/metadata_internal.cc | 41 +++++++++++++++++++++++------
 cpp/src/arrow/ipc/metadata_internal.h  |  4 +--
 cpp/src/arrow/ipc/options.h            |  5 ++++
 cpp/src/arrow/ipc/read_write_test.cc   |  7 ++---
 cpp/src/arrow/ipc/reader.cc            | 48 +++++++++++++++++++++++++++++-----
 cpp/src/arrow/ipc/writer.cc            |  8 ++----
 7 files changed, 88 insertions(+), 33 deletions(-)

diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt
index 10294d9..8d9f039 100644
--- a/cpp/src/arrow/ipc/CMakeLists.txt
+++ b/cpp/src/arrow/ipc/CMakeLists.txt
@@ -36,19 +36,11 @@ function(ADD_ARROW_IPC_TEST REL_TEST_NAME)
     set(PREFIX "arrow-ipc")
   endif()
 
-  if(ARG_LABELS)
-    set(LABELS ${ARG_LABELS})
-  else()
-    set(LABELS "arrow_ipc")
-  endif()
-
   add_arrow_test(${REL_TEST_NAME}
                  EXTRA_LINK_LIBS
                  ${ARROW_DATASET_TEST_LINK_LIBS}
                  PREFIX
                  ${PREFIX}
-                 LABELS
-                 ${LABELS}
                  ${ARG_UNPARSED_ARGUMENTS})
 endfunction()
 
diff --git a/cpp/src/arrow/ipc/metadata_internal.cc b/cpp/src/arrow/ipc/metadata_internal.cc
index 25f52c0..30214ac 100644
--- a/cpp/src/arrow/ipc/metadata_internal.cc
+++ b/cpp/src/arrow/ipc/metadata_internal.cc
@@ -29,6 +29,7 @@
 #include "arrow/io/interfaces.h"
 #include "arrow/ipc/dictionary.h"
 #include "arrow/ipc/message.h"
+#include "arrow/ipc/options.h"
 #include "arrow/ipc/util.h"
 #include "arrow/sparse_tensor.h"
 #include "arrow/status.h"
@@ -839,6 +840,7 @@ Result<std::shared_ptr<Buffer>> WriteFBMessage(
 using FieldNodeVector =
     flatbuffers::Offset<flatbuffers::Vector<const flatbuf::FieldNode*>>;
 using BufferVector = flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Buffer*>>;
+using BodyCompressionOffset = flatbuffers::Offset<flatbuf::BodyCompression>;
 
 static Status WriteFieldNodes(FBB& fbb, const std::vector<FieldMetadata>& nodes,
                               FieldNodeVector* out) {
@@ -870,17 +872,38 @@ static Status WriteBuffers(FBB& fbb, const std::vector<BufferMetadata>& buffers,
   return Status::OK();
 }
 
+static Status GetBodyCompression(FBB& fbb, const IpcWriteOptions& options,
+                                 BodyCompressionOffset* out) {
+  if (options.compression != Compression::UNCOMPRESSED) {
+    flatbuf::CompressionType codec;
+    if (options.compression == Compression::LZ4_FRAME) {
+      codec = flatbuf::CompressionType::LZ4_FRAME;
+    } else if (options.compression == Compression::ZSTD) {
+      codec = flatbuf::CompressionType::ZSTD;
+    } else {
+      return Status::Invalid("Unsupported IPC compression codec: ",
+                             util::Codec::GetCodecAsString(options.compression));
+    }
+    *out = flatbuf::CreateBodyCompression(fbb, codec,
+                                          flatbuf::BodyCompressionMethod::BUFFER);
+  }
+  return Status::OK();
+}
+
 static Status MakeRecordBatch(FBB& fbb, int64_t length, int64_t body_length,
                               const std::vector<FieldMetadata>& nodes,
                               const std::vector<BufferMetadata>& buffers,
-                              RecordBatchOffset* offset) {
+                              const IpcWriteOptions& options, RecordBatchOffset* offset) {
   FieldNodeVector fb_nodes;
-  BufferVector fb_buffers;
-
   RETURN_NOT_OK(WriteFieldNodes(fbb, nodes, &fb_nodes));
+
+  BufferVector fb_buffers;
   RETURN_NOT_OK(WriteBuffers(fbb, buffers, &fb_buffers));
 
-  *offset = flatbuf::CreateRecordBatch(fbb, length, fb_nodes, fb_buffers);
+  BodyCompressionOffset fb_compression;
+  RETURN_NOT_OK(GetBodyCompression(fbb, options, &fb_compression));
+
+  *offset = flatbuf::CreateRecordBatch(fbb, length, fb_nodes, fb_buffers, fb_compression);
   return Status::OK();
 }
 
@@ -1125,10 +1148,11 @@ Status WriteRecordBatchMessage(
     int64_t length, int64_t body_length,
     const std::shared_ptr<const KeyValueMetadata>& custom_metadata,
     const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
-    std::shared_ptr<Buffer>* out) {
+    const IpcWriteOptions& options, std::shared_ptr<Buffer>* out) {
   FBB fbb;
   RecordBatchOffset record_batch;
-  RETURN_NOT_OK(MakeRecordBatch(fbb, length, body_length, nodes, buffers, &record_batch));
+  RETURN_NOT_OK(
+      MakeRecordBatch(fbb, length, body_length, nodes, buffers, options, &record_batch));
   return WriteFBMessage(fbb, flatbuf::MessageHeader::RecordBatch, record_batch.Union(),
                         body_length, custom_metadata)
       .Value(out);
@@ -1183,10 +1207,11 @@ Status WriteDictionaryMessage(
     int64_t id, int64_t length, int64_t body_length,
     const std::shared_ptr<const KeyValueMetadata>& custom_metadata,
     const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
-    std::shared_ptr<Buffer>* out) {
+    const IpcWriteOptions& options, std::shared_ptr<Buffer>* out) {
   FBB fbb;
   RecordBatchOffset record_batch;
-  RETURN_NOT_OK(MakeRecordBatch(fbb, length, body_length, nodes, buffers, &record_batch));
+  RETURN_NOT_OK(
+      MakeRecordBatch(fbb, length, body_length, nodes, buffers, options, &record_batch));
   auto dictionary_batch = flatbuf::CreateDictionaryBatch(fbb, id, record_batch).Union();
   return WriteFBMessage(fbb, flatbuf::MessageHeader::DictionaryBatch, dictionary_batch,
                         body_length, custom_metadata)
diff --git a/cpp/src/arrow/ipc/metadata_internal.h b/cpp/src/arrow/ipc/metadata_internal.h
index e94c027..b0da188 100644
--- a/cpp/src/arrow/ipc/metadata_internal.h
+++ b/cpp/src/arrow/ipc/metadata_internal.h
@@ -180,7 +180,7 @@ Status WriteRecordBatchMessage(
     const int64_t length, const int64_t body_length,
     const std::shared_ptr<const KeyValueMetadata>& custom_metadata,
     const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
-    std::shared_ptr<Buffer>* out);
+    const IpcWriteOptions& options, std::shared_ptr<Buffer>* out);
 
 Result<std::shared_ptr<Buffer>> WriteTensorMessage(const Tensor& tensor,
                                                    const int64_t buffer_start_offset);
@@ -198,7 +198,7 @@ Status WriteDictionaryMessage(
     const int64_t id, const int64_t length, const int64_t body_length,
     const std::shared_ptr<const KeyValueMetadata>& custom_metadata,
     const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
-    std::shared_ptr<Buffer>* out);
+    const IpcWriteOptions& options, std::shared_ptr<Buffer>* out);
 
 static inline Result<std::shared_ptr<Buffer>> WriteFlatbufferBuilder(
     flatbuffers::FlatBufferBuilder& fbb) {
diff --git a/cpp/src/arrow/ipc/options.h b/cpp/src/arrow/ipc/options.h
index 3cb42ba..385fc68 100644
--- a/cpp/src/arrow/ipc/options.h
+++ b/cpp/src/arrow/ipc/options.h
@@ -20,6 +20,7 @@
 #include <cstdint>
 #include <vector>
 
+#include "arrow/ipc/type_fwd.h"
 #include "arrow/status.h"
 #include "arrow/type_fwd.h"
 #include "arrow/util/compression.h"
@@ -66,6 +67,10 @@ struct ARROW_EXPORT IpcWriteOptions {
   /// like compression
   bool use_threads = true;
 
+  /// \brief Format version to use for IPC messages and their
+  /// metadata. Presently using V4 version (readable by v0.8.0 and later).
+  MetadataVersion metadata_version = MetadataVersion::V4;
+
   static IpcWriteOptions Defaults();
 };
 
diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
index 9236708..2c1bf1c 100644
--- a/cpp/src/arrow/ipc/read_write_test.cc
+++ b/cpp/src/arrow/ipc/read_write_test.cc
@@ -126,9 +126,10 @@ TEST(TestMessage, SerializeCustomMetadata) {
       key_value_metadata({"foo", "bar"}, {"fizz", "buzz"})};
   for (auto metadata : cases) {
     std::shared_ptr<Buffer> serialized;
-    ASSERT_OK(internal::WriteRecordBatchMessage(/*length=*/0, /*body_length=*/0, metadata,
-                                                /*nodes=*/{},
-                                                /*buffers=*/{}, &serialized));
+    ASSERT_OK(internal::WriteRecordBatchMessage(
+        /*length=*/0, /*body_length=*/0, metadata,
+        /*nodes=*/{},
+        /*buffers=*/{}, IpcWriteOptions::Defaults(), &serialized));
     ASSERT_OK_AND_ASSIGN(std::unique_ptr<Message> message,
                          Message::Open(serialized, /*body=*/nullptr));
 
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index f266f60..877ab69 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -478,7 +478,29 @@ Result<std::shared_ptr<RecordBatch>> LoadRecordBatch(
 // ----------------------------------------------------------------------
 // Array loading
 
-Status GetCompression(const flatbuf::Message* message, Compression::type* out) {
+Status GetCompression(const flatbuf::RecordBatch* batch, Compression::type* out) {
+  *out = Compression::UNCOMPRESSED;
+  const flatbuf::BodyCompression* compression = batch->compression();
+  if (compression != nullptr) {
+    if (compression->method() != flatbuf::BodyCompressionMethod::BUFFER) {
+      // Forward compatibility
+      return Status::Invalid("This library only supports BUFFER compression method");
+    }
+
+    if (compression->codec() == flatbuf::CompressionType::LZ4_FRAME) {
+      *out = Compression::LZ4_FRAME;
+    } else if (compression->codec() == flatbuf::CompressionType::ZSTD) {
+      *out = Compression::ZSTD;
+    } else {
+      return Status::Invalid("Unsupported codec in RecordBatch::compression metadata");
+    }
+    return Status::OK();
+  }
+  return Status::OK();
+}
+
+Status GetCompressionExperimental(const flatbuf::Message* message,
+                                  Compression::type* out) {
   *out = Compression::UNCOMPRESSED;
   if (message->custom_metadata() != nullptr) {
     // TODO: Ensure this deserialization only ever happens once
@@ -489,7 +511,7 @@ Status GetCompression(const flatbuf::Message* message, Compression::type* out) {
       ARROW_ASSIGN_OR_RAISE(*out,
                             util::Codec::GetCompressionType(metadata->value(index)));
     }
-    RETURN_NOT_OK(internal::CheckCompressionSupported(*out));
+    return internal::CheckCompressionSupported(*out);
   }
   return Status::OK();
 }
@@ -535,8 +557,15 @@ Result<std::shared_ptr<RecordBatch>> ReadRecordBatchInternal(
     return Status::IOError(
         "Header-type of flatbuffer-encoded Message is not RecordBatch.");
   }
+
   Compression::type compression;
-  RETURN_NOT_OK(GetCompression(message, &compression));
+  RETURN_NOT_OK(GetCompression(batch, &compression));
+  if (compression == Compression::UNCOMPRESSED &&
+      message->version() == flatbuf::MetadataVersion::V4) {
+    // Possibly obtain codec information from experimental serialization format
+    // in 0.17.x
+    RETURN_NOT_OK(GetCompressionExperimental(message, &compression));
+  }
   return LoadRecordBatch(batch, schema, inclusion_mask, dictionary_memo, options,
                          compression, file);
 }
@@ -623,8 +652,17 @@ Status ReadDictionary(const Buffer& metadata, DictionaryMemo* dictionary_memo,
         "Header-type of flatbuffer-encoded Message is not DictionaryBatch.");
   }
 
+  // The dictionary is embedded in a record batch with a single column
+  auto batch_meta = dictionary_batch->data();
+
   Compression::type compression;
-  RETURN_NOT_OK(GetCompression(message, &compression));
+  RETURN_NOT_OK(GetCompression(batch_meta, &compression));
+  if (compression == Compression::UNCOMPRESSED &&
+      message->version() == flatbuf::MetadataVersion::V4) {
+    // Possibly obtain codec information from experimental serialization format
+    // in 0.17.x
+    RETURN_NOT_OK(GetCompressionExperimental(message, &compression));
+  }
 
   int64_t id = dictionary_batch->id();
 
@@ -635,8 +673,6 @@ Status ReadDictionary(const Buffer& metadata, DictionaryMemo* dictionary_memo,
 
   auto value_field = ::arrow::field("dummy", value_type);
 
-  // The dictionary is embedded in a record batch with a single column
-  auto batch_meta = dictionary_batch->data();
   CHECK_FLATBUFFERS_NOT_NULL(batch_meta, "DictionaryBatch.data");
 
   std::shared_ptr<RecordBatch> batch;
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index d0d7deb..3587490 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -155,7 +155,7 @@ class RecordBatchSerializer {
   // Override this for writing dictionary metadata
   virtual Status SerializeMetadata(int64_t num_rows) {
     return WriteRecordBatchMessage(num_rows, out_->body_length, custom_metadata_,
-                                   field_nodes_, buffer_meta_, &out_->metadata);
+                                   field_nodes_, buffer_meta_, options_, &out_->metadata);
   }
 
   void AppendCustomMetadata(const std::string& key, const std::string& value) {
@@ -186,10 +186,6 @@ class RecordBatchSerializer {
 
     RETURN_NOT_OK(internal::CheckCompressionSupported(options_.compression));
 
-    // TODO check allowed values for compression?
-    AppendCustomMetadata("ARROW:experimental_compression",
-                         util::Codec::GetCodecAsString(options_.compression));
-
     ARROW_ASSIGN_OR_RAISE(
         codec, util::Codec::Create(options_.compression, options_.compression_level));
 
@@ -543,7 +539,7 @@ class DictionarySerializer : public RecordBatchSerializer {
 
   Status SerializeMetadata(int64_t num_rows) override {
     return WriteDictionaryMessage(dictionary_id_, num_rows, out_->body_length,
-                                  custom_metadata_, field_nodes_, buffer_meta_,
+                                  custom_metadata_, field_nodes_, buffer_meta_, options_,
                                   &out_->metadata);
   }