You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2018/01/30 22:20:15 UTC

[arrow] branch master updated: ARROW-2036: [Python] Support standard IOBase methods on NativeFile

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e63084  ARROW-2036: [Python] Support standard IOBase methods on NativeFile
3e63084 is described below

commit 3e63084bee72c097932cc07ca9deae15a2e97fbe
Author: Jim Crist <ji...@gmail.com>
AuthorDate: Tue Jan 30 17:20:09 2018 -0500

    ARROW-2036: [Python] Support standard IOBase methods on NativeFile
    
    Adds support for most common file methods, adding enough to use
    `io.TextIOWrapper`.
    
    Added attributes/methods:
    
    - `closed` attribute
    - `readable`, `writable`, `seekable` methods
    - `read1` alias for `read` to support `TextIOWrapper` on python 2
    - No-op `flush` method
    
    Also refactored the cython internals a bit, adding default settings for
    `is_readable` and `is_writable`, which makes subclasses not need to set
    them in all places.
    
    Also renamed `is_writeable` to `is_writable` for common spelling with
    the standard python method `writable`.
    
    Author: Jim Crist <ji...@gmail.com>
    
    Closes #1517 from jcrist/full-python-file-interface and squashes the following commits:
    
    f3bc9546 [Jim Crist] Support standard IOBase methods on NativeFile
---
 python/pyarrow/io-hdfs.pxi      |   7 +--
 python/pyarrow/io.pxi           | 122 ++++++++++++++++++++++------------------
 python/pyarrow/ipc.pxi          |   2 +-
 python/pyarrow/ipc.py           |   4 +-
 python/pyarrow/lib.pxd          |   4 +-
 python/pyarrow/tests/test_io.py |  66 ++++++++++++++++++++--
 6 files changed, 136 insertions(+), 69 deletions(-)

diff --git a/python/pyarrow/io-hdfs.pxi b/python/pyarrow/io-hdfs.pxi
index 3abf045..83b14b6 100644
--- a/python/pyarrow/io-hdfs.pxi
+++ b/python/pyarrow/io-hdfs.pxi
@@ -413,9 +413,7 @@ cdef class HadoopFileSystem:
                                    &wr_handle))
 
             out.wr_file = <shared_ptr[OutputStream]> wr_handle
-
-            out.is_readable = False
-            out.is_writeable = 1
+            out.is_writable = True
         else:
             with nogil:
                 check_status(self.client.get()
@@ -423,7 +421,6 @@ cdef class HadoopFileSystem:
 
             out.rd_file = <shared_ptr[RandomAccessFile]> rd_handle
             out.is_readable = True
-            out.is_writeable = 0
 
         if c_buffer_size == 0:
             c_buffer_size = 2 ** 16
@@ -431,7 +428,7 @@ cdef class HadoopFileSystem:
         out.mode = mode
         out.buffer_size = c_buffer_size
         out.parent = _HdfsFileNanny(self, out)
-        out.is_open = True
+        out.closed = False
         out.own_file = True
 
         return out
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index bb363ba..bd508cf 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -39,13 +39,14 @@ cdef extern from "Python.h":
 
 
 cdef class NativeFile:
-
     def __cinit__(self):
-        self.is_open = False
+        self.closed = True
         self.own_file = False
+        self.is_readable = False
+        self.is_writable = False
 
     def __dealloc__(self):
-        if self.is_open and self.own_file:
+        if self.own_file and not self.closed:
             self.close()
 
     def __enter__(self):
@@ -65,34 +66,52 @@ cdef class NativeFile:
 
         def __get__(self):
             # Emulate built-in file modes
-            if self.is_readable and self.is_writeable:
+            if self.is_readable and self.is_writable:
                 return 'rb+'
             elif self.is_readable:
                 return 'rb'
-            elif self.is_writeable:
+            elif self.is_writable:
                 return 'wb'
             else:
                 raise ValueError('File object is malformed, has no mode')
 
+    def readable(self):
+        self._assert_open()
+        return self.is_readable
+
+    def writable(self):
+        self._assert_open()
+        return self.is_writable
+
+    def seekable(self):
+        self._assert_open()
+        return self.is_readable
+
     def close(self):
-        if self.is_open:
+        if not self.closed:
             with nogil:
                 if self.is_readable:
                     check_status(self.rd_file.get().Close())
                 else:
                     check_status(self.wr_file.get().Close())
-        self.is_open = False
+        self.closed = True
+
+    def flush(self):
+        """Flush the buffer stream, if applicable.
+
+        No-op to match the IOBase interface."""
+        self._assert_open()
 
     cdef read_handle(self, shared_ptr[RandomAccessFile]* file):
         self._assert_readable()
         file[0] = <shared_ptr[RandomAccessFile]> self.rd_file
 
     cdef write_handle(self, shared_ptr[OutputStream]* file):
-        self._assert_writeable()
+        self._assert_writable()
         file[0] = <shared_ptr[OutputStream]> self.wr_file
 
     def _assert_open(self):
-        if not self.is_open:
+        if self.closed:
             raise ValueError("I/O operation on closed file")
 
     def _assert_readable(self):
@@ -100,10 +119,10 @@ cdef class NativeFile:
         if not self.is_readable:
             raise IOError("only valid on readonly files")
 
-    def _assert_writeable(self):
+    def _assert_writable(self):
         self._assert_open()
-        if not self.is_writeable:
-            raise IOError("only valid on writeable files")
+        if not self.is_writable:
+            raise IOError("only valid on writable files")
 
     def size(self):
         """
@@ -175,7 +194,7 @@ cdef class NativeFile:
         Write byte from any object implementing buffer protocol (bytes,
         bytearray, ndarray, pyarrow.Buffer)
         """
-        self._assert_writeable()
+        self._assert_writable()
 
         if isinstance(data, six.string_types):
             data = tobytes(data)
@@ -224,6 +243,12 @@ cdef class NativeFile:
 
         return PyObject_to_object(obj)
 
+    def read1(self, nbytes=None):
+        """Read and return up to n bytes.
+
+        Alias for read, needed to match the IOBase interface."""
+        return self.read(nbytes=None)
+
     def read_buffer(self, nbytes=None):
         cdef:
             int64_t c_nbytes
@@ -333,7 +358,7 @@ cdef class NativeFile:
         Pipe file-like object to file
         """
         write_queue = Queue(50)
-        self._assert_writeable()
+        self._assert_writable()
 
         buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
 
@@ -390,16 +415,14 @@ cdef class PythonFile(NativeFile):
 
         if mode.startswith('w'):
             self.wr_file.reset(new PyOutputStream(handle))
-            self.is_readable = 0
-            self.is_writeable = 1
+            self.is_writable = True
         elif mode.startswith('r'):
             self.rd_file.reset(new PyReadableFile(handle))
-            self.is_readable = 1
-            self.is_writeable = 0
+            self.is_readable = True
         else:
             raise ValueError('Invalid file mode: {0}'.format(mode))
 
-        self.is_open = True
+        self.closed = False
 
 
 cdef class MemoryMappedFile(NativeFile):
@@ -409,11 +432,6 @@ cdef class MemoryMappedFile(NativeFile):
     cdef:
         object path
 
-    def __cinit__(self):
-        self.is_open = False
-        self.is_readable = 0
-        self.is_writeable = 0
-
     @staticmethod
     def create(path, size):
         cdef:
@@ -426,11 +444,11 @@ cdef class MemoryMappedFile(NativeFile):
 
         cdef MemoryMappedFile result = MemoryMappedFile()
         result.path = path
-        result.is_readable = 1
-        result.is_writeable = 1
+        result.is_readable = True
+        result.is_writable = True
         result.wr_file = <shared_ptr[OutputStream]> handle
         result.rd_file = <shared_ptr[RandomAccessFile]> handle
-        result.is_open = True
+        result.closed = False
 
         return result
 
@@ -444,14 +462,14 @@ cdef class MemoryMappedFile(NativeFile):
 
         if mode in ('r', 'rb'):
             c_mode = FileMode_READ
-            self.is_readable = 1
+            self.is_readable = True
         elif mode in ('w', 'wb'):
             c_mode = FileMode_WRITE
-            self.is_writeable = 1
+            self.is_writable = True
         elif mode in ('r+', 'r+b', 'rb+'):
             c_mode = FileMode_READWRITE
-            self.is_readable = 1
-            self.is_writeable = 1
+            self.is_readable = True
+            self.is_writable = True
         else:
             raise ValueError('Invalid file mode: {0}'.format(mode))
 
@@ -460,7 +478,7 @@ cdef class MemoryMappedFile(NativeFile):
 
         self.wr_file = <shared_ptr[OutputStream]> handle
         self.rd_file = <shared_ptr[RandomAccessFile]> handle
-        self.is_open = True
+        self.closed = False
 
 
 def memory_map(path, mode='r'):
@@ -484,7 +502,7 @@ def memory_map(path, mode='r'):
 def create_memory_map(path, size):
     """
     Create memory map at indicated path of the given size, return open
-    writeable file object
+    writable file object
 
     Parameters
     ----------
@@ -513,16 +531,14 @@ cdef class OSFile(NativeFile):
             shared_ptr[Readable] handle
             c_string c_path = encode_file_path(path)
 
-        self.is_readable = self.is_writeable = 0
-
         if mode in ('r', 'rb'):
             self._open_readable(c_path, maybe_unbox_memory_pool(memory_pool))
         elif mode in ('w', 'wb'):
-            self._open_writeable(c_path)
+            self._open_writable(c_path)
         else:
             raise ValueError('Invalid file mode: {0}'.format(mode))
 
-        self.is_open = True
+        self.closed = False
 
     cdef _open_readable(self, c_string path, CMemoryPool* pool):
         cdef shared_ptr[ReadableFile] handle
@@ -530,15 +546,15 @@ cdef class OSFile(NativeFile):
         with nogil:
             check_status(ReadableFile.Open(path, pool, &handle))
 
-        self.is_readable = 1
+        self.is_readable = True
         self.rd_file = <shared_ptr[RandomAccessFile]> handle
 
-    cdef _open_writeable(self, c_string path):
+    cdef _open_writable(self, c_string path):
         cdef shared_ptr[FileOutputStream] handle
 
         with nogil:
             check_status(FileOutputStream.Open(path, &handle))
-        self.is_writeable = 1
+        self.is_writable = True
         self.wr_file = <shared_ptr[OutputStream]> handle
 
 
@@ -546,9 +562,8 @@ cdef class FixedSizeBufferWriter(NativeFile):
 
     def __cinit__(self, Buffer buffer):
         self.wr_file.reset(new CFixedSizeBufferWriter(buffer.buffer))
-        self.is_readable = 0
-        self.is_writeable = 1
-        self.is_open = True
+        self.is_writable = True
+        self.closed = False
 
     def set_memcopy_threads(self, int num_threads):
         cdef CFixedSizeBufferWriter* writer = \
@@ -738,14 +753,13 @@ cdef class BufferOutputStream(NativeFile):
         self.buffer = _allocate_buffer(maybe_unbox_memory_pool(memory_pool))
         self.wr_file.reset(new CBufferOutputStream(
             <shared_ptr[CResizableBuffer]> self.buffer))
-        self.is_readable = 0
-        self.is_writeable = 1
-        self.is_open = True
+        self.is_writable = True
+        self.closed = False
 
     def get_result(self):
         with nogil:
             check_status(self.wr_file.get().Close())
-        self.is_open = False
+        self.closed = True
         return pyarrow_wrap_buffer(<shared_ptr[CBuffer]> self.buffer)
 
 
@@ -753,9 +767,8 @@ cdef class MockOutputStream(NativeFile):
 
     def __cinit__(self):
         self.wr_file.reset(new CMockOutputStream())
-        self.is_readable = 0
-        self.is_writeable = 1
-        self.is_open = True
+        self.is_writable = True
+        self.closed = False
 
     def size(self):
         return (<CMockOutputStream*>self.wr_file.get()).GetExtentBytesWritten()
@@ -780,9 +793,8 @@ cdef class BufferReader(NativeFile):
             self.buffer = frombuffer(obj)
 
         self.rd_file.reset(new CBufferReader(self.buffer.buffer))
-        self.is_readable = 1
-        self.is_writeable = 0
-        self.is_open = True
+        self.is_readable = True
+        self.closed = False
 
 
 def frombuffer(object obj):
@@ -834,8 +846,8 @@ cdef get_writer(object source, shared_ptr[OutputStream]* writer):
     if isinstance(source, NativeFile):
         nf = source
 
-        if not nf.is_writeable:
-            raise IOError('Native file is not writeable')
+        if not nf.is_writable:
+            raise IOError('Native file is not writable')
 
         nf.write_handle(writer)
     else:
diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index 7534b0d..a30a228 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -429,7 +429,7 @@ def write_tensor(Tensor tensor, NativeFile dest):
         int32_t metadata_length
         int64_t body_length
 
-    dest._assert_writeable()
+    dest._assert_writable()
 
     with nogil:
         check_status(
diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py
index f264f08..4081fc5 100644
--- a/python/pyarrow/ipc.py
+++ b/python/pyarrow/ipc.py
@@ -65,7 +65,7 @@ class RecordBatchStreamWriter(lib._RecordBatchWriter):
     Parameters
     ----------
     sink : str, pyarrow.NativeFile, or file-like Python object
-        Either a file path, or a writeable file object
+        Either a file path, or a writable file object
     schema : pyarrow.Schema
         The Arrow schema for data to be written to the file
     """
@@ -96,7 +96,7 @@ class RecordBatchFileWriter(lib._RecordBatchFileWriter):
     Parameters
     ----------
     sink : str, pyarrow.NativeFile, or file-like Python object
-        Either a file path, or a writeable file object
+        Either a file path, or a writable file object
     schema : pyarrow.Schema
         The Arrow schema for data to be written to the file
     """
diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd
index 90f749d..161562c 100644
--- a/python/pyarrow/lib.pxd
+++ b/python/pyarrow/lib.pxd
@@ -333,8 +333,8 @@ cdef class NativeFile:
         shared_ptr[RandomAccessFile] rd_file
         shared_ptr[OutputStream] wr_file
         bint is_readable
-        bint is_writeable
-        bint is_open
+        bint is_writable
+        readonly bint closed
         bint own_file
 
     # By implementing these "virtual" functions (all functions in Cython
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index 3f7aa2e..da26b10 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from io import BytesIO
+from io import BytesIO, TextIOWrapper
 import gc
 import os
 import pytest
@@ -482,27 +482,48 @@ def test_native_file_modes(tmpdir):
 
     with pa.OSFile(path, mode='r') as f:
         assert f.mode == 'rb'
+        assert f.readable()
+        assert not f.writable()
+        assert f.seekable()
 
     with pa.OSFile(path, mode='rb') as f:
         assert f.mode == 'rb'
+        assert f.readable()
+        assert not f.writable()
+        assert f.seekable()
 
     with pa.OSFile(path, mode='w') as f:
         assert f.mode == 'wb'
+        assert not f.readable()
+        assert f.writable()
+        assert not f.seekable()
 
     with pa.OSFile(path, mode='wb') as f:
         assert f.mode == 'wb'
+        assert not f.readable()
+        assert f.writable()
+        assert not f.seekable()
 
     with open(path, 'wb') as f:
         f.write(b'foooo')
 
     with pa.memory_map(path, 'r') as f:
         assert f.mode == 'rb'
+        assert f.readable()
+        assert not f.writable()
+        assert f.seekable()
 
     with pa.memory_map(path, 'r+') as f:
         assert f.mode == 'rb+'
+        assert f.readable()
+        assert f.writable()
+        assert f.seekable()
 
     with pa.memory_map(path, 'r+b') as f:
         assert f.mode == 'rb+'
+        assert f.readable()
+        assert f.writable()
+        assert f.seekable()
 
 
 def test_native_file_raises_ValueError_after_close(tmpdir):
@@ -511,19 +532,56 @@ def test_native_file_raises_ValueError_after_close(tmpdir):
         f.write(b'foooo')
 
     with pa.OSFile(path, mode='rb') as os_file:
-        pass
+        assert not os_file.closed
+    assert os_file.closed
 
     with pa.memory_map(path, mode='rb') as mmap_file:
-        pass
+        assert not mmap_file.closed
+    assert mmap_file.closed
 
     files = [os_file,
              mmap_file]
 
     methods = [('tell', ()),
                ('seek', (0,)),
-               ('size', ())]
+               ('size', ()),
+               ('flush', ()),
+               ('readable', ()),
+               ('writable', ()),
+               ('seekable', ())]
 
     for f in files:
         for method, args in methods:
             with pytest.raises(ValueError):
                 getattr(f, method)(*args)
+
+
+def test_native_file_TextIOWrapper(tmpdir):
+    data = (u'foooo\n'
+            u'barrr\n'
+            u'bazzz\n')
+
+    path = os.path.join(str(tmpdir), guid())
+    with open(path, 'wb') as f:
+        f.write(data.encode('utf-8'))
+
+    with TextIOWrapper(pa.OSFile(path, mode='rb')) as fil:
+        assert fil.readable()
+        res = fil.read()
+        assert res == data
+    assert fil.closed
+
+    with TextIOWrapper(pa.OSFile(path, mode='rb')) as fil:
+        # Iteration works
+        lines = list(fil)
+        assert ''.join(lines) == data
+
+    # Writing
+    path2 = os.path.join(str(tmpdir), guid())
+    with TextIOWrapper(pa.OSFile(path2, mode='wb')) as fil:
+        assert fil.writable()
+        fil.write(data)
+
+    with TextIOWrapper(pa.OSFile(path2, mode='rb')) as fil:
+        res = fil.read()
+        assert res == data

-- 
To stop receiving notification emails like this one, please contact
wesm@apache.org.