You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/09/11 22:07:34 UTC
[arrow] 01/03: ARROW-6314: [C++] Implement IPC message format
alignment changes,
provide backwards compatibility and "legacy" option to emit old message
format
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch ARROW-6313-flatbuffer-alignment
in repository https://gitbox.apache.org/repos/asf/arrow.git
commit 9514e2e225cdb84da09476cb5707dbb9443239eb
Author: Wes McKinney <we...@apache.org>
AuthorDate: Wed Aug 28 11:58:52 2019 -0500
ARROW-6314: [C++] Implement IPC message format alignment changes, provide backwards compatibility and "legacy" option to emit old message format
This also moves the alignment multiple to `IpcOptions` and adds the `IpcOptions` argument to more functions.
Closes #5211 from wesm/ARROW-6314 and squashes the following commits:
df3b910bd <Wes McKinney> Fix MSVC narrowing warning
62758b614 <Wes McKinney> Code review comments. Copy metadata always in prefix_length==4 legacy case
857a57155 <Antoine Pitrou> Fix CUDA IPC
71b2fad4f <Wes McKinney> Add tests exercising backwards compatibility write and read path
4777d2bc8 <Wes McKinney> Implement backwards compatibility and compatibility mode, pass IpcOptions in more APIs
1a3843215 <Wes McKinney> Revert changes to submodule
69883cf63 <Micah Kornfield> verify 8 bytes alignment fixes ubsan for ipc
Lead-authored-by: Wes McKinney <we...@apache.org>
Co-authored-by: Antoine Pitrou <an...@python.org>
Co-authored-by: Micah Kornfield <em...@gmail.com>
Signed-off-by: Wes McKinney <we...@apache.org>
---
cpp/src/arrow/gpu/cuda_arrow_ipc.cc | 26 +-----
cpp/src/arrow/ipc/message.cc | 154 ++++++++++++++++++++++++++++-----
cpp/src/arrow/ipc/message.h | 47 +++++++---
cpp/src/arrow/ipc/metadata_internal.cc | 32 -------
cpp/src/arrow/ipc/metadata_internal.h | 16 ----
cpp/src/arrow/ipc/options.h | 8 ++
cpp/src/arrow/ipc/read_write_test.cc | 119 ++++++++++++++++++-------
cpp/src/arrow/ipc/writer.cc | 69 +++++++--------
cpp/src/arrow/ipc/writer.h | 22 ++---
python/pyarrow/includes/libarrow.pxd | 7 +-
python/pyarrow/ipc.pxi | 5 +-
11 files changed, 309 insertions(+), 196 deletions(-)
diff --git a/cpp/src/arrow/gpu/cuda_arrow_ipc.cc b/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
index 34488a1..0fb81bc 100644
--- a/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
+++ b/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
@@ -63,31 +63,7 @@ Status SerializeRecordBatch(const RecordBatch& batch, CudaContext* ctx,
Status ReadMessage(CudaBufferReader* reader, MemoryPool* pool,
std::unique_ptr<ipc::Message>* out) {
- int32_t message_length = 0;
- int64_t bytes_read = 0;
-
- RETURN_NOT_OK(reader->Read(sizeof(int32_t), &bytes_read,
- reinterpret_cast<uint8_t*>(&message_length)));
- if (bytes_read != sizeof(int32_t)) {
- *out = nullptr;
- return Status::OK();
- }
-
- if (message_length == 0) {
- // Optional 0 EOS control message
- *out = nullptr;
- return Status::OK();
- }
-
- std::shared_ptr<Buffer> metadata;
- RETURN_NOT_OK(AllocateBuffer(pool, message_length, &metadata));
- RETURN_NOT_OK(reader->Read(message_length, &bytes_read, metadata->mutable_data()));
- if (bytes_read != message_length) {
- return Status::IOError("Expected ", message_length, " metadata bytes, but only got ",
- bytes_read);
- }
-
- return ipc::Message::ReadFrom(metadata, reader, out);
+ return ipc::ReadMessageCopy(reader, pool, out);
}
Status ReadRecordBatch(const std::shared_ptr<Schema>& schema,
diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
index dad1f98..a281b0d 100644
--- a/cpp/src/arrow/ipc/message.cc
+++ b/cpp/src/arrow/ipc/message.cc
@@ -32,10 +32,14 @@
#include "arrow/ipc/util.h"
#include "arrow/status.h"
#include "arrow/util/logging.h"
+#include "arrow/util/ubsan.h"
namespace arrow {
namespace ipc {
+// This 0xFFFFFFFF value is the first 4 bytes of a valid IPC message
+constexpr int32_t kIpcContinuationToken = -1;
+
class Message::MessageImpl {
public:
explicit MessageImpl(const std::shared_ptr<Buffer>& metadata,
@@ -142,12 +146,19 @@ bool Message::Equals(const Message& other) const {
}
}
-Status Message::ReadFrom(const std::shared_ptr<Buffer>& metadata, io::InputStream* stream,
- std::unique_ptr<Message>* out) {
+Status CheckMetadataAndGetBodyLength(const Buffer& metadata, int64_t* body_length) {
+ // Check metadata memory alignment in debug builds
+ DCHECK_EQ(0, reinterpret_cast<uintptr_t>(metadata.data()) % 8);
const flatbuf::Message* fb_message;
- RETURN_NOT_OK(internal::VerifyMessage(metadata->data(), metadata->size(), &fb_message));
+ RETURN_NOT_OK(internal::VerifyMessage(metadata.data(), metadata.size(), &fb_message));
+ *body_length = fb_message->bodyLength();
+ return Status::OK();
+}
- int64_t body_length = fb_message->bodyLength();
+Status Message::ReadFrom(const std::shared_ptr<Buffer>& metadata, io::InputStream* stream,
+ std::unique_ptr<Message>* out) {
+ int64_t body_length = -1;
+ RETURN_NOT_OK(CheckMetadataAndGetBodyLength(*metadata, &body_length));
std::shared_ptr<Buffer> body;
RETURN_NOT_OK(stream->Read(body_length, &body));
@@ -161,9 +172,8 @@ Status Message::ReadFrom(const std::shared_ptr<Buffer>& metadata, io::InputStrea
Status Message::ReadFrom(const int64_t offset, const std::shared_ptr<Buffer>& metadata,
io::RandomAccessFile* file, std::unique_ptr<Message>* out) {
- const flatbuf::Message* fb_message;
- RETURN_NOT_OK(internal::VerifyMessage(metadata->data(), metadata->size(), &fb_message));
- int64_t body_length = fb_message->bodyLength();
+ int64_t body_length = -1;
+ RETURN_NOT_OK(CheckMetadataAndGetBodyLength(*metadata, &body_length));
std::shared_ptr<Buffer> body;
RETURN_NOT_OK(file->ReadAt(offset, body_length, &body));
@@ -184,10 +194,10 @@ Status WritePadding(io::OutputStream* stream, int64_t nbytes) {
return Status::OK();
}
-Status Message::SerializeTo(io::OutputStream* stream, int32_t alignment,
+Status Message::SerializeTo(io::OutputStream* stream, const IpcOptions& options,
int64_t* output_length) const {
int32_t metadata_length = 0;
- RETURN_NOT_OK(internal::WriteMessage(*metadata(), alignment, stream, &metadata_length));
+ RETURN_NOT_OK(WriteMessage(*metadata(), options, stream, &metadata_length));
*output_length = metadata_length;
@@ -237,15 +247,47 @@ Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile
" metadata bytes but got ", buffer->size());
}
- int32_t flatbuffer_size = *reinterpret_cast<const int32_t*>(buffer->data());
+ const int32_t continuation = util::SafeLoadAs<int32_t>(buffer->data());
+
+ // The size of the Flatbuffer including padding
+ int32_t flatbuffer_length = -1;
+ int32_t prefix_size = -1;
+ if (continuation == kIpcContinuationToken) {
+ if (metadata_length < 8) {
+ return Status::Invalid(
+ "Corrupted IPC message, had continuation token "
+ " but length ",
+ metadata_length);
+ }
- if (flatbuffer_size + static_cast<int>(sizeof(int32_t)) > metadata_length) {
- return Status::Invalid("flatbuffer size ", metadata_length,
+ // Valid IPC message, parse the message length now
+ flatbuffer_length = util::SafeLoadAs<int32_t>(buffer->data() + 4);
+ prefix_size = 8;
+ } else if (continuation == 0) {
+ // EOS
+ *message = nullptr;
+ return Status::OK();
+ } else {
+ // ARROW-6314: Backwards compatibility for reading old IPC
+ // messages produced prior to version 0.15.0
+ flatbuffer_length = continuation;
+ prefix_size = 4;
+ }
+
+ if (flatbuffer_length + prefix_size != metadata_length) {
+ return Status::Invalid("flatbuffer size ", flatbuffer_length,
" invalid. File offset: ", offset,
", metadata length: ", metadata_length);
}
- auto metadata = SliceBuffer(buffer, 4, buffer->size() - 4);
+ std::shared_ptr<Buffer> metadata =
+ SliceBuffer(buffer, prefix_size, buffer->size() - prefix_size);
+ if (prefix_size == 4) {
+ // ARROW-6314: For old messages we copy the metadata to fix UBSAN
+ // issues with Flatbuffers. For new messages, they are already
+ // aligned
+ RETURN_NOT_OK(metadata->Copy(0, metadata->size(), &metadata));
+ }
return Message::ReadFrom(offset + metadata_length, metadata, file, message);
}
@@ -269,39 +311,105 @@ Status CheckAligned(io::FileInterface* stream, int32_t alignment) {
int64_t current_position;
ARROW_RETURN_NOT_OK(stream->Tell(&current_position));
if (current_position % alignment != 0) {
- return Status::Invalid("Stream is not aligned");
+ return Status::Invalid("Stream is not aligned pos: ", current_position,
+ " alignment: ", alignment);
} else {
return Status::OK();
}
}
-Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* message) {
- int32_t message_length = 0;
+namespace {
+
+Status ReadMessage(io::InputStream* file, MemoryPool* pool, bool copy_metadata,
+ std::unique_ptr<Message>* message) {
+ int32_t continuation = 0;
int64_t bytes_read = 0;
RETURN_NOT_OK(file->Read(sizeof(int32_t), &bytes_read,
- reinterpret_cast<uint8_t*>(&message_length)));
+ reinterpret_cast<uint8_t*>(&continuation)));
if (bytes_read != sizeof(int32_t)) {
+ // EOS
*message = nullptr;
return Status::OK();
}
- if (message_length == 0) {
- // Optional 0 EOS control message
+ int32_t flatbuffer_length = -1;
+ bool legacy_format = false;
+ if (continuation == kIpcContinuationToken) {
+ // Valid IPC message, read the message length now
+ RETURN_NOT_OK(file->Read(sizeof(int32_t), &bytes_read,
+ reinterpret_cast<uint8_t*>(&flatbuffer_length)));
+ } else if (continuation == 0) {
+ // EOS
*message = nullptr;
return Status::OK();
+ } else {
+ // ARROW-6314: Backwards compatibility for reading old IPC
+ // messages produced prior to version 0.15.0
+ flatbuffer_length = continuation;
+ legacy_format = true;
}
std::shared_ptr<Buffer> metadata;
- RETURN_NOT_OK(file->Read(message_length, &metadata));
- if (metadata->size() != message_length) {
- return Status::Invalid("Expected to read ", message_length, " metadata bytes, but ",
- "only read ", metadata->size());
+ if (legacy_format || copy_metadata) {
+ DCHECK_NE(pool, nullptr);
+ RETURN_NOT_OK(AllocateBuffer(pool, flatbuffer_length, &metadata));
+ RETURN_NOT_OK(file->Read(flatbuffer_length, &bytes_read, metadata->mutable_data()));
+ } else {
+ RETURN_NOT_OK(file->Read(flatbuffer_length, &metadata));
+ bytes_read = metadata->size();
+ }
+ if (bytes_read != flatbuffer_length) {
+ return Status::Invalid("Expected to read ", flatbuffer_length,
+ " metadata bytes, but ", "only read ", bytes_read);
}
return Message::ReadFrom(metadata, file, message);
}
+} // namespace
+
+Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* out) {
+ return ReadMessage(file, default_memory_pool(), /*copy_metadata=*/false, out);
+}
+
+Status ReadMessageCopy(io::InputStream* file, MemoryPool* pool,
+ std::unique_ptr<Message>* out) {
+ return ReadMessage(file, pool, /*copy_metadata=*/true, out);
+}
+
+Status WriteMessage(const Buffer& message, const IpcOptions& options,
+ io::OutputStream* file, int32_t* message_length) {
+ const int32_t prefix_size = options.write_legacy_ipc_format ? 4 : 8;
+ const int32_t flatbuffer_size = static_cast<int32_t>(message.size());
+
+ int32_t padded_message_length = static_cast<int32_t>(
+ PaddedLength(flatbuffer_size + prefix_size, options.alignment));
+
+ int32_t padding = padded_message_length - flatbuffer_size - prefix_size;
+
+ // The returned message size includes the length prefix, the flatbuffer,
+ // plus padding
+ *message_length = padded_message_length;
+
+ // ARROW-6314: Write continuation / padding token
+ if (!options.write_legacy_ipc_format) {
+ RETURN_NOT_OK(file->Write(&kIpcContinuationToken, sizeof(int32_t)));
+ }
+
+ // Write the flatbuffer size prefix including padding
+ int32_t padded_flatbuffer_size = padded_message_length - prefix_size;
+ RETURN_NOT_OK(file->Write(&padded_flatbuffer_size, sizeof(int32_t)));
+
+ // Write the flatbuffer
+ RETURN_NOT_OK(file->Write(message.data(), flatbuffer_size));
+ if (padding > 0) {
+ RETURN_NOT_OK(file->Write(kPaddingBytes, padding));
+ }
+
+ return Status::OK();
+}
+
// ----------------------------------------------------------------------
// Implement InputStream message reader
diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h
index 9c152d7..89be45e 100644
--- a/cpp/src/arrow/ipc/message.h
+++ b/cpp/src/arrow/ipc/message.h
@@ -17,8 +17,7 @@
// C++ object model and user API for interprocess schema messaging
-#ifndef ARROW_IPC_MESSAGE_H
-#define ARROW_IPC_MESSAGE_H
+#pragma once
#include <cstdint>
#include <memory>
@@ -32,6 +31,7 @@
namespace arrow {
class Buffer;
+class MemoryPool;
namespace io {
@@ -137,13 +137,10 @@ class ARROW_EXPORT Message {
/// \brief Write length-prefixed metadata and body to output stream
///
/// \param[in] file output stream to write to
- /// \param[in] alignment byte alignment for metadata, usually 8 or
- /// 64. Whether the body is padded depends on the metadata; if the body
- /// buffer is smaller than the size indicated in the metadata, then extra
- /// padding bytes will be written
+ /// \param[in] options IPC writing options including alignment
/// \param[out] output_length the number of bytes written
/// \return Status
- Status SerializeTo(io::OutputStream* file, int32_t alignment,
+ Status SerializeTo(io::OutputStream* file, const IpcOptions& options,
int64_t* output_length) const;
/// \brief Return true if the Message metadata passes Flatbuffer validation
@@ -223,15 +220,39 @@ Status AlignStream(io::OutputStream* stream, int32_t alignment = 8);
ARROW_EXPORT
Status CheckAligned(io::FileInterface* stream, int32_t alignment = 8);
-/// \brief Read encapsulated RPC message (metadata and body) from InputStream
+/// \brief Read encapsulated IPC message (metadata and body) from InputStream
///
-/// Read length-prefixed message with as-yet unknown length. Returns null if
-/// there are not enough bytes available or the message length is 0 (e.g. EOS
-/// in a stream)
+/// Returns null if there are not enough bytes available or the
+/// message length is 0 (e.g. EOS in a stream)
ARROW_EXPORT
Status ReadMessage(io::InputStream* stream, std::unique_ptr<Message>* message);
+/// \brief Read encapsulated IPC message (metadata and body) from InputStream
+///
+/// Like ReadMessage, except that the metadata is copied in a new buffer.
+/// This is necessary if the stream returns non-CPU buffers.
+ARROW_EXPORT
+Status ReadMessageCopy(io::InputStream* stream, MemoryPool* pool,
+ std::unique_ptr<Message>* message);
+
+/// Write encapsulated IPC message Does not make assumptions about
+/// whether the stream is aligned already. Can write legacy (pre
+/// version 0.15.0) IPC message if option set
+///
+/// continuation: 0xFFFFFFFF
+/// message_size: int32
+/// message: const void*
+/// padding
+///
+/// \param[in] message a buffer containing the metadata to write
+/// \param[in] options IPC writing options, including alignment and
+/// legacy message support
+/// \param[in,out] file the OutputStream to write to
+/// \param[out] message_length the total size of the payload written including
+/// padding
+/// \return Status
+Status WriteMessage(const Buffer& message, const IpcOptions& options,
+ io::OutputStream* file, int32_t* message_length);
+
} // namespace ipc
} // namespace arrow
-
-#endif // ARROW_IPC_MESSAGE_H
diff --git a/cpp/src/arrow/ipc/metadata_internal.cc b/cpp/src/arrow/ipc/metadata_internal.cc
index 6810351..dff3369 100644
--- a/cpp/src/arrow/ipc/metadata_internal.cc
+++ b/cpp/src/arrow/ipc/metadata_internal.cc
@@ -1282,38 +1282,6 @@ Status GetSparseTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>
return ConcreteTypeFromFlatbuffer(sparse_tensor->type_type(), type_data, {}, type);
}
-// ----------------------------------------------------------------------
-// Implement message writing
-
-Status WriteMessage(const Buffer& message, int32_t alignment, io::OutputStream* file,
- int32_t* message_length) {
- // ARROW-3212: We do not make assumptions that the output stream is aligned
- int32_t padded_message_length = static_cast<int32_t>(message.size()) + 4;
- const int32_t remainder = padded_message_length % alignment;
- if (remainder != 0) {
- padded_message_length += alignment - remainder;
- }
-
- // The returned message size includes the length prefix, the flatbuffer,
- // plus padding
- *message_length = padded_message_length;
-
- // Write the flatbuffer size prefix including padding
- int32_t flatbuffer_size = padded_message_length - 4;
- RETURN_NOT_OK(file->Write(&flatbuffer_size, sizeof(int32_t)));
-
- // Write the flatbuffer
- RETURN_NOT_OK(file->Write(message.data(), message.size()));
-
- // Write any padding
- int32_t padding = padded_message_length - static_cast<int32_t>(message.size()) - 4;
- if (padding > 0) {
- RETURN_NOT_OK(file->Write(kPaddingBytes, padding));
- }
-
- return Status::OK();
-}
-
} // namespace internal
} // namespace ipc
} // namespace arrow
diff --git a/cpp/src/arrow/ipc/metadata_internal.h b/cpp/src/arrow/ipc/metadata_internal.h
index 828affd..420cfb1 100644
--- a/cpp/src/arrow/ipc/metadata_internal.h
+++ b/cpp/src/arrow/ipc/metadata_internal.h
@@ -128,22 +128,6 @@ static inline Status VerifyMessage(const uint8_t* data, int64_t size,
return Status::OK();
}
-/// Write a serialized message metadata with a length-prefix and padding to an
-/// 8-byte offset. Does not make assumptions about whether the stream is
-/// aligned already
-///
-/// <message_size: int32><message: const void*><padding>
-///
-/// \param[in] message a buffer containing the metadata to write
-/// \param[in] alignment the size multiple of the total message size including
-/// length prefix, metadata, and padding. Usually 8 or 64
-/// \param[in,out] file the OutputStream to write to
-/// \param[out] message_length the total size of the payload written including
-/// padding
-/// \return Status
-Status WriteMessage(const Buffer& message, int32_t alignment, io::OutputStream* file,
- int32_t* message_length);
-
// Serialize arrow::Schema as a Flatbuffer
//
// \param[in] schema a Schema instance
diff --git a/cpp/src/arrow/ipc/options.h b/cpp/src/arrow/ipc/options.h
index d380402..3570c06 100644
--- a/cpp/src/arrow/ipc/options.h
+++ b/cpp/src/arrow/ipc/options.h
@@ -36,6 +36,14 @@ struct ARROW_EXPORT IpcOptions {
// The maximum permitted schema nesting depth.
int max_recursion_depth = kMaxNestingDepth;
+ // Write padding after memory buffers to this multiple of
+ // bytes. Generally 8 or 64
+ int32_t alignment = 8;
+
+ /// \brief Write the pre-0.15.0 encapsulated IPC message format
+ /// consisting of a 4-byte prefix instead of 8 byte
+ bool write_legacy_ipc_format = false;
+
static IpcOptions Defaults();
};
diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
index 9cbeacf..efd0a88 100644
--- a/cpp/src/arrow/ipc/read_write_test.cc
+++ b/cpp/src/arrow/ipc/read_write_test.cc
@@ -106,27 +106,69 @@ TEST(TestMessage, SerializeTo) {
std::shared_ptr<io::BufferOutputStream> stream;
- {
- const int32_t alignment = 8;
-
+ auto CheckWithAlignment = [&](int32_t alignment) {
+ IpcOptions options;
+ options.alignment = alignment;
+ const int32_t prefix_size = 8;
ASSERT_OK(io::BufferOutputStream::Create(1 << 10, default_memory_pool(), &stream));
- ASSERT_OK(message->SerializeTo(stream.get(), alignment, &output_length));
+ ASSERT_OK(message->SerializeTo(stream.get(), options, &output_length));
ASSERT_OK(stream->Tell(&position));
- ASSERT_EQ(BitUtil::RoundUp(metadata->size() + 4, alignment) + body_length,
+ ASSERT_EQ(BitUtil::RoundUp(metadata->size() + prefix_size, alignment) + body_length,
output_length);
ASSERT_EQ(output_length, position);
- }
+ };
- {
- const int32_t alignment = 64;
+ CheckWithAlignment(8);
+ CheckWithAlignment(64);
+}
- ASSERT_OK(io::BufferOutputStream::Create(1 << 10, default_memory_pool(), &stream));
- ASSERT_OK(message->SerializeTo(stream.get(), alignment, &output_length));
- ASSERT_OK(stream->Tell(&position));
- ASSERT_EQ(BitUtil::RoundUp(metadata->size() + 4, alignment) + body_length,
- output_length);
- ASSERT_EQ(output_length, position);
- }
+void BuffersOverlapEquals(const Buffer& left, const Buffer& right) {
+ ASSERT_GT(left.size(), 0);
+ ASSERT_GT(right.size(), 0);
+ ASSERT_TRUE(left.Equals(right, std::min(left.size(), right.size())));
+}
+
+TEST(TestMessage, LegacyIpcBackwardsCompatibility) {
+ std::shared_ptr<RecordBatch> batch;
+ ASSERT_OK(MakeIntBatchSized(36, &batch));
+
+ auto RoundtripWithOptions = [&](const IpcOptions& arg_options,
+ std::shared_ptr<Buffer>* out_serialized,
+ std::unique_ptr<Message>* out) {
+ internal::IpcPayload payload;
+ ASSERT_OK(internal::GetRecordBatchPayload(*batch, arg_options, default_memory_pool(),
+ &payload));
+
+ std::shared_ptr<io::BufferOutputStream> stream;
+ ASSERT_OK(io::BufferOutputStream::Create(1 << 20, default_memory_pool(), &stream));
+
+ int32_t metadata_length = -1;
+ ASSERT_OK(
+ internal::WriteIpcPayload(payload, arg_options, stream.get(), &metadata_length));
+
+ ASSERT_OK(stream->Finish(out_serialized));
+ io::BufferReader io_reader(*out_serialized);
+ ASSERT_OK(ReadMessage(&io_reader, out));
+ };
+
+ std::shared_ptr<Buffer> serialized, legacy_serialized;
+ std::unique_ptr<Message> message, legacy_message;
+
+ IpcOptions options;
+ RoundtripWithOptions(options, &serialized, &message);
+
+ // First 4 bytes 0xFFFFFFFF Continuation marker
+ ASSERT_EQ(-1, util::SafeLoadAs<int32_t>(serialized->data()));
+
+ options.write_legacy_ipc_format = true;
+ RoundtripWithOptions(options, &legacy_serialized, &legacy_message);
+
+ // Check that the continuation marker is not written
+ ASSERT_NE(-1, util::SafeLoadAs<int32_t>(legacy_serialized->data()));
+
+ // Have to use the smaller size to exclude padding
+ BuffersOverlapEquals(*legacy_message->metadata(), *message->metadata());
+ ASSERT_TRUE(legacy_message->body()->Equals(*message->body()));
}
TEST(TestMessage, Verify) {
@@ -635,13 +677,14 @@ TEST_F(RecursionLimits, StressLimit) {
#endif // !defined(_WIN32) || defined(NDEBUG)
struct FileWriterHelper {
- Status Init(const std::shared_ptr<Schema>& schema) {
+ Status Init(const std::shared_ptr<Schema>& schema, const IpcOptions& options) {
num_batches_written_ = 0;
RETURN_NOT_OK(AllocateResizableBuffer(0, &buffer_));
sink_.reset(new io::BufferOutputStream(buffer_));
-
- return RecordBatchFileWriter::Open(sink_.get(), schema, &writer_);
+ ARROW_ASSIGN_OR_RAISE(writer_,
+ RecordBatchFileWriter::Open(sink_.get(), schema, options));
+ return Status::OK();
}
Status WriteBatch(const std::shared_ptr<RecordBatch>& batch) {
@@ -680,11 +723,12 @@ struct FileWriterHelper {
};
struct StreamWriterHelper {
- Status Init(const std::shared_ptr<Schema>& schema) {
+ Status Init(const std::shared_ptr<Schema>& schema, const IpcOptions& options) {
RETURN_NOT_OK(AllocateResizableBuffer(0, &buffer_));
sink_.reset(new io::BufferOutputStream(buffer_));
-
- return RecordBatchStreamWriter::Open(sink_.get(), schema, &writer_);
+ ARROW_ASSIGN_OR_RAISE(writer_,
+ RecordBatchStreamWriter::Open(sink_.get(), schema, options));
+ return Status::OK();
}
Status WriteBatch(const std::shared_ptr<RecordBatch>& batch) {
@@ -718,7 +762,7 @@ class ReaderWriterMixin {
// Check simple RecordBatch roundtripping
template <typename Param>
- void TestRoundTrip(Param&& param) {
+ void TestRoundTrip(Param&& param, const IpcOptions& options) {
std::shared_ptr<RecordBatch> batch1;
std::shared_ptr<RecordBatch> batch2;
ASSERT_OK(param(&batch1)); // NOLINT clang-tidy gtest issue
@@ -727,7 +771,7 @@ class ReaderWriterMixin {
BatchVector in_batches = {batch1, batch2};
BatchVector out_batches;
- ASSERT_OK(RoundTripHelper(in_batches, &out_batches));
+ ASSERT_OK(RoundTripHelper(in_batches, options, &out_batches));
ASSERT_EQ(out_batches.size(), in_batches.size());
// Compare batches
@@ -741,7 +785,7 @@ class ReaderWriterMixin {
ASSERT_OK(MakeDictionary(&batch));
BatchVector out_batches;
- ASSERT_OK(RoundTripHelper({batch}, &out_batches));
+ ASSERT_OK(RoundTripHelper({batch}, IpcOptions::Defaults(), &out_batches));
ASSERT_EQ(out_batches.size(), 1);
// TODO(wesm): This was broken in ARROW-3144. I'm not sure how to
@@ -764,7 +808,7 @@ class ReaderWriterMixin {
schema = schema->WithMetadata(key_value_metadata({"some_key"}, {"some_value"}));
WriterHelper writer_helper;
- ASSERT_OK(writer_helper.Init(schema));
+ ASSERT_OK(writer_helper.Init(schema, IpcOptions::Defaults()));
// Writing a record batch with a different schema
ASSERT_RAISES(Invalid, writer_helper.WriteBatch(batch_ints));
// Writing a record batch with the same schema (except metadata)
@@ -781,9 +825,10 @@ class ReaderWriterMixin {
}
private:
- Status RoundTripHelper(const BatchVector& in_batches, BatchVector* out_batches) {
+ Status RoundTripHelper(const BatchVector& in_batches, const IpcOptions& options,
+ BatchVector* out_batches) {
WriterHelper writer_helper;
- RETURN_NOT_OK(writer_helper.Init(in_batches[0]->schema()));
+ RETURN_NOT_OK(writer_helper.Init(in_batches[0]->schema(), options));
for (const auto& batch : in_batches) {
RETURN_NOT_OK(writer_helper.WriteBatch(batch));
}
@@ -813,9 +858,21 @@ class TestFileFormat : public ReaderWriterMixin<FileWriterHelper>,
class TestStreamFormat : public ReaderWriterMixin<StreamWriterHelper>,
public ::testing::TestWithParam<MakeRecordBatch*> {};
-TEST_P(TestFileFormat, RoundTrip) { TestRoundTrip(*GetParam()); }
+TEST_P(TestFileFormat, RoundTrip) {
+ TestRoundTrip(*GetParam(), IpcOptions::Defaults());
-TEST_P(TestStreamFormat, RoundTrip) { TestRoundTrip(*GetParam()); }
+ IpcOptions options;
+ options.write_legacy_ipc_format = true;
+ TestRoundTrip(*GetParam(), options);
+}
+
+TEST_P(TestStreamFormat, RoundTrip) {
+ TestRoundTrip(*GetParam(), IpcOptions::Defaults());
+
+ IpcOptions options;
+ options.write_legacy_ipc_format = true;
+ TestRoundTrip(*GetParam(), options);
+}
INSTANTIATE_TEST_CASE_P(GenericIpcRoundTripTests, TestIpcRoundTrip, BATCH_CASES());
INSTANTIATE_TEST_CASE_P(FileRoundTripTests, TestFileFormat, BATCH_CASES());
@@ -912,13 +969,15 @@ void SpliceMessages(std::shared_ptr<Buffer> stream,
continue;
}
+ IpcOptions options;
internal::IpcPayload payload;
payload.type = msg->type();
payload.metadata = msg->metadata();
payload.body_buffers.push_back(msg->body());
payload.body_length = msg->body()->size();
int32_t unused_metadata_length = -1;
- ASSERT_OK(internal::WriteIpcPayload(payload, out.get(), &unused_metadata_length));
+ ASSERT_OK(
+ internal::WriteIpcPayload(payload, options, out.get(), &unused_metadata_length));
}
ASSERT_OK(out->Finish(spliced_stream));
}
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 81332a6..7127300 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -529,10 +529,9 @@ class DictionaryWriter : public RecordBatchSerializer {
int64_t dictionary_id_;
};
-Status WriteIpcPayload(const IpcPayload& payload, io::OutputStream* dst,
- int32_t* metadata_length) {
- RETURN_NOT_OK(internal::WriteMessage(*payload.metadata, kArrowIpcAlignment, dst,
- metadata_length));
+Status WriteIpcPayload(const IpcPayload& payload, const IpcOptions& options,
+ io::OutputStream* dst, int32_t* metadata_length) {
+ RETURN_NOT_OK(WriteMessage(*payload.metadata, options, dst, metadata_length));
#ifndef NDEBUG
RETURN_NOT_OK(CheckAligned(dst));
@@ -604,7 +603,7 @@ Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
// The body size is computed in the payload
*body_length = payload.body_length;
- return internal::WriteIpcPayload(payload, dst, metadata_length);
+ return internal::WriteIpcPayload(payload, options, dst, metadata_length);
}
Status WriteRecordBatchStream(const std::vector<std::shared_ptr<RecordBatch>>& batches,
@@ -625,7 +624,9 @@ Status WriteTensorHeader(const Tensor& tensor, io::OutputStream* dst,
int32_t* metadata_length) {
std::shared_ptr<Buffer> metadata;
RETURN_NOT_OK(internal::WriteTensorMessage(tensor, 0, &metadata));
- return internal::WriteMessage(*metadata, kTensorAlignment, dst, metadata_length);
+ IpcOptions options;
+ options.alignment = kTensorAlignment;
+ return WriteMessage(*metadata, options, dst, metadata_length);
}
Status WriteStridedTensorData(int dim_index, int64_t offset, int elem_size,
@@ -818,19 +819,7 @@ Status WriteSparseTensor(const SparseTensor& sparse_tensor, io::OutputStream* ds
RETURN_NOT_OK(writer.Assemble(sparse_tensor));
*body_length = payload.body_length;
- return internal::WriteIpcPayload(payload, dst, metadata_length);
-}
-
-Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
- int64_t buffer_start_offset, io::OutputStream* dst,
- int32_t* metadata_length, int64_t* body_length, MemoryPool* pool) {
- auto options = IpcOptions::Defaults();
- internal::IpcPayload payload;
- RETURN_NOT_OK(GetDictionaryPayload(dictionary_id, dictionary, options, pool, &payload));
-
- // The body size is computed in the payload
- *body_length = payload.body_length;
- return internal::WriteIpcPayload(payload, dst, metadata_length);
+ return internal::WriteIpcPayload(payload, IpcOptions::Defaults(), dst, metadata_length);
}
Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) {
@@ -1022,20 +1011,24 @@ class StreamBookKeeper {
return Status::OK();
}
+ Status WriteEOS() {
+ // End of stream marker
+ constexpr int64_t kEos = 0;
+ return Write(&kEos, sizeof(kEos));
+ }
+
protected:
io::OutputStream* sink_;
int64_t position_;
};
-// End of stream marker
-constexpr int32_t kEos = 0;
-
/// A IpcPayloadWriter implementation that writes to a IPC stream
/// (with an end-of-stream marker)
class PayloadStreamWriter : public internal::IpcPayloadWriter,
protected StreamBookKeeper {
public:
- explicit PayloadStreamWriter(io::OutputStream* sink) : StreamBookKeeper(sink) {}
+ PayloadStreamWriter(const IpcOptions& options, io::OutputStream* sink)
+ : StreamBookKeeper(sink), options_(options) {}
~PayloadStreamWriter() override = default;
@@ -1046,23 +1039,24 @@ class PayloadStreamWriter : public internal::IpcPayloadWriter,
#endif
int32_t metadata_length = 0; // unused
- RETURN_NOT_OK(WriteIpcPayload(payload, sink_, &metadata_length));
+ RETURN_NOT_OK(WriteIpcPayload(payload, options_, sink_, &metadata_length));
RETURN_NOT_OK(UpdatePositionCheckAligned());
return Status::OK();
}
- Status Close() override {
- // Write 0 EOS message
- return Write(&kEos, sizeof(int32_t));
- }
+ Status Close() override { return WriteEOS(); }
+
+ private:
+ IpcOptions options_;
};
/// A IpcPayloadWriter implementation that writes to a IPC file
/// (with a footer as defined in File.fbs)
class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBookKeeper {
public:
- PayloadFileWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema)
- : StreamBookKeeper(sink), schema_(schema) {}
+ PayloadFileWriter(const IpcOptions& options, const std::shared_ptr<Schema>& schema,
+ io::OutputStream* sink)
+ : StreamBookKeeper(sink), options_(options), schema_(schema) {}
~PayloadFileWriter() override = default;
@@ -1074,7 +1068,7 @@ class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBoo
// Metadata length must include padding, it's computed by WriteIpcPayload()
FileBlock block = {position_, 0, payload.body_length};
- RETURN_NOT_OK(WriteIpcPayload(payload, sink_, &block.metadata_length));
+ RETURN_NOT_OK(WriteIpcPayload(payload, options_, sink_, &block.metadata_length));
RETURN_NOT_OK(UpdatePositionCheckAligned());
// Record position and size of some message types, to list them in the footer
@@ -1107,7 +1101,7 @@ class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBoo
Status Close() override {
// Write 0 EOS message for compatibility with sequential readers
- RETURN_NOT_OK(Write(&kEos, sizeof(int32_t)));
+ RETURN_NOT_OK(WriteEOS());
// Write file footer
RETURN_NOT_OK(UpdatePosition());
@@ -1128,6 +1122,7 @@ class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBoo
}
protected:
+ IpcOptions options_;
std::shared_ptr<Schema> schema_;
std::vector<FileBlock> dictionaries_;
std::vector<FileBlock> record_batches_;
@@ -1141,9 +1136,9 @@ class RecordBatchStreamWriter::RecordBatchStreamWriterImpl
RecordBatchStreamWriterImpl(io::OutputStream* sink,
const std::shared_ptr<Schema>& schema,
const IpcOptions& options)
- : RecordBatchPayloadWriter(
- std::unique_ptr<internal::IpcPayloadWriter>(new PayloadStreamWriter(sink)),
- schema, options) {}
+ : RecordBatchPayloadWriter(std::unique_ptr<internal::IpcPayloadWriter>(
+ new PayloadStreamWriter(options, sink)),
+ schema, options) {}
~RecordBatchStreamWriterImpl() = default;
};
@@ -1153,7 +1148,7 @@ class RecordBatchFileWriter::RecordBatchFileWriterImpl : public RecordBatchPaylo
RecordBatchFileWriterImpl(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
const IpcOptions& options)
: RecordBatchPayloadWriter(std::unique_ptr<internal::IpcPayloadWriter>(
- new PayloadFileWriter(sink, schema)),
+ new PayloadFileWriter(options, schema, sink)),
schema, options) {}
~RecordBatchFileWriterImpl() = default;
@@ -1277,7 +1272,7 @@ Status SerializeSchema(const Schema& schema, DictionaryMemo* dictionary_memo,
RETURN_NOT_OK(io::BufferOutputStream::Create(1024, pool, &stream));
auto options = IpcOptions::Defaults();
- auto payload_writer = make_unique<PayloadStreamWriter>(stream.get());
+ auto payload_writer = make_unique<PayloadStreamWriter>(options, stream.get());
RecordBatchPayloadWriter writer(std::move(payload_writer), schema, options,
dictionary_memo);
// Write schema and populate fields (but not dictionaries) in dictionary_memo
diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h
index 75030c2..c673b0a 100644
--- a/cpp/src/arrow/ipc/writer.h
+++ b/cpp/src/arrow/ipc/writer.h
@@ -177,7 +177,9 @@ class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter {
std::unique_ptr<RecordBatchFileWriterImpl> file_impl_;
};
-/// \brief Low-level API for writing a record batch (without schema) to an OutputStream
+/// \brief Low-level API for writing a record batch (without schema)
+/// to an OutputStream as encapsulated IPC message. See Arrow format
+/// documentation for more detail.
///
/// \param[in] batch the record batch to write
/// \param[in] buffer_start_offset the start offset to use in the buffer metadata,
@@ -189,20 +191,6 @@ class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter {
/// \param[in] options options for serialization
/// \param[in] pool the memory pool to allocate memory from
/// \return Status
-///
-/// Write the RecordBatch (collection of equal-length Arrow arrays) to the
-/// output stream in a contiguous block. The record batch metadata is written as
-/// a flatbuffer (see format/Message.fbs -- the RecordBatch message type)
-/// prefixed by its size, followed by each of the memory buffers in the batch
-/// written end to end (with appropriate alignment and padding):
-///
-/// \code
-/// <int32: metadata size> <uint8*: metadata> <buffers ...>
-/// \endcode
-///
-/// Finally, the absolute offsets (relative to the start of the output stream)
-/// to the end of the body and end of the metadata / data header (suffixed by
-/// the header size) is returned in out-variables
ARROW_EXPORT
Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
io::OutputStream* dst, int32_t* metadata_length,
@@ -392,8 +380,8 @@ Status GetRecordBatchPayload(const RecordBatch& batch, const IpcOptions& options
MemoryPool* pool, IpcPayload* out);
ARROW_EXPORT
-Status WriteIpcPayload(const IpcPayload& payload, io::OutputStream* dst,
- int32_t* metadata_length);
+Status WriteIpcPayload(const IpcPayload& payload, const IpcOptions& options,
+ io::OutputStream* dst, int32_t* metadata_length);
} // namespace internal
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 2c05ec5..87393fb 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -964,6 +964,11 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
MessageType_V4" arrow::ipc::MetadataVersion::V4"
cdef cppclass CIpcOptions" arrow::ipc::IpcOptions":
+ c_bool allow_64bit
+ int max_recursion_depth
+ int32_t alignment
+ c_bool write_legacy_ipc_format
+
@staticmethod
CIpcOptions Defaults()
@@ -989,7 +994,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
MetadataVersion metadata_version()
MessageType type()
- CStatus SerializeTo(OutputStream* stream, int32_t alignment,
+ CStatus SerializeTo(OutputStream* stream, const CIpcOptions& options,
int64_t* output_length)
c_string FormatMessageType(MessageType type)
diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index b1aca23..6710f63 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -76,13 +76,14 @@ cdef class Message:
"""
cdef:
int64_t output_length = 0
- int32_t c_alignment = alignment
OutputStream* out
+ CIpcOptions options
+ options.alignment = alignment
out = sink.get_output_stream().get()
with nogil:
check_status(self.message.get()
- .SerializeTo(out, c_alignment, &output_length))
+ .SerializeTo(out, options, &output_length))
def serialize(self, alignment=8, memory_pool=None):
"""