Posted to commits@arrow.apache.org by we...@apache.org on 2017/07/15 20:51:58 UTC
[2/2] arrow git commit: ARROW-1214: [Python/C++] Add C++ functionality to more easily handle encapsulated IPC messages, Python bindings
ARROW-1214: [Python/C++] Add C++ functionality to more easily handle encapsulated IPC messages, Python bindings
This patch does a bunch of things:
* Decouples the RecordBatchStreamReader from the actual message iteration (which is handled by a new `arrow::ipc::MessageReader` interface)
* Enables `arrow::ipc::Message` to hold all of the memory for a complete unit of data: metadata plus body
* Renames some IPC methods for better consistency (GetNextRecordBatch -> ReadNextRecordBatch)
* Adds a function to serialize a complete encapsulated message to an `arrow::io::OutputStream*`
* Adds Python bindings for all of the above, introducing `pyarrow.Message` and `pyarrow.MessageReader`. Adds `read_message` and `Message.serialize` functions for efficient memory round trips
* Adds `pyarrow.read_record_batch` for reading a single record batch given a message and a known schema (see the usage sketch after this description)
Later we will want to add `pyarrow.read_schema`, but it seemed like a fair amount of extra work to make that handle dictionaries.
This implements the C++ analogue to ARROW-1047, which was for Java. Not sure why I didn't create a JIRA about this. cc @icexelloss
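A rough usage sketch (not part of this patch) showing how the new Python pieces could fit together. The `read_message`, `read_record_batch`, and `Message.serialize` names come from the description above; accessors like `BufferOutputStream.get_result` and `Message.equals` are assumptions here, not verified against the bindings in this diff:

```python
import pyarrow as pa

# Write one record batch with the stream format
batch = pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], ['f0'])
sink = pa.BufferOutputStream()
writer = pa.RecordBatchStreamWriter(sink, batch.schema)
writer.write_batch(batch)
writer.close()
buf = sink.get_result()  # assumed accessor for the completed buffer

# Walk the encapsulated IPC messages in the stream: the first message
# carries the schema, the second carries the record batch
reader = pa.BufferReader(buf)
schema_message = pa.read_message(reader)
batch_message = pa.read_message(reader)

# Rebuild the record batch from the message plus the already-known schema
result = pa.read_record_batch(batch_message, batch.schema)

# Round-trip the message itself (metadata plus body) through a Buffer
serialized = batch_message.serialize()
restored = pa.read_message(pa.BufferReader(serialized))
assert restored.equals(batch_message)  # equals() assumed from C++ Message::Equals
```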
Author: Wes McKinney <we...@twosigma.com>
Closes #839 from wesm/ARROW-1214 and squashes the following commits:
07f1820a [Wes McKinney] Refactor to introduce MessageReader abstract type, use unique_ptr for messages instead of shared_ptr. First cut at Message, MessageReader Python API. Add read_message, C++/Python machinery for message roundtrips to Buffer, comparison. Add function to read RecordBatch from encapsulated message given schema.
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/bb0a7588
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/bb0a7588
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/bb0a7588
Branch: refs/heads/master
Commit: bb0a75885f2655ac54be47bd238811b74782532e
Parents: 099f61c
Author: Wes McKinney <we...@twosigma.com>
Authored: Sat Jul 15 16:51:51 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Sat Jul 15 16:51:51 2017 -0400
----------------------------------------------------------------------
c_glib/arrow-glib/reader.cpp | 4 +-
cpp/src/arrow/buffer.cc | 2 -
cpp/src/arrow/buffer.h | 15 +-
cpp/src/arrow/builder.cc | 12 +-
cpp/src/arrow/ipc/file-to-stream.cc | 2 +-
cpp/src/arrow/ipc/ipc-json-test.cc | 6 +-
cpp/src/arrow/ipc/ipc-read-write-test.cc | 70 ++--
cpp/src/arrow/ipc/json-integration-test.cc | 8 +-
cpp/src/arrow/ipc/json.cc | 6 +-
cpp/src/arrow/ipc/json.h | 2 +-
cpp/src/arrow/ipc/metadata.cc | 150 +++++--
cpp/src/arrow/ipc/metadata.h | 100 +++--
cpp/src/arrow/ipc/reader.cc | 206 ++++------
cpp/src/arrow/ipc/reader.h | 35 +-
cpp/src/arrow/ipc/stream-to-file.cc | 2 +-
cpp/src/arrow/ipc/writer.cc | 4 +-
cpp/src/arrow/python/builtin_convert.cc | 5 +-
python/doc/source/api.rst | 12 +-
python/pyarrow/__init__.py | 9 +-
python/pyarrow/feather.pxi | 109 +++++
python/pyarrow/includes/libarrow.pxd | 66 +--
python/pyarrow/io.pxi | 353 ----------------
python/pyarrow/ipc.pxi | 480 ++++++++++++++++++++++
python/pyarrow/ipc.py | 13 +-
python/pyarrow/lib.pyx | 8 +-
python/pyarrow/pandas_compat.py | 2 +-
python/pyarrow/table.pxi | 16 +-
python/pyarrow/tests/conftest.py | 2 +-
python/pyarrow/tests/test_array.py | 2 +-
python/pyarrow/tests/test_convert_builtin.py | 3 +
python/pyarrow/tests/test_feather.py | 3 +-
python/pyarrow/tests/test_ipc.py | 59 ++-
python/pyarrow/tests/test_parquet.py | 7 +-
python/pyarrow/tests/test_table.py | 4 +
python/pyarrow/tests/test_tensor.py | 1 +
35 files changed, 1135 insertions(+), 643 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/c_glib/arrow-glib/reader.cpp
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/reader.cpp b/c_glib/arrow-glib/reader.cpp
index 3ff6ba1..523bdee 100644
--- a/c_glib/arrow-glib/reader.cpp
+++ b/c_glib/arrow-glib/reader.cpp
@@ -173,7 +173,7 @@ garrow_record_batch_reader_get_next_record_batch(GArrowRecordBatchReader *reader
{
auto arrow_reader = garrow_record_batch_reader_get_raw(reader);
std::shared_ptr<arrow::RecordBatch> arrow_record_batch;
- auto status = arrow_reader->GetNextRecordBatch(&arrow_record_batch);
+ auto status = arrow_reader->ReadNextRecordBatch(&arrow_record_batch);
if (garrow_error_check(error,
status,
@@ -410,7 +410,7 @@ garrow_record_batch_file_reader_get_record_batch(GArrowRecordBatchFileReader *re
{
auto arrow_reader = garrow_record_batch_file_reader_get_raw(reader);
std::shared_ptr<arrow::RecordBatch> arrow_record_batch;
- auto status = arrow_reader->GetRecordBatch(i, &arrow_record_batch);
+ auto status = arrow_reader->ReadRecordBatch(i, &arrow_record_batch);
if (garrow_error_check(error,
status,
http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/buffer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/buffer.cc b/cpp/src/arrow/buffer.cc
index fb63798..a1d119e 100644
--- a/cpp/src/arrow/buffer.cc
+++ b/cpp/src/arrow/buffer.cc
@@ -27,8 +27,6 @@
namespace arrow {
-Buffer::~Buffer() {}
-
Status Buffer::Copy(
int64_t start, int64_t nbytes, MemoryPool* pool, std::shared_ptr<Buffer>* out) const {
// Sanity checks
http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/buffer.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h
index bfbea77..b117b24 100644
--- a/cpp/src/arrow/buffer.h
+++ b/cpp/src/arrow/buffer.h
@@ -22,6 +22,7 @@
#include <cstdint>
#include <cstring>
#include <memory>
+#include <string>
#include "arrow/status.h"
#include "arrow/util/macros.h"
@@ -47,7 +48,8 @@ class ARROW_EXPORT Buffer {
public:
Buffer(const uint8_t* data, int64_t size)
: is_mutable_(false), data_(data), size_(size), capacity_(size) {}
- virtual ~Buffer();
+
+ virtual ~Buffer() = default;
/// An offset into data that is owned by another buffer, but we want to be
/// able to retain a valid pointer to it even after other shared_ptr's to the
@@ -97,6 +99,17 @@ class ARROW_EXPORT Buffer {
DISALLOW_COPY_AND_ASSIGN(Buffer);
};
+/// \brief Create Buffer referencing std::string memory
+///
+/// Warning: string instance must stay alive
+///
+/// \param str std::string instance
+/// \return std::shared_ptr<Buffer>
+static inline std::shared_ptr<Buffer> GetBufferFromString(const std::string& str) {
+ return std::make_shared<Buffer>(
+ reinterpret_cast<const uint8_t*>(str.c_str()), static_cast<int64_t>(str.size()));
+}
+
/// Construct a view on passed buffer at the indicated offset and length. This
/// function cannot fail and does not error checking (except in debug builds)
static inline std::shared_ptr<Buffer> SliceBuffer(
http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/builder.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc
index 155d81a..e466838 100644
--- a/cpp/src/arrow/builder.cc
+++ b/cpp/src/arrow/builder.cc
@@ -342,8 +342,8 @@ Status AdaptiveIntBuilder::Append(
sizeof(int64_t) * length);
} else {
#ifdef _MSC_VER
-# pragma warning(push)
-# pragma warning(disable:4996)
+#pragma warning(push)
+#pragma warning(disable : 4996)
#endif
// int_size_ may have changed, so we need to recheck
switch (int_size_) {
@@ -366,7 +366,7 @@ Status AdaptiveIntBuilder::Append(
DCHECK(false);
}
#ifdef _MSC_VER
-# pragma warning(pop)
+#pragma warning(pop)
#endif
}
@@ -497,8 +497,8 @@ Status AdaptiveUIntBuilder::Append(
sizeof(uint64_t) * length);
} else {
#ifdef _MSC_VER
-# pragma warning(push)
-# pragma warning(disable:4996)
+#pragma warning(push)
+#pragma warning(disable : 4996)
#endif
// int_size_ may have changed, so we need to recheck
switch (int_size_) {
@@ -521,7 +521,7 @@ Status AdaptiveUIntBuilder::Append(
DCHECK(false);
}
#ifdef _MSC_VER
-# pragma warning(pop)
+#pragma warning(pop)
#endif
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/file-to-stream.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/file-to-stream.cc b/cpp/src/arrow/ipc/file-to-stream.cc
index 39c720c..a1feedc 100644
--- a/cpp/src/arrow/ipc/file-to-stream.cc
+++ b/cpp/src/arrow/ipc/file-to-stream.cc
@@ -39,7 +39,7 @@ Status ConvertToStream(const char* path) {
RETURN_NOT_OK(RecordBatchStreamWriter::Open(&sink, reader->schema(), &writer));
for (int i = 0; i < reader->num_record_batches(); ++i) {
std::shared_ptr<RecordBatch> chunk;
- RETURN_NOT_OK(reader->GetRecordBatch(i, &chunk));
+ RETURN_NOT_OK(reader->ReadRecordBatch(i, &chunk));
RETURN_NOT_OK(writer->WriteRecordBatch(*chunk));
}
return writer->Close();
http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/ipc-json-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-json-test.cc b/cpp/src/arrow/ipc/ipc-json-test.cc
index 9297146..318e318 100644
--- a/cpp/src/arrow/ipc/ipc-json-test.cc
+++ b/cpp/src/arrow/ipc/ipc-json-test.cc
@@ -276,7 +276,7 @@ TEST(TestJsonFileReadWrite, BasicRoundTrip) {
for (int i = 0; i < nbatches; ++i) {
std::shared_ptr<RecordBatch> batch;
- ASSERT_OK(reader->GetRecordBatch(i, &batch));
+ ASSERT_OK(reader->ReadRecordBatch(i, &batch));
ASSERT_TRUE(batch->Equals(*batches[i]));
}
}
@@ -344,7 +344,7 @@ TEST(TestJsonFileReadWrite, MinimalFormatExample) {
ASSERT_EQ(1, reader->num_record_batches());
std::shared_ptr<RecordBatch> batch;
- ASSERT_OK(reader->GetRecordBatch(0, &batch));
+ ASSERT_OK(reader->ReadRecordBatch(0, &batch));
std::vector<bool> foo_valid = {true, false, true, true, true};
std::vector<int32_t> foo_values = {1, 2, 3, 4, 5};
@@ -388,7 +388,7 @@ void CheckRoundtrip(const RecordBatch& batch) {
ASSERT_OK(JsonReader::Open(buffer, &reader));
std::shared_ptr<RecordBatch> result_batch;
- ASSERT_OK(reader->GetRecordBatch(0, &result_batch));
+ ASSERT_OK(reader->ReadRecordBatch(0, &result_batch));
CompareBatch(batch, *result_batch);
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/ipc-read-write-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc b/cpp/src/arrow/ipc/ipc-read-write-test.cc
index c71d046..42f14b0 100644
--- a/cpp/src/arrow/ipc/ipc-read-write-test.cc
+++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc
@@ -51,8 +51,8 @@ class TestSchemaMetadata : public ::testing::Test {
std::shared_ptr<Buffer> buffer;
ASSERT_OK(WriteSchemaMessage(schema, memo, &buffer));
- std::shared_ptr<Message> message;
- ASSERT_OK(Message::Open(buffer, 0, &message));
+ std::unique_ptr<Message> message;
+ ASSERT_OK(Message::Open(buffer, nullptr, &message));
ASSERT_EQ(Message::SCHEMA, message->type());
@@ -65,6 +65,32 @@ class TestSchemaMetadata : public ::testing::Test {
}
};
+TEST(TestMessage, Equals) {
+ std::string metadata = "foo";
+ std::string body = "bar";
+
+ auto b1 = GetBufferFromString(metadata);
+ auto b2 = GetBufferFromString(metadata);
+ auto b3 = GetBufferFromString(body);
+ auto b4 = GetBufferFromString(body);
+
+ Message msg1(b1, b3);
+ Message msg2(b2, b4);
+ Message msg3(b1, nullptr);
+ Message msg4(b2, nullptr);
+
+ ASSERT_TRUE(msg1.Equals(msg2));
+ ASSERT_TRUE(msg3.Equals(msg4));
+
+ ASSERT_FALSE(msg1.Equals(msg3));
+ ASSERT_FALSE(msg3.Equals(msg1));
+
+ // same metadata as msg1, different body
+ Message msg5(b2, b1);
+ ASSERT_FALSE(msg1.Equals(msg5));
+ ASSERT_FALSE(msg5.Equals(msg1));
+}
+
const std::shared_ptr<DataType> INT32 = std::make_shared<Int32Type>();
TEST_F(TestSchemaMetadata, PrimitiveFields) {
@@ -123,16 +149,12 @@ class IpcTestFixture : public io::MemoryMapFixture {
RETURN_NOT_OK(WriteRecordBatch(
batch, buffer_offset, mmap_.get(), &metadata_length, &body_length, pool_));
- std::shared_ptr<Message> message;
+ std::unique_ptr<Message> message;
RETURN_NOT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
- // The buffer offsets start at 0, so we must construct a
- // RandomAccessFile according to that frame of reference
- std::shared_ptr<Buffer> buffer_payload;
- RETURN_NOT_OK(mmap_->ReadAt(metadata_length, body_length, &buffer_payload));
- io::BufferReader buffer_reader(buffer_payload);
-
- return ReadRecordBatch(*message, batch.schema(), &buffer_reader, batch_result);
+ io::BufferReader buffer_reader(message->body());
+ return ReadRecordBatch(
+ *message->metadata(), batch.schema(), &buffer_reader, batch_result);
}
Status DoLargeRoundTrip(
@@ -151,7 +173,7 @@ class IpcTestFixture : public io::MemoryMapFixture {
std::shared_ptr<RecordBatchFileReader> file_reader;
RETURN_NOT_OK(RecordBatchFileReader::Open(mmap_, offset, &file_reader));
- return file_reader->GetRecordBatch(0, result);
+ return file_reader->ReadRecordBatch(0, result);
}
void CheckReadResult(const RecordBatch& result, const RecordBatch& expected) {
@@ -225,7 +247,7 @@ TEST_F(TestIpcRoundTrip, MetadataVersion) {
ASSERT_OK(WriteRecordBatch(
*batch, buffer_offset, mmap_.get(), &metadata_length, &body_length, pool_));
- std::shared_ptr<Message> message;
+ std::unique_ptr<Message> message;
ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
ASSERT_EQ(MetadataVersion::V3, message->metadata_version());
@@ -434,16 +456,13 @@ TEST_F(RecursionLimits, ReadLimit) {
ASSERT_OK(WriteToMmap(
recursion_depth, true, &metadata_length, &body_length, &batch, &schema));
- std::shared_ptr<Message> message;
+ std::unique_ptr<Message> message;
ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
- std::shared_ptr<Buffer> payload;
- ASSERT_OK(mmap_->ReadAt(metadata_length, body_length, &payload));
-
- io::BufferReader reader(payload);
+ io::BufferReader reader(message->body());
std::shared_ptr<RecordBatch> result;
- ASSERT_RAISES(Invalid, ReadRecordBatch(*message, schema, &reader, &result));
+ ASSERT_RAISES(Invalid, ReadRecordBatch(*message->metadata(), schema, &reader, &result));
}
TEST_F(RecursionLimits, StressLimit) {
@@ -455,16 +474,13 @@ TEST_F(RecursionLimits, StressLimit) {
ASSERT_OK(WriteToMmap(
recursion_depth, true, &metadata_length, &body_length, &batch, &schema));
- std::shared_ptr<Message> message;
+ std::unique_ptr<Message> message;
ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
- std::shared_ptr<Buffer> payload;
- ASSERT_OK(mmap_->ReadAt(metadata_length, body_length, &payload));
-
- io::BufferReader reader(payload);
-
+ io::BufferReader reader(message->body());
std::shared_ptr<RecordBatch> result;
- ASSERT_OK(ReadRecordBatch(*message, schema, recursion_depth + 1, &reader, &result));
+ ASSERT_OK(ReadRecordBatch(
+ *message->metadata(), schema, recursion_depth + 1, &reader, &result));
*it_works = result->Equals(*batch);
};
@@ -511,7 +527,7 @@ class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
EXPECT_EQ(num_batches, reader->num_record_batches());
for (int i = 0; i < num_batches; ++i) {
std::shared_ptr<RecordBatch> chunk;
- RETURN_NOT_OK(reader->GetRecordBatch(i, &chunk));
+ RETURN_NOT_OK(reader->ReadRecordBatch(i, &chunk));
out_batches->emplace_back(chunk);
}
@@ -571,7 +587,7 @@ class TestStreamFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
std::shared_ptr<RecordBatch> chunk;
while (true) {
- RETURN_NOT_OK(reader->GetNextRecordBatch(&chunk));
+ RETURN_NOT_OK(reader->ReadNextRecordBatch(&chunk));
if (chunk == nullptr) { break; }
out_batches->emplace_back(chunk);
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/json-integration-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc
index 424755a..18f5dfa 100644
--- a/cpp/src/arrow/ipc/json-integration-test.cc
+++ b/cpp/src/arrow/ipc/json-integration-test.cc
@@ -82,7 +82,7 @@ static Status ConvertJsonToArrow(
for (int i = 0; i < reader->num_record_batches(); ++i) {
std::shared_ptr<RecordBatch> batch;
- RETURN_NOT_OK(reader->GetRecordBatch(i, &batch));
+ RETURN_NOT_OK(reader->ReadRecordBatch(i, &batch));
RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
}
return writer->Close();
@@ -109,7 +109,7 @@ static Status ConvertArrowToJson(
for (int i = 0; i < reader->num_record_batches(); ++i) {
std::shared_ptr<RecordBatch> batch;
- RETURN_NOT_OK(reader->GetRecordBatch(i, &batch));
+ RETURN_NOT_OK(reader->ReadRecordBatch(i, &batch));
RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
}
@@ -168,8 +168,8 @@ static Status ValidateArrowVsJson(
std::shared_ptr<RecordBatch> arrow_batch;
std::shared_ptr<RecordBatch> json_batch;
for (int i = 0; i < json_nbatches; ++i) {
- RETURN_NOT_OK(json_reader->GetRecordBatch(i, &json_batch));
- RETURN_NOT_OK(arrow_reader->GetRecordBatch(i, &arrow_batch));
+ RETURN_NOT_OK(json_reader->ReadRecordBatch(i, &json_batch));
+ RETURN_NOT_OK(arrow_reader->ReadRecordBatch(i, &arrow_batch));
if (!json_batch->ApproxEquals(*arrow_batch)) {
std::stringstream ss;
http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/json.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json.cc b/cpp/src/arrow/ipc/json.cc
index f8c0b62..36e343e 100644
--- a/cpp/src/arrow/ipc/json.cc
+++ b/cpp/src/arrow/ipc/json.cc
@@ -115,7 +115,7 @@ class JsonReader::JsonReaderImpl {
return Status::OK();
}
- Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) const {
+ Status ReadRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) const {
DCHECK_GE(i, 0) << "i out of bounds";
DCHECK_LT(i, static_cast<int>(record_batches_->GetArray().Size()))
<< "i out of bounds";
@@ -164,8 +164,8 @@ int JsonReader::num_record_batches() const {
return impl_->num_record_batches();
}
-Status JsonReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) const {
- return impl_->GetRecordBatch(i, batch);
+Status JsonReader::ReadRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) const {
+ return impl_->ReadRecordBatch(i, batch);
}
} // namespace ipc
http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/json.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json.h b/cpp/src/arrow/ipc/json.h
index ad94def..2ba27c7 100644
--- a/cpp/src/arrow/ipc/json.h
+++ b/cpp/src/arrow/ipc/json.h
@@ -72,7 +72,7 @@ class ARROW_EXPORT JsonReader {
int num_record_batches() const;
// Read a record batch from the file
- Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) const;
+ Status ReadRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) const;
private:
JsonReader(MemoryPool* pool, const std::shared_ptr<Buffer>& data);
http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/metadata.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc
index 54f0547..5b2ca3b 100644
--- a/cpp/src/arrow/ipc/metadata.cc
+++ b/cpp/src/arrow/ipc/metadata.cc
@@ -17,6 +17,7 @@
#include "arrow/ipc/metadata.h"
+#include <algorithm>
#include <cstdint>
#include <memory>
#include <sstream>
@@ -834,11 +835,12 @@ Status DictionaryMemo::AddDictionary(
class Message::MessageImpl {
public:
- explicit MessageImpl(const std::shared_ptr<Buffer>& buffer, int64_t offset)
- : buffer_(buffer), offset_(offset), message_(nullptr) {}
+ explicit MessageImpl(
+ const std::shared_ptr<Buffer>& metadata, const std::shared_ptr<Buffer>& body)
+ : metadata_(metadata), message_(nullptr), body_(body) {}
Status Open() {
- message_ = flatbuf::GetMessage(buffer_->data() + offset_);
+ message_ = flatbuf::GetMessage(metadata_->data());
// Check that the metadata version is supported
if (message_->version() < kMinMetadataVersion) {
@@ -872,7 +874,7 @@ class Message::MessageImpl {
// Arrow 0.2
return MetadataVersion::V2;
case flatbuf::MetadataVersion_V3:
- // Arrow 0.3
+ // Arrow >= 0.3
return MetadataVersion::V3;
// Add cases as other versions become available
default:
@@ -882,28 +884,38 @@ class Message::MessageImpl {
const void* header() const { return message_->header(); }
- int64_t body_length() const { return message_->bodyLength(); }
+ std::shared_ptr<Buffer> body() const { return body_; }
- private:
- // Retain reference to memory
- std::shared_ptr<Buffer> buffer_;
- int64_t offset_;
+ std::shared_ptr<Buffer> metadata() const { return metadata_; }
+ private:
+ // The Flatbuffer metadata
+ std::shared_ptr<Buffer> metadata_;
const flatbuf::Message* message_;
+
+ // The message body, if any
+ std::shared_ptr<Buffer> body_;
};
-Message::Message(const std::shared_ptr<Buffer>& buffer, int64_t offset) {
- impl_.reset(new MessageImpl(buffer, offset));
+Message::Message(
+ const std::shared_ptr<Buffer>& metadata, const std::shared_ptr<Buffer>& body) {
+ impl_.reset(new MessageImpl(metadata, body));
+}
+
+Status Message::Open(const std::shared_ptr<Buffer>& metadata,
+ const std::shared_ptr<Buffer>& body, std::unique_ptr<Message>* out) {
+ out->reset(new Message(metadata, body));
+ return (*out)->impl_->Open();
}
Message::~Message() {}
-Status Message::Open(const std::shared_ptr<Buffer>& buffer, int64_t offset,
- std::shared_ptr<Message>* out) {
- // ctor is private
+std::shared_ptr<Buffer> Message::body() const {
+ return impl_->body();
+}
- *out = std::shared_ptr<Message>(new Message(buffer, offset));
- return (*out)->impl_->Open();
+std::shared_ptr<Buffer> Message::metadata() const {
+ return impl_->metadata();
}
Message::Type Message::type() const {
@@ -914,14 +926,64 @@ MetadataVersion Message::metadata_version() const {
return impl_->version();
}
-int64_t Message::body_length() const {
- return impl_->body_length();
-}
-
const void* Message::header() const {
return impl_->header();
}
+bool Message::Equals(const Message& other) const {
+ int64_t metadata_bytes = std::min(metadata()->size(), other.metadata()->size());
+
+ if (!metadata()->Equals(*other.metadata(), metadata_bytes)) {
+ return false;
+ }
+
+ // Compare bodies, if they have them
+ auto this_body = body();
+ auto other_body = other.body();
+
+ const bool this_has_body = (this_body != nullptr) && (this_body->size() > 0);
+ const bool other_has_body = (other_body != nullptr) && (other_body->size() > 0);
+
+ if (this_has_body && other_has_body) {
+ return this_body->Equals(*other_body);
+ } else if (this_has_body ^ other_has_body) {
+ // One has a body but not the other
+ return false;
+ } else {
+ // Neither has a body
+ return true;
+ }
+}
+
+Status Message::SerializeTo(io::OutputStream* file, int64_t* output_length) const {
+ int32_t metadata_length = 0;
+ RETURN_NOT_OK(WriteMessage(*metadata(), file, &metadata_length));
+
+ *output_length = metadata_length;
+
+ auto body_buffer = body();
+ if (body_buffer) {
+ RETURN_NOT_OK(file->Write(body_buffer->data(), body_buffer->size()));
+ *output_length += body_buffer->size();
+ }
+
+ return Status::OK();
+}
+
+std::string FormatMessageType(Message::Type type) {
+ switch (type) {
+ case Message::SCHEMA:
+ return "schema";
+ case Message::RECORD_BATCH:
+ return "record batch";
+ case Message::DICTIONARY_BATCH:
+ return "dictionary";
+ default:
+ break;
+ }
+ return "unknown";
+}
+
// ----------------------------------------------------------------------
static Status VisitField(const flatbuf::Field* field, DictionaryTypeMap* id_to_field) {
@@ -975,10 +1037,11 @@ Status GetSchema(const void* opaque_schema, const DictionaryMemo& dictionary_mem
return Status::OK();
}
-Status GetTensorMetadata(const void* opaque_tensor, std::shared_ptr<DataType>* type,
+Status GetTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>* type,
std::vector<int64_t>* shape, std::vector<int64_t>* strides,
std::vector<std::string>* dim_names) {
- auto tensor = static_cast<const flatbuf::Tensor*>(opaque_tensor);
+ auto message = flatbuf::GetMessage(metadata.data());
+ auto tensor = reinterpret_cast<const flatbuf::Tensor*>(message->header());
int ndim = static_cast<int>(tensor->shape()->size());
@@ -1006,8 +1069,27 @@ Status GetTensorMetadata(const void* opaque_tensor, std::shared_ptr<DataType>* t
// ----------------------------------------------------------------------
// Read and write messages
+static Status ReadFullMessage(const std::shared_ptr<Buffer>& metadata,
+ io::InputStream* stream, std::unique_ptr<Message>* message) {
+ auto fb_message = flatbuf::GetMessage(metadata->data());
+
+ int64_t body_length = fb_message->bodyLength();
+
+ std::shared_ptr<Buffer> body;
+ RETURN_NOT_OK(stream->Read(body_length, &body));
+
+ if (body->size() < body_length) {
+ std::stringstream ss;
+ ss << "Expected to be able to read " << body_length << " bytes for message body, got "
+ << body->size();
+ return Status::IOError(ss.str());
+ }
+
+ return Message::Open(metadata, body, message);
+}
+
Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile* file,
- std::shared_ptr<Message>* message) {
+ std::unique_ptr<Message>* message) {
std::shared_ptr<Buffer> buffer;
RETURN_NOT_OK(file->ReadAt(offset, metadata_length, &buffer));
@@ -1019,13 +1101,15 @@ Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile
<< ", metadata length: " << metadata_length;
return Status::Invalid(ss.str());
}
- return Message::Open(buffer, 4, message);
+
+ auto metadata = SliceBuffer(buffer, 4, buffer->size() - 4);
+ return ReadFullMessage(metadata, file, message);
}
-Status ReadMessage(io::InputStream* file, std::shared_ptr<Message>* message) {
+Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* message) {
std::shared_ptr<Buffer> buffer;
- RETURN_NOT_OK(file->Read(sizeof(int32_t), &buffer));
+ RETURN_NOT_OK(file->Read(sizeof(int32_t), &buffer));
if (buffer->size() != sizeof(int32_t)) {
*message = nullptr;
return Status::OK();
@@ -1044,9 +1128,21 @@ Status ReadMessage(io::InputStream* file, std::shared_ptr<Message>* message) {
return Status::IOError("Unexpected end of stream trying to read message");
}
- return Message::Open(buffer, 0, message);
+ return ReadFullMessage(buffer, file, message);
+}
+
+// ----------------------------------------------------------------------
+// Implement InputStream message reader
+
+Status InputStreamMessageReader::ReadNextMessage(std::unique_ptr<Message>* message) {
+ return ReadMessage(stream_.get(), message);
}
+InputStreamMessageReader::~InputStreamMessageReader() {}
+
+// ----------------------------------------------------------------------
+// Implement message writing
+
Status WriteMessage(
const Buffer& message, io::OutputStream* file, int32_t* message_length) {
// Need to write 4 bytes (message size), the message, plus padding to
http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/metadata.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h
index 257bbd8..64b2571 100644
--- a/cpp/src/arrow/ipc/metadata.h
+++ b/cpp/src/arrow/ipc/metadata.h
@@ -59,26 +59,12 @@ static constexpr const char* kArrowMagicBytes = "ARROW1";
constexpr int kMaxNestingDepth = 64;
struct ARROW_EXPORT FieldMetadata {
- FieldMetadata() {}
- FieldMetadata(int64_t length, int64_t null_count, int64_t offset)
- : length(length), null_count(null_count), offset(offset) {}
-
- FieldMetadata(const FieldMetadata& other) {
- this->length = other.length;
- this->null_count = other.null_count;
- this->offset = other.offset;
- }
-
int64_t length;
int64_t null_count;
int64_t offset;
};
struct ARROW_EXPORT BufferMetadata {
- BufferMetadata() {}
- BufferMetadata(int32_t page, int64_t offset, int64_t length)
- : page(page), offset(offset), length(length) {}
-
/// The shared memory page id where to find this. Set to -1 if unused
int32_t page;
@@ -90,10 +76,6 @@ struct ARROW_EXPORT BufferMetadata {
};
struct FileBlock {
- FileBlock() {}
- FileBlock(int64_t offset, int32_t metadata_length, int64_t body_length)
- : offset(offset), metadata_length(metadata_length), body_length(body_length) {}
-
int64_t offset;
int32_t metadata_length;
int64_t body_length;
@@ -153,20 +135,46 @@ Status GetDictionaryTypes(const void* opaque_schema, DictionaryTypeMap* id_to_fi
Status ARROW_EXPORT GetSchema(const void* opaque_schema,
const DictionaryMemo& dictionary_memo, std::shared_ptr<Schema>* out);
-Status ARROW_EXPORT GetTensorMetadata(const void* opaque_tensor,
+Status ARROW_EXPORT GetTensorMetadata(const Buffer& metadata,
std::shared_ptr<DataType>* type, std::vector<int64_t>* shape,
std::vector<int64_t>* strides, std::vector<std::string>* dim_names);
+/// \brief An IPC message including metadata and body
class ARROW_EXPORT Message {
public:
enum Type { NONE, SCHEMA, DICTIONARY_BATCH, RECORD_BATCH, TENSOR };
- ~Message();
+ /// \brief Construct message, but do not validate
+ ///
+ /// Use at your own risk; Message::Open has more metadata validation
+ Message(const std::shared_ptr<Buffer>& metadata, const std::shared_ptr<Buffer>& body);
- static Status Open(const std::shared_ptr<Buffer>& buffer, int64_t offset,
- std::shared_ptr<Message>* out);
+ ~Message();
- int64_t body_length() const;
+ /// \brief Create and validate a Message instance from two buffers
+ ///
+ /// \param[in] metadata a buffer containing the Flatbuffer metadata
+ /// \param[in] body a buffer containing the message body, which may be nullptr
+ /// \param[out] out the created message
+ static Status Open(const std::shared_ptr<Buffer>& metadata,
+ const std::shared_ptr<Buffer>& body, std::unique_ptr<Message>* out);
+
+ /// \brief Write length-prefixed metadata and body to output stream
+ ///
+ /// \param[in] file output stream to write to
+ /// \param[out] output_length the number of bytes written
+ /// \return Status
+ bool Equals(const Message& other) const;
+
+ /// \brief the Message metadata
+ ///
+ /// \return buffer
+ std::shared_ptr<Buffer> metadata() const;
+
+ /// \brief the Message body, if any
+ ///
+ /// \return buffer is nullptr if no body
+ std::shared_ptr<Buffer> body() const;
Type type() const;
@@ -174,9 +182,14 @@ class ARROW_EXPORT Message {
const void* header() const;
- private:
- Message(const std::shared_ptr<Buffer>& buffer, int64_t offset);
+ /// \brief Write length-prefixed metadata and body to output stream
+ ///
+ /// \param[in] file output stream to write to
+ /// \param[out] output_length the number of bytes written
+ /// \return Status
+ Status SerializeTo(io::OutputStream* file, int64_t* output_length) const;
+ private:
// Hide serialization details from user API
class MessageImpl;
std::unique_ptr<MessageImpl> impl_;
@@ -184,8 +197,34 @@ class ARROW_EXPORT Message {
DISALLOW_COPY_AND_ASSIGN(Message);
};
+ARROW_EXPORT std::string FormatMessageType(Message::Type type);
+
+/// \brief Abstract interface for a sequence of messages
+class ARROW_EXPORT MessageReader {
+ public:
+ virtual ~MessageReader() = default;
+
+ virtual Status ReadNextMessage(std::unique_ptr<Message>* message) = 0;
+};
+
+class ARROW_EXPORT InputStreamMessageReader : public MessageReader {
+ public:
+ explicit InputStreamMessageReader(const std::shared_ptr<io::InputStream>& stream)
+ : stream_(stream) {}
+
+ ~InputStreamMessageReader();
+
+ Status ReadNextMessage(std::unique_ptr<Message>* message) override;
+
+ private:
+ std::shared_ptr<io::InputStream> stream_;
+};
+
+/// \brief Read encapulated RPC message from position in file
+///
/// Read a length-prefixed message flatbuffer starting at the indicated file
-/// offset
+/// offset. If the message has a body with non-zero length, it will also be
+/// read
///
/// The metadata_length includes at least the length prefix and the flatbuffer
///
@@ -196,15 +235,18 @@ class ARROW_EXPORT Message {
/// \param[out] message the message read
/// \return Status success or failure
Status ARROW_EXPORT ReadMessage(int64_t offset, int32_t metadata_length,
- io::RandomAccessFile* file, std::shared_ptr<Message>* message);
+ io::RandomAccessFile* file, std::unique_ptr<Message>* message);
+/// \brief Read encapulated RPC message (metadata and body) from InputStream
+///
/// Read length-prefixed message with as-yet unknown length. Returns nullptr if
/// there are not enough bytes available or the message length is 0 (e.g. EOS
/// in a stream)
Status ARROW_EXPORT ReadMessage(
- io::InputStream* stream, std::shared_ptr<Message>* message);
+ io::InputStream* stream, std::unique_ptr<Message>* message);
-/// Write a serialized message with a length-prefix and padding to an 8-byte offset
+/// Write a serialized message metadata with a length-prefix and padding to an
+/// 8-byte offset
///
/// <message_size: int32><message: const void*><padding>
Status ARROW_EXPORT WriteMessage(
http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/reader.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 8ca4d82..88ab330 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -257,11 +257,18 @@ static Status LoadArray(const std::shared_ptr<DataType>& type,
return loader.Load();
}
-Status ReadRecordBatch(const Message& metadata, const std::shared_ptr<Schema>& schema,
+Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr<Schema>& schema,
io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) {
return ReadRecordBatch(metadata, schema, kMaxNestingDepth, file, out);
}
+Status ReadRecordBatch(const Message& message, const std::shared_ptr<Schema>& schema,
+ std::shared_ptr<RecordBatch>* out) {
+ io::BufferReader reader(message.body());
+ DCHECK_EQ(message.type(), Message::RECORD_BATCH);
+ return ReadRecordBatch(*message.metadata(), schema, kMaxNestingDepth, &reader, out);
+}
+
// ----------------------------------------------------------------------
// Array loading
@@ -294,18 +301,22 @@ static inline Status ReadRecordBatch(const flatbuf::RecordBatch* metadata,
schema, metadata->length(), max_recursion_depth, &source, out);
}
-Status ReadRecordBatch(const Message& metadata, const std::shared_ptr<Schema>& schema,
+Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr<Schema>& schema,
int max_recursion_depth, io::RandomAccessFile* file,
std::shared_ptr<RecordBatch>* out) {
- DCHECK_EQ(metadata.type(), Message::RECORD_BATCH);
- auto batch = reinterpret_cast<const flatbuf::RecordBatch*>(metadata.header());
+ auto message = flatbuf::GetMessage(metadata.data());
+ if (message->header_type() != flatbuf::MessageHeader_RecordBatch) {
+ DCHECK_EQ(message->header_type(), flatbuf::MessageHeader_RecordBatch);
+ }
+ auto batch = reinterpret_cast<const flatbuf::RecordBatch*>(message->header());
return ReadRecordBatch(batch, schema, max_recursion_depth, file, out);
}
-Status ReadDictionary(const Message& metadata, const DictionaryTypeMap& dictionary_types,
+Status ReadDictionary(const Buffer& metadata, const DictionaryTypeMap& dictionary_types,
io::RandomAccessFile* file, int64_t* dictionary_id, std::shared_ptr<Array>* out) {
+ auto message = flatbuf::GetMessage(metadata.data());
auto dictionary_batch =
- reinterpret_cast<const flatbuf::DictionaryBatch*>(metadata.header());
+ reinterpret_cast<const flatbuf::DictionaryBatch*>(message->header());
int64_t id = *dictionary_id = dictionary_batch->id();
auto it = dictionary_types.find(id);
@@ -335,25 +346,33 @@ Status ReadDictionary(const Message& metadata, const DictionaryTypeMap& dictiona
return Status::OK();
}
+static Status ReadMessageAndValidate(MessageReader* reader, Message::Type expected_type,
+ bool allow_null, std::unique_ptr<Message>* message) {
+ RETURN_NOT_OK(reader->ReadNextMessage(message));
+
+ if (!(*message) && !allow_null) {
+ std::stringstream ss;
+ ss << "Expected " << FormatMessageType(expected_type)
+ << " message in stream, was null or length 0";
+ return Status::Invalid(ss.str());
+ }
+
+ if ((*message) == nullptr) { return Status::OK(); }
+
+ if ((*message)->type() != expected_type) {
+ std::stringstream ss;
+ ss << "Message not expected type: " << FormatMessageType(expected_type)
+ << ", was: " << (*message)->type();
+ return Status::IOError(ss.str());
+ }
+ return Status::OK();
+}
+
// ----------------------------------------------------------------------
// RecordBatchStreamReader implementation
static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
- return FileBlock(block->offset(), block->metaDataLength(), block->bodyLength());
-}
-
-static inline std::string FormatMessageType(Message::Type type) {
- switch (type) {
- case Message::SCHEMA:
- return "schema";
- case Message::RECORD_BATCH:
- return "record batch";
- case Message::DICTIONARY_BATCH:
- return "dictionary";
- default:
- break;
- }
- return "unknown";
+ return FileBlock{block->offset(), block->metaDataLength(), block->bodyLength()};
}
RecordBatchReader::~RecordBatchReader() {}
@@ -363,59 +382,29 @@ class RecordBatchStreamReader::RecordBatchStreamReaderImpl {
RecordBatchStreamReaderImpl() {}
~RecordBatchStreamReaderImpl() {}
- Status Open(const std::shared_ptr<io::InputStream>& stream) {
- stream_ = stream;
+ Status Open(std::unique_ptr<MessageReader> message_reader) {
+ message_reader_ = std::move(message_reader);
return ReadSchema();
}
- Status ReadNextMessage(
- Message::Type expected_type, bool allow_null, std::shared_ptr<Message>* message) {
- RETURN_NOT_OK(ReadMessage(stream_.get(), message));
-
- if (!(*message) && !allow_null) {
- std::stringstream ss;
- ss << "Expected " << FormatMessageType(expected_type)
- << " message in stream, was null or length 0";
- return Status::Invalid(ss.str());
- }
-
- if ((*message) == nullptr) { return Status::OK(); }
-
- if ((*message)->type() != expected_type) {
- std::stringstream ss;
- ss << "Message not expected type: " << FormatMessageType(expected_type)
- << ", was: " << (*message)->type();
- return Status::IOError(ss.str());
- }
- return Status::OK();
- }
-
- Status ReadExact(int64_t size, std::shared_ptr<Buffer>* buffer) {
- RETURN_NOT_OK(stream_->Read(size, buffer));
-
- if ((*buffer)->size() < size) {
- return Status::IOError("Unexpected EOS when reading buffer");
- }
- return Status::OK();
- }
-
Status ReadNextDictionary() {
- std::shared_ptr<Message> message;
- RETURN_NOT_OK(ReadNextMessage(Message::DICTIONARY_BATCH, false, &message));
+ std::unique_ptr<Message> message;
+ RETURN_NOT_OK(ReadMessageAndValidate(
+ message_reader_.get(), Message::DICTIONARY_BATCH, false, &message));
- std::shared_ptr<Buffer> batch_body;
- RETURN_NOT_OK(ReadExact(message->body_length(), &batch_body));
- io::BufferReader reader(batch_body);
+ io::BufferReader reader(message->body());
std::shared_ptr<Array> dictionary;
int64_t id;
- RETURN_NOT_OK(ReadDictionary(*message, dictionary_types_, &reader, &id, &dictionary));
+ RETURN_NOT_OK(ReadDictionary(
+ *message->metadata(), dictionary_types_, &reader, &id, &dictionary));
return dictionary_memo_.AddDictionary(id, dictionary);
}
Status ReadSchema() {
- std::shared_ptr<Message> message;
- RETURN_NOT_OK(ReadNextMessage(Message::SCHEMA, false, &message));
+ std::unique_ptr<Message> message;
+ RETURN_NOT_OK(
+ ReadMessageAndValidate(message_reader_.get(), Message::SCHEMA, false, &message));
RETURN_NOT_OK(GetDictionaryTypes(message->header(), &dictionary_types_));
@@ -429,9 +418,10 @@ class RecordBatchStreamReader::RecordBatchStreamReaderImpl {
return GetSchema(message->header(), dictionary_memo_, &schema_);
}
- Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) {
- std::shared_ptr<Message> message;
- RETURN_NOT_OK(ReadNextMessage(Message::RECORD_BATCH, true, &message));
+ Status ReadNextRecordBatch(std::shared_ptr<RecordBatch>* batch) {
+ std::unique_ptr<Message> message;
+ RETURN_NOT_OK(ReadMessageAndValidate(
+ message_reader_.get(), Message::RECORD_BATCH, true, &message));
if (message == nullptr) {
// End of stream
@@ -439,21 +429,18 @@ class RecordBatchStreamReader::RecordBatchStreamReaderImpl {
return Status::OK();
}
- std::shared_ptr<Buffer> batch_body;
- RETURN_NOT_OK(ReadExact(message->body_length(), &batch_body));
- io::BufferReader reader(batch_body);
- return ReadRecordBatch(*message, schema_, &reader, batch);
+ io::BufferReader reader(message->body());
+ return ReadRecordBatch(*message->metadata(), schema_, &reader, batch);
}
std::shared_ptr<Schema> schema() const { return schema_; }
private:
+ std::unique_ptr<MessageReader> message_reader_;
+
// dictionary_id -> type
DictionaryTypeMap dictionary_types_;
-
DictionaryMemo dictionary_memo_;
-
- std::shared_ptr<io::InputStream> stream_;
std::shared_ptr<Schema> schema_;
};
@@ -463,19 +450,25 @@ RecordBatchStreamReader::RecordBatchStreamReader() {
RecordBatchStreamReader::~RecordBatchStreamReader() {}
-Status RecordBatchStreamReader::Open(const std::shared_ptr<io::InputStream>& stream,
+Status RecordBatchStreamReader::Open(std::unique_ptr<MessageReader> message_reader,
std::shared_ptr<RecordBatchStreamReader>* reader) {
// Private ctor
*reader = std::shared_ptr<RecordBatchStreamReader>(new RecordBatchStreamReader());
- return (*reader)->impl_->Open(stream);
+ return (*reader)->impl_->Open(std::move(message_reader));
+}
+
+Status RecordBatchStreamReader::Open(const std::shared_ptr<io::InputStream>& stream,
+ std::shared_ptr<RecordBatchStreamReader>* out) {
+ std::unique_ptr<MessageReader> message_reader(new InputStreamMessageReader(stream));
+ return Open(std::move(message_reader), out);
}
std::shared_ptr<Schema> RecordBatchStreamReader::schema() const {
return impl_->schema();
}
-Status RecordBatchStreamReader::GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) {
- return impl_->GetNextRecordBatch(batch);
+Status RecordBatchStreamReader::ReadNextRecordBatch(std::shared_ptr<RecordBatch>* batch) {
+ return impl_->ReadNextRecordBatch(batch);
}
// ----------------------------------------------------------------------
@@ -547,22 +540,17 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl {
return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i));
}
- Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) {
+ Status ReadRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) {
DCHECK_GE(i, 0);
DCHECK_LT(i, num_record_batches());
FileBlock block = record_batch(i);
- std::shared_ptr<Message> message;
+ std::unique_ptr<Message> message;
RETURN_NOT_OK(
ReadMessage(block.offset, block.metadata_length, file_.get(), &message));
- // TODO(wesm): ARROW-388 -- the buffer frame of reference is 0 (see
- // ARROW-384).
- std::shared_ptr<Buffer> buffer_block;
- RETURN_NOT_OK(file_->Read(block.body_length, &buffer_block));
- io::BufferReader reader(buffer_block);
-
- return ReadRecordBatch(*message, schema_, &reader, batch);
+ io::BufferReader reader(message->body());
+ return ::arrow::ipc::ReadRecordBatch(*message->metadata(), schema_, &reader, batch);
}
Status ReadSchema() {
@@ -571,23 +559,16 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl {
// Read all the dictionaries
for (int i = 0; i < num_dictionaries(); ++i) {
FileBlock block = dictionary(i);
- std::shared_ptr<Message> message;
+ std::unique_ptr<Message> message;
RETURN_NOT_OK(
ReadMessage(block.offset, block.metadata_length, file_.get(), &message));
- // TODO(wesm): ARROW-577: This code is a bit duplicated, can be fixed
- // with a more invasive refactor
-
- // TODO(wesm): ARROW-388 -- the buffer frame of reference is 0 (see
- // ARROW-384).
- std::shared_ptr<Buffer> buffer_block;
- RETURN_NOT_OK(file_->Read(block.body_length, &buffer_block));
- io::BufferReader reader(buffer_block);
+ io::BufferReader reader(message->body());
std::shared_ptr<Array> dictionary;
int64_t dictionary_id;
- RETURN_NOT_OK(ReadDictionary(
- *message, dictionary_fields_, &reader, &dictionary_id, &dictionary));
+ RETURN_NOT_OK(ReadDictionary(*message->metadata(), dictionary_fields_, &reader,
+ &dictionary_id, &dictionary));
RETURN_NOT_OK(dictionary_memo_->AddDictionary(dictionary_id, dictionary));
}
@@ -653,12 +634,13 @@ MetadataVersion RecordBatchFileReader::version() const {
return impl_->version();
}
-Status RecordBatchFileReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) {
- return impl_->GetRecordBatch(i, batch);
+Status RecordBatchFileReader::ReadRecordBatch(
+ int i, std::shared_ptr<RecordBatch>* batch) {
+ return impl_->ReadRecordBatch(i, batch);
}
-static Status ReadContiguousPayload(int64_t offset, io::RandomAccessFile* file,
- std::shared_ptr<Message>* message, std::shared_ptr<Buffer>* payload) {
+static Status ReadContiguousPayload(
+ int64_t offset, io::RandomAccessFile* file, std::unique_ptr<Message>* message) {
std::shared_ptr<Buffer> buffer;
RETURN_NOT_OK(file->Seek(offset));
RETURN_NOT_OK(ReadMessage(file, message));
@@ -666,38 +648,32 @@ static Status ReadContiguousPayload(int64_t offset, io::RandomAccessFile* file,
if (*message == nullptr) {
return Status::Invalid("Unable to read metadata at offset");
}
-
- // TODO(ARROW-388): The buffer offsets start at 0, so we must construct a
- // RandomAccessFile according to that frame of reference
- RETURN_NOT_OK(file->Read((*message)->body_length(), payload));
return Status::OK();
}
Status ReadRecordBatch(const std::shared_ptr<Schema>& schema, int64_t offset,
io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) {
- std::shared_ptr<Buffer> payload;
- std::shared_ptr<Message> message;
-
- RETURN_NOT_OK(ReadContiguousPayload(offset, file, &message, &payload));
- io::BufferReader buffer_reader(payload);
- return ReadRecordBatch(*message, schema, kMaxNestingDepth, &buffer_reader, out);
+ std::unique_ptr<Message> message;
+ RETURN_NOT_OK(ReadContiguousPayload(offset, file, &message));
+ io::BufferReader buffer_reader(message->body());
+ return ReadRecordBatch(
+ *message->metadata(), schema, kMaxNestingDepth, &buffer_reader, out);
}
Status ReadTensor(
int64_t offset, io::RandomAccessFile* file, std::shared_ptr<Tensor>* out) {
// Respect alignment of Tensor messages (see WriteTensor)
offset = PaddedLength(offset);
- std::shared_ptr<Message> message;
- std::shared_ptr<Buffer> data;
- RETURN_NOT_OK(ReadContiguousPayload(offset, file, &message, &data));
+ std::unique_ptr<Message> message;
+ RETURN_NOT_OK(ReadContiguousPayload(offset, file, &message));
std::shared_ptr<DataType> type;
std::vector<int64_t> shape;
std::vector<int64_t> strides;
std::vector<std::string> dim_names;
RETURN_NOT_OK(
- GetTensorMetadata(message->header(), &type, &shape, &strides, &dim_names));
- *out = std::make_shared<Tensor>(type, data, shape, strides, dim_names);
+ GetTensorMetadata(*message->metadata(), &type, &shape, &strides, &dim_names));
+ *out = std::make_shared<Tensor>(type, message->body(), shape, strides, dim_names);
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/reader.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
index dd29a36..d6c2614 100644
--- a/cpp/src/arrow/ipc/reader.h
+++ b/cpp/src/arrow/ipc/reader.h
@@ -57,7 +57,7 @@ class ARROW_EXPORT RecordBatchReader {
///
/// \param(out) batch the next loaded batch, nullptr at end of stream
/// \return Status
- virtual Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) = 0;
+ virtual Status ReadNextRecordBatch(std::shared_ptr<RecordBatch>* batch) = 0;
};
/// \class RecordBatchStreamReader
@@ -66,16 +66,24 @@ class ARROW_EXPORT RecordBatchStreamReader : public RecordBatchReader {
public:
virtual ~RecordBatchStreamReader();
- /// Create batch reader from InputStream
+ /// Create batch reader from generic MessageReader
+ ///
+ /// \param(in) message_reader a MessageReader implementation
+ /// \param(out) out the created RecordBatchStreamReader object
+ /// \return Status
+ static Status Open(std::unique_ptr<MessageReader> message_reader,
+ std::shared_ptr<RecordBatchStreamReader>* out);
+
+ /// \Create Record batch stream reader from InputStream
///
/// \param(in) stream an input stream instance
- /// \param(out) reader the created reader object
+ /// \param(out) out the created RecordBatchStreamReader object
/// \return Status
static Status Open(const std::shared_ptr<io::InputStream>& stream,
- std::shared_ptr<RecordBatchStreamReader>* reader);
+ std::shared_ptr<RecordBatchStreamReader>* out);
std::shared_ptr<Schema> schema() const override;
- Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) override;
+ Status ReadNextRecordBatch(std::shared_ptr<RecordBatch>* batch) override;
private:
RecordBatchStreamReader();
@@ -122,7 +130,7 @@ class ARROW_EXPORT RecordBatchFileReader {
/// \param(in) i the index of the record batch to return
/// \param(out) batch the read batch
/// \return Status
- Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch);
+ Status ReadRecordBatch(int i, std::shared_ptr<RecordBatch>* batch);
private:
RecordBatchFileReader();
@@ -133,16 +141,25 @@ class ARROW_EXPORT RecordBatchFileReader {
// Generic read functions; does not copy data if the input supports zero copy reads
-/// Read record batch from file given metadata and schema
+/// \brief Read record batch from file given metadata and schema
///
/// \param(in) metadata a Message containing the record batch metadata
/// \param(in) schema the record batch schema
/// \param(in) file a random access file
/// \param(out) out the read record batch
-Status ARROW_EXPORT ReadRecordBatch(const Message& metadata,
+Status ARROW_EXPORT ReadRecordBatch(const Buffer& metadata,
const std::shared_ptr<Schema>& schema, io::RandomAccessFile* file,
std::shared_ptr<RecordBatch>* out);
+/// \brief Read record batch from fully encapulated Message
+///
+/// \param[in] message a message instance containing metadata and body
+/// \param[in] schema
+/// \param[out] out the resulting RecordBatch
+/// \return Status
+Status ARROW_EXPORT ReadRecordBatch(const Message& message,
+ const std::shared_ptr<Schema>& schema, std::shared_ptr<RecordBatch>* out);
+
/// Read record batch from file given metadata and schema
///
/// \param(in) metadata a Message containing the record batch metadata
@@ -150,7 +167,7 @@ Status ARROW_EXPORT ReadRecordBatch(const Message& metadata,
/// \param(in) file a random access file
/// \param(in) max_recursion_depth the maximum permitted nesting depth
/// \param(out) out the read record batch
-Status ARROW_EXPORT ReadRecordBatch(const Message& metadata,
+Status ARROW_EXPORT ReadRecordBatch(const Buffer& metadata,
const std::shared_ptr<Schema>& schema, int max_recursion_depth,
io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out);
http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/stream-to-file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/stream-to-file.cc b/cpp/src/arrow/ipc/stream-to-file.cc
index b942054..de65883 100644
--- a/cpp/src/arrow/ipc/stream-to-file.cc
+++ b/cpp/src/arrow/ipc/stream-to-file.cc
@@ -40,7 +40,7 @@ Status ConvertToFile() {
std::shared_ptr<RecordBatch> batch;
while (true) {
- RETURN_NOT_OK(reader->GetNextRecordBatch(&batch));
+ RETURN_NOT_OK(reader->ReadNextRecordBatch(&batch));
if (batch == nullptr) break;
RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/writer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 7563343..14708a1 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -110,7 +110,7 @@ class RecordBatchSerializer : public ArrayVisitor {
}
// push back all common elements
- field_nodes_.emplace_back(arr.length(), arr.null_count(), 0);
+ field_nodes_.push_back({arr.length(), arr.null_count(), 0});
if (arr.null_count() > 0) {
std::shared_ptr<Buffer> bitmap;
@@ -680,7 +680,7 @@ class RecordBatchStreamWriter::RecordBatchStreamWriterImpl {
Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit) {
// Push an empty FileBlock. Can be written in the footer later
- record_batches_.emplace_back(0, 0, 0);
+ record_batches_.push_back({0, 0, 0});
return WriteRecordBatch(
batch, allow_64bit, &record_batches_[record_batches_.size() - 1]);
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/python/builtin_convert.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/builtin_convert.cc b/cpp/src/arrow/python/builtin_convert.cc
index f10dac7..816f95a 100644
--- a/cpp/src/arrow/python/builtin_convert.cc
+++ b/cpp/src/arrow/python/builtin_convert.cc
@@ -477,9 +477,8 @@ class FixedWidthBytesConverter
inline Status AppendItem(const OwnedRef& item) {
PyObject* bytes_obj;
OwnedRef tmp;
- Py_ssize_t expected_length =
- std::dynamic_pointer_cast<FixedSizeBinaryType>(typed_builder_->type())
- ->byte_width();
+ Py_ssize_t expected_length = std::dynamic_pointer_cast<FixedSizeBinaryType>(
+ typed_builder_->type())->byte_width();
if (item.obj() == Py_None) {
RETURN_NOT_OK(typed_builder_->AppendNull());
return Status::OK();
http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/python/doc/source/api.rst
----------------------------------------------------------------------
diff --git a/python/doc/source/api.rst b/python/doc/source/api.rst
index 4810a31..400614d 100644
--- a/python/doc/source/api.rst
+++ b/python/doc/source/api.rst
@@ -137,7 +137,6 @@ Tables and Record Batches
Column
RecordBatch
Table
- get_record_batch_size
.. _api.tensor:
@@ -148,9 +147,6 @@ Tensor type and Functions
:toctree: generated/
Tensor
- write_tensor
- get_tensor_size
- read_tensor
.. _api.io:
@@ -177,12 +173,20 @@ Interprocess Communication and Messaging
.. autosummary::
:toctree: generated/
+ Message
+ MessageReader
RecordBatchFileReader
RecordBatchFileWriter
RecordBatchStreamReader
RecordBatchStreamWriter
open_file
open_stream
+ read_message
+ read_record_batch
+ get_record_batch_size
+ read_tensor
+ write_tensor
+ get_tensor_size
.. _api.memory_pool:
http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 434722c..f7cddd0 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -69,9 +69,8 @@ from pyarrow.lib import (null, bool_,
from pyarrow.lib import (HdfsFile, NativeFile, PythonFile,
Buffer, BufferReader, BufferOutputStream,
OSFile, MemoryMappedFile, memory_map,
- frombuffer, read_tensor, write_tensor,
+ frombuffer,
memory_map, create_memory_map,
- get_record_batch_size, get_tensor_size,
have_libhdfs, have_libhdfs3, MockOutputStream)
from pyarrow.lib import (MemoryPool, total_allocated_bytes,
@@ -89,8 +88,12 @@ from pyarrow.lib import (ArrowException,
from pyarrow.filesystem import Filesystem, HdfsClient, LocalFilesystem
-from pyarrow.ipc import (RecordBatchFileReader, RecordBatchFileWriter,
+from pyarrow.ipc import (Message, MessageReader,
+ RecordBatchFileReader, RecordBatchFileWriter,
RecordBatchStreamReader, RecordBatchStreamWriter,
+ read_message, read_record_batch, read_tensor,
+ write_tensor,
+ get_record_batch_size, get_tensor_size,
open_stream,
open_file,
serialize_pandas, deserialize_pandas)
http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/python/pyarrow/feather.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/feather.pxi b/python/pyarrow/feather.pxi
new file mode 100644
index 0000000..2e7cf6c
--- /dev/null
+++ b/python/pyarrow/feather.pxi
@@ -0,0 +1,109 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#----------------------------------------------------------------------
+# Implement legacy Feather file format
+
+
+class FeatherError(Exception):
+ pass
+
+
+cdef class FeatherWriter:
+ cdef:
+ unique_ptr[CFeatherWriter] writer
+
+ cdef public:
+ int64_t num_rows
+
+ def __cinit__(self):
+ self.num_rows = -1
+
+ def open(self, object dest):
+ cdef shared_ptr[OutputStream] sink
+ get_writer(dest, &sink)
+
+ with nogil:
+ check_status(CFeatherWriter.Open(sink, &self.writer))
+
+ def close(self):
+ if self.num_rows < 0:
+ self.num_rows = 0
+ self.writer.get().SetNumRows(self.num_rows)
+ check_status(self.writer.get().Finalize())
+
+ def write_array(self, object name, object col, object mask=None):
+ cdef Array arr
+
+ if self.num_rows >= 0:
+ if len(col) != self.num_rows:
+ raise ValueError('column length does not match number of rows of prior columns')
+ else:
+ self.num_rows = len(col)
+
+ if isinstance(col, Array):
+ arr = col
+ else:
+ arr = Array.from_pandas(col, mask=mask)
+
+ cdef c_string c_name = tobytes(name)
+
+ with nogil:
+ check_status(
+ self.writer.get().Append(c_name, deref(arr.sp_array)))
+
+
+cdef class FeatherReader:
+ cdef:
+ unique_ptr[CFeatherReader] reader
+
+ def __cinit__(self):
+ pass
+
+ def open(self, source):
+ cdef shared_ptr[RandomAccessFile] reader
+ get_reader(source, &reader)
+
+ with nogil:
+ check_status(CFeatherReader.Open(reader, &self.reader))
+
+ property num_rows:
+
+ def __get__(self):
+ return self.reader.get().num_rows()
+
+ property num_columns:
+
+ def __get__(self):
+ return self.reader.get().num_columns()
+
+ def get_column_name(self, int i):
+ cdef c_string name = self.reader.get().GetColumnName(i)
+ return frombytes(name)
+
+ def get_column(self, int i):
+ if i < 0 or i >= self.num_columns:
+ raise IndexError(i)
+
+ cdef shared_ptr[CColumn] sp_column
+ with nogil:
+ check_status(self.reader.get()
+ .GetColumn(i, &sp_column))
+
+ cdef Column col = Column()
+ col.init(sp_column)
+ return col
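
FeatherWriter and FeatherReader are internal classes relocated verbatim from io.pxi; the public entry points in pyarrow/feather.py wrap them. A sketch of direct use, assuming lib.pyx includes this file so the names are reachable from pyarrow.lib and that get_writer/get_reader accept a filesystem path (ordinary code should call pyarrow.feather.write_feather / read_feather instead):

import pandas as pd
from pyarrow import lib

# Write one column to a Feather file via the internal writer
writer = lib.FeatherWriter()
writer.open('example.feather')
writer.write_array('ints', pd.Series([1, 2, 3]))
writer.close()

# Read it back column by column via the internal reader
reader = lib.FeatherReader()
reader.open('example.feather')
print(reader.num_rows, reader.num_columns)
print(reader.get_column_name(0), reader.get_column(0).to_pandas())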
http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 9fad824..dd791cd 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -546,41 +546,41 @@ cdef extern from "arrow/io/memory.h" namespace "arrow::io" nogil:
int64_t GetExtentBytesWritten()
-cdef extern from "arrow/ipc/metadata.h" namespace "arrow::ipc" nogil:
- cdef cppclass SchemaMessage:
- int num_fields()
- CStatus GetField(int i, shared_ptr[CField]* out)
- CStatus GetSchema(shared_ptr[CSchema]* out)
-
- cdef cppclass FieldMetadata:
- pass
-
- cdef cppclass BufferMetadata:
- pass
-
- cdef cppclass RecordBatchMessage:
- pass
-
- cdef cppclass DictionaryBatchMessage:
- pass
-
+cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
enum MessageType" arrow::ipc::Message::Type":
MessageType_SCHEMA" arrow::ipc::Message::SCHEMA"
MessageType_RECORD_BATCH" arrow::ipc::Message::RECORD_BATCH"
MessageType_DICTIONARY_BATCH" arrow::ipc::Message::DICTIONARY_BATCH"
- cdef cppclass Message:
- CStatus Open(const shared_ptr[CBuffer]& buf,
- shared_ptr[Message]* out)
- int64_t body_length()
+ enum MetadataVersion" arrow::ipc::MetadataVersion":
+ MessageType_V1" arrow::ipc::MetadataVersion::V1"
+ MessageType_V2" arrow::ipc::MetadataVersion::V2"
+ MessageType_V3" arrow::ipc::MetadataVersion::V3"
+
+ cdef cppclass CMessage" arrow::ipc::Message":
+ CStatus Open(const shared_ptr[CBuffer]& metadata,
+ const shared_ptr[CBuffer]& body,
+ unique_ptr[CMessage]* out)
+
+ shared_ptr[CBuffer] body()
+
+ c_bool Equals(const CMessage& other)
+
+ shared_ptr[CBuffer] metadata()
+ MetadataVersion metadata_version()
MessageType type()
- shared_ptr[SchemaMessage] GetSchema()
- shared_ptr[RecordBatchMessage] GetRecordBatch()
- shared_ptr[DictionaryBatchMessage] GetDictionaryBatch()
+ CStatus SerializeTo(OutputStream* stream, int64_t* output_length)
+ c_string FormatMessageType(MessageType type)
-cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
+ cdef cppclass CMessageReader \
+ " arrow::ipc::MessageReader":
+ CStatus ReadNextMessage(unique_ptr[CMessage]* out)
+
+ cdef cppclass CInputStreamMessageReader \
+ " arrow::ipc::InputStreamMessageReader":
+ CInputStreamMessageReader(const shared_ptr[InputStream]& stream)
cdef cppclass CRecordBatchWriter \
" arrow::ipc::RecordBatchWriter":
@@ -590,7 +590,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
cdef cppclass CRecordBatchReader \
" arrow::ipc::RecordBatchReader":
shared_ptr[CSchema] schema()
- CStatus GetNextRecordBatch(shared_ptr[CRecordBatch]* batch)
+ CStatus ReadNextRecordBatch(shared_ptr[CRecordBatch]* batch)
cdef cppclass CRecordBatchStreamReader \
" arrow::ipc::RecordBatchStreamReader"(CRecordBatchReader):
@@ -598,6 +598,10 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
CStatus Open(const shared_ptr[InputStream]& stream,
shared_ptr[CRecordBatchStreamReader]* out)
+ @staticmethod
+ CStatus Open2" Open"(unique_ptr[CMessageReader] message_reader,
+ shared_ptr[CRecordBatchStreamReader]* out)
+
cdef cppclass CRecordBatchStreamWriter \
" arrow::ipc::RecordBatchStreamWriter"(CRecordBatchWriter):
@staticmethod
@@ -625,7 +629,9 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
int num_record_batches()
- CStatus GetRecordBatch(int i, shared_ptr[CRecordBatch]* batch)
+ CStatus ReadRecordBatch(int i, shared_ptr[CRecordBatch]* batch)
+
+ CStatus ReadMessage(InputStream* stream, unique_ptr[CMessage]* message)
CStatus GetRecordBatchSize(const CRecordBatch& batch, int64_t* size)
CStatus GetTensorSize(const CTensor& tensor, int64_t* size)
@@ -637,6 +643,10 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
CStatus ReadTensor(int64_t offset, RandomAccessFile* file,
shared_ptr[CTensor]* out)
+ CStatus ReadRecordBatch(const CMessage& message,
+ const shared_ptr[CSchema]& schema,
+ shared_ptr[CRecordBatch]* out)
+
cdef extern from "arrow/ipc/feather.h" namespace "arrow::ipc::feather" nogil:
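
The declarations above expose CMessage::SerializeTo, CMessage::Equals and the free ReadMessage/ReadRecordBatch functions that back the message round trip described in the commit message. A sketch of that round trip at the Python level, assuming Message.serialize returns a Buffer and that Message.equals and Message.type wrap Equals and FormatMessageType respectively:

import pandas as pd
import pyarrow as pa

# Produce a stream so there is at least one encapsulated message to read
batch = pa.RecordBatch.from_pandas(pd.DataFrame({'x': [1.0, 2.0]}))
sink = pa.BufferOutputStream()
writer = pa.RecordBatchStreamWriter(sink, batch.schema)
writer.write_batch(batch)
writer.close()

msg = pa.read_message(pa.BufferReader(sink.get_result()))  # schema message

buf = msg.serialize()                             # CMessage::SerializeTo
restored = pa.read_message(pa.BufferReader(buf))  # arrow::ipc::ReadMessage
assert restored.equals(msg)                       # CMessage::Equals
print(restored.type)                              # via FormatMessageType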
http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/python/pyarrow/io.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index 3221185..8b213a3 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -956,356 +956,3 @@ cdef class HdfsFile(NativeFile):
def __dealloc__(self):
self.parent = None
-
-# ----------------------------------------------------------------------
-# File and stream readers and writers
-
-cdef class _RecordBatchWriter:
- cdef:
- shared_ptr[CRecordBatchWriter] writer
- shared_ptr[OutputStream] sink
- bint closed
-
- def __cinit__(self):
- self.closed = True
-
- def __dealloc__(self):
- if not self.closed:
- self.close()
-
- def _open(self, sink, Schema schema):
- cdef:
- shared_ptr[CRecordBatchStreamWriter] writer
-
- get_writer(sink, &self.sink)
-
- with nogil:
- check_status(
- CRecordBatchStreamWriter.Open(self.sink.get(),
- schema.sp_schema,
- &writer))
-
- self.writer = <shared_ptr[CRecordBatchWriter]> writer
- self.closed = False
-
- def write_batch(self, RecordBatch batch):
- with nogil:
- check_status(self.writer.get()
- .WriteRecordBatch(deref(batch.batch)))
-
- def close(self):
- with nogil:
- check_status(self.writer.get().Close())
- self.closed = True
-
-
-cdef class _RecordBatchReader:
- cdef:
- shared_ptr[CRecordBatchReader] reader
-
- cdef readonly:
- Schema schema
-
- def __cinit__(self):
- pass
-
- def _open(self, source):
- cdef:
- shared_ptr[RandomAccessFile] file_handle
- shared_ptr[InputStream] in_stream
- shared_ptr[CRecordBatchStreamReader] reader
-
- get_reader(source, &file_handle)
- in_stream = <shared_ptr[InputStream]> file_handle
-
- with nogil:
- check_status(CRecordBatchStreamReader.Open(in_stream, &reader))
-
- self.reader = <shared_ptr[CRecordBatchReader]> reader
- self.schema = pyarrow_wrap_schema(self.reader.get().schema())
-
- def get_next_batch(self):
- """
- Read next RecordBatch from the stream. Raises StopIteration at end of
- stream
- """
- cdef shared_ptr[CRecordBatch] batch
-
- with nogil:
- check_status(self.reader.get().GetNextRecordBatch(&batch))
-
- if batch.get() == NULL:
- raise StopIteration
-
- return pyarrow_wrap_batch(batch)
-
- def read_all(self):
- """
- Read all record batches as a pyarrow.Table
- """
- cdef:
- vector[shared_ptr[CRecordBatch]] batches
- shared_ptr[CRecordBatch] batch
- shared_ptr[CTable] table
-
- with nogil:
- while True:
- check_status(self.reader.get().GetNextRecordBatch(&batch))
- if batch.get() == NULL:
- break
- batches.push_back(batch)
-
- check_status(CTable.FromRecordBatches(batches, &table))
-
- return pyarrow_wrap_table(table)
-
-
-cdef class _RecordBatchFileWriter(_RecordBatchWriter):
-
- def _open(self, sink, Schema schema):
- cdef shared_ptr[CRecordBatchFileWriter] writer
- get_writer(sink, &self.sink)
-
- with nogil:
- check_status(
- CRecordBatchFileWriter.Open(self.sink.get(), schema.sp_schema,
- &writer))
-
- # Cast to base class, because has same interface
- self.writer = <shared_ptr[CRecordBatchWriter]> writer
- self.closed = False
-
-
-cdef class _RecordBatchFileReader:
- cdef:
- shared_ptr[CRecordBatchFileReader] reader
-
- cdef readonly:
- Schema schema
-
- def __cinit__(self):
- pass
-
- def _open(self, source, footer_offset=None):
- cdef shared_ptr[RandomAccessFile] reader
- get_reader(source, &reader)
-
- cdef int64_t offset = 0
- if footer_offset is not None:
- offset = footer_offset
-
- with nogil:
- if offset != 0:
- check_status(CRecordBatchFileReader.Open2(
- reader, offset, &self.reader))
- else:
- check_status(CRecordBatchFileReader.Open(reader, &self.reader))
-
- self.schema = pyarrow_wrap_schema(self.reader.get().schema())
-
- property num_record_batches:
-
- def __get__(self):
- return self.reader.get().num_record_batches()
-
- def get_batch(self, int i):
- cdef shared_ptr[CRecordBatch] batch
-
- if i < 0 or i >= self.num_record_batches:
- raise ValueError('Batch number {0} out of range'.format(i))
-
- with nogil:
- check_status(self.reader.get().GetRecordBatch(i, &batch))
-
- return pyarrow_wrap_batch(batch)
-
- # TODO(wesm): ARROW-503: Function was renamed. Remove after a period of
- # time has passed
- get_record_batch = get_batch
-
- def read_all(self):
- """
- Read all record batches as a pyarrow.Table
- """
- cdef:
- vector[shared_ptr[CRecordBatch]] batches
- shared_ptr[CTable] table
- int i, nbatches
-
- nbatches = self.num_record_batches
-
- batches.resize(nbatches)
- with nogil:
- for i in range(nbatches):
- check_status(self.reader.get().GetRecordBatch(i, &batches[i]))
- check_status(CTable.FromRecordBatches(batches, &table))
-
- return pyarrow_wrap_table(table)
-
-
-#----------------------------------------------------------------------
-# Implement legacy Feather file format
-
-
-class FeatherError(Exception):
- pass
-
-
-cdef class FeatherWriter:
- cdef:
- unique_ptr[CFeatherWriter] writer
-
- cdef public:
- int64_t num_rows
-
- def __cinit__(self):
- self.num_rows = -1
-
- def open(self, object dest):
- cdef shared_ptr[OutputStream] sink
- get_writer(dest, &sink)
-
- with nogil:
- check_status(CFeatherWriter.Open(sink, &self.writer))
-
- def close(self):
- if self.num_rows < 0:
- self.num_rows = 0
- self.writer.get().SetNumRows(self.num_rows)
- check_status(self.writer.get().Finalize())
-
- def write_array(self, object name, object col, object mask=None):
- cdef Array arr
-
- if self.num_rows >= 0:
- if len(col) != self.num_rows:
- raise ValueError('prior column had a different number of rows')
- else:
- self.num_rows = len(col)
-
- if isinstance(col, Array):
- arr = col
- else:
- arr = Array.from_pandas(col, mask=mask)
-
- cdef c_string c_name = tobytes(name)
-
- with nogil:
- check_status(
- self.writer.get().Append(c_name, deref(arr.sp_array)))
-
-
-cdef class FeatherReader:
- cdef:
- unique_ptr[CFeatherReader] reader
-
- def __cinit__(self):
- pass
-
- def open(self, source):
- cdef shared_ptr[RandomAccessFile] reader
- get_reader(source, &reader)
-
- with nogil:
- check_status(CFeatherReader.Open(reader, &self.reader))
-
- property num_rows:
-
- def __get__(self):
- return self.reader.get().num_rows()
-
- property num_columns:
-
- def __get__(self):
- return self.reader.get().num_columns()
-
- def get_column_name(self, int i):
- cdef c_string name = self.reader.get().GetColumnName(i)
- return frombytes(name)
-
- def get_column(self, int i):
- if i < 0 or i >= self.num_columns:
- raise IndexError(i)
-
- cdef shared_ptr[CColumn] sp_column
- with nogil:
- check_status(self.reader.get()
- .GetColumn(i, &sp_column))
-
- cdef Column col = Column()
- col.init(sp_column)
- return col
-
-
-def get_tensor_size(Tensor tensor):
- """
- Return total size of serialized Tensor including metadata and padding
- """
- cdef int64_t size
- with nogil:
- check_status(GetTensorSize(deref(tensor.tp), &size))
- return size
-
-
-def get_record_batch_size(RecordBatch batch):
- """
- Return total size of serialized RecordBatch including metadata and padding
- """
- cdef int64_t size
- with nogil:
- check_status(GetRecordBatchSize(deref(batch.batch), &size))
- return size
-
-
-def write_tensor(Tensor tensor, NativeFile dest):
- """
- Write pyarrow.Tensor to pyarrow.NativeFile object its current position
-
- Parameters
- ----------
- tensor : pyarrow.Tensor
- dest : pyarrow.NativeFile
-
- Returns
- -------
- bytes_written : int
- Total number of bytes written to the file
- """
- cdef:
- int32_t metadata_length
- int64_t body_length
-
- dest._assert_writeable()
-
- with nogil:
- check_status(
- WriteTensor(deref(tensor.tp), dest.wr_file.get(),
- &metadata_length, &body_length))
-
- return metadata_length + body_length
-
-
-def read_tensor(NativeFile source):
- """
- Read pyarrow.Tensor from pyarrow.NativeFile object from current
- position. If the file source supports zero copy (e.g. a memory map), then
- this operation does not allocate any memory
-
- Parameters
- ----------
- source : pyarrow.NativeFile
-
- Returns
- -------
- tensor : Tensor
- """
- cdef:
- shared_ptr[CTensor] sp_tensor
-
- source._assert_readable()
-
- cdef int64_t offset = source.tell()
- with nogil:
- check_status(ReadTensor(offset, source.rd_file.get(), &sp_tensor))
-
- return pyarrow_wrap_tensor(sp_tensor)