You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/01/13 13:50:20 UTC

arrow git commit: ARROW-472: [Python] Expose more C++ IO interfaces. Add equals methods to Parquet schemas. Pass Parquet metadata separately in reader

Repository: arrow
Updated Branches:
  refs/heads/master 5ffbda1b4 -> ad0e57d23


ARROW-472: [Python] Expose more C++ IO interfaces. Add equals methods to Parquet schemas. Pass Parquet metadata separately in reader

Also includes ARROW-471, ARROW-441.

Needed to compare file schemas easily.

Requires PARQUET-830

Author: Wes McKinney <we...@twosigma.com>

Closes #280 from wesm/ARROW-472 and squashes the following commits:

1c5e27c [Wes McKinney] Name static const variables constexpr instead
25c5c90 [Wes McKinney] Add some tests for io.OSFile
1d22428 [Wes McKinney] Add some memory map Python unit tests
5268b6c [Wes McKinney] Add untested wrapper for operating system files
fd52153 [Wes McKinney] Add unit test for passing metadata down
2316e64 [Wes McKinney] Expose MemoryMappedFile in pyarrow.io, expand parquet::arrow::OpenFile to take metadata, props parameters
a2ce247 [Wes McKinney] Add equals methods to Parquet Schema and ColumnSchema objects


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/ad0e57d2
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/ad0e57d2
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/ad0e57d2

Branch: refs/heads/master
Commit: ad0e57d23257462b9933745949d54ca729da537e
Parents: 5ffbda1
Author: Wes McKinney <we...@twosigma.com>
Authored: Fri Jan 13 08:50:14 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Fri Jan 13 08:50:14 2017 -0500

----------------------------------------------------------------------
 cpp/src/arrow/io/file.cc                |  45 ++++++----
 cpp/src/arrow/io/file.h                 |   2 +
 cpp/src/arrow/io/io-file-test.cc        |   4 +-
 python/pyarrow/_parquet.pxd             |  18 +++-
 python/pyarrow/_parquet.pyx             |  37 +++++---
 python/pyarrow/compat.py                |  26 ++++++
 python/pyarrow/includes/libarrow_io.pxd |  10 +++
 python/pyarrow/io.pxd                   |   3 +-
 python/pyarrow/io.pyx                   | 125 ++++++++++++++++++++++----
 python/pyarrow/parquet.py               |   2 +-
 python/pyarrow/table.pyx                |   2 +-
 python/pyarrow/tests/test_io.py         | 130 ++++++++++++++++++++++++++-
 python/pyarrow/tests/test_parquet.py    |  52 +++++++++--
 13 files changed, 396 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/ad0e57d2/cpp/src/arrow/io/file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc
index 0fb13ea..1de6efa 100644
--- a/cpp/src/arrow/io/file.cc
+++ b/cpp/src/arrow/io/file.cc
@@ -188,6 +188,8 @@ static inline Status FileOpenWriteable(
   memcpy(wpath.data() + nwchars, L"\0", sizeof(wchar_t));
 
   int oflag = _O_CREAT | _O_BINARY;
+  int sh_flag = _S_IWRITE;
+  if (!write_only) { sh_flag |= _S_IREAD; }
 
   if (truncate) { oflag |= _O_TRUNC; }
 
@@ -197,7 +199,7 @@ static inline Status FileOpenWriteable(
     oflag |= _O_RDWR;
   }
 
-  errno_actual = _wsopen_s(fd, wpath.data(), oflag, _SH_DENYNO, _S_IWRITE);
+  errno_actual = _wsopen_s(fd, wpath.data(), oflag, _SH_DENYNO, sh_flag);
   ret = *fd;
 
 #else
@@ -319,7 +321,7 @@ class OSFile {
     RETURN_NOT_OK(FileOpenWriteable(path, write_only, !append, &fd_));
     path_ = path;
     is_open_ = true;
-    mode_ = write_only ? FileMode::READ : FileMode::READWRITE;
+    mode_ = write_only ? FileMode::WRITE : FileMode::READWRITE;
 
     if (append) {
       RETURN_NOT_OK(FileGetSize(fd_, &size_));
@@ -352,7 +354,7 @@ class OSFile {
   }
 
   Status Seek(int64_t pos) {
-    if (pos > size_) { pos = size_; }
+    if (pos < 0) { return Status::Invalid("Invalid position"); }
     return FileSeek(fd_, pos);
   }
 
@@ -523,17 +525,24 @@ class MemoryMappedFile::MemoryMappedFileImpl : public OSFile {
   }
 
   Status Open(const std::string& path, FileMode::type mode) {
-    int prot_flags = PROT_READ;
+    int prot_flags;
+    int map_mode;
 
     if (mode != FileMode::READ) {
-      prot_flags |= PROT_WRITE;
-      const bool append = true;
-      RETURN_NOT_OK(OSFile::OpenWriteable(path, append, mode == FileMode::WRITE));
+      // Memory mapping has permission failures if PROT_READ not set
+      prot_flags = PROT_READ | PROT_WRITE;
+      map_mode = MAP_SHARED;
+      constexpr bool append = true;
+      constexpr bool write_only = false;
+      RETURN_NOT_OK(OSFile::OpenWriteable(path, append, write_only));
+      mode_ = mode;
     } else {
+      prot_flags = PROT_READ;
+      map_mode = MAP_PRIVATE;  // Changes are not to be committed back to the file
       RETURN_NOT_OK(OSFile::OpenReadable(path));
     }
 
-    void* result = mmap(nullptr, size_, prot_flags, MAP_SHARED, fd(), 0);
+    void* result = mmap(nullptr, size_, prot_flags, map_mode, fd(), 0);
     if (result == MAP_FAILED) {
       std::stringstream ss;
       ss << "Memory mapping file failed, errno: " << errno;
@@ -548,16 +557,14 @@ class MemoryMappedFile::MemoryMappedFileImpl : public OSFile {
   int64_t size() const { return size_; }
 
   Status Seek(int64_t position) {
-    if (position < 0 || position >= size_) {
-      return Status::Invalid("position is out of bounds");
-    }
+    if (position < 0) { return Status::Invalid("position is out of bounds"); }
     position_ = position;
     return Status::OK();
   }
 
   int64_t position() { return position_; }
 
-  void advance(int64_t nbytes) { position_ = std::min(size_, position_ + nbytes); }
+  void advance(int64_t nbytes) { position_ = position_ + nbytes; }
 
   uint8_t* data() { return data_; }
 
@@ -611,16 +618,18 @@ Status MemoryMappedFile::Close() {
 }
 
 Status MemoryMappedFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
-  nbytes = std::min(nbytes, impl_->size() - impl_->position());
-  std::memcpy(out, impl_->head(), nbytes);
+  nbytes = std::max<int64_t>(0, std::min(nbytes, impl_->size() - impl_->position()));
+  if (nbytes > 0) { std::memcpy(out, impl_->head(), nbytes); }
   *bytes_read = nbytes;
   impl_->advance(nbytes);
   return Status::OK();
 }
 
 Status MemoryMappedFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
-  nbytes = std::min(nbytes, impl_->size() - impl_->position());
-  *out = std::make_shared<Buffer>(impl_->head(), nbytes);
+  nbytes = std::max<int64_t>(0, std::min(nbytes, impl_->size() - impl_->position()));
+
+  const uint8_t* data = nbytes > 0 ? impl_->head() : nullptr;
+  *out = std::make_shared<Buffer>(data, nbytes);
   impl_->advance(nbytes);
   return Status::OK();
 }
@@ -655,5 +664,9 @@ Status MemoryMappedFile::WriteInternal(const uint8_t* data, int64_t nbytes) {
   return Status::OK();
 }
 
+int MemoryMappedFile::file_descriptor() const {
+  return impl_->fd();
+}
+
 }  // namespace io
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/ad0e57d2/cpp/src/arrow/io/file.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/file.h b/cpp/src/arrow/io/file.h
index 9ca9c54..2387232 100644
--- a/cpp/src/arrow/io/file.h
+++ b/cpp/src/arrow/io/file.h
@@ -127,6 +127,8 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface {
   // @return: the size in bytes of the memory source
   Status GetSize(int64_t* size) override;
 
+  int file_descriptor() const;
+
  private:
   explicit MemoryMappedFile(FileMode::type mode);
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/ad0e57d2/cpp/src/arrow/io/io-file-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-file-test.cc b/cpp/src/arrow/io/io-file-test.cc
index 821e71d..20cd047 100644
--- a/cpp/src/arrow/io/io-file-test.cc
+++ b/cpp/src/arrow/io/io-file-test.cc
@@ -209,8 +209,8 @@ TEST_F(TestReadableFile, SeekTellSize) {
   ASSERT_OK(file_->Seek(100));
   ASSERT_OK(file_->Tell(&position));
 
-  // now at EOF
-  ASSERT_EQ(8, position);
+  // Can seek past end of file
+  ASSERT_EQ(100, position);
 
   int64_t size;
   ASSERT_OK(file_->GetSize(&size));

http://git-wip-us.apache.org/repos/asf/arrow/blob/ad0e57d2/python/pyarrow/_parquet.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 7e49e9e..cf1da1c 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -99,8 +99,9 @@ cdef extern from "parquet/api/schema.h" namespace "parquet" nogil:
         ParquetVersion_V2" parquet::ParquetVersion::PARQUET_2_0"
 
     cdef cppclass ColumnDescriptor:
-        shared_ptr[ColumnPath] path()
+        c_bool Equals(const ColumnDescriptor& other)
 
+        shared_ptr[ColumnPath] path()
         int16_t max_definition_level()
         int16_t max_repetition_level()
 
@@ -115,6 +116,7 @@ cdef extern from "parquet/api/schema.h" namespace "parquet" nogil:
         const ColumnDescriptor* Column(int i)
         shared_ptr[Node] schema()
         GroupNode* group()
+        c_bool Equals(const SchemaDescriptor& other)
         int num_columns()
 
 
@@ -163,8 +165,18 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
         unique_ptr[CRowGroupMetaData] RowGroup(int i)
         const SchemaDescriptor* schema()
 
+    cdef cppclass ReaderProperties:
+        pass
+
+    ReaderProperties default_reader_properties()
+
     cdef cppclass ParquetFileReader:
-        # TODO: Some default arguments are missing
+        @staticmethod
+        unique_ptr[ParquetFileReader] Open(
+            const shared_ptr[ReadableFileInterface]& file,
+            const ReaderProperties& props,
+            const shared_ptr[CFileMetaData]& metadata)
+
         @staticmethod
         unique_ptr[ParquetFileReader] OpenFile(const c_string& path)
         shared_ptr[CFileMetaData] metadata();
@@ -193,6 +205,8 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
 cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
     CStatus OpenFile(const shared_ptr[ReadableFileInterface]& file,
                      MemoryPool* allocator,
+                     const ReaderProperties& properties,
+                     const shared_ptr[CFileMetaData]& metadata,
                      unique_ptr[FileReader]* reader)
 
     cdef cppclass FileReader:

http://git-wip-us.apache.org/repos/asf/arrow/blob/ad0e57d2/python/pyarrow/_parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 30e3de4..867fc4c 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -19,8 +19,9 @@
 # distutils: language = c++
 # cython: embedsignature = True
 
-from pyarrow._parquet cimport *
+from cython.operator cimport dereference as deref
 
+from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport *
 from pyarrow.includes.libarrow_io cimport (ReadableFileInterface, OutputStream,
                                            FileOutputStream)
@@ -196,6 +197,12 @@ cdef class Schema:
     def __getitem__(self, i):
         return self.column(i)
 
+    def equals(self, Schema other):
+        """
+        Returns True if the Parquet schemas are equal
+        """
+        return self.schema.Equals(deref(other.schema))
+
     def column(self, i):
         if i < 0 or i >= len(self):
             raise IndexError('{0} out of bounds'.format(i))
@@ -217,6 +224,12 @@ cdef class ColumnSchema:
         self.parent = schema
         self.descr = schema.schema.Column(i)
 
+    def equals(self, ColumnSchema other):
+        """
+        Returns True if the column schemas are equal
+        """
+        return self.descr.Equals(deref(other.descr))
+
     def __repr__(self):
         physical_type = self.physical_type
         logical_type = self.logical_type
@@ -337,24 +350,20 @@ cdef class ParquetReader:
         self.allocator = default_memory_pool()
         self._metadata = None
 
-    def open(self, object source):
+    def open(self, object source, FileMetaData metadata=None):
         cdef:
             shared_ptr[ReadableFileInterface] rd_handle
+            shared_ptr[CFileMetaData] c_metadata
+            ReaderProperties properties = default_reader_properties()
             c_string path
 
-        if isinstance(source, six.string_types):
-            path = tobytes(source)
-
-            # Must be in one expression to avoid calling std::move which is not
-            # possible in Cython (due to missing rvalue support)
+        if metadata is not None:
+            c_metadata = metadata.sp_metadata
 
-            # TODO(wesm): ParquetFileReader::OpenFile can throw?
-            self.reader = unique_ptr[FileReader](
-                new FileReader(default_memory_pool(),
-                               ParquetFileReader.OpenFile(path)))
-        else:
-            get_reader(source, &rd_handle)
-            check_status(OpenFile(rd_handle, self.allocator, &self.reader))
+        get_reader(source, &rd_handle)
+        with nogil:
+            check_status(OpenFile(rd_handle, self.allocator, properties,
+                                  c_metadata, &self.reader))
 
     @property
     def metadata(self):

http://git-wip-us.apache.org/repos/asf/arrow/blob/ad0e57d2/python/pyarrow/compat.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/compat.py b/python/pyarrow/compat.py
index 2dfdb50..9148be7 100644
--- a/python/pyarrow/compat.py
+++ b/python/pyarrow/compat.py
@@ -54,6 +54,10 @@ if PY2:
     range = xrange
     long = long
 
+    def guid():
+        from uuid import uuid4
+        return uuid4().get_hex()
+
     def u(s):
         return unicode(s, "unicode_escape")
 
@@ -76,6 +80,10 @@ else:
     from decimal import Decimal
     range = range
 
+    def guid():
+        from uuid import uuid4
+        return uuid4().hex
+
     def u(s):
         return s
 
@@ -89,6 +97,24 @@ else:
         return o.decode('utf8')
 
 
+def encode_file_path(path):
+    import os
+    # Windows requires utf-16le encoding for unicode file names
+    if isinstance(path, unicode_type):
+        if os.name == 'nt':
+            # try:
+            #     encoded_path = path.encode('ascii')
+            # except:
+            encoded_path = path.encode('utf-16le')
+        else:
+            # POSIX systems can handle utf-8
+            encoded_path = path.encode('utf-8')
+    else:
+        encoded_path = path
+
+    return encoded_path
+
+
 integer_types = six.integer_types + (np.integer,)
 
 __all__ = []

http://git-wip-us.apache.org/repos/asf/arrow/blob/ad0e57d2/python/pyarrow/includes/libarrow_io.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow_io.pxd b/python/pyarrow/includes/libarrow_io.pxd
index 99f88ad..6b141a3 100644
--- a/python/pyarrow/includes/libarrow_io.pxd
+++ b/python/pyarrow/includes/libarrow_io.pxd
@@ -69,6 +69,8 @@ cdef extern from "arrow/io/interfaces.h" namespace "arrow::io" nogil:
 
 
 cdef extern from "arrow/io/file.h" namespace "arrow::io" nogil:
+
+
     cdef cppclass FileOutputStream(OutputStream):
         @staticmethod
         CStatus Open(const c_string& path, shared_ptr[FileOutputStream]* file)
@@ -85,6 +87,14 @@ cdef extern from "arrow/io/file.h" namespace "arrow::io" nogil:
 
         int file_descriptor()
 
+    cdef cppclass CMemoryMappedFile" arrow::io::MemoryMappedFile"\
+        (ReadWriteFileInterface):
+        @staticmethod
+        CStatus Open(const c_string& path, FileMode mode,
+                     shared_ptr[CMemoryMappedFile]* file)
+
+        int file_descriptor()
+
 
 cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil:
     CStatus HaveLibHdfs()

http://git-wip-us.apache.org/repos/asf/arrow/blob/ad0e57d2/python/pyarrow/io.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pxd b/python/pyarrow/io.pxd
index 02265d0..fffc7c5 100644
--- a/python/pyarrow/io.pxd
+++ b/python/pyarrow/io.pxd
@@ -32,7 +32,8 @@ cdef class NativeFile:
     cdef:
         shared_ptr[ReadableFileInterface] rd_file
         shared_ptr[OutputStream] wr_file
-        bint is_readonly
+        bint is_readable
+        bint is_writeable
         bint is_open
         bint own_file
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/ad0e57d2/python/pyarrow/io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index b62de6c..2d8e4e8 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -27,12 +27,13 @@ from pyarrow.includes.libarrow cimport *
 cimport pyarrow.includes.pyarrow as pyarrow
 from pyarrow.includes.libarrow_io cimport *
 
-from pyarrow.compat import frombytes, tobytes
+from pyarrow.compat import frombytes, tobytes, encode_file_path
 from pyarrow.error cimport check_status
 
 cimport cpython as cp
 
 import re
+import six
 import sys
 import threading
 import time
@@ -42,6 +43,7 @@ cdef extern from "Python.h":
     PyObject* PyBytes_FromStringAndSizeNative" PyBytes_FromStringAndSize"(
         char *v, Py_ssize_t len) except NULL
 
+
 cdef class NativeFile:
 
     def __cinit__(self):
@@ -61,7 +63,7 @@ cdef class NativeFile:
     def close(self):
         if self.is_open:
             with nogil:
-                if self.is_readonly:
+                if self.is_readable:
                     check_status(self.rd_file.get().Close())
                 else:
                     check_status(self.wr_file.get().Close())
@@ -76,15 +78,15 @@ cdef class NativeFile:
         file[0] = <shared_ptr[OutputStream]> self.wr_file
 
     def _assert_readable(self):
-        if not self.is_readonly:
+        if not self.is_readable:
             raise IOError("only valid on readonly files")
 
         if not self.is_open:
             raise IOError("file not open")
 
     def _assert_writeable(self):
-        if self.is_readonly:
-            raise IOError("only valid on writeonly files")
+        if not self.is_writeable:
+            raise IOError("only valid on writeable files")
 
         if not self.is_open:
             raise IOError("file not open")
@@ -99,7 +101,7 @@ cdef class NativeFile:
     def tell(self):
         cdef int64_t position
         with nogil:
-            if self.is_readonly:
+            if self.is_readable:
                 check_status(self.rd_file.get().Tell(&position))
             else:
                 check_status(self.wr_file.get().Tell(&position))
@@ -137,7 +139,7 @@ cdef class NativeFile:
         self._assert_readable()
 
         # Allocate empty write space
-        obj = PyBytes_FromStringAndSizeNative(NULL, nbytes)
+        obj = PyBytes_FromStringAndSizeNative(NULL, c_nbytes)
 
         cdef uint8_t* buf = <uint8_t*> cp.PyBytes_AS_STRING(<object> obj)
         with nogil:
@@ -179,16 +181,100 @@ cdef class PythonFileInterface(NativeFile):
 
         if mode.startswith('w'):
             self.wr_file.reset(new pyarrow.PyOutputStream(handle))
-            self.is_readonly = 0
+            self.is_readable = 0
+            self.is_writeable = 1
         elif mode.startswith('r'):
             self.rd_file.reset(new pyarrow.PyReadableFile(handle))
-            self.is_readonly = 1
+            self.is_readable = 1
+            self.is_writeable = 0
+        else:
+            raise ValueError('Invalid file mode: {0}'.format(mode))
+
+        self.is_open = True
+
+
+cdef class MemoryMappedFile(NativeFile):
+    """
+    Supports 'r', 'r+w', 'w' modes
+    """
+    cdef:
+        object path
+
+    def __cinit__(self, path, mode='r'):
+        self.path = path
+
+        cdef:
+            FileMode c_mode
+            shared_ptr[CMemoryMappedFile] handle
+            c_string c_path = encode_file_path(path)
+
+        self.is_readable = self.is_writeable = 0
+
+        if mode in ('r', 'rb'):
+            c_mode = FileMode_READ
+            self.is_readable = 1
+        elif mode in ('w', 'wb'):
+            c_mode = FileMode_WRITE
+            self.is_writeable = 1
+        elif mode == 'r+w':
+            c_mode = FileMode_READWRITE
+            self.is_readable = 1
+            self.is_writeable = 1
         else:
             raise ValueError('Invalid file mode: {0}'.format(mode))
 
+        check_status(CMemoryMappedFile.Open(c_path, c_mode, &handle))
+
+        self.wr_file = <shared_ptr[OutputStream]> handle
+        self.rd_file = <shared_ptr[ReadableFileInterface]> handle
         self.is_open = True
 
 
+cdef class OSFile(NativeFile):
+    """
+    Supports 'r', 'w' modes
+    """
+    cdef:
+        object path
+
+    def __cinit__(self, path, mode='r'):
+        self.path = path
+
+        cdef:
+            FileMode c_mode
+            shared_ptr[Readable] handle
+            c_string c_path = encode_file_path(path)
+
+        self.is_readable = self.is_writeable = 0
+
+        if mode in ('r', 'rb'):
+            self._open_readable(c_path)
+        elif mode in ('w', 'wb'):
+            self._open_writeable(c_path)
+        else:
+            raise ValueError('Invalid file mode: {0}'.format(mode))
+
+        self.is_open = True
+
+    cdef _open_readable(self, c_string path):
+        cdef shared_ptr[ReadableFile] handle
+
+        with nogil:
+            check_status(ReadableFile.Open(path, pyarrow.get_memory_pool(),
+                                           &handle))
+
+        self.is_readable = 1
+        self.rd_file = <shared_ptr[ReadableFileInterface]> handle
+
+    cdef _open_writeable(self, c_string path):
+        cdef shared_ptr[FileOutputStream] handle
+
+        with nogil:
+            check_status(FileOutputStream.Open(path, &handle))
+        self.is_writeable = 1
+        self.wr_file = <shared_ptr[OutputStream]> handle
+
+
 cdef class BytesReader(NativeFile):
     cdef:
         object obj
@@ -198,7 +284,8 @@ cdef class BytesReader(NativeFile):
             raise ValueError('Must pass bytes object')
 
         self.obj = obj
-        self.is_readonly = 1
+        self.is_readable = 1
+        self.is_writeable = 0
         self.is_open = True
 
         self.rd_file.reset(new pyarrow.PyBytesReader(obj))
@@ -264,7 +351,8 @@ cdef class InMemoryOutputStream(NativeFile):
         self.buffer = allocate_buffer()
         self.wr_file.reset(new BufferOutputStream(
             <shared_ptr[ResizableBuffer]> self.buffer))
-        self.is_readonly = 0
+        self.is_readable = 0
+        self.is_writeable = 1
         self.is_open = True
 
     def get_result(self):
@@ -285,7 +373,8 @@ cdef class BufferReader(NativeFile):
         self.buffer = buffer
         self.rd_file.reset(new CBufferReader(buffer.buffer.get().data(),
                                              buffer.buffer.get().size()))
-        self.is_readonly = 1
+        self.is_readable = 1
+        self.is_writeable = 0
         self.is_open = True
 
 
@@ -311,12 +400,14 @@ cdef get_reader(object source, shared_ptr[ReadableFileInterface]* reader):
     elif not isinstance(source, NativeFile) and hasattr(source, 'read'):
         # Optimistically hope this is file-like
         source = PythonFileInterface(source, mode='r')
+    elif isinstance(source, six.string_types):
+        source = MemoryMappedFile(source, mode='r')
 
     if isinstance(source, NativeFile):
         nf = source
 
         # TODO: what about read-write sources (e.g. memory maps)
-        if not nf.is_readonly:
+        if not nf.is_readable:
             raise IOError('Native file is not readable')
 
         nf.read_handle(reader)
@@ -335,7 +426,7 @@ cdef get_writer(object source, shared_ptr[OutputStream]* writer):
     if isinstance(source, NativeFile):
         nf = source
 
-        if nf.is_readonly:
+        if nf.is_readable:
             raise IOError('Native file is not writeable')
 
         nf.write_handle(writer)
@@ -593,14 +684,16 @@ cdef class HdfsClient:
 
             out.wr_file = <shared_ptr[OutputStream]> wr_handle
 
-            out.is_readonly = False
+            out.is_readable = False
+            out.is_writeable = 1
         else:
             with nogil:
                 check_status(self.client.get()
                              .OpenReadable(c_path, &rd_handle))
 
             out.rd_file = <shared_ptr[ReadableFileInterface]> rd_handle
-            out.is_readonly = True
+            out.is_readable = True
+            out.is_writeable = 0
 
         if c_buffer_size == 0:
             c_buffer_size = 2 ** 16

http://git-wip-us.apache.org/repos/asf/arrow/blob/ad0e57d2/python/pyarrow/parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 2dedb72..708ae65 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -33,7 +33,7 @@ class ParquetFile(object):
     """
     def __init__(self, source, metadata=None):
         self.reader = _parquet.ParquetReader()
-        self.reader.open(source)
+        self.reader.open(source, metadata=metadata)
 
     @property
     def metadata(self):

http://git-wip-us.apache.org/repos/asf/arrow/blob/ad0e57d2/python/pyarrow/table.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx
index 3a04651..dce125a 100644
--- a/python/pyarrow/table.pyx
+++ b/python/pyarrow/table.pyx
@@ -22,7 +22,7 @@
 from cython.operator cimport dereference as deref
 
 from pyarrow.includes.libarrow cimport *
-from pyarrow.includes.common cimport PyObject_to_object
+from pyarrow.includes.common cimport *
 cimport pyarrow.includes.pyarrow as pyarrow
 
 import pyarrow.config

http://git-wip-us.apache.org/repos/asf/arrow/blob/ad0e57d2/python/pyarrow/tests/test_io.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index 3e7a437..224f20d 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -16,10 +16,14 @@
 # under the License.
 
 from io import BytesIO
+import os
 import pytest
 
-from pyarrow.compat import u
+import numpy as np
+
+from pyarrow.compat import u, guid
 import pyarrow.io as io
+import pyarrow as pa
 
 # ----------------------------------------------------------------------
 # Python file-like objects
@@ -155,3 +159,127 @@ def test_inmemory_write_after_closed():
 
     with pytest.raises(IOError):
         f.write(b'not ok')
+
+
+# ----------------------------------------------------------------------
+# OS files and memory maps
+
+@pytest.fixture(scope='session')
+def sample_disk_data(request):
+
+    SIZE = 4096
+    arr = np.random.randint(0, 256, size=SIZE).astype('u1')
+    data = arr.tobytes()[:SIZE]
+
+    path = guid()
+    with open(path, 'wb') as f:
+        f.write(data)
+
+    def teardown():
+        _try_delete(path)
+    request.addfinalizer(teardown)
+    return path, data
+
+
+def _check_native_file_reader(KLASS, sample_data):
+    path, data = sample_data
+
+    f = KLASS(path, mode='r')
+
+    assert f.read(10) == data[:10]
+    assert f.read(0) == b''
+    assert f.tell() == 10
+
+    assert f.read() == data[10:]
+
+    assert f.size() == len(data)
+
+    f.seek(0)
+    assert f.tell() == 0
+
+    # Seeking past end of file not supported in memory maps
+    f.seek(len(data) + 1)
+    assert f.tell() == len(data) + 1
+    assert f.read(5) == b''
+
+
+def test_memory_map_reader(sample_disk_data):
+    _check_native_file_reader(io.MemoryMappedFile, sample_disk_data)
+
+
+def test_os_file_reader(sample_disk_data):
+    _check_native_file_reader(io.OSFile, sample_disk_data)
+
+
+def _try_delete(path):
+    try:
+        os.remove(path)
+    except os.error:
+        pass
+
+
+def test_memory_map_writer():
+    SIZE = 4096
+    arr = np.random.randint(0, 256, size=SIZE).astype('u1')
+    data = arr.tobytes()[:SIZE]
+
+    path = guid()
+    try:
+        with open(path, 'wb') as f:
+            f.write(data)
+
+        f = io.MemoryMappedFile(path, mode='r+w')
+
+        f.seek(10)
+        f.write('peekaboo')
+        assert f.tell() == 18
+
+        f.seek(10)
+        assert f.read(8) == b'peekaboo'
+
+        f2 = io.MemoryMappedFile(path, mode='r+w')
+
+        f2.seek(10)
+        f2.write(b'booapeak')
+        f2.seek(10)
+
+        f.seek(10)
+        assert f.read(8) == b'booapeak'
+
+        # Does not truncate file
+        f3 = io.MemoryMappedFile(path, mode='w')
+        f3.write('foo')
+
+        with io.MemoryMappedFile(path) as f4:
+            assert f4.size() == SIZE
+
+        with pytest.raises(IOError):
+            f3.read(5)
+
+        f.seek(0)
+        assert f.read(3) == b'foo'
+    finally:
+        _try_delete(path)
+
+
+def test_os_file_writer():
+    SIZE = 4096
+    arr = np.random.randint(0, 256, size=SIZE).astype('u1')
+    data = arr.tobytes()[:SIZE]
+
+    path = guid()
+    try:
+        with open(path, 'wb') as f:
+            f.write(data)
+
+        # Truncates file
+        f2 = io.OSFile(path, mode='w')
+        f2.write('foo')
+
+        with io.OSFile(path) as f3:
+            assert f3.size() == 3
+
+        with pytest.raises(IOError):
+            f2.read(5)
+    finally:
+        _try_delete(path)

http://git-wip-us.apache.org/repos/asf/arrow/blob/ad0e57d2/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index e157155..9cf860a 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -236,19 +236,22 @@ def test_pandas_parquet_configuration_options(tmpdir):
         pdt.assert_frame_equal(df, df_read)
 
 
-@parquet
-def test_parquet_metadata_api():
-    df = alltypes_sample(size=10000)
-    df = df.reindex(columns=sorted(df.columns))
-
+def make_sample_file(df):
     a_table = A.Table.from_pandas(df, timestamps_to_ms=True)
 
     buf = io.BytesIO()
     pq.write_table(a_table, buf, compression='SNAPPY', version='2.0')
 
     buf.seek(0)
-    fileh = pq.ParquetFile(buf)
+    return pq.ParquetFile(buf)
 
+
+@parquet
+def test_parquet_metadata_api():
+    df = alltypes_sample(size=10000)
+    df = df.reindex(columns=sorted(df.columns))
+
+    fileh = make_sample_file(df)
     ncols = len(df.columns)
 
     # Series of sniff tests
@@ -288,3 +291,40 @@ def test_parquet_metadata_api():
 
     assert rg_meta.num_rows == len(df)
     assert rg_meta.num_columns == ncols
+
+
+@parquet
+def test_compare_schemas():
+    df = alltypes_sample(size=10000)
+
+    fileh = make_sample_file(df)
+    fileh2 = make_sample_file(df)
+    fileh3 = make_sample_file(df[df.columns[::2]])
+
+    assert fileh.schema.equals(fileh.schema)
+    assert fileh.schema.equals(fileh2.schema)
+
+    assert not fileh.schema.equals(fileh3.schema)
+
+    assert fileh.schema[0].equals(fileh.schema[0])
+    assert not fileh.schema[0].equals(fileh.schema[1])
+
+
+@parquet
+def test_pass_separate_metadata():
+    # ARROW-471
+    df = alltypes_sample(size=10000)
+
+    a_table = A.Table.from_pandas(df, timestamps_to_ms=True)
+
+    buf = io.BytesIO()
+    pq.write_table(a_table, buf, compression='snappy', version='2.0')
+
+    buf.seek(0)
+    metadata = pq.ParquetFile(buf).metadata
+
+    buf.seek(0)
+
+    fileh = pq.ParquetFile(buf, metadata=metadata)
+
+    pdt.assert_frame_equal(df, fileh.read().to_pandas())