You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2019/08/14 10:45:39 UTC

[arrow] branch master updated: ARROW-5559: [C++] Add an IpcOptions structure

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new cef7e80  ARROW-5559: [C++] Add an IpcOptions structure
cef7e80 is described below

commit cef7e80b527523b01a13bffbe9b36a547c3d74a9
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Wed Aug 14 12:45:21 2019 +0200

    ARROW-5559: [C++] Add an IpcOptions structure
    
    Switch existing APIs to take IpcOptions rather than individual option arguments.
    
    Closes #5032 from pitrou/ARROW-5559-ipc-options and squashes the following commits:
    
    e39ddc640 <Antoine Pitrou> ARROW-5559:  Add an IpcOptions structure
    
    Authored-by: Antoine Pitrou <an...@python.org>
    Signed-off-by: Antoine Pitrou <an...@python.org>
---
 cpp/src/arrow/CMakeLists.txt                |   1 +
 cpp/src/arrow/extension_type-test.cc        |   6 +-
 cpp/src/arrow/flight/client.cc              |  10 +-
 cpp/src/arrow/flight/client.h               |   3 +-
 cpp/src/arrow/flight/perf-server.cc         |   7 +-
 cpp/src/arrow/flight/server.cc              |  14 +--
 cpp/src/arrow/ipc/message.h                 |   6 +-
 cpp/src/arrow/ipc/options.cc                |  26 +++++
 cpp/src/arrow/ipc/options.h                 |  43 +++++++++
 cpp/src/arrow/ipc/read-write-benchmark.cc   |   6 +-
 cpp/src/arrow/ipc/read-write-test.cc        |  64 ++++++++-----
 cpp/src/arrow/ipc/reader.cc                 |  27 ++++--
 cpp/src/arrow/ipc/reader.h                  |   5 +-
 cpp/src/arrow/ipc/writer.cc                 | 144 +++++++++++++++-------------
 cpp/src/arrow/ipc/writer.h                  |  47 +++++----
 cpp/src/arrow/python/flight.cc              |   4 +-
 cpp/src/arrow/python/flight.h               |   1 +
 cpp/src/arrow/python/serialize.cc           |   7 +-
 cpp/src/arrow/python/serialize.h            |   4 +
 python/pyarrow/_flight.pyx                  |   8 +-
 python/pyarrow/includes/libarrow.pxd        |  20 +++-
 python/pyarrow/includes/libarrow_flight.pxd |  17 +---
 python/pyarrow/ipc.pxi                      |   2 +-
 r/src/recordbatchwriter.cpp                 |   2 +-
 24 files changed, 299 insertions(+), 175 deletions(-)

diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index dc1cec2..0085238 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -255,6 +255,7 @@ if(ARROW_IPC)
       ipc/json-simple.cc
       ipc/message.cc
       ipc/metadata-internal.cc
+      ipc/options.cc
       ipc/reader.cc
       ipc/writer.cc)
   set(ARROW_SRCS ${ARROW_SRCS} ${ARROW_IPC_SRCS})
diff --git a/cpp/src/arrow/extension_type-test.cc b/cpp/src/arrow/extension_type-test.cc
index 29c5356..2f680af 100644
--- a/cpp/src/arrow/extension_type-test.cc
+++ b/cpp/src/arrow/extension_type-test.cc
@@ -221,7 +221,8 @@ auto RoundtripBatch = [](const std::shared_ptr<RecordBatch>& batch,
                          std::shared_ptr<RecordBatch>* out) {
   std::shared_ptr<io::BufferOutputStream> out_stream;
   ASSERT_OK(io::BufferOutputStream::Create(1024, default_memory_pool(), &out_stream));
-  ASSERT_OK(ipc::WriteRecordBatchStream({batch}, out_stream.get()));
+  ASSERT_OK(ipc::WriteRecordBatchStream({batch}, ipc::IpcOptions::Defaults(),
+                                        out_stream.get()));
 
   std::shared_ptr<Buffer> complete_ipc_stream;
   ASSERT_OK(out_stream->Finish(&complete_ipc_stream));
@@ -273,7 +274,8 @@ TEST_F(TestExtensionType, UnrecognizedExtension) {
   // and ensure that a plain instance of the storage type is created
   std::shared_ptr<io::BufferOutputStream> out_stream;
   ASSERT_OK(io::BufferOutputStream::Create(1024, default_memory_pool(), &out_stream));
-  ASSERT_OK(ipc::WriteRecordBatchStream({batch}, out_stream.get()));
+  ASSERT_OK(ipc::WriteRecordBatchStream({batch}, ipc::IpcOptions::Defaults(),
+                                        out_stream.get()));
 
   std::shared_ptr<Buffer> complete_ipc_stream;
   ASSERT_OK(out_stream->Finish(&complete_ipc_stream));
diff --git a/cpp/src/arrow/flight/client.cc b/cpp/src/arrow/flight/client.cc
index eb4df58..6dc5944 100644
--- a/cpp/src/arrow/flight/client.cc
+++ b/cpp/src/arrow/flight/client.cc
@@ -251,13 +251,13 @@ class GrpcStreamWriter : public FlightStreamWriter {
       std::shared_ptr<grpc::ClientReaderWriter<pb::FlightData, pb::PutResult>> writer,
       std::unique_ptr<FlightStreamWriter>* out);
 
-  Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override {
-    return WriteWithMetadata(batch, nullptr, allow_64bit);
+  Status WriteRecordBatch(const RecordBatch& batch) override {
+    return WriteWithMetadata(batch, nullptr);
   }
-  Status WriteWithMetadata(const RecordBatch& batch, std::shared_ptr<Buffer> app_metadata,
-                           bool allow_64bit = false) override {
+  Status WriteWithMetadata(const RecordBatch& batch,
+                           std::shared_ptr<Buffer> app_metadata) override {
     app_metadata_ = app_metadata;
-    return batch_writer_->WriteRecordBatch(batch, allow_64bit);
+    return batch_writer_->WriteRecordBatch(batch);
   }
   void set_memory_pool(MemoryPool* pool) override {
     batch_writer_->set_memory_pool(pool);
diff --git a/cpp/src/arrow/flight/client.h b/cpp/src/arrow/flight/client.h
index 0fa571d..774b0f6 100644
--- a/cpp/src/arrow/flight/client.h
+++ b/cpp/src/arrow/flight/client.h
@@ -86,8 +86,7 @@ class ARROW_FLIGHT_EXPORT FlightStreamReader : public MetadataRecordBatchReader
 class ARROW_FLIGHT_EXPORT FlightStreamWriter : public ipc::RecordBatchWriter {
  public:
   virtual Status WriteWithMetadata(const RecordBatch& batch,
-                                   std::shared_ptr<Buffer> app_metadata,
-                                   bool allow_64bit = false) = 0;
+                                   std::shared_ptr<Buffer> app_metadata) = 0;
 };
 
 #ifdef _MSC_VER
diff --git a/cpp/src/arrow/flight/perf-server.cc b/cpp/src/arrow/flight/perf-server.cc
index 60b2d4e..129fe53 100644
--- a/cpp/src/arrow/flight/perf-server.cc
+++ b/cpp/src/arrow/flight/perf-server.cc
@@ -74,7 +74,7 @@ class PerfDataStream : public FlightDataStream {
   std::shared_ptr<Schema> schema() override { return schema_; }
 
   Status GetSchemaPayload(FlightPayload* payload) override {
-    return ipc::internal::GetSchemaPayload(*schema_, &dictionary_memo_,
+    return ipc::internal::GetSchemaPayload(*schema_, ipc_options_, &dictionary_memo_,
                                            &payload->ipc_message);
   }
 
@@ -103,8 +103,8 @@ class PerfDataStream : public FlightDataStream {
     } else {
       records_sent_ += batch_length_;
     }
-    return ipc::internal::GetRecordBatchPayload(*batch, default_memory_pool(),
-                                                &payload->ipc_message);
+    return ipc::internal::GetRecordBatchPayload(
+        *batch, ipc_options_, default_memory_pool(), &payload->ipc_message);
   }
 
  private:
@@ -115,6 +115,7 @@ class PerfDataStream : public FlightDataStream {
   int64_t records_sent_;
   std::shared_ptr<Schema> schema_;
   ipc::DictionaryMemo dictionary_memo_;
+  ipc::IpcOptions ipc_options_;
   std::shared_ptr<RecordBatch> batch_;
   ArrayVector arrays_;
 };
diff --git a/cpp/src/arrow/flight/server.cc b/cpp/src/arrow/flight/server.cc
index 849fbf0..d3178ed 100644
--- a/cpp/src/arrow/flight/server.cc
+++ b/cpp/src/arrow/flight/server.cc
@@ -36,6 +36,7 @@
 
 #include "arrow/buffer.h"
 #include "arrow/ipc/dictionary.h"
+#include "arrow/ipc/options.h"
 #include "arrow/ipc/reader.h"
 #include "arrow/ipc/writer.h"
 #include "arrow/memory_pool.h"
@@ -640,13 +641,13 @@ class RecordBatchStream::RecordBatchStreamImpl {
 
   RecordBatchStreamImpl(const std::shared_ptr<RecordBatchReader>& reader,
                         MemoryPool* pool)
-      : pool_(pool), reader_(reader) {}
+      : pool_(pool), reader_(reader), ipc_options_(ipc::IpcOptions::Defaults()) {}
 
   std::shared_ptr<Schema> schema() { return reader_->schema(); }
 
   Status GetSchemaPayload(FlightPayload* payload) {
-    return ipc::internal::GetSchemaPayload(*reader_->schema(), &dictionary_memo_,
-                                           &payload->ipc_message);
+    return ipc::internal::GetSchemaPayload(*reader_->schema(), ipc_options_,
+                                           &dictionary_memo_, &payload->ipc_message);
   }
 
   Status Next(FlightPayload* payload) {
@@ -664,7 +665,7 @@ class RecordBatchStream::RecordBatchStreamImpl {
     if (stage_ == Stage::DICTIONARY) {
       if (dictionary_index_ == static_cast<int>(dictionaries_.size())) {
         stage_ = Stage::RECORD_BATCH;
-        return ipc::internal::GetRecordBatchPayload(*current_batch_, pool_,
+        return ipc::internal::GetRecordBatchPayload(*current_batch_, ipc_options_, pool_,
                                                     &payload->ipc_message);
       } else {
         return GetNextDictionary(payload);
@@ -679,7 +680,7 @@ class RecordBatchStream::RecordBatchStreamImpl {
       payload->ipc_message.metadata = nullptr;
       return Status::OK();
     } else {
-      return ipc::internal::GetRecordBatchPayload(*current_batch_, pool_,
+      return ipc::internal::GetRecordBatchPayload(*current_batch_, ipc_options_, pool_,
                                                   &payload->ipc_message);
     }
   }
@@ -687,7 +688,7 @@ class RecordBatchStream::RecordBatchStreamImpl {
  private:
   Status GetNextDictionary(FlightPayload* payload) {
     const auto& it = dictionaries_[dictionary_index_++];
-    return ipc::internal::GetDictionaryPayload(it.first, it.second, pool_,
+    return ipc::internal::GetDictionaryPayload(it.first, it.second, ipc_options_, pool_,
                                                &payload->ipc_message);
   }
 
@@ -703,6 +704,7 @@ class RecordBatchStream::RecordBatchStreamImpl {
   MemoryPool* pool_;
   std::shared_ptr<RecordBatchReader> reader_;
   ipc::DictionaryMemo dictionary_memo_;
+  ipc::IpcOptions ipc_options_;
   std::shared_ptr<RecordBatch> current_batch_;
   std::vector<std::pair<int64_t, std::shared_ptr<Array>>> dictionaries_;
 
diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h
index fcc7e77..9c152d7 100644
--- a/cpp/src/arrow/ipc/message.h
+++ b/cpp/src/arrow/ipc/message.h
@@ -24,6 +24,7 @@
 #include <memory>
 #include <string>
 
+#include "arrow/ipc/options.h"
 #include "arrow/status.h"
 #include "arrow/util/macros.h"
 #include "arrow/util/visibility.h"
@@ -57,11 +58,6 @@ enum class MetadataVersion : char {
   V4
 };
 
-// ARROW-109: We set this number arbitrarily to help catch user mistakes. For
-// deeply nested schemas, it is expected the user will indicate explicitly the
-// maximum allowed recursion depth
-constexpr int kMaxNestingDepth = 64;
-
 // Read interface classes. We do not fully deserialize the flatbuffers so that
 // individual fields metadata can be retrieved from very large schema without
 //
diff --git a/cpp/src/arrow/ipc/options.cc b/cpp/src/arrow/ipc/options.cc
new file mode 100644
index 0000000..a5714f3
--- /dev/null
+++ b/cpp/src/arrow/ipc/options.cc
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/ipc/options.h"
+
+namespace arrow {
+namespace ipc {
+
+IpcOptions IpcOptions::Defaults() { return IpcOptions(); }
+
+}  // namespace ipc
+}  // namespace arrow
diff --git a/cpp/src/arrow/ipc/options.h b/cpp/src/arrow/ipc/options.h
new file mode 100644
index 0000000..d380402
--- /dev/null
+++ b/cpp/src/arrow/ipc/options.h
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+namespace ipc {
+
+// ARROW-109: We set this number arbitrarily to help catch user mistakes. For
+// deeply nested schemas, it is expected the user will indicate explicitly the
+// maximum allowed recursion depth
+constexpr int kMaxNestingDepth = 64;
+
+struct ARROW_EXPORT IpcOptions {
+  // If true, allow field lengths that don't fit in a signed 32-bit int.
+  // Some implementations may not be able to parse such streams.
+  bool allow_64bit = false;
+  // The maximum permitted schema nesting depth.
+  int max_recursion_depth = kMaxNestingDepth;
+
+  static IpcOptions Defaults();
+};
+
+}  // namespace ipc
+}  // namespace arrow
diff --git a/cpp/src/arrow/ipc/read-write-benchmark.cc b/cpp/src/arrow/ipc/read-write-benchmark.cc
index 6f66f9c..d2b1d99 100644
--- a/cpp/src/arrow/ipc/read-write-benchmark.cc
+++ b/cpp/src/arrow/ipc/read-write-benchmark.cc
@@ -50,6 +50,7 @@ std::shared_ptr<RecordBatch> MakeRecordBatch(int64_t total_size, int64_t num_fie
 static void WriteRecordBatch(benchmark::State& state) {  // NOLINT non-const reference
   // 1MB
   constexpr int64_t kTotalSize = 1 << 20;
+  auto options = ipc::IpcOptions::Defaults();
 
   std::shared_ptr<ResizableBuffer> buffer;
   ABORT_NOT_OK(AllocateResizableBuffer(kTotalSize & 2, &buffer));
@@ -60,7 +61,7 @@ static void WriteRecordBatch(benchmark::State& state) {  // NOLINT non-const ref
     int32_t metadata_length;
     int64_t body_length;
     if (!ipc::WriteRecordBatch(*record_batch, 0, &stream, &metadata_length, &body_length,
-                               default_memory_pool())
+                               options, default_memory_pool())
              .ok()) {
       state.SkipWithError("Failed to write!");
     }
@@ -71,6 +72,7 @@ static void WriteRecordBatch(benchmark::State& state) {  // NOLINT non-const ref
 static void ReadRecordBatch(benchmark::State& state) {  // NOLINT non-const reference
   // 1MB
   constexpr int64_t kTotalSize = 1 << 20;
+  auto options = ipc::IpcOptions::Defaults();
 
   std::shared_ptr<ResizableBuffer> buffer;
   ABORT_NOT_OK(AllocateResizableBuffer(kTotalSize & 2, &buffer));
@@ -81,7 +83,7 @@ static void ReadRecordBatch(benchmark::State& state) {  // NOLINT non-const refe
   int32_t metadata_length;
   int64_t body_length;
   if (!ipc::WriteRecordBatch(*record_batch, 0, &stream, &metadata_length, &body_length,
-                             default_memory_pool())
+                             options, default_memory_pool())
            .ok()) {
     state.SkipWithError("Failed to write!");
   }
diff --git a/cpp/src/arrow/ipc/read-write-test.cc b/cpp/src/arrow/ipc/read-write-test.cc
index d5a0034..d79962b 100644
--- a/cpp/src/arrow/ipc/read-write-test.cc
+++ b/cpp/src/arrow/ipc/read-write-test.cc
@@ -224,6 +224,11 @@ static int g_file_number = 0;
 
 class IpcTestFixture : public io::MemoryMapFixture {
  public:
+  void SetUp() {
+    pool_ = default_memory_pool();
+    options_ = IpcOptions::Defaults();
+  }
+
   void DoSchemaRoundTrip(const Schema& schema, DictionaryMemo* out_memo,
                          std::shared_ptr<Schema>* result) {
     std::shared_ptr<Buffer> serialized_schema;
@@ -251,9 +256,13 @@ class IpcTestFixture : public io::MemoryMapFixture {
     }
     RETURN_NOT_OK(mmap_->Seek(0));
 
-    std::shared_ptr<RecordBatchWriter> file_writer;
-    RETURN_NOT_OK(RecordBatchFileWriter::Open(mmap_.get(), batch.schema(), &file_writer));
-    RETURN_NOT_OK(file_writer->WriteRecordBatch(batch, true));
+    auto options = options_;
+    options.allow_64bit = true;
+
+    auto res = RecordBatchFileWriter::Open(mmap_.get(), batch.schema(), options);
+    RETURN_NOT_OK(res.status());
+    std::shared_ptr<RecordBatchWriter> file_writer = *res;
+    RETURN_NOT_OK(file_writer->WriteRecordBatch(batch));
     RETURN_NOT_OK(file_writer->Close());
 
     int64_t offset;
@@ -308,19 +317,20 @@ class IpcTestFixture : public io::MemoryMapFixture {
  protected:
   std::shared_ptr<io::MemoryMappedFile> mmap_;
   MemoryPool* pool_;
+  IpcOptions options_;
 };
 
 class TestWriteRecordBatch : public ::testing::Test, public IpcTestFixture {
  public:
-  void SetUp() { pool_ = default_memory_pool(); }
-  void TearDown() { io::MemoryMapFixture::TearDown(); }
+  void SetUp() { IpcTestFixture::SetUp(); }
+  void TearDown() { IpcTestFixture::TearDown(); }
 };
 
 class TestIpcRoundTrip : public ::testing::TestWithParam<MakeRecordBatch*>,
                          public IpcTestFixture {
  public:
-  void SetUp() { pool_ = default_memory_pool(); }
-  void TearDown() { io::MemoryMapFixture::TearDown(); }
+  void SetUp() { IpcTestFixture::SetUp(); }
+  void TearDown() { IpcTestFixture::TearDown(); }
 };
 
 TEST_P(TestIpcRoundTrip, RoundTrip) {
@@ -342,7 +352,7 @@ TEST_F(TestIpcRoundTrip, MetadataVersion) {
   const int64_t buffer_offset = 0;
 
   ASSERT_OK(WriteRecordBatch(*batch, buffer_offset, mmap_.get(), &metadata_length,
-                             &body_length, pool_));
+                             &body_length, options_, pool_));
 
   std::unique_ptr<Message> message;
   ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
@@ -461,13 +471,14 @@ TEST_F(TestWriteRecordBatch, SliceTruncatesBuffers) {
   CheckArray(a1);
 }
 
-void TestGetRecordBatchSize(std::shared_ptr<RecordBatch> batch) {
+void TestGetRecordBatchSize(const IpcOptions& options,
+                            std::shared_ptr<RecordBatch> batch) {
   io::MockOutputStream mock;
   int32_t mock_metadata_length = -1;
   int64_t mock_body_length = -1;
   int64_t size = -1;
   ASSERT_OK(WriteRecordBatch(*batch, 0, &mock, &mock_metadata_length, &mock_body_length,
-                             default_memory_pool()));
+                             options, default_memory_pool()));
   ASSERT_OK(GetRecordBatchSize(*batch, &size));
   ASSERT_EQ(mock.GetExtentBytesWritten(), size);
 }
@@ -476,19 +487,19 @@ TEST_F(TestWriteRecordBatch, IntegerGetRecordBatchSize) {
   std::shared_ptr<RecordBatch> batch;
 
   ASSERT_OK(MakeIntRecordBatch(&batch));
-  TestGetRecordBatchSize(batch);
+  TestGetRecordBatchSize(options_, batch);
 
   ASSERT_OK(MakeListRecordBatch(&batch));
-  TestGetRecordBatchSize(batch);
+  TestGetRecordBatchSize(options_, batch);
 
   ASSERT_OK(MakeZeroLengthRecordBatch(&batch));
-  TestGetRecordBatchSize(batch);
+  TestGetRecordBatchSize(options_, batch);
 
   ASSERT_OK(MakeNonNullRecordBatch(&batch));
-  TestGetRecordBatchSize(batch);
+  TestGetRecordBatchSize(options_, batch);
 
   ASSERT_OK(MakeDeeplyNestedList(&batch));
-  TestGetRecordBatchSize(batch);
+  TestGetRecordBatchSize(options_, batch);
 }
 
 class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture {
@@ -521,13 +532,12 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture {
     const int memory_map_size = 1 << 20;
     RETURN_NOT_OK(io::MemoryMapFixture::InitMemoryMap(memory_map_size, ss.str(), &mmap_));
 
+    auto options = IpcOptions::Defaults();
     if (override_level) {
-      return WriteRecordBatch(**batch, 0, mmap_.get(), metadata_length, body_length,
-                              pool_, recursion_level + 1);
-    } else {
-      return WriteRecordBatch(**batch, 0, mmap_.get(), metadata_length, body_length,
-                              pool_);
+      options.max_recursion_depth = recursion_level + 1;
     }
+    return WriteRecordBatch(**batch, 0, mmap_.get(), metadata_length, body_length,
+                            options, pool_);
   }
 
  protected:
@@ -582,10 +592,12 @@ TEST_F(RecursionLimits, StressLimit) {
 
     DictionaryMemo empty_memo;
 
+    auto options = IpcOptions::Defaults();
+    options.max_recursion_depth = recursion_depth + 1;
     io::BufferReader reader(message->body());
     std::shared_ptr<RecordBatch> result;
-    ASSERT_OK(ReadRecordBatch(*message->metadata(), schema, &empty_memo,
-                              recursion_depth + 1, &reader, &result));
+    ASSERT_OK(ReadRecordBatch(*message->metadata(), schema, &empty_memo, options, &reader,
+                              &result));
     *it_works = result->Equals(*batch);
   };
 
@@ -854,8 +866,8 @@ TEST(TestRecordBatchStreamReader, EmptyStreamWithDictionaries) {
 
 class TestTensorRoundTrip : public ::testing::Test, public IpcTestFixture {
  public:
-  void SetUp() { pool_ = default_memory_pool(); }
-  void TearDown() { io::MemoryMapFixture::TearDown(); }
+  void SetUp() { IpcTestFixture::SetUp(); }
+  void TearDown() { IpcTestFixture::TearDown(); }
 
   void CheckTensorRoundTrip(const Tensor& tensor) {
     int32_t metadata_length;
@@ -931,8 +943,8 @@ TEST_F(TestTensorRoundTrip, NonContiguous) {
 
 class TestSparseTensorRoundTrip : public ::testing::Test, public IpcTestFixture {
  public:
-  void SetUp() { pool_ = default_memory_pool(); }
-  void TearDown() { io::MemoryMapFixture::TearDown(); }
+  void SetUp() { IpcTestFixture::SetUp(); }
+  void TearDown() { IpcTestFixture::TearDown(); }
 
   template <typename SparseIndexType>
   void CheckSparseTensorRoundTrip(const SparseTensorImpl<SparseIndexType>& tensor) {
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 4a554aa..648bb89 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -348,7 +348,8 @@ static Status LoadArray(const Field& field, ArrayLoaderContext* context, ArrayDa
 Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr<Schema>& schema,
                        const DictionaryMemo* dictionary_memo, io::RandomAccessFile* file,
                        std::shared_ptr<RecordBatch>* out) {
-  return ReadRecordBatch(metadata, schema, dictionary_memo, kMaxNestingDepth, file, out);
+  auto options = IpcOptions::Defaults();
+  return ReadRecordBatch(metadata, schema, dictionary_memo, options, file, out);
 }
 
 Status ReadRecordBatch(const Message& message, const std::shared_ptr<Schema>& schema,
@@ -356,9 +357,10 @@ Status ReadRecordBatch(const Message& message, const std::shared_ptr<Schema>& sc
                        std::shared_ptr<RecordBatch>* out) {
   CHECK_MESSAGE_TYPE(message.type(), Message::RECORD_BATCH);
   CHECK_HAS_BODY(message);
+  auto options = IpcOptions::Defaults();
   io::BufferReader reader(message.body());
-  return ReadRecordBatch(*message.metadata(), schema, dictionary_memo, kMaxNestingDepth,
-                         &reader, out);
+  return ReadRecordBatch(*message.metadata(), schema, dictionary_memo, options, &reader,
+                         out);
 }
 
 // ----------------------------------------------------------------------
@@ -389,15 +391,17 @@ static Status LoadRecordBatchFromSource(const std::shared_ptr<Schema>& schema,
 static inline Status ReadRecordBatch(const flatbuf::RecordBatch* metadata,
                                      const std::shared_ptr<Schema>& schema,
                                      const DictionaryMemo* dictionary_memo,
-                                     int max_recursion_depth, io::RandomAccessFile* file,
+                                     const IpcOptions& options,
+                                     io::RandomAccessFile* file,
                                      std::shared_ptr<RecordBatch>* out) {
   IpcComponentSource source(metadata, file);
-  return LoadRecordBatchFromSource(schema, metadata->length(), max_recursion_depth,
-                                   &source, dictionary_memo, out);
+  return LoadRecordBatchFromSource(schema, metadata->length(),
+                                   options.max_recursion_depth, &source, dictionary_memo,
+                                   out);
 }
 
 Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr<Schema>& schema,
-                       const DictionaryMemo* dictionary_memo, int max_recursion_depth,
+                       const DictionaryMemo* dictionary_memo, const IpcOptions& options,
                        io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) {
   const flatbuf::Message* message;
   RETURN_NOT_OK(internal::VerifyMessage(metadata.data(), metadata.size(), &message));
@@ -406,11 +410,13 @@ Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr<Schema>& sc
     return Status::IOError(
         "Header-type of flatbuffer-encoded Message is not RecordBatch.");
   }
-  return ReadRecordBatch(batch, schema, dictionary_memo, max_recursion_depth, file, out);
+  return ReadRecordBatch(batch, schema, dictionary_memo, options, file, out);
 }
 
 Status ReadDictionary(const Buffer& metadata, DictionaryMemo* dictionary_memo,
                       io::RandomAccessFile* file) {
+  auto options = IpcOptions::Defaults();
+
   const flatbuf::Message* message;
   RETURN_NOT_OK(internal::VerifyMessage(metadata.data(), metadata.size(), &message));
   auto dictionary_batch = message->header_as_DictionaryBatch();
@@ -432,7 +438,7 @@ Status ReadDictionary(const Buffer& metadata, DictionaryMemo* dictionary_memo,
   std::shared_ptr<RecordBatch> batch;
   auto batch_meta = dictionary_batch->data();
   RETURN_NOT_OK(ReadRecordBatch(batch_meta, ::arrow::schema({value_field}),
-                                dictionary_memo, kMaxNestingDepth, file, &batch));
+                                dictionary_memo, options, file, &batch));
   if (batch->num_columns() != 1) {
     return Status::Invalid("Dictionary record batch must only contain one field");
   }
@@ -817,10 +823,11 @@ Status ReadSchema(const Message& message, DictionaryMemo* dictionary_memo,
 Status ReadRecordBatch(const std::shared_ptr<Schema>& schema,
                        const DictionaryMemo* dictionary_memo, io::InputStream* file,
                        std::shared_ptr<RecordBatch>* out) {
+  auto options = IpcOptions::Defaults();
   std::unique_ptr<Message> message;
   RETURN_NOT_OK(ReadContiguousPayload(file, &message));
   io::BufferReader buffer_reader(message->body());
-  return ReadRecordBatch(*message->metadata(), schema, dictionary_memo, kMaxNestingDepth,
+  return ReadRecordBatch(*message->metadata(), schema, dictionary_memo, options,
                          &buffer_reader, out);
 }
 
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
index 34a0eef..1314a37 100644
--- a/cpp/src/arrow/ipc/reader.h
+++ b/cpp/src/arrow/ipc/reader.h
@@ -25,6 +25,7 @@
 
 #include "arrow/ipc/dictionary.h"
 #include "arrow/ipc/message.h"
+#include "arrow/ipc/options.h"
 #include "arrow/record_batch.h"
 #include "arrow/util/visibility.h"
 
@@ -245,12 +246,12 @@ Status ReadRecordBatch(const Message& message, const std::shared_ptr<Schema>& sc
 /// dictionaries. Can be nullptr if you are sure there are no
 /// dictionary-encoded fields
 /// \param[in] file a random access file
-/// \param[in] max_recursion_depth the maximum permitted nesting depth
+/// \param[in] options options for deserialization
 /// \param[out] out the read record batch
 /// \return Status
 ARROW_EXPORT
 Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr<Schema>& schema,
-                       const DictionaryMemo* dictionary_memo, int max_recursion_depth,
+                       const DictionaryMemo* dictionary_memo, const IpcOptions& options,
                        io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out);
 
 /// \brief Read arrow::Tensor as encapsulated IPC message in file
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index b5c16cd..a42e4cc 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -113,13 +113,13 @@ namespace internal {
 class RecordBatchSerializer : public ArrayVisitor {
  public:
   RecordBatchSerializer(MemoryPool* pool, int64_t buffer_start_offset,
-                        int max_recursion_depth, bool allow_64bit, IpcPayload* out)
+                        const IpcOptions& options, IpcPayload* out)
       : out_(out),
         pool_(pool),
-        max_recursion_depth_(max_recursion_depth),
-        buffer_start_offset_(buffer_start_offset),
-        allow_64bit_(allow_64bit) {
-    DCHECK_GT(max_recursion_depth, 0);
+        options_(options),
+        max_recursion_depth_(options.max_recursion_depth),
+        buffer_start_offset_(buffer_start_offset) {
+    DCHECK_GT(max_recursion_depth_, 0);
   }
 
   ~RecordBatchSerializer() override = default;
@@ -129,7 +129,7 @@ class RecordBatchSerializer : public ArrayVisitor {
       return Status::Invalid("Max recursion depth reached");
     }
 
-    if (!allow_64bit_ && arr.length() > std::numeric_limits<int32_t>::max()) {
+    if (!options_.allow_64bit && arr.length() > std::numeric_limits<int32_t>::max()) {
       return Status::CapacityError("Cannot write arrays larger than 2^31 - 1 in length");
     }
 
@@ -496,17 +496,16 @@ class RecordBatchSerializer : public ArrayVisitor {
   std::vector<internal::FieldMetadata> field_nodes_;
   std::vector<internal::BufferMetadata> buffer_meta_;
 
+  const IpcOptions& options_;
   int64_t max_recursion_depth_;
   int64_t buffer_start_offset_;
-  bool allow_64bit_;
 };
 
 class DictionaryWriter : public RecordBatchSerializer {
  public:
   DictionaryWriter(int64_t dictionary_id, MemoryPool* pool, int64_t buffer_start_offset,
-                   int max_recursion_depth, bool allow_64bit, IpcPayload* out)
-      : RecordBatchSerializer(pool, buffer_start_offset, max_recursion_depth, allow_64bit,
-                              out),
+                   const IpcOptions& options, IpcPayload* out)
+      : RecordBatchSerializer(pool, buffer_start_offset, options, out),
         dictionary_id_(dictionary_id) {}
 
   Status SerializeMetadata(int64_t num_rows) override {
@@ -562,25 +561,25 @@ Status WriteIpcPayload(const IpcPayload& payload, io::OutputStream* dst,
   return Status::OK();
 }
 
-Status GetSchemaPayload(const Schema& schema, DictionaryMemo* dictionary_memo,
-                        IpcPayload* out) {
+Status GetSchemaPayload(const Schema& schema, const IpcOptions& options,
+                        DictionaryMemo* dictionary_memo, IpcPayload* out) {
   out->type = Message::SCHEMA;
   return WriteSchemaMessage(schema, dictionary_memo, &out->metadata);
 }
 
 Status GetDictionaryPayload(int64_t id, const std::shared_ptr<Array>& dictionary,
-                            MemoryPool* pool, IpcPayload* out) {
+                            const IpcOptions& options, MemoryPool* pool,
+                            IpcPayload* out) {
   out->type = Message::DICTIONARY_BATCH;
   // Frame of reference is 0, see ARROW-384
-  DictionaryWriter writer(id, pool, /*buffer_start_offset=*/0, ipc::kMaxNestingDepth,
-                          true /* allow_64bit */, out);
+  DictionaryWriter writer(id, pool, /*buffer_start_offset=*/0, options, out);
   return writer.Assemble(dictionary);
 }
 
-Status GetRecordBatchPayload(const RecordBatch& batch, MemoryPool* pool,
-                             IpcPayload* out) {
+Status GetRecordBatchPayload(const RecordBatch& batch, const IpcOptions& options,
+                             MemoryPool* pool, IpcPayload* out) {
   out->type = Message::RECORD_BATCH;
-  RecordBatchSerializer writer(pool, 0, kMaxNestingDepth, true, out);
+  RecordBatchSerializer writer(pool, /*buffer_start_offset=*/0, options, out);
   return writer.Assemble(batch);
 }
 
@@ -588,11 +587,10 @@ Status GetRecordBatchPayload(const RecordBatch& batch, MemoryPool* pool,
 
 Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
                         io::OutputStream* dst, int32_t* metadata_length,
-                        int64_t* body_length, MemoryPool* pool, int max_recursion_depth,
-                        bool allow_64bit) {
+                        int64_t* body_length, const IpcOptions& options,
+                        MemoryPool* pool) {
   internal::IpcPayload payload;
-  internal::RecordBatchSerializer writer(pool, buffer_start_offset, max_recursion_depth,
-                                         allow_64bit, &payload);
+  internal::RecordBatchSerializer writer(pool, buffer_start_offset, options, &payload);
   RETURN_NOT_OK(writer.Assemble(batch));
 
   // TODO(wesm): it's a rough edge that the metadata and body length here are
@@ -605,25 +603,17 @@ Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
 }
 
 Status WriteRecordBatchStream(const std::vector<std::shared_ptr<RecordBatch>>& batches,
-                              io::OutputStream* dst) {
+                              const IpcOptions& options, io::OutputStream* dst) {
   ASSIGN_OR_RAISE(std::shared_ptr<RecordBatchWriter> writer,
-                  RecordBatchStreamWriter::Open(dst, batches[0]->schema()));
+                  RecordBatchStreamWriter::Open(dst, batches[0]->schema(), options));
   for (const auto& batch : batches) {
-    // allow sizes > INT32_MAX
     DCHECK(batch->schema()->Equals(*batches[0]->schema())) << "Schemas unequal";
-    RETURN_NOT_OK(writer->WriteRecordBatch(*batch, true));
+    RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
   }
   RETURN_NOT_OK(writer->Close());
   return Status::OK();
 }
 
-Status WriteLargeRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
-                             io::OutputStream* dst, int32_t* metadata_length,
-                             int64_t* body_length, MemoryPool* pool) {
-  return WriteRecordBatch(batch, buffer_start_offset, dst, metadata_length, body_length,
-                          pool, kMaxNestingDepth, true);
-}
-
 namespace {
 
 Status WriteTensorHeader(const Tensor& tensor, io::OutputStream* dst,
@@ -829,8 +819,9 @@ Status WriteSparseTensor(const SparseTensor& sparse_tensor, io::OutputStream* ds
 Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
                        int64_t buffer_start_offset, io::OutputStream* dst,
                        int32_t* metadata_length, int64_t* body_length, MemoryPool* pool) {
+  auto options = IpcOptions::Defaults();
   internal::IpcPayload payload;
-  RETURN_NOT_OK(GetDictionaryPayload(dictionary_id, dictionary, pool, &payload));
+  RETURN_NOT_OK(GetDictionaryPayload(dictionary_id, dictionary, options, pool, &payload));
 
   // The body size is computed in the payload
   *body_length = payload.body_length;
@@ -839,11 +830,12 @@ Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dict
 
 Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) {
   // emulates the behavior of Write without actually writing
+  auto options = IpcOptions::Defaults();
   int32_t metadata_length = 0;
   int64_t body_length = 0;
   io::MockOutputStream dst;
-  RETURN_NOT_OK(WriteRecordBatch(batch, 0, &dst, &metadata_length, &body_length,
-                                 default_memory_pool(), kMaxNestingDepth, true));
+  RETURN_NOT_OK(WriteRecordBatch(batch, 0, &dst, &metadata_length, &body_length, options,
+                                 default_memory_pool()));
   *size = dst.GetExtentBytesWritten();
   return Status::OK();
 }
@@ -875,7 +867,7 @@ Status RecordBatchWriter::WriteTable(const Table& table, int64_t max_chunksize)
     if (batch == nullptr) {
       break;
     }
-    RETURN_NOT_OK(WriteRecordBatch(*batch, true));
+    RETURN_NOT_OK(WriteRecordBatch(*batch));
   }
 
   return Status::OK();
@@ -902,11 +894,13 @@ class RecordBatchPayloadWriter : public RecordBatchWriter {
   ~RecordBatchPayloadWriter() override = default;
 
   RecordBatchPayloadWriter(std::unique_ptr<internal::IpcPayloadWriter> payload_writer,
-                           const Schema& schema, DictionaryMemo* out_memo = nullptr)
+                           const Schema& schema, const IpcOptions& options,
+                           DictionaryMemo* out_memo = nullptr)
       : payload_writer_(std::move(payload_writer)),
         schema_(schema),
         pool_(default_memory_pool()),
-        dictionary_memo_(out_memo) {
+        dictionary_memo_(out_memo),
+        options_(options) {
     if (out_memo == nullptr) {
       dictionary_memo_ = &internal_dict_memo_;
     }
@@ -915,12 +909,12 @@ class RecordBatchPayloadWriter : public RecordBatchWriter {
   // A Schema-owning constructor variant
   RecordBatchPayloadWriter(std::unique_ptr<internal::IpcPayloadWriter> payload_writer,
                            const std::shared_ptr<Schema>& schema,
-                           DictionaryMemo* out_memo = nullptr)
-      : RecordBatchPayloadWriter(std::move(payload_writer), *schema, out_memo) {
+                           const IpcOptions& options, DictionaryMemo* out_memo = nullptr)
+      : RecordBatchPayloadWriter(std::move(payload_writer), *schema, options, out_memo) {
     shared_schema_ = schema;
   }
 
-  Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override {
+  Status WriteRecordBatch(const RecordBatch& batch) override {
     if (!batch.schema()->Equals(schema_, false /* check_metadata */)) {
       return Status::Invalid("Tried to write record batch with different schema");
     }
@@ -936,7 +930,7 @@ class RecordBatchPayloadWriter : public RecordBatchWriter {
     // deltas while computing the RecordBatch payload to save time?
 
     internal::IpcPayload payload;
-    RETURN_NOT_OK(GetRecordBatchPayload(batch, pool_, &payload));
+    RETURN_NOT_OK(GetRecordBatchPayload(batch, options_, pool_, &payload));
     return payload_writer_->WritePayload(payload);
   }
 
@@ -952,7 +946,7 @@ class RecordBatchPayloadWriter : public RecordBatchWriter {
     RETURN_NOT_OK(payload_writer_->Start());
 
     internal::IpcPayload payload;
-    RETURN_NOT_OK(GetSchemaPayload(schema_, dictionary_memo_, &payload));
+    RETURN_NOT_OK(GetSchemaPayload(schema_, options_, dictionary_memo_, &payload));
     return payload_writer_->WritePayload(payload);
   }
 
@@ -972,7 +966,8 @@ class RecordBatchPayloadWriter : public RecordBatchWriter {
       int64_t dictionary_id = pair.first;
       const auto& dictionary = pair.second;
 
-      RETURN_NOT_OK(GetDictionaryPayload(dictionary_id, dictionary, pool_, &payload));
+      RETURN_NOT_OK(
+          GetDictionaryPayload(dictionary_id, dictionary, options_, pool_, &payload));
       RETURN_NOT_OK(payload_writer_->WritePayload(payload));
     }
     return Status::OK();
@@ -987,6 +982,7 @@ class RecordBatchPayloadWriter : public RecordBatchWriter {
   DictionaryMemo internal_dict_memo_;
   bool started_ = false;
   bool wrote_dictionaries_ = false;
+  IpcOptions options_;
 };
 
 // ----------------------------------------------------------------------
@@ -1138,20 +1134,22 @@ class RecordBatchStreamWriter::RecordBatchStreamWriterImpl
     : public RecordBatchPayloadWriter {
  public:
   RecordBatchStreamWriterImpl(io::OutputStream* sink,
-                              const std::shared_ptr<Schema>& schema)
+                              const std::shared_ptr<Schema>& schema,
+                              const IpcOptions& options)
       : RecordBatchPayloadWriter(
             std::unique_ptr<internal::IpcPayloadWriter>(new PayloadStreamWriter(sink)),
-            schema) {}
+            schema, options) {}
 
   ~RecordBatchStreamWriterImpl() = default;
 };
 
 class RecordBatchFileWriter::RecordBatchFileWriterImpl : public RecordBatchPayloadWriter {
  public:
-  RecordBatchFileWriterImpl(io::OutputStream* sink, const std::shared_ptr<Schema>& schema)
+  RecordBatchFileWriterImpl(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
+                            const IpcOptions& options)
       : RecordBatchPayloadWriter(std::unique_ptr<internal::IpcPayloadWriter>(
                                      new PayloadFileWriter(sink, schema)),
-                                 schema) {}
+                                 schema, options) {}
 
   ~RecordBatchFileWriterImpl() = default;
 };
@@ -1160,9 +1158,8 @@ RecordBatchStreamWriter::RecordBatchStreamWriter() {}
 
 RecordBatchStreamWriter::~RecordBatchStreamWriter() {}
 
-Status RecordBatchStreamWriter::WriteRecordBatch(const RecordBatch& batch,
-                                                 bool allow_64bit) {
-  return impl_->WriteRecordBatch(batch, allow_64bit);
+Status RecordBatchStreamWriter::WriteRecordBatch(const RecordBatch& batch) {
+  return impl_->WriteRecordBatch(batch);
 }
 
 void RecordBatchStreamWriter::set_memory_pool(MemoryPool* pool) {
@@ -1177,13 +1174,20 @@ Status RecordBatchStreamWriter::Open(io::OutputStream* sink,
 }
 
 Result<std::shared_ptr<RecordBatchWriter>> RecordBatchStreamWriter::Open(
-    io::OutputStream* sink, const std::shared_ptr<Schema>& schema) {
+    io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
+    const IpcOptions& options) {
   // ctor is private
   auto result = std::shared_ptr<RecordBatchStreamWriter>(new RecordBatchStreamWriter());
-  result->impl_.reset(new RecordBatchStreamWriterImpl(sink, schema));
+  result->impl_.reset(new RecordBatchStreamWriterImpl(sink, schema, options));
   return std::move(result);
 }
 
+Result<std::shared_ptr<RecordBatchWriter>> RecordBatchStreamWriter::Open(
+    io::OutputStream* sink, const std::shared_ptr<Schema>& schema) {
+  auto options = IpcOptions::Defaults();
+  return Open(sink, schema, options);
+}
+
 Status RecordBatchStreamWriter::Close() { return impl_->Close(); }
 
 RecordBatchFileWriter::RecordBatchFileWriter() {}
@@ -1198,16 +1202,22 @@ Status RecordBatchFileWriter::Open(io::OutputStream* sink,
 }
 
 Result<std::shared_ptr<RecordBatchWriter>> RecordBatchFileWriter::Open(
-    io::OutputStream* sink, const std::shared_ptr<Schema>& schema) {
+    io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
+    const IpcOptions& options) {
   // ctor is private
   auto result = std::shared_ptr<RecordBatchFileWriter>(new RecordBatchFileWriter());
-  result->file_impl_.reset(new RecordBatchFileWriterImpl(sink, schema));
+  result->file_impl_.reset(new RecordBatchFileWriterImpl(sink, schema, options));
   return std::move(result);
 }
 
-Status RecordBatchFileWriter::WriteRecordBatch(const RecordBatch& batch,
-                                               bool allow_64bit) {
-  return file_impl_->WriteRecordBatch(batch, allow_64bit);
+Result<std::shared_ptr<RecordBatchWriter>> RecordBatchFileWriter::Open(
+    io::OutputStream* sink, const std::shared_ptr<Schema>& schema) {
+  auto options = IpcOptions::Defaults();
+  return Open(sink, schema, options);
+}
+
+Status RecordBatchFileWriter::WriteRecordBatch(const RecordBatch& batch) {
+  return file_impl_->WriteRecordBatch(batch);
 }
 
 Status RecordBatchFileWriter::Close() { return file_impl_->Close(); }
@@ -1217,15 +1227,17 @@ namespace internal {
 Status OpenRecordBatchWriter(std::unique_ptr<IpcPayloadWriter> sink,
                              const std::shared_ptr<Schema>& schema,
                              std::unique_ptr<RecordBatchWriter>* out) {
-  ASSIGN_OR_RAISE(*out, OpenRecordBatchWriter(std::move(sink), schema));
+  auto options = IpcOptions::Defaults();
+  ASSIGN_OR_RAISE(*out, OpenRecordBatchWriter(std::move(sink), schema, options));
   return Status::OK();
 }
 
 Result<std::unique_ptr<RecordBatchWriter>> OpenRecordBatchWriter(
-    std::unique_ptr<IpcPayloadWriter> sink, const std::shared_ptr<Schema>& schema) {
+    std::unique_ptr<IpcPayloadWriter> sink, const std::shared_ptr<Schema>& schema,
+    const IpcOptions& options) {
   // XXX should we call Start()?
   return std::unique_ptr<RecordBatchWriter>(
-      new RecordBatchPayloadWriter(std::move(sink), schema));
+      new RecordBatchPayloadWriter(std::move(sink), schema, options));
 }
 
 }  // namespace internal
@@ -1248,10 +1260,10 @@ Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool,
 
 Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool,
                             io::OutputStream* out) {
+  auto options = IpcOptions::Defaults();
   int32_t metadata_length = 0;
   int64_t body_length = 0;
-  return WriteRecordBatch(batch, 0, out, &metadata_length, &body_length, pool,
-                          kMaxNestingDepth, true);
+  return WriteRecordBatch(batch, 0, out, &metadata_length, &body_length, options, pool);
 }
 
 Status SerializeSchema(const Schema& schema, DictionaryMemo* dictionary_memo,
@@ -1259,8 +1271,10 @@ Status SerializeSchema(const Schema& schema, DictionaryMemo* dictionary_memo,
   std::shared_ptr<io::BufferOutputStream> stream;
   RETURN_NOT_OK(io::BufferOutputStream::Create(1024, pool, &stream));
 
+  auto options = IpcOptions::Defaults();
   auto payload_writer = make_unique<PayloadStreamWriter>(stream.get());
-  RecordBatchPayloadWriter writer(std::move(payload_writer), schema, dictionary_memo);
+  RecordBatchPayloadWriter writer(std::move(payload_writer), schema, options,
+                                  dictionary_memo);
   // Write schema and populate fields (but not dictionaries) in dictionary_memo
   RETURN_NOT_OK(writer.Start());
   return stream->Finish(out);
diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h
index d2b9faa..e70827e 100644
--- a/cpp/src/arrow/ipc/writer.h
+++ b/cpp/src/arrow/ipc/writer.h
@@ -25,6 +25,7 @@
 #include <vector>
 
 #include "arrow/ipc/message.h"
+#include "arrow/ipc/options.h"
 #include "arrow/result.h"
 #include "arrow/util/visibility.h"
 
@@ -59,10 +60,8 @@ class ARROW_EXPORT RecordBatchWriter {
   /// \brief Write a record batch to the stream
   ///
   /// \param[in] batch the record batch to write to the stream
-  /// \param[in] allow_64bit if true, allow field lengths that don't fit
-  ///    in a signed 32-bit int
   /// \return Status
-  virtual Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) = 0;
+  virtual Status WriteRecordBatch(const RecordBatch& batch) = 0;
 
   /// \brief Write possibly-chunked table by creating sequence of record batches
   /// \param[in] table table to write
@@ -112,13 +111,15 @@ class ARROW_EXPORT RecordBatchStreamWriter : public RecordBatchWriter {
   /// \return Result<std::shared_ptr<RecordBatchWriter>>
   static Result<std::shared_ptr<RecordBatchWriter>> Open(
       io::OutputStream* sink, const std::shared_ptr<Schema>& schema);
+  static Result<std::shared_ptr<RecordBatchWriter>> Open(
+      io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
+      const IpcOptions& options);
 
   /// \brief Write a record batch to the stream
   ///
   /// \param[in] batch the record batch to write
-  /// \param[in] allow_64bit allow array lengths over INT32_MAX - 1
   /// \return Status
-  Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override;
+  Status WriteRecordBatch(const RecordBatch& batch) override;
 
   /// \brief Close the stream by writing a 4-byte int32 0 EOS market
   /// \return Status
@@ -157,13 +158,15 @@ class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter {
   /// \return Status
   static Result<std::shared_ptr<RecordBatchWriter>> Open(
       io::OutputStream* sink, const std::shared_ptr<Schema>& schema);
+  static Result<std::shared_ptr<RecordBatchWriter>> Open(
+      io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
+      const IpcOptions& options);
 
   /// \brief Write a record batch to the file
   ///
   /// \param[in] batch the record batch to write
-  /// \param[in] allow_64bit allow array lengths over INT32_MAX - 1
   /// \return Status
-  Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override;
+  Status WriteRecordBatch(const RecordBatch& batch) override;
 
   /// \brief Close the file stream by writing the file footer and magic number
   /// \return Status
@@ -184,11 +187,8 @@ class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter {
 /// \param[out] metadata_length the size of the length-prefixed flatbuffer
 /// including padding to a 64-byte boundary
 /// \param[out] body_length the size of the contiguous buffer block plus
+/// \param[in] options options for serialization
 /// \param[in] pool the memory pool to allocate memory from
-/// \param[in] max_recursion_depth the maximum permitted nesting schema depth
-/// \param[in] allow_64bit permit field lengths exceeding INT32_MAX. May not be
-/// readable by other Arrow implementations
-/// padding bytes
 /// \return Status
 ///
 /// Write the RecordBatch (collection of equal-length Arrow arrays) to the
@@ -207,9 +207,8 @@ class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter {
 ARROW_EXPORT
 Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
                         io::OutputStream* dst, int32_t* metadata_length,
-                        int64_t* body_length, MemoryPool* pool,
-                        int max_recursion_depth = kMaxNestingDepth,
-                        bool allow_64bit = false);
+                        int64_t* body_length, const IpcOptions& options,
+                        MemoryPool* pool);
 
 /// \brief Serialize record batch as encapsulated IPC message in a new buffer
 ///
@@ -247,11 +246,12 @@ Status SerializeSchema(const Schema& schema, DictionaryMemo* dictionary_memo,
 
 /// \brief Write multiple record batches to OutputStream, including schema
 /// \param[in] batches a vector of batches. Must all have same schema
+/// \param[in] options options for serialization
 /// \param[out] dst an OutputStream
 /// \return Status
 ARROW_EXPORT
 Status WriteRecordBatchStream(const std::vector<std::shared_ptr<RecordBatch>>& batches,
-                              io::OutputStream* dst);
+                              const IpcOptions& options, io::OutputStream* dst);
 
 /// \brief Compute the number of bytes needed to write a record batch including metadata
 ///
@@ -354,36 +354,43 @@ Status OpenRecordBatchWriter(std::unique_ptr<IpcPayloadWriter> sink,
 ///
 /// \param[in] sink the IpcPayloadWriter to write to
 /// \param[in] schema the schema of the record batches to be written
+/// \param[in] options options for serialization
 /// \return Result<std::unique_ptr<RecordBatchWriter>>
 ARROW_EXPORT
 Result<std::unique_ptr<RecordBatchWriter>> OpenRecordBatchWriter(
-    std::unique_ptr<IpcPayloadWriter> sink, const std::shared_ptr<Schema>& schema);
+    std::unique_ptr<IpcPayloadWriter> sink, const std::shared_ptr<Schema>& schema,
+    const IpcOptions& options);
 
 /// \brief Compute IpcPayload for the given schema
 /// \param[in] schema the Schema that is being serialized
+/// \param[in] options options for serialization
 /// \param[in,out] dictionary_memo class to populate with assigned dictionary ids
 /// \param[out] out the returned vector of IpcPayloads
 /// \return Status
 ARROW_EXPORT
-Status GetSchemaPayload(const Schema& schema, DictionaryMemo* dictionary_memo,
-                        IpcPayload* out);
+Status GetSchemaPayload(const Schema& schema, const IpcOptions& options,
+                        DictionaryMemo* dictionary_memo, IpcPayload* out);
 
 /// \brief Compute IpcPayload for a dictionary
 /// \param[in] id the dictionary id
 /// \param[in] dictionary the dictionary values
+/// \param[in] options options for serialization
 /// \param[out] payload the output IpcPayload
 /// \return Status
 ARROW_EXPORT
 Status GetDictionaryPayload(int64_t id, const std::shared_ptr<Array>& dictionary,
-                            MemoryPool* pool, IpcPayload* payload);
+                            const IpcOptions& options, MemoryPool* pool,
+                            IpcPayload* payload);
 
 /// \brief Compute IpcPayload for the given record batch
 /// \param[in] batch the RecordBatch that is being serialized
+/// \param[in] options options for serialization
 /// \param[in,out] pool for any required temporary memory allocations
 /// \param[out] out the returned IpcPayload
 /// \return Status
 ARROW_EXPORT
-Status GetRecordBatchPayload(const RecordBatch& batch, MemoryPool* pool, IpcPayload* out);
+Status GetRecordBatchPayload(const RecordBatch& batch, const IpcOptions& options,
+                             MemoryPool* pool, IpcPayload* out);
 
 }  // namespace internal
 
diff --git a/cpp/src/arrow/python/flight.cc b/cpp/src/arrow/python/flight.cc
index c5b2fe2..7fb6b1a 100644
--- a/cpp/src/arrow/python/flight.cc
+++ b/cpp/src/arrow/python/flight.cc
@@ -209,7 +209,7 @@ Status PyFlightDataStream::Next(FlightPayload* payload) { return stream_->Next(p
 PyGeneratorFlightDataStream::PyGeneratorFlightDataStream(
     PyObject* generator, std::shared_ptr<arrow::Schema> schema,
     PyGeneratorFlightDataStreamCallback callback)
-    : schema_(schema), callback_(callback) {
+    : schema_(schema), options_(ipc::IpcOptions::Defaults()), callback_(callback) {
   Py_INCREF(generator);
   generator_.reset(generator);
 }
@@ -217,7 +217,7 @@ PyGeneratorFlightDataStream::PyGeneratorFlightDataStream(
 std::shared_ptr<Schema> PyGeneratorFlightDataStream::schema() { return schema_; }
 
 Status PyGeneratorFlightDataStream::GetSchemaPayload(FlightPayload* payload) {
-  return ipc::internal::GetSchemaPayload(*schema_, &dictionary_memo_,
+  return ipc::internal::GetSchemaPayload(*schema_, options_, &dictionary_memo_,
                                          &payload->ipc_message);
 }
 
diff --git a/cpp/src/arrow/python/flight.h b/cpp/src/arrow/python/flight.h
index fe224f0..3c5dc5f 100644
--- a/cpp/src/arrow/python/flight.h
+++ b/cpp/src/arrow/python/flight.h
@@ -194,6 +194,7 @@ class ARROW_PYTHON_EXPORT PyGeneratorFlightDataStream
   OwnedRefNoGIL generator_;
   std::shared_ptr<arrow::Schema> schema_;
   ipc::DictionaryMemo dictionary_memo_;
+  ipc::IpcOptions options_;
   PyGeneratorFlightDataStreamCallback callback_;
 };
 
diff --git a/cpp/src/arrow/python/serialize.cc b/cpp/src/arrow/python/serialize.cc
index bc64fb7..aaed073 100644
--- a/cpp/src/arrow/python/serialize.cc
+++ b/cpp/src/arrow/python/serialize.cc
@@ -562,6 +562,8 @@ Status WriteNdarrayHeader(std::shared_ptr<DataType> dtype,
   return serialized_tensor.WriteTo(dst);
 }
 
+SerializedPyObject::SerializedPyObject() : ipc_options(ipc::IpcOptions::Defaults()) {}
+
 Status SerializedPyObject::WriteTo(io::OutputStream* dst) {
   int32_t num_tensors = static_cast<int32_t>(this->tensors.size());
   int32_t num_ndarrays = static_cast<int32_t>(this->ndarrays.size());
@@ -575,7 +577,7 @@ Status SerializedPyObject::WriteTo(io::OutputStream* dst) {
 
   // Align stream to 8-byte offset
   RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kArrowIpcAlignment));
-  RETURN_NOT_OK(ipc::WriteRecordBatchStream({this->batch}, dst));
+  RETURN_NOT_OK(ipc::WriteRecordBatchStream({this->batch}, this->ipc_options, dst));
 
   // Align stream to 64-byte offset so tensor bodies are 64-byte aligned
   RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kTensorAlignment));
@@ -641,7 +643,8 @@ Status SerializedPyObject::GetComponents(MemoryPool* memory_pool, PyObject** out
 
   py_gil.release();
   RETURN_NOT_OK(io::BufferOutputStream::Create(kInitialCapacity, memory_pool, &stream));
-  RETURN_NOT_OK(ipc::WriteRecordBatchStream({this->batch}, stream.get()));
+  RETURN_NOT_OK(
+      ipc::WriteRecordBatchStream({this->batch}, this->ipc_options, stream.get()));
   RETURN_NOT_OK(stream->Finish(&buffer));
   py_gil.acquire();
 
diff --git a/cpp/src/arrow/python/serialize.h b/cpp/src/arrow/python/serialize.h
index 6cdbbe5..80f6416 100644
--- a/cpp/src/arrow/python/serialize.h
+++ b/cpp/src/arrow/python/serialize.h
@@ -21,6 +21,7 @@
 #include <memory>
 #include <vector>
 
+#include "arrow/ipc/options.h"
 #include "arrow/python/visibility.h"
 #include "arrow/status.h"
 
@@ -52,6 +53,9 @@ struct ARROW_PYTHON_EXPORT SerializedPyObject {
   std::vector<std::shared_ptr<Tensor>> tensors;
   std::vector<std::shared_ptr<Tensor>> ndarrays;
   std::vector<std::shared_ptr<Buffer>> buffers;
+  ipc::IpcOptions ipc_options;
+
+  SerializedPyObject();
 
   /// \brief Write serialized Python object to OutputStream
   /// \param[in,out] dst an OutputStream
diff --git a/python/pyarrow/_flight.pyx b/python/pyarrow/_flight.pyx
index 8502471..af7e1ed 100644
--- a/python/pyarrow/_flight.pyx
+++ b/python/pyarrow/_flight.pyx
@@ -721,8 +721,7 @@ cdef class FlightStreamWriter(_CRecordBatchWriter):
             check_flight_status(
                 (<CFlightStreamWriter*> self.writer.get())
                 .WriteWithMetadata(deref(batch.batch),
-                                   c_buf,
-                                   1))
+                                   c_buf))
 
 
 cdef class FlightMetadataReader:
@@ -1170,6 +1169,8 @@ cdef CStatus _data_stream_next(void* self, CFlightPayload* payload) except *:
     """Callback for implementing FlightDataStream in Python."""
     cdef:
         unique_ptr[CFlightDataStream] data_stream
+        # TODO make it possible to pass IPC options around?
+        cdef CIpcOptions c_ipc_options = CIpcOptions.Defaults()
 
     py_stream = <object> self
     if not isinstance(py_stream, GeneratorStream):
@@ -1228,8 +1229,9 @@ cdef CStatus _data_stream_next(void* self, CFlightPayload* payload) except *:
                              "GeneratorStream. "
                              "Got: {}\nExpected: {}".format(batch.schema,
                                                             stream_schema))
-        check_flight_status(_GetRecordBatchPayload(
+        check_flight_status(GetRecordBatchPayload(
             deref(batch.batch),
+            c_ipc_options,
             c_default_memory_pool(),
             &payload.ipc_message))
         if metadata:
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index d3eca97..9bc104f 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -952,9 +952,19 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
         MessageType_V3" arrow::ipc::MetadataVersion::V3"
         MessageType_V4" arrow::ipc::MetadataVersion::V4"
 
+    cdef cppclass CIpcOptions" arrow::ipc::IpcOptions":
+        @staticmethod
+        CIpcOptions Defaults()
+
     cdef cppclass CDictionaryMemo" arrow::ipc::DictionaryMemo":
         pass
 
+    cdef cppclass CIpcPayload" arrow::ipc::internal::IpcPayload":
+        MessageType type
+        shared_ptr[CBuffer] metadata
+        vector[shared_ptr[CBuffer]] body_buffers
+        int64_t body_length
+
     cdef cppclass CMessage" arrow::ipc::Message":
         CStatus Open(const shared_ptr[CBuffer]& metadata,
                      const shared_ptr[CBuffer]& body,
@@ -981,8 +991,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
 
     cdef cppclass CRecordBatchWriter" arrow::ipc::RecordBatchWriter":
         CStatus Close()
-        CStatus WriteRecordBatch(const CRecordBatch& batch,
-                                 c_bool allow_64bit)
+        CStatus WriteRecordBatch(const CRecordBatch& batch)
         CStatus WriteTable(const CTable& table, int64_t max_chunksize)
 
     cdef cppclass CRecordBatchStreamReader \
@@ -1059,6 +1068,13 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
     CStatus AlignStream(InputStream* stream, int64_t alignment)
     CStatus AlignStream(OutputStream* stream, int64_t alignment)
 
+    cdef CStatus GetRecordBatchPayload\
+        " arrow::ipc::internal::GetRecordBatchPayload"(
+            const CRecordBatch& batch,
+            const CIpcOptions& options,
+            CMemoryPool* pool,
+            CIpcPayload* out)
+
     cdef cppclass CFeatherWriter" arrow::ipc::feather::TableWriter":
         @staticmethod
         CStatus Open(const shared_ptr[OutputStream]& stream,
diff --git a/python/pyarrow/includes/libarrow_flight.pxd b/python/pyarrow/includes/libarrow_flight.pxd
index 7373e0b..69b4653 100644
--- a/python/pyarrow/includes/libarrow_flight.pxd
+++ b/python/pyarrow/includes/libarrow_flight.pxd
@@ -23,20 +23,6 @@ from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport *
 
 
-cdef extern from "arrow/ipc/api.h" namespace "arrow" nogil:
-    cdef cppclass CIpcPayload" arrow::ipc::internal::IpcPayload":
-        MessageType type
-        shared_ptr[CBuffer] metadata
-        vector[shared_ptr[CBuffer]] body_buffers
-        int64_t body_length
-
-    cdef CStatus _GetRecordBatchPayload\
-        " arrow::ipc::internal::GetRecordBatchPayload"(
-            const CRecordBatch& batch,
-            CMemoryPool* pool,
-            CIpcPayload* out)
-
-
 cdef extern from "arrow/flight/api.h" namespace "arrow" nogil:
     cdef cppclass CActionType" arrow::flight::ActionType":
         c_string type
@@ -158,8 +144,7 @@ cdef extern from "arrow/flight/api.h" namespace "arrow" nogil:
     cdef cppclass CFlightStreamWriter \
             " arrow::flight::FlightStreamWriter"(CRecordBatchWriter):
         CStatus WriteWithMetadata(const CRecordBatch& batch,
-                                  shared_ptr[CBuffer] app_metadata,
-                                  c_bool allow_64bit)
+                                  shared_ptr[CBuffer] app_metadata)
 
     cdef cppclass CRecordBatchStream \
             " arrow::flight::RecordBatchStream"(CFlightDataStream):
diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index 14a8b53..834b6bd 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -182,7 +182,7 @@ cdef class _CRecordBatchWriter:
         """
         with nogil:
             check_status(self.writer.get()
-                         .WriteRecordBatch(deref(batch.batch), 1))
+                         .WriteRecordBatch(deref(batch.batch)))
 
     def write_table(self, Table table, chunksize=None):
         """
diff --git a/r/src/recordbatchwriter.cpp b/r/src/recordbatchwriter.cpp
index be5103f..b22f858 100644
--- a/r/src/recordbatchwriter.cpp
+++ b/r/src/recordbatchwriter.cpp
@@ -23,7 +23,7 @@
 void ipc___RecordBatchWriter__WriteRecordBatch(
     const std::shared_ptr<arrow::ipc::RecordBatchWriter>& batch_writer,
     const std::shared_ptr<arrow::RecordBatch>& batch) {
-  STOP_IF_NOT_OK(batch_writer->WriteRecordBatch(*batch, true));
+  STOP_IF_NOT_OK(batch_writer->WriteRecordBatch(*batch));
 }
 
 // [[arrow::export]]