You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/05/14 12:55:32 UTC
arrow git commit: ARROW-1008: [C++] Add abstract stream writer and
reader C++ APIs. Give clearer names to IPC reader/writer classes
Repository: arrow
Updated Branches:
refs/heads/master 99ff24089 -> 5739e04b3
ARROW-1008: [C++] Add abstract stream writer and reader C++ APIs. Give clearer names to IPC reader/writer classes
The main motivation for this patch was to make `StreamReader` and `StreamWriter` abstract, so that other implementations can be created. I would also like to add the option for asynchronous reading and writing.
I also added a CMake option `ARROW_NO_DEPRECATED_API` for more graceful name deprecations.
@kou do you think these names for the IPC classes are more clear?
Author: Wes McKinney <we...@twosigma.com>
Closes #679 from wesm/ARROW-1008 and squashes the following commits:
d7b7c9ce [Wes McKinney] Add missing dtors for pimpl pattern
a797ee3e [Wes McKinney] Fix glib
04fa2854 [Wes McKinney] Feedback on ipc reader/writer names. Add open_stream/open_file Python APIs
22346d47 [Wes McKinney] Fix unit tests
10837a65 [Wes McKinney] Add abstract stream writer and reader C++ APIs. Rename record batch stream reader and writer classes for better clarity
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/5739e04b
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/5739e04b
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/5739e04b
Branch: refs/heads/master
Commit: 5739e04b35aeb5be9df7e9aace866ba48ecbac8a
Parents: 99ff240
Author: Wes McKinney <we...@twosigma.com>
Authored: Sun May 14 08:55:26 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Sun May 14 08:55:26 2017 -0400
----------------------------------------------------------------------
c_glib/arrow-glib/stream-reader.cpp | 20 ++--
c_glib/arrow-glib/stream-reader.h | 2 +-
c_glib/arrow-glib/stream-reader.hpp | 4 +-
c_glib/arrow-glib/writer.cpp | 16 +--
c_glib/arrow-glib/writer.h | 2 +-
c_glib/arrow-glib/writer.hpp | 8 +-
ci/travis_before_script_cpp.sh | 2 +
cpp/CMakeLists.txt | 8 ++
cpp/src/arrow/ipc/file-to-stream.cc | 12 ++-
cpp/src/arrow/ipc/ipc-read-write-test.cc | 25 ++---
cpp/src/arrow/ipc/json-integration-test.cc | 13 +--
cpp/src/arrow/ipc/reader.cc | 54 +++++-----
cpp/src/arrow/ipc/reader.h | 107 ++++++++++++++-----
cpp/src/arrow/ipc/stream-to-file.cc | 12 ++-
cpp/src/arrow/ipc/writer.cc | 70 +++++++------
cpp/src/arrow/ipc/writer.h | 132 ++++++++++++++++--------
python/doc/source/api.rst | 10 +-
python/doc/source/ipc.rst | 23 +++--
python/pyarrow/__init__.py | 5 +-
python/pyarrow/includes/libarrow.pxd | 38 ++++---
python/pyarrow/io.pxi | 48 +++++----
python/pyarrow/ipc.py | 46 ++++++++-
python/pyarrow/tests/test_ipc.py | 20 ++--
23 files changed, 433 insertions(+), 244 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/c_glib/arrow-glib/stream-reader.cpp
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/stream-reader.cpp b/c_glib/arrow-glib/stream-reader.cpp
index cc18cd8..19c36c2 100644
--- a/c_glib/arrow-glib/stream-reader.cpp
+++ b/c_glib/arrow-glib/stream-reader.cpp
@@ -43,7 +43,7 @@ G_BEGIN_DECLS
*/
typedef struct GArrowStreamReaderPrivate_ {
- std::shared_ptr<arrow::ipc::StreamReader> stream_reader;
+ std::shared_ptr<arrow::ipc::RecordBatchStreamReader> stream_reader;
} GArrowStreamReaderPrivate;
enum {
@@ -85,7 +85,7 @@ garrow_stream_reader_set_property(GObject *object,
switch (prop_id) {
case PROP_STREAM_READER:
priv->stream_reader =
- *static_cast<std::shared_ptr<arrow::ipc::StreamReader> *>(g_value_get_pointer(value));
+ *static_cast<std::shared_ptr<arrow::ipc::RecordBatchStreamReader> *>(g_value_get_pointer(value));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
@@ -124,8 +124,8 @@ garrow_stream_reader_class_init(GArrowStreamReaderClass *klass)
gobject_class->get_property = garrow_stream_reader_get_property;
spec = g_param_spec_pointer("stream-reader",
- "ipc::StreamReader",
- "The raw std::shared<arrow::ipc::StreamReader> *",
+ "ipc::RecordBatchStreamReader",
+ "The raw std::shared<arrow::ipc::RecordBatchStreamReader> *",
static_cast<GParamFlags>(G_PARAM_WRITABLE |
G_PARAM_CONSTRUCT_ONLY));
g_object_class_install_property(gobject_class, PROP_STREAM_READER, spec);
@@ -143,10 +143,10 @@ GArrowStreamReader *
garrow_stream_reader_new(GArrowInputStream *stream,
GError **error)
{
- std::shared_ptr<arrow::ipc::StreamReader> arrow_stream_reader;
+ std::shared_ptr<arrow::ipc::RecordBatchStreamReader> arrow_stream_reader;
auto status =
- arrow::ipc::StreamReader::Open(garrow_input_stream_get_raw(stream),
- &arrow_stream_reader);
+ arrow::ipc::RecordBatchStreamReader::Open(garrow_input_stream_get_raw(stream),
+ &arrow_stream_reader);
if (garrow_error_check(error, status, "[ipc][stream-reader][open]")) {
return garrow_stream_reader_new_raw(&arrow_stream_reader);
} else {
@@ -179,7 +179,7 @@ garrow_stream_reader_get_schema(GArrowStreamReader *stream_reader)
*/
GArrowRecordBatch *
garrow_stream_reader_get_next_record_batch(GArrowStreamReader *stream_reader,
- GError **error)
+ GError **error)
{
auto arrow_stream_reader =
garrow_stream_reader_get_raw(stream_reader);
@@ -202,7 +202,7 @@ garrow_stream_reader_get_next_record_batch(GArrowStreamReader *stream_reader,
G_END_DECLS
GArrowStreamReader *
-garrow_stream_reader_new_raw(std::shared_ptr<arrow::ipc::StreamReader> *arrow_stream_reader)
+garrow_stream_reader_new_raw(std::shared_ptr<arrow::ipc::RecordBatchStreamReader> *arrow_stream_reader)
{
auto stream_reader =
GARROW_STREAM_READER(g_object_new(GARROW_TYPE_STREAM_READER,
@@ -211,7 +211,7 @@ garrow_stream_reader_new_raw(std::shared_ptr<arrow::ipc::StreamReader> *arrow_st
return stream_reader;
}
-std::shared_ptr<arrow::ipc::StreamReader>
+std::shared_ptr<arrow::ipc::RecordBatchStreamReader>
garrow_stream_reader_get_raw(GArrowStreamReader *stream_reader)
{
GArrowStreamReaderPrivate *priv;
http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/c_glib/arrow-glib/stream-reader.h
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/stream-reader.h b/c_glib/arrow-glib/stream-reader.h
index 2ea2c26..f6cdaea 100644
--- a/c_glib/arrow-glib/stream-reader.h
+++ b/c_glib/arrow-glib/stream-reader.h
@@ -55,7 +55,7 @@ typedef struct _GArrowStreamReaderClass GArrowStreamReaderClass;
/**
* GArrowStreamReader:
*
- * It wraps `arrow::ipc::StreamReader`.
+ * It wraps `arrow::ipc::RecordBatchStreamReader`.
*/
struct _GArrowStreamReader
{
http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/c_glib/arrow-glib/stream-reader.hpp
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/stream-reader.hpp b/c_glib/arrow-glib/stream-reader.hpp
index ca8e689..5191b4e 100644
--- a/c_glib/arrow-glib/stream-reader.hpp
+++ b/c_glib/arrow-glib/stream-reader.hpp
@@ -24,5 +24,5 @@
#include <arrow-glib/stream-reader.h>
-GArrowStreamReader *garrow_stream_reader_new_raw(std::shared_ptr<arrow::ipc::StreamReader> *arrow_stream_reader);
-std::shared_ptr<arrow::ipc::StreamReader> garrow_stream_reader_get_raw(GArrowStreamReader *stream_reader);
+GArrowStreamReader *garrow_stream_reader_new_raw(std::shared_ptr<arrow::ipc::RecordBatchStreamReader> *arrow_stream_reader);
+std::shared_ptr<arrow::ipc::RecordBatchStreamReader> garrow_stream_reader_get_raw(GArrowStreamReader *stream_reader);
http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/c_glib/arrow-glib/writer.cpp
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/writer.cpp b/c_glib/arrow-glib/writer.cpp
index 625a19e..092993b 100644
--- a/c_glib/arrow-glib/writer.cpp
+++ b/c_glib/arrow-glib/writer.cpp
@@ -47,7 +47,7 @@ G_BEGIN_DECLS
*/
typedef struct GArrowStreamWriterPrivate_ {
- std::shared_ptr<arrow::ipc::StreamWriter> stream_writer;
+ std::shared_ptr<arrow::ipc::RecordBatchStreamWriter> stream_writer;
} GArrowStreamWriterPrivate;
enum {
@@ -89,7 +89,7 @@ garrow_stream_writer_set_property(GObject *object,
switch (prop_id) {
case PROP_STREAM_WRITER:
priv->stream_writer =
- *static_cast<std::shared_ptr<arrow::ipc::StreamWriter> *>(g_value_get_pointer(value));
+ *static_cast<std::shared_ptr<arrow::ipc::RecordBatchStreamWriter> *>(g_value_get_pointer(value));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
@@ -128,8 +128,8 @@ garrow_stream_writer_class_init(GArrowStreamWriterClass *klass)
gobject_class->get_property = garrow_stream_writer_get_property;
spec = g_param_spec_pointer("stream-writer",
- "ipc::StreamWriter",
- "The raw std::shared<arrow::ipc::StreamWriter> *",
+ "ipc::RecordBatchStreamWriter",
+ "The raw std::shared<arrow::ipc::RecordBatchStreamWriter> *",
static_cast<GParamFlags>(G_PARAM_WRITABLE |
G_PARAM_CONSTRUCT_ONLY));
g_object_class_install_property(gobject_class, PROP_STREAM_WRITER, spec);
@@ -149,11 +149,11 @@ garrow_stream_writer_new(GArrowOutputStream *sink,
GArrowSchema *schema,
GError **error)
{
- std::shared_ptr<arrow::ipc::StreamWriter> arrow_stream_writer;
+ std::shared_ptr<arrow::ipc::RecordBatchStreamWriter> arrow_stream_writer;
auto status =
- arrow::ipc::StreamWriter::Open(garrow_output_stream_get_raw(sink).get(),
- garrow_schema_get_raw(schema),
- &arrow_stream_writer);
+ arrow::ipc::RecordBatchStreamWriter::Open(garrow_output_stream_get_raw(sink).get(),
+ garrow_schema_get_raw(schema),
+ &arrow_stream_writer);
if (garrow_error_check(error, status, "[ipc][stream-writer][open]")) {
return garrow_stream_writer_new_raw(&arrow_stream_writer);
} else {
http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/c_glib/arrow-glib/writer.h
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/writer.h b/c_glib/arrow-glib/writer.h
index 2aaa776..2f8e90c 100644
--- a/c_glib/arrow-glib/writer.h
+++ b/c_glib/arrow-glib/writer.h
@@ -56,7 +56,7 @@ typedef struct _GArrowStreamWriterClass GArrowStreamWriterClass;
/**
* GArrowStreamWriter:
*
- * It wraps `arrow::ipc::StreamWriter`.
+ * It wraps `arrow::ipc::RecordBatchStreamWriter`.
*/
struct _GArrowStreamWriter
{
http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/c_glib/arrow-glib/writer.hpp
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/writer.hpp b/c_glib/arrow-glib/writer.hpp
index 199f205..47f5e68 100644
--- a/c_glib/arrow-glib/writer.hpp
+++ b/c_glib/arrow-glib/writer.hpp
@@ -24,8 +24,8 @@
#include <arrow-glib/writer.h>
-GArrowStreamWriter *garrow_stream_writer_new_raw(std::shared_ptr<arrow::ipc::StreamWriter> *arrow_stream_writer);
-std::shared_ptr<arrow::ipc::StreamWriter> garrow_stream_writer_get_raw(GArrowStreamWriter *stream_writer);
+GArrowStreamWriter *garrow_stream_writer_new_raw(std::shared_ptr<arrow::ipc::RecordBatchStreamWriter> *arrow_stream_writer);
+std::shared_ptr<arrow::ipc::RecordBatchStreamWriter> garrow_stream_writer_get_raw(GArrowStreamWriter *stream_writer);
-GArrowFileWriter *garrow_file_writer_new_raw(std::shared_ptr<arrow::ipc::FileWriter> *arrow_file_writer);
-arrow::ipc::FileWriter *garrow_file_writer_get_raw(GArrowFileWriter *file_writer);
+GArrowFileWriter *garrow_file_writer_new_raw(std::shared_ptr<arrow::ipc::RecordBatchFileWriter> *arrow_file_writer);
+arrow::ipc::RecordBatchFileWriter *garrow_file_writer_get_raw(GArrowFileWriter *file_writer);
http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/ci/travis_before_script_cpp.sh
----------------------------------------------------------------------
diff --git a/ci/travis_before_script_cpp.sh b/ci/travis_before_script_cpp.sh
index 3f9f67c..7d4ecb7 100755
--- a/ci/travis_before_script_cpp.sh
+++ b/ci/travis_before_script_cpp.sh
@@ -38,10 +38,12 @@ if [ $TRAVIS_OS_NAME == "linux" ]; then
cmake -DARROW_TEST_MEMCHECK=on \
$CMAKE_COMMON_FLAGS \
-DARROW_CXXFLAGS="-Wconversion -Werror" \
+ -DARROW_NO_DEPRECATED_API=on \
$ARROW_CPP_DIR
else
cmake $CMAKE_COMMON_FLAGS \
-DARROW_CXXFLAGS=-Werror \
+ -DARROW_NO_DEPRECATED_API=on \
$ARROW_CPP_DIR
fi
http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 6b2ceec..0ad7ef5 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -89,6 +89,10 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
"Build the Arrow micro benchmarks"
OFF)
+ option(ARROW_NO_DEPRECATED_API
+ "Exclude deprecated APIs from build"
+ OFF)
+
option(ARROW_IPC
"Build the Arrow IPC extensions"
ON)
@@ -154,6 +158,10 @@ include(BuildUtils)
include(SetupCxxFlags)
+if (ARROW_NO_DEPRECATED_API)
+ add_definitions(-DARROW_NO_DEPRECATED_API)
+endif()
+
# Add common flags
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_COMMON_FLAGS}")
set(EP_CXX_FLAGS "${CMAKE_CXX_FLAGS}")
http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/cpp/src/arrow/ipc/file-to-stream.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/file-to-stream.cc b/cpp/src/arrow/ipc/file-to-stream.cc
index 8161b19..39c720c 100644
--- a/cpp/src/arrow/ipc/file-to-stream.cc
+++ b/cpp/src/arrow/ipc/file-to-stream.cc
@@ -24,18 +24,19 @@
#include "arrow/util/io-util.h"
namespace arrow {
+namespace ipc {
// Reads a file on the file system and prints to stdout the stream version of it.
Status ConvertToStream(const char* path) {
std::shared_ptr<io::ReadableFile> in_file;
- std::shared_ptr<ipc::FileReader> reader;
+ std::shared_ptr<RecordBatchFileReader> reader;
RETURN_NOT_OK(io::ReadableFile::Open(path, &in_file));
- RETURN_NOT_OK(ipc::FileReader::Open(in_file, &reader));
+ RETURN_NOT_OK(ipc::RecordBatchFileReader::Open(in_file, &reader));
io::StdoutStream sink;
- std::shared_ptr<ipc::StreamWriter> writer;
- RETURN_NOT_OK(ipc::StreamWriter::Open(&sink, reader->schema(), &writer));
+ std::shared_ptr<RecordBatchStreamWriter> writer;
+ RETURN_NOT_OK(RecordBatchStreamWriter::Open(&sink, reader->schema(), &writer));
for (int i = 0; i < reader->num_record_batches(); ++i) {
std::shared_ptr<RecordBatch> chunk;
RETURN_NOT_OK(reader->GetRecordBatch(i, &chunk));
@@ -44,6 +45,7 @@ Status ConvertToStream(const char* path) {
return writer->Close();
}
+} // namespace ipc
} // namespace arrow
int main(int argc, char** argv) {
@@ -51,7 +53,7 @@ int main(int argc, char** argv) {
std::cerr << "Usage: file-to-stream <input arrow file>" << std::endl;
return 1;
}
- arrow::Status status = arrow::ConvertToStream(argv[1]);
+ arrow::Status status = arrow::ipc::ConvertToStream(argv[1]);
if (!status.ok()) {
std::cerr << "Could not convert to stream: " << status.ToString() << std::endl;
return 1;
http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/cpp/src/arrow/ipc/ipc-read-write-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc b/cpp/src/arrow/ipc/ipc-read-write-test.cc
index b4a88b5..c99816c 100644
--- a/cpp/src/arrow/ipc/ipc-read-write-test.cc
+++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc
@@ -140,16 +140,16 @@ class IpcTestFixture : public io::MemoryMapFixture {
if (zero_data) { RETURN_NOT_OK(ZeroMemoryMap(mmap_.get())); }
RETURN_NOT_OK(mmap_->Seek(0));
- std::shared_ptr<FileWriter> file_writer;
- RETURN_NOT_OK(FileWriter::Open(mmap_.get(), batch.schema(), &file_writer));
+ std::shared_ptr<RecordBatchFileWriter> file_writer;
+ RETURN_NOT_OK(RecordBatchFileWriter::Open(mmap_.get(), batch.schema(), &file_writer));
RETURN_NOT_OK(file_writer->WriteRecordBatch(batch, true));
RETURN_NOT_OK(file_writer->Close());
int64_t offset;
RETURN_NOT_OK(mmap_->Tell(&offset));
- std::shared_ptr<FileReader> file_reader;
- RETURN_NOT_OK(FileReader::Open(mmap_, offset, &file_reader));
+ std::shared_ptr<RecordBatchFileReader> file_reader;
+ RETURN_NOT_OK(RecordBatchFileReader::Open(mmap_, offset, &file_reader));
return file_reader->GetRecordBatch(0, result);
}
@@ -487,8 +487,9 @@ class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
Status RoundTripHelper(const BatchVector& in_batches, BatchVector* out_batches) {
// Write the file
- std::shared_ptr<FileWriter> writer;
- RETURN_NOT_OK(FileWriter::Open(sink_.get(), in_batches[0]->schema(), &writer));
+ std::shared_ptr<RecordBatchFileWriter> writer;
+ RETURN_NOT_OK(
+ RecordBatchFileWriter::Open(sink_.get(), in_batches[0]->schema(), &writer));
const int num_batches = static_cast<int>(in_batches.size());
@@ -504,8 +505,8 @@ class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
// Open the file
auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
- std::shared_ptr<FileReader> reader;
- RETURN_NOT_OK(FileReader::Open(buf_reader, footer_offset, &reader));
+ std::shared_ptr<RecordBatchFileReader> reader;
+ RETURN_NOT_OK(RecordBatchFileReader::Open(buf_reader, footer_offset, &reader));
EXPECT_EQ(num_batches, reader->num_record_batches());
for (int i = 0; i < num_batches; ++i) {
@@ -553,8 +554,8 @@ class TestStreamFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
Status RoundTripHelper(
const RecordBatch& batch, std::vector<std::shared_ptr<RecordBatch>>* out_batches) {
// Write the file
- std::shared_ptr<StreamWriter> writer;
- RETURN_NOT_OK(StreamWriter::Open(sink_.get(), batch.schema(), &writer));
+ std::shared_ptr<RecordBatchStreamWriter> writer;
+ RETURN_NOT_OK(RecordBatchStreamWriter::Open(sink_.get(), batch.schema(), &writer));
int num_batches = 5;
for (int i = 0; i < num_batches; ++i) {
RETURN_NOT_OK(writer->WriteRecordBatch(batch));
@@ -565,8 +566,8 @@ class TestStreamFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
// Open the file
auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
- std::shared_ptr<StreamReader> reader;
- RETURN_NOT_OK(StreamReader::Open(buf_reader, &reader));
+ std::shared_ptr<RecordBatchStreamReader> reader;
+ RETURN_NOT_OK(RecordBatchStreamReader::Open(buf_reader, &reader));
std::shared_ptr<RecordBatch> chunk;
while (true) {
http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/cpp/src/arrow/ipc/json-integration-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc
index aa95500..424755a 100644
--- a/cpp/src/arrow/ipc/json-integration-test.cc
+++ b/cpp/src/arrow/ipc/json-integration-test.cc
@@ -76,8 +76,9 @@ static Status ConvertJsonToArrow(
std::cout << "Found schema: " << reader->schema()->ToString() << std::endl;
}
- std::shared_ptr<ipc::FileWriter> writer;
- RETURN_NOT_OK(ipc::FileWriter::Open(out_file.get(), reader->schema(), &writer));
+ std::shared_ptr<ipc::RecordBatchFileWriter> writer;
+ RETURN_NOT_OK(
+ ipc::RecordBatchFileWriter::Open(out_file.get(), reader->schema(), &writer));
for (int i = 0; i < reader->num_record_batches(); ++i) {
std::shared_ptr<RecordBatch> batch;
@@ -96,8 +97,8 @@ static Status ConvertArrowToJson(
RETURN_NOT_OK(io::ReadableFile::Open(arrow_path, &in_file));
RETURN_NOT_OK(io::FileOutputStream::Open(json_path, &out_file));
- std::shared_ptr<ipc::FileReader> reader;
- RETURN_NOT_OK(ipc::FileReader::Open(in_file, &reader));
+ std::shared_ptr<ipc::RecordBatchFileReader> reader;
+ RETURN_NOT_OK(ipc::RecordBatchFileReader::Open(in_file, &reader));
if (FLAGS_verbose) {
std::cout << "Found schema: " << reader->schema()->ToString() << std::endl;
@@ -137,8 +138,8 @@ static Status ValidateArrowVsJson(
std::shared_ptr<io::ReadableFile> arrow_file;
RETURN_NOT_OK(io::ReadableFile::Open(arrow_path, &arrow_file));
- std::shared_ptr<ipc::FileReader> arrow_reader;
- RETURN_NOT_OK(ipc::FileReader::Open(arrow_file, &arrow_reader));
+ std::shared_ptr<ipc::RecordBatchFileReader> arrow_reader;
+ RETURN_NOT_OK(ipc::RecordBatchFileReader::Open(arrow_file, &arrow_reader));
auto json_schema = json_reader->schema();
auto arrow_schema = arrow_reader->schema();
http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/cpp/src/arrow/ipc/reader.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index aea4c9c..2b7b90f 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -156,7 +156,7 @@ Status ReadDictionary(const Message& metadata, const DictionaryTypeMap& dictiona
}
// ----------------------------------------------------------------------
-// StreamReader implementation
+// RecordBatchStreamReader implementation
static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
return FileBlock(block->offset(), block->metaDataLength(), block->bodyLength());
@@ -176,10 +176,12 @@ static inline std::string message_type_name(Message::Type type) {
return "unknown";
}
-class StreamReader::StreamReaderImpl {
+RecordBatchReader::~RecordBatchReader() {}
+
+class RecordBatchStreamReader::RecordBatchStreamReaderImpl {
public:
- StreamReaderImpl() {}
- ~StreamReaderImpl() {}
+ RecordBatchStreamReaderImpl() {}
+ ~RecordBatchStreamReaderImpl() {}
Status Open(const std::shared_ptr<io::InputStream>& stream) {
stream_ = stream;
@@ -267,33 +269,33 @@ class StreamReader::StreamReaderImpl {
std::shared_ptr<Schema> schema_;
};
-StreamReader::StreamReader() {
- impl_.reset(new StreamReaderImpl());
+RecordBatchStreamReader::RecordBatchStreamReader() {
+ impl_.reset(new RecordBatchStreamReaderImpl());
}
-StreamReader::~StreamReader() {}
+RecordBatchStreamReader::~RecordBatchStreamReader() {}
-Status StreamReader::Open(const std::shared_ptr<io::InputStream>& stream,
- std::shared_ptr<StreamReader>* reader) {
+Status RecordBatchStreamReader::Open(const std::shared_ptr<io::InputStream>& stream,
+ std::shared_ptr<RecordBatchStreamReader>* reader) {
// Private ctor
- *reader = std::shared_ptr<StreamReader>(new StreamReader());
+ *reader = std::shared_ptr<RecordBatchStreamReader>(new RecordBatchStreamReader());
return (*reader)->impl_->Open(stream);
}
-std::shared_ptr<Schema> StreamReader::schema() const {
+std::shared_ptr<Schema> RecordBatchStreamReader::schema() const {
return impl_->schema();
}
-Status StreamReader::GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) {
+Status RecordBatchStreamReader::GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) {
return impl_->GetNextRecordBatch(batch);
}
// ----------------------------------------------------------------------
// Reader implementation
-class FileReader::FileReaderImpl {
+class RecordBatchFileReader::RecordBatchFileReaderImpl {
public:
- FileReaderImpl() { dictionary_memo_ = std::make_shared<DictionaryMemo>(); }
+ RecordBatchFileReaderImpl() { dictionary_memo_ = std::make_shared<DictionaryMemo>(); }
Status ReadFooter() {
int magic_size = static_cast<int>(strlen(kArrowMagicBytes));
@@ -432,38 +434,38 @@ class FileReader::FileReaderImpl {
std::shared_ptr<Schema> schema_;
};
-FileReader::FileReader() {
- impl_.reset(new FileReaderImpl());
+RecordBatchFileReader::RecordBatchFileReader() {
+ impl_.reset(new RecordBatchFileReaderImpl());
}
-FileReader::~FileReader() {}
+RecordBatchFileReader::~RecordBatchFileReader() {}
-Status FileReader::Open(const std::shared_ptr<io::RandomAccessFile>& file,
- std::shared_ptr<FileReader>* reader) {
+Status RecordBatchFileReader::Open(const std::shared_ptr<io::RandomAccessFile>& file,
+ std::shared_ptr<RecordBatchFileReader>* reader) {
int64_t footer_offset;
RETURN_NOT_OK(file->GetSize(&footer_offset));
return Open(file, footer_offset, reader);
}
-Status FileReader::Open(const std::shared_ptr<io::RandomAccessFile>& file,
- int64_t footer_offset, std::shared_ptr<FileReader>* reader) {
- *reader = std::shared_ptr<FileReader>(new FileReader());
+Status RecordBatchFileReader::Open(const std::shared_ptr<io::RandomAccessFile>& file,
+ int64_t footer_offset, std::shared_ptr<RecordBatchFileReader>* reader) {
+ *reader = std::shared_ptr<RecordBatchFileReader>(new RecordBatchFileReader());
return (*reader)->impl_->Open(file, footer_offset);
}
-std::shared_ptr<Schema> FileReader::schema() const {
+std::shared_ptr<Schema> RecordBatchFileReader::schema() const {
return impl_->schema();
}
-int FileReader::num_record_batches() const {
+int RecordBatchFileReader::num_record_batches() const {
return impl_->num_record_batches();
}
-MetadataVersion FileReader::version() const {
+MetadataVersion RecordBatchFileReader::version() const {
return impl_->version();
}
-Status FileReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) {
+Status RecordBatchFileReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) {
return impl_->GetRecordBatch(i, batch);
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/cpp/src/arrow/ipc/reader.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
index 1972446..dd29a36 100644
--- a/cpp/src/arrow/ipc/reader.h
+++ b/cpp/src/arrow/ipc/reader.h
@@ -44,29 +44,50 @@ class RandomAccessFile;
namespace ipc {
-class ARROW_EXPORT StreamReader {
+/// \brief Abstract interface for reading a stream of record batches
+class ARROW_EXPORT RecordBatchReader {
public:
- ~StreamReader();
+ virtual ~RecordBatchReader();
- // Open an stream.
- static Status Open(const std::shared_ptr<io::InputStream>& stream,
- std::shared_ptr<StreamReader>* reader);
+ /// \return the shared schema of the record batches in the stream
+ virtual std::shared_ptr<Schema> schema() const = 0;
- std::shared_ptr<Schema> schema() const;
+ /// Read the next record batch in the stream. Return nullptr for batch when
+ /// reaching end of stream
+ ///
+ /// \param[out] batch the next loaded batch, nullptr at end of stream
+ /// \return Status
+ virtual Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) = 0;
+};
+
+/// \class RecordBatchStreamReader
+/// \brief Synchronous batch stream reader that reads from io::InputStream
+class ARROW_EXPORT RecordBatchStreamReader : public RecordBatchReader {
+ public:
+ virtual ~RecordBatchStreamReader();
- // Returned batch is nullptr when end of stream reached
- Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch);
+ /// Create batch reader from InputStream
+ ///
+ /// \param(in) stream an input stream instance
+ /// \param(out) reader the created reader object
+ /// \return Status
+ static Status Open(const std::shared_ptr<io::InputStream>& stream,
+ std::shared_ptr<RecordBatchStreamReader>* reader);
+
+ std::shared_ptr<Schema> schema() const override;
+ Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) override;
private:
- StreamReader();
+ RecordBatchStreamReader();
- class ARROW_NO_EXPORT StreamReaderImpl;
- std::unique_ptr<StreamReaderImpl> impl_;
+ class ARROW_NO_EXPORT RecordBatchStreamReaderImpl;
+ std::unique_ptr<RecordBatchStreamReaderImpl> impl_;
};
-class ARROW_EXPORT FileReader {
+/// \brief Reads the record batch file format
+class ARROW_EXPORT RecordBatchFileReader {
public:
- ~FileReader();
+ ~RecordBatchFileReader();
// Open a file-like object that is assumed to be self-contained; i.e., the
// end of the file interface is the end of the Arrow file. Note that there
@@ -74,7 +95,7 @@ class ARROW_EXPORT FileReader {
// need only locate the end of the Arrow file stream to discover the metadata
// and then proceed to read the data into memory.
static Status Open(const std::shared_ptr<io::RandomAccessFile>& file,
- std::shared_ptr<FileReader>* reader);
+ std::shared_ptr<RecordBatchFileReader>* reader);
// If the file is embedded within some larger file or memory region, you can
// pass the absolute memory offset to the end of the file (which contains the
@@ -84,46 +105,80 @@ class ARROW_EXPORT FileReader {
// @param file: the data source
// @param footer_offset: the position of the end of the Arrow "file"
static Status Open(const std::shared_ptr<io::RandomAccessFile>& file,
- int64_t footer_offset, std::shared_ptr<FileReader>* reader);
+ int64_t footer_offset, std::shared_ptr<RecordBatchFileReader>* reader);
/// The schema includes any dictionaries
std::shared_ptr<Schema> schema() const;
+ /// Returns number of record batches in the file
int num_record_batches() const;
+ /// Returns MetadataVersion in the file metadata
MetadataVersion version() const;
- // Read a record batch from the file. Does not copy memory if the input
- // source supports zero-copy.
- //
- // TODO(wesm): Make the copy/zero-copy behavior configurable (e.g. provide an
- // "always copy" option)
+ /// Read a record batch from the file. Does not copy memory if the input
+ /// source supports zero-copy.
+ ///
+ /// \param[in] i the index of the record batch to return
+ /// \param[out] batch the read batch
+ /// \return Status
Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch);
private:
- FileReader();
+ RecordBatchFileReader();
- class ARROW_NO_EXPORT FileReaderImpl;
- std::unique_ptr<FileReaderImpl> impl_;
+ class ARROW_NO_EXPORT RecordBatchFileReaderImpl;
+ std::unique_ptr<RecordBatchFileReaderImpl> impl_;
};
-// Generic read functionsh; does not copy data if the input supports zero copy reads
+// Generic read functions; does not copy data if the input supports zero copy reads
+
+/// Read record batch from file given metadata and schema
+///
+/// \param[in] metadata a Message containing the record batch metadata
+/// \param[in] schema the record batch schema
+/// \param[in] file a random access file
+/// \param[out] out the read record batch
Status ARROW_EXPORT ReadRecordBatch(const Message& metadata,
const std::shared_ptr<Schema>& schema, io::RandomAccessFile* file,
std::shared_ptr<RecordBatch>* out);
+/// Read record batch from file given metadata and schema
+///
+/// \param[in] metadata a Message containing the record batch metadata
+/// \param[in] schema the record batch schema
+/// \param[in] max_recursion_depth the maximum permitted nesting depth
+/// \param[in] file a random access file
+/// \param[out] out the read record batch
Status ARROW_EXPORT ReadRecordBatch(const Message& metadata,
const std::shared_ptr<Schema>& schema, int max_recursion_depth,
io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out);
-/// Read encapsulated message and RecordBatch
+/// Read record batch as encapsulated IPC message with metadata size prefix and
+/// header
+///
+/// \param[in] schema the record batch schema
+/// \param[in] offset the file location of the start of the message
+/// \param[in] file the file where the batch is located
+/// \param[out] out the read record batch
Status ARROW_EXPORT ReadRecordBatch(const std::shared_ptr<Schema>& schema, int64_t offset,
io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out);
-/// EXPERIMENTAL: Read arrow::Tensor from a contiguous message
+/// EXPERIMENTAL: Read arrow::Tensor as encapsulated IPC message in file
+///
+/// \param[in] offset the file location of the start of the message
+/// \param[in] file the file where the tensor is located
+/// \param[out] out the read tensor
Status ARROW_EXPORT ReadTensor(
int64_t offset, io::RandomAccessFile* file, std::shared_ptr<Tensor>* out);
+// Backwards-compatibility aliases for Arrow < 0.4.0; these are excluded
+// when ARROW_NO_DEPRECATED_API is defined
+#ifndef ARROW_NO_DEPRECATED_API
+using StreamReader = RecordBatchReader;
+using FileReader = RecordBatchFileReader;
+#endif
+
} // namespace ipc
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/cpp/src/arrow/ipc/stream-to-file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/stream-to-file.cc b/cpp/src/arrow/ipc/stream-to-file.cc
index ec0ac43..b942054 100644
--- a/cpp/src/arrow/ipc/stream-to-file.cc
+++ b/cpp/src/arrow/ipc/stream-to-file.cc
@@ -24,18 +24,19 @@
#include "arrow/util/io-util.h"
namespace arrow {
+namespace ipc {
// Converts a stream from stdin to a file written to standard out.
// A typical usage would be:
// $ <program that produces streaming output> | stream-to-file > file.arrow
Status ConvertToFile() {
std::shared_ptr<io::InputStream> input(new io::StdinStream);
- std::shared_ptr<ipc::StreamReader> reader;
- RETURN_NOT_OK(ipc::StreamReader::Open(input, &reader));
+ std::shared_ptr<RecordBatchStreamReader> reader;
+ RETURN_NOT_OK(RecordBatchStreamReader::Open(input, &reader));
io::StdoutStream sink;
- std::shared_ptr<ipc::FileWriter> writer;
- RETURN_NOT_OK(ipc::FileWriter::Open(&sink, reader->schema(), &writer));
+ std::shared_ptr<RecordBatchFileWriter> writer;
+ RETURN_NOT_OK(RecordBatchFileWriter::Open(&sink, reader->schema(), &writer));
std::shared_ptr<RecordBatch> batch;
while (true) {
@@ -46,10 +47,11 @@ Status ConvertToFile() {
return writer->Close();
}
+} // namespace ipc
} // namespace arrow
int main(int argc, char** argv) {
- arrow::Status status = arrow::ConvertToFile();
+ arrow::Status status = arrow::ipc::ConvertToFile();
if (!status.ok()) {
std::cerr << "Could not convert to file: " << status.ToString() << std::endl;
return 1;
http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/cpp/src/arrow/ipc/writer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 78d6b9e..ced0710 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -88,9 +88,9 @@ static inline bool NeedTruncate(
return offset != 0 || min_length < buffer->size();
}
-class RecordBatchWriter : public ArrayVisitor {
+class RecordBatchSerializer : public ArrayVisitor {
public:
- RecordBatchWriter(MemoryPool* pool, int64_t buffer_start_offset,
+ RecordBatchSerializer(MemoryPool* pool, int64_t buffer_start_offset,
int max_recursion_depth, bool allow_64bit)
: pool_(pool),
max_recursion_depth_(max_recursion_depth),
@@ -99,7 +99,7 @@ class RecordBatchWriter : public ArrayVisitor {
DCHECK_GT(max_recursion_depth, 0);
}
- virtual ~RecordBatchWriter() = default;
+ virtual ~RecordBatchSerializer() = default;
Status VisitArray(const Array& arr) {
if (max_recursion_depth_ <= 0) {
@@ -480,9 +480,9 @@ class RecordBatchWriter : public ArrayVisitor {
bool allow_64bit_;
};
-class DictionaryWriter : public RecordBatchWriter {
+class DictionaryWriter : public RecordBatchSerializer {
public:
- using RecordBatchWriter::RecordBatchWriter;
+ using RecordBatchSerializer::RecordBatchSerializer;
Status WriteMetadataMessage(
int64_t num_rows, int64_t body_length, std::shared_ptr<Buffer>* out) override {
@@ -500,7 +500,7 @@ class DictionaryWriter : public RecordBatchWriter {
auto schema = std::make_shared<Schema>(fields);
RecordBatch batch(schema, dictionary->length(), {dictionary});
- return RecordBatchWriter::Write(batch, dst, metadata_length, body_length);
+ return RecordBatchSerializer::Write(batch, dst, metadata_length, body_length);
}
private:
@@ -521,7 +521,8 @@ Status AlignStreamPosition(io::OutputStream* stream) {
Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
MemoryPool* pool, int max_recursion_depth, bool allow_64bit) {
- RecordBatchWriter writer(pool, buffer_start_offset, max_recursion_depth, allow_64bit);
+ RecordBatchSerializer writer(
+ pool, buffer_start_offset, max_recursion_depth, allow_64bit);
return writer.Write(batch, dst, metadata_length, body_length);
}
@@ -581,17 +582,21 @@ Status GetTensorSize(const Tensor& tensor, int64_t* size) {
}
// ----------------------------------------------------------------------
+
+RecordBatchWriter::~RecordBatchWriter() {}
+
+// ----------------------------------------------------------------------
// Stream writer implementation
-class StreamWriter::StreamWriterImpl {
+class RecordBatchStreamWriter::RecordBatchStreamWriterImpl {
public:
- StreamWriterImpl()
+ RecordBatchStreamWriterImpl()
: dictionary_memo_(std::make_shared<DictionaryMemo>()),
pool_(default_memory_pool()),
position_(-1),
started_(false) {}
- virtual ~StreamWriterImpl() = default;
+ virtual ~RecordBatchStreamWriterImpl() = default;
Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema) {
sink_ = sink;
@@ -721,37 +726,40 @@ class StreamWriter::StreamWriterImpl {
std::vector<FileBlock> record_batches_;
};
-StreamWriter::StreamWriter() {
- impl_.reset(new StreamWriterImpl());
+RecordBatchStreamWriter::RecordBatchStreamWriter() {
+ impl_.reset(new RecordBatchStreamWriterImpl());
}
-Status StreamWriter::WriteRecordBatch(const RecordBatch& batch, bool allow_64bit) {
+RecordBatchStreamWriter::~RecordBatchStreamWriter() {}
+
+Status RecordBatchStreamWriter::WriteRecordBatch(
+ const RecordBatch& batch, bool allow_64bit) {
return impl_->WriteRecordBatch(batch, allow_64bit);
}
-StreamWriter::~StreamWriter() {}
-
-void StreamWriter::set_memory_pool(MemoryPool* pool) {
+void RecordBatchStreamWriter::set_memory_pool(MemoryPool* pool) {
impl_->set_memory_pool(pool);
}
-Status StreamWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
- std::shared_ptr<StreamWriter>* out) {
+Status RecordBatchStreamWriter::Open(io::OutputStream* sink,
+ const std::shared_ptr<Schema>& schema,
+ std::shared_ptr<RecordBatchStreamWriter>* out) {
// ctor is private
- *out = std::shared_ptr<StreamWriter>(new StreamWriter());
+ *out = std::shared_ptr<RecordBatchStreamWriter>(new RecordBatchStreamWriter());
return (*out)->impl_->Open(sink, schema);
}
-Status StreamWriter::Close() {
+Status RecordBatchStreamWriter::Close() {
return impl_->Close();
}
// ----------------------------------------------------------------------
// File writer implementation
-class FileWriter::FileWriterImpl : public StreamWriter::StreamWriterImpl {
+class RecordBatchFileWriter::RecordBatchFileWriterImpl
+ : public RecordBatchStreamWriter::RecordBatchStreamWriterImpl {
public:
- using BASE = StreamWriter::StreamWriterImpl;
+ using BASE = RecordBatchStreamWriter::RecordBatchStreamWriterImpl;
Status Start() override {
RETURN_NOT_OK(WriteAligned(
@@ -783,23 +791,25 @@ class FileWriter::FileWriterImpl : public StreamWriter::StreamWriterImpl {
}
};
-FileWriter::FileWriter() {
- impl_.reset(new FileWriterImpl());
+RecordBatchFileWriter::RecordBatchFileWriter() {
+ impl_.reset(new RecordBatchFileWriterImpl());
}
-FileWriter::~FileWriter() {}
+RecordBatchFileWriter::~RecordBatchFileWriter() {}
-Status FileWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
- std::shared_ptr<FileWriter>* out) {
- *out = std::shared_ptr<FileWriter>(new FileWriter()); // ctor is private
+Status RecordBatchFileWriter::Open(io::OutputStream* sink,
+ const std::shared_ptr<Schema>& schema, std::shared_ptr<RecordBatchFileWriter>* out) {
+ *out = std::shared_ptr<RecordBatchFileWriter>(
+ new RecordBatchFileWriter()); // ctor is private
return (*out)->impl_->Open(sink, schema);
}
-Status FileWriter::WriteRecordBatch(const RecordBatch& batch, bool allow_64bit) {
+Status RecordBatchFileWriter::WriteRecordBatch(
+ const RecordBatch& batch, bool allow_64bit) {
return impl_->WriteRecordBatch(batch, allow_64bit);
}
-Status FileWriter::Close() {
+Status RecordBatchFileWriter::Close() {
return impl_->Close();
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/cpp/src/arrow/ipc/writer.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h
index b71becb..899a1b2 100644
--- a/cpp/src/arrow/ipc/writer.h
+++ b/cpp/src/arrow/ipc/writer.h
@@ -46,6 +46,84 @@ class OutputStream;
namespace ipc {
+/// \class RecordBatchWriter
+/// \brief Abstract interface for writing a stream of record batches
+class ARROW_EXPORT RecordBatchWriter {
+ public:
+ virtual ~RecordBatchWriter();
+
+ /// Write a record batch to the stream
+ ///
+ /// \param batch the record batch to write
+ /// \param allow_64bit boolean permitting field lengths exceeding INT32_MAX
+ /// \return Status indicating success or failure
+ virtual Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) = 0;
+
+ /// Perform any logic necessary to finish the stream
+ ///
+ /// \return Status indicating success or failure
+ virtual Status Close() = 0;
+
+ /// In some cases, writing may require memory allocation. We use the default
+ /// memory pool, but provide the option to override
+ ///
+ /// \param pool the memory pool to use for required allocations
+ virtual void set_memory_pool(MemoryPool* pool) = 0;
+};
+
+/// \class RecordBatchStreamWriter
+/// \brief Synchronous batch stream writer that writes the Arrow streaming
+/// format
+class ARROW_EXPORT RecordBatchStreamWriter : public RecordBatchWriter {
+ public:
+ virtual ~RecordBatchStreamWriter();
+
+ /// Create a new writer from stream sink and schema. User is responsible for
+ /// closing the actual OutputStream.
+ ///
+ /// \param(in) sink output stream to write to
+ /// \param(in) schema the schema of the record batches to be written
+ /// \param(out) out the created stream writer
+ /// \return Status indicating success or failure
+ static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
+ std::shared_ptr<RecordBatchStreamWriter>* out);
+
+ Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override;
+ Status Close() override;
+ void set_memory_pool(MemoryPool* pool) override;
+
+ protected:
+ // Instances are created through Open(); the constructor is protected rather
+ // than private so that RecordBatchFileWriter can derive from this class
+ RecordBatchStreamWriter();
+ class ARROW_NO_EXPORT RecordBatchStreamWriterImpl;
+ std::unique_ptr<RecordBatchStreamWriterImpl> impl_;
+};
+
+/// \class RecordBatchFileWriter
+/// \brief Creates the Arrow record batch file format
+///
+/// Implements the random access file format, which structurally is a record
+/// batch stream followed by a metadata footer at the end of the file. Magic
+/// numbers are written at the start and end of the file
+class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter {
+ public:
+ virtual ~RecordBatchFileWriter();
+
+ /// Create a new writer from stream sink and schema
+ ///
+ /// \param(in) sink output stream to write to
+ /// \param(in) schema the schema of the record batches to be written
+ /// \param(out) out the created file writer
+ /// \return Status indicating success or failure
+ static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
+ std::shared_ptr<RecordBatchFileWriter>* out);
+
+ Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override;
+ Status Close() override;
+
+ // NOTE(review): set_memory_pool is not overridden here, so a call resolves to
+ // RecordBatchStreamWriter::set_memory_pool. That method configures the base
+ // class's impl_, whereas WriteRecordBatch/Close use the
+ // RecordBatchFileWriterImpl impl_ declared below, which shadows it. The pool
+ // passed in is therefore never used by this writer. An override that
+ // forwards to this class's impl_ appears to be needed.
+
+ private:
+ RecordBatchFileWriter();
+ class ARROW_NO_EXPORT RecordBatchFileWriterImpl;
+ std::unique_ptr<RecordBatchFileWriterImpl> impl_;
+};
+
/// Write the RecordBatch (collection of equal-length Arrow arrays) to the
/// output stream in a contiguous block. The record batch metadata is written as
/// a flatbuffer (see format/Message.fbs -- the RecordBatch message type)
@@ -58,13 +136,13 @@ namespace ipc {
/// to the end of the body and end of the metadata / data header (suffixed by
/// the header size) is returned in out-variables
///
-/// @param(in) buffer_start_offset the start offset to use in the buffer metadata,
+/// \param(in) buffer_start_offset the start offset to use in the buffer metadata,
/// default should be 0
-/// @param(in) allow_64bit permit field lengths exceeding INT32_MAX. May not be
+/// \param(in) allow_64bit permit field lengths exceeding INT32_MAX. May not be
/// readable by other Arrow implementations
-/// @param(out) metadata_length: the size of the length-prefixed flatbuffer
+/// \param(out) metadata_length: the size of the length-prefixed flatbuffer
/// including padding to a 64-byte boundary
-/// @param(out) body_length: the size of the contiguous buffer block plus
+/// \param(out) body_length: the size of the contiguous buffer block plus
/// padding bytes
Status ARROW_EXPORT WriteRecordBatch(const RecordBatch& batch,
int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length,
@@ -85,45 +163,6 @@ Status ARROW_EXPORT GetRecordBatchSize(const RecordBatch& batch, int64_t* size);
// write the tensor including metadata, padding, and data
Status ARROW_EXPORT GetTensorSize(const Tensor& tensor, int64_t* size);
-class ARROW_EXPORT StreamWriter {
- public:
- virtual ~StreamWriter();
-
- static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
- std::shared_ptr<StreamWriter>* out);
-
- virtual Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false);
-
- /// Perform any logic necessary to finish the stream. User is responsible for
- /// closing the actual OutputStream
- virtual Status Close();
-
- // In some cases, writing may require memory allocation. We use the default
- // memory pool, but provide the option to override
- void set_memory_pool(MemoryPool* pool);
-
- protected:
- StreamWriter();
- class ARROW_NO_EXPORT StreamWriterImpl;
- std::unique_ptr<StreamWriterImpl> impl_;
-};
-
-class ARROW_EXPORT FileWriter : public StreamWriter {
- public:
- virtual ~FileWriter();
-
- static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
- std::shared_ptr<FileWriter>* out);
-
- Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override;
- Status Close() override;
-
- private:
- FileWriter();
- class ARROW_NO_EXPORT FileWriterImpl;
- std::unique_ptr<FileWriterImpl> impl_;
-};
-
/// EXPERIMENTAL: Write RecordBatch allowing lengths over INT32_MAX. This data
/// may not be readable by all Arrow implementations
Status ARROW_EXPORT WriteLargeRecordBatch(const RecordBatch& batch,
@@ -135,6 +174,13 @@ Status ARROW_EXPORT WriteLargeRecordBatch(const RecordBatch& batch,
Status ARROW_EXPORT WriteTensor(const Tensor& tensor, io::OutputStream* dst,
int32_t* metadata_length, int64_t* body_length);
+/// Backwards-compatibility for Arrow < 0.4.0
+///
+#ifndef ARROW_NO_DEPRECATED_API
+using FileWriter = RecordBatchFileWriter;
+using StreamWriter = RecordBatchStreamWriter;
+#endif
+
} // namespace ipc
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/python/doc/source/api.rst
----------------------------------------------------------------------
diff --git a/python/doc/source/api.rst b/python/doc/source/api.rst
index a8dd8c5..e7bea70 100644
--- a/python/doc/source/api.rst
+++ b/python/doc/source/api.rst
@@ -177,10 +177,12 @@ Interprocess Communication and Messaging
.. autosummary::
:toctree: generated/
- FileReader
- FileWriter
- StreamReader
- StreamWriter
+ RecordBatchFileReader
+ RecordBatchFileWriter
+ RecordBatchStreamReader
+ RecordBatchStreamWriter
+ open_file
+ open_stream
.. _api.memory_pool:
http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/python/doc/source/ipc.rst
----------------------------------------------------------------------
diff --git a/python/doc/source/ipc.rst b/python/doc/source/ipc.rst
index e63e745..cce2ae8 100644
--- a/python/doc/source/ipc.rst
+++ b/python/doc/source/ipc.rst
@@ -55,13 +55,13 @@ First, let's create a small record batch:
batch.num_columns
Now, we can begin writing a stream containing some number of these batches. For
-this we use :class:`~pyarrow.StreamWriter`, which can write to a writeable
+this we use :class:`~pyarrow.RecordBatchStreamWriter`, which can write to a writeable
``NativeFile`` object or a writeable Python object:
.. ipython:: python
sink = pa.InMemoryOutputStream()
- writer = pa.StreamWriter(sink, batch.schema)
+ writer = pa.RecordBatchStreamWriter(sink, batch.schema)
Here we used an in-memory Arrow buffer stream, but this could have been a
socket or some other IO sink.
@@ -80,11 +80,11 @@ particular stream. Now we can do:
buf.size
Now ``buf`` contains the complete stream as an in-memory byte buffer. We can
-read such a stream with :class:`~pyarrow.StreamReader`:
+read such a stream with :class:`~pyarrow.RecordBatchStreamReader`:
.. ipython:: python
- reader = pa.StreamReader(buf)
+ reader = pa.RecordBatchStreamReader(buf)
reader.schema
batches = [b for b in reader]
@@ -103,13 +103,13 @@ batches are also zero-copy and do not allocate any new memory on read.
Writing and Reading Random Access Files
---------------------------------------
-The :class:`~pyarrow.FileWriter` has the same API as
-:class:`~pyarrow.StreamWriter`:
+The :class:`~pyarrow.RecordBatchFileWriter` has the same API as
+:class:`~pyarrow.RecordBatchStreamWriter`:
.. ipython:: python
sink = pa.InMemoryOutputStream()
- writer = pa.FileWriter(sink, batch.schema)
+ writer = pa.RecordBatchFileWriter(sink, batch.schema)
for i in range(10):
writer.write_batch(batch)
@@ -118,13 +118,14 @@ The :class:`~pyarrow.FileWriter` has the same API as
buf = sink.get_result()
buf.size
-The difference between :class:`~pyarrow.FileReader` and
-:class:`~pyarrow.StreamReader` is that the input source must have a ``seek``
-method for random access. The stream reader only requires read operations:
+The difference between :class:`~pyarrow.RecordBatchFileReader` and
+:class:`~pyarrow.RecordBatchStreamReader` is that the input source must have a
+``seek`` method for random access. The stream reader only requires read
+operations:
.. ipython:: python
- reader = pa.FileReader(buf)
+ reader = pa.RecordBatchFileReader(buf)
Because we have access to the entire payload, we know the number of record
batches in the file, and can read any at random:
http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 7d79811..d6d2aa4 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -101,7 +101,10 @@ def jemalloc_memory_pool():
from pyarrow.filesystem import Filesystem, HdfsClient, LocalFilesystem
-from pyarrow.ipc import FileReader, FileWriter, StreamReader, StreamWriter
+from pyarrow.ipc import (RecordBatchFileReader, RecordBatchFileWriter,
+ RecordBatchStreamReader, RecordBatchStreamWriter,
+ open_stream,
+ open_file)
localfs = LocalFilesystem.get_instance()
http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 3d56c14..b03dd59 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -547,38 +547,44 @@ cdef extern from "arrow/ipc/metadata.h" namespace "arrow::ipc" nogil:
cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
- cdef cppclass CStreamWriter " arrow::ipc::StreamWriter":
- @staticmethod
- CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema,
- shared_ptr[CStreamWriter]* out)
-
+ cdef cppclass CRecordBatchWriter \
+ " arrow::ipc::RecordBatchWriter":
CStatus Close()
CStatus WriteRecordBatch(const CRecordBatch& batch)
- cdef cppclass CStreamReader " arrow::ipc::StreamReader":
+ cdef cppclass CRecordBatchReader \
+ " arrow::ipc::RecordBatchReader":
+ shared_ptr[CSchema] schema()
+ CStatus GetNextRecordBatch(shared_ptr[CRecordBatch]* batch)
+ cdef cppclass CRecordBatchStreamReader \
+ " arrow::ipc::RecordBatchStreamReader"(CRecordBatchReader):
@staticmethod
CStatus Open(const shared_ptr[InputStream]& stream,
- shared_ptr[CStreamReader]* out)
+ shared_ptr[CRecordBatchStreamReader]* out)
- shared_ptr[CSchema] schema()
-
- CStatus GetNextRecordBatch(shared_ptr[CRecordBatch]* batch)
-
- cdef cppclass CFileWriter " arrow::ipc::FileWriter"(CStreamWriter):
+ cdef cppclass CRecordBatchStreamWriter \
+ " arrow::ipc::RecordBatchStreamWriter"(CRecordBatchWriter):
@staticmethod
CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema,
- shared_ptr[CFileWriter]* out)
+ shared_ptr[CRecordBatchStreamWriter]* out)
- cdef cppclass CFileReader " arrow::ipc::FileReader":
+ cdef cppclass CRecordBatchFileWriter \
+ " arrow::ipc::RecordBatchFileWriter"(CRecordBatchWriter):
+ @staticmethod
+ CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema,
+ shared_ptr[CRecordBatchFileWriter]* out)
+ cdef cppclass CRecordBatchFileReader \
+ " arrow::ipc::RecordBatchFileReader":
@staticmethod
CStatus Open(const shared_ptr[RandomAccessFile]& file,
- shared_ptr[CFileReader]* out)
+ shared_ptr[CRecordBatchFileReader]* out)
@staticmethod
CStatus Open2" Open"(const shared_ptr[RandomAccessFile]& file,
- int64_t footer_offset, shared_ptr[CFileReader]* out)
+ int64_t footer_offset,
+ shared_ptr[CRecordBatchFileReader]* out)
shared_ptr[CSchema] schema()
http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/python/pyarrow/io.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index a0a96e7..4cbf603 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -916,9 +916,9 @@ cdef class HdfsFile(NativeFile):
# ----------------------------------------------------------------------
# File and stream readers and writers
-cdef class _StreamWriter:
+cdef class _RecordBatchWriter:
cdef:
- shared_ptr[CStreamWriter] writer
+ shared_ptr[CRecordBatchWriter] writer
shared_ptr[OutputStream] sink
bint closed
@@ -930,12 +930,18 @@ cdef class _StreamWriter:
self.close()
def _open(self, sink, Schema schema):
+ cdef:
+ shared_ptr[CRecordBatchStreamWriter] writer
+
get_writer(sink, &self.sink)
with nogil:
- check_status(CStreamWriter.Open(self.sink.get(), schema.sp_schema,
- &self.writer))
+ check_status(
+ CRecordBatchStreamWriter.Open(self.sink.get(),
+ schema.sp_schema,
+ &writer))
+ self.writer = <shared_ptr[CRecordBatchWriter]> writer
self.closed = False
def write_batch(self, RecordBatch batch):
@@ -949,9 +955,9 @@ cdef class _StreamWriter:
self.closed = True
-cdef class _StreamReader:
+cdef class _RecordBatchReader:
cdef:
- shared_ptr[CStreamReader] reader
+ shared_ptr[CRecordBatchReader] reader
cdef readonly:
Schema schema
@@ -961,15 +967,17 @@ cdef class _StreamReader:
def _open(self, source):
cdef:
- shared_ptr[RandomAccessFile] reader
+ shared_ptr[RandomAccessFile] file_handle
shared_ptr[InputStream] in_stream
+ shared_ptr[CRecordBatchStreamReader] reader
- get_reader(source, &reader)
- in_stream = <shared_ptr[InputStream]> reader
+ get_reader(source, &file_handle)
+ in_stream = <shared_ptr[InputStream]> file_handle
with nogil:
- check_status(CStreamReader.Open(in_stream, &self.reader))
+ check_status(CRecordBatchStreamReader.Open(in_stream, &reader))
+ self.reader = <shared_ptr[CRecordBatchReader]> reader
self.schema = Schema()
self.schema.init_schema(self.reader.get().schema())
@@ -1009,24 +1017,25 @@ cdef class _StreamReader:
return pyarrow_wrap_table(table)
-cdef class _FileWriter(_StreamWriter):
+cdef class _RecordBatchFileWriter(_RecordBatchWriter):
def _open(self, sink, Schema schema):
- cdef shared_ptr[CFileWriter] writer
+ cdef shared_ptr[CRecordBatchFileWriter] writer
get_writer(sink, &self.sink)
with nogil:
- check_status(CFileWriter.Open(self.sink.get(), schema.sp_schema,
- &writer))
+ check_status(
+ CRecordBatchFileWriter.Open(self.sink.get(), schema.sp_schema,
+ &writer))
# Cast to base class, because has same interface
- self.writer = <shared_ptr[CStreamWriter]> writer
+ self.writer = <shared_ptr[CRecordBatchWriter]> writer
self.closed = False
-cdef class _FileReader:
+cdef class _RecordBatchFileReader:
cdef:
- shared_ptr[CFileReader] reader
+ shared_ptr[CRecordBatchFileReader] reader
def __cinit__(self):
pass
@@ -1041,9 +1050,10 @@ cdef class _FileReader:
with nogil:
if offset != 0:
- check_status(CFileReader.Open2(reader, offset, &self.reader))
+ check_status(CRecordBatchFileReader.Open2(
+ reader, offset, &self.reader))
else:
- check_status(CFileReader.Open(reader, &self.reader))
+ check_status(CRecordBatchFileReader.Open(reader, &self.reader))
property num_record_batches:
http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/python/pyarrow/ipc.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py
index c37a1ce..8338de3 100644
--- a/python/pyarrow/ipc.py
+++ b/python/pyarrow/ipc.py
@@ -20,7 +20,7 @@
import pyarrow.lib as lib
-class StreamReader(lib._StreamReader):
+class RecordBatchStreamReader(lib._RecordBatchReader):
"""
Reader for the Arrow streaming binary format
@@ -37,7 +37,7 @@ class StreamReader(lib._StreamReader):
yield self.get_next_batch()
-class StreamWriter(lib._StreamWriter):
+class RecordBatchStreamWriter(lib._RecordBatchWriter):
"""
Writer for the Arrow streaming binary format
@@ -52,7 +52,7 @@ class StreamWriter(lib._StreamWriter):
self._open(sink, schema)
-class FileReader(lib._FileReader):
+class RecordBatchFileReader(lib._RecordBatchFileReader):
"""
Class for reading Arrow record batch data from the Arrow binary file format
@@ -68,7 +68,7 @@ class FileReader(lib._FileReader):
self._open(source, footer_offset=footer_offset)
-class FileWriter(lib._FileWriter):
+class RecordBatchFileWriter(lib._RecordBatchFileWriter):
"""
Writer to create the Arrow binary file format
@@ -81,3 +81,41 @@ class FileWriter(lib._FileWriter):
"""
def __init__(self, sink, schema):
self._open(sink, schema)
+
+
+def open_stream(source):
+ """
+ Create reader for Arrow streaming format
+
+ Parameters
+ ----------
+ source : str, pyarrow.NativeFile, or file-like Python object
+ Either a file path, or a readable file object
+
+ Returns
+ -------
+ reader : RecordBatchStreamReader
+ """
+ return RecordBatchStreamReader(source)
+
+
+def open_file(source, footer_offset=None):
+ """
+ Create reader for Arrow file format
+
+ Parameters
+ ----------
+ source : str, pyarrow.NativeFile, or file-like Python object
+ Either a file path, or a readable file object
+ footer_offset : int, default None
+ If the file is embedded in some larger file, this is the byte offset to
+ the very end of the file data
+
+ Returns
+ -------
+ reader : RecordBatchFileReader
+ """
+ return RecordBatchFileReader(source, footer_offset=footer_offset)
http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/python/pyarrow/tests/test_ipc.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py
index 0204067..4d19804 100644
--- a/python/pyarrow/tests/test_ipc.py
+++ b/python/pyarrow/tests/test_ipc.py
@@ -70,13 +70,13 @@ class TestFile(MessagingTest, unittest.TestCase):
# Also tests writing zero-copy NumPy array with additional padding
def _get_writer(self, sink, schema):
- return pa.FileWriter(sink, schema)
+ return pa.RecordBatchFileWriter(sink, schema)
def test_simple_roundtrip(self):
batches = self.write_batches()
file_contents = self._get_source()
- reader = pa.FileReader(file_contents)
+ reader = pa.open_file(file_contents)
assert reader.num_record_batches == len(batches)
@@ -89,7 +89,7 @@ class TestFile(MessagingTest, unittest.TestCase):
batches = self.write_batches()
file_contents = self._get_source()
- reader = pa.FileReader(file_contents)
+ reader = pa.open_file(file_contents)
result = reader.read_all()
expected = pa.Table.from_batches(batches)
@@ -99,12 +99,12 @@ class TestFile(MessagingTest, unittest.TestCase):
class TestStream(MessagingTest, unittest.TestCase):
def _get_writer(self, sink, schema):
- return pa.StreamWriter(sink, schema)
+ return pa.RecordBatchStreamWriter(sink, schema)
def test_simple_roundtrip(self):
batches = self.write_batches()
file_contents = self._get_source()
- reader = pa.StreamReader(file_contents)
+ reader = pa.open_stream(file_contents)
assert reader.schema.equals(batches[0].schema)
@@ -121,7 +121,7 @@ class TestStream(MessagingTest, unittest.TestCase):
def test_read_all(self):
batches = self.write_batches()
file_contents = self._get_source()
- reader = pa.StreamReader(file_contents)
+ reader = pa.open_stream(file_contents)
result = reader.read_all()
expected = pa.Table.from_batches(batches)
@@ -147,7 +147,7 @@ class TestSocket(MessagingTest, unittest.TestCase):
connection, client_address = self._sock.accept()
try:
source = connection.makefile(mode='rb')
- reader = pa.StreamReader(source)
+ reader = pa.open_stream(source)
self._schema = reader.schema
if self._do_read_all:
self._table = reader.read_all()
@@ -185,7 +185,7 @@ class TestSocket(MessagingTest, unittest.TestCase):
return self._sock.makefile(mode='wb')
def _get_writer(self, sink, schema):
- return pa.StreamWriter(sink, schema)
+ return pa.RecordBatchStreamWriter(sink, schema)
def test_simple_roundtrip(self):
self.start_server(do_read_all=False)
@@ -241,12 +241,12 @@ def test_get_record_batch_size():
def write_file(batch, sink):
- writer = pa.FileWriter(sink, batch.schema)
+ writer = pa.RecordBatchFileWriter(sink, batch.schema)
writer.write_batch(batch)
writer.close()
def read_file(source):
- reader = pa.FileReader(source)
+ reader = pa.open_file(source)
return [reader.get_batch(i)
for i in range(reader.num_record_batches)]