You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2018/09/25 08:16:17 UTC
[arrow] branch master updated: ARROW-3212: [C++] Make IPC metadata deterministic, regardless of current stream position. Clean up stream / tensor alignment logic
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new c9ac869 ARROW-3212: [C++] Make IPC metadata deterministic, regardless of current stream position. Clean up stream / tensor alignment logic
c9ac869 is described below
commit c9ac8696a335f4e818c7356d9af77317150e94ce
Author: Wes McKinney <we...@apache.org>
AuthorDate: Tue Sep 25 04:16:04 2018 -0400
ARROW-3212: [C++] Make IPC metadata deterministic, regardless of current stream position. Clean up stream / tensor alignment logic
The detail of whether an `InputStream` or `OutputStream` is aligned to an 8- or 64-byte boundary had contaminated a number of IPC functions, causing generated metadata (plus padding, if any) to be non-deterministic. This introduces a new `ipc::AlignStream` function; the desired use is to align the stream at the highest level possible. I've implemented the changes in the Python tensor serialization code paths. I removed various cruft where alignment issues leaked into the API.
This also resolves ARROW-2840, ARROW-2027, ARROW-2776, ARROW-1996
Author: Wes McKinney <we...@apache.org>
Author: Kouhei Sutou <ko...@clear-code.com>
Closes #2615 from wesm/ARROW-3212 and squashes the following commits:
db23bfd01 <Wes McKinney> Use kTensorAlignment in more places
bf105330a <Wes McKinney> Remove outdated comment, remove BufferOutputStream::Create convenience. Better document aligned SerializeTo tests
672ba9a2b <Kouhei Sutou> Update "Since" version
f3fe30dab <Wes McKinney> Fix int conversion warning on windows
2bdb2fbf3 <Wes McKinney> Do not pad tensor message body. Align stream after writing each tensor in Serialize code path, fix tensorflow ops
06a3201fc <Wes McKinney> Remove position argument
11593434b <Wes McKinney> Actually check tensor
79d886909 <Wes McKinney> Update glib per ipc::ReadTensor API change
4cefedc09 <Wes McKinney> Place internal flatbuffer helper in header so does not need to be exported in DLL
c4debf16d <Wes McKinney> Include warning in gcc7
70910df08 <Wes McKinney> Add unit test to verify that ARROW-1996 is fixed
f30ddecbd <Wes McKinney> Do not write message body bytes if not included in the metadata
6a5d1d3ee <Wes McKinney> More fixes, assertions. Fix Python unit tests
a987e398c <Wes McKinney> Add comments. tensor alignment test failing
9a392b76b <Wes McKinney> Add InputStream::Advance, fix Python compilation
5ad084f1e <Wes McKinney> Finish alignment refactor, fix C++ unit tests
47fc688e0 <Wes McKinney> Some refactoring of stream alignment
---
c_glib/arrow-glib/input-stream.cpp | 56 ++++++++++------------
c_glib/arrow-glib/input-stream.h | 6 +--
c_glib/test/test-tensor.rb | 3 +-
cpp/cmake_modules/SetupCxxFlags.cmake | 3 +-
cpp/src/arrow/buffer.h | 2 +-
cpp/src/arrow/flight/flight-benchmark.cc | 8 ++--
cpp/src/arrow/flight/server.cc | 2 +-
cpp/src/arrow/io/interfaces.cc | 5 ++
cpp/src/arrow/io/interfaces.h | 6 +++
cpp/src/arrow/io/memory.h | 6 +++
cpp/src/arrow/ipc/ipc-read-write-test.cc | 76 ++++++++++++++++++++++++++++--
cpp/src/arrow/ipc/message.cc | 71 ++++++++++++++++++++--------
cpp/src/arrow/ipc/message.h | 42 +++++++++++++----
cpp/src/arrow/ipc/metadata-internal.cc | 33 ++++---------
cpp/src/arrow/ipc/metadata-internal.h | 27 ++++++++++-
cpp/src/arrow/ipc/reader.cc | 18 ++-----
cpp/src/arrow/ipc/reader.h | 8 ++--
cpp/src/arrow/ipc/util.h | 9 ++--
cpp/src/arrow/ipc/writer.cc | 66 +++++++++-----------------
cpp/src/arrow/ipc/writer.h | 7 ++-
cpp/src/arrow/python/deserialize.cc | 16 +++++--
cpp/src/arrow/python/serialize.cc | 5 ++
python/pyarrow/includes/libarrow.pxd | 9 ++--
python/pyarrow/ipc.pxi | 19 ++++----
python/pyarrow/tests/conftest.py | 7 +++
python/pyarrow/tests/test_serialization.py | 12 +++++
26 files changed, 339 insertions(+), 183 deletions(-)
diff --git a/c_glib/arrow-glib/input-stream.cpp b/c_glib/arrow-glib/input-stream.cpp
index 353d00a..b9f4c27 100644
--- a/c_glib/arrow-glib/input-stream.cpp
+++ b/c_glib/arrow-glib/input-stream.cpp
@@ -178,6 +178,32 @@ garrow_input_stream_class_init(GArrowInputStreamClass *klass)
}
+/**
+ * garrow_input_stream_read_tensor:
+ * @input_stream: A #GArrowInputStream.
+ * @error: (nullable): Return location for a #GError or %NULL.
+ *
+ * Returns: (transfer full) (nullable):
+ * #GArrowTensor on success, %NULL on error.
+ *
+ * Since: 0.11.0
+ */
+GArrowTensor *
+garrow_input_stream_read_tensor(GArrowInputStream *input_stream,
+ GError **error)
+{
+ auto arrow_input_stream = garrow_input_stream_get_raw(input_stream);
+
+ std::shared_ptr<arrow::Tensor> arrow_tensor;
+ auto status = arrow::ipc::ReadTensor(arrow_input_stream.get(),
+ &arrow_tensor);
+ if (garrow_error_check(error, status, "[input-stream][read-tensor]")) {
+ return garrow_tensor_new_raw(&arrow_tensor);
+ } else {
+ return NULL;
+ }
+}
+
G_DEFINE_TYPE(GArrowSeekableInputStream, \
garrow_seekable_input_stream, \
GARROW_TYPE_INPUT_STREAM);
@@ -258,36 +284,6 @@ garrow_seekable_input_stream_read_at(GArrowSeekableInputStream *input_stream,
}
}
-/**
- * garrow_seekable_input_stream_read_tensor:
- * @input_stream: A #GArrowSeekableInputStream.
- * @position: The read start position.
- * @error: (nullable): Return location for a #GError or %NULL.
- *
- * Returns: (transfer full) (nullable):
- * #GArrowTensor on success, %NULL on error.
- *
- * Since: 0.4.0
- */
-GArrowTensor *
-garrow_seekable_input_stream_read_tensor(GArrowSeekableInputStream *input_stream,
- gint64 position,
- GError **error)
-{
- auto arrow_random_access_file =
- garrow_seekable_input_stream_get_raw(input_stream);
-
- std::shared_ptr<arrow::Tensor> arrow_tensor;
- auto status = arrow::ipc::ReadTensor(position,
- arrow_random_access_file.get(),
- &arrow_tensor);
- if (garrow_error_check(error, status, "[seekable-input-stream][read-tensor]")) {
- return garrow_tensor_new_raw(&arrow_tensor);
- } else {
- return NULL;
- }
-}
-
typedef struct GArrowBufferInputStreamPrivate_ {
GArrowBuffer *buffer;
diff --git a/c_glib/arrow-glib/input-stream.h b/c_glib/arrow-glib/input-stream.h
index c2068d6..224bcc8 100644
--- a/c_glib/arrow-glib/input-stream.h
+++ b/c_glib/arrow-glib/input-stream.h
@@ -37,6 +37,9 @@ struct _GArrowInputStreamClass
GObjectClass parent_class;
};
+GArrowTensor *garrow_input_stream_read_tensor(GArrowInputStream *input_stream,
+ GError **error);
+
#define GARROW_TYPE_SEEKABLE_INPUT_STREAM \
(garrow_seekable_input_stream_get_type())
G_DECLARE_DERIVABLE_TYPE(GArrowSeekableInputStream,
@@ -56,9 +59,6 @@ GArrowBuffer *garrow_seekable_input_stream_read_at(GArrowSeekableInputStream *in
gint64 position,
gint64 n_bytes,
GError **error);
-GArrowTensor *garrow_seekable_input_stream_read_tensor(GArrowSeekableInputStream *input_stream,
- gint64 position,
- GError **error);
#define GARROW_TYPE_BUFFER_INPUT_STREAM \
diff --git a/c_glib/test/test-tensor.rb b/c_glib/test/test-tensor.rb
index e812d5e..4f18011 100644
--- a/c_glib/test/test-tensor.rb
+++ b/c_glib/test/test-tensor.rb
@@ -120,7 +120,6 @@ class TestTensor < Test::Unit::TestCase
output = Arrow::BufferOutputStream.new(buffer)
output.write_tensor(@tensor)
input = Arrow::BufferInputStream.new(buffer)
- assert_equal(@tensor,
- input.read_tensor(0))
+ assert_equal(@tensor, input.read_tensor)
end
end
diff --git a/cpp/cmake_modules/SetupCxxFlags.cmake b/cpp/cmake_modules/SetupCxxFlags.cmake
index a707a21..c2c50eb 100644
--- a/cpp/cmake_modules/SetupCxxFlags.cmake
+++ b/cpp/cmake_modules/SetupCxxFlags.cmake
@@ -178,7 +178,8 @@ if ("${COMPILER_FAMILY}" STREQUAL "msvc")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} /wd4800")
endif()
-if ("${COMPILER_FAMILY}" STREQUAL "gcc")
+if ("${COMPILER_FAMILY}" STREQUAL "gcc" AND
+ "${COMPILER_VERSION}" VERSION_GREATER "6.0")
# Without this, gcc >= 7 warns related to changes in C++17
set(CXX_ONLY_FLAGS "${CXX_ONLY_FLAGS} -Wno-noexcept-type")
endif()
diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h
index 42b99bf..75d0c21 100644
--- a/cpp/src/arrow/buffer.h
+++ b/cpp/src/arrow/buffer.h
@@ -144,7 +144,7 @@ class ARROW_EXPORT Buffer {
static std::shared_ptr<Buffer> Wrap(const std::vector<T>& data) {
return std::make_shared<Buffer>(reinterpret_cast<const uint8_t*>(data.data()),
static_cast<int64_t>(sizeof(T) * data.size()));
- } // namespace arrow
+ }
/// \brief Copy buffer contents into a new std::string
/// \return std::string
diff --git a/cpp/src/arrow/flight/flight-benchmark.cc b/cpp/src/arrow/flight/flight-benchmark.cc
index ac50ab0..1110ec3 100644
--- a/cpp/src/arrow/flight/flight-benchmark.cc
+++ b/cpp/src/arrow/flight/flight-benchmark.cc
@@ -156,7 +156,8 @@ Status RunPerformanceTest(const int port) {
// Elapsed time in seconds
uint64_t elapsed_nanos = timer.Stop();
- double time_elapsed = elapsed_nanos / static_cast<double>(1000000000LL);
+ double time_elapsed =
+ static_cast<double>(elapsed_nanos) / static_cast<double>(1000000000);
constexpr double kMegabyte = static_cast<double>(1 << 20);
@@ -167,8 +168,9 @@ Status RunPerformanceTest(const int port) {
std::cout << "Bytes read: " << stats.total_bytes << std::endl;
std::cout << "Nanos: " << elapsed_nanos << std::endl;
- std::cout << "Speed: " << (stats.total_bytes / time_elapsed / kMegabyte) << " MB/s"
- << std::endl;
+ std::cout << "Speed: "
+ << (static_cast<double>(stats.total_bytes) / kMegabyte / time_elapsed)
+ << " MB/s" << std::endl;
return Status::OK();
}
diff --git a/cpp/src/arrow/flight/server.cc b/cpp/src/arrow/flight/server.cc
index 967a254..46815b5 100644
--- a/cpp/src/arrow/flight/server.cc
+++ b/cpp/src/arrow/flight/server.cc
@@ -146,7 +146,7 @@ class SerializationTraits<IpcPayload> {
pb_stream.WriteRawMaybeAliased(buffer->data(), static_cast<int>(buffer->size()));
// Write padding if not multiple of 8
- const int remainder = buffer->size() % 8;
+ const int remainder = static_cast<int>(buffer->size() % 8);
if (remainder) {
pb_stream.WriteRawMaybeAliased(kPaddingBytes, 8 - remainder);
}
diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc
index 0456020..a35bf67 100644
--- a/cpp/src/arrow/io/interfaces.cc
+++ b/cpp/src/arrow/io/interfaces.cc
@@ -28,6 +28,11 @@ namespace io {
FileInterface::~FileInterface() = default;
+Status InputStream::Advance(int64_t nbytes) {
+ std::shared_ptr<Buffer> temp;
+ return Read(nbytes, &temp);
+}
+
struct RandomAccessFile::RandomAccessFileImpl {
std::mutex lock_;
};
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
index a9d68c3..ec368f6 100644
--- a/cpp/src/arrow/io/interfaces.h
+++ b/cpp/src/arrow/io/interfaces.h
@@ -114,6 +114,12 @@ class ARROW_EXPORT OutputStream : virtual public FileInterface, public Writable
};
class ARROW_EXPORT InputStream : virtual public FileInterface, public Readable {
+ public:
+ /// \brief Advance or skip stream indicated number of bytes
+ /// \param[in] nbytes the number to move forward
+ /// \return Status
+ Status Advance(int64_t nbytes);
+
protected:
InputStream() = default;
};
diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h
index d9b22c7..ea4e8ca 100644
--- a/cpp/src/arrow/io/memory.h
+++ b/cpp/src/arrow/io/memory.h
@@ -40,6 +40,12 @@ class ARROW_EXPORT BufferOutputStream : public OutputStream {
public:
explicit BufferOutputStream(const std::shared_ptr<ResizableBuffer>& buffer);
+ /// \brief Create in-memory output stream with indicated capacity using a
+ /// memory pool
+ /// \param[in] initial_capacity the initial allocated internal capacity of
+ /// the OutputStream
+ /// \param[in,out] pool a MemoryPool to use for allocations
+ /// \param[out] out the created stream
static Status Create(int64_t initial_capacity, MemoryPool* pool,
std::shared_ptr<BufferOutputStream>* out);
diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc b/cpp/src/arrow/ipc/ipc-read-write-test.cc
index 3154d57..f8e29f5 100644
--- a/cpp/src/arrow/ipc/ipc-read-write-test.cc
+++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc
@@ -28,6 +28,7 @@
#include "arrow/buffer.h"
#include "arrow/io/memory.h"
#include "arrow/io/test-common.h"
+#include "arrow/ipc/Message_generated.h"
#include "arrow/ipc/api.h"
#include "arrow/ipc/metadata-internal.h"
#include "arrow/ipc/test-common.h"
@@ -86,6 +87,58 @@ TEST(TestMessage, Equals) {
ASSERT_FALSE(msg5.Equals(msg1));
}
+TEST(TestMessage, SerializeTo) {
+ const int64_t body_length = 64;
+
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.Finish(flatbuf::CreateMessage(fbb, internal::kCurrentMetadataVersion,
+ flatbuf::MessageHeader_RecordBatch, 0 /* header */,
+ body_length));
+
+ std::shared_ptr<Buffer> metadata;
+ ASSERT_OK(internal::WriteFlatbufferBuilder(fbb, &metadata));
+
+ std::string body = "abcdef";
+
+ std::unique_ptr<Message> message;
+ ASSERT_OK(Message::Open(metadata, std::make_shared<Buffer>(body), &message));
+
+ int64_t output_length = 0;
+ int64_t position = 0;
+
+ std::shared_ptr<io::BufferOutputStream> stream;
+
+ {
+ const int32_t alignment = 8;
+
+ ASSERT_OK(io::BufferOutputStream::Create(1 << 10, default_memory_pool(), &stream));
+ ASSERT_OK(message->SerializeTo(stream.get(), alignment, &output_length));
+ ASSERT_OK(stream->Tell(&position));
+ ASSERT_EQ(BitUtil::RoundUp(metadata->size() + 4, alignment) + body_length,
+ output_length);
+ ASSERT_EQ(output_length, position);
+ }
+
+ {
+ const int32_t alignment = 64;
+
+ ASSERT_OK(io::BufferOutputStream::Create(1 << 10, default_memory_pool(), &stream));
+ ASSERT_OK(message->SerializeTo(stream.get(), alignment, &output_length));
+ ASSERT_OK(stream->Tell(&position));
+ ASSERT_EQ(BitUtil::RoundUp(metadata->size() + 4, alignment) + body_length,
+ output_length);
+ ASSERT_EQ(output_length, position);
+ }
+}
+
+TEST(TestMessage, Verify) {
+ std::string metadata = "invalid";
+ std::string body = "abcdef";
+
+ Message message(std::make_shared<Buffer>(metadata), std::make_shared<Buffer>(body));
+ ASSERT_FALSE(message.Verify());
+}
+
const std::shared_ptr<DataType> INT32 = std::make_shared<Int32Type>();
TEST_F(TestSchemaMetadata, PrimitiveFields) {
@@ -725,13 +778,22 @@ class TestTensorRoundTrip : public ::testing::Test, public IpcTestFixture {
int32_t metadata_length;
int64_t body_length;
+ const auto& type = checked_cast<const FixedWidthType&>(*tensor.type());
+ const int elem_size = type.bit_width() / 8;
+
ASSERT_OK(mmap_->Seek(0));
ASSERT_OK(WriteTensor(tensor, mmap_.get(), &metadata_length, &body_length));
+ const int64_t expected_body_length = elem_size * tensor.size();
+ ASSERT_EQ(expected_body_length, body_length);
+
+ ASSERT_OK(mmap_->Seek(0));
+
std::shared_ptr<Tensor> result;
- ASSERT_OK(ReadTensor(0, mmap_.get(), &result));
+ ASSERT_OK(ReadTensor(mmap_.get(), &result));
+ ASSERT_EQ(result->data()->size(), expected_body_length);
ASSERT_TRUE(tensor.Equals(*result));
}
};
@@ -752,14 +814,22 @@ TEST_F(TestTensorRoundTrip, BasicRoundtrip) {
auto data = Buffer::Wrap(values);
Tensor t0(int64(), data, shape, strides, dim_names);
- Tensor tzero(int64(), data, {}, {}, {});
+ Tensor t_no_dims(int64(), data, {}, {}, {});
+ Tensor t_zero_length_dim(int64(), data, {0}, {8}, {"foo"});
CheckTensorRoundTrip(t0);
- CheckTensorRoundTrip(tzero);
+ CheckTensorRoundTrip(t_no_dims);
+ CheckTensorRoundTrip(t_zero_length_dim);
int64_t serialized_size;
ASSERT_OK(GetTensorSize(t0, &serialized_size));
ASSERT_TRUE(serialized_size > static_cast<int64_t>(size * sizeof(int64_t)));
+
+ // ARROW-2840: Check that padding/alignment minded
+ std::vector<int64_t> shape_2 = {1, 1};
+ std::vector<int64_t> strides_2 = {8, 8};
+ Tensor t0_not_multiple_64(int64(), data, shape_2, strides_2, dim_names);
+ CheckTensorRoundTrip(t0_not_multiple_64);
}
TEST_F(TestTensorRoundTrip, NonContiguous) {
diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
index d77248a..797a490 100644
--- a/cpp/src/arrow/ipc/message.cc
+++ b/cpp/src/arrow/ipc/message.cc
@@ -179,21 +179,42 @@ Status Message::ReadFrom(const int64_t offset, const std::shared_ptr<Buffer>& me
return Message::Open(metadata, body, out);
}
-Status Message::SerializeTo(io::OutputStream* file, int64_t* output_length) const {
+Status WritePadding(io::OutputStream* stream, int64_t nbytes) {
+ while (nbytes > 0) {
+ const int64_t bytes_to_write = std::min<int64_t>(nbytes, kArrowAlignment);
+ RETURN_NOT_OK(stream->Write(kPaddingBytes, bytes_to_write));
+ nbytes -= bytes_to_write;
+ }
+ return Status::OK();
+}
+
+Status Message::SerializeTo(io::OutputStream* stream, int32_t alignment,
+ int64_t* output_length) const {
int32_t metadata_length = 0;
- RETURN_NOT_OK(internal::WriteMessage(*metadata(), file, &metadata_length));
+ RETURN_NOT_OK(internal::WriteMessage(*metadata(), alignment, stream, &metadata_length));
*output_length = metadata_length;
auto body_buffer = body();
if (body_buffer) {
- RETURN_NOT_OK(file->Write(body_buffer->data(), body_buffer->size()));
+ RETURN_NOT_OK(stream->Write(body_buffer->data(), body_buffer->size()));
*output_length += body_buffer->size();
- }
+ DCHECK_GE(this->body_length(), body_buffer->size());
+
+ int64_t remainder = this->body_length() - body_buffer->size();
+ RETURN_NOT_OK(WritePadding(stream, remainder));
+ *output_length += remainder;
+ }
return Status::OK();
}
+bool Message::Verify() const {
+ std::shared_ptr<Buffer> meta = this->metadata();
+ flatbuffers::Verifier verifier(meta->data(), meta->size(), 128);
+ return flatbuf::VerifyMessageBuffer(verifier);
+}
+
std::string FormatMessageType(Message::Type type) {
switch (type) {
case Message::SCHEMA:
@@ -235,8 +256,33 @@ Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile
return Message::ReadFrom(offset + metadata_length, metadata, file, message);
}
-Status ReadMessage(io::InputStream* file, bool aligned,
- std::unique_ptr<Message>* message) {
+Status AlignStream(io::InputStream* stream, int32_t alignment) {
+ int64_t position = -1;
+ RETURN_NOT_OK(stream->Tell(&position));
+ return stream->Advance(PaddedLength(position, alignment) - position);
+}
+
+Status AlignStream(io::OutputStream* stream, int32_t alignment) {
+ int64_t position = -1;
+ RETURN_NOT_OK(stream->Tell(&position));
+ int64_t remainder = PaddedLength(position, alignment) - position;
+ if (remainder > 0) {
+ return stream->Write(kPaddingBytes, remainder);
+ }
+ return Status::OK();
+}
+
+Status CheckAligned(io::FileInterface* stream, int32_t alignment) {
+ int64_t current_position;
+ ARROW_RETURN_NOT_OK(stream->Tell(&current_position));
+ if (current_position % alignment != 0) {
+ return Status::Invalid("Stream is not aligned");
+ } else {
+ return Status::OK();
+ }
+}
+
+Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* message) {
int32_t message_length = 0;
int64_t bytes_read = 0;
RETURN_NOT_OK(file->Read(sizeof(int32_t), &bytes_read,
@@ -262,22 +308,9 @@ Status ReadMessage(io::InputStream* file, bool aligned,
return Status::Invalid(ss.str());
}
- // If requested, align the file before reading the message.
- if (aligned) {
- int64_t offset;
- RETURN_NOT_OK(file->Tell(&offset));
- int64_t aligned_offset = PaddedLength(offset);
- int64_t num_extra_bytes = aligned_offset - offset;
- std::shared_ptr<Buffer> dummy_buffer;
- RETURN_NOT_OK(file->Read(num_extra_bytes, &dummy_buffer));
- }
-
return Message::ReadFrom(metadata, file, message);
}
-Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* message) {
- return ReadMessage(file, false /* aligned */, message);
-}
// ----------------------------------------------------------------------
// Implement InputStream message reader
diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h
index 08176ab..092a19f 100644
--- a/cpp/src/arrow/ipc/message.h
+++ b/cpp/src/arrow/ipc/message.h
@@ -34,6 +34,7 @@ class Buffer;
namespace io {
+class FileInterface;
class InputStream;
class OutputStream;
class RandomAccessFile;
@@ -140,9 +141,17 @@ class ARROW_EXPORT Message {
/// \brief Write length-prefixed metadata and body to output stream
///
/// \param[in] file output stream to write to
+ /// \param[in] alignment byte alignment for metadata, usually 8 or
+ /// 64. Whether the body is padded depends on the metadata; if the body
+ /// buffer is smaller than the size indicated in the metadata, then extra
+ /// padding bytes will be written
/// \param[out] output_length the number of bytes written
/// \return Status
- Status SerializeTo(io::OutputStream* file, int64_t* output_length) const;
+ Status SerializeTo(io::OutputStream* file, int32_t alignment,
+ int64_t* output_length) const;
+
+ /// \brief Return true if the Message metadata passes Flatbuffer validation
+ bool Verify() const;
private:
// Hide serialization details from user API
@@ -192,20 +201,35 @@ ARROW_EXPORT
Status ReadMessage(const int64_t offset, const int32_t metadata_length,
io::RandomAccessFile* file, std::unique_ptr<Message>* message);
+/// \brief Advance stream to the next multiple of the given alignment if its
+/// position is not already aligned
+/// \param[in] stream an input stream
+/// \param[in] alignment the byte multiple for the metadata prefix, usually 8
+/// or 64, to ensure the body starts on a multiple of that alignment
+/// \return Status
+ARROW_EXPORT
+Status AlignStream(io::InputStream* stream, int32_t alignment = 8);
+
+/// \brief Write padding bytes so the stream reaches the next multiple of the
+/// given alignment if its position is not already aligned
+/// \param[in] stream an output stream
+/// \param[in] alignment the byte multiple for the metadata prefix, usually 8
+/// or 64, to ensure the body starts on a multiple of that alignment
+/// \return Status
+ARROW_EXPORT
+Status AlignStream(io::OutputStream* stream, int32_t alignment = 8);
+
+/// \brief Return error Status if file position is not a multiple of the
+/// indicated alignment
+ARROW_EXPORT
+Status CheckAligned(io::FileInterface* stream, int32_t alignment = 8);
+
/// \brief Read encapsulated RPC message (metadata and body) from InputStream
///
/// Read length-prefixed message with as-yet unknown length. Returns null if
/// there are not enough bytes available or the message length is 0 (e.g. EOS
/// in a stream)
ARROW_EXPORT
-Status ReadMessage(io::InputStream* stream, bool aligned,
- std::unique_ptr<Message>* message);
-
-/// \brief Read encapsulated RPC message (metadata and body) from InputStream.
-///
-/// This is a version of ReadMessage that does not have the aligned argument
-/// for backwards compatibility.
-ARROW_EXPORT
Status ReadMessage(io::InputStream* stream, std::unique_ptr<Message>* message);
} // namespace ipc
diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc
index 0bf18f3..3d9b97c 100644
--- a/cpp/src/arrow/ipc/metadata-internal.cc
+++ b/cpp/src/arrow/ipc/metadata-internal.cc
@@ -682,18 +682,6 @@ static Status SchemaToFlatbuffer(FBB& fbb, const Schema& schema,
return Status::OK();
}
-static Status WriteFlatbufferBuilder(FBB& fbb, std::shared_ptr<Buffer>* out) {
- int32_t size = fbb.GetSize();
-
- std::shared_ptr<Buffer> result;
- RETURN_NOT_OK(AllocateBuffer(default_memory_pool(), size, &result));
-
- uint8_t* dst = result->mutable_data();
- memcpy(dst, fbb.GetBufferPointer(), size);
- *out = result;
- return Status::OK();
-}
-
static Status WriteFBMessage(FBB& fbb, flatbuf::MessageHeader header_type,
flatbuffers::Offset<void> header, int64_t body_length,
std::shared_ptr<Buffer>* out) {
@@ -776,6 +764,9 @@ Status WriteTensorMessage(const Tensor& tensor, int64_t buffer_start_offset,
FBB fbb;
+ const auto& type = checked_cast<const FixedWidthType&>(*tensor.type());
+ const int elem_size = type.bit_width() / 8;
+
flatbuf::Type fb_type_type;
Offset fb_type;
RETURN_NOT_OK(TensorTypeToFlatbuffer(fbb, *tensor.type(), &fb_type_type, &fb_type));
@@ -788,7 +779,8 @@ Status WriteTensorMessage(const Tensor& tensor, int64_t buffer_start_offset,
auto fb_shape = fbb.CreateVector(dims);
auto fb_strides = fbb.CreateVector(tensor.strides());
- int64_t body_length = tensor.data()->size();
+
+ int64_t body_length = tensor.size() * elem_size;
flatbuf::Buffer buffer(buffer_start_offset, body_length);
TensorOffset fb_tensor =
@@ -953,20 +945,13 @@ Status GetTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>* type
// ----------------------------------------------------------------------
// Implement message writing
-Status WriteMessage(const Buffer& message, io::OutputStream* file,
+Status WriteMessage(const Buffer& message, int32_t alignment, io::OutputStream* file,
int32_t* message_length) {
- // Need to write 4 bytes (message size), the message, plus padding to
- // end on an 8-byte offset
- int64_t start_offset;
- RETURN_NOT_OK(file->Tell(&start_offset));
-
- // TODO(wesm): Should we depend on the position of the OutputStream? See
- // ARROW-3212
+ // ARROW-3212: We do not make assumptions that the output stream is aligned
int32_t padded_message_length = static_cast<int32_t>(message.size()) + 4;
- const int32_t remainder =
- (padded_message_length + static_cast<int32_t>(start_offset)) % 8;
+ const int32_t remainder = padded_message_length % alignment;
if (remainder != 0) {
- padded_message_length += 8 - remainder;
+ padded_message_length += alignment - remainder;
}
// The returned message size includes the length prefix, the flatbuffer,
diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h
index 730a1a5..0683b8f 100644
--- a/cpp/src/arrow/ipc/metadata-internal.h
+++ b/cpp/src/arrow/ipc/metadata-internal.h
@@ -25,6 +25,7 @@
#include <string>
#include <vector>
+#include "arrow/buffer.h"
#include "arrow/ipc/Schema_generated.h"
#include "arrow/ipc/dictionary.h"
#include "arrow/ipc/message.h"
@@ -97,10 +98,19 @@ Status GetTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>* type
std::vector<std::string>* dim_names);
/// Write a serialized message metadata with a length-prefix and padding to an
-/// 8-byte offset
+/// 8-byte offset. Does not make assumptions about whether the stream is
+/// aligned already
///
/// <message_size: int32><message: const void*><padding>
-Status WriteMessage(const Buffer& message, io::OutputStream* file,
+///
+/// \param[in] message a buffer containing the metadata to write
+/// \param[in] alignment the size multiple of the total message size including
+/// length prefix, metadata, and padding. Usually 8 or 64
+/// \param[in,out] file the OutputStream to write to
+/// \param[out] message_length the total size of the payload written including
+/// padding
+/// \return Status
+Status WriteMessage(const Buffer& message, int32_t alignment, io::OutputStream* file,
int32_t* message_length);
// Serialize arrow::Schema as a Flatbuffer
@@ -131,6 +141,19 @@ Status WriteDictionaryMessage(const int64_t id, const int64_t length,
const std::vector<BufferMetadata>& buffers,
std::shared_ptr<Buffer>* out);
+static inline Status WriteFlatbufferBuilder(flatbuffers::FlatBufferBuilder& fbb,
+ std::shared_ptr<Buffer>* out) {
+ int32_t size = fbb.GetSize();
+
+ std::shared_ptr<Buffer> result;
+ RETURN_NOT_OK(AllocateBuffer(default_memory_pool(), size, &result));
+
+ uint8_t* dst = result->mutable_data();
+ memcpy(dst, fbb.GetBufferPointer(), size);
+ *out = result;
+ return Status::OK();
+}
+
} // namespace internal
} // namespace ipc
} // namespace arrow
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 92cf75b..ba83229 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -695,20 +695,15 @@ Status RecordBatchFileReader::ReadRecordBatch(int i,
return impl_->ReadRecordBatch(i, batch);
}
-static Status ReadContiguousPayload(io::InputStream* file, bool aligned,
+static Status ReadContiguousPayload(io::InputStream* file,
std::unique_ptr<Message>* message) {
- RETURN_NOT_OK(ReadMessage(file, aligned, message));
+ RETURN_NOT_OK(ReadMessage(file, message));
if (*message == nullptr) {
return Status::Invalid("Unable to read metadata at offset");
}
return Status::OK();
}
-static Status ReadContiguousPayload(io::InputStream* file,
- std::unique_ptr<Message>* message) {
- return ReadContiguousPayload(file, false /* aligned */, message);
-}
-
Status ReadSchema(io::InputStream* stream, std::shared_ptr<Schema>* out) {
std::shared_ptr<RecordBatchReader> reader;
RETURN_NOT_OK(RecordBatchStreamReader::Open(stream, &reader));
@@ -725,14 +720,9 @@ Status ReadRecordBatch(const std::shared_ptr<Schema>& schema, io::InputStream* f
out);
}
-Status ReadTensor(int64_t offset, io::RandomAccessFile* file,
- std::shared_ptr<Tensor>* out) {
- // Respect alignment of Tensor messages (see WriteTensor)
- offset = PaddedLength(offset);
- RETURN_NOT_OK(file->Seek(offset));
-
+Status ReadTensor(io::InputStream* file, std::shared_ptr<Tensor>* out) {
std::unique_ptr<Message> message;
- RETURN_NOT_OK(ReadContiguousPayload(file, true /* aligned */, &message));
+ RETURN_NOT_OK(ReadContiguousPayload(file, &message));
return ReadTensor(*message, out);
}
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
index faa63d5..942664d 100644
--- a/cpp/src/arrow/ipc/reader.h
+++ b/cpp/src/arrow/ipc/reader.h
@@ -219,15 +219,13 @@ Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr<Schema>& sc
int max_recursion_depth, io::RandomAccessFile* file,
std::shared_ptr<RecordBatch>* out);
-/// \brief EXPERIMENTAL: Read arrow::Tensor as encapsulated IPC message in file
+/// \brief Read arrow::Tensor as encapsulated IPC message in file
///
-/// \param[in] offset the file location of the start of the message
-/// \param[in] file the file where the batch is located
+/// \param[in] file an InputStream pointed at the start of the message
/// \param[out] out the read tensor
/// \return Status
ARROW_EXPORT
-Status ReadTensor(int64_t offset, io::RandomAccessFile* file,
- std::shared_ptr<Tensor>* out);
+Status ReadTensor(io::InputStream* file, std::shared_ptr<Tensor>* out);
/// \brief EXPERIMENTAL: Read arrow::Tensor from IPC message
///
diff --git a/cpp/src/arrow/ipc/util.h b/cpp/src/arrow/ipc/util.h
index 412f312..80f9f3c 100644
--- a/cpp/src/arrow/ipc/util.h
+++ b/cpp/src/arrow/ipc/util.h
@@ -28,14 +28,17 @@ namespace arrow {
namespace ipc {
// Buffers are padded to 64-byte boundaries (for SIMD)
-static constexpr int kArrowAlignment = 64;
+static constexpr int32_t kArrowAlignment = 64;
+
+// Tensors are padded to 64-byte boundaries
+static constexpr int32_t kTensorAlignment = 64;
// Align on 8-byte boundaries in IPC
-static constexpr int kArrowIpcAlignment = 8;
+static constexpr int32_t kArrowIpcAlignment = 8;
static constexpr uint8_t kPaddingBytes[kArrowAlignment] = {0};
-static inline int64_t PaddedLength(int64_t nbytes, int64_t alignment = kArrowAlignment) {
+static inline int64_t PaddedLength(int64_t nbytes, int32_t alignment = kArrowAlignment) {
return ((nbytes + alignment - 1) / alignment) * alignment;
}
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 60ca34e..7568bfd 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -482,16 +482,11 @@ class DictionaryWriter : public RecordBatchSerializer {
Status WriteIpcPayload(const IpcPayload& payload, io::OutputStream* dst,
int32_t* metadata_length) {
-#ifndef NDEBUG
- int64_t start_position, current_position;
- RETURN_NOT_OK(dst->Tell(&start_position));
-#endif
-
- RETURN_NOT_OK(internal::WriteMessage(*payload.metadata, dst, metadata_length));
+ RETURN_NOT_OK(internal::WriteMessage(*payload.metadata, kArrowIpcAlignment, dst,
+ metadata_length));
#ifndef NDEBUG
- RETURN_NOT_OK(dst->Tell(&current_position));
- DCHECK(BitUtil::IsMultipleOf8(current_position));
+ RETURN_NOT_OK(CheckAligned(dst));
#endif
// Now write the buffers
@@ -516,8 +511,7 @@ Status WriteIpcPayload(const IpcPayload& payload, io::OutputStream* dst,
}
#ifndef NDEBUG
- RETURN_NOT_OK(dst->Tell(&current_position));
- DCHECK(BitUtil::IsMultipleOf8(current_position));
+ RETURN_NOT_OK(CheckAligned(dst));
#endif
return Status::OK();
@@ -531,18 +525,6 @@ Status GetRecordBatchPayload(const RecordBatch& batch, MemoryPool* pool,
} // namespace internal
-// Adds padding bytes if necessary to ensure all memory blocks are written on
-// 64-byte boundaries.
-Status AlignStreamPosition(io::OutputStream* stream) {
- int64_t position;
- RETURN_NOT_OK(stream->Tell(&position));
- int64_t remainder = PaddedLength(position) - position;
- if (remainder > 0) {
- return stream->Write(kPaddingBytes, remainder);
- }
- return Status::OK();
-}
-
Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
io::OutputStream* dst, int32_t* metadata_length,
int64_t* body_length, MemoryPool* pool, int max_recursion_depth,
@@ -584,10 +566,10 @@ Status WriteLargeRecordBatch(const RecordBatch& batch, int64_t buffer_start_offs
namespace {
Status WriteTensorHeader(const Tensor& tensor, io::OutputStream* dst,
- int32_t* metadata_length, int64_t* body_length) {
+ int32_t* metadata_length) {
std::shared_ptr<Buffer> metadata;
RETURN_NOT_OK(internal::WriteTensorMessage(tensor, 0, &metadata));
- return internal::WriteMessage(*metadata, dst, metadata_length);
+ return internal::WriteMessage(*metadata, kTensorAlignment, dst, metadata_length);
}
Status WriteStridedTensorData(int dim_index, int64_t offset, int elem_size,
@@ -636,30 +618,24 @@ Status GetContiguousTensor(const Tensor& tensor, MemoryPool* pool,
Status WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadata_length,
int64_t* body_length) {
- RETURN_NOT_OK(AlignStreamPosition(dst));
+ const auto& type = checked_cast<const FixedWidthType&>(*tensor.type());
+ const int elem_size = type.bit_width() / 8;
+
+ *body_length = tensor.size() * elem_size;
+ // Tensor metadata accounts for padding
if (tensor.is_contiguous()) {
- RETURN_NOT_OK(WriteTensorHeader(tensor, dst, metadata_length, body_length));
- // It's important to align the stream position again so that the tensor data
- // is aligned.
- RETURN_NOT_OK(AlignStreamPosition(dst));
+ RETURN_NOT_OK(WriteTensorHeader(tensor, dst, metadata_length));
auto data = tensor.data();
if (data && data->data()) {
- *body_length = data->size();
- return dst->Write(data->data(), *body_length);
+ RETURN_NOT_OK(dst->Write(data->data(), *body_length));
} else {
*body_length = 0;
- return Status::OK();
}
} else {
- Tensor dummy(tensor.type(), tensor.data(), tensor.shape());
- const auto& type = checked_cast<const FixedWidthType&>(*tensor.type());
- RETURN_NOT_OK(WriteTensorHeader(dummy, dst, metadata_length, body_length));
- // It's important to align the stream position again so that the tensor data
- // is aligned.
- RETURN_NOT_OK(AlignStreamPosition(dst));
-
- const int elem_size = type.bit_width() / 8;
+ // The tensor written is made contiguous
+ Tensor dummy(tensor.type(), nullptr, tensor.shape());
+ RETURN_NOT_OK(WriteTensorHeader(dummy, dst, metadata_length));
// TODO(wesm): Do we care enough about this temporary allocation to pass in
// a MemoryPool to this function?
@@ -667,9 +643,11 @@ Status WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadat
RETURN_NOT_OK(
AllocateBuffer(tensor.shape()[tensor.ndim() - 1] * elem_size, &scratch_space));
- return WriteStridedTensorData(0, 0, elem_size, tensor, scratch_space->mutable_data(),
- dst);
+ RETURN_NOT_OK(WriteStridedTensorData(0, 0, elem_size, tensor,
+ scratch_space->mutable_data(), dst));
}
+
+ return Status::OK();
}
Status GetTensorMessage(const Tensor& tensor, MemoryPool* pool,
@@ -763,7 +741,7 @@ class StreamBookKeeper {
return Status::OK();
}
- Status Align(int64_t alignment = kArrowIpcAlignment) {
+ Status Align(int32_t alignment = kArrowIpcAlignment) {
// Adds padding bytes if necessary to ensure all memory blocks are written on
// 8-byte (or other alignment) boundaries.
int64_t remainder = PaddedLength(position_, alignment) - position_;
@@ -801,7 +779,7 @@ class SchemaWriter : public StreamBookKeeper {
RETURN_NOT_OK(internal::WriteSchemaMessage(schema_, dictionary_memo_, &schema_fb));
int32_t metadata_length = 0;
- RETURN_NOT_OK(internal::WriteMessage(*schema_fb, sink_, &metadata_length));
+ RETURN_NOT_OK(internal::WriteMessage(*schema_fb, 8, sink_, &metadata_length));
RETURN_NOT_OK(UpdatePositionCheckAligned());
return Status::OK();
}
diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h
index bcf09aa..9843126 100644
--- a/cpp/src/arrow/ipc/writer.h
+++ b/cpp/src/arrow/ipc/writer.h
@@ -256,11 +256,14 @@ ARROW_EXPORT
Status GetTensorMessage(const Tensor& tensor, MemoryPool* pool,
std::unique_ptr<Message>* out);
-/// \brief EXPERIMENTAL: Write arrow::Tensor as a contiguous message
+/// \brief Write arrow::Tensor as a contiguous message. The metadata and body
+/// are written assuming 64-byte alignment. It is the user's responsibility to
+/// ensure that the OutputStream has been aligned to a 64-byte multiple before
+/// writing the message.
///
/// \param[in] tensor the Tensor to write
/// \param[in] dst the OutputStream to write to
-/// \param[out] metadata_length the actual metadata length
+/// \param[out] metadata_length the actual metadata length, including padding
/// \param[out] body_length the acutal message body length
/// \return Status
///
diff --git a/cpp/src/arrow/python/deserialize.cc b/cpp/src/arrow/python/deserialize.cc
index 3dbc18f..3bf23d7 100644
--- a/cpp/src/arrow/python/deserialize.cc
+++ b/cpp/src/arrow/python/deserialize.cc
@@ -32,6 +32,7 @@
#include "arrow/io/interfaces.h"
#include "arrow/io/memory.h"
#include "arrow/ipc/reader.h"
+#include "arrow/ipc/util.h"
#include "arrow/table.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"
@@ -250,7 +251,6 @@ Status DeserializeSet(PyObject* context, const Array& array, int64_t start_idx,
}
Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out) {
- int64_t offset;
int64_t bytes_read;
int32_t num_tensors;
int32_t num_buffers;
@@ -264,15 +264,21 @@ Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out)
RETURN_NOT_OK(ipc::RecordBatchStreamReader::Open(src, &reader));
RETURN_NOT_OK(reader->ReadNext(&out->batch));
- RETURN_NOT_OK(src->Tell(&offset));
- offset += 4; // Skip the end-of-stream message
+ /// Skip EOS marker
+ RETURN_NOT_OK(src->Advance(4));
+
+ /// Align stream so tensor bodies are 64-byte aligned
+ RETURN_NOT_OK(ipc::AlignStream(src, ipc::kTensorAlignment));
+
for (int i = 0; i < num_tensors; ++i) {
std::shared_ptr<Tensor> tensor;
- RETURN_NOT_OK(ipc::ReadTensor(offset, src, &tensor));
+ RETURN_NOT_OK(ipc::ReadTensor(src, &tensor));
+ RETURN_NOT_OK(ipc::AlignStream(src, ipc::kTensorAlignment));
out->tensors.push_back(tensor);
- RETURN_NOT_OK(src->Tell(&offset));
}
+ int64_t offset = -1;
+ RETURN_NOT_OK(src->Tell(&offset));
for (int i = 0; i < num_buffers; ++i) {
int64_t size;
RETURN_NOT_OK(src->ReadAt(offset, sizeof(int64_t), &bytes_read,
diff --git a/cpp/src/arrow/python/serialize.cc b/cpp/src/arrow/python/serialize.cc
index 56565ac..92a6519 100644
--- a/cpp/src/arrow/python/serialize.cc
+++ b/cpp/src/arrow/python/serialize.cc
@@ -32,6 +32,7 @@
#include "arrow/builder.h"
#include "arrow/io/interfaces.h"
#include "arrow/io/memory.h"
+#include "arrow/ipc/util.h"
#include "arrow/ipc/writer.h"
#include "arrow/memory_pool.h"
#include "arrow/record_batch.h"
@@ -760,10 +761,14 @@ Status SerializedPyObject::WriteTo(io::OutputStream* dst) {
dst->Write(reinterpret_cast<const uint8_t*>(&num_buffers), sizeof(int32_t)));
RETURN_NOT_OK(ipc::WriteRecordBatchStream({this->batch}, dst));
+ // Align stream to 64-byte offset so tensor bodies are 64-byte aligned
+ RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kTensorAlignment));
+
int32_t metadata_length;
int64_t body_length;
for (const auto& tensor : this->tensors) {
RETURN_NOT_OK(ipc::WriteTensor(*tensor, dst, &metadata_length, &body_length));
+ RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kTensorAlignment));
}
for (const auto& buffer : this->buffers) {
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 370e14d..4b37cf8 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -780,7 +780,8 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
MetadataVersion metadata_version()
MessageType type()
- CStatus SerializeTo(OutputStream* stream, int64_t* output_length)
+ CStatus SerializeTo(OutputStream* stream, int32_t alignment,
+ int64_t* output_length)
c_string FormatMessageType(MessageType type)
@@ -848,8 +849,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
int32_t* metadata_length,
int64_t* body_length)
- CStatus ReadTensor(int64_t offset, RandomAccessFile* file,
- shared_ptr[CTensor]* out)
+ CStatus ReadTensor(InputStream* stream, shared_ptr[CTensor]* out)
CStatus ReadRecordBatch(const CMessage& message,
const shared_ptr[CSchema]& schema,
@@ -868,6 +868,9 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
InputStream* stream,
shared_ptr[CRecordBatch]* out)
+ CStatus AlignStream(InputStream* stream, int64_t alignment)
+ CStatus AlignStream(OutputStream* stream, int64_t alignment)
+
cdef cppclass CFeatherWriter" arrow::ipc::feather::TableWriter":
@staticmethod
CStatus Open(const shared_ptr[OutputStream]& stream,
diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index 5c259ea..0331fba 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -60,12 +60,14 @@ cdef class Message:
result = self.message.get().Equals(deref(other.message.get()))
return result
- def serialize(self, memory_pool=None):
+ def serialize(self, alignment=8, memory_pool=None):
"""
Write message as encapsulated IPC message
Parameters
----------
+ alignment : int, default 8
+ Byte alignment for metadata and body
memory_pool : MemoryPool, default None
Uses default memory pool if not specified
@@ -76,10 +78,11 @@ cdef class Message:
cdef:
BufferOutputStream stream = BufferOutputStream(memory_pool)
int64_t output_length = 0
+ int32_t c_alignment = alignment
with nogil:
check_status(self.message.get()
- .SerializeTo(stream.wr_file.get(),
+ .SerializeTo(stream.wr_file.get(), c_alignment,
&output_length))
return stream.getvalue()
@@ -440,10 +443,10 @@ def write_tensor(Tensor tensor, NativeFile dest):
def read_tensor(NativeFile source):
- """
- Read pyarrow.Tensor from pyarrow.NativeFile object from current
+ """Read pyarrow.Tensor from pyarrow.NativeFile object from current
position. If the file source supports zero copy (e.g. a memory map), then
- this operation does not allocate any memory
+ this operation does not allocate any memory. This function not assume that
+ the stream is aligned
Parameters
----------
@@ -452,16 +455,14 @@ def read_tensor(NativeFile source):
Returns
-------
tensor : Tensor
+
"""
cdef:
shared_ptr[CTensor] sp_tensor
source._assert_readable()
-
- cdef int64_t offset = source.tell()
with nogil:
- check_status(ReadTensor(offset, source.rd_file.get(), &sp_tensor))
-
+ check_status(ReadTensor(source.rd_file.get(), &sp_tensor))
return pyarrow_wrap_tensor(sp_tensor)
diff --git a/python/pyarrow/tests/conftest.py b/python/pyarrow/tests/conftest.py
index fff0ca4..68266c8 100644
--- a/python/pyarrow/tests/conftest.py
+++ b/python/pyarrow/tests/conftest.py
@@ -63,6 +63,13 @@ except ImportError:
pass
+try:
+ import tensorflow # noqa
+ defaults['tensorflow'] = True
+except ImportError:
+ pass
+
+
def pytest_configure(config):
pass
diff --git a/python/pyarrow/tests/test_serialization.py b/python/pyarrow/tests/test_serialization.py
index 4bac300..184a144 100644
--- a/python/pyarrow/tests/test_serialization.py
+++ b/python/pyarrow/tests/test_serialization.py
@@ -640,6 +640,18 @@ def test_serialize_to_components_invalid_cases():
pa.deserialize_components(components)
+def test_serialize_read_concatenated_records():
+ # ARROW-1996 -- see stream alignment work in ARROW-2840, ARROW-3212
+ f = pa.BufferOutputStream()
+ pa.serialize_to(12, f)
+ pa.serialize_to(23, f)
+ buf = f.getvalue()
+
+ f = pa.BufferReader(buf)
+ pa.read_serialized(f).deserialize()
+ pa.read_serialized(f).deserialize()
+
+
@pytest.mark.skipif(os.name == 'nt', reason="deserialize_regex not pickleable")
def test_deserialize_in_different_process():
from multiprocessing import Process, Queue