You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/01/13 13:50:20 UTC
arrow git commit: ARROW-472: [Python] Expose more C++ IO interfaces. Add equals methods to Parquet schemas. Pass Parquet metadata separately in reader
Repository: arrow
Updated Branches:
refs/heads/master 5ffbda1b4 -> ad0e57d23
ARROW-472: [Python] Expose more C++ IO interfaces. Add equals methods to Parquet schemas. Pass Parquet metadata separately in reader
Also includes ARROW-471 and ARROW-441.
The new equals methods are needed to compare file schemas easily.
Requires PARQUET-830.
Author: Wes McKinney <we...@twosigma.com>
Closes #280 from wesm/ARROW-472 and squashes the following commits:
1c5e27c [Wes McKinney] Name static const variables constexpr instead
25c5c90 [Wes McKinney] Add some tests for io.OSFile
1d22428 [Wes McKinney] Add some memory map Python unit tests
5268b6c [Wes McKinney] Add untested wrapper for operating system files
fd52153 [Wes McKinney] Add unit test for passing metadata down
2316e64 [Wes McKinney] Expose MemoryMappedFile in pyarrow.io, expand parquet::arrow::OpenFile to take metadata, props parameters
a2ce247 [Wes McKinney] Add equals methods to Parquet Schema and ColumnSchema objects
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/ad0e57d2
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/ad0e57d2
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/ad0e57d2
Branch: refs/heads/master
Commit: ad0e57d23257462b9933745949d54ca729da537e
Parents: 5ffbda1
Author: Wes McKinney <we...@twosigma.com>
Authored: Fri Jan 13 08:50:14 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Fri Jan 13 08:50:14 2017 -0500
----------------------------------------------------------------------
cpp/src/arrow/io/file.cc | 45 ++++++----
cpp/src/arrow/io/file.h | 2 +
cpp/src/arrow/io/io-file-test.cc | 4 +-
python/pyarrow/_parquet.pxd | 18 +++-
python/pyarrow/_parquet.pyx | 37 +++++---
python/pyarrow/compat.py | 26 ++++++
python/pyarrow/includes/libarrow_io.pxd | 10 +++
python/pyarrow/io.pxd | 3 +-
python/pyarrow/io.pyx | 125 ++++++++++++++++++++++----
python/pyarrow/parquet.py | 2 +-
python/pyarrow/table.pyx | 2 +-
python/pyarrow/tests/test_io.py | 130 ++++++++++++++++++++++++++-
python/pyarrow/tests/test_parquet.py | 52 +++++++++--
13 files changed, 396 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/ad0e57d2/cpp/src/arrow/io/file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc
index 0fb13ea..1de6efa 100644
--- a/cpp/src/arrow/io/file.cc
+++ b/cpp/src/arrow/io/file.cc
@@ -188,6 +188,8 @@ static inline Status FileOpenWriteable(
memcpy(wpath.data() + nwchars, L"\0", sizeof(wchar_t));
int oflag = _O_CREAT | _O_BINARY;
+ int sh_flag = _S_IWRITE;
+ if (!write_only) { sh_flag |= _S_IREAD; }
if (truncate) { oflag |= _O_TRUNC; }
@@ -197,7 +199,7 @@ static inline Status FileOpenWriteable(
oflag |= _O_RDWR;
}
- errno_actual = _wsopen_s(fd, wpath.data(), oflag, _SH_DENYNO, _S_IWRITE);
+ errno_actual = _wsopen_s(fd, wpath.data(), oflag, _SH_DENYNO, sh_flag);
ret = *fd;
#else
@@ -319,7 +321,7 @@ class OSFile {
RETURN_NOT_OK(FileOpenWriteable(path, write_only, !append, &fd_));
path_ = path;
is_open_ = true;
- mode_ = write_only ? FileMode::READ : FileMode::READWRITE;
+ mode_ = write_only ? FileMode::WRITE : FileMode::READWRITE;
if (append) {
RETURN_NOT_OK(FileGetSize(fd_, &size_));
@@ -352,7 +354,7 @@ class OSFile {
}
Status Seek(int64_t pos) {
- if (pos > size_) { pos = size_; }
+ if (pos < 0) { return Status::Invalid("Invalid position"); }
return FileSeek(fd_, pos);
}
@@ -523,17 +525,24 @@ class MemoryMappedFile::MemoryMappedFileImpl : public OSFile {
}
Status Open(const std::string& path, FileMode::type mode) {
- int prot_flags = PROT_READ;
+ int prot_flags;
+ int map_mode;
if (mode != FileMode::READ) {
- prot_flags |= PROT_WRITE;
- const bool append = true;
- RETURN_NOT_OK(OSFile::OpenWriteable(path, append, mode == FileMode::WRITE));
+ // Memory mapping has permission failures if PROT_READ not set
+ prot_flags = PROT_READ | PROT_WRITE;
+ map_mode = MAP_SHARED;
+ constexpr bool append = true;
+ constexpr bool write_only = false;
+ RETURN_NOT_OK(OSFile::OpenWriteable(path, append, write_only));
+ mode_ = mode;
} else {
+ prot_flags = PROT_READ;
+ map_mode = MAP_PRIVATE; // Changes are not to be committed back to the file
RETURN_NOT_OK(OSFile::OpenReadable(path));
}
- void* result = mmap(nullptr, size_, prot_flags, MAP_SHARED, fd(), 0);
+ void* result = mmap(nullptr, size_, prot_flags, map_mode, fd(), 0);
if (result == MAP_FAILED) {
std::stringstream ss;
ss << "Memory mapping file failed, errno: " << errno;
@@ -548,16 +557,14 @@ class MemoryMappedFile::MemoryMappedFileImpl : public OSFile {
int64_t size() const { return size_; }
Status Seek(int64_t position) {
- if (position < 0 || position >= size_) {
- return Status::Invalid("position is out of bounds");
- }
+ if (position < 0) { return Status::Invalid("position is out of bounds"); }
position_ = position;
return Status::OK();
}
int64_t position() { return position_; }
- void advance(int64_t nbytes) { position_ = std::min(size_, position_ + nbytes); }
+ void advance(int64_t nbytes) { position_ = position_ + nbytes; }
uint8_t* data() { return data_; }
@@ -611,16 +618,18 @@ Status MemoryMappedFile::Close() {
}
Status MemoryMappedFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
- nbytes = std::min(nbytes, impl_->size() - impl_->position());
- std::memcpy(out, impl_->head(), nbytes);
+ nbytes = std::max<int64_t>(0, std::min(nbytes, impl_->size() - impl_->position()));
+ if (nbytes > 0) { std::memcpy(out, impl_->head(), nbytes); }
*bytes_read = nbytes;
impl_->advance(nbytes);
return Status::OK();
}
Status MemoryMappedFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
- nbytes = std::min(nbytes, impl_->size() - impl_->position());
- *out = std::make_shared<Buffer>(impl_->head(), nbytes);
+ nbytes = std::max<int64_t>(0, std::min(nbytes, impl_->size() - impl_->position()));
+
+ const uint8_t* data = nbytes > 0 ? impl_->head() : nullptr;
+ *out = std::make_shared<Buffer>(data, nbytes);
impl_->advance(nbytes);
return Status::OK();
}
@@ -655,5 +664,9 @@ Status MemoryMappedFile::WriteInternal(const uint8_t* data, int64_t nbytes) {
return Status::OK();
}
+int MemoryMappedFile::file_descriptor() const {
+ return impl_->fd();
+}
+
} // namespace io
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/ad0e57d2/cpp/src/arrow/io/file.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/file.h b/cpp/src/arrow/io/file.h
index 9ca9c54..2387232 100644
--- a/cpp/src/arrow/io/file.h
+++ b/cpp/src/arrow/io/file.h
@@ -127,6 +127,8 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface {
// @return: the size in bytes of the memory source
Status GetSize(int64_t* size) override;
+ int file_descriptor() const;
+
private:
explicit MemoryMappedFile(FileMode::type mode);
http://git-wip-us.apache.org/repos/asf/arrow/blob/ad0e57d2/cpp/src/arrow/io/io-file-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-file-test.cc b/cpp/src/arrow/io/io-file-test.cc
index 821e71d..20cd047 100644
--- a/cpp/src/arrow/io/io-file-test.cc
+++ b/cpp/src/arrow/io/io-file-test.cc
@@ -209,8 +209,8 @@ TEST_F(TestReadableFile, SeekTellSize) {
ASSERT_OK(file_->Seek(100));
ASSERT_OK(file_->Tell(&position));
- // now at EOF
- ASSERT_EQ(8, position);
+ // Can seek past end of file
+ ASSERT_EQ(100, position);
int64_t size;
ASSERT_OK(file_->GetSize(&size));
http://git-wip-us.apache.org/repos/asf/arrow/blob/ad0e57d2/python/pyarrow/_parquet.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 7e49e9e..cf1da1c 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -99,8 +99,9 @@ cdef extern from "parquet/api/schema.h" namespace "parquet" nogil:
ParquetVersion_V2" parquet::ParquetVersion::PARQUET_2_0"
cdef cppclass ColumnDescriptor:
- shared_ptr[ColumnPath] path()
+ c_bool Equals(const ColumnDescriptor& other)
+ shared_ptr[ColumnPath] path()
int16_t max_definition_level()
int16_t max_repetition_level()
@@ -115,6 +116,7 @@ cdef extern from "parquet/api/schema.h" namespace "parquet" nogil:
const ColumnDescriptor* Column(int i)
shared_ptr[Node] schema()
GroupNode* group()
+ c_bool Equals(const SchemaDescriptor& other)
int num_columns()
@@ -163,8 +165,18 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
unique_ptr[CRowGroupMetaData] RowGroup(int i)
const SchemaDescriptor* schema()
+ cdef cppclass ReaderProperties:
+ pass
+
+ ReaderProperties default_reader_properties()
+
cdef cppclass ParquetFileReader:
- # TODO: Some default arguments are missing
+ @staticmethod
+ unique_ptr[ParquetFileReader] Open(
+ const shared_ptr[ReadableFileInterface]& file,
+ const ReaderProperties& props,
+ const shared_ptr[CFileMetaData]& metadata)
+
@staticmethod
unique_ptr[ParquetFileReader] OpenFile(const c_string& path)
shared_ptr[CFileMetaData] metadata();
@@ -193,6 +205,8 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
CStatus OpenFile(const shared_ptr[ReadableFileInterface]& file,
MemoryPool* allocator,
+ const ReaderProperties& properties,
+ const shared_ptr[CFileMetaData]& metadata,
unique_ptr[FileReader]* reader)
cdef cppclass FileReader:
http://git-wip-us.apache.org/repos/asf/arrow/blob/ad0e57d2/python/pyarrow/_parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 30e3de4..867fc4c 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -19,8 +19,9 @@
# distutils: language = c++
# cython: embedsignature = True
-from pyarrow._parquet cimport *
+from cython.operator cimport dereference as deref
+from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.includes.libarrow_io cimport (ReadableFileInterface, OutputStream,
FileOutputStream)
@@ -196,6 +197,12 @@ cdef class Schema:
def __getitem__(self, i):
return self.column(i)
+ def equals(self, Schema other):
+ """
+ Returns True if the Parquet schemas are equal
+ """
+ return self.schema.Equals(deref(other.schema))
+
def column(self, i):
if i < 0 or i >= len(self):
raise IndexError('{0} out of bounds'.format(i))
@@ -217,6 +224,12 @@ cdef class ColumnSchema:
self.parent = schema
self.descr = schema.schema.Column(i)
+ def equals(self, ColumnSchema other):
+ """
+ Returns True if the column schemas are equal
+ """
+ return self.descr.Equals(deref(other.descr))
+
def __repr__(self):
physical_type = self.physical_type
logical_type = self.logical_type
@@ -337,24 +350,20 @@ cdef class ParquetReader:
self.allocator = default_memory_pool()
self._metadata = None
- def open(self, object source):
+ def open(self, object source, FileMetaData metadata=None):
cdef:
shared_ptr[ReadableFileInterface] rd_handle
+ shared_ptr[CFileMetaData] c_metadata
+ ReaderProperties properties = default_reader_properties()
c_string path
- if isinstance(source, six.string_types):
- path = tobytes(source)
-
- # Must be in one expression to avoid calling std::move which is not
- # possible in Cython (due to missing rvalue support)
+ if metadata is not None:
+ c_metadata = metadata.sp_metadata
- # TODO(wesm): ParquetFileReader::OpenFile can throw?
- self.reader = unique_ptr[FileReader](
- new FileReader(default_memory_pool(),
- ParquetFileReader.OpenFile(path)))
- else:
- get_reader(source, &rd_handle)
- check_status(OpenFile(rd_handle, self.allocator, &self.reader))
+ get_reader(source, &rd_handle)
+ with nogil:
+ check_status(OpenFile(rd_handle, self.allocator, properties,
+ c_metadata, &self.reader))
@property
def metadata(self):
http://git-wip-us.apache.org/repos/asf/arrow/blob/ad0e57d2/python/pyarrow/compat.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/compat.py b/python/pyarrow/compat.py
index 2dfdb50..9148be7 100644
--- a/python/pyarrow/compat.py
+++ b/python/pyarrow/compat.py
@@ -54,6 +54,10 @@ if PY2:
range = xrange
long = long
+ def guid():
+ from uuid import uuid4
+ return uuid4().get_hex()
+
def u(s):
return unicode(s, "unicode_escape")
@@ -76,6 +80,10 @@ else:
from decimal import Decimal
range = range
+ def guid():
+ from uuid import uuid4
+ return uuid4().hex
+
def u(s):
return s
@@ -89,6 +97,24 @@ else:
return o.decode('utf8')
+def encode_file_path(path):
+ import os
+ # Text paths are encoded to bytes; Windows requires utf-16le for unicode file names
+ if isinstance(path, unicode_type):
+ if os.name == 'nt':
+ # NOTE: an ASCII-first attempt (path.encode('ascii'))
+ # is left disabled; every unicode path is encoded
+ # as utf-16le on Windows
+ encoded_path = path.encode('utf-16le')
+ else:
+ # Non-Windows platforms (os.name != 'nt') use utf-8
+ encoded_path = path.encode('utf-8')
+ else:
+ encoded_path = path
+
+ return encoded_path
+
+
integer_types = six.integer_types + (np.integer,)
__all__ = []
http://git-wip-us.apache.org/repos/asf/arrow/blob/ad0e57d2/python/pyarrow/includes/libarrow_io.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow_io.pxd b/python/pyarrow/includes/libarrow_io.pxd
index 99f88ad..6b141a3 100644
--- a/python/pyarrow/includes/libarrow_io.pxd
+++ b/python/pyarrow/includes/libarrow_io.pxd
@@ -69,6 +69,8 @@ cdef extern from "arrow/io/interfaces.h" namespace "arrow::io" nogil:
cdef extern from "arrow/io/file.h" namespace "arrow::io" nogil:
+
+
cdef cppclass FileOutputStream(OutputStream):
@staticmethod
CStatus Open(const c_string& path, shared_ptr[FileOutputStream]* file)
@@ -85,6 +87,14 @@ cdef extern from "arrow/io/file.h" namespace "arrow::io" nogil:
int file_descriptor()
+ cdef cppclass CMemoryMappedFile" arrow::io::MemoryMappedFile"\
+ (ReadWriteFileInterface):
+ @staticmethod
+ CStatus Open(const c_string& path, FileMode mode,
+ shared_ptr[CMemoryMappedFile]* file)
+
+ int file_descriptor()
+
cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil:
CStatus HaveLibHdfs()
http://git-wip-us.apache.org/repos/asf/arrow/blob/ad0e57d2/python/pyarrow/io.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pxd b/python/pyarrow/io.pxd
index 02265d0..fffc7c5 100644
--- a/python/pyarrow/io.pxd
+++ b/python/pyarrow/io.pxd
@@ -32,7 +32,8 @@ cdef class NativeFile:
cdef:
shared_ptr[ReadableFileInterface] rd_file
shared_ptr[OutputStream] wr_file
- bint is_readonly
+ bint is_readable
+ bint is_writeable
bint is_open
bint own_file
http://git-wip-us.apache.org/repos/asf/arrow/blob/ad0e57d2/python/pyarrow/io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index b62de6c..2d8e4e8 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -27,12 +27,13 @@ from pyarrow.includes.libarrow cimport *
cimport pyarrow.includes.pyarrow as pyarrow
from pyarrow.includes.libarrow_io cimport *
-from pyarrow.compat import frombytes, tobytes
+from pyarrow.compat import frombytes, tobytes, encode_file_path
from pyarrow.error cimport check_status
cimport cpython as cp
import re
+import six
import sys
import threading
import time
@@ -42,6 +43,7 @@ cdef extern from "Python.h":
PyObject* PyBytes_FromStringAndSizeNative" PyBytes_FromStringAndSize"(
char *v, Py_ssize_t len) except NULL
+
cdef class NativeFile:
def __cinit__(self):
@@ -61,7 +63,7 @@ cdef class NativeFile:
def close(self):
if self.is_open:
with nogil:
- if self.is_readonly:
+ if self.is_readable:
check_status(self.rd_file.get().Close())
else:
check_status(self.wr_file.get().Close())
@@ -76,15 +78,15 @@ cdef class NativeFile:
file[0] = <shared_ptr[OutputStream]> self.wr_file
def _assert_readable(self):
- if not self.is_readonly:
+ if not self.is_readable:
raise IOError("only valid on readonly files")
if not self.is_open:
raise IOError("file not open")
def _assert_writeable(self):
- if self.is_readonly:
- raise IOError("only valid on writeonly files")
+ if not self.is_writeable:
+ raise IOError("only valid on writeable files")
if not self.is_open:
raise IOError("file not open")
@@ -99,7 +101,7 @@ cdef class NativeFile:
def tell(self):
cdef int64_t position
with nogil:
- if self.is_readonly:
+ if self.is_readable:
check_status(self.rd_file.get().Tell(&position))
else:
check_status(self.wr_file.get().Tell(&position))
@@ -137,7 +139,7 @@ cdef class NativeFile:
self._assert_readable()
# Allocate empty write space
- obj = PyBytes_FromStringAndSizeNative(NULL, nbytes)
+ obj = PyBytes_FromStringAndSizeNative(NULL, c_nbytes)
cdef uint8_t* buf = <uint8_t*> cp.PyBytes_AS_STRING(<object> obj)
with nogil:
@@ -179,16 +181,100 @@ cdef class PythonFileInterface(NativeFile):
if mode.startswith('w'):
self.wr_file.reset(new pyarrow.PyOutputStream(handle))
- self.is_readonly = 0
+ self.is_readable = 0
+ self.is_writeable = 1
elif mode.startswith('r'):
self.rd_file.reset(new pyarrow.PyReadableFile(handle))
- self.is_readonly = 1
+ self.is_readable = 1
+ self.is_writeable = 0
+ else:
+ raise ValueError('Invalid file mode: {0}'.format(mode))
+
+ self.is_open = True
+
+
+cdef class MemoryMappedFile(NativeFile):
+ """
+ Supports 'r'/'rb' (read), 'w'/'wb' (write) and 'r+w' (read-write) modes
+ """
+ cdef:
+ object path
+
+ def __cinit__(self, path, mode='r'):
+ self.path = path
+
+ cdef:
+ FileMode c_mode
+ shared_ptr[CMemoryMappedFile] handle
+ c_string c_path = encode_file_path(path)
+
+ self.is_readable = self.is_writeable = 0
+
+ if mode in ('r', 'rb'):
+ c_mode = FileMode_READ
+ self.is_readable = 1
+ elif mode in ('w', 'wb'):
+ c_mode = FileMode_WRITE
+ self.is_writeable = 1
+ elif mode == 'r+w':
+ c_mode = FileMode_READWRITE
+ self.is_readable = 1
+ self.is_writeable = 1
else:
raise ValueError('Invalid file mode: {0}'.format(mode))
+ check_status(CMemoryMappedFile.Open(c_path, c_mode, &handle))
+
+ self.wr_file = <shared_ptr[OutputStream]> handle
+ self.rd_file = <shared_ptr[ReadableFileInterface]> handle
self.is_open = True
+cdef class OSFile(NativeFile):
+ """
+ Supports 'r'/'rb' (read-only) and 'w'/'wb' (write-only) modes
+ """
+ cdef:
+ object path
+
+ def __cinit__(self, path, mode='r'):
+ self.path = path
+
+ cdef:
+ FileMode c_mode  # NOTE(review): declared but unused in __cinit__
+ shared_ptr[Readable] handle  # NOTE(review): declared but unused in __cinit__
+ c_string c_path = encode_file_path(path)
+
+ self.is_readable = self.is_writeable = 0
+
+ if mode in ('r', 'rb'):
+ self._open_readable(c_path)
+ elif mode in ('w', 'wb'):
+ self._open_writeable(c_path)
+ else:
+ raise ValueError('Invalid file mode: {0}'.format(mode))
+
+ self.is_open = True
+
+ cdef _open_readable(self, c_string path):
+ cdef shared_ptr[ReadableFile] handle
+
+ with nogil:
+ check_status(ReadableFile.Open(path, pyarrow.get_memory_pool(),
+ &handle))
+
+ self.is_readable = 1
+ self.rd_file = <shared_ptr[ReadableFileInterface]> handle
+
+ cdef _open_writeable(self, c_string path):
+ cdef shared_ptr[FileOutputStream] handle
+
+ with nogil:
+ check_status(FileOutputStream.Open(path, &handle))
+ self.is_writeable = 1
+ self.wr_file = <shared_ptr[OutputStream]> handle
+
+
cdef class BytesReader(NativeFile):
cdef:
object obj
@@ -198,7 +284,8 @@ cdef class BytesReader(NativeFile):
raise ValueError('Must pass bytes object')
self.obj = obj
- self.is_readonly = 1
+ self.is_readable = 1
+ self.is_writeable = 0
self.is_open = True
self.rd_file.reset(new pyarrow.PyBytesReader(obj))
@@ -264,7 +351,8 @@ cdef class InMemoryOutputStream(NativeFile):
self.buffer = allocate_buffer()
self.wr_file.reset(new BufferOutputStream(
<shared_ptr[ResizableBuffer]> self.buffer))
- self.is_readonly = 0
+ self.is_readable = 0
+ self.is_writeable = 1
self.is_open = True
def get_result(self):
@@ -285,7 +373,8 @@ cdef class BufferReader(NativeFile):
self.buffer = buffer
self.rd_file.reset(new CBufferReader(buffer.buffer.get().data(),
buffer.buffer.get().size()))
- self.is_readonly = 1
+ self.is_readable = 1
+ self.is_writeable = 0
self.is_open = True
@@ -311,12 +400,14 @@ cdef get_reader(object source, shared_ptr[ReadableFileInterface]* reader):
elif not isinstance(source, NativeFile) and hasattr(source, 'read'):
# Optimistically hope this is file-like
source = PythonFileInterface(source, mode='r')
+ elif isinstance(source, six.string_types):
+ source = MemoryMappedFile(source, mode='r')
if isinstance(source, NativeFile):
nf = source
# TODO: what about read-write sources (e.g. memory maps)
- if not nf.is_readonly:
+ if not nf.is_readable:
raise IOError('Native file is not readable')
nf.read_handle(reader)
@@ -335,7 +426,7 @@ cdef get_writer(object source, shared_ptr[OutputStream]* writer):
if isinstance(source, NativeFile):
nf = source
- if nf.is_readonly:
+ if nf.is_readable:
raise IOError('Native file is not writeable')
nf.write_handle(writer)
@@ -593,14 +684,16 @@ cdef class HdfsClient:
out.wr_file = <shared_ptr[OutputStream]> wr_handle
- out.is_readonly = False
+ out.is_readable = False
+ out.is_writeable = 1
else:
with nogil:
check_status(self.client.get()
.OpenReadable(c_path, &rd_handle))
out.rd_file = <shared_ptr[ReadableFileInterface]> rd_handle
- out.is_readonly = True
+ out.is_readable = True
+ out.is_writeable = 0
if c_buffer_size == 0:
c_buffer_size = 2 ** 16
http://git-wip-us.apache.org/repos/asf/arrow/blob/ad0e57d2/python/pyarrow/parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 2dedb72..708ae65 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -33,7 +33,7 @@ class ParquetFile(object):
"""
def __init__(self, source, metadata=None):
self.reader = _parquet.ParquetReader()
- self.reader.open(source)
+ self.reader.open(source, metadata=metadata)
@property
def metadata(self):
http://git-wip-us.apache.org/repos/asf/arrow/blob/ad0e57d2/python/pyarrow/table.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx
index 3a04651..dce125a 100644
--- a/python/pyarrow/table.pyx
+++ b/python/pyarrow/table.pyx
@@ -22,7 +22,7 @@
from cython.operator cimport dereference as deref
from pyarrow.includes.libarrow cimport *
-from pyarrow.includes.common cimport PyObject_to_object
+from pyarrow.includes.common cimport *
cimport pyarrow.includes.pyarrow as pyarrow
import pyarrow.config
http://git-wip-us.apache.org/repos/asf/arrow/blob/ad0e57d2/python/pyarrow/tests/test_io.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index 3e7a437..224f20d 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -16,10 +16,14 @@
# under the License.
from io import BytesIO
+import os
import pytest
-from pyarrow.compat import u
+import numpy as np
+
+from pyarrow.compat import u, guid
import pyarrow.io as io
+import pyarrow as pa
# ----------------------------------------------------------------------
# Python file-like objects
@@ -155,3 +159,127 @@ def test_inmemory_write_after_closed():
with pytest.raises(IOError):
f.write(b'not ok')
+
+
+# ----------------------------------------------------------------------
+# OS files and memory maps
+
+@pytest.fixture(scope='session')
+def sample_disk_data(request):
+
+ SIZE = 4096
+ arr = np.random.randint(0, 256, size=SIZE).astype('u1')
+ data = arr.tobytes()[:SIZE]
+
+ path = guid()
+ with open(path, 'wb') as f:
+ f.write(data)
+
+ def teardown():
+ _try_delete(path)
+ request.addfinalizer(teardown)
+ return path, data
+
+
+def _check_native_file_reader(KLASS, sample_data):
+ path, data = sample_data
+
+ f = KLASS(path, mode='r')
+
+ assert f.read(10) == data[:10]
+ assert f.read(0) == b''
+ assert f.tell() == 10
+
+ assert f.read() == data[10:]
+
+ assert f.size() == len(data)
+
+ f.seek(0)
+ assert f.tell() == 0
+
+ # Seeking past end of file is allowed; reads there return no data
+ f.seek(len(data) + 1)
+ assert f.tell() == len(data) + 1
+ assert f.read(5) == b''
+
+
+def test_memory_map_reader(sample_disk_data):
+ _check_native_file_reader(io.MemoryMappedFile, sample_disk_data)
+
+
+def test_os_file_reader(sample_disk_data):
+ _check_native_file_reader(io.OSFile, sample_disk_data)
+
+
+def _try_delete(path):
+ try:
+ os.remove(path)
+ except os.error:
+ pass
+
+
+def test_memory_map_writer():
+ SIZE = 4096
+ arr = np.random.randint(0, 256, size=SIZE).astype('u1')
+ data = arr.tobytes()[:SIZE]
+
+ path = guid()
+ try:
+ with open(path, 'wb') as f:
+ f.write(data)
+
+ f = io.MemoryMappedFile(path, mode='r+w')
+
+ f.seek(10)
+ f.write('peekaboo')
+ assert f.tell() == 18
+
+ f.seek(10)
+ assert f.read(8) == b'peekaboo'
+
+ f2 = io.MemoryMappedFile(path, mode='r+w')
+
+ f2.seek(10)
+ f2.write(b'booapeak')
+ f2.seek(10)
+
+ f.seek(10)
+ assert f.read(8) == b'booapeak'
+
+ # Does not truncate file
+ f3 = io.MemoryMappedFile(path, mode='w')
+ f3.write('foo')
+
+ with io.MemoryMappedFile(path) as f4:
+ assert f4.size() == SIZE
+
+ with pytest.raises(IOError):
+ f3.read(5)
+
+ f.seek(0)
+ assert f.read(3) == b'foo'
+ finally:
+ _try_delete(path)
+
+
+def test_os_file_writer():
+ SIZE = 4096
+ arr = np.random.randint(0, 256, size=SIZE).astype('u1')
+ data = arr.tobytes()[:SIZE]
+
+ path = guid()
+ try:
+ with open(path, 'wb') as f:
+ f.write(data)
+
+ # Truncates file
+ f2 = io.OSFile(path, mode='w')
+ f2.write('foo')
+
+ with io.OSFile(path) as f3:
+ assert f3.size() == 3
+
+ with pytest.raises(IOError):
+ f2.read(5)
+ finally:
+ _try_delete(path)
http://git-wip-us.apache.org/repos/asf/arrow/blob/ad0e57d2/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index e157155..9cf860a 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -236,19 +236,22 @@ def test_pandas_parquet_configuration_options(tmpdir):
pdt.assert_frame_equal(df, df_read)
-@parquet
-def test_parquet_metadata_api():
- df = alltypes_sample(size=10000)
- df = df.reindex(columns=sorted(df.columns))
-
+def make_sample_file(df):
a_table = A.Table.from_pandas(df, timestamps_to_ms=True)
buf = io.BytesIO()
pq.write_table(a_table, buf, compression='SNAPPY', version='2.0')
buf.seek(0)
- fileh = pq.ParquetFile(buf)
+ return pq.ParquetFile(buf)
+
+@parquet
+def test_parquet_metadata_api():
+ df = alltypes_sample(size=10000)
+ df = df.reindex(columns=sorted(df.columns))
+
+ fileh = make_sample_file(df)
ncols = len(df.columns)
# Series of sniff tests
@@ -288,3 +291,40 @@ def test_parquet_metadata_api():
assert rg_meta.num_rows == len(df)
assert rg_meta.num_columns == ncols
+
+
+@parquet
+def test_compare_schemas():
+ df = alltypes_sample(size=10000)
+
+ fileh = make_sample_file(df)
+ fileh2 = make_sample_file(df)
+ fileh3 = make_sample_file(df[df.columns[::2]])
+
+ assert fileh.schema.equals(fileh.schema)
+ assert fileh.schema.equals(fileh2.schema)
+
+ assert not fileh.schema.equals(fileh3.schema)
+
+ assert fileh.schema[0].equals(fileh.schema[0])
+ assert not fileh.schema[0].equals(fileh.schema[1])
+
+
+@parquet
+def test_pass_separate_metadata():
+ # ARROW-471
+ df = alltypes_sample(size=10000)
+
+ a_table = A.Table.from_pandas(df, timestamps_to_ms=True)
+
+ buf = io.BytesIO()
+ pq.write_table(a_table, buf, compression='snappy', version='2.0')
+
+ buf.seek(0)
+ metadata = pq.ParquetFile(buf).metadata
+
+ buf.seek(0)
+
+ fileh = pq.ParquetFile(buf, metadata=metadata)
+
+ pdt.assert_frame_equal(df, fileh.read().to_pandas())