You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2016/10/04 03:14:49 UTC

arrow git commit: ARROW-302: [C++/Python] Implement C++ IO interfaces for interacting with Python file and bytes objects

Repository: arrow
Updated Branches:
  refs/heads/master c3930a062 -> c7e6a0716


ARROW-302: [C++/Python] Implement C++ IO interfaces for interacting with Python file and bytes objects

This will enable code (such as arrow IPC or Parquet) that only knows about Arrow's IO subsystem to interact with Python objects in various ways. In other words, when we have in C++:

```
std::shared_ptr<io::ReadableFileInterface> handle = ...;
handle->Read(nbytes, &out);
```

then the C++ file handle could be invoking the `read` method of a Python object. The same goes for `arrow::io::OutputStream` and `write` methods. Some data copying overhead is incurred in places because of the rigid memory ownership semantics of the `PyBytes` type, but this can't be avoided here.

Another nice thing is that if we have some data in a Python bytes object that we want to expose to some other C++ component, we can wrap it in a `PyBytesReader`, which provides zero-copy read access to the underlying data.

Author: Wes McKinney <we...@twosigma.com>

Closes #152 from wesm/ARROW-302 and squashes the following commits:

2de9f97 [Wes McKinney] Fix compiler warning / bug from OS X
316b845 [Wes McKinney] Code review comments
e791893 [Wes McKinney] Python 2.7 fix
0fc4cf1 [Wes McKinney] cpplint
e9b8c60 [Wes McKinney] Test the size() method and fix bug with missing whence
6481e91 [Wes McKinney] Add a zero-copy reader for PyBytes
7e357eb [Wes McKinney] Get basic Python file read/write working
d470133 [Wes McKinney] Share default implementations of ReadAt, add Buffer-based Read API
737a8db [Wes McKinney] Refactoring, more code sharing with native file interfaces
8be433f [Wes McKinney] Draft PyReadableFile implementation, not yet tested
20a3f28 [Wes McKinney] Draft API for Arrow IO wrappers for Python files


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/c7e6a071
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/c7e6a071
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/c7e6a071

Branch: refs/heads/master
Commit: c7e6a0716308766766aaaf4faa2effc5445640c6
Parents: c3930a0
Author: Wes McKinney <we...@twosigma.com>
Authored: Mon Oct 3 23:14:41 2016 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Mon Oct 3 23:14:41 2016 -0400

----------------------------------------------------------------------
 cpp/CMakeLists.txt                      |   2 +
 cpp/src/arrow/io/CMakeLists.txt         |   1 +
 cpp/src/arrow/io/file.cc                |  10 +-
 cpp/src/arrow/io/file.h                 |   6 +-
 cpp/src/arrow/io/hdfs.cc                |  46 +++++-
 cpp/src/arrow/io/hdfs.h                 |  13 +-
 cpp/src/arrow/io/interfaces.cc          |  48 ++++++
 cpp/src/arrow/io/interfaces.h           |  26 ++--
 cpp/src/arrow/io/memory.cc              |  40 ++---
 cpp/src/arrow/io/memory.h               |  21 ++-
 python/CMakeLists.txt                   |   1 +
 python/pyarrow/__init__.py              |   5 +-
 python/pyarrow/array.pyx                |  31 ----
 python/pyarrow/error.pxd                |   4 +-
 python/pyarrow/error.pyx                |   2 +-
 python/pyarrow/includes/libarrow_io.pxd |  29 ++++
 python/pyarrow/includes/pyarrow.pxd     |  34 +++--
 python/pyarrow/io.pxd                   |  13 +-
 python/pyarrow/io.pyx                   | 136 ++++++++++++-----
 python/pyarrow/parquet.pyx              |   8 +-
 python/pyarrow/table.pyx                |  37 ++++-
 python/pyarrow/tests/test_hdfs.py       | 128 ++++++++++++++++
 python/pyarrow/tests/test_io.py         | 121 ++++++---------
 python/src/pyarrow/adapters/pandas.cc   |   2 +-
 python/src/pyarrow/common.cc            |  15 ++
 python/src/pyarrow/common.h             |  30 +++-
 python/src/pyarrow/io.cc                | 215 +++++++++++++++++++++++++++
 python/src/pyarrow/io.h                 |  97 ++++++++++++
 28 files changed, 878 insertions(+), 243 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/c7e6a071/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index d65c715..f70c8ab 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -166,6 +166,8 @@ else()
   message(FATAL_ERROR "Unknown build type: ${CMAKE_BUILD_TYPE}")
 endif ()
 
+message(STATUS "Build Type: ${CMAKE_BUILD_TYPE}")
+
 # Add common flags
 set(CMAKE_CXX_FLAGS "${CXX_COMMON_FLAGS} ${CMAKE_CXX_FLAGS}")
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/c7e6a071/cpp/src/arrow/io/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt
index d2e3491..47bb089 100644
--- a/cpp/src/arrow/io/CMakeLists.txt
+++ b/cpp/src/arrow/io/CMakeLists.txt
@@ -39,6 +39,7 @@ set(ARROW_IO_TEST_LINK_LIBS
 
 set(ARROW_IO_SRCS
   file.cc
+  interfaces.cc
   memory.cc
 )
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/c7e6a071/cpp/src/arrow/io/file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc
index 87bae7f..93f0ad9 100644
--- a/cpp/src/arrow/io/file.cc
+++ b/cpp/src/arrow/io/file.cc
@@ -413,15 +413,7 @@ Status ReadableFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
   return impl_->Read(nbytes, bytes_read, out);
 }
 
-Status ReadableFile::ReadAt(
-    int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
-  RETURN_NOT_OK(Seek(position));
-  return impl_->Read(nbytes, bytes_read, out);
-}
-
-Status ReadableFile::ReadAt(
-    int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
-  RETURN_NOT_OK(Seek(position));
+Status ReadableFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
   return impl_->ReadBuffer(nbytes, out);
 }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/c7e6a071/cpp/src/arrow/io/file.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/file.h b/cpp/src/arrow/io/file.h
index 5e714ea..10fe16e 100644
--- a/cpp/src/arrow/io/file.h
+++ b/cpp/src/arrow/io/file.h
@@ -71,11 +71,9 @@ class ARROW_EXPORT ReadableFile : public ReadableFileInterface {
   Status Close() override;
   Status Tell(int64_t* position) override;
 
-  Status ReadAt(
-      int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;
-  Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;
-
   Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;
+  Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override;
+
   Status GetSize(int64_t* size) override;
   Status Seek(int64_t position) override;
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/c7e6a071/cpp/src/arrow/io/hdfs.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc
index a6b4b2f..b74f846 100644
--- a/cpp/src/arrow/io/hdfs.cc
+++ b/cpp/src/arrow/io/hdfs.cc
@@ -22,6 +22,8 @@
 #include <string>
 
 #include "arrow/io/hdfs.h"
+#include "arrow/util/buffer.h"
+#include "arrow/util/memory-pool.h"
 #include "arrow/util/status.h"
 
 namespace arrow {
@@ -89,7 +91,7 @@ class HdfsAnyFileImpl {
 // Private implementation for read-only files
 class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
  public:
-  HdfsReadableFileImpl() {}
+  explicit HdfsReadableFileImpl(MemoryPool* pool) : pool_(pool) {}
 
   Status Close() {
     if (is_open_) {
@@ -108,6 +110,19 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
     return Status::OK();
   }
 
+  Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
+    auto buffer = std::make_shared<PoolBuffer>(pool_);
+    RETURN_NOT_OK(buffer->Resize(nbytes));
+
+    int64_t bytes_read = 0;
+    RETURN_NOT_OK(ReadAt(position, nbytes, &bytes_read, buffer->mutable_data()));
+
+    if (bytes_read < nbytes) { RETURN_NOT_OK(buffer->Resize(bytes_read)); }
+
+    *out = buffer;
+    return Status::OK();
+  }
+
   Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
     tSize ret = hdfsRead(fs_, file_, reinterpret_cast<void*>(buffer), nbytes);
     RETURN_NOT_OK(CheckReadResult(ret));
@@ -115,6 +130,19 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
     return Status::OK();
   }
 
+  Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
+    auto buffer = std::make_shared<PoolBuffer>(pool_);
+    RETURN_NOT_OK(buffer->Resize(nbytes));
+
+    int64_t bytes_read = 0;
+    RETURN_NOT_OK(Read(nbytes, &bytes_read, buffer->mutable_data()));
+
+    if (bytes_read < nbytes) { RETURN_NOT_OK(buffer->Resize(bytes_read)); }
+
+    *out = buffer;
+    return Status::OK();
+  }
+
   Status GetSize(int64_t* size) {
     hdfsFileInfo* entry = hdfsGetPathInfo(fs_, path_.c_str());
     if (entry == nullptr) { return Status::IOError("HDFS: GetPathInfo failed"); }
@@ -123,10 +151,16 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
     hdfsFreeFileInfo(entry, 1);
     return Status::OK();
   }
+
+  void set_memory_pool(MemoryPool* pool) { pool_ = pool; }
+
+ private:
+  MemoryPool* pool_;
 };
 
-HdfsReadableFile::HdfsReadableFile() {
-  impl_.reset(new HdfsReadableFileImpl());
+HdfsReadableFile::HdfsReadableFile(MemoryPool* pool) {
+  if (pool == nullptr) { pool = default_memory_pool(); }
+  impl_.reset(new HdfsReadableFileImpl(pool));
 }
 
 HdfsReadableFile::~HdfsReadableFile() {
@@ -144,7 +178,7 @@ Status HdfsReadableFile::ReadAt(
 
 Status HdfsReadableFile::ReadAt(
     int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
-  return Status::NotImplemented("Not yet implemented");
+  return impl_->ReadAt(position, nbytes, out);
 }
 
 bool HdfsReadableFile::supports_zero_copy() const {
@@ -155,6 +189,10 @@ Status HdfsReadableFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buff
   return impl_->Read(nbytes, bytes_read, buffer);
 }
 
+Status HdfsReadableFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* buffer) {
+  return impl_->Read(nbytes, buffer);
+}
+
 Status HdfsReadableFile::GetSize(int64_t* size) {
   return impl_->GetSize(size);
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/c7e6a071/cpp/src/arrow/io/hdfs.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs.h b/cpp/src/arrow/io/hdfs.h
index 39720cc..4a4e3ec 100644
--- a/cpp/src/arrow/io/hdfs.h
+++ b/cpp/src/arrow/io/hdfs.h
@@ -164,6 +164,12 @@ class ARROW_EXPORT HdfsReadableFile : public ReadableFileInterface {
 
   Status GetSize(int64_t* size) override;
 
+  // NOTE: If you wish to read a particular range of a file in a multithreaded
+  // context, you may prefer to use ReadAt to avoid locking issues
+  Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;
+
+  Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override;
+
   Status ReadAt(
       int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;
 
@@ -174,17 +180,16 @@ class ARROW_EXPORT HdfsReadableFile : public ReadableFileInterface {
   Status Seek(int64_t position) override;
   Status Tell(int64_t* position) override;
 
-  // NOTE: If you wish to read a particular range of a file in a multithreaded
-  // context, you may prefer to use ReadAt to avoid locking issues
-  Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;
+  void set_memory_pool(MemoryPool* pool);
 
  private:
+  explicit HdfsReadableFile(MemoryPool* pool = nullptr);
+
   class ARROW_NO_EXPORT HdfsReadableFileImpl;
   std::unique_ptr<HdfsReadableFileImpl> impl_;
 
   friend class HdfsClient::HdfsClientImpl;
 
-  HdfsReadableFile();
   DISALLOW_COPY_AND_ASSIGN(HdfsReadableFile);
 };
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/c7e6a071/cpp/src/arrow/io/interfaces.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc
new file mode 100644
index 0000000..44986ce
--- /dev/null
+++ b/cpp/src/arrow/io/interfaces.cc
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/io/interfaces.h"
+
+#include <cstdint>
+#include <memory>
+
+#include "arrow/util/buffer.h"
+#include "arrow/util/status.h"
+
+namespace arrow {
+namespace io {
+
+FileInterface::~FileInterface() {}
+
+ReadableFileInterface::ReadableFileInterface() {
+  set_mode(FileMode::READ);
+}
+
+Status ReadableFileInterface::ReadAt(
+    int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
+  RETURN_NOT_OK(Seek(position));
+  return Read(nbytes, bytes_read, out);
+}
+
+Status ReadableFileInterface::ReadAt(
+    int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
+  RETURN_NOT_OK(Seek(position));
+  return Read(nbytes, out);
+}
+
+}  // namespace io
+}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/c7e6a071/cpp/src/arrow/io/interfaces.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
index fa34b43..db0c059 100644
--- a/cpp/src/arrow/io/interfaces.h
+++ b/cpp/src/arrow/io/interfaces.h
@@ -22,10 +22,12 @@
 #include <memory>
 
 #include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
 
 namespace arrow {
 
 class Buffer;
+class MemoryPool;
 class Status;
 
 namespace io {
@@ -43,9 +45,9 @@ class FileSystemClient {
   virtual ~FileSystemClient() {}
 };
 
-class FileInterface {
+class ARROW_EXPORT FileInterface {
  public:
-  virtual ~FileInterface() {}
+  virtual ~FileInterface() = 0;
   virtual Status Close() = 0;
   virtual Status Tell(int64_t* position) = 0;
 
@@ -54,7 +56,6 @@ class FileInterface {
  protected:
   FileInterface() {}
   FileMode::type mode_;
-
   void set_mode(FileMode::type mode) { mode_ = mode; }
 
  private:
@@ -74,6 +75,9 @@ class Writeable {
 class Readable {
  public:
   virtual Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) = 0;
+
+  // Does not copy if not necessary
+  virtual Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) = 0;
 };
 
 class OutputStream : public FileInterface, public Writeable {
@@ -86,21 +90,21 @@ class InputStream : public FileInterface, public Readable {
   InputStream() {}
 };
 
-class ReadableFileInterface : public InputStream, public Seekable {
+class ARROW_EXPORT ReadableFileInterface : public InputStream, public Seekable {
  public:
-  virtual Status ReadAt(
-      int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) = 0;
-
   virtual Status GetSize(int64_t* size) = 0;
 
-  // Does not copy if not necessary
+  virtual bool supports_zero_copy() const = 0;
+
+  // Read at position, provide default implementations using Read(...), but can
+  // be overridden
   virtual Status ReadAt(
-      int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) = 0;
+      int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out);
 
-  virtual bool supports_zero_copy() const = 0;
+  virtual Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out);
 
  protected:
-  ReadableFileInterface() { set_mode(FileMode::READ); }
+  ReadableFileInterface();
 };
 
 class WriteableFileInterface : public OutputStream, public Seekable {

http://git-wip-us.apache.org/repos/asf/arrow/blob/c7e6a071/cpp/src/arrow/io/memory.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc
index c168c91..7d6e02e 100644
--- a/cpp/src/arrow/io/memory.cc
+++ b/cpp/src/arrow/io/memory.cc
@@ -123,6 +123,8 @@ MemoryMappedFile::MemoryMappedFile(FileMode::type mode) {
   ReadableFileInterface::set_mode(mode);
 }
 
+MemoryMappedFile::~MemoryMappedFile() {}
+
 Status MemoryMappedFile::Open(const std::string& path, FileMode::type mode,
     std::shared_ptr<MemoryMappedFile>* out) {
   std::shared_ptr<MemoryMappedFile> result(new MemoryMappedFile(mode));
@@ -161,16 +163,8 @@ Status MemoryMappedFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out)
   return Status::OK();
 }
 
-Status MemoryMappedFile::ReadAt(
-    int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
-  RETURN_NOT_OK(impl_->Seek(position));
-  return Read(nbytes, bytes_read, out);
-}
-
-Status MemoryMappedFile::ReadAt(
-    int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
-  nbytes = std::min(nbytes, impl_->size() - position);
-  RETURN_NOT_OK(impl_->Seek(position));
+Status MemoryMappedFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
+  nbytes = std::min(nbytes, impl_->size() - impl_->position());
   *out = std::make_shared<Buffer>(impl_->head(), nbytes);
   impl_->advance(nbytes);
   return Status::OK();
@@ -246,6 +240,11 @@ Status BufferOutputStream::Reserve(int64_t nbytes) {
 // ----------------------------------------------------------------------
 // In-memory buffer reader
 
+BufferReader::BufferReader(const uint8_t* buffer, int buffer_size)
+    : buffer_(buffer), buffer_size_(buffer_size), position_(0) {}
+
+BufferReader::~BufferReader() {}
+
 Status BufferReader::Close() {
   // no-op
   return Status::OK();
@@ -256,20 +255,6 @@ Status BufferReader::Tell(int64_t* position) {
   return Status::OK();
 }
 
-Status BufferReader::ReadAt(
-    int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
-  RETURN_NOT_OK(Seek(position));
-  return Read(nbytes, bytes_read, buffer);
-}
-
-Status BufferReader::ReadAt(
-    int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
-  int64_t size = std::min(nbytes, buffer_size_ - position_);
-  *out = std::make_shared<Buffer>(buffer_ + position, size);
-  position_ += nbytes;
-  return Status::OK();
-}
-
 bool BufferReader::supports_zero_copy() const {
   return true;
 }
@@ -281,6 +266,13 @@ Status BufferReader::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer)
   return Status::OK();
 }
 
+Status BufferReader::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
+  int64_t size = std::min(nbytes, buffer_size_ - position_);
+  *out = std::make_shared<Buffer>(buffer_ + position_, size);
+  position_ += nbytes;
+  return Status::OK();
+}
+
 Status BufferReader::GetSize(int64_t* size) {
   *size = buffer_size_;
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/arrow/blob/c7e6a071/cpp/src/arrow/io/memory.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h
index 6989d73..df2fe8d 100644
--- a/cpp/src/arrow/io/memory.h
+++ b/cpp/src/arrow/io/memory.h
@@ -61,6 +61,8 @@ class ARROW_EXPORT BufferOutputStream : public OutputStream {
 // A memory source that uses memory-mapped files for memory interactions
 class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface {
  public:
+  ~MemoryMappedFile();
+
   static Status Open(const std::string& path, FileMode::type mode,
       std::shared_ptr<MemoryMappedFile>* out);
 
@@ -73,11 +75,8 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface {
   // Required by ReadableFileInterface, copies memory into out
   Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) override;
 
-  Status ReadAt(
-      int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) override;
-
-  // Read into a buffer, zero copy if possible
-  Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;
+  // Zero copy read
+  Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override;
 
   bool supports_zero_copy() const override;
 
@@ -100,17 +99,17 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface {
 
 class ARROW_EXPORT BufferReader : public ReadableFileInterface {
  public:
-  BufferReader(const uint8_t* buffer, int buffer_size)
-      : buffer_(buffer), buffer_size_(buffer_size), position_(0) {}
+  BufferReader(const uint8_t* buffer, int buffer_size);
+  ~BufferReader();
 
   Status Close() override;
   Status Tell(int64_t* position) override;
 
-  Status ReadAt(
-      int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;
-  Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;
-
   Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;
+
+  // Zero copy read
+  Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override;
+
   Status GetSize(int64_t* size) override;
   Status Seek(int64_t position) override;
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/c7e6a071/python/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index 6357e3c..77a771a 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -432,6 +432,7 @@ set(PYARROW_SRCS
   src/pyarrow/common.cc
   src/pyarrow/config.cc
   src/pyarrow/helpers.cc
+  src/pyarrow/io.cc
   src/pyarrow/status.cc
 
   src/pyarrow/adapters/builtin.cc

http://git-wip-us.apache.org/repos/asf/arrow/blob/c7e6a071/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 40a09c2..7561f6d 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -41,6 +41,5 @@ from pyarrow.schema import (null, bool_,
                             list_, struct, field,
                             DataType, Field, Schema, schema)
 
-from pyarrow.array import RowBatch, from_pandas_dataframe
-
-from pyarrow.table import Column, Table
+from pyarrow.array import RowBatch
+from pyarrow.table import Column, Table, from_pandas_dataframe

http://git-wip-us.apache.org/repos/asf/arrow/blob/c7e6a071/python/pyarrow/array.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/array.pyx b/python/pyarrow/array.pyx
index 5229b42..cdbe73a 100644
--- a/python/pyarrow/array.pyx
+++ b/python/pyarrow/array.pyx
@@ -35,7 +35,6 @@ from pyarrow.scalar import NA
 from pyarrow.schema cimport Schema
 import pyarrow.schema as schema
 
-from pyarrow.table cimport Table
 
 def total_allocated_bytes():
     cdef MemoryPool* pool = pyarrow.GetMemoryPool()
@@ -254,35 +253,6 @@ def from_pandas_series(object series, object mask=None, timestamps_to_ms=False):
     return box_arrow_array(out)
 
 
-def from_pandas_dataframe(object df, name=None, timestamps_to_ms=False):
-    """
-    Convert pandas.DataFrame to an Arrow Table
-
-    Parameters
-    ----------
-    df: pandas.DataFrame
-
-    name: str
-
-    timestamps_to_ms: bool
-        Convert datetime columns to ms resolution. This is needed for
-        compability with other functionality like Parquet I/O which
-        only supports milliseconds.
-    """
-    cdef:
-        list names = []
-        list arrays = []
-
-    for name in df.columns:
-        col = df[name]
-        arr = from_pandas_series(col, timestamps_to_ms=timestamps_to_ms)
-
-        names.append(name)
-        arrays.append(arr)
-
-    return Table.from_arrays(names, arrays, name=name)
-
-
 cdef object series_as_ndarray(object obj):
     import pandas as pd
 
@@ -324,4 +294,3 @@ cdef class RowBatch:
 
     def __getitem__(self, i):
         return self.arrays[i]
-

http://git-wip-us.apache.org/repos/asf/arrow/blob/c7e6a071/python/pyarrow/error.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/error.pxd b/python/pyarrow/error.pxd
index 1fb6fad..891d1ac 100644
--- a/python/pyarrow/error.pxd
+++ b/python/pyarrow/error.pxd
@@ -16,7 +16,7 @@
 # under the License.
 
 from pyarrow.includes.libarrow cimport CStatus
-from pyarrow.includes.pyarrow cimport *
+from pyarrow.includes.pyarrow cimport PyStatus
 
 cdef int check_cstatus(const CStatus& status) nogil except -1
-cdef int check_status(const Status& status) nogil except -1
+cdef int check_status(const PyStatus& status) nogil except -1

http://git-wip-us.apache.org/repos/asf/arrow/blob/c7e6a071/python/pyarrow/error.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/error.pyx b/python/pyarrow/error.pyx
index 2440193..a2c53fe 100644
--- a/python/pyarrow/error.pyx
+++ b/python/pyarrow/error.pyx
@@ -30,7 +30,7 @@ cdef int check_cstatus(const CStatus& status) nogil except -1:
     with gil:
         raise ArrowException(frombytes(c_message))
 
-cdef int check_status(const Status& status) nogil except -1:
+cdef int check_status(const PyStatus& status) nogil except -1:
     if status.ok():
         return 0
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/c7e6a071/python/pyarrow/includes/libarrow_io.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow_io.pxd b/python/pyarrow/includes/libarrow_io.pxd
index f338a43..56d8d4c 100644
--- a/python/pyarrow/includes/libarrow_io.pxd
+++ b/python/pyarrow/includes/libarrow_io.pxd
@@ -18,6 +18,7 @@
 # distutils: language = c++
 
 from pyarrow.includes.common cimport *
+from pyarrow.includes.libarrow cimport MemoryPool
 
 cdef extern from "arrow/io/interfaces.h" namespace "arrow::io" nogil:
     enum FileMode" arrow::io::FileMode::type":
@@ -35,6 +36,7 @@ cdef extern from "arrow/io/interfaces.h" namespace "arrow::io" nogil:
         FileMode mode()
 
     cdef cppclass Readable:
+        CStatus ReadB" Read"(int64_t nbytes, shared_ptr[Buffer]* out)
         CStatus Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out)
 
     cdef cppclass Seekable:
@@ -66,6 +68,24 @@ cdef extern from "arrow/io/interfaces.h" namespace "arrow::io" nogil:
         pass
 
 
+cdef extern from "arrow/io/file.h" namespace "arrow::io" nogil:
+    cdef cppclass FileOutputStream(OutputStream):
+        @staticmethod
+        CStatus Open(const c_string& path, shared_ptr[FileOutputStream]* file)
+
+        int file_descriptor()
+
+    cdef cppclass ReadableFile(ReadableFileInterface):
+        @staticmethod
+        CStatus Open(const c_string& path, shared_ptr[ReadableFile]* file)
+
+        @staticmethod
+        CStatus Open(const c_string& path, MemoryPool* memory_pool,
+                     shared_ptr[ReadableFile]* file)
+
+        int file_descriptor()
+
+
 cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil:
     CStatus ConnectLibHdfs()
 
@@ -120,3 +140,12 @@ cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil:
                               int32_t buffer_size, int16_t replication,
                               int64_t default_block_size,
                               shared_ptr[HdfsOutputStream]* handle)
+
+
+cdef extern from "arrow/io/memory.h" namespace "arrow::io" nogil:
+    cdef cppclass BufferReader(ReadableFileInterface):
+        BufferReader(const uint8_t* data, int64_t nbytes)
+
+    cdef cppclass BufferOutputStream(OutputStream):
+        # TODO(wesm)
+        pass

http://git-wip-us.apache.org/repos/asf/arrow/blob/c7e6a071/python/pyarrow/includes/pyarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/pyarrow.pxd b/python/pyarrow/includes/pyarrow.pxd
index 92c8147..4c97166 100644
--- a/python/pyarrow/includes/pyarrow.pxd
+++ b/python/pyarrow/includes/pyarrow.pxd
@@ -18,15 +18,18 @@
 # distutils: language = c++
 
 from pyarrow.includes.common cimport *
-from pyarrow.includes.libarrow cimport (CArray, CColumn, CDataType,
+from pyarrow.includes.libarrow cimport (CArray, CColumn, CDataType, CStatus,
                                         Type, MemoryPool)
 
+cimport pyarrow.includes.libarrow_io as arrow_io
+
+
 cdef extern from "pyarrow/api.h" namespace "pyarrow" nogil:
     # We can later add more of the common status factory methods as needed
-    cdef Status Status_OK "Status::OK"()
+    cdef PyStatus PyStatus_OK "Status::OK"()
 
-    cdef cppclass Status:
-        Status()
+    cdef cppclass PyStatus "pyarrow::Status":
+        PyStatus()
 
         c_string ToString()
 
@@ -40,12 +43,25 @@ cdef extern from "pyarrow/api.h" namespace "pyarrow" nogil:
         c_bool IsArrowError()
 
     shared_ptr[CDataType] GetPrimitiveType(Type type)
-    Status ConvertPySequence(object obj, shared_ptr[CArray]* out)
+    PyStatus ConvertPySequence(object obj, shared_ptr[CArray]* out)
 
-    Status PandasToArrow(MemoryPool* pool, object ao, shared_ptr[CArray]* out)
-    Status PandasMaskedToArrow(MemoryPool* pool, object ao, object mo,
-                               shared_ptr[CArray]* out)
+    PyStatus PandasToArrow(MemoryPool* pool, object ao,
+                           shared_ptr[CArray]* out)
+    PyStatus PandasMaskedToArrow(MemoryPool* pool, object ao, object mo,
+                                 shared_ptr[CArray]* out)
 
-    Status ArrowToPandas(const shared_ptr[CColumn]& arr, object py_ref, PyObject** out)
+    PyStatus ArrowToPandas(const shared_ptr[CColumn]& arr, object py_ref,
+                           PyObject** out)
 
     MemoryPool* GetMemoryPool()
+
+
+cdef extern from "pyarrow/io.h" namespace "pyarrow" nogil:
+    cdef cppclass PyReadableFile(arrow_io.ReadableFileInterface):
+        PyReadableFile(object fo)
+
+    cdef cppclass PyOutputStream(arrow_io.OutputStream):
+        PyOutputStream(object fo)
+
+    cdef cppclass PyBytesReader(arrow_io.BufferReader):
+        PyBytesReader(object fo)

http://git-wip-us.apache.org/repos/asf/arrow/blob/c7e6a071/python/pyarrow/io.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pxd b/python/pyarrow/io.pxd
index f55fc0a..1dbb3fd 100644
--- a/python/pyarrow/io.pxd
+++ b/python/pyarrow/io.pxd
@@ -23,11 +23,16 @@ from pyarrow.includes.libarrow_io cimport (ReadableFileInterface,
                                            OutputStream)
 
 
-cdef class NativeFileInterface:
+cdef class NativeFile:
+    cdef:
+        shared_ptr[ReadableFileInterface] rd_file
+        shared_ptr[OutputStream] wr_file
+        bint is_readonly
+        bint is_open
 
     # By implementing these "virtual" functions (all functions in Cython
-    # extension classes are technically virtual in the C++ sense)m we can
-    # expose the arrow::io abstract file interfaces to other components
-    # throughout the suite of Arrow C++ libraries
+    # extension classes are technically virtual in the C++ sense) we can expose
+    # the arrow::io abstract file interfaces to other components throughout the
+    # suite of Arrow C++ libraries
     cdef read_handle(self, shared_ptr[ReadableFileInterface]* file)
     cdef write_handle(self, shared_ptr[OutputStream]* file)

http://git-wip-us.apache.org/repos/asf/arrow/blob/c7e6a071/python/pyarrow/io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index f2eee26..e6e2b62 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -242,6 +242,9 @@ cdef class HdfsClient:
         cdef int16_t c_replication = replication or 0
         cdef int64_t c_default_block_size = default_block_size or 0
 
+        cdef shared_ptr[HdfsOutputStream] wr_handle
+        cdef shared_ptr[HdfsReadableFile] rd_handle
+
         if mode in ('wb', 'ab'):
             if mode == 'ab':
                 append = True
@@ -251,13 +254,17 @@ cdef class HdfsClient:
                     self.client.get()
                     .OpenWriteable(c_path, append, c_buffer_size,
                                    c_replication, c_default_block_size,
-                                   &out.wr_file))
+                                   &wr_handle))
+
+            out.wr_file = <shared_ptr[OutputStream]> wr_handle
 
             out.is_readonly = False
         else:
             with nogil:
                 check_cstatus(self.client.get()
-                              .OpenReadable(c_path, &out.rd_file))
+                              .OpenReadable(c_path, &rd_handle))
+
+            out.rd_file = <shared_ptr[ReadableFileInterface]> rd_handle
             out.is_readonly = True
 
         if c_buffer_size == 0:
@@ -314,25 +321,8 @@ cdef class HdfsClient:
         f = self.open(path, 'rb', buffer_size=buffer_size)
         f.download(stream)
 
-cdef class NativeFileInterface:
-
-    cdef read_handle(self, shared_ptr[ReadableFileInterface]* file):
-        raise NotImplementedError
-
-    cdef write_handle(self, shared_ptr[OutputStream]* file):
-        raise NotImplementedError
-
-cdef class HdfsFile(NativeFileInterface):
-    cdef:
-        shared_ptr[HdfsReadableFile] rd_file
-        shared_ptr[HdfsOutputStream] wr_file
-        bint is_readonly
-        bint is_open
-        object parent
 
-    cdef readonly:
-        int32_t buffer_size
-        object mode
+cdef class NativeFile:
 
     def __cinit__(self):
         self.is_open = False
@@ -356,14 +346,6 @@ cdef class HdfsFile(NativeFileInterface):
                     check_cstatus(self.wr_file.get().Close())
         self.is_open = False
 
-    cdef _assert_readable(self):
-        if not self.is_readonly:
-            raise IOError("only valid on readonly files")
-
-    cdef _assert_writeable(self):
-        if self.is_readonly:
-            raise IOError("only valid on writeonly files")
-
     cdef read_handle(self, shared_ptr[ReadableFileInterface]* file):
         self._assert_readable()
         file[0] = <shared_ptr[ReadableFileInterface]> self.rd_file
@@ -372,6 +354,14 @@ cdef class HdfsFile(NativeFileInterface):
         self._assert_writeable()
         file[0] = <shared_ptr[OutputStream]> self.wr_file
 
+    def _assert_readable(self):
+        if not self.is_readonly:
+            raise IOError("only valid on readonly files")
+
+    def _assert_writeable(self):
+        if self.is_readonly:
+            raise IOError("only valid on writeonly files")
+
     def size(self):
         cdef int64_t size
         self._assert_readable()
@@ -393,6 +383,83 @@ cdef class HdfsFile(NativeFileInterface):
         with nogil:
             check_cstatus(self.rd_file.get().Seek(position))
 
+    def write(self, data):
+        """
+        Write bytes-like (unicode, encoded to UTF-8) to file
+
+        Parameters
+        ----------
+        data : bytes or unicode
+            Converted with ``tobytes`` before being written.
+
+        Raises
+        ------
+        IOError
+            If the file is read-only (see ``_assert_writeable``).
+        """
+        self._assert_writeable()
+
+        data = tobytes(data)
+
+        # buf is obtained from the bytes object with PyBytes_AS_STRING; `data`
+        # stays referenced by this frame while the GIL is released below
+        cdef const uint8_t* buf = <const uint8_t*> cp.PyBytes_AS_STRING(data)
+        cdef int64_t bufsize = len(data)
+        with nogil:
+            check_cstatus(self.wr_file.get().Write(buf, bufsize))
+
+    def read(self, int nbytes):
+        """
+        Read nbytes bytes from the file; the result may be shorter than
+        requested, e.g. at end of file
+
+        Parameters
+        ----------
+        nbytes : int
+            Number of bytes to request from the underlying file.
+
+        Returns
+        -------
+        bytes
+            New bytes object built from the buffer returned by the file.
+
+        Raises
+        ------
+        IOError
+            If the file is not read-only (see ``_assert_readable``).
+        """
+        cdef shared_ptr[Buffer] out
+
+        self._assert_readable()
+
+        with nogil:
+            check_cstatus(self.rd_file.get()
+                          .ReadB(nbytes, &out))
+
+        # PyBytes_FromStringAndSize builds a new bytes object from the buffer
+        return cp.PyBytes_FromStringAndSize(
+            <const char*>out.get().data(), out.get().size())
+
+
+# ----------------------------------------------------------------------
+# Python file-like objects
+
+cdef class PythonFileInterface(NativeFile):
+    """
+    NativeFile backed by a Python file-like object
+
+    Parameters
+    ----------
+    handle : object
+        Python object handed to the C++ file wrapper.
+    mode : str, default 'w'
+        A mode starting with 'w' creates a writeable file, one starting
+        with 'r' a read-only file; anything else raises ValueError.
+    """
+    cdef:
+        # Reference to the wrapped Python object
+        object handle
+
+    def __cinit__(self, handle, mode='w'):
+        self.handle = handle
+
+        # Only one of wr_file / rd_file is populated, depending on the mode
+        if mode.startswith('w'):
+            self.wr_file.reset(new pyarrow.PyOutputStream(handle))
+            self.is_readonly = 0
+        elif mode.startswith('r'):
+            self.rd_file.reset(new pyarrow.PyReadableFile(handle))
+            self.is_readonly = 1
+        else:
+            raise ValueError('Invalid file mode: {0}'.format(mode))
+
+        self.is_open = True
+
+
+cdef class BytesReader(NativeFile):
+    """
+    Read-only NativeFile over the contents of a Python bytes object
+
+    Parameters
+    ----------
+    obj : bytes
+        Object to read from; any other type raises ValueError.
+    """
+    cdef:
+        # Reference to the bytes object being read
+        object obj
+
+    def __cinit__(self, obj):
+        if not isinstance(obj, bytes):
+            raise ValueError('Must pass bytes object')
+
+        self.obj = obj
+        self.is_readonly = 1
+        self.is_open = True
+
+        self.rd_file.reset(new pyarrow.PyBytesReader(obj))
+
+# ----------------------------------------------------------------------
+# Specialization for HDFS
+
+
+cdef class HdfsFile(NativeFile):
+    cdef readonly:
+        int32_t buffer_size
+        object mode
+        object parent
+
     def read(self, int nbytes):
         """
         Read indicated number of bytes from the file, up to EOF
@@ -504,16 +571,3 @@ cdef class HdfsFile(NativeFileInterface):
         writer_thread.join()
         if exc_info is not None:
             raise exc_info[0], exc_info[1], exc_info[2]
-
-    def write(self, data):
-        """
-        Write bytes-like (unicode, encoded to UTF-8) to file
-        """
-        self._assert_writeable()
-
-        data = tobytes(data)
-
-        cdef const uint8_t* buf = <const uint8_t*> cp.PyBytes_AS_STRING(data)
-        cdef int64_t bufsize = len(data)
-        with nogil:
-            check_cstatus(self.wr_file.get().Write(buf, bufsize))

http://git-wip-us.apache.org/repos/asf/arrow/blob/c7e6a071/python/pyarrow/parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.pyx b/python/pyarrow/parquet.pyx
index 099e148..ca0176a 100644
--- a/python/pyarrow/parquet.pyx
+++ b/python/pyarrow/parquet.pyx
@@ -27,10 +27,10 @@ cimport pyarrow.includes.pyarrow as pyarrow
 from pyarrow.compat import tobytes
 from pyarrow.error import ArrowException
 from pyarrow.error cimport check_cstatus
-from pyarrow.io import NativeFileInterface
+from pyarrow.io import NativeFile
 from pyarrow.table cimport Table
 
-from pyarrow.io cimport NativeFileInterface
+from pyarrow.io cimport NativeFile
 
 import six
 
@@ -54,7 +54,7 @@ cdef class ParquetReader:
             new FileReader(default_memory_pool(),
                            ParquetFileReader.OpenFile(path)))
 
-    cdef open_native_file(self, NativeFileInterface file):
+    cdef open_native_file(self, NativeFile file):
         cdef shared_ptr[ReadableFileInterface] cpp_handle
         file.read_handle(&cpp_handle)
 
@@ -84,7 +84,7 @@ def read_table(source, columns=None):
 
     if isinstance(source, six.string_types):
         reader.open_local_file(source)
-    elif isinstance(source, NativeFileInterface):
+    elif isinstance(source, NativeFile):
         reader.open_native_file(source)
 
     return reader.read_all()

http://git-wip-us.apache.org/repos/asf/arrow/blob/c7e6a071/python/pyarrow/table.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx
index f02d36f..ade82aa 100644
--- a/python/pyarrow/table.pyx
+++ b/python/pyarrow/table.pyx
@@ -25,10 +25,12 @@ cimport pyarrow.includes.pyarrow as pyarrow
 import pyarrow.config
 
 from pyarrow.array cimport Array, box_arrow_array
-from pyarrow.compat import frombytes, tobytes
 from pyarrow.error cimport check_status
 from pyarrow.schema cimport box_data_type, box_schema
 
+from pyarrow.compat import frombytes, tobytes
+
+
 cdef class ChunkedArray:
     '''
     Do not call this class's constructor directly.
@@ -161,7 +163,7 @@ cdef class Table:
 
     @staticmethod
     def from_pandas(df, name=None):
-        pass
+        return from_pandas_dataframe(df, name=name)
 
     @staticmethod
     def from_arrays(names, arrays, name=None):
@@ -264,3 +266,34 @@ cdef class Table:
         def __get__(self):
             return (self.num_rows, self.num_columns)
 
+
+
+def from_pandas_dataframe(object df, name=None, timestamps_to_ms=False):
+    """
+    Convert pandas.DataFrame to an Arrow Table
+
+    Parameters
+    ----------
+    df: pandas.DataFrame
+
+    name: str
+        Passed through as the ``name`` argument of Table.from_arrays.
+
+    timestamps_to_ms: bool
+        Convert datetime columns to ms resolution. This is needed for
+        compatibility with other functionality like Parquet I/O which
+        only supports milliseconds.
+
+    Returns
+    -------
+    The result of Table.from_arrays for the converted columns.
+    """
+    from pyarrow.array import from_pandas_series
+
+    cdef:
+        list names = []
+        list arrays = []
+
+    # Use a loop variable distinct from the `name` parameter so the table
+    # name given by the caller is not overwritten by the last column name
+    for column_name in df.columns:
+        col = df[column_name]
+        arr = from_pandas_series(col, timestamps_to_ms=timestamps_to_ms)
+
+        names.append(column_name)
+        arrays.append(arr)
+
+    return Table.from_arrays(names, arrays, name=name)

http://git-wip-us.apache.org/repos/asf/arrow/blob/c7e6a071/python/pyarrow/tests/test_hdfs.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_hdfs.py b/python/pyarrow/tests/test_hdfs.py
new file mode 100644
index 0000000..ed8d419
--- /dev/null
+++ b/python/pyarrow/tests/test_hdfs.py
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from io import BytesIO
+from os.path import join as pjoin
+import os
+import random
+
+import pytest
+
+import pyarrow.io as io
+
+# ----------------------------------------------------------------------
+# HDFS tests
+
+
+def hdfs_test_client():
+    host = os.environ.get('ARROW_HDFS_TEST_HOST', 'localhost')
+    user = os.environ['ARROW_HDFS_TEST_USER']
+    try:
+        port = int(os.environ.get('ARROW_HDFS_TEST_PORT', 20500))
+    except ValueError:
+        raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not '
+                         'an integer')
+
+    return io.HdfsClient.connect(host, port, user)
+
+
+# Marker that skips a test when io.have_libhdfs() reports no libhdfs
+libhdfs = pytest.mark.skipif(not io.have_libhdfs(),
+                             reason='No libhdfs available on system')
+
+
+# Scratch directory in HDFS used by the tests below; the random suffix is
+# drawn once, when this module is imported
+HDFS_TMP_PATH = '/tmp/pyarrow-test-{0}'.format(random.randint(0, 1000))
+
+
+@pytest.fixture(scope='session')
+def hdfs(request):
+    """
+    Session-scoped HDFS client; the finalizer deletes HDFS_TMP_PATH
+    recursively and closes the client
+    """
+    fixture = hdfs_test_client()
+
+    def teardown():
+        fixture.delete(HDFS_TMP_PATH, recursive=True)
+        fixture.close()
+    request.addfinalizer(teardown)
+    return fixture
+
+
+@libhdfs
+def test_hdfs_close():
+    """A closed client reports is_open == False and rejects further calls"""
+    # Uses its own client so the shared `hdfs` fixture is not closed
+    client = hdfs_test_client()
+    assert client.is_open
+    client.close()
+    assert not client.is_open
+
+    with pytest.raises(Exception):
+        client.ls('/')
+
+
+@libhdfs
+def test_hdfs_mkdir(hdfs):
+    """mkdir creates a nested directory; recursive delete removes it"""
+    path = pjoin(HDFS_TMP_PATH, 'test-dir/test-dir')
+    parent_path = pjoin(HDFS_TMP_PATH, 'test-dir')
+
+    hdfs.mkdir(path)
+    assert hdfs.exists(path)
+
+    # Deleting the parent recursively must also remove the nested directory
+    hdfs.delete(parent_path, recursive=True)
+    assert not hdfs.exists(path)
+
+
+@libhdfs
+def test_hdfs_ls(hdfs):
+    base_path = pjoin(HDFS_TMP_PATH, 'ls-test')
+    hdfs.mkdir(base_path)
+
+    dir_path = pjoin(base_path, 'a-dir')
+    f1_path = pjoin(base_path, 'a-file-1')
+
+    hdfs.mkdir(dir_path)
+
+    f = hdfs.open(f1_path, 'wb')
+    f.write('a' * 10)
+
+    contents = sorted(hdfs.ls(base_path, False))
+    assert contents == [dir_path, f1_path]
+
+
+@libhdfs
+def test_hdfs_download_upload(hdfs):
+    """Data uploaded from a file-like object is downloaded unchanged"""
+    base_path = pjoin(HDFS_TMP_PATH, 'upload-test')
+
+    data = b'foobarbaz'
+    buf = BytesIO(data)
+    buf.seek(0)
+
+    hdfs.upload(base_path, buf)
+
+    # Download into a fresh in-memory buffer and compare with what was sent
+    out_buf = BytesIO()
+    hdfs.download(base_path, out_buf)
+    out_buf.seek(0)
+    assert out_buf.getvalue() == data
+
+
+@libhdfs
+def test_hdfs_file_context_manager(hdfs):
+    """Files returned by hdfs.open work as context managers in both modes"""
+    path = pjoin(HDFS_TMP_PATH, 'ctx-manager')
+
+    data = b'foo'
+    with hdfs.open(path, 'wb') as f:
+        f.write(data)
+
+    with hdfs.open(path, 'rb') as f:
+        assert f.size() == 3
+        # Requesting more bytes than the file holds returns only its contents
+        result = f.read(10)
+        assert result == data

http://git-wip-us.apache.org/repos/asf/arrow/blob/c7e6a071/python/pyarrow/tests/test_io.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index eb92e8e..9a41ebe 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -16,112 +16,85 @@
 # under the License.
 
 from io import BytesIO
-from os.path import join as pjoin
-import os
-import random
-
 import pytest
 
+from pyarrow.compat import u
 import pyarrow.io as io
 
-#----------------------------------------------------------------------
-# HDFS tests
+# ----------------------------------------------------------------------
+# Python file-like objects
 
 
-def hdfs_test_client():
-    host = os.environ.get('ARROW_HDFS_TEST_HOST', 'localhost')
-    user = os.environ['ARROW_HDFS_TEST_USER']
-    try:
-        port = int(os.environ.get('ARROW_HDFS_TEST_PORT', 20500))
-    except ValueError:
-        raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not '
-                         'an integer')
+def test_python_file_write():
+    buf = BytesIO()
 
-    return io.HdfsClient.connect(host, port, user)
+    f = io.PythonFileInterface(buf)
 
+    assert f.tell() == 0
 
-libhdfs = pytest.mark.skipif(not io.have_libhdfs(),
-                             reason='No libhdfs available on system')
+    s1 = b'enga\xc3\xb1ado'
+    s2 = b'foobar'
 
+    f.write(s1.decode('utf8'))
+    assert f.tell() == len(s1)
 
-HDFS_TMP_PATH = '/tmp/pyarrow-test-{0}'.format(random.randint(0, 1000))
+    f.write(s2)
 
+    expected = s1 + s2
 
-@pytest.fixture(scope='session')
-def hdfs(request):
-    fixture = hdfs_test_client()
-    def teardown():
-        fixture.delete(HDFS_TMP_PATH, recursive=True)
-        fixture.close()
-    request.addfinalizer(teardown)
-    return fixture
+    result = buf.getvalue()
+    assert result == expected
 
+    f.close()
 
-@libhdfs
-def test_hdfs_close():
-    client = hdfs_test_client()
-    assert client.is_open
-    client.close()
-    assert not client.is_open
 
-    with pytest.raises(Exception):
-        client.ls('/')
+def test_python_file_read():
+    data = b'some sample data'
 
+    buf = BytesIO(data)
+    f = io.PythonFileInterface(buf, mode='r')
 
-@libhdfs
-def test_hdfs_mkdir(hdfs):
-    path = pjoin(HDFS_TMP_PATH, 'test-dir/test-dir')
-    parent_path = pjoin(HDFS_TMP_PATH, 'test-dir')
+    assert f.size() == len(data)
 
-    hdfs.mkdir(path)
-    assert hdfs.exists(path)
+    assert f.tell() == 0
 
-    hdfs.delete(parent_path, recursive=True)
-    assert not hdfs.exists(path)
+    assert f.read(4) == b'some'
+    assert f.tell() == 4
 
+    f.seek(0)
+    assert f.tell() == 0
 
-@libhdfs
-def test_hdfs_ls(hdfs):
-    base_path = pjoin(HDFS_TMP_PATH, 'ls-test')
-    hdfs.mkdir(base_path)
+    f.seek(5)
+    assert f.tell() == 5
 
-    dir_path = pjoin(base_path, 'a-dir')
-    f1_path = pjoin(base_path, 'a-file-1')
+    assert f.read(50) == b'sample data'
 
-    hdfs.mkdir(dir_path)
+    f.close()
 
-    f = hdfs.open(f1_path, 'wb')
-    f.write('a' * 10)
 
-    contents = sorted(hdfs.ls(base_path, False))
-    assert contents == [dir_path, f1_path]
+def test_bytes_reader():
+    # Like a BytesIO, but zero-copy underneath for C++ consumers
+    data = b'some sample data'
+    f = io.BytesReader(data)
 
+    assert f.tell() == 0
 
-@libhdfs
-def test_hdfs_download_upload(hdfs):
-    base_path = pjoin(HDFS_TMP_PATH, 'upload-test')
+    assert f.size() == len(data)
 
-    data = b'foobarbaz'
-    buf = BytesIO(data)
-    buf.seek(0)
+    assert f.read(4) == b'some'
+    assert f.tell() == 4
 
-    hdfs.upload(base_path, buf)
+    f.seek(0)
+    assert f.tell() == 0
 
-    out_buf = BytesIO()
-    hdfs.download(base_path, out_buf)
-    out_buf.seek(0)
-    assert out_buf.getvalue() == data
+    f.seek(5)
+    assert f.tell() == 5
 
+    assert f.read(50) == b'sample data'
 
-@libhdfs
-def test_hdfs_file_context_manager(hdfs):
-    path = pjoin(HDFS_TMP_PATH, 'ctx-manager')
+    f.close()
 
-    data = b'foo'
-    with hdfs.open(path, 'wb') as f:
-        f.write(data)
 
-    with hdfs.open(path, 'rb') as f:
-        assert f.size() == 3
-        result = f.read(10)
-        assert result == data
+def test_bytes_reader_non_bytes():
+    with pytest.raises(ValueError):
+        io.BytesReader(u('some sample data'))

http://git-wip-us.apache.org/repos/asf/arrow/blob/c7e6a071/python/src/pyarrow/adapters/pandas.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc
index a4e7fb6..d224074 100644
--- a/python/src/pyarrow/adapters/pandas.cc
+++ b/python/src/pyarrow/adapters/pandas.cc
@@ -618,7 +618,7 @@ class ArrowDeserializer {
   Status OutputFromData(int type, void* data) {
     // Zero-Copy. We can pass the data pointer directly to NumPy.
     Py_INCREF(py_ref_);
-    OwnedRef py_ref(py_ref);
+    OwnedRef py_ref(py_ref_);
     npy_intp dims[1] = {col_->length()};
     out_ = reinterpret_cast<PyArrayObject*>(PyArray_SimpleNewFromData(1, dims,
                 type, data));

http://git-wip-us.apache.org/repos/asf/arrow/blob/c7e6a071/python/src/pyarrow/common.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/common.cc b/python/src/pyarrow/common.cc
index a2748f9..82b14fd 100644
--- a/python/src/pyarrow/common.cc
+++ b/python/src/pyarrow/common.cc
@@ -68,4 +68,19 @@ arrow::MemoryPool* GetMemoryPool() {
   return &memory_pool;
 }
 
+// ----------------------------------------------------------------------
+// PyBytesBuffer
+
+// Expose the contents of a Python bytes object as an arrow::Buffer. The
+// buffer is built from the pointer and size reported by the object, and a
+// reference to obj is taken so it stays alive as long as this buffer does.
+// NOTE: obj is assumed to be a bytes object; the PyBytes_* macros used here
+// perform no type checking.
+PyBytesBuffer::PyBytesBuffer(PyObject* obj)
+    : Buffer(reinterpret_cast<const uint8_t*>(PyBytes_AS_STRING(obj)),
+        PyBytes_GET_SIZE(obj)),
+      obj_(obj) {
+  Py_INCREF(obj_);
+}
+
+// Drop the reference taken in the constructor. The GIL is acquired first
+// because Py_DECREF requires it to be held.
+PyBytesBuffer::~PyBytesBuffer() {
+  PyGILGuard lock;
+  Py_DECREF(obj_);
+}
+
 } // namespace pyarrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/c7e6a071/python/src/pyarrow/common.h
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/common.h b/python/src/pyarrow/common.h
index fb0ba3e..bc599f8 100644
--- a/python/src/pyarrow/common.h
+++ b/python/src/pyarrow/common.h
@@ -19,9 +19,8 @@
 #define PYARROW_COMMON_H
 
 #include "pyarrow/config.h"
-
 #include "arrow/util/buffer.h"
-
+#include "arrow/util/macros.h"
 #include "pyarrow/visibility.h"
 
 namespace arrow { class MemoryPool; }
@@ -83,6 +82,20 @@ struct PyObjectStringify {
   }
 };
 
+// Scoped GIL holder: PyGILState_Ensure() is called on construction and the
+// matching PyGILState_Release() on destruction
+class PyGILGuard {
+ public:
+  PyGILGuard()
+    : state_(PyGILState_Ensure()) {}
+
+  ~PyGILGuard() {
+    PyGILState_Release(state_);
+  }
+
+ private:
+  // Token returned by PyGILState_Ensure, handed back on release
+  PyGILState_STATE state_;
+  DISALLOW_COPY_AND_ASSIGN(PyGILGuard);
+};
+
 // TODO(wesm): We can just let errors pass through. To be explored later
 #define RETURN_IF_PYERROR()                         \
   if (PyErr_Occurred()) {                           \
@@ -100,8 +113,8 @@ PYARROW_EXPORT arrow::MemoryPool* GetMemoryPool();
 
 class PYARROW_EXPORT NumPyBuffer : public arrow::Buffer {
  public:
-  NumPyBuffer(PyArrayObject* arr) :
-      Buffer(nullptr, 0) {
+  NumPyBuffer(PyArrayObject* arr)
+    : Buffer(nullptr, 0) {
     arr_ = arr;
     Py_INCREF(arr);
 
@@ -117,6 +130,15 @@ class PYARROW_EXPORT NumPyBuffer : public arrow::Buffer {
   PyArrayObject* arr_;
 };
 
+// An arrow::Buffer constructed from a Python object; obj_ records the object
+// the buffer was created from
+class PYARROW_EXPORT PyBytesBuffer : public arrow::Buffer {
+ public:
+  PyBytesBuffer(PyObject* obj);
+  ~PyBytesBuffer();
+
+ private:
+  PyObject* obj_;
+};
+
 } // namespace pyarrow
 
 #endif // PYARROW_COMMON_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/c7e6a071/python/src/pyarrow/io.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/io.cc b/python/src/pyarrow/io.cc
new file mode 100644
index 0000000..35054e9
--- /dev/null
+++ b/python/src/pyarrow/io.cc
@@ -0,0 +1,215 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pyarrow/io.h"
+
+#include <cstdint>
+#include <cstdlib>
+
+#include <arrow/io/memory.h>
+#include <arrow/util/memory-pool.h>
+#include <arrow/util/status.h>
+
+#include "pyarrow/common.h"
+#include "pyarrow/status.h"
+
+namespace pyarrow {
+
+// ----------------------------------------------------------------------
+// Python file
+
+// Keep a reference to the Python file object for the lifetime of this wrapper
+PythonFile::PythonFile(PyObject* file)
+    : file_(file) {
+  Py_INCREF(file_);
+}
+
+// Release the reference taken in the constructor. The owning classes destroy
+// this object from destructors that do not acquire the GIL themselves, so it
+// is acquired here before the reference count is touched.
+PythonFile::~PythonFile() {
+  PyGILGuard lock;
+  Py_DECREF(file_);
+}
+
+// Convert a pending Python exception, if any, into an arrow IOError status
+// whose message is the stringified exception value. Returns OK when no
+// exception is pending. The Python error indicator is cleared either way.
+static arrow::Status CheckPyError() {
+  if (PyErr_Occurred()) {
+    PyObject *exc_type, *exc_value, *traceback;
+    PyErr_Fetch(&exc_type, &exc_value, &traceback);
+    PyObjectStringify stringified(exc_value);
+    std::string message(stringified.bytes);
+    // PyErr_Fetch may leave the value and traceback NULL even when the type
+    // is set, so the NULL-tolerant decref is required here
+    Py_XDECREF(exc_type);
+    Py_XDECREF(exc_value);
+    Py_XDECREF(traceback);
+    PyErr_Clear();
+    return arrow::Status::IOError(message);
+  }
+  return arrow::Status::OK();
+}
+
+// Call the wrapped object's close() method; a Python exception raised by the
+// call is reported through the status returned by CheckPyError
+arrow::Status PythonFile::Close() {
+  // The return value of close() is not used, only whether the call raised
+  PyObject* result = PyObject_CallMethod(file_, "close", "()");
+  Py_XDECREF(result);
+  ARROW_RETURN_NOT_OK(CheckPyError());
+  return arrow::Status::OK();
+}
+
+// Call the wrapped object's seek(position, whence) method; a Python
+// exception raised by the call is reported as an IOError status
+arrow::Status PythonFile::Seek(int64_t position, int whence) {
+  // whence: 0 for relative to start of file, 2 for end of file
+  // "L" consumes a long long argument; passing the 64-bit position under the
+  // "i" (int) code would read the variadic argument with the wrong type
+  PyObject* result = PyObject_CallMethod(file_, "seek", "(Li)",
+      static_cast<long long>(position), whence);
+  Py_XDECREF(result);
+  ARROW_RETURN_NOT_OK(CheckPyError());
+  return arrow::Status::OK();
+}
+
+// Call the wrapped object's read(nbytes) method. On success *out receives
+// the new reference returned by the call, which the caller must release.
+// *out is left untouched when the call raised.
+arrow::Status PythonFile::Read(int64_t nbytes, PyObject** out) {
+  // "L" consumes a long long argument; passing the 64-bit count under the
+  // "i" (int) code would read the variadic argument with the wrong type
+  PyObject* result = PyObject_CallMethod(file_, "read", "(L)",
+      static_cast<long long>(nbytes));
+  ARROW_RETURN_NOT_OK(CheckPyError());
+  *out = result;
+  return arrow::Status::OK();
+}
+
+// Copy nbytes from data into a new Python bytes object and pass it to the
+// wrapped object's write() method
+arrow::Status PythonFile::Write(const uint8_t* data, int64_t nbytes) {
+  PyObject* py_data = PyBytes_FromStringAndSize(
+      reinterpret_cast<const char*>(data), nbytes);
+  // Stop here if the bytes object could not be created
+  ARROW_RETURN_NOT_OK(CheckPyError());
+
+  PyObject* result = PyObject_CallMethod(file_, "write", "(O)", py_data);
+  // Both temporaries are released before the error check so that neither is
+  // leaked when write() raised
+  Py_DECREF(py_data);
+  Py_XDECREF(result);
+  ARROW_RETURN_NOT_OK(CheckPyError());
+  return arrow::Status::OK();
+}
+
+// Store the result of the wrapped object's tell() method in *position
+arrow::Status PythonFile::Tell(int64_t* position) {
+  PyObject* py_position = PyObject_CallMethod(file_, "tell", "()");
+  ARROW_RETURN_NOT_OK(CheckPyError());
+
+  // Convert, release the temporary, and only then check for an error:
+  // PyLong_AsLongLong can raise OverflowError
+  *position = PyLong_AsLongLong(py_position);
+  Py_DECREF(py_position);
+  ARROW_RETURN_NOT_OK(CheckPyError());
+
+  return arrow::Status::OK();
+}
+
+// ----------------------------------------------------------------------
+// Seekable input stream
+
+// Wrap a Python file object in a PythonFile helper owned by this instance
+PyReadableFile::PyReadableFile(PyObject* file) {
+  file_.reset(new PythonFile(file));
+}
+
+PyReadableFile::~PyReadableFile() {}
+
+// Acquire the GIL, then close the wrapped file
+arrow::Status PyReadableFile::Close() {
+  PyGILGuard lock;
+  return file_->Close();
+}
+
+// Acquire the GIL, then seek with whence == 0 (relative to start of file)
+arrow::Status PyReadableFile::Seek(int64_t position) {
+  PyGILGuard lock;
+  return file_->Seek(position, 0);
+}
+
+// Acquire the GIL, then report the current position of the wrapped file
+arrow::Status PyReadableFile::Tell(int64_t* position) {
+  PyGILGuard lock;
+  return file_->Tell(position);
+}
+
+// Read up to nbytes from the wrapped file into out, which must have room for
+// nbytes bytes. *bytes_read receives the number of bytes actually copied.
+// Returns IOError if the Python call raises, if read() does not return a
+// bytes object, or if it returns more data than was requested.
+arrow::Status PyReadableFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
+  PyGILGuard lock;
+  PyObject* bytes_obj;
+  ARROW_RETURN_NOT_OK(file_->Read(nbytes, &bytes_obj));
+
+  // The PyBytes_* macros below do no type checking, so reject anything that
+  // is not bytes (e.g. the result of a file opened in text mode)
+  if (!PyBytes_Check(bytes_obj)) {
+    Py_DECREF(bytes_obj);
+    return arrow::Status::IOError("Python file read() did not return bytes");
+  }
+
+  // Never copy more than the caller's buffer was sized for
+  const int64_t size = PyBytes_GET_SIZE(bytes_obj);
+  if (size > nbytes) {
+    Py_DECREF(bytes_obj);
+    return arrow::Status::IOError("Python file read() returned too many bytes");
+  }
+
+  *bytes_read = size;
+  std::memcpy(out, PyBytes_AS_STRING(bytes_obj), size);
+  Py_DECREF(bytes_obj);
+
+  return arrow::Status::OK();
+}
+
+// Read up to nbytes from the wrapped file and return the resulting Python
+// bytes object wrapped in a PyBytesBuffer, which keeps the object alive.
+// Returns IOError if the Python call raises or if read() does not return a
+// bytes object.
+arrow::Status PyReadableFile::Read(int64_t nbytes, std::shared_ptr<arrow::Buffer>* out) {
+  PyGILGuard lock;
+
+  PyObject* bytes_obj;
+  ARROW_RETURN_NOT_OK(file_->Read(nbytes, &bytes_obj));
+
+  // PyBytesBuffer reads the object through unchecked PyBytes_* macros, so
+  // reject anything that is not bytes before wrapping it
+  if (!PyBytes_Check(bytes_obj)) {
+    Py_DECREF(bytes_obj);
+    return arrow::Status::IOError("Python file read() did not return bytes");
+  }
+
+  // The buffer takes its own reference; release the one returned by read()
+  *out = std::make_shared<PyBytesBuffer>(bytes_obj);
+  Py_DECREF(bytes_obj);
+
+  return arrow::Status::OK();
+}
+
+// Determine the file size by seeking to the end and asking for the position,
+// then seeking back to where the file was before the call
+arrow::Status PyReadableFile::GetSize(int64_t* size) {
+  PyGILGuard lock;
+
+  // Remember the current position so it can be restored afterwards
+  int64_t saved_position;
+  ARROW_RETURN_NOT_OK(file_->Tell(&saved_position));
+
+  // whence == 2: relative to end of file, so offset 0 is the end itself
+  ARROW_RETURN_NOT_OK(file_->Seek(0, 2));
+  int64_t end_position;
+  ARROW_RETURN_NOT_OK(file_->Tell(&end_position));
+
+  // whence == 0: relative to start of file
+  ARROW_RETURN_NOT_OK(file_->Seek(saved_position, 0));
+
+  *size = end_position;
+  return arrow::Status::OK();
+}
+
+// Always reports false for files backed by a Python object
+bool PyReadableFile::supports_zero_copy() const {
+  return false;
+}
+
+// ----------------------------------------------------------------------
+// Output stream
+
+// Wrap a Python file object in a PythonFile helper owned by this instance
+PyOutputStream::PyOutputStream(PyObject* file) {
+  file_.reset(new PythonFile(file));
+}
+
+PyOutputStream::~PyOutputStream() {}
+
+// Acquire the GIL, then close the wrapped file
+arrow::Status PyOutputStream::Close() {
+  PyGILGuard lock;
+  return file_->Close();
+}
+
+// Acquire the GIL, then report the current position of the wrapped file
+arrow::Status PyOutputStream::Tell(int64_t* position) {
+  PyGILGuard lock;
+  return file_->Tell(position);
+}
+
+// Acquire the GIL, then write nbytes from data to the wrapped file
+arrow::Status PyOutputStream::Write(const uint8_t* data, int64_t nbytes) {
+  PyGILGuard lock;
+  return file_->Write(data, nbytes);
+}
+
+// ----------------------------------------------------------------------
+// A readable file that is backed by a PyBytes
+
+// Construct a BufferReader over the data pointer and size reported by a
+// Python bytes object. A reference to obj is taken so the memory stays valid
+// for the lifetime of the reader.
+// NOTE: obj is assumed to be a bytes object; the PyBytes_* macros used here
+// perform no type checking.
+PyBytesReader::PyBytesReader(PyObject* obj)
+    : arrow::io::BufferReader(reinterpret_cast<const uint8_t*>(PyBytes_AS_STRING(obj)),
+        PyBytes_GET_SIZE(obj)),
+      obj_(obj) {
+  Py_INCREF(obj_);
+}
+
+// Release the reference taken in the constructor. The GIL is acquired first,
+// as PyBytesBuffer::~PyBytesBuffer does, because the reader may be destroyed
+// by code that is not holding it.
+PyBytesReader::~PyBytesReader() {
+  PyGILGuard lock;
+  Py_DECREF(obj_);
+}
+
+}  // namespace pyarrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/c7e6a071/python/src/pyarrow/io.h
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/io.h b/python/src/pyarrow/io.h
new file mode 100644
index 0000000..e14aa8c
--- /dev/null
+++ b/python/src/pyarrow/io.h
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PYARROW_IO_H
+#define PYARROW_IO_H
+
+#include "arrow/io/interfaces.h"
+#include "arrow/io/memory.h"
+
+#include "pyarrow/config.h"
+#include "pyarrow/visibility.h"
+
+namespace arrow { class MemoryPool; }
+
+namespace pyarrow {
+
+// A common interface to a Python file-like object. Must acquire GIL before
+// calling any methods
+//
+// Each method returns an arrow::Status. Seek takes a position and a whence
+// value, Read hands back a Python object through its out-parameter, Tell
+// writes the position to its out-parameter and Write takes a raw byte range.
+class PythonFile {
+ public:
+  PythonFile(PyObject* file);
+  ~PythonFile();
+
+  arrow::Status Close();
+  arrow::Status Seek(int64_t position, int whence);
+  arrow::Status Read(int64_t nbytes, PyObject** out);
+  arrow::Status Tell(int64_t* position);
+  arrow::Status Write(const uint8_t* data, int64_t nbytes);
+
+ private:
+  // The Python object passed to the constructor
+  PyObject* file_;
+};
+
+// An arrow::io::ReadableFileInterface implementation constructed from a
+// Python file object
+class PYARROW_EXPORT PyReadableFile : public arrow::io::ReadableFileInterface {
+ public:
+  explicit PyReadableFile(PyObject* file);
+  virtual ~PyReadableFile();
+
+  arrow::Status Close() override;
+
+  // Two read flavors: into caller-provided memory (reporting the byte count
+  // through bytes_read), or returning the data as an arrow::Buffer
+  arrow::Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) override;
+  arrow::Status Read(int64_t nbytes, std::shared_ptr<arrow::Buffer>* out) override;
+
+  arrow::Status GetSize(int64_t* size) override;
+
+  arrow::Status Seek(int64_t position) override;
+
+  arrow::Status Tell(int64_t* position) override;
+
+  bool supports_zero_copy() const override;
+
+ private:
+  // Helper that holds the Python file object
+  std::unique_ptr<PythonFile> file_;
+};
+
+// An arrow::io::OutputStream implementation constructed from a Python file
+// object
+class PYARROW_EXPORT PyOutputStream : public arrow::io::OutputStream {
+ public:
+  explicit PyOutputStream(PyObject* file);
+  virtual ~PyOutputStream();
+
+  arrow::Status Close() override;
+  arrow::Status Tell(int64_t* position) override;
+  arrow::Status Write(const uint8_t* data, int64_t nbytes) override;
+
+ private:
+  // Helper that holds the Python file object
+  std::unique_ptr<PythonFile> file_;
+};
+
+// A zero-copy reader backed by a PyBytes object
+class PYARROW_EXPORT PyBytesReader : public arrow::io::BufferReader {
+ public:
+  explicit PyBytesReader(PyObject* obj);
+  virtual ~PyBytesReader();
+
+ private:
+  // The Python object passed to the constructor
+  PyObject* obj_;
+};
+
+// TODO(wesm): seekable output files
+
+} // namespace pyarrow
+
+#endif  // PYARROW_IO_H