Posted to commits@arrow.apache.org by we...@apache.org on 2017/05/14 12:55:32 UTC

arrow git commit: ARROW-1008: [C++] Add abstract stream writer and reader C++ APIs. Give clearer names to IPC reader/writer classes

Repository: arrow
Updated Branches:
  refs/heads/master 99ff24089 -> 5739e04b3


ARROW-1008: [C++] Add abstract stream writer and reader C++ APIs. Give clearer names to IPC reader/writer classes

The main motivation for this patch was to make `StreamReader` and `StreamWriter` abstract, so that other implementations can be created. I would also like to add the option for asynchronous reading and writing.
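
To make the new extension point concrete, here is a minimal sketch (not code from this patch) of a reader that replays an in-memory vector of batches through the abstract `RecordBatchReader` interface added in cpp/src/arrow/ipc/reader.h below. The class name is hypothetical and the RecordBatch include location is an assumption for the codebase as of this commit:

// Sketch only: a custom reader against the new abstract interface.
// Implementing schema() and GetNextRecordBatch() is all that is required.
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

#include "arrow/ipc/reader.h"
#include "arrow/status.h"
#include "arrow/table.h"  // RecordBatch lived here at the time (assumption)

class VectorBatchReader : public arrow::ipc::RecordBatchReader {
 public:
  explicit VectorBatchReader(std::vector<std::shared_ptr<arrow::RecordBatch>> batches)
      : batches_(std::move(batches)), position_(0) {}

  std::shared_ptr<arrow::Schema> schema() const override {
    return batches_.empty() ? nullptr : batches_[0]->schema();
  }

  arrow::Status GetNextRecordBatch(std::shared_ptr<arrow::RecordBatch>* batch) override {
    // Per the interface contract, a null batch signals end of stream
    *batch = position_ < batches_.size() ? batches_[position_++] : nullptr;
    return arrow::Status::OK();
  }

 private:
  std::vector<std::shared_ptr<arrow::RecordBatch>> batches_;
  std::size_t position_;
};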

I also added a CMake option `ARROW_NO_DEPRECATED_API` for more graceful name deprecations.
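
The option works by adding a compile definition that the IPC headers use to guard their backwards-compatibility aliases (see the `#ifndef ARROW_NO_DEPRECATED_API` hunks in ipc/reader.h and ipc/writer.h below). A small sketch of the effect on user code:

#include <memory>

#include "arrow/ipc/reader.h"

int main() {
  // Compiles through the compatibility alias (StreamReader = RecordBatchReader).
  // Building with -DARROW_NO_DEPRECATED_API=on compiles the alias out, so
  // this line becomes an error and flushes out deprecated call sites.
  std::shared_ptr<arrow::ipc::StreamReader> reader;
  return 0;
}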

@kou do you think these names for the IPC classes are clearer?

Author: Wes McKinney <we...@twosigma.com>

Closes #679 from wesm/ARROW-1008 and squashes the following commits:

d7b7c9ce [Wes McKinney] Add missing dtors for pimpl pattern
a797ee3e [Wes McKinney] Fix glib
04fa2854 [Wes McKinney] Feedback on ipc reader/writer names. Add open_stream/open_file Python APIs
22346d47 [Wes McKinney] Fix unit tests
10837a65 [Wes McKinney] Add abstract stream writer and reader C++ APIs. Rename record batch stream reader and writer classes for better clarity
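
The "Add missing dtors for pimpl pattern" commit above fixes a classic C++ pitfall: a class holding a `std::unique_ptr` to a forward-declared Impl must declare its destructor in the header and define it in the .cc file where Impl is complete. A minimal sketch with hypothetical names:

// widget.h -- illustrative only, not code from this patch
#include <memory>

class Widget {
 public:
  Widget();
  ~Widget();  // declaration only; a compiler-generated inline dtor would
              // need a complete Impl at every destruction site and fail
 private:
  class Impl;                   // forward declaration (pimpl)
  std::unique_ptr<Impl> impl_;
};

// widget.cc
class Widget::Impl { /* full definition lives here */ };

Widget::Widget() : impl_(new Impl()) {}
Widget::~Widget() = default;  // Impl is complete here, so deletion is valid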


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/5739e04b
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/5739e04b
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/5739e04b

Branch: refs/heads/master
Commit: 5739e04b35aeb5be9df7e9aace866ba48ecbac8a
Parents: 99ff240
Author: Wes McKinney <we...@twosigma.com>
Authored: Sun May 14 08:55:26 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Sun May 14 08:55:26 2017 -0400

----------------------------------------------------------------------
 c_glib/arrow-glib/stream-reader.cpp        |  20 ++--
 c_glib/arrow-glib/stream-reader.h          |   2 +-
 c_glib/arrow-glib/stream-reader.hpp        |   4 +-
 c_glib/arrow-glib/writer.cpp               |  16 +--
 c_glib/arrow-glib/writer.h                 |   2 +-
 c_glib/arrow-glib/writer.hpp               |   8 +-
 ci/travis_before_script_cpp.sh             |   2 +
 cpp/CMakeLists.txt                         |   8 ++
 cpp/src/arrow/ipc/file-to-stream.cc        |  12 ++-
 cpp/src/arrow/ipc/ipc-read-write-test.cc   |  25 ++---
 cpp/src/arrow/ipc/json-integration-test.cc |  13 +--
 cpp/src/arrow/ipc/reader.cc                |  54 +++++-----
 cpp/src/arrow/ipc/reader.h                 | 107 ++++++++++++++-----
 cpp/src/arrow/ipc/stream-to-file.cc        |  12 ++-
 cpp/src/arrow/ipc/writer.cc                |  70 +++++++------
 cpp/src/arrow/ipc/writer.h                 | 132 ++++++++++++++++--------
 python/doc/source/api.rst                  |  10 +-
 python/doc/source/ipc.rst                  |  23 +++--
 python/pyarrow/__init__.py                 |   5 +-
 python/pyarrow/includes/libarrow.pxd       |  38 ++++---
 python/pyarrow/io.pxi                      |  48 +++++----
 python/pyarrow/ipc.py                      |  46 ++++++++-
 python/pyarrow/tests/test_ipc.py           |  20 ++--
 23 files changed, 433 insertions(+), 244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/c_glib/arrow-glib/stream-reader.cpp
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/stream-reader.cpp b/c_glib/arrow-glib/stream-reader.cpp
index cc18cd8..19c36c2 100644
--- a/c_glib/arrow-glib/stream-reader.cpp
+++ b/c_glib/arrow-glib/stream-reader.cpp
@@ -43,7 +43,7 @@ G_BEGIN_DECLS
  */
 
 typedef struct GArrowStreamReaderPrivate_ {
-  std::shared_ptr<arrow::ipc::StreamReader> stream_reader;
+  std::shared_ptr<arrow::ipc::RecordBatchStreamReader> stream_reader;
 } GArrowStreamReaderPrivate;
 
 enum {
@@ -85,7 +85,7 @@ garrow_stream_reader_set_property(GObject *object,
   switch (prop_id) {
   case PROP_STREAM_READER:
     priv->stream_reader =
-      *static_cast<std::shared_ptr<arrow::ipc::StreamReader> *>(g_value_get_pointer(value));
+      *static_cast<std::shared_ptr<arrow::ipc::RecordBatchStreamReader> *>(g_value_get_pointer(value));
     break;
   default:
     G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
@@ -124,8 +124,8 @@ garrow_stream_reader_class_init(GArrowStreamReaderClass *klass)
   gobject_class->get_property = garrow_stream_reader_get_property;
 
   spec = g_param_spec_pointer("stream-reader",
-                              "ipc::StreamReader",
-                              "The raw std::shared<arrow::ipc::StreamReader> *",
+                              "ipc::RecordBatchStreamReader",
+                              "The raw std::shared_ptr<arrow::ipc::RecordBatchStreamReader> *",
                               static_cast<GParamFlags>(G_PARAM_WRITABLE |
                                                        G_PARAM_CONSTRUCT_ONLY));
   g_object_class_install_property(gobject_class, PROP_STREAM_READER, spec);
@@ -143,10 +143,10 @@ GArrowStreamReader *
 garrow_stream_reader_new(GArrowInputStream *stream,
                          GError **error)
 {
-  std::shared_ptr<arrow::ipc::StreamReader> arrow_stream_reader;
+  std::shared_ptr<arrow::ipc::RecordBatchStreamReader> arrow_stream_reader;
   auto status =
-    arrow::ipc::StreamReader::Open(garrow_input_stream_get_raw(stream),
-                                   &arrow_stream_reader);
+    arrow::ipc::RecordBatchStreamReader::Open(garrow_input_stream_get_raw(stream),
+                                              &arrow_stream_reader);
   if (garrow_error_check(error, status, "[ipc][stream-reader][open]")) {
     return garrow_stream_reader_new_raw(&arrow_stream_reader);
   } else {
@@ -179,7 +179,7 @@ garrow_stream_reader_get_schema(GArrowStreamReader *stream_reader)
  */
 GArrowRecordBatch *
 garrow_stream_reader_get_next_record_batch(GArrowStreamReader *stream_reader,
-                                               GError **error)
+                                           GError **error)
 {
   auto arrow_stream_reader =
     garrow_stream_reader_get_raw(stream_reader);
@@ -202,7 +202,7 @@ garrow_stream_reader_get_next_record_batch(GArrowStreamReader *stream_reader,
 G_END_DECLS
 
 GArrowStreamReader *
-garrow_stream_reader_new_raw(std::shared_ptr<arrow::ipc::StreamReader> *arrow_stream_reader)
+garrow_stream_reader_new_raw(std::shared_ptr<arrow::ipc::RecordBatchStreamReader> *arrow_stream_reader)
 {
   auto stream_reader =
     GARROW_STREAM_READER(g_object_new(GARROW_TYPE_STREAM_READER,
@@ -211,7 +211,7 @@ garrow_stream_reader_new_raw(std::shared_ptr<arrow::ipc::StreamReader> *arrow_st
   return stream_reader;
 }
 
-std::shared_ptr<arrow::ipc::StreamReader>
+std::shared_ptr<arrow::ipc::RecordBatchStreamReader>
 garrow_stream_reader_get_raw(GArrowStreamReader *stream_reader)
 {
   GArrowStreamReaderPrivate *priv;

http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/c_glib/arrow-glib/stream-reader.h
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/stream-reader.h b/c_glib/arrow-glib/stream-reader.h
index 2ea2c26..f6cdaea 100644
--- a/c_glib/arrow-glib/stream-reader.h
+++ b/c_glib/arrow-glib/stream-reader.h
@@ -55,7 +55,7 @@ typedef struct _GArrowStreamReaderClass    GArrowStreamReaderClass;
 /**
  * GArrowStreamReader:
  *
- * It wraps `arrow::ipc::StreamReader`.
+ * It wraps `arrow::ipc::RecordBatchStreamReader`.
  */
 struct _GArrowStreamReader
 {

http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/c_glib/arrow-glib/stream-reader.hpp
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/stream-reader.hpp b/c_glib/arrow-glib/stream-reader.hpp
index ca8e689..5191b4e 100644
--- a/c_glib/arrow-glib/stream-reader.hpp
+++ b/c_glib/arrow-glib/stream-reader.hpp
@@ -24,5 +24,5 @@
 
 #include <arrow-glib/stream-reader.h>
 
-GArrowStreamReader *garrow_stream_reader_new_raw(std::shared_ptr<arrow::ipc::StreamReader> *arrow_stream_reader);
-std::shared_ptr<arrow::ipc::StreamReader> garrow_stream_reader_get_raw(GArrowStreamReader *stream_reader);
+GArrowStreamReader *garrow_stream_reader_new_raw(std::shared_ptr<arrow::ipc::RecordBatchStreamReader> *arrow_stream_reader);
+std::shared_ptr<arrow::ipc::RecordBatchStreamReader> garrow_stream_reader_get_raw(GArrowStreamReader *stream_reader);

http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/c_glib/arrow-glib/writer.cpp
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/writer.cpp b/c_glib/arrow-glib/writer.cpp
index 625a19e..092993b 100644
--- a/c_glib/arrow-glib/writer.cpp
+++ b/c_glib/arrow-glib/writer.cpp
@@ -47,7 +47,7 @@ G_BEGIN_DECLS
  */
 
 typedef struct GArrowStreamWriterPrivate_ {
-  std::shared_ptr<arrow::ipc::StreamWriter> stream_writer;
+  std::shared_ptr<arrow::ipc::RecordBatchStreamWriter> stream_writer;
 } GArrowStreamWriterPrivate;
 
 enum {
@@ -89,7 +89,7 @@ garrow_stream_writer_set_property(GObject *object,
   switch (prop_id) {
   case PROP_STREAM_WRITER:
     priv->stream_writer =
-      *static_cast<std::shared_ptr<arrow::ipc::StreamWriter> *>(g_value_get_pointer(value));
+      *static_cast<std::shared_ptr<arrow::ipc::RecordBatchStreamWriter> *>(g_value_get_pointer(value));
     break;
   default:
     G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
@@ -128,8 +128,8 @@ garrow_stream_writer_class_init(GArrowStreamWriterClass *klass)
   gobject_class->get_property = garrow_stream_writer_get_property;
 
   spec = g_param_spec_pointer("stream-writer",
-                              "ipc::StreamWriter",
-                              "The raw std::shared<arrow::ipc::StreamWriter> *",
+                              "ipc::RecordBatchStreamWriter",
+                              "The raw std::shared_ptr<arrow::ipc::RecordBatchStreamWriter> *",
                               static_cast<GParamFlags>(G_PARAM_WRITABLE |
                                                        G_PARAM_CONSTRUCT_ONLY));
   g_object_class_install_property(gobject_class, PROP_STREAM_WRITER, spec);
@@ -149,11 +149,11 @@ garrow_stream_writer_new(GArrowOutputStream *sink,
                          GArrowSchema *schema,
                          GError **error)
 {
-  std::shared_ptr<arrow::ipc::StreamWriter> arrow_stream_writer;
+  std::shared_ptr<arrow::ipc::RecordBatchStreamWriter> arrow_stream_writer;
   auto status =
-    arrow::ipc::StreamWriter::Open(garrow_output_stream_get_raw(sink).get(),
-                                 garrow_schema_get_raw(schema),
-                                 &arrow_stream_writer);
+    arrow::ipc::RecordBatchStreamWriter::Open(garrow_output_stream_get_raw(sink).get(),
+                                              garrow_schema_get_raw(schema),
+                                              &arrow_stream_writer);
   if (garrow_error_check(error, status, "[ipc][stream-writer][open]")) {
     return garrow_stream_writer_new_raw(&arrow_stream_writer);
   } else {

http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/c_glib/arrow-glib/writer.h
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/writer.h b/c_glib/arrow-glib/writer.h
index 2aaa776..2f8e90c 100644
--- a/c_glib/arrow-glib/writer.h
+++ b/c_glib/arrow-glib/writer.h
@@ -56,7 +56,7 @@ typedef struct _GArrowStreamWriterClass    GArrowStreamWriterClass;
 /**
  * GArrowStreamWriter:
  *
- * It wraps `arrow::ipc::StreamWriter`.
+ * It wraps `arrow::ipc::RecordBatchStreamWriter`.
  */
 struct _GArrowStreamWriter
 {

http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/c_glib/arrow-glib/writer.hpp
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/writer.hpp b/c_glib/arrow-glib/writer.hpp
index 199f205..47f5e68 100644
--- a/c_glib/arrow-glib/writer.hpp
+++ b/c_glib/arrow-glib/writer.hpp
@@ -24,8 +24,8 @@
 
 #include <arrow-glib/writer.h>
 
-GArrowStreamWriter *garrow_stream_writer_new_raw(std::shared_ptr<arrow::ipc::StreamWriter> *arrow_stream_writer);
-std::shared_ptr<arrow::ipc::StreamWriter> garrow_stream_writer_get_raw(GArrowStreamWriter *stream_writer);
+GArrowStreamWriter *garrow_stream_writer_new_raw(std::shared_ptr<arrow::ipc::RecordBatchStreamWriter> *arrow_stream_writer);
+std::shared_ptr<arrow::ipc::RecordBatchStreamWriter> garrow_stream_writer_get_raw(GArrowStreamWriter *stream_writer);
 
-GArrowFileWriter *garrow_file_writer_new_raw(std::shared_ptr<arrow::ipc::FileWriter> *arrow_file_writer);
-arrow::ipc::FileWriter *garrow_file_writer_get_raw(GArrowFileWriter *file_writer);
+GArrowFileWriter *garrow_file_writer_new_raw(std::shared_ptr<arrow::ipc::RecordBatchFileWriter> *arrow_file_writer);
+arrow::ipc::RecordBatchFileWriter *garrow_file_writer_get_raw(GArrowFileWriter *file_writer);

http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/ci/travis_before_script_cpp.sh
----------------------------------------------------------------------
diff --git a/ci/travis_before_script_cpp.sh b/ci/travis_before_script_cpp.sh
index 3f9f67c..7d4ecb7 100755
--- a/ci/travis_before_script_cpp.sh
+++ b/ci/travis_before_script_cpp.sh
@@ -38,10 +38,12 @@ if [ $TRAVIS_OS_NAME == "linux" ]; then
     cmake -DARROW_TEST_MEMCHECK=on \
           $CMAKE_COMMON_FLAGS \
           -DARROW_CXXFLAGS="-Wconversion -Werror" \
+          -DARROW_NO_DEPRECATED_API=on \
           $ARROW_CPP_DIR
 else
     cmake $CMAKE_COMMON_FLAGS \
           -DARROW_CXXFLAGS=-Werror \
+          -DARROW_NO_DEPRECATED_API=on \
           $ARROW_CPP_DIR
 fi
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 6b2ceec..0ad7ef5 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -89,6 +89,10 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
     "Build the Arrow micro benchmarks"
     OFF)
 
+  option(ARROW_NO_DEPRECATED_API
+    "Exclude deprecated APIs from build"
+    OFF)
+
   option(ARROW_IPC
     "Build the Arrow IPC extensions"
     ON)
@@ -154,6 +158,10 @@ include(BuildUtils)
 
 include(SetupCxxFlags)
 
+if (ARROW_NO_DEPRECATED_API)
+  add_definitions(-DARROW_NO_DEPRECATED_API)
+endif()
+
 # Add common flags
 set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_COMMON_FLAGS}")
 set(EP_CXX_FLAGS "${CMAKE_CXX_FLAGS}")

http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/cpp/src/arrow/ipc/file-to-stream.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/file-to-stream.cc b/cpp/src/arrow/ipc/file-to-stream.cc
index 8161b19..39c720c 100644
--- a/cpp/src/arrow/ipc/file-to-stream.cc
+++ b/cpp/src/arrow/ipc/file-to-stream.cc
@@ -24,18 +24,19 @@
 #include "arrow/util/io-util.h"
 
 namespace arrow {
+namespace ipc {
 
 // Reads a file on the file system and prints to stdout the stream version of it.
 Status ConvertToStream(const char* path) {
   std::shared_ptr<io::ReadableFile> in_file;
-  std::shared_ptr<ipc::FileReader> reader;
+  std::shared_ptr<RecordBatchFileReader> reader;
 
   RETURN_NOT_OK(io::ReadableFile::Open(path, &in_file));
-  RETURN_NOT_OK(ipc::FileReader::Open(in_file, &reader));
+  RETURN_NOT_OK(ipc::RecordBatchFileReader::Open(in_file, &reader));
 
   io::StdoutStream sink;
-  std::shared_ptr<ipc::StreamWriter> writer;
-  RETURN_NOT_OK(ipc::StreamWriter::Open(&sink, reader->schema(), &writer));
+  std::shared_ptr<RecordBatchStreamWriter> writer;
+  RETURN_NOT_OK(RecordBatchStreamWriter::Open(&sink, reader->schema(), &writer));
   for (int i = 0; i < reader->num_record_batches(); ++i) {
     std::shared_ptr<RecordBatch> chunk;
     RETURN_NOT_OK(reader->GetRecordBatch(i, &chunk));
@@ -44,6 +45,7 @@ Status ConvertToStream(const char* path) {
   return writer->Close();
 }
 
+}  // namespace ipc
 }  // namespace arrow
 
 int main(int argc, char** argv) {
@@ -51,7 +53,7 @@ int main(int argc, char** argv) {
     std::cerr << "Usage: file-to-stream <input arrow file>" << std::endl;
     return 1;
   }
-  arrow::Status status = arrow::ConvertToStream(argv[1]);
+  arrow::Status status = arrow::ipc::ConvertToStream(argv[1]);
   if (!status.ok()) {
     std::cerr << "Could not convert to stream: " << status.ToString() << std::endl;
     return 1;

http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/cpp/src/arrow/ipc/ipc-read-write-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc b/cpp/src/arrow/ipc/ipc-read-write-test.cc
index b4a88b5..c99816c 100644
--- a/cpp/src/arrow/ipc/ipc-read-write-test.cc
+++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc
@@ -140,16 +140,16 @@ class IpcTestFixture : public io::MemoryMapFixture {
     if (zero_data) { RETURN_NOT_OK(ZeroMemoryMap(mmap_.get())); }
     RETURN_NOT_OK(mmap_->Seek(0));
 
-    std::shared_ptr<FileWriter> file_writer;
-    RETURN_NOT_OK(FileWriter::Open(mmap_.get(), batch.schema(), &file_writer));
+    std::shared_ptr<RecordBatchFileWriter> file_writer;
+    RETURN_NOT_OK(RecordBatchFileWriter::Open(mmap_.get(), batch.schema(), &file_writer));
     RETURN_NOT_OK(file_writer->WriteRecordBatch(batch, true));
     RETURN_NOT_OK(file_writer->Close());
 
     int64_t offset;
     RETURN_NOT_OK(mmap_->Tell(&offset));
 
-    std::shared_ptr<FileReader> file_reader;
-    RETURN_NOT_OK(FileReader::Open(mmap_, offset, &file_reader));
+    std::shared_ptr<RecordBatchFileReader> file_reader;
+    RETURN_NOT_OK(RecordBatchFileReader::Open(mmap_, offset, &file_reader));
 
     return file_reader->GetRecordBatch(0, result);
   }
@@ -487,8 +487,9 @@ class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
 
   Status RoundTripHelper(const BatchVector& in_batches, BatchVector* out_batches) {
     // Write the file
-    std::shared_ptr<FileWriter> writer;
-    RETURN_NOT_OK(FileWriter::Open(sink_.get(), in_batches[0]->schema(), &writer));
+    std::shared_ptr<RecordBatchFileWriter> writer;
+    RETURN_NOT_OK(
+        RecordBatchFileWriter::Open(sink_.get(), in_batches[0]->schema(), &writer));
 
     const int num_batches = static_cast<int>(in_batches.size());
 
@@ -504,8 +505,8 @@ class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
 
     // Open the file
     auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
-    std::shared_ptr<FileReader> reader;
-    RETURN_NOT_OK(FileReader::Open(buf_reader, footer_offset, &reader));
+    std::shared_ptr<RecordBatchFileReader> reader;
+    RETURN_NOT_OK(RecordBatchFileReader::Open(buf_reader, footer_offset, &reader));
 
     EXPECT_EQ(num_batches, reader->num_record_batches());
     for (int i = 0; i < num_batches; ++i) {
@@ -553,8 +554,8 @@ class TestStreamFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
   Status RoundTripHelper(
       const RecordBatch& batch, std::vector<std::shared_ptr<RecordBatch>>* out_batches) {
     // Write the file
-    std::shared_ptr<StreamWriter> writer;
-    RETURN_NOT_OK(StreamWriter::Open(sink_.get(), batch.schema(), &writer));
+    std::shared_ptr<RecordBatchStreamWriter> writer;
+    RETURN_NOT_OK(RecordBatchStreamWriter::Open(sink_.get(), batch.schema(), &writer));
     int num_batches = 5;
     for (int i = 0; i < num_batches; ++i) {
       RETURN_NOT_OK(writer->WriteRecordBatch(batch));
@@ -565,8 +566,8 @@ class TestStreamFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
     // Open the file
     auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
 
-    std::shared_ptr<StreamReader> reader;
-    RETURN_NOT_OK(StreamReader::Open(buf_reader, &reader));
+    std::shared_ptr<RecordBatchStreamReader> reader;
+    RETURN_NOT_OK(RecordBatchStreamReader::Open(buf_reader, &reader));
 
     std::shared_ptr<RecordBatch> chunk;
     while (true) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/cpp/src/arrow/ipc/json-integration-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc
index aa95500..424755a 100644
--- a/cpp/src/arrow/ipc/json-integration-test.cc
+++ b/cpp/src/arrow/ipc/json-integration-test.cc
@@ -76,8 +76,9 @@ static Status ConvertJsonToArrow(
     std::cout << "Found schema: " << reader->schema()->ToString() << std::endl;
   }
 
-  std::shared_ptr<ipc::FileWriter> writer;
-  RETURN_NOT_OK(ipc::FileWriter::Open(out_file.get(), reader->schema(), &writer));
+  std::shared_ptr<ipc::RecordBatchFileWriter> writer;
+  RETURN_NOT_OK(
+      ipc::RecordBatchFileWriter::Open(out_file.get(), reader->schema(), &writer));
 
   for (int i = 0; i < reader->num_record_batches(); ++i) {
     std::shared_ptr<RecordBatch> batch;
@@ -96,8 +97,8 @@ static Status ConvertArrowToJson(
   RETURN_NOT_OK(io::ReadableFile::Open(arrow_path, &in_file));
   RETURN_NOT_OK(io::FileOutputStream::Open(json_path, &out_file));
 
-  std::shared_ptr<ipc::FileReader> reader;
-  RETURN_NOT_OK(ipc::FileReader::Open(in_file, &reader));
+  std::shared_ptr<ipc::RecordBatchFileReader> reader;
+  RETURN_NOT_OK(ipc::RecordBatchFileReader::Open(in_file, &reader));
 
   if (FLAGS_verbose) {
     std::cout << "Found schema: " << reader->schema()->ToString() << std::endl;
@@ -137,8 +138,8 @@ static Status ValidateArrowVsJson(
   std::shared_ptr<io::ReadableFile> arrow_file;
   RETURN_NOT_OK(io::ReadableFile::Open(arrow_path, &arrow_file));
 
-  std::shared_ptr<ipc::FileReader> arrow_reader;
-  RETURN_NOT_OK(ipc::FileReader::Open(arrow_file, &arrow_reader));
+  std::shared_ptr<ipc::RecordBatchFileReader> arrow_reader;
+  RETURN_NOT_OK(ipc::RecordBatchFileReader::Open(arrow_file, &arrow_reader));
 
   auto json_schema = json_reader->schema();
   auto arrow_schema = arrow_reader->schema();

http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/cpp/src/arrow/ipc/reader.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index aea4c9c..2b7b90f 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -156,7 +156,7 @@ Status ReadDictionary(const Message& metadata, const DictionaryTypeMap& dictiona
 }
 
 // ----------------------------------------------------------------------
-// StreamReader implementation
+// RecordBatchStreamReader implementation
 
 static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
   return FileBlock(block->offset(), block->metaDataLength(), block->bodyLength());
@@ -176,10 +176,12 @@ static inline std::string message_type_name(Message::Type type) {
   return "unknown";
 }
 
-class StreamReader::StreamReaderImpl {
+RecordBatchReader::~RecordBatchReader() {}
+
+class RecordBatchStreamReader::RecordBatchStreamReaderImpl {
  public:
-  StreamReaderImpl() {}
-  ~StreamReaderImpl() {}
+  RecordBatchStreamReaderImpl() {}
+  ~RecordBatchStreamReaderImpl() {}
 
   Status Open(const std::shared_ptr<io::InputStream>& stream) {
     stream_ = stream;
@@ -267,33 +269,33 @@ class StreamReader::StreamReaderImpl {
   std::shared_ptr<Schema> schema_;
 };
 
-StreamReader::StreamReader() {
-  impl_.reset(new StreamReaderImpl());
+RecordBatchStreamReader::RecordBatchStreamReader() {
+  impl_.reset(new RecordBatchStreamReaderImpl());
 }
 
-StreamReader::~StreamReader() {}
+RecordBatchStreamReader::~RecordBatchStreamReader() {}
 
-Status StreamReader::Open(const std::shared_ptr<io::InputStream>& stream,
-    std::shared_ptr<StreamReader>* reader) {
+Status RecordBatchStreamReader::Open(const std::shared_ptr<io::InputStream>& stream,
+    std::shared_ptr<RecordBatchStreamReader>* reader) {
   // Private ctor
-  *reader = std::shared_ptr<StreamReader>(new StreamReader());
+  *reader = std::shared_ptr<RecordBatchStreamReader>(new RecordBatchStreamReader());
   return (*reader)->impl_->Open(stream);
 }
 
-std::shared_ptr<Schema> StreamReader::schema() const {
+std::shared_ptr<Schema> RecordBatchStreamReader::schema() const {
   return impl_->schema();
 }
 
-Status StreamReader::GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) {
+Status RecordBatchStreamReader::GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) {
   return impl_->GetNextRecordBatch(batch);
 }
 
 // ----------------------------------------------------------------------
 // Reader implementation
 
-class FileReader::FileReaderImpl {
+class RecordBatchFileReader::RecordBatchFileReaderImpl {
  public:
-  FileReaderImpl() { dictionary_memo_ = std::make_shared<DictionaryMemo>(); }
+  RecordBatchFileReaderImpl() { dictionary_memo_ = std::make_shared<DictionaryMemo>(); }
 
   Status ReadFooter() {
     int magic_size = static_cast<int>(strlen(kArrowMagicBytes));
@@ -432,38 +434,38 @@ class FileReader::FileReaderImpl {
   std::shared_ptr<Schema> schema_;
 };
 
-FileReader::FileReader() {
-  impl_.reset(new FileReaderImpl());
+RecordBatchFileReader::RecordBatchFileReader() {
+  impl_.reset(new RecordBatchFileReaderImpl());
 }
 
-FileReader::~FileReader() {}
+RecordBatchFileReader::~RecordBatchFileReader() {}
 
-Status FileReader::Open(const std::shared_ptr<io::RandomAccessFile>& file,
-    std::shared_ptr<FileReader>* reader) {
+Status RecordBatchFileReader::Open(const std::shared_ptr<io::RandomAccessFile>& file,
+    std::shared_ptr<RecordBatchFileReader>* reader) {
   int64_t footer_offset;
   RETURN_NOT_OK(file->GetSize(&footer_offset));
   return Open(file, footer_offset, reader);
 }
 
-Status FileReader::Open(const std::shared_ptr<io::RandomAccessFile>& file,
-    int64_t footer_offset, std::shared_ptr<FileReader>* reader) {
-  *reader = std::shared_ptr<FileReader>(new FileReader());
+Status RecordBatchFileReader::Open(const std::shared_ptr<io::RandomAccessFile>& file,
+    int64_t footer_offset, std::shared_ptr<RecordBatchFileReader>* reader) {
+  *reader = std::shared_ptr<RecordBatchFileReader>(new RecordBatchFileReader());
   return (*reader)->impl_->Open(file, footer_offset);
 }
 
-std::shared_ptr<Schema> FileReader::schema() const {
+std::shared_ptr<Schema> RecordBatchFileReader::schema() const {
   return impl_->schema();
 }
 
-int FileReader::num_record_batches() const {
+int RecordBatchFileReader::num_record_batches() const {
   return impl_->num_record_batches();
 }
 
-MetadataVersion FileReader::version() const {
+MetadataVersion RecordBatchFileReader::version() const {
   return impl_->version();
 }
 
-Status FileReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) {
+Status RecordBatchFileReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) {
   return impl_->GetRecordBatch(i, batch);
 }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/cpp/src/arrow/ipc/reader.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
index 1972446..dd29a36 100644
--- a/cpp/src/arrow/ipc/reader.h
+++ b/cpp/src/arrow/ipc/reader.h
@@ -44,29 +44,50 @@ class RandomAccessFile;
 
 namespace ipc {
 
-class ARROW_EXPORT StreamReader {
+/// \brief Abstract interface for reading a stream of record batches
+class ARROW_EXPORT RecordBatchReader {
  public:
-  ~StreamReader();
+  virtual ~RecordBatchReader();
 
-  // Open an stream.
-  static Status Open(const std::shared_ptr<io::InputStream>& stream,
-      std::shared_ptr<StreamReader>* reader);
+  /// \return the shared schema of the record batches in the stream
+  virtual std::shared_ptr<Schema> schema() const = 0;
 
-  std::shared_ptr<Schema> schema() const;
+  /// Read the next record batch in the stream. Return nullptr for batch when
+  /// reaching end of stream
+  ///
+  /// \param(out) batch the next loaded batch, nullptr at end of stream
+  /// \return Status
+  virtual Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) = 0;
+};
+
+/// \class RecordBatchStreamReader
+/// \brief Synchronous batch stream reader that reads from io::InputStream
+class ARROW_EXPORT RecordBatchStreamReader : public RecordBatchReader {
+ public:
+  virtual ~RecordBatchStreamReader();
 
-  // Returned batch is nullptr when end of stream reached
-  Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch);
+  /// Create batch reader from InputStream
+  ///
+  /// \param(in) stream an input stream instance
+  /// \param(out) reader the created reader object
+  /// \return Status
+  static Status Open(const std::shared_ptr<io::InputStream>& stream,
+      std::shared_ptr<RecordBatchStreamReader>* reader);
+
+  std::shared_ptr<Schema> schema() const override;
+  Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) override;
 
  private:
-  StreamReader();
+  RecordBatchStreamReader();
 
-  class ARROW_NO_EXPORT StreamReaderImpl;
-  std::unique_ptr<StreamReaderImpl> impl_;
+  class ARROW_NO_EXPORT RecordBatchStreamReaderImpl;
+  std::unique_ptr<RecordBatchStreamReaderImpl> impl_;
 };
 
-class ARROW_EXPORT FileReader {
+/// \brief Reads the record batch file format
+class ARROW_EXPORT RecordBatchFileReader {
  public:
-  ~FileReader();
+  ~RecordBatchFileReader();
 
   // Open a file-like object that is assumed to be self-contained; i.e., the
   // end of the file interface is the end of the Arrow file. Note that there
@@ -74,7 +95,7 @@ class ARROW_EXPORT FileReader {
   // need only locate the end of the Arrow file stream to discover the metadata
   // and then proceed to read the data into memory.
   static Status Open(const std::shared_ptr<io::RandomAccessFile>& file,
-      std::shared_ptr<FileReader>* reader);
+      std::shared_ptr<RecordBatchFileReader>* reader);
 
   // If the file is embedded within some larger file or memory region, you can
   // pass the absolute memory offset to the end of the file (which contains the
@@ -84,46 +105,80 @@ class ARROW_EXPORT FileReader {
   // @param file: the data source
   // @param footer_offset: the position of the end of the Arrow "file"
   static Status Open(const std::shared_ptr<io::RandomAccessFile>& file,
-      int64_t footer_offset, std::shared_ptr<FileReader>* reader);
+      int64_t footer_offset, std::shared_ptr<RecordBatchFileReader>* reader);
 
   /// The schema includes any dictionaries
   std::shared_ptr<Schema> schema() const;
 
+  /// Returns number of record batches in the file
   int num_record_batches() const;
 
+  /// Returns MetadataVersion in the file metadata
   MetadataVersion version() const;
 
-  // Read a record batch from the file. Does not copy memory if the input
-  // source supports zero-copy.
-  //
-  // TODO(wesm): Make the copy/zero-copy behavior configurable (e.g. provide an
-  // "always copy" option)
+  /// Read a record batch from the file. Does not copy memory if the input
+  /// source supports zero-copy.
+  ///
+  /// \param(in) i the index of the record batch to return
+  /// \param(out) batch the read batch
+  /// \return Status
   Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch);
 
  private:
-  FileReader();
+  RecordBatchFileReader();
 
-  class ARROW_NO_EXPORT FileReaderImpl;
-  std::unique_ptr<FileReaderImpl> impl_;
+  class ARROW_NO_EXPORT RecordBatchFileReaderImpl;
+  std::unique_ptr<RecordBatchFileReaderImpl> impl_;
 };
 
-// Generic read functionsh; does not copy data if the input supports zero copy reads
+// Generic read functions; does not copy data if the input supports zero copy reads
+
+/// Read record batch from file given metadata and schema
+///
+/// \param(in) metadata a Message containing the record batch metadata
+/// \param(in) schema the record batch schema
+/// \param(in) file a random access file
+/// \param(out) out the read record batch
 Status ARROW_EXPORT ReadRecordBatch(const Message& metadata,
     const std::shared_ptr<Schema>& schema, io::RandomAccessFile* file,
     std::shared_ptr<RecordBatch>* out);
 
+/// Read record batch from file given metadata and schema
+///
+/// \param(in) metadata a Message containing the record batch metadata
+/// \param(in) schema the record batch schema
+/// \param(in) file a random access file
+/// \param(in) max_recursion_depth the maximum permitted nesting depth
+/// \param(out) out the read record batch
 Status ARROW_EXPORT ReadRecordBatch(const Message& metadata,
     const std::shared_ptr<Schema>& schema, int max_recursion_depth,
     io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out);
 
-/// Read encapsulated message and RecordBatch
+/// Read record batch as encapsulated IPC message with metadata size prefix and
+/// header
+///
+/// \param(in) schema the record batch schema
+/// \param(in) offset the file location of the start of the message
+/// \param(in) file the file where the batch is located
+/// \param(out) out the read record batch
 Status ARROW_EXPORT ReadRecordBatch(const std::shared_ptr<Schema>& schema, int64_t offset,
     io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out);
 
-/// EXPERIMENTAL: Read arrow::Tensor from a contiguous message
+/// EXPERIMENTAL: Read arrow::Tensor as encapsulated IPC message in file
+///
+/// \param(in) offset the file location of the start of the message
+/// \param(in) file the file where the batch is located
+/// \param(out) out the read tensor
 Status ARROW_EXPORT ReadTensor(
     int64_t offset, io::RandomAccessFile* file, std::shared_ptr<Tensor>* out);
 
+/// Backwards-compatibility for Arrow < 0.4.0
+///
+#ifndef ARROW_NO_DEPRECATED_API
+using StreamReader = RecordBatchReader;
+using FileReader = RecordBatchFileReader;
+#endif
+
 }  // namespace ipc
 }  // namespace arrow
 
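For reference, the renamed file-reader API above composes like this. A sketch patterned on the file-to-stream.cc changes earlier in this patch, using the same RETURN_NOT_OK convenience macro the patch's utilities use (provided by arrow/status.h at the time); the RecordBatch include location is an assumption:

// Sketch: read every record batch from an Arrow file with the new names.
#include <memory>
#include <string>

#include "arrow/io/file.h"
#include "arrow/ipc/reader.h"
#include "arrow/status.h"
#include "arrow/table.h"  // RecordBatch (location assumed for this era)

arrow::Status ReadAllBatches(const std::string& path) {
  std::shared_ptr<arrow::io::ReadableFile> in_file;
  RETURN_NOT_OK(arrow::io::ReadableFile::Open(path, &in_file));

  // Open() without an explicit footer offset assumes a self-contained file
  std::shared_ptr<arrow::ipc::RecordBatchFileReader> reader;
  RETURN_NOT_OK(arrow::ipc::RecordBatchFileReader::Open(in_file, &reader));

  for (int i = 0; i < reader->num_record_batches(); ++i) {
    std::shared_ptr<arrow::RecordBatch> batch;
    RETURN_NOT_OK(reader->GetRecordBatch(i, &batch));
    // ... consume batch ...
  }
  return arrow::Status::OK();
}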

http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/cpp/src/arrow/ipc/stream-to-file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/stream-to-file.cc b/cpp/src/arrow/ipc/stream-to-file.cc
index ec0ac43..b942054 100644
--- a/cpp/src/arrow/ipc/stream-to-file.cc
+++ b/cpp/src/arrow/ipc/stream-to-file.cc
@@ -24,18 +24,19 @@
 #include "arrow/util/io-util.h"
 
 namespace arrow {
+namespace ipc {
 
 // Converts a stream from stdin to a file written to standard out.
 // A typical usage would be:
 // $ <program that produces streaming output> | stream-to-file > file.arrow
 Status ConvertToFile() {
   std::shared_ptr<io::InputStream> input(new io::StdinStream);
-  std::shared_ptr<ipc::StreamReader> reader;
-  RETURN_NOT_OK(ipc::StreamReader::Open(input, &reader));
+  std::shared_ptr<RecordBatchStreamReader> reader;
+  RETURN_NOT_OK(RecordBatchStreamReader::Open(input, &reader));
 
   io::StdoutStream sink;
-  std::shared_ptr<ipc::FileWriter> writer;
-  RETURN_NOT_OK(ipc::FileWriter::Open(&sink, reader->schema(), &writer));
+  std::shared_ptr<RecordBatchFileWriter> writer;
+  RETURN_NOT_OK(RecordBatchFileWriter::Open(&sink, reader->schema(), &writer));
 
   std::shared_ptr<RecordBatch> batch;
   while (true) {
@@ -46,10 +47,11 @@ Status ConvertToFile() {
   return writer->Close();
 }
 
+}  // namespace ipc
 }  // namespace arrow
 
 int main(int argc, char** argv) {
-  arrow::Status status = arrow::ConvertToFile();
+  arrow::Status status = arrow::ipc::ConvertToFile();
   if (!status.ok()) {
     std::cerr << "Could not convert to file: " << status.ToString() << std::endl;
     return 1;

http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/cpp/src/arrow/ipc/writer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 78d6b9e..ced0710 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -88,9 +88,9 @@ static inline bool NeedTruncate(
   return offset != 0 || min_length < buffer->size();
 }
 
-class RecordBatchWriter : public ArrayVisitor {
+class RecordBatchSerializer : public ArrayVisitor {
  public:
-  RecordBatchWriter(MemoryPool* pool, int64_t buffer_start_offset,
+  RecordBatchSerializer(MemoryPool* pool, int64_t buffer_start_offset,
       int max_recursion_depth, bool allow_64bit)
       : pool_(pool),
         max_recursion_depth_(max_recursion_depth),
@@ -99,7 +99,7 @@ class RecordBatchWriter : public ArrayVisitor {
     DCHECK_GT(max_recursion_depth, 0);
   }
 
-  virtual ~RecordBatchWriter() = default;
+  virtual ~RecordBatchSerializer() = default;
 
   Status VisitArray(const Array& arr) {
     if (max_recursion_depth_ <= 0) {
@@ -480,9 +480,9 @@ class RecordBatchWriter : public ArrayVisitor {
   bool allow_64bit_;
 };
 
-class DictionaryWriter : public RecordBatchWriter {
+class DictionaryWriter : public RecordBatchSerializer {
  public:
-  using RecordBatchWriter::RecordBatchWriter;
+  using RecordBatchSerializer::RecordBatchSerializer;
 
   Status WriteMetadataMessage(
       int64_t num_rows, int64_t body_length, std::shared_ptr<Buffer>* out) override {
@@ -500,7 +500,7 @@ class DictionaryWriter : public RecordBatchWriter {
     auto schema = std::make_shared<Schema>(fields);
     RecordBatch batch(schema, dictionary->length(), {dictionary});
 
-    return RecordBatchWriter::Write(batch, dst, metadata_length, body_length);
+    return RecordBatchSerializer::Write(batch, dst, metadata_length, body_length);
   }
 
  private:
@@ -521,7 +521,8 @@ Status AlignStreamPosition(io::OutputStream* stream) {
 Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
     io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
     MemoryPool* pool, int max_recursion_depth, bool allow_64bit) {
-  RecordBatchWriter writer(pool, buffer_start_offset, max_recursion_depth, allow_64bit);
+  RecordBatchSerializer writer(
+      pool, buffer_start_offset, max_recursion_depth, allow_64bit);
   return writer.Write(batch, dst, metadata_length, body_length);
 }
 
@@ -581,17 +582,21 @@ Status GetTensorSize(const Tensor& tensor, int64_t* size) {
 }
 
 // ----------------------------------------------------------------------
+
+RecordBatchWriter::~RecordBatchWriter() {}
+
+// ----------------------------------------------------------------------
 // Stream writer implementation
 
-class StreamWriter::StreamWriterImpl {
+class RecordBatchStreamWriter::RecordBatchStreamWriterImpl {
  public:
-  StreamWriterImpl()
+  RecordBatchStreamWriterImpl()
       : dictionary_memo_(std::make_shared<DictionaryMemo>()),
         pool_(default_memory_pool()),
         position_(-1),
         started_(false) {}
 
-  virtual ~StreamWriterImpl() = default;
+  virtual ~RecordBatchStreamWriterImpl() = default;
 
   Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema) {
     sink_ = sink;
@@ -721,37 +726,40 @@ class StreamWriter::StreamWriterImpl {
   std::vector<FileBlock> record_batches_;
 };
 
-StreamWriter::StreamWriter() {
-  impl_.reset(new StreamWriterImpl());
+RecordBatchStreamWriter::RecordBatchStreamWriter() {
+  impl_.reset(new RecordBatchStreamWriterImpl());
 }
 
-Status StreamWriter::WriteRecordBatch(const RecordBatch& batch, bool allow_64bit) {
+RecordBatchStreamWriter::~RecordBatchStreamWriter() {}
+
+Status RecordBatchStreamWriter::WriteRecordBatch(
+    const RecordBatch& batch, bool allow_64bit) {
   return impl_->WriteRecordBatch(batch, allow_64bit);
 }
 
-StreamWriter::~StreamWriter() {}
-
-void StreamWriter::set_memory_pool(MemoryPool* pool) {
+void RecordBatchStreamWriter::set_memory_pool(MemoryPool* pool) {
   impl_->set_memory_pool(pool);
 }
 
-Status StreamWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
-    std::shared_ptr<StreamWriter>* out) {
+Status RecordBatchStreamWriter::Open(io::OutputStream* sink,
+    const std::shared_ptr<Schema>& schema,
+    std::shared_ptr<RecordBatchStreamWriter>* out) {
   // ctor is private
-  *out = std::shared_ptr<StreamWriter>(new StreamWriter());
+  *out = std::shared_ptr<RecordBatchStreamWriter>(new RecordBatchStreamWriter());
   return (*out)->impl_->Open(sink, schema);
 }
 
-Status StreamWriter::Close() {
+Status RecordBatchStreamWriter::Close() {
   return impl_->Close();
 }
 
 // ----------------------------------------------------------------------
 // File writer implementation
 
-class FileWriter::FileWriterImpl : public StreamWriter::StreamWriterImpl {
+class RecordBatchFileWriter::RecordBatchFileWriterImpl
+    : public RecordBatchStreamWriter::RecordBatchStreamWriterImpl {
  public:
-  using BASE = StreamWriter::StreamWriterImpl;
+  using BASE = RecordBatchStreamWriter::RecordBatchStreamWriterImpl;
 
   Status Start() override {
     RETURN_NOT_OK(WriteAligned(
@@ -783,23 +791,25 @@ class FileWriter::FileWriterImpl : public StreamWriter::StreamWriterImpl {
   }
 };
 
-FileWriter::FileWriter() {
-  impl_.reset(new FileWriterImpl());
+RecordBatchFileWriter::RecordBatchFileWriter() {
+  impl_.reset(new RecordBatchFileWriterImpl());
 }
 
-FileWriter::~FileWriter() {}
+RecordBatchFileWriter::~RecordBatchFileWriter() {}
 
-Status FileWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
-    std::shared_ptr<FileWriter>* out) {
-  *out = std::shared_ptr<FileWriter>(new FileWriter());  // ctor is private
+Status RecordBatchFileWriter::Open(io::OutputStream* sink,
+    const std::shared_ptr<Schema>& schema, std::shared_ptr<RecordBatchFileWriter>* out) {
+  *out = std::shared_ptr<RecordBatchFileWriter>(
+      new RecordBatchFileWriter());  // ctor is private
   return (*out)->impl_->Open(sink, schema);
 }
 
-Status FileWriter::WriteRecordBatch(const RecordBatch& batch, bool allow_64bit) {
+Status RecordBatchFileWriter::WriteRecordBatch(
+    const RecordBatch& batch, bool allow_64bit) {
   return impl_->WriteRecordBatch(batch, allow_64bit);
 }
 
-Status FileWriter::Close() {
+Status RecordBatchFileWriter::Close() {
   return impl_->Close();
 }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/cpp/src/arrow/ipc/writer.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h
index b71becb..899a1b2 100644
--- a/cpp/src/arrow/ipc/writer.h
+++ b/cpp/src/arrow/ipc/writer.h
@@ -46,6 +46,84 @@ class OutputStream;
 
 namespace ipc {
 
+/// \class RecordBatchWriter
+/// \brief Abstract interface for writing a stream of record batches
+class ARROW_EXPORT RecordBatchWriter {
+ public:
+  virtual ~RecordBatchWriter();
+
+  /// Write a record batch to the stream
+  ///
+  /// \param allow_64bit boolean permitting field lengths exceeding INT32_MAX
+  /// \return Status indicating success or failure
+  virtual Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) = 0;
+
+  /// Perform any logic necessary to finish the stream
+  ///
+  /// \return Status indicating success or failure
+  virtual Status Close() = 0;
+
+  /// In some cases, writing may require memory allocation. We use the default
+  /// memory pool, but provide the option to override
+  ///
+  /// \param pool the memory pool to use for required allocations
+  virtual void set_memory_pool(MemoryPool* pool) = 0;
+};
+
+/// \class RecordBatchStreamWriter
+/// \brief Synchronous batch stream writer that writes the Arrow streaming
+/// format
+class ARROW_EXPORT RecordBatchStreamWriter : public RecordBatchWriter {
+ public:
+  virtual ~RecordBatchStreamWriter();
+
+  /// Create a new writer from stream sink and schema. User is responsible for
+  /// closing the actual OutputStream.
+  ///
+  /// \param(in) sink output stream to write to
+  /// \param(in) schema the schema of the record batches to be written
+  /// \param(out) out the created stream writer
+  /// \return Status indicating success or failure
+  static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
+      std::shared_ptr<RecordBatchStreamWriter>* out);
+
+  Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override;
+  Status Close() override;
+  void set_memory_pool(MemoryPool* pool) override;
+
+ protected:
+  RecordBatchStreamWriter();
+  class ARROW_NO_EXPORT RecordBatchStreamWriterImpl;
+  std::unique_ptr<RecordBatchStreamWriterImpl> impl_;
+};
+
+/// \brief Creates the Arrow record batch file format
+///
+/// Implements the random access file format, which structurally is a record
+/// batch stream followed by a metadata footer at the end of the file. Magic
+/// numbers are written at the start and end of the file
+class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter {
+ public:
+  virtual ~RecordBatchFileWriter();
+
+  /// Create a new writer from stream sink and schema
+  ///
+  /// \param(in) sink output stream to write to
+  /// \param(in) schema the schema of the record batches to be written
+  /// \param(out) out the created stream writer
+  /// \return Status indicating success or failure
+  static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
+      std::shared_ptr<RecordBatchFileWriter>* out);
+
+  Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override;
+  Status Close() override;
+
+ private:
+  RecordBatchFileWriter();
+  class ARROW_NO_EXPORT RecordBatchFileWriterImpl;
+  std::unique_ptr<RecordBatchFileWriterImpl> impl_;
+};
+
 /// Write the RecordBatch (collection of equal-length Arrow arrays) to the
 /// output stream in a contiguous block. The record batch metadata is written as
 /// a flatbuffer (see format/Message.fbs -- the RecordBatch message type)
@@ -58,13 +136,13 @@ namespace ipc {
 /// to the end of the body and end of the metadata / data header (suffixed by
 /// the header size) is returned in out-variables
 ///
-/// @param(in) buffer_start_offset the start offset to use in the buffer metadata,
+/// \param(in) buffer_start_offset the start offset to use in the buffer metadata,
 /// default should be 0
-/// @param(in) allow_64bit permit field lengths exceeding INT32_MAX. May not be
+/// \param(in) allow_64bit permit field lengths exceeding INT32_MAX. May not be
 /// readable by other Arrow implementations
-/// @param(out) metadata_length: the size of the length-prefixed flatbuffer
+/// \param(out) metadata_length: the size of the length-prefixed flatbuffer
 /// including padding to a 64-byte boundary
-/// @param(out) body_length: the size of the contiguous buffer block plus
+/// \param(out) body_length: the size of the contiguous buffer block plus
 /// padding bytes
 Status ARROW_EXPORT WriteRecordBatch(const RecordBatch& batch,
     int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length,
@@ -85,45 +163,6 @@ Status ARROW_EXPORT GetRecordBatchSize(const RecordBatch& batch, int64_t* size);
 // write the tensor including metadata, padding, and data
 Status ARROW_EXPORT GetTensorSize(const Tensor& tensor, int64_t* size);
 
-class ARROW_EXPORT StreamWriter {
- public:
-  virtual ~StreamWriter();
-
-  static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
-      std::shared_ptr<StreamWriter>* out);
-
-  virtual Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false);
-
-  /// Perform any logic necessary to finish the stream. User is responsible for
-  /// closing the actual OutputStream
-  virtual Status Close();
-
-  // In some cases, writing may require memory allocation. We use the default
-  // memory pool, but provide the option to override
-  void set_memory_pool(MemoryPool* pool);
-
- protected:
-  StreamWriter();
-  class ARROW_NO_EXPORT StreamWriterImpl;
-  std::unique_ptr<StreamWriterImpl> impl_;
-};
-
-class ARROW_EXPORT FileWriter : public StreamWriter {
- public:
-  virtual ~FileWriter();
-
-  static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
-      std::shared_ptr<FileWriter>* out);
-
-  Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override;
-  Status Close() override;
-
- private:
-  FileWriter();
-  class ARROW_NO_EXPORT FileWriterImpl;
-  std::unique_ptr<FileWriterImpl> impl_;
-};
-
 /// EXPERIMENTAL: Write RecordBatch allowing lengths over INT32_MAX. This data
 /// may not be readable by all Arrow implementations
 Status ARROW_EXPORT WriteLargeRecordBatch(const RecordBatch& batch,
@@ -135,6 +174,13 @@ Status ARROW_EXPORT WriteLargeRecordBatch(const RecordBatch& batch,
 Status ARROW_EXPORT WriteTensor(const Tensor& tensor, io::OutputStream* dst,
     int32_t* metadata_length, int64_t* body_length);
 
+/// Backwards-compatibility for Arrow < 0.4.0
+///
+#ifndef ARROW_NO_DEPRECATED_API
+using FileWriter = RecordBatchFileWriter;
+using StreamWriter = RecordBatchStreamWriter;
+#endif
+
 }  // namespace ipc
 }  // namespace arrow
 
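And the writer side, for symmetry: a sketch following the same pattern as stream-to-file.cc above. Per the Open() contract noted in the header, the caller keeps ownership of the sink; the helper name and include locations are assumptions:

// Sketch: stream a sequence of record batches with the new names.
#include <memory>
#include <vector>

#include "arrow/io/interfaces.h"  // io::OutputStream (location assumed)
#include "arrow/ipc/writer.h"
#include "arrow/status.h"
#include "arrow/table.h"  // RecordBatch (location assumed for this era)

arrow::Status WriteBatches(
    arrow::io::OutputStream* sink,
    const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches) {
  if (batches.empty()) { return arrow::Status::Invalid("no batches to write"); }

  std::shared_ptr<arrow::ipc::RecordBatchStreamWriter> writer;
  RETURN_NOT_OK(arrow::ipc::RecordBatchStreamWriter::Open(
      sink, batches[0]->schema(), &writer));

  for (const auto& batch : batches) {
    RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
  }
  // Close() finishes the stream; closing the sink itself is the caller's job
  return writer->Close();
}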

http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/python/doc/source/api.rst
----------------------------------------------------------------------
diff --git a/python/doc/source/api.rst b/python/doc/source/api.rst
index a8dd8c5..e7bea70 100644
--- a/python/doc/source/api.rst
+++ b/python/doc/source/api.rst
@@ -177,10 +177,12 @@ Interprocess Communication and Messaging
 .. autosummary::
    :toctree: generated/
 
-   FileReader
-   FileWriter
-   StreamReader
-   StreamWriter
+   RecordBatchFileReader
+   RecordBatchFileWriter
+   RecordBatchStreamReader
+   RecordBatchStreamWriter
+   open_file
+   open_stream
 
 .. _api.memory_pool:
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/python/doc/source/ipc.rst
----------------------------------------------------------------------
diff --git a/python/doc/source/ipc.rst b/python/doc/source/ipc.rst
index e63e745..cce2ae8 100644
--- a/python/doc/source/ipc.rst
+++ b/python/doc/source/ipc.rst
@@ -55,13 +55,13 @@ First, let's create a small record batch:
    batch.num_columns
 
 Now, we can begin writing a stream containing some number of these batches. For
-this we use :class:`~pyarrow.StreamWriter`, which can write to a writeable
+this we use :class:`~pyarrow.RecordBatchStreamWriter`, which can write to a writeable
 ``NativeFile`` object or a writeable Python object:
 
 .. ipython:: python
 
    sink = pa.InMemoryOutputStream()
-   writer = pa.StreamWriter(sink, batch.schema)
+   writer = pa.RecordBatchStreamWriter(sink, batch.schema)
 
 Here we used an in-memory Arrow buffer stream, but this could have been a
 socket or some other IO sink.
@@ -80,11 +80,11 @@ particular stream. Now we can do:
    buf.size
 
 Now ``buf`` contains the complete stream as an in-memory byte buffer. We can
-read such a stream with :class:`~pyarrow.StreamReader`:
+read such a stream with :class:`~pyarrow.RecordBatchStreamReader`:
 
 .. ipython:: python
 
-   reader = pa.StreamReader(buf)
+   reader = pa.RecordBatchStreamReader(buf)
    reader.schema
 
    batches = [b for b in reader]
@@ -103,13 +103,13 @@ batches are also zero-copy and do not allocate any new memory on read.
 Writing and Reading Random Access Files
 ---------------------------------------
 
-The :class:`~pyarrow.FileWriter` has the same API as
-:class:`~pyarrow.StreamWriter`:
+The :class:`~pyarrow.RecordBatchFileWriter` has the same API as
+:class:`~pyarrow.RecordBatchStreamWriter`:
 
 .. ipython:: python
 
    sink = pa.InMemoryOutputStream()
-   writer = pa.FileWriter(sink, batch.schema)
+   writer = pa.RecordBatchFileWriter(sink, batch.schema)
 
    for i in range(10):
       writer.write_batch(batch)
@@ -118,13 +118,14 @@ The :class:`~pyarrow.FileWriter` has the same API as
    buf = sink.get_result()
    buf.size
 
-The difference between :class:`~pyarrow.FileReader` and
-:class:`~pyarrow.StreamReader` is that the input source must have a ``seek``
-method for random access. The stream reader only requires read operations:
+The difference between :class:`~pyarrow.RecordBatchFileReader` and
+:class:`~pyarrow.RecordBatchStreamReader` is that the input source must have a
+``seek`` method for random access. The stream reader only requires read
+operations:
 
 .. ipython:: python
 
-   reader = pa.FileReader(buf)
+   reader = pa.RecordBatchFileReader(buf)
 
 Because we have access to the entire payload, we know the number of record
 batches in the file, and can read any at random:

http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 7d79811..d6d2aa4 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -101,7 +101,10 @@ def jemalloc_memory_pool():
 
 from pyarrow.filesystem import Filesystem, HdfsClient, LocalFilesystem
 
-from pyarrow.ipc import FileReader, FileWriter, StreamReader, StreamWriter
+from pyarrow.ipc import (RecordBatchFileReader, RecordBatchFileWriter,
+                         RecordBatchStreamReader, RecordBatchStreamWriter,
+                         open_stream,
+                         open_file)
 
 
 localfs = LocalFilesystem.get_instance()

http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 3d56c14..b03dd59 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -547,38 +547,44 @@ cdef extern from "arrow/ipc/metadata.h" namespace "arrow::ipc" nogil:
 
 cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
 
-    cdef cppclass CStreamWriter " arrow::ipc::StreamWriter":
-        @staticmethod
-        CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema,
-                     shared_ptr[CStreamWriter]* out)
-
+    cdef cppclass CRecordBatchWriter \
+        " arrow::ipc::RecordBatchWriter":
         CStatus Close()
         CStatus WriteRecordBatch(const CRecordBatch& batch)
 
-    cdef cppclass CStreamReader " arrow::ipc::StreamReader":
+    cdef cppclass CRecordBatchReader \
+        " arrow::ipc::RecordBatchReader":
+        shared_ptr[CSchema] schema()
+        CStatus GetNextRecordBatch(shared_ptr[CRecordBatch]* batch)
 
+    cdef cppclass CRecordBatchStreamReader \
+        " arrow::ipc::RecordBatchStreamReader"(CRecordBatchReader):
         @staticmethod
         CStatus Open(const shared_ptr[InputStream]& stream,
-                     shared_ptr[CStreamReader]* out)
+                     shared_ptr[CRecordBatchStreamReader]* out)
 
-        shared_ptr[CSchema] schema()
-
-        CStatus GetNextRecordBatch(shared_ptr[CRecordBatch]* batch)
-
-    cdef cppclass CFileWriter " arrow::ipc::FileWriter"(CStreamWriter):
+    cdef cppclass CRecordBatchStreamWriter \
+        " arrow::ipc::RecordBatchStreamWriter"(CRecordBatchWriter):
         @staticmethod
         CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema,
-                     shared_ptr[CFileWriter]* out)
+                     shared_ptr[CRecordBatchStreamWriter]* out)
 
-    cdef cppclass CFileReader " arrow::ipc::FileReader":
+    cdef cppclass CRecordBatchFileWriter \
+        " arrow::ipc::RecordBatchFileWriter"(CRecordBatchWriter):
+        @staticmethod
+        CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema,
+                     shared_ptr[CRecordBatchFileWriter]* out)
 
+    cdef cppclass CRecordBatchFileReader \
+        " arrow::ipc::RecordBatchFileReader":
         @staticmethod
         CStatus Open(const shared_ptr[RandomAccessFile]& file,
-                     shared_ptr[CFileReader]* out)
+                     shared_ptr[CRecordBatchFileReader]* out)
 
         @staticmethod
         CStatus Open2" Open"(const shared_ptr[RandomAccessFile]& file,
-                     int64_t footer_offset, shared_ptr[CFileReader]* out)
+                             int64_t footer_offset,
+                             shared_ptr[CRecordBatchFileReader]* out)
 
         shared_ptr[CSchema] schema()
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/python/pyarrow/io.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index a0a96e7..4cbf603 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -916,9 +916,9 @@ cdef class HdfsFile(NativeFile):
 # ----------------------------------------------------------------------
 # File and stream readers and writers
 
-cdef class _StreamWriter:
+cdef class _RecordBatchWriter:
     cdef:
-        shared_ptr[CStreamWriter] writer
+        shared_ptr[CRecordBatchWriter] writer
         shared_ptr[OutputStream] sink
         bint closed
 
@@ -930,12 +930,18 @@ cdef class _StreamWriter:
             self.close()
 
     def _open(self, sink, Schema schema):
+        cdef:
+            shared_ptr[CRecordBatchStreamWriter] writer
+
         get_writer(sink, &self.sink)
 
         with nogil:
-            check_status(CStreamWriter.Open(self.sink.get(), schema.sp_schema,
-                                            &self.writer))
+            check_status(
+                CRecordBatchStreamWriter.Open(self.sink.get(),
+                                              schema.sp_schema,
+                                              &writer))
 
+        self.writer = <shared_ptr[CRecordBatchWriter]> writer
         self.closed = False
 
     def write_batch(self, RecordBatch batch):
@@ -949,9 +955,9 @@ cdef class _StreamWriter:
         self.closed = True
 
 
-cdef class _StreamReader:
+cdef class _RecordBatchReader:
     cdef:
-        shared_ptr[CStreamReader] reader
+        shared_ptr[CRecordBatchReader] reader
 
     cdef readonly:
         Schema schema
@@ -961,15 +967,17 @@ cdef class _StreamReader:
 
     def _open(self, source):
         cdef:
-            shared_ptr[RandomAccessFile] reader
+            shared_ptr[RandomAccessFile] file_handle
             shared_ptr[InputStream] in_stream
+            shared_ptr[CRecordBatchStreamReader] reader
 
-        get_reader(source, &reader)
-        in_stream = <shared_ptr[InputStream]> reader
+        get_reader(source, &file_handle)
+        in_stream = <shared_ptr[InputStream]> file_handle
 
         with nogil:
-            check_status(CStreamReader.Open(in_stream, &self.reader))
+            check_status(CRecordBatchStreamReader.Open(in_stream, &reader))
 
+        self.reader = <shared_ptr[CRecordBatchReader]> reader
         self.schema = Schema()
         self.schema.init_schema(self.reader.get().schema())
 
@@ -1009,24 +1017,25 @@ cdef class _StreamReader:
         return pyarrow_wrap_table(table)
 
 
-cdef class _FileWriter(_StreamWriter):
+cdef class _RecordBatchFileWriter(_RecordBatchWriter):
 
     def _open(self, sink, Schema schema):
-        cdef shared_ptr[CFileWriter] writer
+        cdef shared_ptr[CRecordBatchFileWriter] writer
         get_writer(sink, &self.sink)
 
         with nogil:
-            check_status(CFileWriter.Open(self.sink.get(), schema.sp_schema,
-                                          &writer))
+            check_status(
+                CRecordBatchFileWriter.Open(self.sink.get(), schema.sp_schema,
+                                            &writer))
 
         # Cast to base class, because it has the same interface
-        self.writer = <shared_ptr[CStreamWriter]> writer
+        self.writer = <shared_ptr[CRecordBatchWriter]> writer
         self.closed = False
 
 
-cdef class _FileReader:
+cdef class _RecordBatchFileReader:
     cdef:
-        shared_ptr[CFileReader] reader
+        shared_ptr[CRecordBatchFileReader] reader
 
     def __cinit__(self):
         pass
@@ -1041,9 +1050,10 @@ cdef class _FileReader:
 
         with nogil:
             if offset != 0:
-                check_status(CFileReader.Open2(reader, offset, &self.reader))
+                check_status(CRecordBatchFileReader.Open2(
+                    reader, offset, &self.reader))
             else:
-                check_status(CFileReader.Open(reader, &self.reader))
+                check_status(CRecordBatchFileReader.Open(reader, &self.reader))
 
     property num_record_batches:
 

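For reference, a round trip through the renamed wrappers (a sketch under assumptions, not part of the patch: it presumes `pa.array` and `RecordBatch.from_arrays` are available in this build, and uses `io.BytesIO` as a stand-in for any file-like sink):

    import io
    import pyarrow as pa

    batch = pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], ['f0'])

    sink = io.BytesIO()
    writer = pa.RecordBatchStreamWriter(sink, batch.schema)
    writer.write_batch(batch)
    writer.close()

    sink.seek(0)
    reader = pa.RecordBatchStreamReader(sink)
    for received in reader:  # iteration yields get_next_batch() until exhausted
        assert received.schema.equals(batch.schema)
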
http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/python/pyarrow/ipc.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py
index c37a1ce..8338de3 100644
--- a/python/pyarrow/ipc.py
+++ b/python/pyarrow/ipc.py
@@ -20,7 +20,7 @@
 import pyarrow.lib as lib
 
 
-class StreamReader(lib._StreamReader):
+class RecordBatchStreamReader(lib._RecordBatchReader):
     """
     Reader for the Arrow streaming binary format
 
@@ -37,7 +37,7 @@ class StreamReader(lib._StreamReader):
             yield self.get_next_batch()
 
 
-class StreamWriter(lib._StreamWriter):
+class RecordBatchStreamWriter(lib._RecordBatchWriter):
     """
     Writer for the Arrow streaming binary format
 
@@ -52,7 +52,7 @@ class StreamWriter(lib._StreamWriter):
         self._open(sink, schema)
 
 
-class FileReader(lib._FileReader):
+class RecordBatchFileReader(lib._RecordBatchFileReader):
     """
     Class for reading Arrow record batch data from the Arrow binary file format
 
@@ -68,7 +68,7 @@ class FileReader(lib._FileReader):
         self._open(source, footer_offset=footer_offset)
 
 
-class FileWriter(lib._FileWriter):
+class RecordBatchFileWriter(lib._RecordBatchFileWriter):
     """
     Writer to create the Arrow binary file format
 
@@ -81,3 +81,38 @@
     """
     def __init__(self, sink, schema):
         self._open(sink, schema)
+
+
+def open_stream(source):
+    """
+    Create a reader for the Arrow streaming format
+
+    Parameters
+    ----------
+    source : str, pyarrow.NativeFile, or file-like Python object
+        Either a file path, or a readable file object
+
+    Returns
+    -------
+    reader : RecordBatchStreamReader
+    """
+    return RecordBatchStreamReader(source)
+
+
+def open_file(source, footer_offset=None):
+    """
+    Create a reader for the Arrow file format
+
+    Parameters
+    ----------
+    source : str, pyarrow.NativeFile, or file-like Python object
+        Either a file path, or a readable file object
+    footer_offset : int, default None
+        If the file is embedded in some larger file, this is the byte offset to
+        the very end of the file data
+
+    Returns
+    -------
+    reader : RecordBatchFileReader
+    """
+    return RecordBatchFileReader(source, footer_offset=footer_offset)

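Putting the new convenience functions to use, a sketch of a file-format round trip (illustrative only; same assumptions as above, with `io.BytesIO` standing in for a seekable source):

    import io
    import pyarrow as pa

    batch = pa.RecordBatch.from_arrays([pa.array(['a', 'b', 'c'])], ['f0'])

    sink = io.BytesIO()
    writer = pa.RecordBatchFileWriter(sink, batch.schema)
    writer.write_batch(batch)
    writer.close()

    sink.seek(0)
    reader = pa.open_file(sink)
    # Unlike the stream format, the file format supports random access
    # to record batches by index.
    batches = [reader.get_batch(i) for i in range(reader.num_record_batches)]
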
http://git-wip-us.apache.org/repos/asf/arrow/blob/5739e04b/python/pyarrow/tests/test_ipc.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py
index 0204067..4d19804 100644
--- a/python/pyarrow/tests/test_ipc.py
+++ b/python/pyarrow/tests/test_ipc.py
@@ -70,13 +70,13 @@ class TestFile(MessagingTest, unittest.TestCase):
     # Also tests writing zero-copy NumPy array with additional padding
 
     def _get_writer(self, sink, schema):
-        return pa.FileWriter(sink, schema)
+        return pa.RecordBatchFileWriter(sink, schema)
 
     def test_simple_roundtrip(self):
         batches = self.write_batches()
         file_contents = self._get_source()
 
-        reader = pa.FileReader(file_contents)
+        reader = pa.open_file(file_contents)
 
         assert reader.num_record_batches == len(batches)
 
@@ -89,7 +89,7 @@ class TestFile(MessagingTest, unittest.TestCase):
         batches = self.write_batches()
         file_contents = self._get_source()
 
-        reader = pa.FileReader(file_contents)
+        reader = pa.open_file(file_contents)
 
         result = reader.read_all()
         expected = pa.Table.from_batches(batches)
@@ -99,12 +99,12 @@ class TestFile(MessagingTest, unittest.TestCase):
 class TestStream(MessagingTest, unittest.TestCase):
 
     def _get_writer(self, sink, schema):
-        return pa.StreamWriter(sink, schema)
+        return pa.RecordBatchStreamWriter(sink, schema)
 
     def test_simple_roundtrip(self):
         batches = self.write_batches()
         file_contents = self._get_source()
-        reader = pa.StreamReader(file_contents)
+        reader = pa.open_stream(file_contents)
 
         assert reader.schema.equals(batches[0].schema)
 
@@ -121,7 +121,7 @@ class TestStream(MessagingTest, unittest.TestCase):
     def test_read_all(self):
         batches = self.write_batches()
         file_contents = self._get_source()
-        reader = pa.StreamReader(file_contents)
+        reader = pa.open_stream(file_contents)
 
         result = reader.read_all()
         expected = pa.Table.from_batches(batches)
@@ -147,7 +147,7 @@ class TestSocket(MessagingTest, unittest.TestCase):
             connection, client_address = self._sock.accept()
             try:
                 source = connection.makefile(mode='rb')
-                reader = pa.StreamReader(source)
+                reader = pa.open_stream(source)
                 self._schema = reader.schema
                 if self._do_read_all:
                     self._table = reader.read_all()
@@ -185,7 +185,7 @@ class TestSocket(MessagingTest, unittest.TestCase):
         return self._sock.makefile(mode='wb')
 
     def _get_writer(self, sink, schema):
-        return pa.StreamWriter(sink, schema)
+        return pa.RecordBatchStreamWriter(sink, schema)
 
     def test_simple_roundtrip(self):
         self.start_server(do_read_all=False)
@@ -241,12 +241,12 @@ def test_get_record_batch_size():
 
 
 def write_file(batch, sink):
-    writer = pa.FileWriter(sink, batch.schema)
+    writer = pa.RecordBatchFileWriter(sink, batch.schema)
     writer.write_batch(batch)
     writer.close()
 
 
 def read_file(source):
-    reader = pa.FileReader(source)
+    reader = pa.open_file(source)
     return [reader.get_batch(i)
             for i in range(reader.num_record_batches)]
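
The socket test above doubles as a recipe for consuming a live Arrow stream. A condensed sketch of the receiving side (the address is a placeholder, and a producer is assumed to be writing the stream at the other end):

    import socket
    import pyarrow as pa

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(('127.0.0.1', 8080))  # placeholder host/port

    # makefile() yields a readable file-like object that open_stream accepts
    source = sock.makefile(mode='rb')
    reader = pa.open_stream(source)
    table = reader.read_all()  # collects every batch into a pyarrow Table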