You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2018/01/30 22:20:15 UTC
[arrow] branch master updated: ARROW-2036: [Python] Support
standard IOBase methods on NativeFile
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 3e63084 ARROW-2036: [Python] Support standard IOBase methods on NativeFile
3e63084 is described below
commit 3e63084bee72c097932cc07ca9deae15a2e97fbe
Author: Jim Crist <ji...@gmail.com>
AuthorDate: Tue Jan 30 17:20:09 2018 -0500
ARROW-2036: [Python] Support standard IOBase methods on NativeFile
Adds support for most common file methods, adding enough to use
`io.TextIOWrapper`.
Added attributes/methods:
- `closed` attribute
- `readable`, `writable`, `seekable` methods
- `read1` alias for `read` to support `TextIOWrapper` on Python 2
- No-op `flush` method
Also refactored the Cython internals a bit, adding default settings for
`is_readable` and `is_writable` so that subclasses no longer need to
set them everywhere.
Also renamed `is_writeable` to `is_writable` to match the spelling of
the standard Python method `writable`.
Author: Jim Crist <ji...@gmail.com>
Closes #1517 from jcrist/full-python-file-interface and squashes the following commits:
f3bc9546 [Jim Crist] Support standard IOBase methods on NativeFile
---
python/pyarrow/io-hdfs.pxi | 7 +--
python/pyarrow/io.pxi | 122 ++++++++++++++++++++++------------------
python/pyarrow/ipc.pxi | 2 +-
python/pyarrow/ipc.py | 4 +-
python/pyarrow/lib.pxd | 4 +-
python/pyarrow/tests/test_io.py | 66 ++++++++++++++++++++--
6 files changed, 136 insertions(+), 69 deletions(-)
diff --git a/python/pyarrow/io-hdfs.pxi b/python/pyarrow/io-hdfs.pxi
index 3abf045..83b14b6 100644
--- a/python/pyarrow/io-hdfs.pxi
+++ b/python/pyarrow/io-hdfs.pxi
@@ -413,9 +413,7 @@ cdef class HadoopFileSystem:
&wr_handle))
out.wr_file = <shared_ptr[OutputStream]> wr_handle
-
- out.is_readable = False
- out.is_writeable = 1
+ out.is_writable = True
else:
with nogil:
check_status(self.client.get()
@@ -423,7 +421,6 @@ cdef class HadoopFileSystem:
out.rd_file = <shared_ptr[RandomAccessFile]> rd_handle
out.is_readable = True
- out.is_writeable = 0
if c_buffer_size == 0:
c_buffer_size = 2 ** 16
@@ -431,7 +428,7 @@ cdef class HadoopFileSystem:
out.mode = mode
out.buffer_size = c_buffer_size
out.parent = _HdfsFileNanny(self, out)
- out.is_open = True
+ out.closed = False
out.own_file = True
return out
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index bb363ba..bd508cf 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -39,13 +39,14 @@ cdef extern from "Python.h":
cdef class NativeFile:
-
def __cinit__(self):
- self.is_open = False
+ self.closed = True
self.own_file = False
+ self.is_readable = False
+ self.is_writable = False
def __dealloc__(self):
- if self.is_open and self.own_file:
+ if self.own_file and not self.closed:
self.close()
def __enter__(self):
@@ -65,34 +66,52 @@ cdef class NativeFile:
def __get__(self):
# Emulate built-in file modes
- if self.is_readable and self.is_writeable:
+ if self.is_readable and self.is_writable:
return 'rb+'
elif self.is_readable:
return 'rb'
- elif self.is_writeable:
+ elif self.is_writable:
return 'wb'
else:
raise ValueError('File object is malformed, has no mode')
+ def readable(self):
+ self._assert_open()
+ return self.is_readable
+
+ def writable(self):
+ self._assert_open()
+ return self.is_writable
+
+ def seekable(self):
+ self._assert_open()
+ return self.is_readable
+
def close(self):
- if self.is_open:
+ if not self.closed:
with nogil:
if self.is_readable:
check_status(self.rd_file.get().Close())
else:
check_status(self.wr_file.get().Close())
- self.is_open = False
+ self.closed = True
+
+ def flush(self):
+ """Flush the buffer stream, if applicable.
+
+ No-op to match the IOBase interface."""
+ self._assert_open()
cdef read_handle(self, shared_ptr[RandomAccessFile]* file):
self._assert_readable()
file[0] = <shared_ptr[RandomAccessFile]> self.rd_file
cdef write_handle(self, shared_ptr[OutputStream]* file):
- self._assert_writeable()
+ self._assert_writable()
file[0] = <shared_ptr[OutputStream]> self.wr_file
def _assert_open(self):
- if not self.is_open:
+ if self.closed:
raise ValueError("I/O operation on closed file")
def _assert_readable(self):
@@ -100,10 +119,10 @@ cdef class NativeFile:
if not self.is_readable:
raise IOError("only valid on readonly files")
- def _assert_writeable(self):
+ def _assert_writable(self):
self._assert_open()
- if not self.is_writeable:
- raise IOError("only valid on writeable files")
+ if not self.is_writable:
+ raise IOError("only valid on writable files")
def size(self):
"""
@@ -175,7 +194,7 @@ cdef class NativeFile:
Write byte from any object implementing buffer protocol (bytes,
bytearray, ndarray, pyarrow.Buffer)
"""
- self._assert_writeable()
+ self._assert_writable()
if isinstance(data, six.string_types):
data = tobytes(data)
@@ -224,6 +243,12 @@ cdef class NativeFile:
return PyObject_to_object(obj)
+ def read1(self, nbytes=None):
+ """Read and return up to nbytes bytes (all remaining bytes if None).
+
+ Alias for read, needed to match the BufferedIOBase interface."""
+ return self.read(nbytes=nbytes)
+
def read_buffer(self, nbytes=None):
cdef:
int64_t c_nbytes
@@ -333,7 +358,7 @@ cdef class NativeFile:
Pipe file-like object to file
"""
write_queue = Queue(50)
- self._assert_writeable()
+ self._assert_writable()
buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
@@ -390,16 +415,14 @@ cdef class PythonFile(NativeFile):
if mode.startswith('w'):
self.wr_file.reset(new PyOutputStream(handle))
- self.is_readable = 0
- self.is_writeable = 1
+ self.is_writable = True
elif mode.startswith('r'):
self.rd_file.reset(new PyReadableFile(handle))
- self.is_readable = 1
- self.is_writeable = 0
+ self.is_readable = True
else:
raise ValueError('Invalid file mode: {0}'.format(mode))
- self.is_open = True
+ self.closed = False
cdef class MemoryMappedFile(NativeFile):
@@ -409,11 +432,6 @@ cdef class MemoryMappedFile(NativeFile):
cdef:
object path
- def __cinit__(self):
- self.is_open = False
- self.is_readable = 0
- self.is_writeable = 0
-
@staticmethod
def create(path, size):
cdef:
@@ -426,11 +444,11 @@ cdef class MemoryMappedFile(NativeFile):
cdef MemoryMappedFile result = MemoryMappedFile()
result.path = path
- result.is_readable = 1
- result.is_writeable = 1
+ result.is_readable = True
+ result.is_writable = True
result.wr_file = <shared_ptr[OutputStream]> handle
result.rd_file = <shared_ptr[RandomAccessFile]> handle
- result.is_open = True
+ result.closed = False
return result
@@ -444,14 +462,14 @@ cdef class MemoryMappedFile(NativeFile):
if mode in ('r', 'rb'):
c_mode = FileMode_READ
- self.is_readable = 1
+ self.is_readable = True
elif mode in ('w', 'wb'):
c_mode = FileMode_WRITE
- self.is_writeable = 1
+ self.is_writable = True
elif mode in ('r+', 'r+b', 'rb+'):
c_mode = FileMode_READWRITE
- self.is_readable = 1
- self.is_writeable = 1
+ self.is_readable = True
+ self.is_writable = True
else:
raise ValueError('Invalid file mode: {0}'.format(mode))
@@ -460,7 +478,7 @@ cdef class MemoryMappedFile(NativeFile):
self.wr_file = <shared_ptr[OutputStream]> handle
self.rd_file = <shared_ptr[RandomAccessFile]> handle
- self.is_open = True
+ self.closed = False
def memory_map(path, mode='r'):
@@ -484,7 +502,7 @@ def memory_map(path, mode='r'):
def create_memory_map(path, size):
"""
Create memory map at indicated path of the given size, return open
- writeable file object
+ writable file object
Parameters
----------
@@ -513,16 +531,14 @@ cdef class OSFile(NativeFile):
shared_ptr[Readable] handle
c_string c_path = encode_file_path(path)
- self.is_readable = self.is_writeable = 0
-
if mode in ('r', 'rb'):
self._open_readable(c_path, maybe_unbox_memory_pool(memory_pool))
elif mode in ('w', 'wb'):
- self._open_writeable(c_path)
+ self._open_writable(c_path)
else:
raise ValueError('Invalid file mode: {0}'.format(mode))
- self.is_open = True
+ self.closed = False
cdef _open_readable(self, c_string path, CMemoryPool* pool):
cdef shared_ptr[ReadableFile] handle
@@ -530,15 +546,15 @@ cdef class OSFile(NativeFile):
with nogil:
check_status(ReadableFile.Open(path, pool, &handle))
- self.is_readable = 1
+ self.is_readable = True
self.rd_file = <shared_ptr[RandomAccessFile]> handle
- cdef _open_writeable(self, c_string path):
+ cdef _open_writable(self, c_string path):
cdef shared_ptr[FileOutputStream] handle
with nogil:
check_status(FileOutputStream.Open(path, &handle))
- self.is_writeable = 1
+ self.is_writable = True
self.wr_file = <shared_ptr[OutputStream]> handle
@@ -546,9 +562,8 @@ cdef class FixedSizeBufferWriter(NativeFile):
def __cinit__(self, Buffer buffer):
self.wr_file.reset(new CFixedSizeBufferWriter(buffer.buffer))
- self.is_readable = 0
- self.is_writeable = 1
- self.is_open = True
+ self.is_writable = True
+ self.closed = False
def set_memcopy_threads(self, int num_threads):
cdef CFixedSizeBufferWriter* writer = \
@@ -738,14 +753,13 @@ cdef class BufferOutputStream(NativeFile):
self.buffer = _allocate_buffer(maybe_unbox_memory_pool(memory_pool))
self.wr_file.reset(new CBufferOutputStream(
<shared_ptr[CResizableBuffer]> self.buffer))
- self.is_readable = 0
- self.is_writeable = 1
- self.is_open = True
+ self.is_writable = True
+ self.closed = False
def get_result(self):
with nogil:
check_status(self.wr_file.get().Close())
- self.is_open = False
+ self.closed = True
return pyarrow_wrap_buffer(<shared_ptr[CBuffer]> self.buffer)
@@ -753,9 +767,8 @@ cdef class MockOutputStream(NativeFile):
def __cinit__(self):
self.wr_file.reset(new CMockOutputStream())
- self.is_readable = 0
- self.is_writeable = 1
- self.is_open = True
+ self.is_writable = True
+ self.closed = False
def size(self):
return (<CMockOutputStream*>self.wr_file.get()).GetExtentBytesWritten()
@@ -780,9 +793,8 @@ cdef class BufferReader(NativeFile):
self.buffer = frombuffer(obj)
self.rd_file.reset(new CBufferReader(self.buffer.buffer))
- self.is_readable = 1
- self.is_writeable = 0
- self.is_open = True
+ self.is_readable = True
+ self.closed = False
def frombuffer(object obj):
@@ -834,8 +846,8 @@ cdef get_writer(object source, shared_ptr[OutputStream]* writer):
if isinstance(source, NativeFile):
nf = source
- if not nf.is_writeable:
- raise IOError('Native file is not writeable')
+ if not nf.is_writable:
+ raise IOError('Native file is not writable')
nf.write_handle(writer)
else:
diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index 7534b0d..a30a228 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -429,7 +429,7 @@ def write_tensor(Tensor tensor, NativeFile dest):
int32_t metadata_length
int64_t body_length
- dest._assert_writeable()
+ dest._assert_writable()
with nogil:
check_status(
diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py
index f264f08..4081fc5 100644
--- a/python/pyarrow/ipc.py
+++ b/python/pyarrow/ipc.py
@@ -65,7 +65,7 @@ class RecordBatchStreamWriter(lib._RecordBatchWriter):
Parameters
----------
sink : str, pyarrow.NativeFile, or file-like Python object
- Either a file path, or a writeable file object
+ Either a file path, or a writable file object
schema : pyarrow.Schema
The Arrow schema for data to be written to the file
"""
@@ -96,7 +96,7 @@ class RecordBatchFileWriter(lib._RecordBatchFileWriter):
Parameters
----------
sink : str, pyarrow.NativeFile, or file-like Python object
- Either a file path, or a writeable file object
+ Either a file path, or a writable file object
schema : pyarrow.Schema
The Arrow schema for data to be written to the file
"""
diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd
index 90f749d..161562c 100644
--- a/python/pyarrow/lib.pxd
+++ b/python/pyarrow/lib.pxd
@@ -333,8 +333,8 @@ cdef class NativeFile:
shared_ptr[RandomAccessFile] rd_file
shared_ptr[OutputStream] wr_file
bint is_readable
- bint is_writeable
- bint is_open
+ bint is_writable
+ readonly bint closed
bint own_file
# By implementing these "virtual" functions (all functions in Cython
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index 3f7aa2e..da26b10 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
-from io import BytesIO
+from io import BytesIO, TextIOWrapper
import gc
import os
import pytest
@@ -482,27 +482,48 @@ def test_native_file_modes(tmpdir):
with pa.OSFile(path, mode='r') as f:
assert f.mode == 'rb'
+ assert f.readable()
+ assert not f.writable()
+ assert f.seekable()
with pa.OSFile(path, mode='rb') as f:
assert f.mode == 'rb'
+ assert f.readable()
+ assert not f.writable()
+ assert f.seekable()
with pa.OSFile(path, mode='w') as f:
assert f.mode == 'wb'
+ assert not f.readable()
+ assert f.writable()
+ assert not f.seekable()
with pa.OSFile(path, mode='wb') as f:
assert f.mode == 'wb'
+ assert not f.readable()
+ assert f.writable()
+ assert not f.seekable()
with open(path, 'wb') as f:
f.write(b'foooo')
with pa.memory_map(path, 'r') as f:
assert f.mode == 'rb'
+ assert f.readable()
+ assert not f.writable()
+ assert f.seekable()
with pa.memory_map(path, 'r+') as f:
assert f.mode == 'rb+'
+ assert f.readable()
+ assert f.writable()
+ assert f.seekable()
with pa.memory_map(path, 'r+b') as f:
assert f.mode == 'rb+'
+ assert f.readable()
+ assert f.writable()
+ assert f.seekable()
def test_native_file_raises_ValueError_after_close(tmpdir):
@@ -511,19 +532,56 @@ def test_native_file_raises_ValueError_after_close(tmpdir):
f.write(b'foooo')
with pa.OSFile(path, mode='rb') as os_file:
- pass
+ assert not os_file.closed
+ assert os_file.closed
with pa.memory_map(path, mode='rb') as mmap_file:
- pass
+ assert not mmap_file.closed
+ assert mmap_file.closed
files = [os_file,
mmap_file]
methods = [('tell', ()),
('seek', (0,)),
- ('size', ())]
+ ('size', ()),
+ ('flush', ()),
+ ('readable', ()),
+ ('writable', ()),
+ ('seekable', ())]
for f in files:
for method, args in methods:
with pytest.raises(ValueError):
getattr(f, method)(*args)
+
+
+def test_native_file_TextIOWrapper(tmpdir):
+ data = (u'foooo\n'
+ u'barrr\n'
+ u'bazzz\n')
+
+ path = os.path.join(str(tmpdir), guid())
+ with open(path, 'wb') as f:
+ f.write(data.encode('utf-8'))
+
+ with TextIOWrapper(pa.OSFile(path, mode='rb')) as fil:
+ assert fil.readable()
+ res = fil.read()
+ assert res == data
+ assert fil.closed
+
+ with TextIOWrapper(pa.OSFile(path, mode='rb')) as fil:
+ # Iteration works
+ lines = list(fil)
+ assert ''.join(lines) == data
+
+ # Writing
+ path2 = os.path.join(str(tmpdir), guid())
+ with TextIOWrapper(pa.OSFile(path2, mode='wb')) as fil:
+ assert fil.writable()
+ fil.write(data)
+
+ with TextIOWrapper(pa.OSFile(path2, mode='rb')) as fil:
+ res = fil.read()
+ assert res == data
--
To stop receiving notification emails like this one, please contact
wesm@apache.org.