You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this text version.
Posted to commits@arrow.apache.org by ap...@apache.org on 2019/08/14 10:45:39 UTC
[arrow] branch master updated: ARROW-5559: [C++] Add an IpcOptions structure
This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new cef7e80 ARROW-5559: [C++] Add an IpcOptions structure
cef7e80 is described below
commit cef7e80b527523b01a13bffbe9b36a547c3d74a9
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Wed Aug 14 12:45:21 2019 +0200
ARROW-5559: [C++] Add an IpcOptions structure
Switch existing APIs to take IpcOptions rather than individual option arguments.
Closes #5032 from pitrou/ARROW-5559-ipc-options and squashes the following commits:
e39ddc640 <Antoine Pitrou> ARROW-5559: Add an IpcOptions structure
Authored-by: Antoine Pitrou <an...@python.org>
Signed-off-by: Antoine Pitrou <an...@python.org>
---
cpp/src/arrow/CMakeLists.txt | 1 +
cpp/src/arrow/extension_type-test.cc | 6 +-
cpp/src/arrow/flight/client.cc | 10 +-
cpp/src/arrow/flight/client.h | 3 +-
cpp/src/arrow/flight/perf-server.cc | 7 +-
cpp/src/arrow/flight/server.cc | 14 +--
cpp/src/arrow/ipc/message.h | 6 +-
cpp/src/arrow/ipc/options.cc | 26 +++++
cpp/src/arrow/ipc/options.h | 43 +++++++++
cpp/src/arrow/ipc/read-write-benchmark.cc | 6 +-
cpp/src/arrow/ipc/read-write-test.cc | 64 ++++++++-----
cpp/src/arrow/ipc/reader.cc | 27 ++++--
cpp/src/arrow/ipc/reader.h | 5 +-
cpp/src/arrow/ipc/writer.cc | 144 +++++++++++++++-------------
cpp/src/arrow/ipc/writer.h | 47 +++++----
cpp/src/arrow/python/flight.cc | 4 +-
cpp/src/arrow/python/flight.h | 1 +
cpp/src/arrow/python/serialize.cc | 7 +-
cpp/src/arrow/python/serialize.h | 4 +
python/pyarrow/_flight.pyx | 8 +-
python/pyarrow/includes/libarrow.pxd | 20 +++-
python/pyarrow/includes/libarrow_flight.pxd | 17 +---
python/pyarrow/ipc.pxi | 2 +-
r/src/recordbatchwriter.cpp | 2 +-
24 files changed, 299 insertions(+), 175 deletions(-)
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index dc1cec2..0085238 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -255,6 +255,7 @@ if(ARROW_IPC)
ipc/json-simple.cc
ipc/message.cc
ipc/metadata-internal.cc
+ ipc/options.cc
ipc/reader.cc
ipc/writer.cc)
set(ARROW_SRCS ${ARROW_SRCS} ${ARROW_IPC_SRCS})
diff --git a/cpp/src/arrow/extension_type-test.cc b/cpp/src/arrow/extension_type-test.cc
index 29c5356..2f680af 100644
--- a/cpp/src/arrow/extension_type-test.cc
+++ b/cpp/src/arrow/extension_type-test.cc
@@ -221,7 +221,8 @@ auto RoundtripBatch = [](const std::shared_ptr<RecordBatch>& batch,
std::shared_ptr<RecordBatch>* out) {
std::shared_ptr<io::BufferOutputStream> out_stream;
ASSERT_OK(io::BufferOutputStream::Create(1024, default_memory_pool(), &out_stream));
- ASSERT_OK(ipc::WriteRecordBatchStream({batch}, out_stream.get()));
+ ASSERT_OK(ipc::WriteRecordBatchStream({batch}, ipc::IpcOptions::Defaults(),
+ out_stream.get()));
std::shared_ptr<Buffer> complete_ipc_stream;
ASSERT_OK(out_stream->Finish(&complete_ipc_stream));
@@ -273,7 +274,8 @@ TEST_F(TestExtensionType, UnrecognizedExtension) {
// and ensure that a plain instance of the storage type is created
std::shared_ptr<io::BufferOutputStream> out_stream;
ASSERT_OK(io::BufferOutputStream::Create(1024, default_memory_pool(), &out_stream));
- ASSERT_OK(ipc::WriteRecordBatchStream({batch}, out_stream.get()));
+ ASSERT_OK(ipc::WriteRecordBatchStream({batch}, ipc::IpcOptions::Defaults(),
+ out_stream.get()));
std::shared_ptr<Buffer> complete_ipc_stream;
ASSERT_OK(out_stream->Finish(&complete_ipc_stream));
diff --git a/cpp/src/arrow/flight/client.cc b/cpp/src/arrow/flight/client.cc
index eb4df58..6dc5944 100644
--- a/cpp/src/arrow/flight/client.cc
+++ b/cpp/src/arrow/flight/client.cc
@@ -251,13 +251,13 @@ class GrpcStreamWriter : public FlightStreamWriter {
std::shared_ptr<grpc::ClientReaderWriter<pb::FlightData, pb::PutResult>> writer,
std::unique_ptr<FlightStreamWriter>* out);
- Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override {
- return WriteWithMetadata(batch, nullptr, allow_64bit);
+ Status WriteRecordBatch(const RecordBatch& batch) override {
+ return WriteWithMetadata(batch, nullptr);
}
- Status WriteWithMetadata(const RecordBatch& batch, std::shared_ptr<Buffer> app_metadata,
- bool allow_64bit = false) override {
+ Status WriteWithMetadata(const RecordBatch& batch,
+ std::shared_ptr<Buffer> app_metadata) override {
app_metadata_ = app_metadata;
- return batch_writer_->WriteRecordBatch(batch, allow_64bit);
+ return batch_writer_->WriteRecordBatch(batch);
}
void set_memory_pool(MemoryPool* pool) override {
batch_writer_->set_memory_pool(pool);
diff --git a/cpp/src/arrow/flight/client.h b/cpp/src/arrow/flight/client.h
index 0fa571d..774b0f6 100644
--- a/cpp/src/arrow/flight/client.h
+++ b/cpp/src/arrow/flight/client.h
@@ -86,8 +86,7 @@ class ARROW_FLIGHT_EXPORT FlightStreamReader : public MetadataRecordBatchReader
class ARROW_FLIGHT_EXPORT FlightStreamWriter : public ipc::RecordBatchWriter {
public:
virtual Status WriteWithMetadata(const RecordBatch& batch,
- std::shared_ptr<Buffer> app_metadata,
- bool allow_64bit = false) = 0;
+ std::shared_ptr<Buffer> app_metadata) = 0;
};
#ifdef _MSC_VER
diff --git a/cpp/src/arrow/flight/perf-server.cc b/cpp/src/arrow/flight/perf-server.cc
index 60b2d4e..129fe53 100644
--- a/cpp/src/arrow/flight/perf-server.cc
+++ b/cpp/src/arrow/flight/perf-server.cc
@@ -74,7 +74,7 @@ class PerfDataStream : public FlightDataStream {
std::shared_ptr<Schema> schema() override { return schema_; }
Status GetSchemaPayload(FlightPayload* payload) override {
- return ipc::internal::GetSchemaPayload(*schema_, &dictionary_memo_,
+ return ipc::internal::GetSchemaPayload(*schema_, ipc_options_, &dictionary_memo_,
&payload->ipc_message);
}
@@ -103,8 +103,8 @@ class PerfDataStream : public FlightDataStream {
} else {
records_sent_ += batch_length_;
}
- return ipc::internal::GetRecordBatchPayload(*batch, default_memory_pool(),
- &payload->ipc_message);
+ return ipc::internal::GetRecordBatchPayload(
+ *batch, ipc_options_, default_memory_pool(), &payload->ipc_message);
}
private:
@@ -115,6 +115,7 @@ class PerfDataStream : public FlightDataStream {
int64_t records_sent_;
std::shared_ptr<Schema> schema_;
ipc::DictionaryMemo dictionary_memo_;
+ ipc::IpcOptions ipc_options_;
std::shared_ptr<RecordBatch> batch_;
ArrayVector arrays_;
};
diff --git a/cpp/src/arrow/flight/server.cc b/cpp/src/arrow/flight/server.cc
index 849fbf0..d3178ed 100644
--- a/cpp/src/arrow/flight/server.cc
+++ b/cpp/src/arrow/flight/server.cc
@@ -36,6 +36,7 @@
#include "arrow/buffer.h"
#include "arrow/ipc/dictionary.h"
+#include "arrow/ipc/options.h"
#include "arrow/ipc/reader.h"
#include "arrow/ipc/writer.h"
#include "arrow/memory_pool.h"
@@ -640,13 +641,13 @@ class RecordBatchStream::RecordBatchStreamImpl {
RecordBatchStreamImpl(const std::shared_ptr<RecordBatchReader>& reader,
MemoryPool* pool)
- : pool_(pool), reader_(reader) {}
+ : pool_(pool), reader_(reader), ipc_options_(ipc::IpcOptions::Defaults()) {}
std::shared_ptr<Schema> schema() { return reader_->schema(); }
Status GetSchemaPayload(FlightPayload* payload) {
- return ipc::internal::GetSchemaPayload(*reader_->schema(), &dictionary_memo_,
- &payload->ipc_message);
+ return ipc::internal::GetSchemaPayload(*reader_->schema(), ipc_options_,
+ &dictionary_memo_, &payload->ipc_message);
}
Status Next(FlightPayload* payload) {
@@ -664,7 +665,7 @@ class RecordBatchStream::RecordBatchStreamImpl {
if (stage_ == Stage::DICTIONARY) {
if (dictionary_index_ == static_cast<int>(dictionaries_.size())) {
stage_ = Stage::RECORD_BATCH;
- return ipc::internal::GetRecordBatchPayload(*current_batch_, pool_,
+ return ipc::internal::GetRecordBatchPayload(*current_batch_, ipc_options_, pool_,
&payload->ipc_message);
} else {
return GetNextDictionary(payload);
@@ -679,7 +680,7 @@ class RecordBatchStream::RecordBatchStreamImpl {
payload->ipc_message.metadata = nullptr;
return Status::OK();
} else {
- return ipc::internal::GetRecordBatchPayload(*current_batch_, pool_,
+ return ipc::internal::GetRecordBatchPayload(*current_batch_, ipc_options_, pool_,
&payload->ipc_message);
}
}
@@ -687,7 +688,7 @@ class RecordBatchStream::RecordBatchStreamImpl {
private:
Status GetNextDictionary(FlightPayload* payload) {
const auto& it = dictionaries_[dictionary_index_++];
- return ipc::internal::GetDictionaryPayload(it.first, it.second, pool_,
+ return ipc::internal::GetDictionaryPayload(it.first, it.second, ipc_options_, pool_,
&payload->ipc_message);
}
@@ -703,6 +704,7 @@ class RecordBatchStream::RecordBatchStreamImpl {
MemoryPool* pool_;
std::shared_ptr<RecordBatchReader> reader_;
ipc::DictionaryMemo dictionary_memo_;
+ ipc::IpcOptions ipc_options_;
std::shared_ptr<RecordBatch> current_batch_;
std::vector<std::pair<int64_t, std::shared_ptr<Array>>> dictionaries_;
diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h
index fcc7e77..9c152d7 100644
--- a/cpp/src/arrow/ipc/message.h
+++ b/cpp/src/arrow/ipc/message.h
@@ -24,6 +24,7 @@
#include <memory>
#include <string>
+#include "arrow/ipc/options.h"
#include "arrow/status.h"
#include "arrow/util/macros.h"
#include "arrow/util/visibility.h"
@@ -57,11 +58,6 @@ enum class MetadataVersion : char {
V4
};
-// ARROW-109: We set this number arbitrarily to help catch user mistakes. For
-// deeply nested schemas, it is expected the user will indicate explicitly the
-// maximum allowed recursion depth
-constexpr int kMaxNestingDepth = 64;
-
// Read interface classes. We do not fully deserialize the flatbuffers so that
// individual fields metadata can be retrieved from very large schema without
//
diff --git a/cpp/src/arrow/ipc/options.cc b/cpp/src/arrow/ipc/options.cc
new file mode 100644
index 0000000..a5714f3
--- /dev/null
+++ b/cpp/src/arrow/ipc/options.cc
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/ipc/options.h"
+
+namespace arrow {
+namespace ipc {
+
+IpcOptions IpcOptions::Defaults() { return IpcOptions(); }  // in-class member defaults
+
+} // namespace ipc
+} // namespace arrow
diff --git a/cpp/src/arrow/ipc/options.h b/cpp/src/arrow/ipc/options.h
new file mode 100644
index 0000000..d380402
--- /dev/null
+++ b/cpp/src/arrow/ipc/options.h
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+namespace ipc {
+
+// ARROW-109: We set this number arbitrarily to help catch user mistakes. For
+// deeply nested schemas, the user is expected to explicitly indicate the
+// maximum allowed recursion depth (see IpcOptions::max_recursion_depth).
+constexpr int kMaxNestingDepth = 64;
+
+struct ARROW_EXPORT IpcOptions {
+  // If true, allow array lengths that don't fit in a signed 32-bit int.
+  // Some implementations may not be able to parse such streams.
+  bool allow_64bit = false;
+  // The maximum permitted schema nesting depth; the writer DCHECKs it is > 0.
+  int max_recursion_depth = kMaxNestingDepth;
+
+  static IpcOptions Defaults();  // same as a default-constructed IpcOptions
+};
+
+} // namespace ipc
+} // namespace arrow
diff --git a/cpp/src/arrow/ipc/read-write-benchmark.cc b/cpp/src/arrow/ipc/read-write-benchmark.cc
index 6f66f9c..d2b1d99 100644
--- a/cpp/src/arrow/ipc/read-write-benchmark.cc
+++ b/cpp/src/arrow/ipc/read-write-benchmark.cc
@@ -50,6 +50,7 @@ std::shared_ptr<RecordBatch> MakeRecordBatch(int64_t total_size, int64_t num_fie
static void WriteRecordBatch(benchmark::State& state) { // NOLINT non-const reference
// 1MB
constexpr int64_t kTotalSize = 1 << 20;
+ auto options = ipc::IpcOptions::Defaults();
std::shared_ptr<ResizableBuffer> buffer;
ABORT_NOT_OK(AllocateResizableBuffer(kTotalSize & 2, &buffer));
@@ -60,7 +61,7 @@ static void WriteRecordBatch(benchmark::State& state) { // NOLINT non-const ref
int32_t metadata_length;
int64_t body_length;
if (!ipc::WriteRecordBatch(*record_batch, 0, &stream, &metadata_length, &body_length,
- default_memory_pool())
+ options, default_memory_pool())
.ok()) {
state.SkipWithError("Failed to write!");
}
@@ -71,6 +72,7 @@ static void WriteRecordBatch(benchmark::State& state) { // NOLINT non-const ref
static void ReadRecordBatch(benchmark::State& state) { // NOLINT non-const reference
// 1MB
constexpr int64_t kTotalSize = 1 << 20;
+ auto options = ipc::IpcOptions::Defaults();
std::shared_ptr<ResizableBuffer> buffer;
ABORT_NOT_OK(AllocateResizableBuffer(kTotalSize & 2, &buffer));
@@ -81,7 +83,7 @@ static void ReadRecordBatch(benchmark::State& state) { // NOLINT non-const refe
int32_t metadata_length;
int64_t body_length;
if (!ipc::WriteRecordBatch(*record_batch, 0, &stream, &metadata_length, &body_length,
- default_memory_pool())
+ options, default_memory_pool())
.ok()) {
state.SkipWithError("Failed to write!");
}
diff --git a/cpp/src/arrow/ipc/read-write-test.cc b/cpp/src/arrow/ipc/read-write-test.cc
index d5a0034..d79962b 100644
--- a/cpp/src/arrow/ipc/read-write-test.cc
+++ b/cpp/src/arrow/ipc/read-write-test.cc
@@ -224,6 +224,11 @@ static int g_file_number = 0;
class IpcTestFixture : public io::MemoryMapFixture {
public:
+ void SetUp() {
+ pool_ = default_memory_pool();
+ options_ = IpcOptions::Defaults();
+ }
+
void DoSchemaRoundTrip(const Schema& schema, DictionaryMemo* out_memo,
std::shared_ptr<Schema>* result) {
std::shared_ptr<Buffer> serialized_schema;
@@ -251,9 +256,13 @@ class IpcTestFixture : public io::MemoryMapFixture {
}
RETURN_NOT_OK(mmap_->Seek(0));
- std::shared_ptr<RecordBatchWriter> file_writer;
- RETURN_NOT_OK(RecordBatchFileWriter::Open(mmap_.get(), batch.schema(), &file_writer));
- RETURN_NOT_OK(file_writer->WriteRecordBatch(batch, true));
+ auto options = options_;
+ options.allow_64bit = true;
+
+ auto res = RecordBatchFileWriter::Open(mmap_.get(), batch.schema(), options);
+ RETURN_NOT_OK(res.status());
+ std::shared_ptr<RecordBatchWriter> file_writer = *res;
+ RETURN_NOT_OK(file_writer->WriteRecordBatch(batch));
RETURN_NOT_OK(file_writer->Close());
int64_t offset;
@@ -308,19 +317,20 @@ class IpcTestFixture : public io::MemoryMapFixture {
protected:
std::shared_ptr<io::MemoryMappedFile> mmap_;
MemoryPool* pool_;
+ IpcOptions options_;
};
class TestWriteRecordBatch : public ::testing::Test, public IpcTestFixture {
public:
- void SetUp() { pool_ = default_memory_pool(); }
- void TearDown() { io::MemoryMapFixture::TearDown(); }
+ void SetUp() { IpcTestFixture::SetUp(); }
+ void TearDown() { IpcTestFixture::TearDown(); }
};
class TestIpcRoundTrip : public ::testing::TestWithParam<MakeRecordBatch*>,
public IpcTestFixture {
public:
- void SetUp() { pool_ = default_memory_pool(); }
- void TearDown() { io::MemoryMapFixture::TearDown(); }
+ void SetUp() { IpcTestFixture::SetUp(); }
+ void TearDown() { IpcTestFixture::TearDown(); }
};
TEST_P(TestIpcRoundTrip, RoundTrip) {
@@ -342,7 +352,7 @@ TEST_F(TestIpcRoundTrip, MetadataVersion) {
const int64_t buffer_offset = 0;
ASSERT_OK(WriteRecordBatch(*batch, buffer_offset, mmap_.get(), &metadata_length,
- &body_length, pool_));
+ &body_length, options_, pool_));
std::unique_ptr<Message> message;
ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
@@ -461,13 +471,14 @@ TEST_F(TestWriteRecordBatch, SliceTruncatesBuffers) {
CheckArray(a1);
}
-void TestGetRecordBatchSize(std::shared_ptr<RecordBatch> batch) {
+void TestGetRecordBatchSize(const IpcOptions& options,
+ std::shared_ptr<RecordBatch> batch) {
io::MockOutputStream mock;
int32_t mock_metadata_length = -1;
int64_t mock_body_length = -1;
int64_t size = -1;
ASSERT_OK(WriteRecordBatch(*batch, 0, &mock, &mock_metadata_length, &mock_body_length,
- default_memory_pool()));
+ options, default_memory_pool()));
ASSERT_OK(GetRecordBatchSize(*batch, &size));
ASSERT_EQ(mock.GetExtentBytesWritten(), size);
}
@@ -476,19 +487,19 @@ TEST_F(TestWriteRecordBatch, IntegerGetRecordBatchSize) {
std::shared_ptr<RecordBatch> batch;
ASSERT_OK(MakeIntRecordBatch(&batch));
- TestGetRecordBatchSize(batch);
+ TestGetRecordBatchSize(options_, batch);
ASSERT_OK(MakeListRecordBatch(&batch));
- TestGetRecordBatchSize(batch);
+ TestGetRecordBatchSize(options_, batch);
ASSERT_OK(MakeZeroLengthRecordBatch(&batch));
- TestGetRecordBatchSize(batch);
+ TestGetRecordBatchSize(options_, batch);
ASSERT_OK(MakeNonNullRecordBatch(&batch));
- TestGetRecordBatchSize(batch);
+ TestGetRecordBatchSize(options_, batch);
ASSERT_OK(MakeDeeplyNestedList(&batch));
- TestGetRecordBatchSize(batch);
+ TestGetRecordBatchSize(options_, batch);
}
class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture {
@@ -521,13 +532,12 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture {
const int memory_map_size = 1 << 20;
RETURN_NOT_OK(io::MemoryMapFixture::InitMemoryMap(memory_map_size, ss.str(), &mmap_));
+ auto options = IpcOptions::Defaults();
if (override_level) {
- return WriteRecordBatch(**batch, 0, mmap_.get(), metadata_length, body_length,
- pool_, recursion_level + 1);
- } else {
- return WriteRecordBatch(**batch, 0, mmap_.get(), metadata_length, body_length,
- pool_);
+ options.max_recursion_depth = recursion_level + 1;
}
+ return WriteRecordBatch(**batch, 0, mmap_.get(), metadata_length, body_length,
+ options, pool_);
}
protected:
@@ -582,10 +592,12 @@ TEST_F(RecursionLimits, StressLimit) {
DictionaryMemo empty_memo;
+ auto options = IpcOptions::Defaults();
+ options.max_recursion_depth = recursion_depth + 1;
io::BufferReader reader(message->body());
std::shared_ptr<RecordBatch> result;
- ASSERT_OK(ReadRecordBatch(*message->metadata(), schema, &empty_memo,
- recursion_depth + 1, &reader, &result));
+ ASSERT_OK(ReadRecordBatch(*message->metadata(), schema, &empty_memo, options, &reader,
+ &result));
*it_works = result->Equals(*batch);
};
@@ -854,8 +866,8 @@ TEST(TestRecordBatchStreamReader, EmptyStreamWithDictionaries) {
class TestTensorRoundTrip : public ::testing::Test, public IpcTestFixture {
public:
- void SetUp() { pool_ = default_memory_pool(); }
- void TearDown() { io::MemoryMapFixture::TearDown(); }
+ void SetUp() { IpcTestFixture::SetUp(); }
+ void TearDown() { IpcTestFixture::TearDown(); }
void CheckTensorRoundTrip(const Tensor& tensor) {
int32_t metadata_length;
@@ -931,8 +943,8 @@ TEST_F(TestTensorRoundTrip, NonContiguous) {
class TestSparseTensorRoundTrip : public ::testing::Test, public IpcTestFixture {
public:
- void SetUp() { pool_ = default_memory_pool(); }
- void TearDown() { io::MemoryMapFixture::TearDown(); }
+ void SetUp() { IpcTestFixture::SetUp(); }
+ void TearDown() { IpcTestFixture::TearDown(); }
template <typename SparseIndexType>
void CheckSparseTensorRoundTrip(const SparseTensorImpl<SparseIndexType>& tensor) {
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 4a554aa..648bb89 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -348,7 +348,8 @@ static Status LoadArray(const Field& field, ArrayLoaderContext* context, ArrayDa
Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr<Schema>& schema,
const DictionaryMemo* dictionary_memo, io::RandomAccessFile* file,
std::shared_ptr<RecordBatch>* out) {
- return ReadRecordBatch(metadata, schema, dictionary_memo, kMaxNestingDepth, file, out);
+ auto options = IpcOptions::Defaults();
+ return ReadRecordBatch(metadata, schema, dictionary_memo, options, file, out);
}
Status ReadRecordBatch(const Message& message, const std::shared_ptr<Schema>& schema,
@@ -356,9 +357,10 @@ Status ReadRecordBatch(const Message& message, const std::shared_ptr<Schema>& sc
std::shared_ptr<RecordBatch>* out) {
CHECK_MESSAGE_TYPE(message.type(), Message::RECORD_BATCH);
CHECK_HAS_BODY(message);
+ auto options = IpcOptions::Defaults();
io::BufferReader reader(message.body());
- return ReadRecordBatch(*message.metadata(), schema, dictionary_memo, kMaxNestingDepth,
- &reader, out);
+ return ReadRecordBatch(*message.metadata(), schema, dictionary_memo, options, &reader,
+ out);
}
// ----------------------------------------------------------------------
@@ -389,15 +391,17 @@ static Status LoadRecordBatchFromSource(const std::shared_ptr<Schema>& schema,
static inline Status ReadRecordBatch(const flatbuf::RecordBatch* metadata,
const std::shared_ptr<Schema>& schema,
const DictionaryMemo* dictionary_memo,
- int max_recursion_depth, io::RandomAccessFile* file,
+ const IpcOptions& options,
+ io::RandomAccessFile* file,
std::shared_ptr<RecordBatch>* out) {
IpcComponentSource source(metadata, file);
- return LoadRecordBatchFromSource(schema, metadata->length(), max_recursion_depth,
- &source, dictionary_memo, out);
+ return LoadRecordBatchFromSource(schema, metadata->length(),
+ options.max_recursion_depth, &source, dictionary_memo,
+ out);
}
Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr<Schema>& schema,
- const DictionaryMemo* dictionary_memo, int max_recursion_depth,
+ const DictionaryMemo* dictionary_memo, const IpcOptions& options,
io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) {
const flatbuf::Message* message;
RETURN_NOT_OK(internal::VerifyMessage(metadata.data(), metadata.size(), &message));
@@ -406,11 +410,13 @@ Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr<Schema>& sc
return Status::IOError(
"Header-type of flatbuffer-encoded Message is not RecordBatch.");
}
- return ReadRecordBatch(batch, schema, dictionary_memo, max_recursion_depth, file, out);
+ return ReadRecordBatch(batch, schema, dictionary_memo, options, file, out);
}
Status ReadDictionary(const Buffer& metadata, DictionaryMemo* dictionary_memo,
io::RandomAccessFile* file) {
+ auto options = IpcOptions::Defaults();
+
const flatbuf::Message* message;
RETURN_NOT_OK(internal::VerifyMessage(metadata.data(), metadata.size(), &message));
auto dictionary_batch = message->header_as_DictionaryBatch();
@@ -432,7 +438,7 @@ Status ReadDictionary(const Buffer& metadata, DictionaryMemo* dictionary_memo,
std::shared_ptr<RecordBatch> batch;
auto batch_meta = dictionary_batch->data();
RETURN_NOT_OK(ReadRecordBatch(batch_meta, ::arrow::schema({value_field}),
- dictionary_memo, kMaxNestingDepth, file, &batch));
+ dictionary_memo, options, file, &batch));
if (batch->num_columns() != 1) {
return Status::Invalid("Dictionary record batch must only contain one field");
}
@@ -817,10 +823,11 @@ Status ReadSchema(const Message& message, DictionaryMemo* dictionary_memo,
Status ReadRecordBatch(const std::shared_ptr<Schema>& schema,
const DictionaryMemo* dictionary_memo, io::InputStream* file,
std::shared_ptr<RecordBatch>* out) {
+ auto options = IpcOptions::Defaults();
std::unique_ptr<Message> message;
RETURN_NOT_OK(ReadContiguousPayload(file, &message));
io::BufferReader buffer_reader(message->body());
- return ReadRecordBatch(*message->metadata(), schema, dictionary_memo, kMaxNestingDepth,
+ return ReadRecordBatch(*message->metadata(), schema, dictionary_memo, options,
&buffer_reader, out);
}
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
index 34a0eef..1314a37 100644
--- a/cpp/src/arrow/ipc/reader.h
+++ b/cpp/src/arrow/ipc/reader.h
@@ -25,6 +25,7 @@
#include "arrow/ipc/dictionary.h"
#include "arrow/ipc/message.h"
+#include "arrow/ipc/options.h"
#include "arrow/record_batch.h"
#include "arrow/util/visibility.h"
@@ -245,12 +246,12 @@ Status ReadRecordBatch(const Message& message, const std::shared_ptr<Schema>& sc
/// dictionaries. Can be nullptr if you are sure there are no
/// dictionary-encoded fields
/// \param[in] file a random access file
-/// \param[in] max_recursion_depth the maximum permitted nesting depth
+/// \param[in] options options for deserialization
/// \param[out] out the read record batch
/// \return Status
ARROW_EXPORT
Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr<Schema>& schema,
- const DictionaryMemo* dictionary_memo, int max_recursion_depth,
+ const DictionaryMemo* dictionary_memo, const IpcOptions& options,
io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out);
/// \brief Read arrow::Tensor as encapsulated IPC message in file
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index b5c16cd..a42e4cc 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -113,13 +113,13 @@ namespace internal {
class RecordBatchSerializer : public ArrayVisitor {
public:
RecordBatchSerializer(MemoryPool* pool, int64_t buffer_start_offset,
- int max_recursion_depth, bool allow_64bit, IpcPayload* out)
+ const IpcOptions& options, IpcPayload* out)
: out_(out),
pool_(pool),
- max_recursion_depth_(max_recursion_depth),
- buffer_start_offset_(buffer_start_offset),
- allow_64bit_(allow_64bit) {
- DCHECK_GT(max_recursion_depth, 0);
+ options_(options),
+ max_recursion_depth_(options.max_recursion_depth),
+ buffer_start_offset_(buffer_start_offset) {
+ DCHECK_GT(max_recursion_depth_, 0);
}
~RecordBatchSerializer() override = default;
@@ -129,7 +129,7 @@ class RecordBatchSerializer : public ArrayVisitor {
return Status::Invalid("Max recursion depth reached");
}
- if (!allow_64bit_ && arr.length() > std::numeric_limits<int32_t>::max()) {
+ if (!options_.allow_64bit && arr.length() > std::numeric_limits<int32_t>::max()) {
return Status::CapacityError("Cannot write arrays larger than 2^31 - 1 in length");
}
@@ -496,17 +496,16 @@ class RecordBatchSerializer : public ArrayVisitor {
std::vector<internal::FieldMetadata> field_nodes_;
std::vector<internal::BufferMetadata> buffer_meta_;
+ const IpcOptions& options_;
int64_t max_recursion_depth_;
int64_t buffer_start_offset_;
- bool allow_64bit_;
};
class DictionaryWriter : public RecordBatchSerializer {
public:
DictionaryWriter(int64_t dictionary_id, MemoryPool* pool, int64_t buffer_start_offset,
- int max_recursion_depth, bool allow_64bit, IpcPayload* out)
- : RecordBatchSerializer(pool, buffer_start_offset, max_recursion_depth, allow_64bit,
- out),
+ const IpcOptions& options, IpcPayload* out)
+ : RecordBatchSerializer(pool, buffer_start_offset, options, out),
dictionary_id_(dictionary_id) {}
Status SerializeMetadata(int64_t num_rows) override {
@@ -562,25 +561,25 @@ Status WriteIpcPayload(const IpcPayload& payload, io::OutputStream* dst,
return Status::OK();
}
-Status GetSchemaPayload(const Schema& schema, DictionaryMemo* dictionary_memo,
- IpcPayload* out) {
+Status GetSchemaPayload(const Schema& schema, const IpcOptions& options,
+ DictionaryMemo* dictionary_memo, IpcPayload* out) {
out->type = Message::SCHEMA;
return WriteSchemaMessage(schema, dictionary_memo, &out->metadata);
}
Status GetDictionaryPayload(int64_t id, const std::shared_ptr<Array>& dictionary,
- MemoryPool* pool, IpcPayload* out) {
+ const IpcOptions& options, MemoryPool* pool,
+ IpcPayload* out) {
out->type = Message::DICTIONARY_BATCH;
// Frame of reference is 0, see ARROW-384
- DictionaryWriter writer(id, pool, /*buffer_start_offset=*/0, ipc::kMaxNestingDepth,
- true /* allow_64bit */, out);
+ DictionaryWriter writer(id, pool, /*buffer_start_offset=*/0, options, out);
return writer.Assemble(dictionary);
}
-Status GetRecordBatchPayload(const RecordBatch& batch, MemoryPool* pool,
- IpcPayload* out) {
+Status GetRecordBatchPayload(const RecordBatch& batch, const IpcOptions& options,
+ MemoryPool* pool, IpcPayload* out) {
out->type = Message::RECORD_BATCH;
- RecordBatchSerializer writer(pool, 0, kMaxNestingDepth, true, out);
+ RecordBatchSerializer writer(pool, /*buffer_start_offset=*/0, options, out);
return writer.Assemble(batch);
}
@@ -588,11 +587,10 @@ Status GetRecordBatchPayload(const RecordBatch& batch, MemoryPool* pool,
Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
io::OutputStream* dst, int32_t* metadata_length,
- int64_t* body_length, MemoryPool* pool, int max_recursion_depth,
- bool allow_64bit) {
+ int64_t* body_length, const IpcOptions& options,
+ MemoryPool* pool) {
internal::IpcPayload payload;
- internal::RecordBatchSerializer writer(pool, buffer_start_offset, max_recursion_depth,
- allow_64bit, &payload);
+ internal::RecordBatchSerializer writer(pool, buffer_start_offset, options, &payload);
RETURN_NOT_OK(writer.Assemble(batch));
// TODO(wesm): it's a rough edge that the metadata and body length here are
@@ -605,25 +603,21 @@ Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
}
Status WriteRecordBatchStream(const std::vector<std::shared_ptr<RecordBatch>>& batches,
- io::OutputStream* dst) {
+ const IpcOptions& options, io::OutputStream* dst) {
+ // The stream schema is taken from the first batch, so there must be one
+ if (batches.empty()) {
+ return Status::Invalid("WriteRecordBatchStream requires at least one record batch");
+ }
ASSIGN_OR_RAISE(std::shared_ptr<RecordBatchWriter> writer,
- RecordBatchStreamWriter::Open(dst, batches[0]->schema()));
+ RecordBatchStreamWriter::Open(dst, batches[0]->schema(), options));
for (const auto& batch : batches) {
- // allow sizes > INT32_MAX
DCHECK(batch->schema()->Equals(*batches[0]->schema())) << "Schemas unequal";
- RETURN_NOT_OK(writer->WriteRecordBatch(*batch, true));
+ RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
}
RETURN_NOT_OK(writer->Close());
return Status::OK();
}
-Status WriteLargeRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
- io::OutputStream* dst, int32_t* metadata_length,
- int64_t* body_length, MemoryPool* pool) {
- return WriteRecordBatch(batch, buffer_start_offset, dst, metadata_length, body_length,
- pool, kMaxNestingDepth, true);
-}
-
namespace {
Status WriteTensorHeader(const Tensor& tensor, io::OutputStream* dst,
@@ -829,8 +823,9 @@ Status WriteSparseTensor(const SparseTensor& sparse_tensor, io::OutputStream* ds
Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
int64_t buffer_start_offset, io::OutputStream* dst,
int32_t* metadata_length, int64_t* body_length, MemoryPool* pool) {
+ auto options = IpcOptions::Defaults();
internal::IpcPayload payload;
- RETURN_NOT_OK(GetDictionaryPayload(dictionary_id, dictionary, pool, &payload));
+ RETURN_NOT_OK(GetDictionaryPayload(dictionary_id, dictionary, options, pool, &payload));
// The body size is computed in the payload
*body_length = payload.body_length;
@@ -839,11 +834,14 @@ Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dict
Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) {
// emulates the behavior of Write without actually writing
+ auto options = IpcOptions::Defaults();
+ // Like before IpcOptions existed, allow field lengths exceeding INT32_MAX
+ options.allow_64bit = true;
int32_t metadata_length = 0;
int64_t body_length = 0;
io::MockOutputStream dst;
- RETURN_NOT_OK(WriteRecordBatch(batch, 0, &dst, &metadata_length, &body_length,
- default_memory_pool(), kMaxNestingDepth, true));
+ RETURN_NOT_OK(WriteRecordBatch(batch, 0, &dst, &metadata_length, &body_length, options,
+ default_memory_pool()));
*size = dst.GetExtentBytesWritten();
return Status::OK();
}
@@ -875,7 +873,7 @@ Status RecordBatchWriter::WriteTable(const Table& table, int64_t max_chunksize)
if (batch == nullptr) {
break;
}
- RETURN_NOT_OK(WriteRecordBatch(*batch, true));
+ RETURN_NOT_OK(WriteRecordBatch(*batch));
}
return Status::OK();
@@ -902,11 +900,13 @@ class RecordBatchPayloadWriter : public RecordBatchWriter {
~RecordBatchPayloadWriter() override = default;
RecordBatchPayloadWriter(std::unique_ptr<internal::IpcPayloadWriter> payload_writer,
- const Schema& schema, DictionaryMemo* out_memo = nullptr)
+ const Schema& schema, const IpcOptions& options,
+ DictionaryMemo* out_memo = nullptr)
: payload_writer_(std::move(payload_writer)),
schema_(schema),
pool_(default_memory_pool()),
- dictionary_memo_(out_memo) {
+ dictionary_memo_(out_memo),
+ options_(options) {
if (out_memo == nullptr) {
dictionary_memo_ = &internal_dict_memo_;
}
@@ -915,12 +915,12 @@ class RecordBatchPayloadWriter : public RecordBatchWriter {
// A Schema-owning constructor variant
RecordBatchPayloadWriter(std::unique_ptr<internal::IpcPayloadWriter> payload_writer,
const std::shared_ptr<Schema>& schema,
- DictionaryMemo* out_memo = nullptr)
- : RecordBatchPayloadWriter(std::move(payload_writer), *schema, out_memo) {
+ const IpcOptions& options, DictionaryMemo* out_memo = nullptr)
+ : RecordBatchPayloadWriter(std::move(payload_writer), *schema, options, out_memo) {
shared_schema_ = schema;
}
- Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override {
+ Status WriteRecordBatch(const RecordBatch& batch) override {
if (!batch.schema()->Equals(schema_, false /* check_metadata */)) {
return Status::Invalid("Tried to write record batch with different schema");
}
@@ -936,7 +936,7 @@ class RecordBatchPayloadWriter : public RecordBatchWriter {
// deltas while computing the RecordBatch payload to save time?
internal::IpcPayload payload;
- RETURN_NOT_OK(GetRecordBatchPayload(batch, pool_, &payload));
+ RETURN_NOT_OK(GetRecordBatchPayload(batch, options_, pool_, &payload));
return payload_writer_->WritePayload(payload);
}
@@ -952,7 +952,7 @@ class RecordBatchPayloadWriter : public RecordBatchWriter {
RETURN_NOT_OK(payload_writer_->Start());
internal::IpcPayload payload;
- RETURN_NOT_OK(GetSchemaPayload(schema_, dictionary_memo_, &payload));
+ RETURN_NOT_OK(GetSchemaPayload(schema_, options_, dictionary_memo_, &payload));
return payload_writer_->WritePayload(payload);
}
@@ -972,7 +972,8 @@ class RecordBatchPayloadWriter : public RecordBatchWriter {
int64_t dictionary_id = pair.first;
const auto& dictionary = pair.second;
- RETURN_NOT_OK(GetDictionaryPayload(dictionary_id, dictionary, pool_, &payload));
+ RETURN_NOT_OK(
+ GetDictionaryPayload(dictionary_id, dictionary, options_, pool_, &payload));
RETURN_NOT_OK(payload_writer_->WritePayload(payload));
}
return Status::OK();
@@ -987,6 +988,7 @@ class RecordBatchPayloadWriter : public RecordBatchWriter {
DictionaryMemo internal_dict_memo_;
bool started_ = false;
bool wrote_dictionaries_ = false;
+ IpcOptions options_;
};
// ----------------------------------------------------------------------
@@ -1138,20 +1140,22 @@ class RecordBatchStreamWriter::RecordBatchStreamWriterImpl
: public RecordBatchPayloadWriter {
public:
RecordBatchStreamWriterImpl(io::OutputStream* sink,
- const std::shared_ptr<Schema>& schema)
+ const std::shared_ptr<Schema>& schema,
+ const IpcOptions& options)
: RecordBatchPayloadWriter(
std::unique_ptr<internal::IpcPayloadWriter>(new PayloadStreamWriter(sink)),
- schema) {}
+ schema, options) {}
~RecordBatchStreamWriterImpl() = default;
};
class RecordBatchFileWriter::RecordBatchFileWriterImpl : public RecordBatchPayloadWriter {
public:
- RecordBatchFileWriterImpl(io::OutputStream* sink, const std::shared_ptr<Schema>& schema)
+ RecordBatchFileWriterImpl(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
+ const IpcOptions& options)
: RecordBatchPayloadWriter(std::unique_ptr<internal::IpcPayloadWriter>(
new PayloadFileWriter(sink, schema)),
- schema) {}
+ schema, options) {}
~RecordBatchFileWriterImpl() = default;
};
@@ -1160,9 +1164,8 @@ RecordBatchStreamWriter::RecordBatchStreamWriter() {}
RecordBatchStreamWriter::~RecordBatchStreamWriter() {}
-Status RecordBatchStreamWriter::WriteRecordBatch(const RecordBatch& batch,
- bool allow_64bit) {
- return impl_->WriteRecordBatch(batch, allow_64bit);
+Status RecordBatchStreamWriter::WriteRecordBatch(const RecordBatch& batch) {
+ return impl_->WriteRecordBatch(batch);
}
void RecordBatchStreamWriter::set_memory_pool(MemoryPool* pool) {
@@ -1177,13 +1180,20 @@ Status RecordBatchStreamWriter::Open(io::OutputStream* sink,
}
Result<std::shared_ptr<RecordBatchWriter>> RecordBatchStreamWriter::Open(
- io::OutputStream* sink, const std::shared_ptr<Schema>& schema) {
+ io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
+ const IpcOptions& options) {
// ctor is private
auto result = std::shared_ptr<RecordBatchStreamWriter>(new RecordBatchStreamWriter());
- result->impl_.reset(new RecordBatchStreamWriterImpl(sink, schema));
+ result->impl_.reset(new RecordBatchStreamWriterImpl(sink, schema, options));
return std::move(result);
}
+Result<std::shared_ptr<RecordBatchWriter>> RecordBatchStreamWriter::Open(
+ io::OutputStream* sink, const std::shared_ptr<Schema>& schema) {
+ auto options = IpcOptions::Defaults();
+ return Open(sink, schema, options);
+}
+
Status RecordBatchStreamWriter::Close() { return impl_->Close(); }
RecordBatchFileWriter::RecordBatchFileWriter() {}
@@ -1198,16 +1208,22 @@ Status RecordBatchFileWriter::Open(io::OutputStream* sink,
}
Result<std::shared_ptr<RecordBatchWriter>> RecordBatchFileWriter::Open(
- io::OutputStream* sink, const std::shared_ptr<Schema>& schema) {
+ io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
+ const IpcOptions& options) {
// ctor is private
auto result = std::shared_ptr<RecordBatchFileWriter>(new RecordBatchFileWriter());
- result->file_impl_.reset(new RecordBatchFileWriterImpl(sink, schema));
+ result->file_impl_.reset(new RecordBatchFileWriterImpl(sink, schema, options));
return std::move(result);
}
-Status RecordBatchFileWriter::WriteRecordBatch(const RecordBatch& batch,
- bool allow_64bit) {
- return file_impl_->WriteRecordBatch(batch, allow_64bit);
+Result<std::shared_ptr<RecordBatchWriter>> RecordBatchFileWriter::Open(
+ io::OutputStream* sink, const std::shared_ptr<Schema>& schema) {
+ auto options = IpcOptions::Defaults();
+ return Open(sink, schema, options);
+}
+
+Status RecordBatchFileWriter::WriteRecordBatch(const RecordBatch& batch) {
+ return file_impl_->WriteRecordBatch(batch);
}
Status RecordBatchFileWriter::Close() { return file_impl_->Close(); }
@@ -1217,15 +1233,17 @@ namespace internal {
Status OpenRecordBatchWriter(std::unique_ptr<IpcPayloadWriter> sink,
const std::shared_ptr<Schema>& schema,
std::unique_ptr<RecordBatchWriter>* out) {
- ASSIGN_OR_RAISE(*out, OpenRecordBatchWriter(std::move(sink), schema));
+ auto options = IpcOptions::Defaults();
+ ASSIGN_OR_RAISE(*out, OpenRecordBatchWriter(std::move(sink), schema, options));
return Status::OK();
}
Result<std::unique_ptr<RecordBatchWriter>> OpenRecordBatchWriter(
- std::unique_ptr<IpcPayloadWriter> sink, const std::shared_ptr<Schema>& schema) {
+ std::unique_ptr<IpcPayloadWriter> sink, const std::shared_ptr<Schema>& schema,
+ const IpcOptions& options) {
// XXX should we call Start()?
return std::unique_ptr<RecordBatchWriter>(
- new RecordBatchPayloadWriter(std::move(sink), schema));
+ new RecordBatchPayloadWriter(std::move(sink), schema, options));
}
} // namespace internal
@@ -1248,10 +1266,12 @@ Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool,
Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool,
io::OutputStream* out) {
+ auto options = IpcOptions::Defaults();
+ // Like before IpcOptions existed, allow field lengths exceeding INT32_MAX
+ options.allow_64bit = true;
int32_t metadata_length = 0;
int64_t body_length = 0;
- return WriteRecordBatch(batch, 0, out, &metadata_length, &body_length, pool,
- kMaxNestingDepth, true);
+ return WriteRecordBatch(batch, 0, out, &metadata_length, &body_length, options, pool);
}
Status SerializeSchema(const Schema& schema, DictionaryMemo* dictionary_memo,
@@ -1259,8 +1279,10 @@ Status SerializeSchema(const Schema& schema, DictionaryMemo* dictionary_memo,
std::shared_ptr<io::BufferOutputStream> stream;
RETURN_NOT_OK(io::BufferOutputStream::Create(1024, pool, &stream));
+ auto options = IpcOptions::Defaults();
auto payload_writer = make_unique<PayloadStreamWriter>(stream.get());
- RecordBatchPayloadWriter writer(std::move(payload_writer), schema, dictionary_memo);
+ RecordBatchPayloadWriter writer(std::move(payload_writer), schema, options,
+ dictionary_memo);
// Write schema and populate fields (but not dictionaries) in dictionary_memo
RETURN_NOT_OK(writer.Start());
return stream->Finish(out);
diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h
index d2b9faa..e70827e 100644
--- a/cpp/src/arrow/ipc/writer.h
+++ b/cpp/src/arrow/ipc/writer.h
@@ -25,6 +25,7 @@
#include <vector>
#include "arrow/ipc/message.h"
+#include "arrow/ipc/options.h"
#include "arrow/result.h"
#include "arrow/util/visibility.h"
@@ -59,10 +60,8 @@ class ARROW_EXPORT RecordBatchWriter {
/// \brief Write a record batch to the stream
///
/// \param[in] batch the record batch to write to the stream
- /// \param[in] allow_64bit if true, allow field lengths that don't fit
- /// in a signed 32-bit int
/// \return Status
- virtual Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) = 0;
+ virtual Status WriteRecordBatch(const RecordBatch& batch) = 0;
/// \brief Write possibly-chunked table by creating sequence of record batches
/// \param[in] table table to write
@@ -112,13 +111,20 @@ class ARROW_EXPORT RecordBatchStreamWriter : public RecordBatchWriter {
/// \return Result<std::shared_ptr<RecordBatchWriter>>
static Result<std::shared_ptr<RecordBatchWriter>> Open(
io::OutputStream* sink, const std::shared_ptr<Schema>& schema);
+ /// \brief Create a new writer from stream sink and schema
+ /// \param[in] sink output stream to write to
+ /// \param[in] schema the schema of the record batches to be written
+ /// \param[in] options options for serialization
+ /// \return Result<std::shared_ptr<RecordBatchWriter>>
+ static Result<std::shared_ptr<RecordBatchWriter>> Open(
+ io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
+ const IpcOptions& options);
/// \brief Write a record batch to the stream
///
/// \param[in] batch the record batch to write
- /// \param[in] allow_64bit allow array lengths over INT32_MAX - 1
/// \return Status
- Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override;
+ Status WriteRecordBatch(const RecordBatch& batch) override;
- /// \brief Close the stream by writing a 4-byte int32 0 EOS market
+ /// \brief Close the stream by writing a 4-byte int32 0 EOS marker
/// \return Status
@@ -157,13 +163,20 @@ class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter {
- /// \return Status
+ /// \return Result<std::shared_ptr<RecordBatchWriter>>
static Result<std::shared_ptr<RecordBatchWriter>> Open(
io::OutputStream* sink, const std::shared_ptr<Schema>& schema);
+ /// \brief Create a new writer from stream sink and schema
+ /// \param[in] sink output stream to write to
+ /// \param[in] schema the schema of the record batches to be written
+ /// \param[in] options options for serialization
+ /// \return Result<std::shared_ptr<RecordBatchWriter>>
+ static Result<std::shared_ptr<RecordBatchWriter>> Open(
+ io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
+ const IpcOptions& options);
/// \brief Write a record batch to the file
///
/// \param[in] batch the record batch to write
- /// \param[in] allow_64bit allow array lengths over INT32_MAX - 1
/// \return Status
- Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override;
+ Status WriteRecordBatch(const RecordBatch& batch) override;
/// \brief Close the file stream by writing the file footer and magic number
/// \return Status
@@ -184,11 +197,8 @@ class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter {
/// \param[out] metadata_length the size of the length-prefixed flatbuffer
/// including padding to a 64-byte boundary
-/// \param[out] body_length the size of the contiguous buffer block plus
+/// \param[out] body_length the size of the contiguous buffer block plus padding bytes
+/// \param[in] options options for serialization
/// \param[in] pool the memory pool to allocate memory from
-/// \param[in] max_recursion_depth the maximum permitted nesting schema depth
-/// \param[in] allow_64bit permit field lengths exceeding INT32_MAX. May not be
-/// readable by other Arrow implementations
-/// padding bytes
/// \return Status
///
/// Write the RecordBatch (collection of equal-length Arrow arrays) to the
@@ -207,9 +217,8 @@ class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter {
ARROW_EXPORT
Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
io::OutputStream* dst, int32_t* metadata_length,
- int64_t* body_length, MemoryPool* pool,
- int max_recursion_depth = kMaxNestingDepth,
- bool allow_64bit = false);
+ int64_t* body_length, const IpcOptions& options,
+ MemoryPool* pool);
/// \brief Serialize record batch as encapsulated IPC message in a new buffer
///
@@ -247,11 +256,12 @@ Status SerializeSchema(const Schema& schema, DictionaryMemo* dictionary_memo,
/// \brief Write multiple record batches to OutputStream, including schema
-/// \param[in] batches a vector of batches. Must all have same schema
+/// \param[in] batches a non-empty vector of batches. Must all have same schema
+/// \param[in] options options for serialization
/// \param[out] dst an OutputStream
/// \return Status
ARROW_EXPORT
Status WriteRecordBatchStream(const std::vector<std::shared_ptr<RecordBatch>>& batches,
- io::OutputStream* dst);
+ const IpcOptions& options, io::OutputStream* dst);
/// \brief Compute the number of bytes needed to write a record batch including metadata
///
@@ -354,36 +364,43 @@ Status OpenRecordBatchWriter(std::unique_ptr<IpcPayloadWriter> sink,
///
/// \param[in] sink the IpcPayloadWriter to write to
/// \param[in] schema the schema of the record batches to be written
+/// \param[in] options options for serialization
/// \return Result<std::unique_ptr<RecordBatchWriter>>
ARROW_EXPORT
Result<std::unique_ptr<RecordBatchWriter>> OpenRecordBatchWriter(
- std::unique_ptr<IpcPayloadWriter> sink, const std::shared_ptr<Schema>& schema);
+ std::unique_ptr<IpcPayloadWriter> sink, const std::shared_ptr<Schema>& schema,
+ const IpcOptions& options);
/// \brief Compute IpcPayload for the given schema
/// \param[in] schema the Schema that is being serialized
+/// \param[in] options options for serialization
/// \param[in,out] dictionary_memo class to populate with assigned dictionary ids
/// \param[out] out the returned vector of IpcPayloads
/// \return Status
ARROW_EXPORT
-Status GetSchemaPayload(const Schema& schema, DictionaryMemo* dictionary_memo,
- IpcPayload* out);
+Status GetSchemaPayload(const Schema& schema, const IpcOptions& options,
+ DictionaryMemo* dictionary_memo, IpcPayload* out);
/// \brief Compute IpcPayload for a dictionary
/// \param[in] id the dictionary id
/// \param[in] dictionary the dictionary values
+/// \param[in] options options for serialization
/// \param[out] payload the output IpcPayload
/// \return Status
ARROW_EXPORT
Status GetDictionaryPayload(int64_t id, const std::shared_ptr<Array>& dictionary,
- MemoryPool* pool, IpcPayload* payload);
+ const IpcOptions& options, MemoryPool* pool,
+ IpcPayload* payload);
/// \brief Compute IpcPayload for the given record batch
/// \param[in] batch the RecordBatch that is being serialized
+/// \param[in] options options for serialization
/// \param[in,out] pool for any required temporary memory allocations
/// \param[out] out the returned IpcPayload
/// \return Status
ARROW_EXPORT
-Status GetRecordBatchPayload(const RecordBatch& batch, MemoryPool* pool, IpcPayload* out);
+Status GetRecordBatchPayload(const RecordBatch& batch, const IpcOptions& options,
+ MemoryPool* pool, IpcPayload* out);
} // namespace internal
diff --git a/cpp/src/arrow/python/flight.cc b/cpp/src/arrow/python/flight.cc
index c5b2fe2..7fb6b1a 100644
--- a/cpp/src/arrow/python/flight.cc
+++ b/cpp/src/arrow/python/flight.cc
@@ -209,7 +209,7 @@ Status PyFlightDataStream::Next(FlightPayload* payload) { return stream_->Next(p
PyGeneratorFlightDataStream::PyGeneratorFlightDataStream(
PyObject* generator, std::shared_ptr<arrow::Schema> schema,
PyGeneratorFlightDataStreamCallback callback)
- : schema_(schema), callback_(callback) {
+ : schema_(schema), options_(ipc::IpcOptions::Defaults()), callback_(callback) {
Py_INCREF(generator);
generator_.reset(generator);
}
@@ -217,7 +217,7 @@ PyGeneratorFlightDataStream::PyGeneratorFlightDataStream(
std::shared_ptr<Schema> PyGeneratorFlightDataStream::schema() { return schema_; }
Status PyGeneratorFlightDataStream::GetSchemaPayload(FlightPayload* payload) {
- return ipc::internal::GetSchemaPayload(*schema_, &dictionary_memo_,
+ return ipc::internal::GetSchemaPayload(*schema_, options_, &dictionary_memo_,
&payload->ipc_message);
}
diff --git a/cpp/src/arrow/python/flight.h b/cpp/src/arrow/python/flight.h
index fe224f0..3c5dc5f 100644
--- a/cpp/src/arrow/python/flight.h
+++ b/cpp/src/arrow/python/flight.h
@@ -194,6 +194,7 @@ class ARROW_PYTHON_EXPORT PyGeneratorFlightDataStream
OwnedRefNoGIL generator_;
std::shared_ptr<arrow::Schema> schema_;
ipc::DictionaryMemo dictionary_memo_;
+ ipc::IpcOptions options_;
PyGeneratorFlightDataStreamCallback callback_;
};
diff --git a/cpp/src/arrow/python/serialize.cc b/cpp/src/arrow/python/serialize.cc
index bc64fb7..aaed073 100644
--- a/cpp/src/arrow/python/serialize.cc
+++ b/cpp/src/arrow/python/serialize.cc
@@ -562,6 +562,11 @@ Status WriteNdarrayHeader(std::shared_ptr<DataType> dtype,
return serialized_tensor.WriteTo(dst);
}
+SerializedPyObject::SerializedPyObject() : ipc_options(ipc::IpcOptions::Defaults()) {
+ // Keep the former WriteRecordBatchStream behavior of allowing 64-bit lengths
+ ipc_options.allow_64bit = true;
+}
+
Status SerializedPyObject::WriteTo(io::OutputStream* dst) {
int32_t num_tensors = static_cast<int32_t>(this->tensors.size());
int32_t num_ndarrays = static_cast<int32_t>(this->ndarrays.size());
@@ -575,7 +580,7 @@ Status SerializedPyObject::WriteTo(io::OutputStream* dst) {
// Align stream to 8-byte offset
RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kArrowIpcAlignment));
- RETURN_NOT_OK(ipc::WriteRecordBatchStream({this->batch}, dst));
+ RETURN_NOT_OK(ipc::WriteRecordBatchStream({this->batch}, this->ipc_options, dst));
// Align stream to 64-byte offset so tensor bodies are 64-byte aligned
RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kTensorAlignment));
@@ -641,7 +646,8 @@ Status SerializedPyObject::GetComponents(MemoryPool* memory_pool, PyObject** out
py_gil.release();
RETURN_NOT_OK(io::BufferOutputStream::Create(kInitialCapacity, memory_pool, &stream));
- RETURN_NOT_OK(ipc::WriteRecordBatchStream({this->batch}, stream.get()));
+ RETURN_NOT_OK(
+ ipc::WriteRecordBatchStream({this->batch}, this->ipc_options, stream.get()));
RETURN_NOT_OK(stream->Finish(&buffer));
py_gil.acquire();
diff --git a/cpp/src/arrow/python/serialize.h b/cpp/src/arrow/python/serialize.h
index 6cdbbe5..80f6416 100644
--- a/cpp/src/arrow/python/serialize.h
+++ b/cpp/src/arrow/python/serialize.h
@@ -21,6 +21,7 @@
#include <memory>
#include <vector>
+#include "arrow/ipc/options.h"
#include "arrow/python/visibility.h"
#include "arrow/status.h"
@@ -52,6 +53,10 @@ struct ARROW_PYTHON_EXPORT SerializedPyObject {
std::vector<std::shared_ptr<Tensor>> tensors;
std::vector<std::shared_ptr<Tensor>> ndarrays;
std::vector<std::shared_ptr<Buffer>> buffers;
+ /// IPC options used when writing the record batch (see WriteTo and GetComponents)
+ ipc::IpcOptions ipc_options;
+
+ SerializedPyObject();
/// \brief Write serialized Python object to OutputStream
/// \param[in,out] dst an OutputStream
diff --git a/python/pyarrow/_flight.pyx b/python/pyarrow/_flight.pyx
index 8502471..af7e1ed 100644
--- a/python/pyarrow/_flight.pyx
+++ b/python/pyarrow/_flight.pyx
@@ -721,8 +721,7 @@ cdef class FlightStreamWriter(_CRecordBatchWriter):
check_flight_status(
(<CFlightStreamWriter*> self.writer.get())
.WriteWithMetadata(deref(batch.batch),
- c_buf,
- 1))
+ c_buf))
cdef class FlightMetadataReader:
@@ -1170,6 +1169,8 @@ cdef CStatus _data_stream_next(void* self, CFlightPayload* payload) except *:
"""Callback for implementing FlightDataStream in Python."""
cdef:
unique_ptr[CFlightDataStream] data_stream
+ # TODO make it possible to pass IPC options around?
+ CIpcOptions c_ipc_options = CIpcOptions.Defaults()
py_stream = <object> self
if not isinstance(py_stream, GeneratorStream):
@@ -1228,8 +1229,9 @@ cdef CStatus _data_stream_next(void* self, CFlightPayload* payload) except *:
"GeneratorStream. "
"Got: {}\nExpected: {}".format(batch.schema,
stream_schema))
- check_flight_status(_GetRecordBatchPayload(
+ check_flight_status(GetRecordBatchPayload(
deref(batch.batch),
+ c_ipc_options,
c_default_memory_pool(),
&payload.ipc_message))
if metadata:
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index d3eca97..9bc104f 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -952,9 +952,19 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
MessageType_V3" arrow::ipc::MetadataVersion::V3"
MessageType_V4" arrow::ipc::MetadataVersion::V4"
+ cdef cppclass CIpcOptions" arrow::ipc::IpcOptions":
+ @staticmethod
+ CIpcOptions Defaults()
+
cdef cppclass CDictionaryMemo" arrow::ipc::DictionaryMemo":
pass
+ cdef cppclass CIpcPayload" arrow::ipc::internal::IpcPayload":
+ MessageType type
+ shared_ptr[CBuffer] metadata
+ vector[shared_ptr[CBuffer]] body_buffers
+ int64_t body_length
+
cdef cppclass CMessage" arrow::ipc::Message":
CStatus Open(const shared_ptr[CBuffer]& metadata,
const shared_ptr[CBuffer]& body,
@@ -981,8 +991,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
cdef cppclass CRecordBatchWriter" arrow::ipc::RecordBatchWriter":
CStatus Close()
- CStatus WriteRecordBatch(const CRecordBatch& batch,
- c_bool allow_64bit)
+ CStatus WriteRecordBatch(const CRecordBatch& batch)
CStatus WriteTable(const CTable& table, int64_t max_chunksize)
cdef cppclass CRecordBatchStreamReader \
@@ -1059,6 +1068,13 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
CStatus AlignStream(InputStream* stream, int64_t alignment)
CStatus AlignStream(OutputStream* stream, int64_t alignment)
+ cdef CStatus GetRecordBatchPayload\
+ " arrow::ipc::internal::GetRecordBatchPayload"(
+ const CRecordBatch& batch,
+ const CIpcOptions& options,
+ CMemoryPool* pool,
+ CIpcPayload* out)
+
cdef cppclass CFeatherWriter" arrow::ipc::feather::TableWriter":
@staticmethod
CStatus Open(const shared_ptr[OutputStream]& stream,
diff --git a/python/pyarrow/includes/libarrow_flight.pxd b/python/pyarrow/includes/libarrow_flight.pxd
index 7373e0b..69b4653 100644
--- a/python/pyarrow/includes/libarrow_flight.pxd
+++ b/python/pyarrow/includes/libarrow_flight.pxd
@@ -23,20 +23,6 @@ from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
-cdef extern from "arrow/ipc/api.h" namespace "arrow" nogil:
- cdef cppclass CIpcPayload" arrow::ipc::internal::IpcPayload":
- MessageType type
- shared_ptr[CBuffer] metadata
- vector[shared_ptr[CBuffer]] body_buffers
- int64_t body_length
-
- cdef CStatus _GetRecordBatchPayload\
- " arrow::ipc::internal::GetRecordBatchPayload"(
- const CRecordBatch& batch,
- CMemoryPool* pool,
- CIpcPayload* out)
-
-
cdef extern from "arrow/flight/api.h" namespace "arrow" nogil:
cdef cppclass CActionType" arrow::flight::ActionType":
c_string type
@@ -158,8 +144,7 @@ cdef extern from "arrow/flight/api.h" namespace "arrow" nogil:
cdef cppclass CFlightStreamWriter \
" arrow::flight::FlightStreamWriter"(CRecordBatchWriter):
CStatus WriteWithMetadata(const CRecordBatch& batch,
- shared_ptr[CBuffer] app_metadata,
- c_bool allow_64bit)
+ shared_ptr[CBuffer] app_metadata)
cdef cppclass CRecordBatchStream \
" arrow::flight::RecordBatchStream"(CFlightDataStream):
diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index 14a8b53..834b6bd 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -182,7 +182,9 @@ cdef class _CRecordBatchWriter:
"""
with nogil:
+ # NOTE(review): this used to pass allow_64bit=1; 64-bit lengths now depend
+ # on the IpcOptions the writer was opened with -- confirm that is intended
check_status(self.writer.get()
- .WriteRecordBatch(deref(batch.batch), 1))
+ .WriteRecordBatch(deref(batch.batch)))
def write_table(self, Table table, chunksize=None):
"""
diff --git a/r/src/recordbatchwriter.cpp b/r/src/recordbatchwriter.cpp
index be5103f..b22f858 100644
--- a/r/src/recordbatchwriter.cpp
+++ b/r/src/recordbatchwriter.cpp
@@ -23,7 +23,9 @@
void ipc___RecordBatchWriter__WriteRecordBatch(
const std::shared_ptr<arrow::ipc::RecordBatchWriter>& batch_writer,
const std::shared_ptr<arrow::RecordBatch>& batch) {
- STOP_IF_NOT_OK(batch_writer->WriteRecordBatch(*batch, true));
+ // NOTE(review): this call used to pass allow_64bit = true; 64-bit lengths are now
+ // governed by the IpcOptions the writer was opened with -- confirm that is intended
+ STOP_IF_NOT_OK(batch_writer->WriteRecordBatch(*batch));
}
// [[arrow::export]]