You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2020/05/01 00:41:15 UTC

[arrow] branch master updated: ARROW-300: [Format] Proposal for "trivial" IPC body buffer compression using either LZ4 or ZSTD codecs

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cae938  ARROW-300: [Format] Proposal for "trivial" IPC body buffer compression using either LZ4 or ZSTD codecs
6cae938 is described below

commit 6cae9387a7ecf4d1ce12054b8edf2099c734366d
Author: Wes McKinney <we...@apache.org>
AuthorDate: Thu Apr 30 19:39:46 2020 -0500

    ARROW-300: [Format] Proposal for "trivial" IPC body buffer compression using either LZ4 or ZSTD codecs
    
    This permits each constituent buffer forming the IPC message body to be compressed independently using either the LZ4 (frame format) or ZSTD codec. The compressed data is written with a 64-bit little-endian signed integer prefix indicating the length of the uncompressed data.
    
    Note that there is still interest in the project in developing metadata for data-type-specific encoding and compression on the wire, but more design work remains to be done there.
    
    As a special case, the size prefix may be "-1" to indicate that the buffer is uncompressed. This can be helpful for small buffers, or for buffers where the producer decided that compression would not yield a worthwhile size reduction.
    
    Note that this change is not forward compatible: "old" readers will not be able to read compressed messages, nor even detect that they cannot read them. If a "new" writer writes uncompressed messages, there is no issue. Since we are still pre-1.0, I think this is OK. We may consider increasing the Flatbuffers metadata version to prevent old readers from blindly treating compressed buffers as uncompressed.
    
    Closes #6707 from wesm/ipc-body-buffer-compression
    
    Authored-by: Wes McKinney <we...@apache.org>
    Signed-off-by: Wes McKinney <we...@apache.org>
---
 cpp/build-support/update-flatbuffers.sh    |   5 +-
 cpp/src/generated/File_generated.h         |   4 +-
 cpp/src/generated/Message_generated.h      | 153 ++++++++++++++++++++++++++++-
 cpp/src/generated/Schema_generated.h       |   4 +-
 cpp/src/generated/SparseTensor_generated.h |  59 +++++------
 cpp/src/generated/Tensor_generated.h       |   4 +-
 cpp/src/generated/feather_generated.h      |   7 +-
 cpp/src/plasma/common_generated.h          |   4 +-
 cpp/src/plasma/plasma_generated.h          |   4 +-
 format/Message.fbs                         |  38 +++++++
 10 files changed, 241 insertions(+), 41 deletions(-)

diff --git a/cpp/build-support/update-flatbuffers.sh b/cpp/build-support/update-flatbuffers.sh
index 799ea2d..e9bcbab 100755
--- a/cpp/build-support/update-flatbuffers.sh
+++ b/cpp/build-support/update-flatbuffers.sh
@@ -23,8 +23,9 @@
 CWD="$(cd "$(dirname "${BASH_SOURCE[0]:-$0}")" && pwd)"
 SOURCE_DIR=$CWD/../src
 FORMAT_DIR=$CWD/../../format
+FLATC="flatc -c --cpp-std c++11"
 
-flatc -c -o $SOURCE_DIR/generated \
+$FLATC -o $SOURCE_DIR/generated \
       --scoped-enums \
       $FORMAT_DIR/Message.fbs \
       $FORMAT_DIR/File.fbs \
@@ -33,7 +34,7 @@ flatc -c -o $SOURCE_DIR/generated \
       $FORMAT_DIR/SparseTensor.fbs \
       src/arrow/ipc/feather.fbs
 
-flatc -c -o $SOURCE_DIR/plasma \
+$FLATC -o $SOURCE_DIR/plasma \
       --gen-object-api \
       --scoped-enums \
       $SOURCE_DIR/plasma/common.fbs \
diff --git a/cpp/src/generated/File_generated.h b/cpp/src/generated/File_generated.h
index 8c2292d..06953c4 100644
--- a/cpp/src/generated/File_generated.h
+++ b/cpp/src/generated/File_generated.h
@@ -1,7 +1,8 @@
 // automatically generated by the FlatBuffers compiler, do not modify
 
 
-#pragma once
+#ifndef FLATBUFFERS_GENERATED_FILE_ORG_APACHE_ARROW_FLATBUF_H_
+#define FLATBUFFERS_GENERATED_FILE_ORG_APACHE_ARROW_FLATBUF_H_
 
 #include "flatbuffers/flatbuffers.h"
 
@@ -196,3 +197,4 @@ inline void FinishSizePrefixedFooterBuffer(
 }  // namespace apache
 }  // namespace org
 
+#endif  // FLATBUFFERS_GENERATED_FILE_ORG_APACHE_ARROW_FLATBUF_H_
diff --git a/cpp/src/generated/Message_generated.h b/cpp/src/generated/Message_generated.h
index 3dbb8c9..822bec9 100644
--- a/cpp/src/generated/Message_generated.h
+++ b/cpp/src/generated/Message_generated.h
@@ -1,7 +1,8 @@
 // automatically generated by the FlatBuffers compiler, do not modify
 
 
-#pragma once
+#ifndef FLATBUFFERS_GENERATED_MESSAGE_ORG_APACHE_ARROW_FLATBUF_H_
+#define FLATBUFFERS_GENERATED_MESSAGE_ORG_APACHE_ARROW_FLATBUF_H_
 
 #include "flatbuffers/flatbuffers.h"
 
@@ -16,6 +17,9 @@ namespace flatbuf {
 
 struct FieldNode;
 
+struct BodyCompression;
+struct BodyCompressionBuilder;
+
 struct RecordBatch;
 struct RecordBatchBuilder;
 
@@ -25,6 +29,73 @@ struct DictionaryBatchBuilder;
 struct Message;
 struct MessageBuilder;
 
+enum class CompressionType : int8_t {
+  LZ4_FRAME = 0,
+  ZSTD = 1,
+  MIN = LZ4_FRAME,
+  MAX = ZSTD
+};
+
+inline const CompressionType (&EnumValuesCompressionType())[2] {
+  static const CompressionType values[] = {
+    CompressionType::LZ4_FRAME,
+    CompressionType::ZSTD
+  };
+  return values;
+}
+
+inline const char * const *EnumNamesCompressionType() {
+  static const char * const names[3] = {
+    "LZ4_FRAME",
+    "ZSTD",
+    nullptr
+  };
+  return names;
+}
+
+inline const char *EnumNameCompressionType(CompressionType e) {
+  if (flatbuffers::IsOutRange(e, CompressionType::LZ4_FRAME, CompressionType::ZSTD)) return "";
+  const size_t index = static_cast<size_t>(e);
+  return EnumNamesCompressionType()[index];
+}
+
+/// Provided for forward compatibility in case we need to support different
+/// strategies for compressing the IPC message body (like whole-body
+/// compression rather than buffer-level) in the future
+enum class BodyCompressionMethod : int8_t {
+  /// Each constituent buffer is first compressed with the indicated
+  /// compressor, and then written with the uncompressed length in the first 8
+  /// bytes as a 64-bit little-endian signed integer followed by the compressed
+  /// buffer bytes (and then padding as required by the protocol). The
+  /// uncompressed length may be set to -1 to indicate that the data that
+  /// follows is not compressed, which can be useful for cases where
+  /// compression does not yield appreciable savings.
+  BUFFER = 0,
+  MIN = BUFFER,
+  MAX = BUFFER
+};
+
+inline const BodyCompressionMethod (&EnumValuesBodyCompressionMethod())[1] {
+  static const BodyCompressionMethod values[] = {
+    BodyCompressionMethod::BUFFER
+  };
+  return values;
+}
+
+inline const char * const *EnumNamesBodyCompressionMethod() {
+  static const char * const names[2] = {
+    "BUFFER",
+    nullptr
+  };
+  return names;
+}
+
+inline const char *EnumNameBodyCompressionMethod(BodyCompressionMethod e) {
+  if (flatbuffers::IsOutRange(e, BodyCompressionMethod::BUFFER, BodyCompressionMethod::BUFFER)) return "";
+  const size_t index = static_cast<size_t>(e);
+  return EnumNamesBodyCompressionMethod()[index];
+}
+
 /// ----------------------------------------------------------------------
 /// The root Message type
 /// This union enables us to easily send different message types without
@@ -138,6 +209,63 @@ FLATBUFFERS_MANUALLY_ALIGNED_STRUCT(8) FieldNode FLATBUFFERS_FINAL_CLASS {
 };
 FLATBUFFERS_STRUCT_END(FieldNode, 16);
 
+/// Optional compression for the memory buffers constituting IPC message
+/// bodies. Intended for use with RecordBatch but could be used for other
+/// message types
+struct BodyCompression FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
+  typedef BodyCompressionBuilder Builder;
+  enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
+    VT_CODEC = 4,
+    VT_METHOD = 6
+  };
+  /// Compressor library
+  org::apache::arrow::flatbuf::CompressionType codec() const {
+    return static_cast<org::apache::arrow::flatbuf::CompressionType>(GetField<int8_t>(VT_CODEC, 0));
+  }
+  /// Indicates the way the record batch body was compressed
+  org::apache::arrow::flatbuf::BodyCompressionMethod method() const {
+    return static_cast<org::apache::arrow::flatbuf::BodyCompressionMethod>(GetField<int8_t>(VT_METHOD, 0));
+  }
+  bool Verify(flatbuffers::Verifier &verifier) const {
+    return VerifyTableStart(verifier) &&
+           VerifyField<int8_t>(verifier, VT_CODEC) &&
+           VerifyField<int8_t>(verifier, VT_METHOD) &&
+           verifier.EndTable();
+  }
+};
+
+struct BodyCompressionBuilder {
+  typedef BodyCompression Table;
+  flatbuffers::FlatBufferBuilder &fbb_;
+  flatbuffers::uoffset_t start_;
+  void add_codec(org::apache::arrow::flatbuf::CompressionType codec) {
+    fbb_.AddElement<int8_t>(BodyCompression::VT_CODEC, static_cast<int8_t>(codec), 0);
+  }
+  void add_method(org::apache::arrow::flatbuf::BodyCompressionMethod method) {
+    fbb_.AddElement<int8_t>(BodyCompression::VT_METHOD, static_cast<int8_t>(method), 0);
+  }
+  explicit BodyCompressionBuilder(flatbuffers::FlatBufferBuilder &_fbb)
+        : fbb_(_fbb) {
+    start_ = fbb_.StartTable();
+  }
+  BodyCompressionBuilder &operator=(const BodyCompressionBuilder &);
+  flatbuffers::Offset<BodyCompression> Finish() {
+    const auto end = fbb_.EndTable(start_);
+    auto o = flatbuffers::Offset<BodyCompression>(end);
+    return o;
+  }
+};
+
+inline flatbuffers::Offset<BodyCompression> CreateBodyCompression(
+    flatbuffers::FlatBufferBuilder &_fbb,
+    org::apache::arrow::flatbuf::CompressionType codec = org::apache::arrow::flatbuf::CompressionType::LZ4_FRAME,
+    org::apache::arrow::flatbuf::BodyCompressionMethod method = org::apache::arrow::flatbuf::BodyCompressionMethod::BUFFER) {
+  BodyCompressionBuilder builder_(_fbb);
+  builder_.add_method(method);
+  builder_.add_codec(codec);
+  return builder_.Finish();
+}
+
 /// A data header describing the shared memory layout of a "record" or "row"
 /// batch. Some systems call this a "row batch" internally and others a "record
 /// batch".
@@ -146,7 +274,8 @@ struct RecordBatch FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
   enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
     VT_LENGTH = 4,
     VT_NODES = 6,
-    VT_BUFFERS = 8
+    VT_BUFFERS = 8,
+    VT_COMPRESSION = 10
   };
   /// number of records / rows. The arrays in the batch should all have this
   /// length
@@ -166,6 +295,10 @@ struct RecordBatch FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
   const flatbuffers::Vector<const org::apache::arrow::flatbuf::Buffer *> *buffers() const {
     return GetPointer<const flatbuffers::Vector<const org::apache::arrow::flatbuf::Buffer *> *>(VT_BUFFERS);
   }
+  /// Optional compression of the message body
+  const org::apache::arrow::flatbuf::BodyCompression *compression() const {
+    return GetPointer<const org::apache::arrow::flatbuf::BodyCompression *>(VT_COMPRESSION);
+  }
   bool Verify(flatbuffers::Verifier &verifier) const {
     return VerifyTableStart(verifier) &&
            VerifyField<int64_t>(verifier, VT_LENGTH) &&
@@ -173,6 +306,8 @@ struct RecordBatch FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
            verifier.VerifyVector(nodes()) &&
            VerifyOffset(verifier, VT_BUFFERS) &&
            verifier.VerifyVector(buffers()) &&
+           VerifyOffset(verifier, VT_COMPRESSION) &&
+           verifier.VerifyTable(compression()) &&
            verifier.EndTable();
   }
 };
@@ -190,6 +325,9 @@ struct RecordBatchBuilder {
   void add_buffers(flatbuffers::Offset<flatbuffers::Vector<const org::apache::arrow::flatbuf::Buffer *>> buffers) {
     fbb_.AddOffset(RecordBatch::VT_BUFFERS, buffers);
   }
+  void add_compression(flatbuffers::Offset<org::apache::arrow::flatbuf::BodyCompression> compression) {
+    fbb_.AddOffset(RecordBatch::VT_COMPRESSION, compression);
+  }
   explicit RecordBatchBuilder(flatbuffers::FlatBufferBuilder &_fbb)
         : fbb_(_fbb) {
     start_ = fbb_.StartTable();
@@ -206,9 +344,11 @@ inline flatbuffers::Offset<RecordBatch> CreateRecordBatch(
     flatbuffers::FlatBufferBuilder &_fbb,
     int64_t length = 0,
     flatbuffers::Offset<flatbuffers::Vector<const org::apache::arrow::flatbuf::FieldNode *>> nodes = 0,
-    flatbuffers::Offset<flatbuffers::Vector<const org::apache::arrow::flatbuf::Buffer *>> buffers = 0) {
+    flatbuffers::Offset<flatbuffers::Vector<const org::apache::arrow::flatbuf::Buffer *>> buffers = 0,
+    flatbuffers::Offset<org::apache::arrow::flatbuf::BodyCompression> compression = 0) {
   RecordBatchBuilder builder_(_fbb);
   builder_.add_length(length);
+  builder_.add_compression(compression);
   builder_.add_buffers(buffers);
   builder_.add_nodes(nodes);
   return builder_.Finish();
@@ -218,14 +358,16 @@ inline flatbuffers::Offset<RecordBatch> CreateRecordBatchDirect(
     flatbuffers::FlatBufferBuilder &_fbb,
     int64_t length = 0,
     const std::vector<org::apache::arrow::flatbuf::FieldNode> *nodes = nullptr,
-    const std::vector<org::apache::arrow::flatbuf::Buffer> *buffers = nullptr) {
+    const std::vector<org::apache::arrow::flatbuf::Buffer> *buffers = nullptr,
+    flatbuffers::Offset<org::apache::arrow::flatbuf::BodyCompression> compression = 0) {
   auto nodes__ = nodes ? _fbb.CreateVectorOfStructs<org::apache::arrow::flatbuf::FieldNode>(*nodes) : 0;
   auto buffers__ = buffers ? _fbb.CreateVectorOfStructs<org::apache::arrow::flatbuf::Buffer>(*buffers) : 0;
   return org::apache::arrow::flatbuf::CreateRecordBatch(
       _fbb,
       length,
       nodes__,
-      buffers__);
+      buffers__,
+      compression);
 }
 
 /// For sending dictionary encoding information. Any Field can be
@@ -514,3 +656,4 @@ inline void FinishSizePrefixedMessageBuffer(
 }  // namespace apache
 }  // namespace org
 
+#endif  // FLATBUFFERS_GENERATED_MESSAGE_ORG_APACHE_ARROW_FLATBUF_H_
diff --git a/cpp/src/generated/Schema_generated.h b/cpp/src/generated/Schema_generated.h
index e921b61..6e7e58d 100644
--- a/cpp/src/generated/Schema_generated.h
+++ b/cpp/src/generated/Schema_generated.h
@@ -1,7 +1,8 @@
 // automatically generated by the FlatBuffers compiler, do not modify
 
 
-#pragma once
+#ifndef FLATBUFFERS_GENERATED_SCHEMA_ORG_APACHE_ARROW_FLATBUF_H_
+#define FLATBUFFERS_GENERATED_SCHEMA_ORG_APACHE_ARROW_FLATBUF_H_
 
 #include "flatbuffers/flatbuffers.h"
 
@@ -2159,3 +2160,4 @@ inline void FinishSizePrefixedSchemaBuffer(
 }  // namespace apache
 }  // namespace org
 
+#endif  // FLATBUFFERS_GENERATED_SCHEMA_ORG_APACHE_ARROW_FLATBUF_H_
diff --git a/cpp/src/generated/SparseTensor_generated.h b/cpp/src/generated/SparseTensor_generated.h
index 9c00326..a9bc88b 100644
--- a/cpp/src/generated/SparseTensor_generated.h
+++ b/cpp/src/generated/SparseTensor_generated.h
@@ -1,7 +1,8 @@
 // automatically generated by the FlatBuffers compiler, do not modify
 
 
-#pragma once
+#ifndef FLATBUFFERS_GENERATED_SPARSETENSOR_ORG_APACHE_ARROW_FLATBUF_H_
+#define FLATBUFFERS_GENERATED_SPARSETENSOR_ORG_APACHE_ARROW_FLATBUF_H_
 
 #include "flatbuffers/flatbuffers.h"
 
@@ -103,7 +104,7 @@ template<> struct SparseTensorIndexTraits<org::apache::arrow::flatbuf::SparseMat
   static const SparseTensorIndex enum_value = SparseTensorIndex::SparseMatrixIndexCSX;
 };
 
-template<> struct SparseTensorIndexTraits<SparseTensorIndexCSF> {
+template<> struct SparseTensorIndexTraits<org::apache::arrow::flatbuf::SparseTensorIndexCSF> {
   static const SparseTensorIndex enum_value = SparseTensorIndex::SparseTensorIndexCSF;
 };
 
@@ -348,6 +349,7 @@ inline flatbuffers::Offset<SparseMatrixIndexCSX> CreateSparseMatrixIndexCSX(
 
 /// Compressed Sparse Fiber (CSF) sparse tensor index.
 struct SparseTensorIndexCSF FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
+  typedef SparseTensorIndexCSFBuilder Builder;
   enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
     VT_INDPTRTYPE = 4,
     VT_INDPTRBUFFERS = 6,
@@ -385,8 +387,8 @@ struct SparseTensorIndexCSF FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table
   ///    /|  /|   |    /| |
   ///   1 2 0 2   0   0 1 2
   /// The type of values in indptrBuffers
-  const Int *indptrType() const {
-    return GetPointer<const Int *>(VT_INDPTRTYPE);
+  const org::apache::arrow::flatbuf::Int *indptrType() const {
+    return GetPointer<const org::apache::arrow::flatbuf::Int *>(VT_INDPTRTYPE);
   }
   /// indptrBuffers stores the sparsity structure.
   /// Each two consecutive dimensions in a tensor correspond to a buffer in
@@ -402,12 +404,12 @@ struct SparseTensorIndexCSF FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table
   ///                       [0, 2, 4, 5, 8]
   ///                     ].
   ///
-  const flatbuffers::Vector<const Buffer *> *indptrBuffers() const {
-    return GetPointer<const flatbuffers::Vector<const Buffer *> *>(VT_INDPTRBUFFERS);
+  const flatbuffers::Vector<const org::apache::arrow::flatbuf::Buffer *> *indptrBuffers() const {
+    return GetPointer<const flatbuffers::Vector<const org::apache::arrow::flatbuf::Buffer *> *>(VT_INDPTRBUFFERS);
   }
   /// The type of values in indicesBuffers
-  const Int *indicesType() const {
-    return GetPointer<const Int *>(VT_INDICESTYPE);
+  const org::apache::arrow::flatbuf::Int *indicesType() const {
+    return GetPointer<const org::apache::arrow::flatbuf::Int *>(VT_INDICESTYPE);
   }
   /// indicesBuffers stores values of nodes.
   /// Each tensor dimension corresponds to a buffer in indicesBuffers.
@@ -420,8 +422,8 @@ struct SparseTensorIndexCSF FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table
   ///                        [1, 2, 0, 2, 0, 0, 1, 2]
   ///                      ].
   ///
-  const flatbuffers::Vector<const Buffer *> *indicesBuffers() const {
-    return GetPointer<const flatbuffers::Vector<const Buffer *> *>(VT_INDICESBUFFERS);
+  const flatbuffers::Vector<const org::apache::arrow::flatbuf::Buffer *> *indicesBuffers() const {
+    return GetPointer<const flatbuffers::Vector<const org::apache::arrow::flatbuf::Buffer *> *>(VT_INDICESBUFFERS);
   }
   /// axisOrder stores the sequence in which dimensions were traversed to
   /// produce the prefix tree.
@@ -452,16 +454,16 @@ struct SparseTensorIndexCSFBuilder {
   typedef SparseTensorIndexCSF Table;
   flatbuffers::FlatBufferBuilder &fbb_;
   flatbuffers::uoffset_t start_;
-  void add_indptrType(flatbuffers::Offset<Int> indptrType) {
+  void add_indptrType(flatbuffers::Offset<org::apache::arrow::flatbuf::Int> indptrType) {
     fbb_.AddOffset(SparseTensorIndexCSF::VT_INDPTRTYPE, indptrType);
   }
-  void add_indptrBuffers(flatbuffers::Offset<flatbuffers::Vector<const Buffer *>> indptrBuffers) {
+  void add_indptrBuffers(flatbuffers::Offset<flatbuffers::Vector<const org::apache::arrow::flatbuf::Buffer *>> indptrBuffers) {
     fbb_.AddOffset(SparseTensorIndexCSF::VT_INDPTRBUFFERS, indptrBuffers);
   }
-  void add_indicesType(flatbuffers::Offset<Int> indicesType) {
+  void add_indicesType(flatbuffers::Offset<org::apache::arrow::flatbuf::Int> indicesType) {
     fbb_.AddOffset(SparseTensorIndexCSF::VT_INDICESTYPE, indicesType);
   }
-  void add_indicesBuffers(flatbuffers::Offset<flatbuffers::Vector<const Buffer *>> indicesBuffers) {
+  void add_indicesBuffers(flatbuffers::Offset<flatbuffers::Vector<const org::apache::arrow::flatbuf::Buffer *>> indicesBuffers) {
     fbb_.AddOffset(SparseTensorIndexCSF::VT_INDICESBUFFERS, indicesBuffers);
   }
   void add_axisOrder(flatbuffers::Offset<flatbuffers::Vector<int32_t>> axisOrder) {
@@ -486,10 +488,10 @@ struct SparseTensorIndexCSFBuilder {
 
 inline flatbuffers::Offset<SparseTensorIndexCSF> CreateSparseTensorIndexCSF(
     flatbuffers::FlatBufferBuilder &_fbb,
-    flatbuffers::Offset<Int> indptrType = 0,
-    flatbuffers::Offset<flatbuffers::Vector<const Buffer *>> indptrBuffers = 0,
-    flatbuffers::Offset<Int> indicesType = 0,
-    flatbuffers::Offset<flatbuffers::Vector<const Buffer *>> indicesBuffers = 0,
+    flatbuffers::Offset<org::apache::arrow::flatbuf::Int> indptrType = 0,
+    flatbuffers::Offset<flatbuffers::Vector<const org::apache::arrow::flatbuf::Buffer *>> indptrBuffers = 0,
+    flatbuffers::Offset<org::apache::arrow::flatbuf::Int> indicesType = 0,
+    flatbuffers::Offset<flatbuffers::Vector<const org::apache::arrow::flatbuf::Buffer *>> indicesBuffers = 0,
     flatbuffers::Offset<flatbuffers::Vector<int32_t>> axisOrder = 0) {
   SparseTensorIndexCSFBuilder builder_(_fbb);
   builder_.add_axisOrder(axisOrder);
@@ -502,13 +504,13 @@ inline flatbuffers::Offset<SparseTensorIndexCSF> CreateSparseTensorIndexCSF(
 
 inline flatbuffers::Offset<SparseTensorIndexCSF> CreateSparseTensorIndexCSFDirect(
     flatbuffers::FlatBufferBuilder &_fbb,
-    flatbuffers::Offset<Int> indptrType = 0,
-    const std::vector<Buffer> *indptrBuffers = nullptr,
-    flatbuffers::Offset<Int> indicesType = 0,
-    const std::vector<Buffer> *indicesBuffers = nullptr,
+    flatbuffers::Offset<org::apache::arrow::flatbuf::Int> indptrType = 0,
+    const std::vector<org::apache::arrow::flatbuf::Buffer> *indptrBuffers = nullptr,
+    flatbuffers::Offset<org::apache::arrow::flatbuf::Int> indicesType = 0,
+    const std::vector<org::apache::arrow::flatbuf::Buffer> *indicesBuffers = nullptr,
     const std::vector<int32_t> *axisOrder = nullptr) {
-  auto indptrBuffers__ = indptrBuffers ? _fbb.CreateVectorOfStructs<Buffer>(*indptrBuffers) : 0;
-  auto indicesBuffers__ = indicesBuffers ? _fbb.CreateVectorOfStructs<Buffer>(*indicesBuffers) : 0;
+  auto indptrBuffers__ = indptrBuffers ? _fbb.CreateVectorOfStructs<org::apache::arrow::flatbuf::Buffer>(*indptrBuffers) : 0;
+  auto indicesBuffers__ = indicesBuffers ? _fbb.CreateVectorOfStructs<org::apache::arrow::flatbuf::Buffer>(*indicesBuffers) : 0;
   auto axisOrder__ = axisOrder ? _fbb.CreateVector<int32_t>(*axisOrder) : 0;
   return org::apache::arrow::flatbuf::CreateSparseTensorIndexCSF(
       _fbb,
@@ -625,8 +627,8 @@ struct SparseTensor FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
   const org::apache::arrow::flatbuf::SparseMatrixIndexCSX *sparseIndex_as_SparseMatrixIndexCSX() const {
     return sparseIndex_type() == org::apache::arrow::flatbuf::SparseTensorIndex::SparseMatrixIndexCSX ? static_cast<const org::apache::arrow::flatbuf::SparseMatrixIndexCSX *>(sparseIndex()) : nullptr;
   }
-  const SparseTensorIndexCSF *sparseIndex_as_SparseTensorIndexCSF() const {
-    return sparseIndex_type() == SparseTensorIndex::SparseTensorIndexCSF ? static_cast<const SparseTensorIndexCSF *>(sparseIndex()) : nullptr;
+  const org::apache::arrow::flatbuf::SparseTensorIndexCSF *sparseIndex_as_SparseTensorIndexCSF() const {
+    return sparseIndex_type() == org::apache::arrow::flatbuf::SparseTensorIndex::SparseTensorIndexCSF ? static_cast<const org::apache::arrow::flatbuf::SparseTensorIndexCSF *>(sparseIndex()) : nullptr;
   }
   /// The location and size of the tensor's data
   const org::apache::arrow::flatbuf::Buffer *data() const {
@@ -741,7 +743,7 @@ template<> inline const org::apache::arrow::flatbuf::SparseMatrixIndexCSX *Spars
   return sparseIndex_as_SparseMatrixIndexCSX();
 }
 
-template<> inline const SparseTensorIndexCSF *SparseTensor::sparseIndex_as<SparseTensorIndexCSF>() const {
+template<> inline const org::apache::arrow::flatbuf::SparseTensorIndexCSF *SparseTensor::sparseIndex_as<org::apache::arrow::flatbuf::SparseTensorIndexCSF>() const {
   return sparseIndex_as_SparseTensorIndexCSF();
 }
 
@@ -841,7 +843,7 @@ inline bool VerifySparseTensorIndex(flatbuffers::Verifier &verifier, const void
       return verifier.VerifyTable(ptr);
     }
     case SparseTensorIndex::SparseTensorIndexCSF: {
-      auto ptr = reinterpret_cast<const SparseTensorIndexCSF *>(obj);
+      auto ptr = reinterpret_cast<const org::apache::arrow::flatbuf::SparseTensorIndexCSF *>(obj);
       return verifier.VerifyTable(ptr);
     }
     default: return true;
@@ -895,3 +897,4 @@ inline void FinishSizePrefixedSparseTensorBuffer(
 }  // namespace apache
 }  // namespace org
 
+#endif  // FLATBUFFERS_GENERATED_SPARSETENSOR_ORG_APACHE_ARROW_FLATBUF_H_
diff --git a/cpp/src/generated/Tensor_generated.h b/cpp/src/generated/Tensor_generated.h
index 67bee86..062a3b9 100644
--- a/cpp/src/generated/Tensor_generated.h
+++ b/cpp/src/generated/Tensor_generated.h
@@ -1,7 +1,8 @@
 // automatically generated by the FlatBuffers compiler, do not modify
 
 
-#pragma once
+#ifndef FLATBUFFERS_GENERATED_TENSOR_ORG_APACHE_ARROW_FLATBUF_H_
+#define FLATBUFFERS_GENERATED_TENSOR_ORG_APACHE_ARROW_FLATBUF_H_
 
 #include "flatbuffers/flatbuffers.h"
 
@@ -383,3 +384,4 @@ inline void FinishSizePrefixedTensorBuffer(
 }  // namespace apache
 }  // namespace org
 
+#endif  // FLATBUFFERS_GENERATED_TENSOR_ORG_APACHE_ARROW_FLATBUF_H_
diff --git a/cpp/src/generated/feather_generated.h b/cpp/src/generated/feather_generated.h
index bd01c5f..b925eb2 100644
--- a/cpp/src/generated/feather_generated.h
+++ b/cpp/src/generated/feather_generated.h
@@ -1,7 +1,8 @@
 // automatically generated by the FlatBuffers compiler, do not modify
 
 
-#pragma once
+#ifndef FLATBUFFERS_GENERATED_FEATHER_ARROW_IPC_FEATHER_FBS_H_
+#define FLATBUFFERS_GENERATED_FEATHER_ARROW_IPC_FEATHER_FBS_H_
 
 #include "flatbuffers/flatbuffers.h"
 
@@ -696,6 +697,9 @@ struct CTable FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
     return GetPointer<const flatbuffers::Vector<flatbuffers::Offset<arrow::ipc::feather::fbs::Column>> *>(VT_COLUMNS);
   }
   /// Version number of the Feather format
+  ///
+  /// Internal versions 0, 1, and 2: Implemented in Apache Arrow <= 0.16.0 and
+  /// wesm/feather. Uses "custom" metadata defined in this file.
   int32_t version() const {
     return GetField<int32_t>(VT_VERSION, 0);
   }
@@ -856,3 +860,4 @@ inline void FinishSizePrefixedCTableBuffer(
 }  // namespace ipc
 }  // namespace arrow
 
+#endif  // FLATBUFFERS_GENERATED_FEATHER_ARROW_IPC_FEATHER_FBS_H_
diff --git a/cpp/src/plasma/common_generated.h b/cpp/src/plasma/common_generated.h
index 601c9ec..ba9ef6e 100644
--- a/cpp/src/plasma/common_generated.h
+++ b/cpp/src/plasma/common_generated.h
@@ -1,7 +1,8 @@
 // automatically generated by the FlatBuffers compiler, do not modify
 
 
-#pragma once
+#ifndef FLATBUFFERS_GENERATED_COMMON_PLASMA_FLATBUF_H_
+#define FLATBUFFERS_GENERATED_COMMON_PLASMA_FLATBUF_H_
 
 #include "flatbuffers/flatbuffers.h"
 
@@ -226,3 +227,4 @@ inline flatbuffers::Offset<ObjectInfo> CreateObjectInfo(flatbuffers::FlatBufferB
 }  // namespace flatbuf
 }  // namespace plasma
 
+#endif  // FLATBUFFERS_GENERATED_COMMON_PLASMA_FLATBUF_H_
diff --git a/cpp/src/plasma/plasma_generated.h b/cpp/src/plasma/plasma_generated.h
index 43f5cfe..340f043 100644
--- a/cpp/src/plasma/plasma_generated.h
+++ b/cpp/src/plasma/plasma_generated.h
@@ -1,7 +1,8 @@
 // automatically generated by the FlatBuffers compiler, do not modify
 
 
-#pragma once
+#ifndef FLATBUFFERS_GENERATED_PLASMA_PLASMA_FLATBUF_H_
+#define FLATBUFFERS_GENERATED_PLASMA_PLASMA_FLATBUF_H_
 
 #include "flatbuffers/flatbuffers.h"
 
@@ -3980,3 +3981,4 @@ inline flatbuffers::Offset<PlasmaRefreshLRUReply> CreatePlasmaRefreshLRUReply(fl
 }  // namespace flatbuf
 }  // namespace plasma
 
+#endif  // FLATBUFFERS_GENERATED_PLASMA_PLASMA_FLATBUF_H_
diff --git a/format/Message.fbs b/format/Message.fbs
index 7e8e677..1a7e0df 100644
--- a/format/Message.fbs
+++ b/format/Message.fbs
@@ -42,6 +42,41 @@ struct FieldNode {
   null_count: long;
 }
 
+enum CompressionType:byte {
+  // LZ4 frame format, for portability, as provided by lz4frame.h or wrappers
+  // thereof. Not to be confused with "raw" (also called "block") format
+  // provided by lz4.h
+  LZ4_FRAME,
+
+  // Zstandard
+  ZSTD
+}
+
+/// Provided for forward compatibility in case we need to support different
+/// strategies for compressing the IPC message body (like whole-body
+/// compression rather than buffer-level) in the future
+enum BodyCompressionMethod:byte {
+  /// Each constituent buffer is first compressed with the indicated
+  /// compressor, and then written with the uncompressed length in the first 8
+  /// bytes as a 64-bit little-endian signed integer followed by the compressed
+  /// buffer bytes (and then padding as required by the protocol). The
+  /// uncompressed length may be set to -1 to indicate that the data that
+  /// follows is not compressed, which can be useful for cases where
+  /// compression does not yield appreciable savings.
+  BUFFER
+}
+
+/// Optional compression for the memory buffers constituting IPC message
+/// bodies. Intended for use with RecordBatch but could be used for other
+/// message types
+table BodyCompression {
+  /// Compressor library
+  codec: CompressionType = LZ4_FRAME;
+
+  /// Indicates the way the record batch body was compressed
+  method: BodyCompressionMethod = BUFFER;
+}
+
 /// A data header describing the shared memory layout of a "record" or "row"
 /// batch. Some systems call this a "row batch" internally and others a "record
 /// batch".
@@ -60,6 +95,9 @@ table RecordBatch {
   /// bitmap and 1 for the values. For struct arrays, there will only be a
   /// single buffer for the validity (nulls) bitmap
   buffers: [Buffer];
+
+  /// Optional compression of the message body
+  compression: BodyCompression;
 }
 
 /// For sending dictionary encoding information. Any Field can be