You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/01/21 16:11:13 UTC
arrow git commit: ARROW-495: [C++] Implement streaming binary format,
refactoring
Repository: arrow
Updated Branches:
refs/heads/master 8ca7033fc -> 5888e10cf
ARROW-495: [C++] Implement streaming binary format, refactoring
cc @nongli
Author: Wes McKinney <we...@twosigma.com>
Closes #293 from wesm/ARROW-495 and squashes the following commits:
279583b [Wes McKinney] FileBlock is a struct
c88e61a [Wes McKinney] Fix Python bindings after API changes
645a329 [Wes McKinney] Install stream.h
21378b4 [Wes McKinney] Collapse BaseStreamWriter and StreamWriter
b6c4578 [Wes McKinney] clang-format
12eb2cb [Wes McKinney] Add unit tests for streaming format, fix EOS, metadata length padding issues
3200b17 [Wes McKinney] Implement StreamReader
69fe82e [Wes McKinney] Implement rough draft of StreamWriter, share code with FileWriter
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/5888e10c
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/5888e10c
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/5888e10c
Branch: refs/heads/master
Commit: 5888e10cffac222e359d1b440b4684d16c061085
Parents: 8ca7033
Author: Wes McKinney <we...@twosigma.com>
Authored: Sat Jan 21 11:11:06 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Sat Jan 21 11:11:06 2017 -0500
----------------------------------------------------------------------
cpp/CMakeLists.txt | 1 -
cpp/src/arrow/io/memory.cc | 4 +-
cpp/src/arrow/ipc/CMakeLists.txt | 2 +
cpp/src/arrow/ipc/adapter.cc | 44 ++---
cpp/src/arrow/ipc/adapter.h | 11 +-
cpp/src/arrow/ipc/file.cc | 167 +++++++++++++------
cpp/src/arrow/ipc/file.h | 54 ++++---
cpp/src/arrow/ipc/ipc-adapter-test.cc | 16 +-
cpp/src/arrow/ipc/ipc-file-test.cc | 188 +++++++++++++++++----
cpp/src/arrow/ipc/ipc-json-test.cc | 5 +-
cpp/src/arrow/ipc/ipc-metadata-test.cc | 83 +---------
cpp/src/arrow/ipc/json-integration-test.cc | 4 +-
cpp/src/arrow/ipc/json.cc | 19 +--
cpp/src/arrow/ipc/json.h | 3 +-
cpp/src/arrow/ipc/metadata-internal.cc | 8 +-
cpp/src/arrow/ipc/metadata-internal.h | 4 +-
cpp/src/arrow/ipc/metadata.cc | 121 +-------------
cpp/src/arrow/ipc/metadata.h | 32 +---
cpp/src/arrow/ipc/stream.cc | 206 ++++++++++++++++++++++++
cpp/src/arrow/ipc/stream.h | 112 +++++++++++++
cpp/src/arrow/ipc/test-common.h | 9 ++
python/pyarrow/includes/libarrow_ipc.pxd | 3 +-
python/pyarrow/ipc.pyx | 5 +-
python/src/pyarrow/adapters/pandas.cc | 34 ++--
24 files changed, 718 insertions(+), 417 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 885ab19..9039ffb 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -90,7 +90,6 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
option(ARROW_ALTIVEC
"Build Arrow with Altivec"
ON)
-
endif()
if(NOT ARROW_BUILD_TESTS)
http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/io/memory.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc
index 0f5a0dc..1339a99 100644
--- a/cpp/src/arrow/io/memory.cc
+++ b/cpp/src/arrow/io/memory.cc
@@ -116,13 +116,13 @@ Status BufferReader::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer)
Status BufferReader::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
int64_t size = std::min(nbytes, size_ - position_);
- if (buffer_ != nullptr) {
+ if (size > 0 && buffer_ != nullptr) {
*out = SliceBuffer(buffer_, position_, size);
} else {
*out = std::make_shared<Buffer>(data_ + position_, size);
}
- position_ += nbytes;
+ position_ += size;
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt
index b7ac5f0..c047f53 100644
--- a/cpp/src/arrow/ipc/CMakeLists.txt
+++ b/cpp/src/arrow/ipc/CMakeLists.txt
@@ -46,6 +46,7 @@ set(ARROW_IPC_SRCS
json-internal.cc
metadata.cc
metadata-internal.cc
+ stream.cc
)
if(NOT APPLE)
@@ -151,6 +152,7 @@ install(FILES
file.h
json.h
metadata.h
+ stream.h
DESTINATION include/arrow/ipc)
# pkg-config support
http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/adapter.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc
index 7b4d18c..9da7b39 100644
--- a/cpp/src/arrow/ipc/adapter.cc
+++ b/cpp/src/arrow/ipc/adapter.cc
@@ -49,10 +49,9 @@ namespace ipc {
class RecordBatchWriter : public ArrayVisitor {
public:
- RecordBatchWriter(const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows,
- int64_t buffer_start_offset, int max_recursion_depth)
- : columns_(columns),
- num_rows_(num_rows),
+ RecordBatchWriter(
+ const RecordBatch& batch, int64_t buffer_start_offset, int max_recursion_depth)
+ : batch_(batch),
max_recursion_depth_(max_recursion_depth),
buffer_start_offset_(buffer_start_offset) {}
@@ -79,8 +78,8 @@ class RecordBatchWriter : public ArrayVisitor {
}
// Perform depth-first traversal of the row-batch
- for (size_t i = 0; i < columns_.size(); ++i) {
- RETURN_NOT_OK(VisitArray(*columns_[i].get()));
+ for (int i = 0; i < batch_.num_columns(); ++i) {
+ RETURN_NOT_OK(VisitArray(*batch_.column(i)));
}
// The position for the start of a buffer relative to the passed frame of
@@ -126,18 +125,23 @@ class RecordBatchWriter : public ArrayVisitor {
// itself as an int32_t.
std::shared_ptr<Buffer> metadata_fb;
RETURN_NOT_OK(WriteRecordBatchMetadata(
- num_rows_, body_length, field_nodes_, buffer_meta_, &metadata_fb));
+ batch_.num_rows(), body_length, field_nodes_, buffer_meta_, &metadata_fb));
// Need to write 4 bytes (metadata size), the metadata, plus padding to
- // fall on an 8-byte offset
- int64_t padded_metadata_length = BitUtil::CeilByte(metadata_fb->size() + 4);
+ // end on an 8-byte offset
+ int64_t start_offset;
+ RETURN_NOT_OK(dst->Tell(&start_offset));
+
+ int64_t padded_metadata_length = metadata_fb->size() + 4;
+ const int remainder = (padded_metadata_length + start_offset) % 8;
+ if (remainder != 0) { padded_metadata_length += 8 - remainder; }
// The returned metadata size includes the length prefix, the flatbuffer,
// plus padding
*metadata_length = static_cast<int32_t>(padded_metadata_length);
- // Write the flatbuffer size prefix
- int32_t flatbuffer_size = metadata_fb->size();
+ // Write the flatbuffer size prefix including padding
+ int32_t flatbuffer_size = padded_metadata_length - 4;
RETURN_NOT_OK(
dst->Write(reinterpret_cast<const uint8_t*>(&flatbuffer_size), sizeof(int32_t)));
@@ -294,9 +298,7 @@ class RecordBatchWriter : public ArrayVisitor {
return Status::OK();
}
- // Do not copy this vector. Ownership must be retained elsewhere
- const std::vector<std::shared_ptr<Array>>& columns_;
- int32_t num_rows_;
+ const RecordBatch& batch_;
std::vector<flatbuf::FieldNode> field_nodes_;
std::vector<flatbuf::Buffer> buffer_meta_;
@@ -306,18 +308,16 @@ class RecordBatchWriter : public ArrayVisitor {
int64_t buffer_start_offset_;
};
-Status WriteRecordBatch(const std::vector<std::shared_ptr<Array>>& columns,
- int32_t num_rows, int64_t buffer_start_offset, io::OutputStream* dst,
- int32_t* metadata_length, int64_t* body_length, int max_recursion_depth) {
+Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
+ io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
+ int max_recursion_depth) {
DCHECK_GT(max_recursion_depth, 0);
- RecordBatchWriter serializer(
- columns, num_rows, buffer_start_offset, max_recursion_depth);
+ RecordBatchWriter serializer(batch, buffer_start_offset, max_recursion_depth);
return serializer.Write(dst, metadata_length, body_length);
}
-Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size) {
- RecordBatchWriter serializer(
- batch->columns(), batch->num_rows(), 0, kMaxIpcRecursionDepth);
+Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) {
+ RecordBatchWriter serializer(batch, 0, kMaxIpcRecursionDepth);
RETURN_NOT_OK(serializer.GetTotalSize(size));
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/adapter.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h
index 963b9ee..f9ef7d9 100644
--- a/cpp/src/arrow/ipc/adapter.h
+++ b/cpp/src/arrow/ipc/adapter.h
@@ -71,17 +71,14 @@ constexpr int kMaxIpcRecursionDepth = 64;
//
// @param(out) body_length: the size of the contiguous buffer block plus
// padding bytes
-ARROW_EXPORT Status WriteRecordBatch(const std::vector<std::shared_ptr<Array>>& columns,
- int32_t num_rows, int64_t buffer_start_offset, io::OutputStream* dst,
- int32_t* metadata_length, int64_t* body_length,
- int max_recursion_depth = kMaxIpcRecursionDepth);
-
-// int64_t GetRecordBatchMetadata(const RecordBatch* batch);
+ARROW_EXPORT Status WriteRecordBatch(const RecordBatch& batch,
+ int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length,
+ int64_t* body_length, int max_recursion_depth = kMaxIpcRecursionDepth);
// Compute the precise number of bytes needed in a contiguous memory segment to
// write the record batch. This involves generating the complete serialized
// Flatbuffers metadata.
-ARROW_EXPORT Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size);
+ARROW_EXPORT Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size);
// ----------------------------------------------------------------------
// "Read" path; does not copy data if the input supports zero copy reads
http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/file.cc b/cpp/src/arrow/ipc/file.cc
index d7d2e61..bc086e3 100644
--- a/cpp/src/arrow/ipc/file.cc
+++ b/cpp/src/arrow/ipc/file.cc
@@ -26,6 +26,7 @@
#include "arrow/io/interfaces.h"
#include "arrow/io/memory.h"
#include "arrow/ipc/adapter.h"
+#include "arrow/ipc/metadata-internal.h"
#include "arrow/ipc/metadata.h"
#include "arrow/ipc/util.h"
#include "arrow/status.h"
@@ -35,82 +36,154 @@ namespace arrow {
namespace ipc {
static constexpr const char* kArrowMagicBytes = "ARROW1";
-
// ----------------------------------------------------------------------
-// Writer implementation
+// File footer
+
+static flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Block*>>
+FileBlocksToFlatbuffer(FBB& fbb, const std::vector<FileBlock>& blocks) {
+ std::vector<flatbuf::Block> fb_blocks;
-FileWriter::FileWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema)
- : sink_(sink), schema_(schema), position_(-1), started_(false) {}
+ for (const FileBlock& block : blocks) {
+ fb_blocks.emplace_back(block.offset, block.metadata_length, block.body_length);
+ }
-Status FileWriter::UpdatePosition() {
- return sink_->Tell(&position_);
+ return fbb.CreateVectorOfStructs(fb_blocks);
}
-Status FileWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
- std::shared_ptr<FileWriter>* out) {
- *out = std::shared_ptr<FileWriter>(new FileWriter(sink, schema)); // ctor is private
- RETURN_NOT_OK((*out)->UpdatePosition());
- return Status::OK();
+Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries,
+ const std::vector<FileBlock>& record_batches, io::OutputStream* out) {
+ FBB fbb;
+
+ flatbuffers::Offset<flatbuf::Schema> fb_schema;
+ RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, &fb_schema));
+
+ auto fb_dictionaries = FileBlocksToFlatbuffer(fbb, dictionaries);
+ auto fb_record_batches = FileBlocksToFlatbuffer(fbb, record_batches);
+
+ auto footer = flatbuf::CreateFooter(
+ fbb, kMetadataVersion, fb_schema, fb_dictionaries, fb_record_batches);
+
+ fbb.Finish(footer);
+
+ int32_t size = fbb.GetSize();
+
+ return out->Write(fbb.GetBufferPointer(), size);
}
-Status FileWriter::Write(const uint8_t* data, int64_t nbytes) {
- RETURN_NOT_OK(sink_->Write(data, nbytes));
- position_ += nbytes;
- return Status::OK();
+static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
+ return FileBlock(block->offset(), block->metaDataLength(), block->bodyLength());
}
-Status FileWriter::Align() {
- int64_t remainder = PaddedLength(position_) - position_;
- if (remainder > 0) { return Write(kPaddingBytes, remainder); }
+class FileFooter::FileFooterImpl {
+ public:
+ FileFooterImpl(const std::shared_ptr<Buffer>& buffer, const flatbuf::Footer* footer)
+ : buffer_(buffer), footer_(footer) {}
+
+ int num_dictionaries() const { return footer_->dictionaries()->size(); }
+
+ int num_record_batches() const { return footer_->recordBatches()->size(); }
+
+ MetadataVersion::type version() const {
+ switch (footer_->version()) {
+ case flatbuf::MetadataVersion_V1:
+ return MetadataVersion::V1;
+ case flatbuf::MetadataVersion_V2:
+ return MetadataVersion::V2;
+ // Add cases as other versions become available
+ default:
+ return MetadataVersion::V2;
+ }
+ }
+
+ FileBlock record_batch(int i) const {
+ return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i));
+ }
+
+ FileBlock dictionary(int i) const {
+ return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i));
+ }
+
+ Status GetSchema(std::shared_ptr<Schema>* out) const {
+ auto schema_msg = std::make_shared<SchemaMetadata>(nullptr, footer_->schema());
+ return schema_msg->GetSchema(out);
+ }
+
+ private:
+ // Retain reference to memory
+ std::shared_ptr<Buffer> buffer_;
+
+ const flatbuf::Footer* footer_;
+};
+
+FileFooter::FileFooter() {}
+
+FileFooter::~FileFooter() {}
+
+Status FileFooter::Open(
+ const std::shared_ptr<Buffer>& buffer, std::unique_ptr<FileFooter>* out) {
+ const flatbuf::Footer* footer = flatbuf::GetFooter(buffer->data());
+
+ *out = std::unique_ptr<FileFooter>(new FileFooter());
+
+ // TODO(wesm): Verify the footer
+ (*out)->impl_.reset(new FileFooterImpl(buffer, footer));
+
return Status::OK();
}
-Status FileWriter::WriteAligned(const uint8_t* data, int64_t nbytes) {
- RETURN_NOT_OK(Write(data, nbytes));
- return Align();
+int FileFooter::num_dictionaries() const {
+ return impl_->num_dictionaries();
}
-Status FileWriter::Start() {
- RETURN_NOT_OK(WriteAligned(
- reinterpret_cast<const uint8_t*>(kArrowMagicBytes), strlen(kArrowMagicBytes)));
- started_ = true;
- return Status::OK();
+int FileFooter::num_record_batches() const {
+ return impl_->num_record_batches();
}
-Status FileWriter::CheckStarted() {
- if (!started_) { return Start(); }
- return Status::OK();
+MetadataVersion::type FileFooter::version() const {
+ return impl_->version();
}
-Status FileWriter::WriteRecordBatch(
- const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows) {
- RETURN_NOT_OK(CheckStarted());
-
- int64_t offset = position_;
+FileBlock FileFooter::record_batch(int i) const {
+ return impl_->record_batch(i);
+}
- // There may be padding over the end of the metadata, so we cannot rely on
- // position_
- int32_t metadata_length;
- int64_t body_length;
+FileBlock FileFooter::dictionary(int i) const {
+ return impl_->dictionary(i);
+}
- // Frame of reference in file format is 0, see ARROW-384
- const int64_t buffer_start_offset = 0;
- RETURN_NOT_OK(arrow::ipc::WriteRecordBatch(
- columns, num_rows, buffer_start_offset, sink_, &metadata_length, &body_length));
- RETURN_NOT_OK(UpdatePosition());
+Status FileFooter::GetSchema(std::shared_ptr<Schema>* out) const {
+ return impl_->GetSchema(out);
+}
- DCHECK(position_ % 8 == 0) << "ipc::WriteRecordBatch did not perform aligned writes";
+// ----------------------------------------------------------------------
+// File writer implementation
- // Append metadata, to be written in the footer later
- record_batches_.emplace_back(offset, metadata_length, body_length);
+Status FileWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
+ std::shared_ptr<FileWriter>* out) {
+ *out = std::shared_ptr<FileWriter>(new FileWriter(sink, schema)); // ctor is private
+ RETURN_NOT_OK((*out)->UpdatePosition());
+ return Status::OK();
+}
+Status FileWriter::Start() {
+ RETURN_NOT_OK(WriteAligned(
+ reinterpret_cast<const uint8_t*>(kArrowMagicBytes), strlen(kArrowMagicBytes)));
+ started_ = true;
return Status::OK();
}
+Status FileWriter::WriteRecordBatch(const RecordBatch& batch) {
+ // Push an empty FileBlock
+ // Append metadata, to be written in the footer later
+ record_batches_.emplace_back(0, 0, 0);
+ return StreamWriter::WriteRecordBatch(
+ batch, &record_batches_[record_batches_.size() - 1]);
+}
+
Status FileWriter::Close() {
// Write metadata
int64_t initial_position = position_;
- RETURN_NOT_OK(WriteFileFooter(schema_.get(), dictionaries_, record_batches_, sink_));
+ RETURN_NOT_OK(WriteFileFooter(*schema_, dictionaries_, record_batches_, sink_));
RETURN_NOT_OK(UpdatePosition());
// Write footer length
http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/file.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/file.h b/cpp/src/arrow/ipc/file.h
index 4f35c37..7696954 100644
--- a/cpp/src/arrow/ipc/file.h
+++ b/cpp/src/arrow/ipc/file.h
@@ -25,13 +25,12 @@
#include <vector>
#include "arrow/ipc/metadata.h"
+#include "arrow/ipc/stream.h"
#include "arrow/util/visibility.h"
namespace arrow {
-class Array;
class Buffer;
-struct Field;
class RecordBatch;
class Schema;
class Status;
@@ -45,40 +44,43 @@ class ReadableFileInterface;
namespace ipc {
-class ARROW_EXPORT FileWriter {
- public:
- static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
- std::shared_ptr<FileWriter>* out);
+Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries,
+ const std::vector<FileBlock>& record_batches, io::OutputStream* out);
- // TODO(wesm): Write dictionaries
+class ARROW_EXPORT FileFooter {
+ public:
+ ~FileFooter();
- Status WriteRecordBatch(
- const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows);
+ static Status Open(
+ const std::shared_ptr<Buffer>& buffer, std::unique_ptr<FileFooter>* out);
- Status Close();
+ int num_dictionaries() const;
+ int num_record_batches() const;
+ MetadataVersion::type version() const;
- private:
- FileWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema);
+ FileBlock record_batch(int i) const;
+ FileBlock dictionary(int i) const;
- Status CheckStarted();
- Status Start();
+ Status GetSchema(std::shared_ptr<Schema>* out) const;
- Status UpdatePosition();
+ private:
+ FileFooter();
+ class FileFooterImpl;
+ std::unique_ptr<FileFooterImpl> impl_;
+};
- // Adds padding bytes if necessary to ensure all memory blocks are written on
- // 8-byte boundaries.
- Status Align();
+class ARROW_EXPORT FileWriter : public StreamWriter {
+ public:
+ static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
+ std::shared_ptr<FileWriter>* out);
- // Write data and update position
- Status Write(const uint8_t* data, int64_t nbytes);
+ Status WriteRecordBatch(const RecordBatch& batch) override;
+ Status Close() override;
- // Write and align
- Status WriteAligned(const uint8_t* data, int64_t nbytes);
+ private:
+ using StreamWriter::StreamWriter;
- io::OutputStream* sink_;
- std::shared_ptr<Schema> schema_;
- int64_t position_;
- bool started_;
+ Status Start() override;
std::vector<FileBlock> dictionaries_;
std::vector<FileBlock> record_batches_;
http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/ipc-adapter-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc
index 6ba0a6e..17868f8 100644
--- a/cpp/src/arrow/ipc/ipc-adapter-test.cc
+++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc
@@ -55,8 +55,8 @@ class TestWriteRecordBatch : public ::testing::TestWithParam<MakeRecordBatch*>,
const int64_t buffer_offset = 0;
- RETURN_NOT_OK(WriteRecordBatch(batch.columns(), batch.num_rows(), buffer_offset,
- mmap_.get(), &metadata_length, &body_length));
+ RETURN_NOT_OK(WriteRecordBatch(
+ batch, buffer_offset, mmap_.get(), &metadata_length, &body_length));
std::shared_ptr<RecordBatchMetadata> metadata;
RETURN_NOT_OK(ReadRecordBatchMetadata(0, metadata_length, mmap_.get(), &metadata));
@@ -102,9 +102,8 @@ void TestGetRecordBatchSize(std::shared_ptr<RecordBatch> batch) {
int32_t mock_metadata_length = -1;
int64_t mock_body_length = -1;
int64_t size = -1;
- ASSERT_OK(WriteRecordBatch(batch->columns(), batch->num_rows(), 0, &mock,
- &mock_metadata_length, &mock_body_length));
- ASSERT_OK(GetRecordBatchSize(batch.get(), &size));
+ ASSERT_OK(WriteRecordBatch(*batch, 0, &mock, &mock_metadata_length, &mock_body_length));
+ ASSERT_OK(GetRecordBatchSize(*batch, &size));
ASSERT_EQ(mock.GetExtentBytesWritten(), size);
}
@@ -157,11 +156,10 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture {
io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
if (override_level) {
- return WriteRecordBatch(batch->columns(), batch->num_rows(), 0, mmap_.get(),
- metadata_length, body_length, recursion_level + 1);
+ return WriteRecordBatch(
+ *batch, 0, mmap_.get(), metadata_length, body_length, recursion_level + 1);
} else {
- return WriteRecordBatch(batch->columns(), batch->num_rows(), 0, mmap_.get(),
- metadata_length, body_length);
+ return WriteRecordBatch(*batch, 0, mmap_.get(), metadata_length, body_length);
}
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/ipc-file-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-file-test.cc b/cpp/src/arrow/ipc/ipc-file-test.cc
index 0a9f677..15ceb80 100644
--- a/cpp/src/arrow/ipc/ipc-file-test.cc
+++ b/cpp/src/arrow/ipc/ipc-file-test.cc
@@ -29,6 +29,7 @@
#include "arrow/io/test-common.h"
#include "arrow/ipc/adapter.h"
#include "arrow/ipc/file.h"
+#include "arrow/ipc/stream.h"
#include "arrow/ipc/test-common.h"
#include "arrow/ipc/util.h"
@@ -41,6 +42,19 @@
namespace arrow {
namespace ipc {
+void CompareBatch(const RecordBatch& left, const RecordBatch& right) {
+ ASSERT_TRUE(left.schema()->Equals(right.schema()));
+ ASSERT_EQ(left.num_columns(), right.num_columns())
+ << left.schema()->ToString() << " result: " << right.schema()->ToString();
+ EXPECT_EQ(left.num_rows(), right.num_rows());
+ for (int i = 0; i < left.num_columns(); ++i) {
+ EXPECT_TRUE(left.column(i)->Equals(right.column(i)))
+ << "Idx: " << i << " Name: " << left.column_name(i);
+ }
+}
+
+using BatchVector = std::vector<std::shared_ptr<RecordBatch>>;
+
class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
public:
void SetUp() {
@@ -50,43 +64,94 @@ class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
}
void TearDown() {}
- Status RoundTripHelper(
- const RecordBatch& batch, std::vector<std::shared_ptr<RecordBatch>>* out_batches) {
+ Status RoundTripHelper(const BatchVector& in_batches, BatchVector* out_batches) {
// Write the file
- RETURN_NOT_OK(FileWriter::Open(sink_.get(), batch.schema(), &file_writer_));
- int num_batches = 3;
- for (int i = 0; i < num_batches; ++i) {
- RETURN_NOT_OK(file_writer_->WriteRecordBatch(batch.columns(), batch.num_rows()));
+ std::shared_ptr<FileWriter> writer;
+ RETURN_NOT_OK(FileWriter::Open(sink_.get(), in_batches[0]->schema(), &writer));
+
+ const int num_batches = static_cast<int>(in_batches.size());
+
+ for (const auto& batch : in_batches) {
+ RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
}
- RETURN_NOT_OK(file_writer_->Close());
+ RETURN_NOT_OK(writer->Close());
// Current offset into stream is the end of the file
int64_t footer_offset;
RETURN_NOT_OK(sink_->Tell(&footer_offset));
// Open the file
- auto reader = std::make_shared<io::BufferReader>(buffer_);
- RETURN_NOT_OK(FileReader::Open(reader, footer_offset, &file_reader_));
+ auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
+ std::shared_ptr<FileReader> reader;
+ RETURN_NOT_OK(FileReader::Open(buf_reader, footer_offset, &reader));
- EXPECT_EQ(num_batches, file_reader_->num_record_batches());
-
- out_batches->resize(num_batches);
+ EXPECT_EQ(num_batches, reader->num_record_batches());
for (int i = 0; i < num_batches; ++i) {
- RETURN_NOT_OK(file_reader_->GetRecordBatch(i, &(*out_batches)[i]));
+ std::shared_ptr<RecordBatch> chunk;
+ RETURN_NOT_OK(reader->GetRecordBatch(i, &chunk));
+ out_batches->emplace_back(chunk);
}
return Status::OK();
}
- void CompareBatch(const RecordBatch* left, const RecordBatch* right) {
- ASSERT_TRUE(left->schema()->Equals(right->schema()));
- ASSERT_EQ(left->num_columns(), right->num_columns())
- << left->schema()->ToString() << " result: " << right->schema()->ToString();
- EXPECT_EQ(left->num_rows(), right->num_rows());
- for (int i = 0; i < left->num_columns(); ++i) {
- EXPECT_TRUE(left->column(i)->Equals(right->column(i)))
- << "Idx: " << i << " Name: " << left->column_name(i);
+ protected:
+ MemoryPool* pool_;
+
+ std::unique_ptr<io::BufferOutputStream> sink_;
+ std::shared_ptr<PoolBuffer> buffer_;
+};
+
+TEST_P(TestFileFormat, RoundTrip) {
+ std::shared_ptr<RecordBatch> batch1;
+ std::shared_ptr<RecordBatch> batch2;
+ ASSERT_OK((*GetParam())(&batch1)); // NOLINT clang-tidy gtest issue
+ ASSERT_OK((*GetParam())(&batch2)); // NOLINT clang-tidy gtest issue
+
+ std::vector<std::shared_ptr<RecordBatch>> in_batches = {batch1, batch2};
+ std::vector<std::shared_ptr<RecordBatch>> out_batches;
+
+ ASSERT_OK(RoundTripHelper(in_batches, &out_batches));
+
+ // Compare batches
+ for (size_t i = 0; i < in_batches.size(); ++i) {
+ CompareBatch(*in_batches[i], *out_batches[i]);
+ }
+}
+
+class TestStreamFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
+ public:
+ void SetUp() {
+ pool_ = default_memory_pool();
+ buffer_ = std::make_shared<PoolBuffer>(pool_);
+ sink_.reset(new io::BufferOutputStream(buffer_));
+ }
+ void TearDown() {}
+
+ Status RoundTripHelper(
+ const RecordBatch& batch, std::vector<std::shared_ptr<RecordBatch>>* out_batches) {
+ // Write the file
+ std::shared_ptr<StreamWriter> writer;
+ RETURN_NOT_OK(StreamWriter::Open(sink_.get(), batch.schema(), &writer));
+ int num_batches = 5;
+ for (int i = 0; i < num_batches; ++i) {
+ RETURN_NOT_OK(writer->WriteRecordBatch(batch));
+ }
+ RETURN_NOT_OK(writer->Close());
+
+ // Open the file
+ auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
+
+ std::shared_ptr<StreamReader> reader;
+ RETURN_NOT_OK(StreamReader::Open(buf_reader, &reader));
+
+ std::shared_ptr<RecordBatch> chunk;
+ while (true) {
+ RETURN_NOT_OK(reader->GetNextRecordBatch(&chunk));
+ if (chunk == nullptr) { break; }
+ out_batches->emplace_back(chunk);
}
+ return Status::OK();
}
protected:
@@ -94,12 +159,9 @@ class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
std::unique_ptr<io::BufferOutputStream> sink_;
std::shared_ptr<PoolBuffer> buffer_;
-
- std::shared_ptr<FileWriter> file_writer_;
- std::shared_ptr<FileReader> file_reader_;
};
-TEST_P(TestFileFormat, RoundTrip) {
+TEST_P(TestStreamFormat, RoundTrip) {
std::shared_ptr<RecordBatch> batch;
ASSERT_OK((*GetParam())(&batch)); // NOLINT clang-tidy gtest issue
@@ -109,14 +171,80 @@ TEST_P(TestFileFormat, RoundTrip) {
// Compare batches. Same
for (size_t i = 0; i < out_batches.size(); ++i) {
- CompareBatch(batch.get(), out_batches[i].get());
+ CompareBatch(*batch, *out_batches[i]);
}
}
-INSTANTIATE_TEST_CASE_P(RoundTripTests, TestFileFormat,
- ::testing::Values(&MakeIntRecordBatch, &MakeListRecordBatch, &MakeNonNullRecordBatch,
- &MakeZeroLengthRecordBatch, &MakeDeeplyNestedList,
- &MakeStringTypesRecordBatch, &MakeStruct));
+#define BATCH_CASES() \
+ ::testing::Values(&MakeIntRecordBatch, &MakeListRecordBatch, &MakeNonNullRecordBatch, \
+ &MakeZeroLengthRecordBatch, &MakeDeeplyNestedList, &MakeStringTypesRecordBatch, \
+ &MakeStruct);
+
+INSTANTIATE_TEST_CASE_P(FileRoundTripTests, TestFileFormat, BATCH_CASES());
+INSTANTIATE_TEST_CASE_P(StreamRoundTripTests, TestStreamFormat, BATCH_CASES());
+
+class TestFileFooter : public ::testing::Test {
+ public:
+ void SetUp() {}
+
+ void CheckRoundtrip(const Schema& schema, const std::vector<FileBlock>& dictionaries,
+ const std::vector<FileBlock>& record_batches) {
+ auto buffer = std::make_shared<PoolBuffer>();
+ io::BufferOutputStream stream(buffer);
+
+ ASSERT_OK(WriteFileFooter(schema, dictionaries, record_batches, &stream));
+
+ std::unique_ptr<FileFooter> footer;
+ ASSERT_OK(FileFooter::Open(buffer, &footer));
+
+ ASSERT_EQ(MetadataVersion::V2, footer->version());
+
+ // Check schema
+ std::shared_ptr<Schema> schema2;
+ ASSERT_OK(footer->GetSchema(&schema2));
+ AssertSchemaEqual(schema, *schema2);
+
+ // Check blocks
+ ASSERT_EQ(dictionaries.size(), footer->num_dictionaries());
+ ASSERT_EQ(record_batches.size(), footer->num_record_batches());
+
+ for (int i = 0; i < footer->num_dictionaries(); ++i) {
+ CheckBlocks(dictionaries[i], footer->dictionary(i));
+ }
+
+ for (int i = 0; i < footer->num_record_batches(); ++i) {
+ CheckBlocks(record_batches[i], footer->record_batch(i));
+ }
+ }
+
+ void CheckBlocks(const FileBlock& left, const FileBlock& right) {
+ ASSERT_EQ(left.offset, right.offset);
+ ASSERT_EQ(left.metadata_length, right.metadata_length);
+ ASSERT_EQ(left.body_length, right.body_length);
+ }
+
+ private:
+ std::shared_ptr<Schema> example_schema_;
+};
+
+TEST_F(TestFileFooter, Basics) {
+ auto f0 = std::make_shared<Field>("f0", std::make_shared<Int8Type>());
+ auto f1 = std::make_shared<Field>("f1", std::make_shared<Int16Type>());
+ Schema schema({f0, f1});
+
+ std::vector<FileBlock> dictionaries;
+ dictionaries.emplace_back(8, 92, 900);
+ dictionaries.emplace_back(1000, 100, 1900);
+ dictionaries.emplace_back(3000, 100, 2900);
+
+ std::vector<FileBlock> record_batches;
+ record_batches.emplace_back(6000, 100, 900);
+ record_batches.emplace_back(7000, 100, 1900);
+ record_batches.emplace_back(9000, 100, 2900);
+ record_batches.emplace_back(12000, 100, 3900);
+
+ CheckRoundtrip(schema, dictionaries, record_batches);
+}
} // namespace ipc
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/ipc-json-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-json-test.cc b/cpp/src/arrow/ipc/ipc-json-test.cc
index 0750989..30f968c 100644
--- a/cpp/src/arrow/ipc/ipc-json-test.cc
+++ b/cpp/src/arrow/ipc/ipc-json-test.cc
@@ -245,8 +245,9 @@ TEST(TestJsonFileReadWrite, BasicRoundTrip) {
std::vector<std::shared_ptr<Array>> arrays;
MakeBatchArrays(schema, num_rows, &arrays);
- batches.emplace_back(std::make_shared<RecordBatch>(schema, num_rows, arrays));
- ASSERT_OK(writer->WriteRecordBatch(arrays, num_rows));
+ auto batch = std::make_shared<RecordBatch>(schema, num_rows, arrays);
+ batches.push_back(batch);
+ ASSERT_OK(writer->WriteRecordBatch(*batch));
}
std::string result;
http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/ipc-metadata-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-metadata-test.cc b/cpp/src/arrow/ipc/ipc-metadata-test.cc
index 7c5744a..098f996 100644
--- a/cpp/src/arrow/ipc/ipc-metadata-test.cc
+++ b/cpp/src/arrow/ipc/ipc-metadata-test.cc
@@ -23,6 +23,7 @@
#include "arrow/io/memory.h"
#include "arrow/ipc/metadata.h"
+#include "arrow/ipc/test-common.h"
#include "arrow/schema.h"
#include "arrow/status.h"
#include "arrow/test-util.h"
@@ -34,20 +35,11 @@ class Buffer;
namespace ipc {
-static inline void assert_schema_equal(const Schema* lhs, const Schema* rhs) {
- if (!lhs->Equals(*rhs)) {
- std::stringstream ss;
- ss << "left schema: " << lhs->ToString() << std::endl
- << "right schema: " << rhs->ToString() << std::endl;
- FAIL() << ss.str();
- }
-}
-
class TestSchemaMetadata : public ::testing::Test {
public:
void SetUp() {}
- void CheckRoundtrip(const Schema* schema) {
+ void CheckRoundtrip(const Schema& schema) {
std::shared_ptr<Buffer> buffer;
ASSERT_OK(WriteSchema(schema, &buffer));
@@ -57,12 +49,12 @@ class TestSchemaMetadata : public ::testing::Test {
ASSERT_EQ(Message::SCHEMA, message->type());
auto schema_msg = std::make_shared<SchemaMetadata>(message);
- ASSERT_EQ(schema->num_fields(), schema_msg->num_fields());
+ ASSERT_EQ(schema.num_fields(), schema_msg->num_fields());
std::shared_ptr<Schema> schema2;
ASSERT_OK(schema_msg->GetSchema(&schema2));
- assert_schema_equal(schema, schema2.get());
+ AssertSchemaEqual(schema, *schema2);
}
};
@@ -82,7 +74,7 @@ TEST_F(TestSchemaMetadata, PrimitiveFields) {
auto f10 = std::make_shared<Field>("f10", std::make_shared<BooleanType>());
Schema schema({f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10});
- CheckRoundtrip(&schema);
+ CheckRoundtrip(schema);
}
TEST_F(TestSchemaMetadata, NestedFields) {
@@ -94,70 +86,7 @@ TEST_F(TestSchemaMetadata, NestedFields) {
auto f1 = std::make_shared<Field>("f1", type2);
Schema schema({f0, f1});
- CheckRoundtrip(&schema);
-}
-
-class TestFileFooter : public ::testing::Test {
- public:
- void SetUp() {}
-
- void CheckRoundtrip(const Schema* schema, const std::vector<FileBlock>& dictionaries,
- const std::vector<FileBlock>& record_batches) {
- auto buffer = std::make_shared<PoolBuffer>();
- io::BufferOutputStream stream(buffer);
-
- ASSERT_OK(WriteFileFooter(schema, dictionaries, record_batches, &stream));
-
- std::unique_ptr<FileFooter> footer;
- ASSERT_OK(FileFooter::Open(buffer, &footer));
-
- ASSERT_EQ(MetadataVersion::V2, footer->version());
-
- // Check schema
- std::shared_ptr<Schema> schema2;
- ASSERT_OK(footer->GetSchema(&schema2));
- assert_schema_equal(schema, schema2.get());
-
- // Check blocks
- ASSERT_EQ(dictionaries.size(), footer->num_dictionaries());
- ASSERT_EQ(record_batches.size(), footer->num_record_batches());
-
- for (int i = 0; i < footer->num_dictionaries(); ++i) {
- CheckBlocks(dictionaries[i], footer->dictionary(i));
- }
-
- for (int i = 0; i < footer->num_record_batches(); ++i) {
- CheckBlocks(record_batches[i], footer->record_batch(i));
- }
- }
-
- void CheckBlocks(const FileBlock& left, const FileBlock& right) {
- ASSERT_EQ(left.offset, right.offset);
- ASSERT_EQ(left.metadata_length, right.metadata_length);
- ASSERT_EQ(left.body_length, right.body_length);
- }
-
- private:
- std::shared_ptr<Schema> example_schema_;
-};
-
-TEST_F(TestFileFooter, Basics) {
- auto f0 = std::make_shared<Field>("f0", std::make_shared<Int8Type>());
- auto f1 = std::make_shared<Field>("f1", std::make_shared<Int16Type>());
- Schema schema({f0, f1});
-
- std::vector<FileBlock> dictionaries;
- dictionaries.emplace_back(8, 92, 900);
- dictionaries.emplace_back(1000, 100, 1900);
- dictionaries.emplace_back(3000, 100, 2900);
-
- std::vector<FileBlock> record_batches;
- record_batches.emplace_back(6000, 100, 900);
- record_batches.emplace_back(7000, 100, 1900);
- record_batches.emplace_back(9000, 100, 2900);
- record_batches.emplace_back(12000, 100, 3900);
-
- CheckRoundtrip(&schema, dictionaries, record_batches);
+ CheckRoundtrip(schema);
}
} // namespace ipc
http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/json-integration-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc
index 757e6c0..95bc742 100644
--- a/cpp/src/arrow/ipc/json-integration-test.cc
+++ b/cpp/src/arrow/ipc/json-integration-test.cc
@@ -81,7 +81,7 @@ static Status ConvertJsonToArrow(
for (int i = 0; i < reader->num_record_batches(); ++i) {
std::shared_ptr<RecordBatch> batch;
RETURN_NOT_OK(reader->GetRecordBatch(i, &batch));
- RETURN_NOT_OK(writer->WriteRecordBatch(batch->columns(), batch->num_rows()));
+ RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
}
return writer->Close();
}
@@ -108,7 +108,7 @@ static Status ConvertArrowToJson(
for (int i = 0; i < reader->num_record_batches(); ++i) {
std::shared_ptr<RecordBatch> batch;
RETURN_NOT_OK(reader->GetRecordBatch(i, &batch));
- RETURN_NOT_OK(writer->WriteRecordBatch(batch->columns(), batch->num_rows()));
+ RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
}
std::string result;
http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/json.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json.cc b/cpp/src/arrow/ipc/json.cc
index 6e3a993..773fb74 100644
--- a/cpp/src/arrow/ipc/json.cc
+++ b/cpp/src/arrow/ipc/json.cc
@@ -64,25 +64,23 @@ class JsonWriter::JsonWriterImpl {
return Status::OK();
}
- Status WriteRecordBatch(
- const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows) {
- DCHECK_EQ(static_cast<int>(columns.size()), schema_->num_fields());
+ Status WriteRecordBatch(const RecordBatch& batch) {
+ DCHECK_EQ(batch.num_columns(), schema_->num_fields());
writer_->StartObject();
writer_->Key("count");
- writer_->Int(num_rows);
+ writer_->Int(batch.num_rows());
writer_->Key("columns");
writer_->StartArray();
for (int i = 0; i < schema_->num_fields(); ++i) {
- const std::shared_ptr<Array>& column = columns[i];
+ const std::shared_ptr<Array>& column = batch.column(i);
- DCHECK_EQ(num_rows, column->length())
+ DCHECK_EQ(batch.num_rows(), column->length())
<< "Array length did not match record batch length";
- RETURN_NOT_OK(
- WriteJsonArray(schema_->field(i)->name, *column.get(), writer_.get()));
+ RETURN_NOT_OK(WriteJsonArray(schema_->field(i)->name, *column, writer_.get()));
}
writer_->EndArray();
@@ -113,9 +111,8 @@ Status JsonWriter::Finish(std::string* result) {
return impl_->Finish(result);
}
-Status JsonWriter::WriteRecordBatch(
- const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows) {
- return impl_->WriteRecordBatch(columns, num_rows);
+Status JsonWriter::WriteRecordBatch(const RecordBatch& batch) {
+ return impl_->WriteRecordBatch(batch);
}
// ----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/json.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json.h b/cpp/src/arrow/ipc/json.h
index 7395be4..88afdfa 100644
--- a/cpp/src/arrow/ipc/json.h
+++ b/cpp/src/arrow/ipc/json.h
@@ -46,8 +46,7 @@ class ARROW_EXPORT JsonWriter {
// TODO(wesm): Write dictionaries
- Status WriteRecordBatch(
- const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows);
+ Status WriteRecordBatch(const RecordBatch& batch);
Status Finish(std::string* result);
http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/metadata-internal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc
index cc160c4..cd77220 100644
--- a/cpp/src/arrow/ipc/metadata-internal.cc
+++ b/cpp/src/arrow/ipc/metadata-internal.cc
@@ -282,10 +282,10 @@ flatbuf::Endianness endianness() {
}
Status SchemaToFlatbuffer(
- FBB& fbb, const Schema* schema, flatbuffers::Offset<flatbuf::Schema>* out) {
+ FBB& fbb, const Schema& schema, flatbuffers::Offset<flatbuf::Schema>* out) {
std::vector<FieldOffset> field_offsets;
- for (int i = 0; i < schema->num_fields(); ++i) {
- std::shared_ptr<Field> field = schema->field(i);
+ for (int i = 0; i < schema.num_fields(); ++i) {
+ std::shared_ptr<Field> field = schema.field(i);
FieldOffset offset;
RETURN_NOT_OK(FieldToFlatbuffer(fbb, field, &offset));
field_offsets.push_back(offset);
@@ -295,7 +295,7 @@ Status SchemaToFlatbuffer(
return Status::OK();
}
-Status MessageBuilder::SetSchema(const Schema* schema) {
+Status MessageBuilder::SetSchema(const Schema& schema) {
flatbuffers::Offset<flatbuf::Schema> fb_schema;
RETURN_NOT_OK(SchemaToFlatbuffer(fbb_, schema, &fb_schema));
http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/metadata-internal.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h
index 4826ebe..d94a8ab 100644
--- a/cpp/src/arrow/ipc/metadata-internal.h
+++ b/cpp/src/arrow/ipc/metadata-internal.h
@@ -49,11 +49,11 @@ static constexpr flatbuf::MetadataVersion kMetadataVersion = flatbuf::MetadataVe
Status FieldFromFlatbuffer(const flatbuf::Field* field, std::shared_ptr<Field>* out);
Status SchemaToFlatbuffer(
- FBB& fbb, const Schema* schema, flatbuffers::Offset<flatbuf::Schema>* out);
+ FBB& fbb, const Schema& schema, flatbuffers::Offset<flatbuf::Schema>* out);
class MessageBuilder {
public:
- Status SetSchema(const Schema* schema);
+ Status SetSchema(const Schema& schema);
Status SetRecordBatch(int32_t length, int64_t body_length,
const std::vector<flatbuf::FieldNode>& nodes,
http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/metadata.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc
index f0674ff..a97965c 100644
--- a/cpp/src/arrow/ipc/metadata.cc
+++ b/cpp/src/arrow/ipc/metadata.cc
@@ -38,7 +38,7 @@ namespace flatbuf = org::apache::arrow::flatbuf;
namespace ipc {
-Status WriteSchema(const Schema* schema, std::shared_ptr<Buffer>* out) {
+Status WriteSchema(const Schema& schema, std::shared_ptr<Buffer>* out) {
MessageBuilder message;
RETURN_NOT_OK(message.SetSchema(schema));
RETURN_NOT_OK(message.Finish());
@@ -232,124 +232,5 @@ int RecordBatchMetadata::num_fields() const {
return impl_->num_fields();
}
-// ----------------------------------------------------------------------
-// File footer
-
-static flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Block*>>
-FileBlocksToFlatbuffer(FBB& fbb, const std::vector<FileBlock>& blocks) {
- std::vector<flatbuf::Block> fb_blocks;
-
- for (const FileBlock& block : blocks) {
- fb_blocks.emplace_back(block.offset, block.metadata_length, block.body_length);
- }
-
- return fbb.CreateVectorOfStructs(fb_blocks);
-}
-
-Status WriteFileFooter(const Schema* schema, const std::vector<FileBlock>& dictionaries,
- const std::vector<FileBlock>& record_batches, io::OutputStream* out) {
- FBB fbb;
-
- flatbuffers::Offset<flatbuf::Schema> fb_schema;
- RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, &fb_schema));
-
- auto fb_dictionaries = FileBlocksToFlatbuffer(fbb, dictionaries);
- auto fb_record_batches = FileBlocksToFlatbuffer(fbb, record_batches);
-
- auto footer = flatbuf::CreateFooter(
- fbb, kMetadataVersion, fb_schema, fb_dictionaries, fb_record_batches);
-
- fbb.Finish(footer);
-
- int32_t size = fbb.GetSize();
-
- return out->Write(fbb.GetBufferPointer(), size);
-}
-
-static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
- return FileBlock(block->offset(), block->metaDataLength(), block->bodyLength());
-}
-
-class FileFooter::FileFooterImpl {
- public:
- FileFooterImpl(const std::shared_ptr<Buffer>& buffer, const flatbuf::Footer* footer)
- : buffer_(buffer), footer_(footer) {}
-
- int num_dictionaries() const { return footer_->dictionaries()->size(); }
-
- int num_record_batches() const { return footer_->recordBatches()->size(); }
-
- MetadataVersion::type version() const {
- switch (footer_->version()) {
- case flatbuf::MetadataVersion_V1:
- return MetadataVersion::V1;
- case flatbuf::MetadataVersion_V2:
- return MetadataVersion::V2;
- // Add cases as other versions become available
- default:
- return MetadataVersion::V2;
- }
- }
-
- FileBlock record_batch(int i) const {
- return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i));
- }
-
- FileBlock dictionary(int i) const {
- return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i));
- }
-
- Status GetSchema(std::shared_ptr<Schema>* out) const {
- auto schema_msg = std::make_shared<SchemaMetadata>(nullptr, footer_->schema());
- return schema_msg->GetSchema(out);
- }
-
- private:
- // Retain reference to memory
- std::shared_ptr<Buffer> buffer_;
-
- const flatbuf::Footer* footer_;
-};
-
-FileFooter::FileFooter() {}
-
-FileFooter::~FileFooter() {}
-
-Status FileFooter::Open(
- const std::shared_ptr<Buffer>& buffer, std::unique_ptr<FileFooter>* out) {
- const flatbuf::Footer* footer = flatbuf::GetFooter(buffer->data());
-
- *out = std::unique_ptr<FileFooter>(new FileFooter());
-
- // TODO(wesm): Verify the footer
- (*out)->impl_.reset(new FileFooterImpl(buffer, footer));
-
- return Status::OK();
-}
-
-int FileFooter::num_dictionaries() const {
- return impl_->num_dictionaries();
-}
-
-int FileFooter::num_record_batches() const {
- return impl_->num_record_batches();
-}
-
-MetadataVersion::type FileFooter::version() const {
- return impl_->version();
-}
-
-FileBlock FileFooter::record_batch(int i) const {
- return impl_->record_batch(i);
-}
-
-FileBlock FileFooter::dictionary(int i) const {
- return impl_->dictionary(i);
-}
-
-Status FileFooter::GetSchema(std::shared_ptr<Schema>* out) const {
- return impl_->GetSchema(out);
-}
-
} // namespace ipc
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/metadata.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h
index 1c4ef64..6e15ef3 100644
--- a/cpp/src/arrow/ipc/metadata.h
+++ b/cpp/src/arrow/ipc/metadata.h
@@ -49,7 +49,7 @@ struct MetadataVersion {
// Serialize arrow::Schema as a Flatbuffer
ARROW_EXPORT
-Status WriteSchema(const Schema* schema, std::shared_ptr<Buffer>* out);
+Status WriteSchema(const Schema& schema, std::shared_ptr<Buffer>* out);
// Read interface classes. We do not fully deserialize the flatbuffers so that
// individual fields metadata can be retrieved from very large schema without
@@ -149,10 +149,8 @@ class ARROW_EXPORT Message {
std::unique_ptr<MessageImpl> impl_;
};
-// ----------------------------------------------------------------------
-// File footer for file-like representation
-
struct FileBlock {
+ FileBlock() {}
FileBlock(int64_t offset, int32_t metadata_length, int64_t body_length)
: offset(offset), metadata_length(metadata_length), body_length(body_length) {}
@@ -161,32 +159,6 @@ struct FileBlock {
int64_t body_length;
};
-ARROW_EXPORT
-Status WriteFileFooter(const Schema* schema, const std::vector<FileBlock>& dictionaries,
- const std::vector<FileBlock>& record_batches, io::OutputStream* out);
-
-class ARROW_EXPORT FileFooter {
- public:
- ~FileFooter();
-
- static Status Open(
- const std::shared_ptr<Buffer>& buffer, std::unique_ptr<FileFooter>* out);
-
- int num_dictionaries() const;
- int num_record_batches() const;
- MetadataVersion::type version() const;
-
- FileBlock record_batch(int i) const;
- FileBlock dictionary(int i) const;
-
- Status GetSchema(std::shared_ptr<Schema>* out) const;
-
- private:
- FileFooter();
- class FileFooterImpl;
- std::unique_ptr<FileFooterImpl> impl_;
-};
-
} // namespace ipc
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/stream.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/stream.cc b/cpp/src/arrow/ipc/stream.cc
new file mode 100644
index 0000000..a2ca672
--- /dev/null
+++ b/cpp/src/arrow/ipc/stream.cc
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/ipc/stream.h"
+
+#include <cstdint>
+#include <cstring>
+#include <sstream>
+#include <vector>
+
+#include "arrow/buffer.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/io/memory.h"
+#include "arrow/ipc/adapter.h"
+#include "arrow/ipc/metadata.h"
+#include "arrow/ipc/util.h"
+#include "arrow/schema.h"
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+namespace ipc {
+
+// ----------------------------------------------------------------------
+// Stream writer implementation
+
+StreamWriter::~StreamWriter() {}
+
+StreamWriter::StreamWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema)
+ : sink_(sink), schema_(schema), position_(-1), started_(false) {}
+
+Status StreamWriter::UpdatePosition() {
+ return sink_->Tell(&position_);
+}
+
+Status StreamWriter::Write(const uint8_t* data, int64_t nbytes) {
+ RETURN_NOT_OK(sink_->Write(data, nbytes));
+ position_ += nbytes;
+ return Status::OK();
+}
+
+Status StreamWriter::Align() {
+ int64_t remainder = PaddedLength(position_) - position_;
+ if (remainder > 0) { return Write(kPaddingBytes, remainder); }
+ return Status::OK();
+}
+
+Status StreamWriter::WriteAligned(const uint8_t* data, int64_t nbytes) {
+ RETURN_NOT_OK(Write(data, nbytes));
+ return Align();
+}
+
+Status StreamWriter::CheckStarted() {
+ if (!started_) { return Start(); }
+ return Status::OK();
+}
+
+Status StreamWriter::WriteRecordBatch(const RecordBatch& batch, FileBlock* block) {
+ RETURN_NOT_OK(CheckStarted());
+
+ block->offset = position_;
+
+ // Frame of reference in file format is 0, see ARROW-384
+ const int64_t buffer_start_offset = 0;
+ RETURN_NOT_OK(arrow::ipc::WriteRecordBatch(
+ batch, buffer_start_offset, sink_, &block->metadata_length, &block->body_length));
+ RETURN_NOT_OK(UpdatePosition());
+
+ DCHECK(position_ % 8 == 0) << "WriteRecordBatch did not perform aligned writes";
+
+ return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+// StreamWriter implementation
+
+Status StreamWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
+ std::shared_ptr<StreamWriter>* out) {
+ // ctor is private
+ *out = std::shared_ptr<StreamWriter>(new StreamWriter(sink, schema));
+ RETURN_NOT_OK((*out)->UpdatePosition());
+ return Status::OK();
+}
+
+Status StreamWriter::Start() {
+ std::shared_ptr<Buffer> schema_fb;
+ RETURN_NOT_OK(WriteSchema(*schema_, &schema_fb));
+
+ int32_t flatbuffer_size = schema_fb->size();
+ RETURN_NOT_OK(
+ Write(reinterpret_cast<const uint8_t*>(&flatbuffer_size), sizeof(int32_t)));
+
+ // Write the flatbuffer
+ RETURN_NOT_OK(Write(schema_fb->data(), flatbuffer_size));
+ started_ = true;
+ return Status::OK();
+}
+
+Status StreamWriter::WriteRecordBatch(const RecordBatch& batch) {
+ // Pass a FileBlock, but the result is not used
+ FileBlock dummy_block;
+ return WriteRecordBatch(batch, &dummy_block);
+}
+
+Status StreamWriter::Close() {
+ // Close the stream
+ RETURN_NOT_OK(CheckStarted());
+ return sink_->Close();
+}
+
+// ----------------------------------------------------------------------
+// StreamReader implementation
+
+StreamReader::StreamReader(const std::shared_ptr<io::InputStream>& stream)
+ : stream_(stream), schema_(nullptr) {}
+
+StreamReader::~StreamReader() {}
+
+Status StreamReader::Open(const std::shared_ptr<io::InputStream>& stream,
+ std::shared_ptr<StreamReader>* reader) {
+ // Private ctor
+ *reader = std::shared_ptr<StreamReader>(new StreamReader(stream));
+ return (*reader)->ReadSchema();
+}
+
+Status StreamReader::ReadSchema() {
+ std::shared_ptr<Message> message;
+ RETURN_NOT_OK(ReadNextMessage(&message));
+
+ if (message->type() != Message::SCHEMA) {
+ return Status::IOError("First message was not schema type");
+ }
+
+ SchemaMetadata schema_meta(message);
+
+ // TODO(wesm): If the schema contains dictionaries, we must read all the
+ // dictionaries from the stream before constructing the final Schema
+ return schema_meta.GetSchema(&schema_);
+}
+
+Status StreamReader::ReadNextMessage(std::shared_ptr<Message>* message) {
+ std::shared_ptr<Buffer> buffer;
+ RETURN_NOT_OK(stream_->Read(sizeof(int32_t), &buffer));
+
+ if (buffer->size() != sizeof(int32_t)) {
+ *message = nullptr;
+ return Status::OK();
+ }
+
+ int32_t message_length = *reinterpret_cast<const int32_t*>(buffer->data());
+
+ RETURN_NOT_OK(stream_->Read(message_length, &buffer));
+ if (buffer->size() != message_length) {
+ return Status::IOError("Unexpected end of stream trying to read message");
+ }
+ return Message::Open(buffer, 0, message);
+}
+
+std::shared_ptr<Schema> StreamReader::schema() const {
+ return schema_;
+}
+
+Status StreamReader::GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) {
+ std::shared_ptr<Message> message;
+ RETURN_NOT_OK(ReadNextMessage(&message));
+
+ if (message == nullptr) {
+ // End of stream
+ *batch = nullptr;
+ return Status::OK();
+ }
+
+ if (message->type() != Message::RECORD_BATCH) {
+ return Status::IOError("Metadata not record batch");
+ }
+
+ auto batch_metadata = std::make_shared<RecordBatchMetadata>(message);
+
+ std::shared_ptr<Buffer> batch_body;
+ RETURN_NOT_OK(stream_->Read(message->body_length(), &batch_body));
+
+ if (batch_body->size() < message->body_length()) {
+ return Status::IOError("Unexpected EOS when reading message body");
+ }
+
+ io::BufferReader reader(batch_body);
+
+ return ReadRecordBatch(batch_metadata, schema_, &reader, batch);
+}
+
+} // namespace ipc
+} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/stream.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/stream.h b/cpp/src/arrow/ipc/stream.h
new file mode 100644
index 0000000..0b0e62f
--- /dev/null
+++ b/cpp/src/arrow/ipc/stream.h
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Implement Arrow streaming binary format
+
+#ifndef ARROW_IPC_STREAM_H
+#define ARROW_IPC_STREAM_H
+
+#include <cstdint>
+#include <memory>
+
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class Array;
+class Buffer;
+struct Field;
+class RecordBatch;
+class Schema;
+class Status;
+
+namespace io {
+
+class InputStream;
+class OutputStream;
+
+} // namespace io
+
+namespace ipc {
+
+struct FileBlock;
+class Message;
+
+class ARROW_EXPORT StreamWriter {
+ public:
+ virtual ~StreamWriter();
+
+ static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
+ std::shared_ptr<StreamWriter>* out);
+
+ virtual Status WriteRecordBatch(const RecordBatch& batch);
+ virtual Status Close();
+
+ protected:
+ StreamWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema);
+
+ virtual Status Start();
+
+ Status CheckStarted();
+ Status UpdatePosition();
+
+ Status WriteRecordBatch(const RecordBatch& batch, FileBlock* block);
+
+ // Adds padding bytes if necessary to ensure all memory blocks are written on
+ // 8-byte boundaries.
+ Status Align();
+
+ // Write data and update position
+ Status Write(const uint8_t* data, int64_t nbytes);
+
+ // Write and align
+ Status WriteAligned(const uint8_t* data, int64_t nbytes);
+
+ io::OutputStream* sink_;
+ std::shared_ptr<Schema> schema_;
+ int64_t position_;
+ bool started_;
+};
+
+class ARROW_EXPORT StreamReader {
+ public:
+ ~StreamReader();
+
+ // Open a stream.
+ static Status Open(const std::shared_ptr<io::InputStream>& stream,
+ std::shared_ptr<StreamReader>* reader);
+
+ std::shared_ptr<Schema> schema() const;
+
+ // Returned batch is nullptr when end of stream reached
+ Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch);
+
+ private:
+ explicit StreamReader(const std::shared_ptr<io::InputStream>& stream);
+
+ Status ReadSchema();
+
+ Status ReadNextMessage(std::shared_ptr<Message>* message);
+
+ std::shared_ptr<io::InputStream> stream_;
+ std::shared_ptr<Schema> schema_;
+};
+
+} // namespace ipc
+} // namespace arrow
+
+#endif // ARROW_IPC_STREAM_H
http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/cpp/src/arrow/ipc/test-common.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h
index 3faeebf..ca790de 100644
--- a/cpp/src/arrow/ipc/test-common.h
+++ b/cpp/src/arrow/ipc/test-common.h
@@ -36,6 +36,15 @@
namespace arrow {
namespace ipc {
+static inline void AssertSchemaEqual(const Schema& lhs, const Schema& rhs) {
+ if (!lhs.Equals(rhs)) {
+ std::stringstream ss;
+ ss << "left schema: " << lhs.ToString() << std::endl
+ << "right schema: " << rhs.ToString() << std::endl;
+ FAIL() << ss.str();
+ }
+}
+
const auto kListInt32 = list(int32());
const auto kListListInt32 = list(kListInt32);
http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/python/pyarrow/includes/libarrow_ipc.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow_ipc.pxd b/python/pyarrow/includes/libarrow_ipc.pxd
index b3185b1..8295760 100644
--- a/python/pyarrow/includes/libarrow_ipc.pxd
+++ b/python/pyarrow/includes/libarrow_ipc.pxd
@@ -29,8 +29,7 @@ cdef extern from "arrow/ipc/file.h" namespace "arrow::ipc" nogil:
CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema,
shared_ptr[CFileWriter]* out)
- CStatus WriteRecordBatch(const vector[shared_ptr[CArray]]& columns,
- int32_t num_rows)
+ CStatus WriteRecordBatch(const CRecordBatch& batch)
CStatus Close()
http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/python/pyarrow/ipc.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/ipc.pyx b/python/pyarrow/ipc.pyx
index abc5e1b..22069a7 100644
--- a/python/pyarrow/ipc.pyx
+++ b/python/pyarrow/ipc.pyx
@@ -21,6 +21,8 @@
# distutils: language = c++
# cython: embedsignature = True
+from cython.operator cimport dereference as deref
+
from pyarrow.includes.libarrow cimport *
from pyarrow.includes.libarrow_io cimport *
from pyarrow.includes.libarrow_ipc cimport *
@@ -58,10 +60,9 @@ cdef class ArrowFileWriter:
self.close()
def write_record_batch(self, RecordBatch batch):
- cdef CRecordBatch* bptr = batch.batch
with nogil:
check_status(self.writer.get()
- .WriteRecordBatch(bptr.columns(), bptr.num_rows()))
+ .WriteRecordBatch(deref(batch.batch)))
def close(self):
with nogil:
http://git-wip-us.apache.org/repos/asf/arrow/blob/5888e10c/python/src/pyarrow/adapters/pandas.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc
index 6623e23..feafa3d 100644
--- a/python/src/pyarrow/adapters/pandas.cc
+++ b/python/src/pyarrow/adapters/pandas.cc
@@ -254,16 +254,16 @@ struct arrow_traits<Type::BOOL> {
static constexpr bool is_numeric_nullable = false;
};
-#define INT_DECL(TYPE) \
- template <> \
- struct arrow_traits<Type::TYPE> { \
- static constexpr int npy_type = NPY_##TYPE; \
- static constexpr bool supports_nulls = false; \
- static constexpr double na_value = NAN; \
- static constexpr bool is_boolean = false; \
- static constexpr bool is_numeric_not_nullable = true; \
- static constexpr bool is_numeric_nullable = false; \
- typedef typename npy_traits<NPY_##TYPE>::value_type T; \
+#define INT_DECL(TYPE) \
+ template <> \
+ struct arrow_traits<Type::TYPE> { \
+ static constexpr int npy_type = NPY_##TYPE; \
+ static constexpr bool supports_nulls = false; \
+ static constexpr double na_value = NAN; \
+ static constexpr bool is_boolean = false; \
+ static constexpr bool is_numeric_not_nullable = true; \
+ static constexpr bool is_numeric_nullable = false; \
+ typedef typename npy_traits<NPY_##TYPE>::value_type T; \
};
INT_DECL(INT8);
@@ -1803,7 +1803,7 @@ class ArrowDeserializer {
// types
Status Convert(PyObject** out) {
-#define CONVERT_CASE(TYPE) \
+#define CONVERT_CASE(TYPE) \
case Type::TYPE: { \
RETURN_NOT_OK(ConvertValues<Type::TYPE>()); \
} break;
@@ -1857,8 +1857,7 @@ class ArrowDeserializer {
}
template <int TYPE>
- inline typename std::enable_if<TYPE == Type::DATE, Status>::type
- ConvertValues() {
+ inline typename std::enable_if<TYPE == Type::DATE, Status>::type ConvertValues() {
typedef typename arrow_traits<TYPE>::T T;
RETURN_NOT_OK(AllocateOutput(arrow_traits<TYPE>::npy_type));
@@ -1910,24 +1909,21 @@ class ArrowDeserializer {
// UTF8 strings
template <int TYPE>
- inline typename std::enable_if<TYPE == Type::STRING, Status>::type
- ConvertValues() {
+ inline typename std::enable_if<TYPE == Type::STRING, Status>::type ConvertValues() {
RETURN_NOT_OK(AllocateOutput(NPY_OBJECT));
auto out_values = reinterpret_cast<PyObject**>(PyArray_DATA(arr_));
return ConvertBinaryLike<arrow::StringArray>(data_, out_values);
}
template <int T2>
- inline typename std::enable_if<T2 == Type::BINARY, Status>::type
- ConvertValues() {
+ inline typename std::enable_if<T2 == Type::BINARY, Status>::type ConvertValues() {
RETURN_NOT_OK(AllocateOutput(NPY_OBJECT));
auto out_values = reinterpret_cast<PyObject**>(PyArray_DATA(arr_));
return ConvertBinaryLike<arrow::BinaryArray>(data_, out_values);
}
template <int TYPE>
- inline typename std::enable_if<TYPE == Type::DICTIONARY, Status>::type
- ConvertValues() {
+ inline typename std::enable_if<TYPE == Type::DICTIONARY, Status>::type ConvertValues() {
std::shared_ptr<PandasBlock> block;
RETURN_NOT_OK(MakeCategoricalBlock(col_->type(), col_->length(), &block));
RETURN_NOT_OK(block->Write(col_, 0, 0));