You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2018/09/25 08:16:17 UTC

[arrow] branch master updated: ARROW-3212: [C++] Make IPC metadata deterministic, regardless of current stream position. Clean up stream / tensor alignment logic

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new c9ac869  ARROW-3212: [C++] Make IPC metadata deterministic, regardless of current stream position. Clean up stream / tensor alignment logic
c9ac869 is described below

commit c9ac8696a335f4e818c7356d9af77317150e94ce
Author: Wes McKinney <we...@apache.org>
AuthorDate: Tue Sep 25 04:16:04 2018 -0400

    ARROW-3212: [C++] Make IPC metadata deterministic, regardless of current stream position. Clean up stream / tensor alignment logic
    
    The detail of whether an `InputStream` or `OutputStream` is aligned to an 8- or 64-byte boundary had contaminated a number of IPC functions, causing generated metadata (plus padding, if any) to be non-deterministic. This introduces a new `ipc::AlignStream` function; the intended use is to align the stream at the highest level possible. I've implemented the changes in the Python tensor serialization code paths. I removed various cruft where alignment issues leaked into the API.
    
    This also resolves ARROW-2840, ARROW-2027, ARROW-2776, ARROW-1996
    
    Author: Wes McKinney <we...@apache.org>
    Author: Kouhei Sutou <ko...@clear-code.com>
    
    Closes #2615 from wesm/ARROW-3212 and squashes the following commits:
    
    db23bfd01 <Wes McKinney> Use kTensorAlignment in more places
    bf105330a <Wes McKinney> Remove outdated comment, remove BufferOutputStream::Create convenience. Better document aligned SerializeTo tests
    672ba9a2b <Kouhei Sutou> Update "Since" version
    f3fe30dab <Wes McKinney> Fix int conversion warning on windows
    2bdb2fbf3 <Wes McKinney> Do not pad tensor message body. Align stream after writing each tensor in Serialize code path, fix tensorflow ops
    06a3201fc <Wes McKinney> Remove position argument
    11593434b <Wes McKinney> Actually check tensor
    79d886909 <Wes McKinney> Update glib per ipc::ReadTensor API change
    4cefedc09 <Wes McKinney> Place internal flatbuffer helper in header so does not need to be exported in DLL
    c4debf16d <Wes McKinney> Include warning in gcc7
    70910df08 <Wes McKinney> Add unit test to verify that ARROW-1996 is fixed
    f30ddecbd <Wes McKinney> Do not write message body bytes if not included in the metadata
    6a5d1d3ee <Wes McKinney> More fixes, assertions. Fix Python unit tests
    a987e398c <Wes McKinney> Add comments. tensor alignment test failing
    9a392b76b <Wes McKinney> Add InputStream::Advance, fix Python compilation
    5ad084f1e <Wes McKinney> Finish alignment refactor, fix C++ unit tests
    47fc688e0 <Wes McKinney> Some refactoring of stream alignment
---
 c_glib/arrow-glib/input-stream.cpp         | 56 ++++++++++------------
 c_glib/arrow-glib/input-stream.h           |  6 +--
 c_glib/test/test-tensor.rb                 |  3 +-
 cpp/cmake_modules/SetupCxxFlags.cmake      |  3 +-
 cpp/src/arrow/buffer.h                     |  2 +-
 cpp/src/arrow/flight/flight-benchmark.cc   |  8 ++--
 cpp/src/arrow/flight/server.cc             |  2 +-
 cpp/src/arrow/io/interfaces.cc             |  5 ++
 cpp/src/arrow/io/interfaces.h              |  6 +++
 cpp/src/arrow/io/memory.h                  |  6 +++
 cpp/src/arrow/ipc/ipc-read-write-test.cc   | 76 ++++++++++++++++++++++++++++--
 cpp/src/arrow/ipc/message.cc               | 71 ++++++++++++++++++++--------
 cpp/src/arrow/ipc/message.h                | 42 +++++++++++++----
 cpp/src/arrow/ipc/metadata-internal.cc     | 33 ++++---------
 cpp/src/arrow/ipc/metadata-internal.h      | 27 ++++++++++-
 cpp/src/arrow/ipc/reader.cc                | 18 ++-----
 cpp/src/arrow/ipc/reader.h                 |  8 ++--
 cpp/src/arrow/ipc/util.h                   |  9 ++--
 cpp/src/arrow/ipc/writer.cc                | 66 +++++++++-----------------
 cpp/src/arrow/ipc/writer.h                 |  7 ++-
 cpp/src/arrow/python/deserialize.cc        | 16 +++++--
 cpp/src/arrow/python/serialize.cc          |  5 ++
 python/pyarrow/includes/libarrow.pxd       |  9 ++--
 python/pyarrow/ipc.pxi                     | 19 ++++----
 python/pyarrow/tests/conftest.py           |  7 +++
 python/pyarrow/tests/test_serialization.py | 12 +++++
 26 files changed, 339 insertions(+), 183 deletions(-)

diff --git a/c_glib/arrow-glib/input-stream.cpp b/c_glib/arrow-glib/input-stream.cpp
index 353d00a..b9f4c27 100644
--- a/c_glib/arrow-glib/input-stream.cpp
+++ b/c_glib/arrow-glib/input-stream.cpp
@@ -178,6 +178,32 @@ garrow_input_stream_class_init(GArrowInputStreamClass *klass)
 }
 
 
+/**
+ * garrow_input_stream_read_tensor:
+ * @input_stream: A #GArrowInputStream.
+ * @error: (nullable): Return location for a #GError or %NULL.
+ *
+ * Returns: (transfer full) (nullable):
+ *   #GArrowTensor on success, %NULL on error.
+ *
+ * Since: 0.11.0
+ */
+GArrowTensor *
+garrow_input_stream_read_tensor(GArrowInputStream *input_stream,
+                                GError **error)
+{
+  auto arrow_input_stream = garrow_input_stream_get_raw(input_stream);
+
+  std::shared_ptr<arrow::Tensor> arrow_tensor;
+  auto status = arrow::ipc::ReadTensor(arrow_input_stream.get(),
+                                       &arrow_tensor);
+  if (garrow_error_check(error, status, "[input-stream][read-tensor]")) {
+    return garrow_tensor_new_raw(&arrow_tensor);
+  } else {
+    return NULL;
+  }
+}
+
 G_DEFINE_TYPE(GArrowSeekableInputStream,                \
               garrow_seekable_input_stream,             \
               GARROW_TYPE_INPUT_STREAM);
@@ -258,36 +284,6 @@ garrow_seekable_input_stream_read_at(GArrowSeekableInputStream *input_stream,
   }
 }
 
-/**
- * garrow_seekable_input_stream_read_tensor:
- * @input_stream: A #GArrowSeekableInputStream.
- * @position: The read start position.
- * @error: (nullable): Return location for a #GError or %NULL.
- *
- * Returns: (transfer full) (nullable):
- *   #GArrowTensor on success, %NULL on error.
- *
- * Since: 0.4.0
- */
-GArrowTensor *
-garrow_seekable_input_stream_read_tensor(GArrowSeekableInputStream *input_stream,
-                                         gint64 position,
-                                         GError **error)
-{
-  auto arrow_random_access_file =
-    garrow_seekable_input_stream_get_raw(input_stream);
-
-  std::shared_ptr<arrow::Tensor> arrow_tensor;
-  auto status = arrow::ipc::ReadTensor(position,
-                                       arrow_random_access_file.get(),
-                                       &arrow_tensor);
-  if (garrow_error_check(error, status, "[seekable-input-stream][read-tensor]")) {
-    return garrow_tensor_new_raw(&arrow_tensor);
-  } else {
-    return NULL;
-  }
-}
-
 
 typedef struct GArrowBufferInputStreamPrivate_ {
   GArrowBuffer *buffer;
diff --git a/c_glib/arrow-glib/input-stream.h b/c_glib/arrow-glib/input-stream.h
index c2068d6..224bcc8 100644
--- a/c_glib/arrow-glib/input-stream.h
+++ b/c_glib/arrow-glib/input-stream.h
@@ -37,6 +37,9 @@ struct _GArrowInputStreamClass
   GObjectClass parent_class;
 };
 
+GArrowTensor *garrow_input_stream_read_tensor(GArrowInputStream *input_stream,
+                                              GError **error);
+
 #define GARROW_TYPE_SEEKABLE_INPUT_STREAM       \
   (garrow_seekable_input_stream_get_type())
 G_DECLARE_DERIVABLE_TYPE(GArrowSeekableInputStream,
@@ -56,9 +59,6 @@ GArrowBuffer *garrow_seekable_input_stream_read_at(GArrowSeekableInputStream *in
                                                    gint64 position,
                                                    gint64 n_bytes,
                                                    GError **error);
-GArrowTensor *garrow_seekable_input_stream_read_tensor(GArrowSeekableInputStream *input_stream,
-                                                       gint64 position,
-                                                       GError **error);
 
 
 #define GARROW_TYPE_BUFFER_INPUT_STREAM         \
diff --git a/c_glib/test/test-tensor.rb b/c_glib/test/test-tensor.rb
index e812d5e..4f18011 100644
--- a/c_glib/test/test-tensor.rb
+++ b/c_glib/test/test-tensor.rb
@@ -120,7 +120,6 @@ class TestTensor < Test::Unit::TestCase
     output = Arrow::BufferOutputStream.new(buffer)
     output.write_tensor(@tensor)
     input = Arrow::BufferInputStream.new(buffer)
-    assert_equal(@tensor,
-                 input.read_tensor(0))
+    assert_equal(@tensor, input.read_tensor)
   end
 end
diff --git a/cpp/cmake_modules/SetupCxxFlags.cmake b/cpp/cmake_modules/SetupCxxFlags.cmake
index a707a21..c2c50eb 100644
--- a/cpp/cmake_modules/SetupCxxFlags.cmake
+++ b/cpp/cmake_modules/SetupCxxFlags.cmake
@@ -178,7 +178,8 @@ if ("${COMPILER_FAMILY}" STREQUAL "msvc")
   set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} /wd4800")
 endif()
 
-if ("${COMPILER_FAMILY}" STREQUAL "gcc")
+if ("${COMPILER_FAMILY}" STREQUAL "gcc" AND
+    "${COMPILER_VERSION}" VERSION_GREATER "6.0")
   # Without this, gcc >= 7 warns related to changes in C++17
   set(CXX_ONLY_FLAGS "${CXX_ONLY_FLAGS} -Wno-noexcept-type")
 endif()
diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h
index 42b99bf..75d0c21 100644
--- a/cpp/src/arrow/buffer.h
+++ b/cpp/src/arrow/buffer.h
@@ -144,7 +144,7 @@ class ARROW_EXPORT Buffer {
   static std::shared_ptr<Buffer> Wrap(const std::vector<T>& data) {
     return std::make_shared<Buffer>(reinterpret_cast<const uint8_t*>(data.data()),
                                     static_cast<int64_t>(sizeof(T) * data.size()));
-  }  // namespace arrow
+  }
 
   /// \brief Copy buffer contents into a new std::string
   /// \return std::string
diff --git a/cpp/src/arrow/flight/flight-benchmark.cc b/cpp/src/arrow/flight/flight-benchmark.cc
index ac50ab0..1110ec3 100644
--- a/cpp/src/arrow/flight/flight-benchmark.cc
+++ b/cpp/src/arrow/flight/flight-benchmark.cc
@@ -156,7 +156,8 @@ Status RunPerformanceTest(const int port) {
 
   // Elapsed time in seconds
   uint64_t elapsed_nanos = timer.Stop();
-  double time_elapsed = elapsed_nanos / static_cast<double>(1000000000LL);
+  double time_elapsed =
+      static_cast<double>(elapsed_nanos) / static_cast<double>(1000000000);
 
   constexpr double kMegabyte = static_cast<double>(1 << 20);
 
@@ -167,8 +168,9 @@ Status RunPerformanceTest(const int port) {
 
   std::cout << "Bytes read: " << stats.total_bytes << std::endl;
   std::cout << "Nanos: " << elapsed_nanos << std::endl;
-  std::cout << "Speed: " << (stats.total_bytes / time_elapsed / kMegabyte) << " MB/s"
-            << std::endl;
+  std::cout << "Speed: "
+            << (static_cast<double>(stats.total_bytes) / kMegabyte / time_elapsed)
+            << " MB/s" << std::endl;
   return Status::OK();
 }
 
diff --git a/cpp/src/arrow/flight/server.cc b/cpp/src/arrow/flight/server.cc
index 967a254..46815b5 100644
--- a/cpp/src/arrow/flight/server.cc
+++ b/cpp/src/arrow/flight/server.cc
@@ -146,7 +146,7 @@ class SerializationTraits<IpcPayload> {
       pb_stream.WriteRawMaybeAliased(buffer->data(), static_cast<int>(buffer->size()));
 
       // Write padding if not multiple of 8
-      const int remainder = buffer->size() % 8;
+      const int remainder = static_cast<int>(buffer->size() % 8);
       if (remainder) {
         pb_stream.WriteRawMaybeAliased(kPaddingBytes, 8 - remainder);
       }
diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc
index 0456020..a35bf67 100644
--- a/cpp/src/arrow/io/interfaces.cc
+++ b/cpp/src/arrow/io/interfaces.cc
@@ -28,6 +28,11 @@ namespace io {
 
 FileInterface::~FileInterface() = default;
 
+Status InputStream::Advance(int64_t nbytes) {
+  std::shared_ptr<Buffer> temp;
+  return Read(nbytes, &temp);
+}
+
 struct RandomAccessFile::RandomAccessFileImpl {
   std::mutex lock_;
 };
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
index a9d68c3..ec368f6 100644
--- a/cpp/src/arrow/io/interfaces.h
+++ b/cpp/src/arrow/io/interfaces.h
@@ -114,6 +114,12 @@ class ARROW_EXPORT OutputStream : virtual public FileInterface, public Writable
 };
 
 class ARROW_EXPORT InputStream : virtual public FileInterface, public Readable {
+ public:
+  /// \brief Advance (skip) the stream by the indicated number of bytes
+  /// \param[in] nbytes the number of bytes to move forward
+  /// \return Status
+  Status Advance(int64_t nbytes);
+
  protected:
   InputStream() = default;
 };
diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h
index d9b22c7..ea4e8ca 100644
--- a/cpp/src/arrow/io/memory.h
+++ b/cpp/src/arrow/io/memory.h
@@ -40,6 +40,12 @@ class ARROW_EXPORT BufferOutputStream : public OutputStream {
  public:
   explicit BufferOutputStream(const std::shared_ptr<ResizableBuffer>& buffer);
 
+  /// \brief Create in-memory output stream with indicated capacity using a
+  /// memory pool
+  /// \param[in] initial_capacity the initial allocated internal capacity of
+  /// the OutputStream
+  /// \param[in,out] pool a MemoryPool to use for allocations
+  /// \param[out] out the created stream
   static Status Create(int64_t initial_capacity, MemoryPool* pool,
                        std::shared_ptr<BufferOutputStream>* out);
 
diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc b/cpp/src/arrow/ipc/ipc-read-write-test.cc
index 3154d57..f8e29f5 100644
--- a/cpp/src/arrow/ipc/ipc-read-write-test.cc
+++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc
@@ -28,6 +28,7 @@
 #include "arrow/buffer.h"
 #include "arrow/io/memory.h"
 #include "arrow/io/test-common.h"
+#include "arrow/ipc/Message_generated.h"
 #include "arrow/ipc/api.h"
 #include "arrow/ipc/metadata-internal.h"
 #include "arrow/ipc/test-common.h"
@@ -86,6 +87,58 @@ TEST(TestMessage, Equals) {
   ASSERT_FALSE(msg5.Equals(msg1));
 }
 
+TEST(TestMessage, SerializeTo) {
+  const int64_t body_length = 64;
+
+  flatbuffers::FlatBufferBuilder fbb;
+  fbb.Finish(flatbuf::CreateMessage(fbb, internal::kCurrentMetadataVersion,
+                                    flatbuf::MessageHeader_RecordBatch, 0 /* header */,
+                                    body_length));
+
+  std::shared_ptr<Buffer> metadata;
+  ASSERT_OK(internal::WriteFlatbufferBuilder(fbb, &metadata));
+
+  std::string body = "abcdef";
+
+  std::unique_ptr<Message> message;
+  ASSERT_OK(Message::Open(metadata, std::make_shared<Buffer>(body), &message));
+
+  int64_t output_length = 0;
+  int64_t position = 0;
+
+  std::shared_ptr<io::BufferOutputStream> stream;
+
+  {
+    const int32_t alignment = 8;
+
+    ASSERT_OK(io::BufferOutputStream::Create(1 << 10, default_memory_pool(), &stream));
+    ASSERT_OK(message->SerializeTo(stream.get(), alignment, &output_length));
+    ASSERT_OK(stream->Tell(&position));
+    ASSERT_EQ(BitUtil::RoundUp(metadata->size() + 4, alignment) + body_length,
+              output_length);
+    ASSERT_EQ(output_length, position);
+  }
+
+  {
+    const int32_t alignment = 64;
+
+    ASSERT_OK(io::BufferOutputStream::Create(1 << 10, default_memory_pool(), &stream));
+    ASSERT_OK(message->SerializeTo(stream.get(), alignment, &output_length));
+    ASSERT_OK(stream->Tell(&position));
+    ASSERT_EQ(BitUtil::RoundUp(metadata->size() + 4, alignment) + body_length,
+              output_length);
+    ASSERT_EQ(output_length, position);
+  }
+}
+
+TEST(TestMessage, Verify) {
+  std::string metadata = "invalid";
+  std::string body = "abcdef";
+
+  Message message(std::make_shared<Buffer>(metadata), std::make_shared<Buffer>(body));
+  ASSERT_FALSE(message.Verify());
+}
+
 const std::shared_ptr<DataType> INT32 = std::make_shared<Int32Type>();
 
 TEST_F(TestSchemaMetadata, PrimitiveFields) {
@@ -725,13 +778,22 @@ class TestTensorRoundTrip : public ::testing::Test, public IpcTestFixture {
     int32_t metadata_length;
     int64_t body_length;
 
+    const auto& type = checked_cast<const FixedWidthType&>(*tensor.type());
+    const int elem_size = type.bit_width() / 8;
+
     ASSERT_OK(mmap_->Seek(0));
 
     ASSERT_OK(WriteTensor(tensor, mmap_.get(), &metadata_length, &body_length));
 
+    const int64_t expected_body_length = elem_size * tensor.size();
+    ASSERT_EQ(expected_body_length, body_length);
+
+    ASSERT_OK(mmap_->Seek(0));
+
     std::shared_ptr<Tensor> result;
-    ASSERT_OK(ReadTensor(0, mmap_.get(), &result));
+    ASSERT_OK(ReadTensor(mmap_.get(), &result));
 
+    ASSERT_EQ(result->data()->size(), expected_body_length);
     ASSERT_TRUE(tensor.Equals(*result));
   }
 };
@@ -752,14 +814,22 @@ TEST_F(TestTensorRoundTrip, BasicRoundtrip) {
   auto data = Buffer::Wrap(values);
 
   Tensor t0(int64(), data, shape, strides, dim_names);
-  Tensor tzero(int64(), data, {}, {}, {});
+  Tensor t_no_dims(int64(), data, {}, {}, {});
+  Tensor t_zero_length_dim(int64(), data, {0}, {8}, {"foo"});
 
   CheckTensorRoundTrip(t0);
-  CheckTensorRoundTrip(tzero);
+  CheckTensorRoundTrip(t_no_dims);
+  CheckTensorRoundTrip(t_zero_length_dim);
 
   int64_t serialized_size;
   ASSERT_OK(GetTensorSize(t0, &serialized_size));
   ASSERT_TRUE(serialized_size > static_cast<int64_t>(size * sizeof(int64_t)));
+
+  // ARROW-2840: Check that padding/alignment is respected
+  std::vector<int64_t> shape_2 = {1, 1};
+  std::vector<int64_t> strides_2 = {8, 8};
+  Tensor t0_not_multiple_64(int64(), data, shape_2, strides_2, dim_names);
+  CheckTensorRoundTrip(t0_not_multiple_64);
 }
 
 TEST_F(TestTensorRoundTrip, NonContiguous) {
diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
index d77248a..797a490 100644
--- a/cpp/src/arrow/ipc/message.cc
+++ b/cpp/src/arrow/ipc/message.cc
@@ -179,21 +179,42 @@ Status Message::ReadFrom(const int64_t offset, const std::shared_ptr<Buffer>& me
   return Message::Open(metadata, body, out);
 }
 
-Status Message::SerializeTo(io::OutputStream* file, int64_t* output_length) const {
+Status WritePadding(io::OutputStream* stream, int64_t nbytes) {
+  while (nbytes > 0) {
+    const int64_t bytes_to_write = std::min<int64_t>(nbytes, kArrowAlignment);
+    RETURN_NOT_OK(stream->Write(kPaddingBytes, bytes_to_write));
+    nbytes -= bytes_to_write;
+  }
+  return Status::OK();
+}
+
+Status Message::SerializeTo(io::OutputStream* stream, int32_t alignment,
+                            int64_t* output_length) const {
   int32_t metadata_length = 0;
-  RETURN_NOT_OK(internal::WriteMessage(*metadata(), file, &metadata_length));
+  RETURN_NOT_OK(internal::WriteMessage(*metadata(), alignment, stream, &metadata_length));
 
   *output_length = metadata_length;
 
   auto body_buffer = body();
   if (body_buffer) {
-    RETURN_NOT_OK(file->Write(body_buffer->data(), body_buffer->size()));
+    RETURN_NOT_OK(stream->Write(body_buffer->data(), body_buffer->size()));
     *output_length += body_buffer->size();
-  }
 
+    DCHECK_GE(this->body_length(), body_buffer->size());
+
+    int64_t remainder = this->body_length() - body_buffer->size();
+    RETURN_NOT_OK(WritePadding(stream, remainder));
+    *output_length += remainder;
+  }
   return Status::OK();
 }
 
+bool Message::Verify() const {
+  std::shared_ptr<Buffer> meta = this->metadata();
+  flatbuffers::Verifier verifier(meta->data(), meta->size(), 128);
+  return flatbuf::VerifyMessageBuffer(verifier);
+}
+
 std::string FormatMessageType(Message::Type type) {
   switch (type) {
     case Message::SCHEMA:
@@ -235,8 +256,33 @@ Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile
   return Message::ReadFrom(offset + metadata_length, metadata, file, message);
 }
 
-Status ReadMessage(io::InputStream* file, bool aligned,
-                   std::unique_ptr<Message>* message) {
+Status AlignStream(io::InputStream* stream, int32_t alignment) {
+  int64_t position = -1;
+  RETURN_NOT_OK(stream->Tell(&position));
+  return stream->Advance(PaddedLength(position, alignment) - position);
+}
+
+Status AlignStream(io::OutputStream* stream, int32_t alignment) {
+  int64_t position = -1;
+  RETURN_NOT_OK(stream->Tell(&position));
+  int64_t remainder = PaddedLength(position, alignment) - position;
+  if (remainder > 0) {
+    return stream->Write(kPaddingBytes, remainder);
+  }
+  return Status::OK();
+}
+
+Status CheckAligned(io::FileInterface* stream, int32_t alignment) {
+  int64_t current_position;
+  ARROW_RETURN_NOT_OK(stream->Tell(&current_position));
+  if (current_position % alignment != 0) {
+    return Status::Invalid("Stream is not aligned");
+  } else {
+    return Status::OK();
+  }
+}
+
+Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* message) {
   int32_t message_length = 0;
   int64_t bytes_read = 0;
   RETURN_NOT_OK(file->Read(sizeof(int32_t), &bytes_read,
@@ -262,22 +308,9 @@ Status ReadMessage(io::InputStream* file, bool aligned,
     return Status::Invalid(ss.str());
   }
 
-  // If requested, align the file before reading the message.
-  if (aligned) {
-    int64_t offset;
-    RETURN_NOT_OK(file->Tell(&offset));
-    int64_t aligned_offset = PaddedLength(offset);
-    int64_t num_extra_bytes = aligned_offset - offset;
-    std::shared_ptr<Buffer> dummy_buffer;
-    RETURN_NOT_OK(file->Read(num_extra_bytes, &dummy_buffer));
-  }
-
   return Message::ReadFrom(metadata, file, message);
 }
 
-Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* message) {
-  return ReadMessage(file, false /* aligned */, message);
-}
 // ----------------------------------------------------------------------
 // Implement InputStream message reader
 
diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h
index 08176ab..092a19f 100644
--- a/cpp/src/arrow/ipc/message.h
+++ b/cpp/src/arrow/ipc/message.h
@@ -34,6 +34,7 @@ class Buffer;
 
 namespace io {
 
+class FileInterface;
 class InputStream;
 class OutputStream;
 class RandomAccessFile;
@@ -140,9 +141,17 @@ class ARROW_EXPORT Message {
   /// \brief Write length-prefixed metadata and body to output stream
   ///
   /// \param[in] file output stream to write to
+  /// \param[in] alignment byte alignment for metadata, usually 8 or
+  /// 64. Whether the body is padded depends on the metadata; if the body
+  /// buffer is smaller than the size indicated in the metadata, then extra
+  /// padding bytes will be written
   /// \param[out] output_length the number of bytes written
   /// \return Status
-  Status SerializeTo(io::OutputStream* file, int64_t* output_length) const;
+  Status SerializeTo(io::OutputStream* file, int32_t alignment,
+                     int64_t* output_length) const;
+
+  /// \brief Return true if the Message metadata passes Flatbuffer validation
+  bool Verify() const;
 
  private:
   // Hide serialization details from user API
@@ -192,20 +201,35 @@ ARROW_EXPORT
 Status ReadMessage(const int64_t offset, const int32_t metadata_length,
                    io::RandomAccessFile* file, std::unique_ptr<Message>* message);
 
+/// \brief Advance stream to the next multiple of `alignment` bytes (8 by
+/// default) if its position is not already a multiple of it
+/// \param[in] stream an input stream
+/// \param[in] alignment the byte multiple for the metadata prefix, usually 8
+/// or 64, to ensure the body starts on a multiple of that alignment
+/// \return Status
+ARROW_EXPORT
+Status AlignStream(io::InputStream* stream, int32_t alignment = 8);
+
+/// \brief Write padding bytes so the stream position reaches the next multiple
+/// of `alignment` bytes (8 by default) if it is not already a multiple of it
+/// \param[in] stream an output stream
+/// \param[in] alignment the byte multiple for the metadata prefix, usually 8
+/// or 64, to ensure the body starts on a multiple of that alignment
+/// \return Status
+ARROW_EXPORT
+Status AlignStream(io::OutputStream* stream, int32_t alignment = 8);
+
+/// \brief Return error Status if file position is not a multiple of the
+/// indicated alignment
+ARROW_EXPORT
+Status CheckAligned(io::FileInterface* stream, int32_t alignment = 8);
+
 /// \brief Read encapsulated RPC message (metadata and body) from InputStream
 ///
 /// Read length-prefixed message with as-yet unknown length. Returns null if
 /// there are not enough bytes available or the message length is 0 (e.g. EOS
 /// in a stream)
 ARROW_EXPORT
-Status ReadMessage(io::InputStream* stream, bool aligned,
-                   std::unique_ptr<Message>* message);
-
-/// \brief Read encapsulated RPC message (metadata and body) from InputStream.
-///
-/// This is a version of ReadMessage that does not have the aligned argument
-/// for backwards compatibility.
-ARROW_EXPORT
 Status ReadMessage(io::InputStream* stream, std::unique_ptr<Message>* message);
 
 }  // namespace ipc
diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc
index 0bf18f3..3d9b97c 100644
--- a/cpp/src/arrow/ipc/metadata-internal.cc
+++ b/cpp/src/arrow/ipc/metadata-internal.cc
@@ -682,18 +682,6 @@ static Status SchemaToFlatbuffer(FBB& fbb, const Schema& schema,
   return Status::OK();
 }
 
-static Status WriteFlatbufferBuilder(FBB& fbb, std::shared_ptr<Buffer>* out) {
-  int32_t size = fbb.GetSize();
-
-  std::shared_ptr<Buffer> result;
-  RETURN_NOT_OK(AllocateBuffer(default_memory_pool(), size, &result));
-
-  uint8_t* dst = result->mutable_data();
-  memcpy(dst, fbb.GetBufferPointer(), size);
-  *out = result;
-  return Status::OK();
-}
-
 static Status WriteFBMessage(FBB& fbb, flatbuf::MessageHeader header_type,
                              flatbuffers::Offset<void> header, int64_t body_length,
                              std::shared_ptr<Buffer>* out) {
@@ -776,6 +764,9 @@ Status WriteTensorMessage(const Tensor& tensor, int64_t buffer_start_offset,
 
   FBB fbb;
 
+  const auto& type = checked_cast<const FixedWidthType&>(*tensor.type());
+  const int elem_size = type.bit_width() / 8;
+
   flatbuf::Type fb_type_type;
   Offset fb_type;
   RETURN_NOT_OK(TensorTypeToFlatbuffer(fbb, *tensor.type(), &fb_type_type, &fb_type));
@@ -788,7 +779,8 @@ Status WriteTensorMessage(const Tensor& tensor, int64_t buffer_start_offset,
 
   auto fb_shape = fbb.CreateVector(dims);
   auto fb_strides = fbb.CreateVector(tensor.strides());
-  int64_t body_length = tensor.data()->size();
+
+  int64_t body_length = tensor.size() * elem_size;
   flatbuf::Buffer buffer(buffer_start_offset, body_length);
 
   TensorOffset fb_tensor =
@@ -953,20 +945,13 @@ Status GetTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>* type
 // ----------------------------------------------------------------------
 // Implement message writing
 
-Status WriteMessage(const Buffer& message, io::OutputStream* file,
+Status WriteMessage(const Buffer& message, int32_t alignment, io::OutputStream* file,
                     int32_t* message_length) {
-  // Need to write 4 bytes (message size), the message, plus padding to
-  // end on an 8-byte offset
-  int64_t start_offset;
-  RETURN_NOT_OK(file->Tell(&start_offset));
-
-  // TODO(wesm): Should we depend on the position of the OutputStream? See
-  // ARROW-3212
+  // ARROW-3212: We do not make assumptions that the output stream is aligned
   int32_t padded_message_length = static_cast<int32_t>(message.size()) + 4;
-  const int32_t remainder =
-      (padded_message_length + static_cast<int32_t>(start_offset)) % 8;
+  const int32_t remainder = padded_message_length % alignment;
   if (remainder != 0) {
-    padded_message_length += 8 - remainder;
+    padded_message_length += alignment - remainder;
   }
 
   // The returned message size includes the length prefix, the flatbuffer,
diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h
index 730a1a5..0683b8f 100644
--- a/cpp/src/arrow/ipc/metadata-internal.h
+++ b/cpp/src/arrow/ipc/metadata-internal.h
@@ -25,6 +25,7 @@
 #include <string>
 #include <vector>
 
+#include "arrow/buffer.h"
 #include "arrow/ipc/Schema_generated.h"
 #include "arrow/ipc/dictionary.h"
 #include "arrow/ipc/message.h"
@@ -97,10 +98,19 @@ Status GetTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>* type
                          std::vector<std::string>* dim_names);
 
 /// Write a serialized message metadata with a length-prefix and padding to an
-/// 8-byte offset
+/// 8-byte offset. Does not make assumptions about whether the stream is
+/// aligned already
 ///
 /// <message_size: int32><message: const void*><padding>
-Status WriteMessage(const Buffer& message, io::OutputStream* file,
+///
+/// \param[in] message a buffer containing the metadata to write
+/// \param[in] alignment the size multiple of the total message size including
+/// length prefix, metadata, and padding. Usually 8 or 64
+/// \param[in,out] file the OutputStream to write to
+/// \param[out] message_length the total size of the payload written including
+/// padding
+/// \return Status
+Status WriteMessage(const Buffer& message, int32_t alignment, io::OutputStream* file,
                     int32_t* message_length);
 
 // Serialize arrow::Schema as a Flatbuffer
@@ -131,6 +141,19 @@ Status WriteDictionaryMessage(const int64_t id, const int64_t length,
                               const std::vector<BufferMetadata>& buffers,
                               std::shared_ptr<Buffer>* out);
 
+static inline Status WriteFlatbufferBuilder(flatbuffers::FlatBufferBuilder& fbb,
+                                            std::shared_ptr<Buffer>* out) {
+  int32_t size = fbb.GetSize();
+
+  std::shared_ptr<Buffer> result;
+  RETURN_NOT_OK(AllocateBuffer(default_memory_pool(), size, &result));
+
+  uint8_t* dst = result->mutable_data();
+  memcpy(dst, fbb.GetBufferPointer(), size);
+  *out = result;
+  return Status::OK();
+}
+
 }  // namespace internal
 }  // namespace ipc
 }  // namespace arrow
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 92cf75b..ba83229 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -695,20 +695,15 @@ Status RecordBatchFileReader::ReadRecordBatch(int i,
   return impl_->ReadRecordBatch(i, batch);
 }
 
-static Status ReadContiguousPayload(io::InputStream* file, bool aligned,
+static Status ReadContiguousPayload(io::InputStream* file,
                                     std::unique_ptr<Message>* message) {
-  RETURN_NOT_OK(ReadMessage(file, aligned, message));
+  RETURN_NOT_OK(ReadMessage(file, message));
   if (*message == nullptr) {
     return Status::Invalid("Unable to read metadata at offset");
   }
   return Status::OK();
 }
 
-static Status ReadContiguousPayload(io::InputStream* file,
-                                    std::unique_ptr<Message>* message) {
-  return ReadContiguousPayload(file, false /* aligned */, message);
-}
-
 Status ReadSchema(io::InputStream* stream, std::shared_ptr<Schema>* out) {
   std::shared_ptr<RecordBatchReader> reader;
   RETURN_NOT_OK(RecordBatchStreamReader::Open(stream, &reader));
@@ -725,14 +720,9 @@ Status ReadRecordBatch(const std::shared_ptr<Schema>& schema, io::InputStream* f
                          out);
 }
 
-Status ReadTensor(int64_t offset, io::RandomAccessFile* file,
-                  std::shared_ptr<Tensor>* out) {
-  // Respect alignment of Tensor messages (see WriteTensor)
-  offset = PaddedLength(offset);
-  RETURN_NOT_OK(file->Seek(offset));
-
+Status ReadTensor(io::InputStream* file, std::shared_ptr<Tensor>* out) {
   std::unique_ptr<Message> message;
-  RETURN_NOT_OK(ReadContiguousPayload(file, true /* aligned */, &message));
+  RETURN_NOT_OK(ReadContiguousPayload(file, &message));
   return ReadTensor(*message, out);
 }
 
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
index faa63d5..942664d 100644
--- a/cpp/src/arrow/ipc/reader.h
+++ b/cpp/src/arrow/ipc/reader.h
@@ -219,15 +219,13 @@ Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr<Schema>& sc
                        int max_recursion_depth, io::RandomAccessFile* file,
                        std::shared_ptr<RecordBatch>* out);
 
-/// \brief EXPERIMENTAL: Read arrow::Tensor as encapsulated IPC message in file
+/// \brief Read arrow::Tensor as encapsulated IPC message in file
 ///
-/// \param[in] offset the file location of the start of the message
-/// \param[in] file the file where the batch is located
+/// \param[in] file an InputStream pointed at the start of the message
 /// \param[out] out the read tensor
 /// \return Status
 ARROW_EXPORT
-Status ReadTensor(int64_t offset, io::RandomAccessFile* file,
-                  std::shared_ptr<Tensor>* out);
+Status ReadTensor(io::InputStream* file, std::shared_ptr<Tensor>* out);
 
 /// \brief EXPERIMENTAL: Read arrow::Tensor from IPC message
 ///
diff --git a/cpp/src/arrow/ipc/util.h b/cpp/src/arrow/ipc/util.h
index 412f312..80f9f3c 100644
--- a/cpp/src/arrow/ipc/util.h
+++ b/cpp/src/arrow/ipc/util.h
@@ -28,14 +28,17 @@ namespace arrow {
 namespace ipc {
 
 // Buffers are padded to 64-byte boundaries (for SIMD)
-static constexpr int kArrowAlignment = 64;
+static constexpr int32_t kArrowAlignment = 64;
+
+// Tensors are padded to 64-byte boundaries
+static constexpr int32_t kTensorAlignment = 64;
 
 // Align on 8-byte boundaries in IPC
-static constexpr int kArrowIpcAlignment = 8;
+static constexpr int32_t kArrowIpcAlignment = 8;
 
 static constexpr uint8_t kPaddingBytes[kArrowAlignment] = {0};
 
-static inline int64_t PaddedLength(int64_t nbytes, int64_t alignment = kArrowAlignment) {
+static inline int64_t PaddedLength(int64_t nbytes, int32_t alignment = kArrowAlignment) {
   return ((nbytes + alignment - 1) / alignment) * alignment;
 }
 
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 60ca34e..7568bfd 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -482,16 +482,11 @@ class DictionaryWriter : public RecordBatchSerializer {
 
 Status WriteIpcPayload(const IpcPayload& payload, io::OutputStream* dst,
                        int32_t* metadata_length) {
-#ifndef NDEBUG
-  int64_t start_position, current_position;
-  RETURN_NOT_OK(dst->Tell(&start_position));
-#endif
-
-  RETURN_NOT_OK(internal::WriteMessage(*payload.metadata, dst, metadata_length));
+  RETURN_NOT_OK(internal::WriteMessage(*payload.metadata, kArrowIpcAlignment, dst,
+                                       metadata_length));
 
 #ifndef NDEBUG
-  RETURN_NOT_OK(dst->Tell(&current_position));
-  DCHECK(BitUtil::IsMultipleOf8(current_position));
+  RETURN_NOT_OK(CheckAligned(dst));
 #endif
 
   // Now write the buffers
@@ -516,8 +511,7 @@ Status WriteIpcPayload(const IpcPayload& payload, io::OutputStream* dst,
   }
 
 #ifndef NDEBUG
-  RETURN_NOT_OK(dst->Tell(&current_position));
-  DCHECK(BitUtil::IsMultipleOf8(current_position));
+  RETURN_NOT_OK(CheckAligned(dst));
 #endif
 
   return Status::OK();
@@ -531,18 +525,6 @@ Status GetRecordBatchPayload(const RecordBatch& batch, MemoryPool* pool,
 
 }  // namespace internal
 
-// Adds padding bytes if necessary to ensure all memory blocks are written on
-// 64-byte boundaries.
-Status AlignStreamPosition(io::OutputStream* stream) {
-  int64_t position;
-  RETURN_NOT_OK(stream->Tell(&position));
-  int64_t remainder = PaddedLength(position) - position;
-  if (remainder > 0) {
-    return stream->Write(kPaddingBytes, remainder);
-  }
-  return Status::OK();
-}
-
 Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
                         io::OutputStream* dst, int32_t* metadata_length,
                         int64_t* body_length, MemoryPool* pool, int max_recursion_depth,
@@ -584,10 +566,10 @@ Status WriteLargeRecordBatch(const RecordBatch& batch, int64_t buffer_start_offs
 namespace {
 
 Status WriteTensorHeader(const Tensor& tensor, io::OutputStream* dst,
-                         int32_t* metadata_length, int64_t* body_length) {
+                         int32_t* metadata_length) {
   std::shared_ptr<Buffer> metadata;
   RETURN_NOT_OK(internal::WriteTensorMessage(tensor, 0, &metadata));
-  return internal::WriteMessage(*metadata, dst, metadata_length);
+  return internal::WriteMessage(*metadata, kTensorAlignment, dst, metadata_length);
 }
 
 Status WriteStridedTensorData(int dim_index, int64_t offset, int elem_size,
@@ -636,30 +618,24 @@ Status GetContiguousTensor(const Tensor& tensor, MemoryPool* pool,
 
 Status WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadata_length,
                    int64_t* body_length) {
-  RETURN_NOT_OK(AlignStreamPosition(dst));
+  const auto& type = checked_cast<const FixedWidthType&>(*tensor.type());
+  const int elem_size = type.bit_width() / 8;
+
+  *body_length = tensor.size() * elem_size;
 
+  // Tensor metadata accounts for padding
   if (tensor.is_contiguous()) {
-    RETURN_NOT_OK(WriteTensorHeader(tensor, dst, metadata_length, body_length));
-    // It's important to align the stream position again so that the tensor data
-    // is aligned.
-    RETURN_NOT_OK(AlignStreamPosition(dst));
+    RETURN_NOT_OK(WriteTensorHeader(tensor, dst, metadata_length));
     auto data = tensor.data();
     if (data && data->data()) {
-      *body_length = data->size();
-      return dst->Write(data->data(), *body_length);
+      RETURN_NOT_OK(dst->Write(data->data(), *body_length));
     } else {
       *body_length = 0;
-      return Status::OK();
     }
   } else {
-    Tensor dummy(tensor.type(), tensor.data(), tensor.shape());
-    const auto& type = checked_cast<const FixedWidthType&>(*tensor.type());
-    RETURN_NOT_OK(WriteTensorHeader(dummy, dst, metadata_length, body_length));
-    // It's important to align the stream position again so that the tensor data
-    // is aligned.
-    RETURN_NOT_OK(AlignStreamPosition(dst));
-
-    const int elem_size = type.bit_width() / 8;
+    // The tensor written is made contiguous
+    Tensor dummy(tensor.type(), nullptr, tensor.shape());
+    RETURN_NOT_OK(WriteTensorHeader(dummy, dst, metadata_length));
 
     // TODO(wesm): Do we care enough about this temporary allocation to pass in
     // a MemoryPool to this function?
@@ -667,9 +643,11 @@ Status WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadat
     RETURN_NOT_OK(
         AllocateBuffer(tensor.shape()[tensor.ndim() - 1] * elem_size, &scratch_space));
 
-    return WriteStridedTensorData(0, 0, elem_size, tensor, scratch_space->mutable_data(),
-                                  dst);
+    RETURN_NOT_OK(WriteStridedTensorData(0, 0, elem_size, tensor,
+                                         scratch_space->mutable_data(), dst));
   }
+
+  return Status::OK();
 }
 
 Status GetTensorMessage(const Tensor& tensor, MemoryPool* pool,
@@ -763,7 +741,7 @@ class StreamBookKeeper {
     return Status::OK();
   }
 
-  Status Align(int64_t alignment = kArrowIpcAlignment) {
+  Status Align(int32_t alignment = kArrowIpcAlignment) {
     // Adds padding bytes if necessary to ensure all memory blocks are written on
     // 8-byte (or other alignment) boundaries.
     int64_t remainder = PaddedLength(position_, alignment) - position_;
@@ -801,7 +779,7 @@ class SchemaWriter : public StreamBookKeeper {
     RETURN_NOT_OK(internal::WriteSchemaMessage(schema_, dictionary_memo_, &schema_fb));
 
     int32_t metadata_length = 0;
-    RETURN_NOT_OK(internal::WriteMessage(*schema_fb, sink_, &metadata_length));
+    RETURN_NOT_OK(internal::WriteMessage(*schema_fb, 8, sink_, &metadata_length));
     RETURN_NOT_OK(UpdatePositionCheckAligned());
     return Status::OK();
   }
diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h
index bcf09aa..9843126 100644
--- a/cpp/src/arrow/ipc/writer.h
+++ b/cpp/src/arrow/ipc/writer.h
@@ -256,11 +256,14 @@ ARROW_EXPORT
 Status GetTensorMessage(const Tensor& tensor, MemoryPool* pool,
                         std::unique_ptr<Message>* out);
 
-/// \brief EXPERIMENTAL: Write arrow::Tensor as a contiguous message
+/// \brief Write arrow::Tensor as a contiguous message. The metadata and body
+/// are written assuming 64-byte alignment. It is the user's responsibility to
+/// ensure that the OutputStream has been aligned to a 64-byte multiple before
+/// writing the message.
 ///
 /// \param[in] tensor the Tensor to write
 /// \param[in] dst the OutputStream to write to
-/// \param[out] metadata_length the actual metadata length
+/// \param[out] metadata_length the actual metadata length, including padding
 /// \param[out] body_length the acutal message body length
 /// \return Status
 ///
diff --git a/cpp/src/arrow/python/deserialize.cc b/cpp/src/arrow/python/deserialize.cc
index 3dbc18f..3bf23d7 100644
--- a/cpp/src/arrow/python/deserialize.cc
+++ b/cpp/src/arrow/python/deserialize.cc
@@ -32,6 +32,7 @@
 #include "arrow/io/interfaces.h"
 #include "arrow/io/memory.h"
 #include "arrow/ipc/reader.h"
+#include "arrow/ipc/util.h"
 #include "arrow/table.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/logging.h"
@@ -250,7 +251,6 @@ Status DeserializeSet(PyObject* context, const Array& array, int64_t start_idx,
 }
 
 Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out) {
-  int64_t offset;
   int64_t bytes_read;
   int32_t num_tensors;
   int32_t num_buffers;
@@ -264,15 +264,21 @@ Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out)
   RETURN_NOT_OK(ipc::RecordBatchStreamReader::Open(src, &reader));
   RETURN_NOT_OK(reader->ReadNext(&out->batch));
 
-  RETURN_NOT_OK(src->Tell(&offset));
-  offset += 4;  // Skip the end-of-stream message
+  /// Skip EOS marker
+  RETURN_NOT_OK(src->Advance(4));
+
+  /// Align stream so tensor bodies are 64-byte aligned
+  RETURN_NOT_OK(ipc::AlignStream(src, ipc::kTensorAlignment));
+
   for (int i = 0; i < num_tensors; ++i) {
     std::shared_ptr<Tensor> tensor;
-    RETURN_NOT_OK(ipc::ReadTensor(offset, src, &tensor));
+    RETURN_NOT_OK(ipc::ReadTensor(src, &tensor));
+    RETURN_NOT_OK(ipc::AlignStream(src, ipc::kTensorAlignment));
     out->tensors.push_back(tensor);
-    RETURN_NOT_OK(src->Tell(&offset));
   }
 
+  int64_t offset = -1;
+  RETURN_NOT_OK(src->Tell(&offset));
   for (int i = 0; i < num_buffers; ++i) {
     int64_t size;
     RETURN_NOT_OK(src->ReadAt(offset, sizeof(int64_t), &bytes_read,
diff --git a/cpp/src/arrow/python/serialize.cc b/cpp/src/arrow/python/serialize.cc
index 56565ac..92a6519 100644
--- a/cpp/src/arrow/python/serialize.cc
+++ b/cpp/src/arrow/python/serialize.cc
@@ -32,6 +32,7 @@
 #include "arrow/builder.h"
 #include "arrow/io/interfaces.h"
 #include "arrow/io/memory.h"
+#include "arrow/ipc/util.h"
 #include "arrow/ipc/writer.h"
 #include "arrow/memory_pool.h"
 #include "arrow/record_batch.h"
@@ -760,10 +761,14 @@ Status SerializedPyObject::WriteTo(io::OutputStream* dst) {
       dst->Write(reinterpret_cast<const uint8_t*>(&num_buffers), sizeof(int32_t)));
   RETURN_NOT_OK(ipc::WriteRecordBatchStream({this->batch}, dst));
 
+  // Align stream to 64-byte offset so tensor bodies are 64-byte aligned
+  RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kTensorAlignment));
+
   int32_t metadata_length;
   int64_t body_length;
   for (const auto& tensor : this->tensors) {
     RETURN_NOT_OK(ipc::WriteTensor(*tensor, dst, &metadata_length, &body_length));
+    RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kTensorAlignment));
   }
 
   for (const auto& buffer : this->buffers) {
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 370e14d..4b37cf8 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -780,7 +780,8 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
         MetadataVersion metadata_version()
         MessageType type()
 
-        CStatus SerializeTo(OutputStream* stream, int64_t* output_length)
+        CStatus SerializeTo(OutputStream* stream, int32_t alignment,
+                            int64_t* output_length)
 
     c_string FormatMessageType(MessageType type)
 
@@ -848,8 +849,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
                         int32_t* metadata_length,
                         int64_t* body_length)
 
-    CStatus ReadTensor(int64_t offset, RandomAccessFile* file,
-                       shared_ptr[CTensor]* out)
+    CStatus ReadTensor(InputStream* stream, shared_ptr[CTensor]* out)
 
     CStatus ReadRecordBatch(const CMessage& message,
                             const shared_ptr[CSchema]& schema,
@@ -868,6 +868,9 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
                             InputStream* stream,
                             shared_ptr[CRecordBatch]* out)
 
+    CStatus AlignStream(InputStream* stream, int64_t alignment)
+    CStatus AlignStream(OutputStream* stream, int64_t alignment)
+
     cdef cppclass CFeatherWriter" arrow::ipc::feather::TableWriter":
         @staticmethod
         CStatus Open(const shared_ptr[OutputStream]& stream,
diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index 5c259ea..0331fba 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -60,12 +60,14 @@ cdef class Message:
             result = self.message.get().Equals(deref(other.message.get()))
         return result
 
-    def serialize(self, memory_pool=None):
+    def serialize(self, alignment=8, memory_pool=None):
         """
         Write message as encapsulated IPC message
 
         Parameters
         ----------
+        alignment : int, default 8
+            Byte alignment for metadata and body
         memory_pool : MemoryPool, default None
             Uses default memory pool if not specified
 
@@ -76,10 +78,11 @@ cdef class Message:
         cdef:
             BufferOutputStream stream = BufferOutputStream(memory_pool)
             int64_t output_length = 0
+            int32_t c_alignment = alignment
 
         with nogil:
             check_status(self.message.get()
-                         .SerializeTo(stream.wr_file.get(),
+                         .SerializeTo(stream.wr_file.get(), c_alignment,
                                       &output_length))
         return stream.getvalue()
 
@@ -440,10 +443,10 @@ def write_tensor(Tensor tensor, NativeFile dest):
 
 
 def read_tensor(NativeFile source):
-    """
-    Read pyarrow.Tensor from pyarrow.NativeFile object from current
+    """Read pyarrow.Tensor from pyarrow.NativeFile object from current
     position. If the file source supports zero copy (e.g. a memory map), then
-    this operation does not allocate any memory
+    this operation does not allocate any memory. This function does not assume
+    that the stream is aligned
 
     Parameters
     ----------
@@ -452,16 +455,14 @@ def read_tensor(NativeFile source):
     Returns
     -------
     tensor : Tensor
+
     """
     cdef:
         shared_ptr[CTensor] sp_tensor
 
     source._assert_readable()
-
-    cdef int64_t offset = source.tell()
     with nogil:
-        check_status(ReadTensor(offset, source.rd_file.get(), &sp_tensor))
-
+        check_status(ReadTensor(source.rd_file.get(), &sp_tensor))
     return pyarrow_wrap_tensor(sp_tensor)
 
 
diff --git a/python/pyarrow/tests/conftest.py b/python/pyarrow/tests/conftest.py
index fff0ca4..68266c8 100644
--- a/python/pyarrow/tests/conftest.py
+++ b/python/pyarrow/tests/conftest.py
@@ -63,6 +63,13 @@ except ImportError:
     pass
 
 
+try:
+    import tensorflow  # noqa
+    defaults['tensorflow'] = True
+except ImportError:
+    pass
+
+
 def pytest_configure(config):
     pass
 
diff --git a/python/pyarrow/tests/test_serialization.py b/python/pyarrow/tests/test_serialization.py
index 4bac300..184a144 100644
--- a/python/pyarrow/tests/test_serialization.py
+++ b/python/pyarrow/tests/test_serialization.py
@@ -640,6 +640,18 @@ def test_serialize_to_components_invalid_cases():
         pa.deserialize_components(components)
 
 
+def test_serialize_read_concatenated_records():
+    # ARROW-1996 -- see stream alignment work in ARROW-2840, ARROW-3212
+    f = pa.BufferOutputStream()
+    pa.serialize_to(12, f)
+    pa.serialize_to(23, f)
+    buf = f.getvalue()
+
+    f = pa.BufferReader(buf)
+    pa.read_serialized(f).deserialize()
+    pa.read_serialized(f).deserialize()
+
+
 @pytest.mark.skipif(os.name == 'nt', reason="deserialize_regex not pickleable")
 def test_deserialize_in_different_process():
     from multiprocessing import Process, Queue