Posted to commits@arrow.apache.org by we...@apache.org on 2017/01/23 14:10:28 UTC
arrow git commit: ARROW-503: [Python] Implement Python interface to streaming file format
Repository: arrow
Updated Branches:
refs/heads/master c327b5fd2 -> 1f81adcc8
ARROW-503: [Python] Implement Python interface to streaming file format
See the new `StreamWriter` and `StreamReader` classes. This patch is stacked on top of the patch for ARROW-475. Will rebase when that is merged.
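For illustration, a minimal usage sketch of the streaming API added here (class and method names taken from the `pyarrow.ipc` module and tests in this patch; the sample data is hypothetical):

    import pandas as pd
    import pyarrow as pa

    df = pd.DataFrame({'one': [1.0, 2.0], 'two': ['foo', 'bar']})
    batch = pa.RecordBatch.from_pandas(df)

    # Write record batches to an in-memory sink in the streaming format
    sink = pa.InMemoryOutputStream()
    writer = pa.StreamWriter(sink, batch.schema)
    writer.write_batch(batch)
    writer.close()

    # Read them back; StreamReader is iterable, one record batch at a time
    reader = pa.StreamReader(pa.BufferReader(sink.get_result()))
    for result in reader:
        assert result.equals(batch)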
Author: Wes McKinney <we...@twosigma.com>
Closes #299 from wesm/ARROW-503 and squashes the following commits:
e9d918e [Wes McKinney] Close BufferOutputStream after completing file or stream writes
31e519f [Wes McKinney] Add function alias to preserve backwards compatibility
faac28c [Wes McKinney] Fix small bug in BinaryArray::Equals, add rudimentary StreamReader/Writer interface and tests
d9fb3dc [Wes McKinney] Refactoring, consolidate IPC code into io.pyx
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/1f81adcc
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/1f81adcc
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/1f81adcc
Branch: refs/heads/master
Commit: 1f81adcc88b138c6ae5f5ffb3250f87239c89dc1
Parents: c327b5f
Author: Wes McKinney <we...@twosigma.com>
Authored: Mon Jan 23 09:10:18 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Mon Jan 23 09:10:18 2017 -0500
----------------------------------------------------------------------
cpp/src/arrow/array.cc | 2 +-
cpp/src/arrow/ipc/ipc-file-test.cc | 2 +
cpp/src/arrow/ipc/stream.cc | 6 +-
cpp/src/arrow/ipc/stream.h | 3 +
python/CMakeLists.txt | 1 -
python/pyarrow/__init__.py | 2 +
python/pyarrow/includes/libarrow_ipc.pxd | 29 +-
python/pyarrow/io.pyx | 367 ++++++++++++++++++--------
python/pyarrow/ipc.py | 83 ++++++
python/pyarrow/ipc.pyx | 115 --------
python/pyarrow/schema.pyx | 1 +
python/pyarrow/table.pxd | 1 +
python/pyarrow/tests/test_ipc.py | 120 ++++-----
python/setup.py | 1 -
14 files changed, 438 insertions(+), 295 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/cpp/src/arrow/array.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc
index 7509520..aa4a692 100644
--- a/cpp/src/arrow/array.cc
+++ b/cpp/src/arrow/array.cc
@@ -359,7 +359,7 @@ bool BinaryArray::EqualsExact(const BinaryArray& other) const {
if (!data_buffer_ && !(other.data_buffer_)) { return true; }
- return data_buffer_->Equals(*other.data_buffer_, data_buffer_->size());
+ return data_buffer_->Equals(*other.data_buffer_, raw_offsets()[length_]);
}
bool BinaryArray::Equals(const std::shared_ptr<Array>& arr) const {
http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/cpp/src/arrow/ipc/ipc-file-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-file-test.cc b/cpp/src/arrow/ipc/ipc-file-test.cc
index 15ceb80..7cd8054 100644
--- a/cpp/src/arrow/ipc/ipc-file-test.cc
+++ b/cpp/src/arrow/ipc/ipc-file-test.cc
@@ -75,6 +75,7 @@ class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
}
RETURN_NOT_OK(writer->Close());
+ RETURN_NOT_OK(sink_->Close());
// Current offset into stream is the end of the file
int64_t footer_offset;
@@ -138,6 +139,7 @@ class TestStreamFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
RETURN_NOT_OK(writer->WriteRecordBatch(batch));
}
RETURN_NOT_OK(writer->Close());
+ RETURN_NOT_OK(sink_->Close());
// Open the file
auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/cpp/src/arrow/ipc/stream.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/stream.cc b/cpp/src/arrow/ipc/stream.cc
index a2ca672..c9057e8 100644
--- a/cpp/src/arrow/ipc/stream.cc
+++ b/cpp/src/arrow/ipc/stream.cc
@@ -117,9 +117,9 @@ Status StreamWriter::WriteRecordBatch(const RecordBatch& batch) {
}
Status StreamWriter::Close() {
- // Close the stream
- RETURN_NOT_OK(CheckStarted());
- return sink_->Close();
+ // Write the schema if not already written
+ // User is responsible for closing the OutputStream
+ return CheckStarted();
}
// ----------------------------------------------------------------------
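The change above shifts responsibility: StreamWriter::Close() now only finalizes the stream, and the caller closes the underlying OutputStream. A sketch of the resulting pattern from the Python side (hypothetical sample data; the in-memory sink stays available after close()):

    import pandas as pd
    import pyarrow as pa

    batch = pa.RecordBatch.from_pandas(pd.DataFrame({'x': [1, 2, 3]}))

    sink = pa.InMemoryOutputStream()
    writer = pa.StreamWriter(sink, batch.schema)
    writer.write_batch(batch)
    writer.close()                # finalizes the stream; the sink is untouched
    contents = sink.get_result()  # caller retrieves the buffer and manages the sink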
http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/cpp/src/arrow/ipc/stream.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/stream.h b/cpp/src/arrow/ipc/stream.h
index 0b0e62f..53f51dc 100644
--- a/cpp/src/arrow/ipc/stream.h
+++ b/cpp/src/arrow/ipc/stream.h
@@ -54,6 +54,9 @@ class ARROW_EXPORT StreamWriter {
std::shared_ptr<StreamWriter>* out);
virtual Status WriteRecordBatch(const RecordBatch& batch);
+
+ /// Perform any logic necessary to finish the stream. User is responsible for
+ /// closing the actual OutputStream
virtual Status Close();
protected:
http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/python/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index b3735b1..d63fff4 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -409,7 +409,6 @@ set(CYTHON_EXTENSIONS
config
error
io
- ipc
scalar
schema
table
http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index d563c7a..7c521db 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -46,6 +46,8 @@ from pyarrow.filesystem import Filesystem, HdfsClient, LocalFilesystem
from pyarrow.io import (HdfsFile, NativeFile, PythonFileInterface,
Buffer, InMemoryOutputStream, BufferReader)
+from pyarrow.ipc import FileReader, FileWriter, StreamReader, StreamWriter
+
from pyarrow.scalar import (ArrayValue, Scalar, NA, NAType,
BooleanValue,
Int8Value, Int16Value, Int32Value, Int64Value,
http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/python/pyarrow/includes/libarrow_ipc.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow_ipc.pxd b/python/pyarrow/includes/libarrow_ipc.pxd
index 8295760..bfece14 100644
--- a/python/pyarrow/includes/libarrow_ipc.pxd
+++ b/python/pyarrow/includes/libarrow_ipc.pxd
@@ -20,18 +20,37 @@
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport (MemoryPool, CArray, CSchema,
CRecordBatch)
-from pyarrow.includes.libarrow_io cimport (OutputStream, ReadableFileInterface)
+from pyarrow.includes.libarrow_io cimport (InputStream, OutputStream,
+ ReadableFileInterface)
-cdef extern from "arrow/ipc/file.h" namespace "arrow::ipc" nogil:
- cdef cppclass CFileWriter " arrow::ipc::FileWriter":
+cdef extern from "arrow/ipc/stream.h" namespace "arrow::ipc" nogil:
+
+ cdef cppclass CStreamWriter " arrow::ipc::StreamWriter":
@staticmethod
CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema,
- shared_ptr[CFileWriter]* out)
+ shared_ptr[CStreamWriter]* out)
+ CStatus Close()
CStatus WriteRecordBatch(const CRecordBatch& batch)
- CStatus Close()
+ cdef cppclass CStreamReader " arrow::ipc::StreamReader":
+
+ @staticmethod
+ CStatus Open(const shared_ptr[InputStream]& stream,
+ shared_ptr[CStreamReader]* out)
+
+ shared_ptr[CSchema] schema()
+
+ CStatus GetNextRecordBatch(shared_ptr[CRecordBatch]* batch)
+
+
+cdef extern from "arrow/ipc/file.h" namespace "arrow::ipc" nogil:
+
+ cdef cppclass CFileWriter " arrow::ipc::FileWriter"(CStreamWriter):
+ @staticmethod
+ CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema,
+ shared_ptr[CFileWriter]* out)
cdef cppclass CFileReader " arrow::ipc::FileReader":
http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/python/pyarrow/io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index 2621512..0755ed8 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -15,20 +15,26 @@
# specific language governing permissions and limitations
# under the License.
-# Cython wrappers for IO interfaces defined in arrow/io
+# Cython wrappers for IO interfaces defined in arrow::io and messaging in
+# arrow::ipc
# cython: profile=False
# distutils: language = c++
# cython: embedsignature = True
+from cython.operator cimport dereference as deref
+
from libc.stdlib cimport malloc, free
from pyarrow.includes.libarrow cimport *
-cimport pyarrow.includes.pyarrow as pyarrow
from pyarrow.includes.libarrow_io cimport *
+from pyarrow.includes.libarrow_ipc cimport *
+cimport pyarrow.includes.pyarrow as pyarrow
from pyarrow.compat import frombytes, tobytes, encode_file_path
from pyarrow.error cimport check_status
+from pyarrow.schema cimport Schema
+from pyarrow.table cimport RecordBatch, batch_from_cbatch
cimport cpython as cp
@@ -38,6 +44,11 @@ import sys
import threading
import time
+
+# 64K
+DEFAULT_BUFFER_SIZE = 2 ** 16
+
+
# To let us get a PyObject* and avoid Cython auto-ref-counting
cdef extern from "Python.h":
PyObject* PyBytes_FromStringAndSizeNative" PyBytes_FromStringAndSize"(
@@ -167,6 +178,129 @@ cdef class NativeFile:
return wrap_buffer(output)
+ def download(self, stream_or_path, buffer_size=None):
+ """
+ Read file completely to local path (rather than reading completely into
+ memory). First seeks to the beginning of the file.
+ """
+ cdef:
+ int64_t bytes_read = 0
+ uint8_t* buf
+ self._assert_readable()
+
+ buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
+
+ write_queue = Queue(50)
+
+ if not hasattr(stream_or_path, 'read'):
+ stream = open(stream_or_path, 'wb')
+ cleanup = lambda: stream.close()
+ else:
+ stream = stream_or_path
+ cleanup = lambda: None
+
+ done = False
+ exc_info = None
+ def bg_write():
+ try:
+ while not done or write_queue.qsize() > 0:
+ try:
+ buf = write_queue.get(timeout=0.01)
+ except QueueEmpty:
+ continue
+ stream.write(buf)
+ except Exception as e:
+ exc_info = sys.exc_info()
+ finally:
+ cleanup()
+
+ self.seek(0)
+
+ writer_thread = threading.Thread(target=bg_write)
+
+ # This isn't ideal -- PyBytes_FromStringAndSize copies the data from
+ # the passed buffer, so it's hard for us to avoid doubling the memory
+ buf = <uint8_t*> malloc(buffer_size)
+ if buf == NULL:
+ raise MemoryError("Failed to allocate {0} bytes"
+ .format(buffer_size))
+
+ writer_thread.start()
+
+ cdef int64_t total_bytes = 0
+ cdef int32_t c_buffer_size = buffer_size
+
+ try:
+ while True:
+ with nogil:
+ check_status(self.rd_file.get()
+ .Read(c_buffer_size, &bytes_read, buf))
+
+ total_bytes += bytes_read
+
+ # EOF
+ if bytes_read == 0:
+ break
+
+ pybuf = cp.PyBytes_FromStringAndSize(<const char*>buf,
+ bytes_read)
+
+ write_queue.put_nowait(pybuf)
+ finally:
+ free(buf)
+ done = True
+
+ writer_thread.join()
+ if exc_info is not None:
+ raise exc_info[0], exc_info[1], exc_info[2]
+
+ def upload(self, stream, buffer_size=None):
+ """
+ Pipe file-like object to file
+ """
+ write_queue = Queue(50)
+ self._assert_writeable()
+
+ buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
+
+ done = False
+ exc_info = None
+ def bg_write():
+ try:
+ while not done or write_queue.qsize() > 0:
+ try:
+ buf = write_queue.get(timeout=0.01)
+ except QueueEmpty:
+ continue
+
+ self.write(buf)
+
+ except Exception as e:
+ exc_info = sys.exc_info()
+
+ writer_thread = threading.Thread(target=bg_write)
+ writer_thread.start()
+
+ try:
+ while True:
+ buf = stream.read(buffer_size)
+ if not buf:
+ break
+
+ if writer_thread.is_alive():
+ while write_queue.full():
+ time.sleep(0.01)
+ else:
+ break
+
+ write_queue.put_nowait(buf)
+ finally:
+ done = True
+
+ writer_thread.join()
+ if exc_info is not None:
+ raise exc_info[0], exc_info[1], exc_info[2]
+
# ----------------------------------------------------------------------
# Python file-like objects
@@ -679,58 +813,17 @@ cdef class _HdfsClient:
return out
- def upload(self, path, stream, buffer_size=2**16):
+ def download(self, path, stream, buffer_size=None):
+ with self.open(path, 'rb') as f:
+ f.download(stream, buffer_size=buffer_size)
+
+ def upload(self, path, stream, buffer_size=None):
"""
Upload file-like object to HDFS path
"""
- write_queue = Queue(50)
-
with self.open(path, 'wb') as f:
- done = False
- exc_info = None
- def bg_write():
- try:
- while not done or write_queue.qsize() > 0:
- try:
- buf = write_queue.get(timeout=0.01)
- except QueueEmpty:
- continue
-
- f.write(buf)
-
- except Exception as e:
- exc_info = sys.exc_info()
-
- writer_thread = threading.Thread(target=bg_write)
- writer_thread.start()
+ f.upload(stream, buffer_size=buffer_size)
- try:
- while True:
- buf = stream.read(buffer_size)
- if not buf:
- break
-
- if writer_thread.is_alive():
- while write_queue.full():
- time.sleep(0.01)
- else:
- break
-
- write_queue.put_nowait(buf)
- finally:
- done = True
-
- writer_thread.join()
- if exc_info is not None:
- raise exc_info[0], exc_info[1], exc_info[2]
-
- def download(self, path, stream, buffer_size=None):
- with self.open(path, 'rb', buffer_size=buffer_size) as f:
- f.download(stream)
-
-
-# ----------------------------------------------------------------------
-# Specialization for HDFS
# ARROW-404: Helper class to ensure that files are closed before the
# client. During deallocation of the extension class, the attributes are
@@ -766,75 +859,139 @@ cdef class HdfsFile(NativeFile):
def __dealloc__(self):
self.parent = None
- def download(self, stream_or_path):
+# ----------------------------------------------------------------------
+# File and stream readers and writers
+
+cdef class _StreamWriter:
+ cdef:
+ shared_ptr[CStreamWriter] writer
+ shared_ptr[OutputStream] sink
+ bint closed
+
+ def __cinit__(self):
+ self.closed = True
+
+ def __dealloc__(self):
+ if not self.closed:
+ self.close()
+
+ def _open(self, sink, Schema schema):
+ get_writer(sink, &self.sink)
+
+ with nogil:
+ check_status(CStreamWriter.Open(self.sink.get(), schema.sp_schema,
+ &self.writer))
+
+ self.closed = False
+
+ def write_batch(self, RecordBatch batch):
+ with nogil:
+ check_status(self.writer.get()
+ .WriteRecordBatch(deref(batch.batch)))
+
+ def close(self):
+ with nogil:
+ check_status(self.writer.get().Close())
+ self.closed = True
+
+
+cdef class _StreamReader:
+ cdef:
+ shared_ptr[CStreamReader] reader
+
+ cdef readonly:
+ Schema schema
+
+ def __cinit__(self):
+ pass
+
+ def _open(self, source):
+ cdef:
+ shared_ptr[ReadableFileInterface] reader
+ shared_ptr[InputStream] in_stream
+
+ get_reader(source, &reader)
+ in_stream = <shared_ptr[InputStream]> reader
+
+ with nogil:
+ check_status(CStreamReader.Open(in_stream, &self.reader))
+
+ schema = Schema()
+ schema.init_schema(self.reader.get().schema())
+
+ def get_next_batch(self):
"""
- Read file completely to local path (rather than reading completely into
- memory). First seeks to the beginning of the file.
+ Read next RecordBatch from the stream. Raises StopIteration at end of
+ stream
"""
- cdef:
- int64_t bytes_read = 0
- uint8_t* buf
- self._assert_readable()
+ cdef shared_ptr[CRecordBatch] batch
- write_queue = Queue(50)
+ with nogil:
+ check_status(self.reader.get().GetNextRecordBatch(&batch))
- if not hasattr(stream_or_path, 'read'):
- stream = open(stream_or_path, 'wb')
- cleanup = lambda: stream.close()
- else:
- stream = stream_or_path
- cleanup = lambda: None
+ if batch.get() == NULL:
+ raise StopIteration
- done = False
- exc_info = None
- def bg_write():
- try:
- while not done or write_queue.qsize() > 0:
- try:
- buf = write_queue.get(timeout=0.01)
- except QueueEmpty:
- continue
- stream.write(buf)
- except Exception as e:
- exc_info = sys.exc_info()
- finally:
- cleanup()
+ return batch_from_cbatch(batch)
- self.seek(0)
- writer_thread = threading.Thread(target=bg_write)
+cdef class _FileWriter(_StreamWriter):
- # This isn't ideal -- PyBytes_FromStringAndSize copies the data from
- # the passed buffer, so it's hard for us to avoid doubling the memory
- buf = <uint8_t*> malloc(self.buffer_size)
- if buf == NULL:
- raise MemoryError("Failed to allocate {0} bytes"
- .format(self.buffer_size))
+ def _open(self, sink, Schema schema):
+ cdef shared_ptr[CFileWriter] writer
+ get_writer(sink, &self.sink)
- writer_thread.start()
+ with nogil:
+ check_status(CFileWriter.Open(self.sink.get(), schema.sp_schema,
+ &writer))
- cdef int64_t total_bytes = 0
+ # Cast to base class, because has same interface
+ self.writer = <shared_ptr[CStreamWriter]> writer
+ self.closed = False
- try:
- while True:
- with nogil:
- check_status(self.rd_file.get()
- .Read(self.buffer_size, &bytes_read, buf))
- total_bytes += bytes_read
+cdef class _FileReader:
+ cdef:
+ shared_ptr[CFileReader] reader
- # EOF
- if bytes_read == 0:
- break
+ def __cinit__(self):
+ pass
- pybuf = cp.PyBytes_FromStringAndSize(<const char*>buf,
- bytes_read)
+ def _open(self, source, footer_offset=None):
+ cdef shared_ptr[ReadableFileInterface] reader
+ get_reader(source, &reader)
- write_queue.put_nowait(pybuf)
- finally:
- free(buf)
- done = True
+ cdef int64_t offset = 0
+ if footer_offset is not None:
+ offset = footer_offset
- writer_thread.join()
- if exc_info is not None:
- raise exc_info[0], exc_info[1], exc_info[2]
+ with nogil:
+ if offset != 0:
+ check_status(CFileReader.Open2(reader, offset, &self.reader))
+ else:
+ check_status(CFileReader.Open(reader, &self.reader))
+
+ property num_dictionaries:
+
+ def __get__(self):
+ return self.reader.get().num_dictionaries()
+
+ property num_record_batches:
+
+ def __get__(self):
+ return self.reader.get().num_record_batches()
+
+ def get_batch(self, int i):
+ cdef shared_ptr[CRecordBatch] batch
+
+ if i < 0 or i >= self.num_record_batches:
+ raise ValueError('Batch number {0} out of range'.format(i))
+
+ with nogil:
+ check_status(self.reader.get().GetRecordBatch(i, &batch))
+
+ return batch_from_cbatch(batch)
+
+ # TODO(wesm): ARROW-503: Function was renamed. Remove after a period of
+ # time has passed
+ get_record_batch = get_batch
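The download() and upload() methods moved onto NativeFile above overlap reads and writes using a bounded queue and a background writer thread. A minimal sketch, assuming these methods are inherited by BufferReader and InMemoryOutputStream per the refactoring above (the local path is hypothetical):

    import pyarrow as pa

    # download(): copy a readable NativeFile to a local path (or a writeable
    # file object), reading DEFAULT_BUFFER_SIZE (64K) chunks that a background
    # thread writes out
    source = pa.BufferReader(b'0123456789' * 100000)
    source.download('/tmp/arrow_copy.bin')

    # upload(): pipe a file-like object into a writeable NativeFile
    sink = pa.InMemoryOutputStream()
    with open('/tmp/arrow_copy.bin', 'rb') as f:
        sink.upload(f)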
http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/python/pyarrow/ipc.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py
new file mode 100644
index 0000000..5a56165
--- /dev/null
+++ b/python/pyarrow/ipc.py
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Arrow file and stream reader/writer classes, and other messaging tools
+
+import pyarrow.io as io
+
+
+class StreamReader(io._StreamReader):
+ """
+ Reader for the Arrow streaming binary format
+
+ Parameters
+ ----------
+ source : str, pyarrow.NativeFile, or file-like Python object
+ Either a file path, or a readable file object
+ """
+ def __init__(self, source):
+ self._open(source)
+
+ def __iter__(self):
+ while True:
+ yield self.get_next_batch()
+
+
+class StreamWriter(io._StreamWriter):
+ """
+ Writer for the Arrow streaming binary format
+
+ Parameters
+ ----------
+ sink : str, pyarrow.NativeFile, or file-like Python object
+ Either a file path, or a writeable file object
+ schema : pyarrow.Schema
+ The Arrow schema for data to be written to the file
+ """
+ def __init__(self, sink, schema):
+ self._open(sink, schema)
+
+
+class FileReader(io._FileReader):
+ """
+ Class for reading Arrow record batch data from the Arrow binary file format
+
+ Parameters
+ ----------
+ source : str, pyarrow.NativeFile, or file-like Python object
+ Either a file path, or a readable file object
+ footer_offset : int, default None
+ If the file is embedded in some larger file, this is the byte offset to
+ the very end of the file data
+ """
+ def __init__(self, source, footer_offset=None):
+ self._open(source, footer_offset=footer_offset)
+
+
+class FileWriter(io._FileWriter):
+ """
+ Writer to create the Arrow binary file format
+
+ Parameters
+ ----------
+ sink : str, pyarrow.NativeFile, or file-like Python object
+ Either a file path, or a writeable file object
+ schema : pyarrow.Schema
+ The Arrow schema for data to be written to the file
+ """
+ def __init__(self, sink, schema):
+ self._open(sink, schema)
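For comparison with the streaming classes, a sketch of the file-format classes defined above; FileReader supports random access by batch index rather than iteration (hypothetical sample data):

    import pandas as pd
    import pyarrow as pa

    batch = pa.RecordBatch.from_pandas(pd.DataFrame({'x': [1, 2, 3]}))

    sink = pa.InMemoryOutputStream()
    writer = pa.FileWriter(sink, batch.schema)
    writer.write_batch(batch)
    writer.close()

    reader = pa.FileReader(pa.BufferReader(sink.get_result()))
    for i in range(reader.num_record_batches):
        assert reader.get_batch(i).equals(batch)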
http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/python/pyarrow/ipc.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/ipc.pyx b/python/pyarrow/ipc.pyx
deleted file mode 100644
index 22069a7..0000000
--- a/python/pyarrow/ipc.pyx
+++ /dev/null
@@ -1,115 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Cython wrappers for arrow::ipc
-
-# cython: profile=False
-# distutils: language = c++
-# cython: embedsignature = True
-
-from cython.operator cimport dereference as deref
-
-from pyarrow.includes.libarrow cimport *
-from pyarrow.includes.libarrow_io cimport *
-from pyarrow.includes.libarrow_ipc cimport *
-cimport pyarrow.includes.pyarrow as pyarrow
-
-from pyarrow.error cimport check_status
-from pyarrow.io cimport NativeFile, get_reader, get_writer
-from pyarrow.schema cimport Schema
-from pyarrow.table cimport RecordBatch
-
-from pyarrow.compat import frombytes, tobytes
-import pyarrow.io as io
-
-cimport cpython as cp
-
-
-cdef class ArrowFileWriter:
- cdef:
- shared_ptr[CFileWriter] writer
- shared_ptr[OutputStream] sink
- bint closed
-
- def __cinit__(self, sink, Schema schema):
- self.closed = True
- get_writer(sink, &self.sink)
-
- with nogil:
- check_status(CFileWriter.Open(self.sink.get(), schema.sp_schema,
- &self.writer))
-
- self.closed = False
-
- def __dealloc__(self):
- if not self.closed:
- self.close()
-
- def write_record_batch(self, RecordBatch batch):
- with nogil:
- check_status(self.writer.get()
- .WriteRecordBatch(deref(batch.batch)))
-
- def close(self):
- with nogil:
- check_status(self.writer.get().Close())
- self.closed = True
-
-
-cdef class ArrowFileReader:
- cdef:
- shared_ptr[CFileReader] reader
-
- def __cinit__(self, source, footer_offset=None):
- cdef shared_ptr[ReadableFileInterface] reader
- get_reader(source, &reader)
-
- cdef int64_t offset = 0
- if footer_offset is not None:
- offset = footer_offset
-
- with nogil:
- if offset != 0:
- check_status(CFileReader.Open2(reader, offset, &self.reader))
- else:
- check_status(CFileReader.Open(reader, &self.reader))
-
- property num_dictionaries:
-
- def __get__(self):
- return self.reader.get().num_dictionaries()
-
- property num_record_batches:
-
- def __get__(self):
- return self.reader.get().num_record_batches()
-
- def get_record_batch(self, int i):
- cdef:
- shared_ptr[CRecordBatch] batch
- RecordBatch result
-
- if i < 0 or i >= self.num_record_batches:
- raise ValueError('Batch number {0} out of range'.format(i))
-
- with nogil:
- check_status(self.reader.get().GetRecordBatch(i, &batch))
-
- result = RecordBatch()
- result.init(batch)
-
- return result
http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/python/pyarrow/schema.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/schema.pyx b/python/pyarrow/schema.pyx
index 2bcfec1..52eeeaf 100644
--- a/python/pyarrow/schema.pyx
+++ b/python/pyarrow/schema.pyx
@@ -112,6 +112,7 @@ cdef class Field:
def __get__(self):
return frombytes(self.field.name)
+
cdef class Schema:
def __cinit__(self):
http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/python/pyarrow/table.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pxd b/python/pyarrow/table.pxd
index df3687d..389727b 100644
--- a/python/pyarrow/table.pxd
+++ b/python/pyarrow/table.pxd
@@ -59,3 +59,4 @@ cdef class RecordBatch:
cdef _check_nullptr(self)
cdef api object table_from_ctable(const shared_ptr[CTable]& ctable)
+cdef api object batch_from_cbatch(const shared_ptr[CRecordBatch]& cbatch)
http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/python/pyarrow/tests/test_ipc.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py
index bbd6c6a..819d1b7 100644
--- a/python/pyarrow/tests/test_ipc.py
+++ b/python/pyarrow/tests/test_ipc.py
@@ -16,21 +16,20 @@
# under the License.
import io
+import pytest
import numpy as np
from pandas.util.testing import assert_frame_equal
import pandas as pd
-import pyarrow as A
-import pyarrow.io as aio
-import pyarrow.ipc as ipc
+from pyarrow.compat import unittest
+import pyarrow as pa
-class RoundtripTest(object):
- # Also tests writing zero-copy NumPy array with additional padding
+class MessagingTest(object):
- def __init__(self):
+ def setUp(self):
self.sink = self._get_sink()
def _get_sink(self):
@@ -39,14 +38,15 @@ class RoundtripTest(object):
def _get_source(self):
return self.sink.getvalue()
- def run(self):
+ def write_batches(self):
nrows = 5
df = pd.DataFrame({
'one': np.random.randn(nrows),
'two': ['foo', np.nan, 'bar', 'bazbaz', 'qux']})
- batch = A.RecordBatch.from_pandas(df)
- writer = ipc.ArrowFileWriter(self.sink, batch.schema)
+ batch = pa.RecordBatch.from_pandas(df)
+
+ writer = self._get_writer(self.sink, batch.schema)
num_batches = 5
frames = []
@@ -55,46 +55,73 @@ class RoundtripTest(object):
unique_df = df.copy()
unique_df['one'] = np.random.randn(nrows)
- batch = A.RecordBatch.from_pandas(unique_df)
- writer.write_record_batch(batch)
+ batch = pa.RecordBatch.from_pandas(unique_df)
+ writer.write_batch(batch)
frames.append(unique_df)
batches.append(batch)
writer.close()
+ return batches
+
+
+class TestFile(MessagingTest, unittest.TestCase):
+ # Also tests writing zero-copy NumPy array with additional padding
+
+ def _get_writer(self, sink, schema):
+ return pa.FileWriter(sink, schema)
+ def test_simple_roundtrip(self):
+ batches = self.write_batches()
file_contents = self._get_source()
- reader = ipc.ArrowFileReader(aio.BufferReader(file_contents))
- assert reader.num_record_batches == num_batches
+ reader = pa.FileReader(pa.BufferReader(file_contents))
- for i in range(num_batches):
+ assert reader.num_record_batches == len(batches)
+
+ for i, batch in enumerate(batches):
# it works. Must convert back to DataFrame
- batch = reader.get_record_batch(i)
+ batch = reader.get_batch(i)
assert batches[i].equals(batch)
-class InMemoryStreamTest(RoundtripTest):
+class TestStream(MessagingTest, unittest.TestCase):
+
+ def _get_writer(self, sink, schema):
+ return pa.StreamWriter(sink, schema)
+
+ def test_simple_roundtrip(self):
+ batches = self.write_batches()
+ file_contents = self._get_source()
+ reader = pa.StreamReader(pa.BufferReader(file_contents))
+
+ total = 0
+ for i, next_batch in enumerate(reader):
+ assert next_batch.equals(batches[i])
+ total += 1
+
+ assert total == len(batches)
+
+ with pytest.raises(StopIteration):
+ reader.get_next_batch()
+
+
+class TestInMemoryFile(TestFile):
def _get_sink(self):
- return aio.InMemoryOutputStream()
+ return pa.InMemoryOutputStream()
def _get_source(self):
return self.sink.get_result()
-def test_ipc_file_simple_roundtrip():
- helper = RoundtripTest()
- helper.run()
-
-
def test_ipc_zero_copy_numpy():
df = pd.DataFrame({'foo': [1.5]})
- batch = A.RecordBatch.from_pandas(df)
- sink = aio.InMemoryOutputStream()
+ batch = pa.RecordBatch.from_pandas(df)
+ sink = pa.InMemoryOutputStream()
write_file(batch, sink)
buffer = sink.get_result()
- reader = aio.BufferReader(buffer)
+ reader = pa.BufferReader(buffer)
batches = read_file(reader)
@@ -103,48 +130,13 @@ def test_ipc_zero_copy_numpy():
assert_frame_equal(df, rdf)
-# XXX: For benchmarking
-
-def big_batch():
- K = 2**4
- N = 2**20
- df = pd.DataFrame(
- np.random.randn(K, N).T,
- columns=[str(i) for i in range(K)]
- )
-
- df = pd.concat([df] * 2 ** 3, ignore_index=True)
- return df
-
-
-def write_to_memory2(batch):
- sink = aio.InMemoryOutputStream()
- write_file(batch, sink)
- return sink.get_result()
-
-
-def write_to_memory(batch):
- sink = io.BytesIO()
- write_file(batch, sink)
- return sink.getvalue()
-
-
def write_file(batch, sink):
- writer = ipc.ArrowFileWriter(sink, batch.schema)
- writer.write_record_batch(batch)
+ writer = pa.FileWriter(sink, batch.schema)
+ writer.write_batch(batch)
writer.close()
def read_file(source):
- reader = ipc.ArrowFileReader(source)
- return [reader.get_record_batch(i)
+ reader = pa.FileReader(source)
+ return [reader.get_batch(i)
for i in range(reader.num_record_batches)]
-
-# df = big_batch()
-# batch = A.RecordBatch.from_pandas(df)
-# mem = write_to_memory(batch)
-# batches = read_file(mem)
-# data = batches[0].to_pandas()
-# rdf = pd.DataFrame(data)
-
-# [x.to_pandas() for x in batches]
http://git-wip-us.apache.org/repos/asf/arrow/blob/1f81adcc/python/setup.py
----------------------------------------------------------------------
diff --git a/python/setup.py b/python/setup.py
index de59a92..9c63e93 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -94,7 +94,6 @@ class build_ext(_build_ext):
'config',
'error',
'io',
- 'ipc',
'_parquet',
'scalar',
'schema',