Posted to commits@arrow.apache.org by uw...@apache.org on 2017/04/13 10:51:53 UTC
[3/4] arrow git commit: ARROW-751: [Python] Make all Cython modules
private. Some code tidying
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_io.pyx b/python/pyarrow/_io.pyx
new file mode 100644
index 0000000..9f067fb
--- /dev/null
+++ b/python/pyarrow/_io.pyx
@@ -0,0 +1,1273 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Cython wrappers for IO interfaces defined in arrow::io and messaging in
+# arrow::ipc
+
+# cython: profile=False
+# distutils: language = c++
+# cython: embedsignature = True
+
+from cython.operator cimport dereference as deref
+from libc.stdlib cimport malloc, free
+from pyarrow.includes.libarrow cimport *
+cimport pyarrow.includes.pyarrow as pyarrow
+from pyarrow._array cimport Array, Tensor, box_tensor, Schema
+from pyarrow._error cimport check_status
+from pyarrow._memory cimport MemoryPool, maybe_unbox_memory_pool
+from pyarrow._table cimport (Column, RecordBatch, batch_from_cbatch,
+ table_from_ctable)
+cimport cpython as cp
+
+import pyarrow._config
+from pyarrow.compat import frombytes, tobytes, encode_file_path
+
+import re
+import six
+import sys
+import threading
+import time
+
+
+# 64K
+DEFAULT_BUFFER_SIZE = 2 ** 16
+
+
+# To let us get a PyObject* and avoid Cython auto-ref-counting
+cdef extern from "Python.h":
+ PyObject* PyBytes_FromStringAndSizeNative" PyBytes_FromStringAndSize"(
+ char *v, Py_ssize_t len) except NULL
+
+cdef class NativeFile:
+
+ def __cinit__(self):
+ self.is_open = False
+ self.own_file = False
+
+ def __dealloc__(self):
+ if self.is_open and self.own_file:
+ self.close()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, tb):
+ self.close()
+
+ def close(self):
+ if self.is_open:
+ with nogil:
+ if self.is_readable:
+ check_status(self.rd_file.get().Close())
+ else:
+ check_status(self.wr_file.get().Close())
+ self.is_open = False
+
+ cdef read_handle(self, shared_ptr[RandomAccessFile]* file):
+ self._assert_readable()
+ file[0] = <shared_ptr[RandomAccessFile]> self.rd_file
+
+ cdef write_handle(self, shared_ptr[OutputStream]* file):
+ self._assert_writeable()
+ file[0] = <shared_ptr[OutputStream]> self.wr_file
+
+ def _assert_readable(self):
+ if not self.is_readable:
+ raise IOError("only valid on readonly files")
+
+ if not self.is_open:
+ raise IOError("file not open")
+
+ def _assert_writeable(self):
+ if not self.is_writeable:
+ raise IOError("only valid on writeable files")
+
+ if not self.is_open:
+ raise IOError("file not open")
+
+ def size(self):
+ cdef int64_t size
+ self._assert_readable()
+ with nogil:
+ check_status(self.rd_file.get().GetSize(&size))
+ return size
+
+ def tell(self):
+ cdef int64_t position
+ with nogil:
+ if self.is_readable:
+ check_status(self.rd_file.get().Tell(&position))
+ else:
+ check_status(self.wr_file.get().Tell(&position))
+ return position
+
+ def seek(self, int64_t position):
+ self._assert_readable()
+ with nogil:
+ check_status(self.rd_file.get().Seek(position))
+
+ def write(self, data):
+ """
+ Write bytes from any object implementing the buffer protocol (bytes,
+ bytearray, ndarray, pyarrow.Buffer)
+ """
+ self._assert_writeable()
+
+ if isinstance(data, six.string_types):
+ data = tobytes(data)
+
+ cdef Buffer arrow_buffer = frombuffer(data)
+
+ cdef const uint8_t* buf = arrow_buffer.buffer.get().data()
+ cdef int64_t bufsize = len(arrow_buffer)
+ with nogil:
+ check_status(self.wr_file.get().Write(buf, bufsize))
+
+ def read(self, nbytes=None):
+ cdef:
+ int64_t c_nbytes
+ int64_t bytes_read = 0
+ PyObject* obj
+
+ if nbytes is None:
+ c_nbytes = self.size() - self.tell()
+ else:
+ c_nbytes = nbytes
+
+ self._assert_readable()
+
+ # Allocate empty write space
+ obj = PyBytes_FromStringAndSizeNative(NULL, c_nbytes)
+
+ cdef uint8_t* buf = <uint8_t*> cp.PyBytes_AS_STRING(<object> obj)
+ with nogil:
+ check_status(self.rd_file.get().Read(c_nbytes, &bytes_read, buf))
+
+ if bytes_read < c_nbytes:
+ cp._PyBytes_Resize(&obj, <Py_ssize_t> bytes_read)
+
+ return PyObject_to_object(obj)
+
+ def read_buffer(self, nbytes=None):
+ cdef:
+ int64_t c_nbytes
+ int64_t bytes_read = 0
+ shared_ptr[CBuffer] output
+ self._assert_readable()
+
+ if nbytes is None:
+ c_nbytes = self.size() - self.tell()
+ else:
+ c_nbytes = nbytes
+
+ with nogil:
+ check_status(self.rd_file.get().ReadB(c_nbytes, &output))
+
+ return wrap_buffer(output)
+
+ def download(self, stream_or_path, buffer_size=None):
+ """
+ Read file completely to local path (rather than reading completely into
+ memory). First seeks to the beginning of the file.
+ """
+ cdef:
+ int64_t bytes_read = 0
+ uint8_t* buf
+ self._assert_readable()
+
+ buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
+
+ write_queue = Queue(50)
+
+ if not hasattr(stream_or_path, 'read'):
+ stream = open(stream_or_path, 'wb')
+ cleanup = lambda: stream.close()
+ else:
+ stream = stream_or_path
+ cleanup = lambda: None
+
+ done = False
+ exc_info = []  # list, so the writer thread's exception is visible here
+ def bg_write():
+ try:
+ while not done or write_queue.qsize() > 0:
+ try:
+ buf = write_queue.get(timeout=0.01)
+ except QueueEmpty:
+ continue
+ stream.write(buf)
+ except Exception as e:
+ exc_info.append(sys.exc_info())
+ finally:
+ cleanup()
+
+ self.seek(0)
+
+ writer_thread = threading.Thread(target=bg_write)
+
+ # This isn't ideal -- PyBytes_FromStringAndSize copies the data from
+ # the passed buffer, so it's hard for us to avoid doubling the memory
+ buf = <uint8_t*> malloc(buffer_size)
+ if buf == NULL:
+ raise MemoryError("Failed to allocate {0} bytes"
+ .format(buffer_size))
+
+ writer_thread.start()
+
+ cdef int64_t total_bytes = 0
+ cdef int32_t c_buffer_size = buffer_size
+
+ try:
+ while True:
+ with nogil:
+ check_status(self.rd_file.get()
+ .Read(c_buffer_size, &bytes_read, buf))
+
+ total_bytes += bytes_read
+
+ # EOF
+ if bytes_read == 0:
+ break
+
+ pybuf = cp.PyBytes_FromStringAndSize(<const char*>buf,
+ bytes_read)
+
+ write_queue.put_nowait(pybuf)
+ finally:
+ free(buf)
+ done = True
+
+ writer_thread.join()
+ if exc_info:
+ raise exc_info[0][0], exc_info[0][1], exc_info[0][2]
+
+ def upload(self, stream, buffer_size=None):
+ """
+ Pipe file-like object to file
+ """
+ write_queue = Queue(50)
+ self._assert_writeable()
+
+ buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
+
+ done = False
+ exc_info = []  # list, so the writer thread's exception is visible here
+ def bg_write():
+ try:
+ while not done or write_queue.qsize() > 0:
+ try:
+ buf = write_queue.get(timeout=0.01)
+ except QueueEmpty:
+ continue
+
+ self.write(buf)
+
+ except Exception as e:
+ exc_info.append(sys.exc_info())
+
+ writer_thread = threading.Thread(target=bg_write)
+ writer_thread.start()
+
+ try:
+ while True:
+ buf = stream.read(buffer_size)
+ if not buf:
+ break
+
+ if writer_thread.is_alive():
+ while write_queue.full():
+ time.sleep(0.01)
+ else:
+ break
+
+ write_queue.put_nowait(buf)
+ finally:
+ done = True
+
+ writer_thread.join()
+ if exc_info:
+ raise exc_info[0][0], exc_info[0][1], exc_info[0][2]
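+
+# Illustrative sketch of the NativeFile read-side interface (hedged:
+# assumes `f` is an already-open, readable NativeFile and the local path
+# is hypothetical):
+#
+#   f.seek(0)
+#   header = f.read(8)             # returns a bytes object
+#   buf = f.read_buffer(16)        # zero-copy pyarrow Buffer
+#   f.download('/tmp/copy.dat')    # threaded copy of the rest to disk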
+
+
+# ----------------------------------------------------------------------
+# Python file-like objects
+
+
+cdef class PythonFileInterface(NativeFile):
+ cdef:
+ object handle
+
+ def __cinit__(self, handle, mode='w'):
+ self.handle = handle
+
+ if mode.startswith('w'):
+ self.wr_file.reset(new pyarrow.PyOutputStream(handle))
+ self.is_readable = 0
+ self.is_writeable = 1
+ elif mode.startswith('r'):
+ self.rd_file.reset(new pyarrow.PyReadableFile(handle))
+ self.is_readable = 1
+ self.is_writeable = 0
+ else:
+ raise ValueError('Invalid file mode: {0}'.format(mode))
+
+ self.is_open = True
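+
+# Usage sketch: wrap any Python file-like object (io.BytesIO here is
+# only for illustration):
+#
+#   import io
+#   sink = io.BytesIO()
+#   nf = PythonFileInterface(sink, mode='w')
+#   nf.write(b'arrow')
+#   nf.close()    # the written bytes now live in `sink`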
+
+
+cdef class MemoryMappedFile(NativeFile):
+ """
+ Supports 'r', 'r+w', 'w' modes
+ """
+ cdef:
+ object path
+
+ def __cinit__(self):
+ self.is_open = False
+ self.is_readable = 0
+ self.is_writeable = 0
+
+ @staticmethod
+ def create(path, size):
+ cdef:
+ shared_ptr[CMemoryMappedFile] handle
+ c_string c_path = encode_file_path(path)
+ int64_t c_size = size
+
+ with nogil:
+ check_status(CMemoryMappedFile.Create(c_path, c_size, &handle))
+
+ cdef MemoryMappedFile result = MemoryMappedFile()
+ result.path = path
+ result.is_readable = 1
+ result.is_writeable = 1
+ result.wr_file = <shared_ptr[OutputStream]> handle
+ result.rd_file = <shared_ptr[RandomAccessFile]> handle
+ result.is_open = True
+
+ return result
+
+ def open(self, path, mode='r'):
+ self.path = path
+
+ cdef:
+ FileMode c_mode
+ shared_ptr[CMemoryMappedFile] handle
+ c_string c_path = encode_file_path(path)
+
+ if mode in ('r', 'rb'):
+ c_mode = FileMode_READ
+ self.is_readable = 1
+ elif mode in ('w', 'wb'):
+ c_mode = FileMode_WRITE
+ self.is_writeable = 1
+ elif mode == 'r+w':
+ c_mode = FileMode_READWRITE
+ self.is_readable = 1
+ self.is_writeable = 1
+ else:
+ raise ValueError('Invalid file mode: {0}'.format(mode))
+
+ check_status(CMemoryMappedFile.Open(c_path, c_mode, &handle))
+
+ self.wr_file = <shared_ptr[OutputStream]> handle
+ self.rd_file = <shared_ptr[RandomAccessFile]> handle
+ self.is_open = True
+
+
+def memory_map(path, mode='r'):
+ """
+ Open memory map at file path. Size of the memory map cannot change
+
+ Parameters
+ ----------
+ path : string
+ mode : {'r', 'w'}, default 'r'
+
+ Returns
+ -------
+ mmap : MemoryMappedFile
+ """
+ cdef MemoryMappedFile mmap = MemoryMappedFile()
+ mmap.open(path, mode)
+ return mmap
+
+
+def create_memory_map(path, size):
+ """
+ Create a memory map at the indicated path with the given size; return an
+ open, writeable file object
+
+ Parameters
+ ----------
+ path : string
+ size : int
+
+ Returns
+ -------
+ mmap : MemoryMappedFile
+ """
+ return MemoryMappedFile.create(path, size)
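+
+# A minimal sketch combining the two helpers above (the path is
+# hypothetical):
+#
+#   mm = create_memory_map('/tmp/arrow.mmap', 4096)
+#   mm.write(b'hello')
+#   mm = memory_map('/tmp/arrow.mmap', mode='r')
+#   mm.read(5)    # -> b'hello'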
+
+
+cdef class OSFile(NativeFile):
+ """
+ Supports 'r', 'w' modes
+ """
+ cdef:
+ object path
+
+ def __cinit__(self, path, mode='r', MemoryPool memory_pool=None):
+ self.path = path
+
+ cdef:
+ FileMode c_mode
+ shared_ptr[Readable] handle
+ c_string c_path = encode_file_path(path)
+
+ self.is_readable = self.is_writeable = 0
+
+ if mode in ('r', 'rb'):
+ self._open_readable(c_path, maybe_unbox_memory_pool(memory_pool))
+ elif mode in ('w', 'wb'):
+ self._open_writeable(c_path)
+ else:
+ raise ValueError('Invalid file mode: {0}'.format(mode))
+
+ self.is_open = True
+
+ cdef _open_readable(self, c_string path, CMemoryPool* pool):
+ cdef shared_ptr[ReadableFile] handle
+
+ with nogil:
+ check_status(ReadableFile.Open(path, pool, &handle))
+
+ self.is_readable = 1
+ self.rd_file = <shared_ptr[RandomAccessFile]> handle
+
+ cdef _open_writeable(self, c_string path):
+ cdef shared_ptr[FileOutputStream] handle
+
+ with nogil:
+ check_status(FileOutputStream.Open(path, &handle))
+ self.is_writeable = 1
+ self.wr_file = <shared_ptr[OutputStream]> handle
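+
+# Usage sketch (hypothetical path; modes mirror the built-in open, and
+# NativeFile provides the context-manager protocol):
+#
+#   with OSFile('/tmp/data.bin', mode='w') as f:
+#       f.write(b'payload')
+#   with OSFile('/tmp/data.bin') as f:
+#       f.read()    # -> b'payload'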
+
+
+# ----------------------------------------------------------------------
+# Arrow buffers
+
+
+cdef class Buffer:
+
+ def __cinit__(self):
+ pass
+
+ cdef init(self, const shared_ptr[CBuffer]& buffer):
+ self.buffer = buffer
+ self.shape[0] = self.size
+ self.strides[0] = <Py_ssize_t>(1)
+
+ def __len__(self):
+ return self.size
+
+ property size:
+
+ def __get__(self):
+ return self.buffer.get().size()
+
+ property parent:
+
+ def __get__(self):
+ cdef shared_ptr[CBuffer] parent_buf = self.buffer.get().parent()
+
+ if parent_buf.get() == NULL:
+ return None
+ else:
+ return wrap_buffer(parent_buf)
+
+ def __getitem__(self, key):
+ # TODO(wesm): buffer slicing
+ raise NotImplementedError
+
+ def to_pybytes(self):
+ return cp.PyBytes_FromStringAndSize(
+ <const char*>self.buffer.get().data(),
+ self.buffer.get().size())
+
+ def __getbuffer__(self, cp.Py_buffer* buffer, int flags):
+
+ buffer.buf = <char *>self.buffer.get().data()
+ buffer.format = 'b'
+ buffer.internal = NULL
+ buffer.itemsize = 1
+ buffer.len = self.size
+ buffer.ndim = 1
+ buffer.obj = self
+ buffer.readonly = 1
+ buffer.shape = self.shape
+ buffer.strides = self.strides
+ buffer.suboffsets = NULL
+
+cdef shared_ptr[PoolBuffer] allocate_buffer(CMemoryPool* pool):
+ cdef shared_ptr[PoolBuffer] result
+ result.reset(new PoolBuffer(pool))
+ return result
+
+
+cdef class InMemoryOutputStream(NativeFile):
+
+ cdef:
+ shared_ptr[PoolBuffer] buffer
+
+ def __cinit__(self, MemoryPool memory_pool=None):
+ self.buffer = allocate_buffer(maybe_unbox_memory_pool(memory_pool))
+ self.wr_file.reset(new BufferOutputStream(
+ <shared_ptr[ResizableBuffer]> self.buffer))
+ self.is_readable = 0
+ self.is_writeable = 1
+ self.is_open = True
+
+ def get_result(self):
+ check_status(self.wr_file.get().Close())
+ self.is_open = False
+ return wrap_buffer(<shared_ptr[CBuffer]> self.buffer)
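+
+# Usage sketch: buffer writes in memory, then retrieve the result:
+#
+#   stream = InMemoryOutputStream()
+#   stream.write(b'abc')
+#   buf = stream.get_result()    # closes the stream, returns a Buffer
+#   buf.to_pybytes()             # -> b'abc'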
+
+
+cdef class BufferReader(NativeFile):
+ """
+ Zero-copy reader from objects convertible to Arrow buffer
+
+ Parameters
+ ----------
+ obj : Python bytes or pyarrow.io.Buffer
+ """
+ cdef:
+ Buffer buffer
+
+ def __cinit__(self, object obj):
+
+ if isinstance(obj, Buffer):
+ self.buffer = obj
+ else:
+ self.buffer = frombuffer(obj)
+
+ self.rd_file.reset(new CBufferReader(self.buffer.buffer))
+ self.is_readable = 1
+ self.is_writeable = 0
+ self.is_open = True
+
+
+def frombuffer(object obj):
+ """
+ Construct an Arrow buffer from any Python object implementing the
+ buffer protocol
+ """
+ cdef shared_ptr[CBuffer] buf
+ try:
+ memoryview(obj)
+ buf.reset(new pyarrow.PyBuffer(obj))
+ return wrap_buffer(buf)
+ except TypeError:
+ raise ValueError('Must pass object that implements buffer protocol')
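+
+# Usage sketch tying frombuffer to BufferReader (zero-copy reads):
+#
+#   buf = frombuffer(b'0123456789')
+#   reader = BufferReader(buf)
+#   reader.read(4)    # -> b'0123'
+#   reader.tell()     # -> 4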
+
+
+
+cdef Buffer wrap_buffer(const shared_ptr[CBuffer]& buf):
+ cdef Buffer result = Buffer()
+ result.init(buf)
+ return result
+
+
+cdef get_reader(object source, shared_ptr[RandomAccessFile]* reader):
+ cdef NativeFile nf
+
+ if isinstance(source, six.string_types):
+ source = memory_map(source, mode='r')
+ elif isinstance(source, Buffer):
+ source = BufferReader(source)
+ elif not isinstance(source, NativeFile) and hasattr(source, 'read'):
+ # Optimistically hope this is file-like
+ source = PythonFileInterface(source, mode='r')
+
+ if isinstance(source, NativeFile):
+ nf = source
+
+ # TODO: what about read-write sources (e.g. memory maps)
+ if not nf.is_readable:
+ raise IOError('Native file is not readable')
+
+ nf.read_handle(reader)
+ else:
+ raise TypeError('Unable to read from object of type: {0}'
+ .format(type(source)))
+
+
+cdef get_writer(object source, shared_ptr[OutputStream]* writer):
+ cdef NativeFile nf
+
+ if isinstance(source, six.string_types):
+ source = OSFile(source, mode='w')
+ elif not isinstance(source, NativeFile) and hasattr(source, 'write'):
+ # Optimistically hope this is file-like
+ source = PythonFileInterface(source, mode='w')
+
+ if isinstance(source, NativeFile):
+ nf = source
+
+ if not nf.is_writeable:
+ raise IOError('Native file is not writeable')
+
+ nf.write_handle(writer)
+ else:
+ raise TypeError('Unable to write to object of type: {0}'
+ .format(type(source)))
+
+# ----------------------------------------------------------------------
+# HDFS IO implementation
+
+_HDFS_PATH_RE = re.compile('hdfs://(.*):(\d+)(.*)')
+
+try:
+ # Python 3
+ from queue import Queue, Empty as QueueEmpty, Full as QueueFull
+except ImportError:
+ from Queue import Queue, Empty as QueueEmpty, Full as QueueFull
+
+
+def have_libhdfs():
+ try:
+ check_status(HaveLibHdfs())
+ return True
+ except:
+ return False
+
+
+def have_libhdfs3():
+ try:
+ check_status(HaveLibHdfs3())
+ return True
+ except:
+ return False
+
+
+def strip_hdfs_abspath(path):
+ m = _HDFS_PATH_RE.match(path)
+ if m:
+ return m.group(3)
+ else:
+ return path
+
+
+cdef class _HdfsClient:
+ cdef:
+ shared_ptr[CHdfsClient] client
+
+ cdef readonly:
+ bint is_open
+
+ def __cinit__(self):
+ pass
+
+ def _connect(self, host, port, user, kerb_ticket, driver):
+ cdef HdfsConnectionConfig conf
+
+ if host is not None:
+ conf.host = tobytes(host)
+ conf.port = port
+ if user is not None:
+ conf.user = tobytes(user)
+ if kerb_ticket is not None:
+ conf.kerb_ticket = tobytes(kerb_ticket)
+
+ if driver == 'libhdfs':
+ check_status(HaveLibHdfs())
+ conf.driver = HdfsDriver_LIBHDFS
+ else:
+ check_status(HaveLibHdfs3())
+ conf.driver = HdfsDriver_LIBHDFS3
+
+ with nogil:
+ check_status(CHdfsClient.Connect(&conf, &self.client))
+ self.is_open = True
+
+ @classmethod
+ def connect(cls, *args, **kwargs):
+ return cls(*args, **kwargs)
+
+ def __dealloc__(self):
+ if self.is_open:
+ self.close()
+
+ def close(self):
+ """
+ Disconnect from the HDFS cluster
+ """
+ self._ensure_client()
+ with nogil:
+ check_status(self.client.get().Disconnect())
+ self.is_open = False
+
+ cdef _ensure_client(self):
+ if self.client.get() == NULL:
+ raise IOError('HDFS client improperly initialized')
+ elif not self.is_open:
+ raise IOError('HDFS client is closed')
+
+ def exists(self, path):
+ """
+ Returns True if the path is known to the cluster, False if it is not
+ (or if there is an RPC error)
+ """
+ self._ensure_client()
+
+ cdef c_string c_path = tobytes(path)
+ cdef c_bool result
+ with nogil:
+ result = self.client.get().Exists(c_path)
+ return result
+
+ def isdir(self, path):
+ cdef HdfsPathInfo info
+ self._path_info(path, &info)
+ return info.kind == ObjectType_DIRECTORY
+
+ def isfile(self, path):
+ cdef HdfsPathInfo info
+ self._path_info(path, &info)
+ return info.kind == ObjectType_FILE
+
+ cdef _path_info(self, path, HdfsPathInfo* info):
+ cdef c_string c_path = tobytes(path)
+
+ with nogil:
+ check_status(self.client.get()
+ .GetPathInfo(c_path, info))
+
+
+ def ls(self, path, bint full_info):
+ cdef:
+ c_string c_path = tobytes(path)
+ vector[HdfsPathInfo] listing
+ list results = []
+ int i
+
+ self._ensure_client()
+
+ with nogil:
+ check_status(self.client.get()
+ .ListDirectory(c_path, &listing))
+
+ cdef const HdfsPathInfo* info
+ for i in range(<int> listing.size()):
+ info = &listing[i]
+
+ # Try to trim off the hdfs://HOST:PORT piece
+ name = strip_hdfs_abspath(frombytes(info.name))
+
+ if full_info:
+ kind = ('file' if info.kind == ObjectType_FILE
+ else 'directory')
+
+ results.append({
+ 'kind': kind,
+ 'name': name,
+ 'owner': frombytes(info.owner),
+ 'group': frombytes(info.group),
+ 'list_modified_time': info.last_modified_time,
+ 'list_access_time': info.last_access_time,
+ 'size': info.size,
+ 'replication': info.replication,
+ 'block_size': info.block_size,
+ 'permissions': info.permissions
+ })
+ else:
+ results.append(name)
+
+ return results
+
+ def mkdir(self, path):
+ """
+ Create indicated directory and any necessary parent directories
+ """
+ self._ensure_client()
+
+ cdef c_string c_path = tobytes(path)
+ with nogil:
+ check_status(self.client.get()
+ .CreateDirectory(c_path))
+
+ def delete(self, path, bint recursive=False):
+ """
+ Delete the indicated file or directory
+
+ Parameters
+ ----------
+ path : string
+ recursive : boolean, default False
+ If True, also delete child paths for directories
+ """
+ self._ensure_client()
+
+ cdef c_string c_path = tobytes(path)
+ with nogil:
+ check_status(self.client.get()
+ .Delete(c_path, recursive))
+
+ def open(self, path, mode='rb', buffer_size=None, replication=None,
+ default_block_size=None):
+ """
+ Parameters
+ ----------
+ mode : string, 'rb', 'wb', 'ab'
+ """
+ self._ensure_client()
+
+ cdef HdfsFile out = HdfsFile()
+
+ if mode not in ('rb', 'wb', 'ab'):
+ raise Exception("Mode must be 'rb' (read), "
+ "'wb' (write, new file), or 'ab' (append)")
+
+ cdef c_string c_path = tobytes(path)
+ cdef c_bool append = False
+
+ # 0 in libhdfs means "use the default"
+ cdef int32_t c_buffer_size = buffer_size or 0
+ cdef int16_t c_replication = replication or 0
+ cdef int64_t c_default_block_size = default_block_size or 0
+
+ cdef shared_ptr[HdfsOutputStream] wr_handle
+ cdef shared_ptr[HdfsReadableFile] rd_handle
+
+ if mode in ('wb', 'ab'):
+ if mode == 'ab':
+ append = True
+
+ with nogil:
+ check_status(
+ self.client.get()
+ .OpenWriteable(c_path, append, c_buffer_size,
+ c_replication, c_default_block_size,
+ &wr_handle))
+
+ out.wr_file = <shared_ptr[OutputStream]> wr_handle
+
+ out.is_readable = False
+ out.is_writeable = 1
+ else:
+ with nogil:
+ check_status(self.client.get()
+ .OpenReadable(c_path, &rd_handle))
+
+ out.rd_file = <shared_ptr[RandomAccessFile]> rd_handle
+ out.is_readable = True
+ out.is_writeable = 0
+
+ if c_buffer_size == 0:
+ c_buffer_size = 2 ** 16
+
+ out.mode = mode
+ out.buffer_size = c_buffer_size
+ out.parent = _HdfsFileNanny(self, out)
+ out.is_open = True
+ out.own_file = True
+
+ return out
+
+ def download(self, path, stream, buffer_size=None):
+ with self.open(path, 'rb') as f:
+ f.download(stream, buffer_size=buffer_size)
+
+ def upload(self, path, stream, buffer_size=None):
+ """
+ Upload file-like object to HDFS path
+ """
+ with self.open(path, 'wb') as f:
+ f.upload(stream, buffer_size=buffer_size)
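+
+# Usage sketch (hedged: requires a reachable HDFS cluster with libhdfs
+# available; host, port, user and paths are placeholders, and this private
+# class is normally driven through the public pyarrow HdfsClient wrapper):
+#
+#   client = _HdfsClient()
+#   client._connect('namenode', 8020, 'hadoop', None, 'libhdfs')
+#   with client.open('/tmp/data.bin', 'wb') as f:
+#       f.write(b'payload')
+#   client.ls('/tmp', False)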
+
+
+# ARROW-404: Helper class to ensure that files are closed before the
+# client. During deallocation of the extension class, the attributes are
+# decref'd which can cause the client to get closed first if the file has the
+# last remaining reference
+cdef class _HdfsFileNanny:
+ cdef:
+ object client
+ object file_handle_ref
+
+ def __cinit__(self, client, file_handle):
+ import weakref
+ self.client = client
+ self.file_handle_ref = weakref.ref(file_handle)
+
+ def __dealloc__(self):
+ fh = self.file_handle_ref()
+ if fh:
+ fh.close()
+ # avoid cyclic GC
+ self.file_handle_ref = None
+ self.client = None
+
+
+cdef class HdfsFile(NativeFile):
+ cdef readonly:
+ int32_t buffer_size
+ object mode
+ object parent
+
+ cdef object __weakref__
+
+ def __dealloc__(self):
+ self.parent = None
+
+# ----------------------------------------------------------------------
+# File and stream readers and writers
+
+cdef class _StreamWriter:
+ cdef:
+ shared_ptr[CStreamWriter] writer
+ shared_ptr[OutputStream] sink
+ bint closed
+
+ def __cinit__(self):
+ self.closed = True
+
+ def __dealloc__(self):
+ if not self.closed:
+ self.close()
+
+ def _open(self, sink, Schema schema):
+ get_writer(sink, &self.sink)
+
+ with nogil:
+ check_status(CStreamWriter.Open(self.sink.get(), schema.sp_schema,
+ &self.writer))
+
+ self.closed = False
+
+ def write_batch(self, RecordBatch batch):
+ with nogil:
+ check_status(self.writer.get()
+ .WriteRecordBatch(deref(batch.batch)))
+
+ def close(self):
+ with nogil:
+ check_status(self.writer.get().Close())
+ self.closed = True
+
+
+cdef class _StreamReader:
+ cdef:
+ shared_ptr[CStreamReader] reader
+
+ cdef readonly:
+ Schema schema
+
+ def __cinit__(self):
+ pass
+
+ def _open(self, source):
+ cdef:
+ shared_ptr[RandomAccessFile] reader
+ shared_ptr[InputStream] in_stream
+
+ get_reader(source, &reader)
+ in_stream = <shared_ptr[InputStream]> reader
+
+ with nogil:
+ check_status(CStreamReader.Open(in_stream, &self.reader))
+
+ self.schema = Schema()
+ self.schema.init_schema(self.reader.get().schema())
+
+ def get_next_batch(self):
+ """
+ Read next RecordBatch from the stream. Raises StopIteration at end of
+ stream
+ """
+ cdef shared_ptr[CRecordBatch] batch
+
+ with nogil:
+ check_status(self.reader.get().GetNextRecordBatch(&batch))
+
+ if batch.get() == NULL:
+ raise StopIteration
+
+ return batch_from_cbatch(batch)
+
+ def read_all(self):
+ """
+ Read all record batches as a pyarrow.Table
+ """
+ cdef:
+ vector[shared_ptr[CRecordBatch]] batches
+ shared_ptr[CRecordBatch] batch
+ shared_ptr[CTable] table
+
+ with nogil:
+ while True:
+ check_status(self.reader.get().GetNextRecordBatch(&batch))
+ if batch.get() == NULL:
+ break
+ batches.push_back(batch)
+
+ check_status(CTable.FromRecordBatches(batches, &table))
+
+ return table_from_ctable(table)
+
+
+cdef class _FileWriter(_StreamWriter):
+
+ def _open(self, sink, Schema schema):
+ cdef shared_ptr[CFileWriter] writer
+ get_writer(sink, &self.sink)
+
+ with nogil:
+ check_status(CFileWriter.Open(self.sink.get(), schema.sp_schema,
+ &writer))
+
+ # Cast to base class, because it has the same interface
+ self.writer = <shared_ptr[CStreamWriter]> writer
+ self.closed = False
+
+
+cdef class _FileReader:
+ cdef:
+ shared_ptr[CFileReader] reader
+
+ def __cinit__(self):
+ pass
+
+ def _open(self, source, footer_offset=None):
+ cdef shared_ptr[RandomAccessFile] reader
+ get_reader(source, &reader)
+
+ cdef int64_t offset = 0
+ if footer_offset is not None:
+ offset = footer_offset
+
+ with nogil:
+ if offset != 0:
+ check_status(CFileReader.Open2(reader, offset, &self.reader))
+ else:
+ check_status(CFileReader.Open(reader, &self.reader))
+
+ property num_record_batches:
+
+ def __get__(self):
+ return self.reader.get().num_record_batches()
+
+ def get_batch(self, int i):
+ cdef shared_ptr[CRecordBatch] batch
+
+ if i < 0 or i >= self.num_record_batches:
+ raise ValueError('Batch number {0} out of range'.format(i))
+
+ with nogil:
+ check_status(self.reader.get().GetRecordBatch(i, &batch))
+
+ return batch_from_cbatch(batch)
+
+ # TODO(wesm): ARROW-503: Function was renamed. Remove after a period of
+ # time has passed
+ get_record_batch = get_batch
+
+ def read_all(self):
+ """
+ Read all record batches as a pyarrow.Table
+ """
+ cdef:
+ vector[shared_ptr[CRecordBatch]] batches
+ shared_ptr[CTable] table
+ int i, nbatches
+
+ nbatches = self.num_record_batches
+
+ batches.resize(nbatches)
+ with nogil:
+ for i in range(nbatches):
+ check_status(self.reader.get().GetRecordBatch(i, &batches[i]))
+ check_status(CTable.FromRecordBatches(batches, &table))
+
+ return table_from_ctable(table)
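+
+# Usage sketch of the file-format writer/reader pair (these private
+# classes are normally driven through public pyarrow wrappers; `batch`
+# is assumed to be an existing RecordBatch):
+#
+#   sink = InMemoryOutputStream()
+#   writer = _FileWriter()
+#   writer._open(sink, batch.schema)
+#   writer.write_batch(batch)
+#   writer.close()
+#
+#   reader = _FileReader()
+#   reader._open(BufferReader(sink.get_result()))
+#   table = reader.read_all()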
+
+
+# ----------------------------------------------------------------------
+# Implement legacy Feather file format
+
+
+class FeatherError(Exception):
+ pass
+
+
+cdef class FeatherWriter:
+ cdef:
+ unique_ptr[CFeatherWriter] writer
+
+ cdef public:
+ int64_t num_rows
+
+ def __cinit__(self):
+ self.num_rows = -1
+
+ def open(self, object dest):
+ cdef shared_ptr[OutputStream] sink
+ get_writer(dest, &sink)
+
+ with nogil:
+ check_status(CFeatherWriter.Open(sink, &self.writer))
+
+ def close(self):
+ if self.num_rows < 0:
+ self.num_rows = 0
+ self.writer.get().SetNumRows(self.num_rows)
+ check_status(self.writer.get().Finalize())
+
+ def write_array(self, object name, object col, object mask=None):
+ cdef Array arr
+
+ if self.num_rows >= 0:
+ if len(col) != self.num_rows:
+ raise ValueError('prior column had a different number of rows')
+ else:
+ self.num_rows = len(col)
+
+ if isinstance(col, Array):
+ arr = col
+ else:
+ arr = Array.from_numpy(col, mask=mask)
+
+ cdef c_string c_name = tobytes(name)
+
+ with nogil:
+ check_status(
+ self.writer.get().Append(c_name, deref(arr.sp_array)))
+
+
+cdef class FeatherReader:
+ cdef:
+ unique_ptr[CFeatherReader] reader
+
+ def __cinit__(self):
+ pass
+
+ def open(self, source):
+ cdef shared_ptr[RandomAccessFile] reader
+ get_reader(source, &reader)
+
+ with nogil:
+ check_status(CFeatherReader.Open(reader, &self.reader))
+
+ property num_rows:
+
+ def __get__(self):
+ return self.reader.get().num_rows()
+
+ property num_columns:
+
+ def __get__(self):
+ return self.reader.get().num_columns()
+
+ def get_column_name(self, int i):
+ cdef c_string name = self.reader.get().GetColumnName(i)
+ return frombytes(name)
+
+ def get_column(self, int i):
+ if i < 0 or i >= self.num_columns:
+ raise IndexError(i)
+
+ cdef shared_ptr[CColumn] sp_column
+ with nogil:
+ check_status(self.reader.get()
+ .GetColumn(i, &sp_column))
+
+ cdef Column col = Column()
+ col.init(sp_column)
+ return col
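+
+# Usage sketch of the Feather round trip (path and values illustrative):
+#
+#   import numpy as np
+#   writer = FeatherWriter()
+#   writer.open('/tmp/example.feather')
+#   writer.write_array('col', np.array([1, 2, 3]))
+#   writer.close()
+#
+#   reader = FeatherReader()
+#   reader.open('/tmp/example.feather')
+#   reader.get_column(0).to_pylist()    # -> [1, 2, 3]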
+
+
+def get_tensor_size(Tensor tensor):
+ """
+ Return total size of serialized Tensor including metadata and padding
+ """
+ cdef int64_t size
+ with nogil:
+ check_status(GetTensorSize(deref(tensor.tp), &size))
+ return size
+
+
+def get_record_batch_size(RecordBatch batch):
+ """
+ Return total size of serialized RecordBatch including metadata and padding
+ """
+ cdef int64_t size
+ with nogil:
+ check_status(GetRecordBatchSize(deref(batch.batch), &size))
+ return size
+
+
+def write_tensor(Tensor tensor, NativeFile dest):
+ """
+ Write pyarrow.Tensor to a pyarrow.NativeFile object at its current position
+
+ Parameters
+ ----------
+ tensor : pyarrow.Tensor
+ dest : pyarrow.NativeFile
+
+ Returns
+ -------
+ bytes_written : int
+ Total number of bytes written to the file
+ """
+ cdef:
+ int32_t metadata_length
+ int64_t body_length
+
+ dest._assert_writeable()
+
+ with nogil:
+ check_status(
+ WriteTensor(deref(tensor.tp), dest.wr_file.get(),
+ &metadata_length, &body_length))
+
+ return metadata_length + body_length
+
+
+def read_tensor(NativeFile source):
+ """
+ Read pyarrow.Tensor from pyarrow.NativeFile object from current
+ position. If the file source supports zero copy (e.g. a memory map), then
+ this operation does not allocate any memory
+
+ Parameters
+ ----------
+ source : pyarrow.NativeFile
+
+ Returns
+ -------
+ tensor : Tensor
+ """
+ cdef:
+ shared_ptr[CTensor] sp_tensor
+
+ source._assert_readable()
+
+ cdef int64_t offset = source.tell()
+ with nogil:
+ check_status(ReadTensor(offset, source.rd_file.get(), &sp_tensor))
+
+ return box_tensor(sp_tensor)
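+
+# Usage sketch: round-trip a Tensor through a memory map opened in 'r+w'
+# mode (`tensor` and the map size are assumed to exist already):
+#
+#   mm = memory_map('/tmp/tensor.dat', mode='r+w')
+#   write_tensor(tensor, mm)    # returns total bytes written
+#   mm.seek(0)
+#   tensor2 = read_tensor(mm)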
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_jemalloc.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_jemalloc.pyx b/python/pyarrow/_jemalloc.pyx
new file mode 100644
index 0000000..3b41964
--- /dev/null
+++ b/python/pyarrow/_jemalloc.pyx
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: profile=False
+# distutils: language = c++
+# cython: embedsignature = True
+
+from pyarrow.includes.libarrow_jemalloc cimport CJemallocMemoryPool
+from pyarrow._memory cimport MemoryPool
+
+def default_pool():
+ cdef MemoryPool pool = MemoryPool()
+ pool.init(CJemallocMemoryPool.default_pool())
+ return pool
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_memory.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/_memory.pxd b/python/pyarrow/_memory.pxd
new file mode 100644
index 0000000..bb1af85
--- /dev/null
+++ b/python/pyarrow/_memory.pxd
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pyarrow.includes.libarrow cimport CMemoryPool, CLoggingMemoryPool
+
+
+cdef class MemoryPool:
+ cdef:
+ CMemoryPool* pool
+
+ cdef init(self, CMemoryPool* pool)
+
+cdef class LoggingMemoryPool(MemoryPool):
+ pass
+
+cdef CMemoryPool* maybe_unbox_memory_pool(MemoryPool memory_pool)
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_memory.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_memory.pyx b/python/pyarrow/_memory.pyx
new file mode 100644
index 0000000..98dbf66
--- /dev/null
+++ b/python/pyarrow/_memory.pyx
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: profile=False
+# distutils: language = c++
+# cython: embedsignature = True
+
+from pyarrow.includes.libarrow cimport CMemoryPool, CLoggingMemoryPool
+from pyarrow.includes.pyarrow cimport set_default_memory_pool, get_memory_pool
+
+cdef class MemoryPool:
+ cdef init(self, CMemoryPool* pool):
+ self.pool = pool
+
+ def bytes_allocated(self):
+ return self.pool.bytes_allocated()
+
+cdef CMemoryPool* maybe_unbox_memory_pool(MemoryPool memory_pool):
+ if memory_pool is None:
+ return get_memory_pool()
+ else:
+ return memory_pool.pool
+
+cdef class LoggingMemoryPool(MemoryPool):
+ pass
+
+def default_pool():
+ cdef:
+ MemoryPool pool = MemoryPool()
+ pool.init(get_memory_pool())
+ return pool
+
+def set_default_pool(MemoryPool pool):
+ set_default_memory_pool(pool.pool)
+
+def total_allocated_bytes():
+ cdef CMemoryPool* pool = get_memory_pool()
+ return pool.bytes_allocated()
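+
+# Usage sketch: measure allocations against the default pool:
+#
+#   before = total_allocated_bytes()
+#   # ... create Arrow buffers or arrays ...
+#   grew_by = total_allocated_bytes() - before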
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 079bf5e..5418e1d 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -20,20 +20,18 @@
# cython: embedsignature = True
from cython.operator cimport dereference as deref
-
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
cimport pyarrow.includes.pyarrow as pyarrow
+from pyarrow._array cimport Array
+from pyarrow._error cimport check_status
+from pyarrow._memory cimport MemoryPool, maybe_unbox_memory_pool
+from pyarrow._table cimport Table, table_from_ctable
+from pyarrow._io cimport NativeFile, get_reader, get_writer
-from pyarrow.array cimport Array
from pyarrow.compat import tobytes, frombytes
-from pyarrow.error import ArrowException
-from pyarrow.error cimport check_status
-from pyarrow.io import NativeFile
-from pyarrow.memory cimport MemoryPool, maybe_unbox_memory_pool
-from pyarrow.table cimport Table, table_from_ctable
-
-from pyarrow.io cimport NativeFile, get_reader, get_writer
+from pyarrow._error import ArrowException
+from pyarrow._io import NativeFile
import six
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_table.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/_table.pxd b/python/pyarrow/_table.pxd
new file mode 100644
index 0000000..e61e90d
--- /dev/null
+++ b/python/pyarrow/_table.pxd
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pyarrow.includes.common cimport shared_ptr
+from pyarrow.includes.libarrow cimport (CChunkedArray, CColumn, CTable,
+ CRecordBatch)
+from pyarrow._array cimport Schema
+
+
+cdef class ChunkedArray:
+ cdef:
+ shared_ptr[CChunkedArray] sp_chunked_array
+ CChunkedArray* chunked_array
+
+ cdef init(self, const shared_ptr[CChunkedArray]& chunked_array)
+ cdef _check_nullptr(self)
+
+
+cdef class Column:
+ cdef:
+ shared_ptr[CColumn] sp_column
+ CColumn* column
+
+ cdef init(self, const shared_ptr[CColumn]& column)
+ cdef _check_nullptr(self)
+
+
+cdef class Table:
+ cdef:
+ shared_ptr[CTable] sp_table
+ CTable* table
+
+ cdef init(self, const shared_ptr[CTable]& table)
+ cdef _check_nullptr(self)
+
+
+cdef class RecordBatch:
+ cdef:
+ shared_ptr[CRecordBatch] sp_batch
+ CRecordBatch* batch
+ Schema _schema
+
+ cdef init(self, const shared_ptr[CRecordBatch]& table)
+ cdef _check_nullptr(self)
+
+cdef object box_column(const shared_ptr[CColumn]& ccolumn)
+cdef api object table_from_ctable(const shared_ptr[CTable]& ctable)
+cdef api object batch_from_cbatch(const shared_ptr[CRecordBatch]& cbatch)
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_table.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_table.pyx b/python/pyarrow/_table.pyx
new file mode 100644
index 0000000..6558b2e
--- /dev/null
+++ b/python/pyarrow/_table.pyx
@@ -0,0 +1,913 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: profile=False
+# distutils: language = c++
+# cython: embedsignature = True
+
+from cython.operator cimport dereference as deref
+
+from pyarrow.includes.libarrow cimport *
+from pyarrow.includes.common cimport *
+cimport pyarrow.includes.pyarrow as pyarrow
+from pyarrow._array cimport (Array, box_array, wrap_array_output,
+ box_data_type, box_schema, DataType, Field)
+from pyarrow._error cimport check_status
+cimport cpython
+
+import pyarrow._config
+from pyarrow._error import ArrowException
+from pyarrow._array import field
+from pyarrow.compat import frombytes, tobytes
+
+
+from collections import OrderedDict
+
+
+cdef _pandas():
+ import pandas as pd
+ return pd
+
+
+cdef class ChunkedArray:
+ """
+ Array backed by one or more memory chunks.
+
+ Warning
+ -------
+ Do not call this class's constructor directly.
+ """
+
+ def __cinit__(self):
+ self.chunked_array = NULL
+
+ cdef init(self, const shared_ptr[CChunkedArray]& chunked_array):
+ self.sp_chunked_array = chunked_array
+ self.chunked_array = chunked_array.get()
+
+ cdef _check_nullptr(self):
+ if self.chunked_array == NULL:
+ raise ReferenceError("ChunkedArray object references a NULL "
+ "pointer. Not initialized.")
+
+ def length(self):
+ self._check_nullptr()
+ return self.chunked_array.length()
+
+ def __len__(self):
+ return self.length()
+
+ @property
+ def null_count(self):
+ """
+ Number of null entries
+
+ Returns
+ -------
+ int
+ """
+ self._check_nullptr()
+ return self.chunked_array.null_count()
+
+ @property
+ def num_chunks(self):
+ """
+ Number of underlying chunks
+
+ Returns
+ -------
+ int
+ """
+ self._check_nullptr()
+ return self.chunked_array.num_chunks()
+
+ def chunk(self, i):
+ """
+ Select a chunk by its index
+
+ Parameters
+ ----------
+ i : int
+
+ Returns
+ -------
+ pyarrow.array.Array
+ """
+ self._check_nullptr()
+ return box_array(self.chunked_array.chunk(i))
+
+ def iterchunks(self):
+ for i in range(self.num_chunks):
+ yield self.chunk(i)
+
+ def to_pylist(self):
+ """
+ Convert to a list of native Python objects.
+ """
+ result = []
+ for i in range(self.num_chunks):
+ result += self.chunk(i).to_pylist()
+ return result
+
+
+cdef class Column:
+ """
+ Named vector of elements of equal type.
+
+ Warning
+ -------
+ Do not call this class's constructor directly.
+ """
+
+ def __cinit__(self):
+ self.column = NULL
+
+ cdef init(self, const shared_ptr[CColumn]& column):
+ self.sp_column = column
+ self.column = column.get()
+
+ @staticmethod
+ def from_array(object field_or_name, Array arr):
+ cdef Field boxed_field
+
+ if isinstance(field_or_name, Field):
+ boxed_field = field_or_name
+ else:
+ boxed_field = field(field_or_name, arr.type)
+
+ cdef shared_ptr[CColumn] sp_column
+ sp_column.reset(new CColumn(boxed_field.sp_field, arr.sp_array))
+ return box_column(sp_column)
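+
+ # Usage sketch (assumes `arr` is an existing pyarrow Array):
+ #
+ # col = Column.from_array('values', arr)
+ # col.name # -> 'values'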
+
+ def to_pandas(self):
+ """
+ Convert the arrow::Column to a pandas.Series
+
+ Returns
+ -------
+ pandas.Series
+ """
+ cdef:
+ PyObject* out
+
+ check_status(pyarrow.ConvertColumnToPandas(self.sp_column,
+ <PyObject*> self, &out))
+
+ return _pandas().Series(wrap_array_output(out), name=self.name)
+
+ def equals(self, Column other):
+ """
+ Check if contents of two columns are equal
+
+ Parameters
+ ----------
+ other : pyarrow.Column
+
+ Returns
+ -------
+ are_equal : boolean
+ """
+ cdef:
+ CColumn* my_col = self.column
+ CColumn* other_col = other.column
+ c_bool result
+
+ self._check_nullptr()
+ other._check_nullptr()
+
+ with nogil:
+ result = my_col.Equals(deref(other_col))
+
+ return result
+
+ def to_pylist(self):
+ """
+ Convert to a list of native Python objects.
+ """
+ return self.data.to_pylist()
+
+ cdef _check_nullptr(self):
+ if self.column == NULL:
+ raise ReferenceError("Column object references a NULL pointer."
+ "Not initialized.")
+
+ def __len__(self):
+ self._check_nullptr()
+ return self.column.length()
+
+ def length(self):
+ self._check_nullptr()
+ return self.column.length()
+
+ @property
+ def shape(self):
+ """
+ Dimensions of this column
+
+ Returns
+ -------
+ (int,)
+ """
+ self._check_nullptr()
+ return (self.length(),)
+
+ @property
+ def null_count(self):
+ """
+ Number of null entries
+
+ Returns
+ -------
+ int
+ """
+ self._check_nullptr()
+ return self.column.null_count()
+
+ @property
+ def name(self):
+ """
+ Label of the column
+
+ Returns
+ -------
+ str
+ """
+ return bytes(self.column.name()).decode('utf8')
+
+ @property
+ def type(self):
+ """
+ Type information for this column
+
+ Returns
+ -------
+ pyarrow.schema.DataType
+ """
+ return box_data_type(self.column.type())
+
+ @property
+ def data(self):
+ """
+ The underlying data
+
+ Returns
+ -------
+ pyarrow.table.ChunkedArray
+ """
+ cdef ChunkedArray chunked_array = ChunkedArray()
+ chunked_array.init(self.column.data())
+ return chunked_array
+
+
+cdef _schema_from_arrays(arrays, names, shared_ptr[CSchema]* schema):
+ cdef:
+ Array arr
+ Column col
+ c_string c_name
+ vector[shared_ptr[CField]] fields
+ cdef shared_ptr[CDataType] type_
+
+ cdef int K = len(arrays)
+
+ fields.resize(K)
+
+ if len(arrays) == 0:
+ raise ValueError('Must pass at least one array')
+
+ if isinstance(arrays[0], Array):
+ if names is None:
+ raise ValueError('Must pass names when constructing '
+ 'from Array objects')
+ for i in range(K):
+ arr = arrays[i]
+ type_ = arr.type.sp_type
+ c_name = tobytes(names[i])
+ fields[i].reset(new CField(c_name, type_, True))
+ elif isinstance(arrays[0], Column):
+ for i in range(K):
+ col = arrays[i]
+ type_ = col.sp_column.get().type()
+ c_name = tobytes(col.name)
+ fields[i].reset(new CField(c_name, type_, True))
+ else:
+ raise TypeError(type(arrays[0]))
+
+ schema.reset(new CSchema(fields))
+
+
+
+cdef _dataframe_to_arrays(df, timestamps_to_ms, Schema schema):
+ cdef:
+ list names = []
+ list arrays = []
+ DataType type = None
+
+ for name in df.columns:
+ col = df[name]
+ if schema is not None:
+ type = schema.field_by_name(name).type
+
+ arr = Array.from_numpy(col, type=type,
+ timestamps_to_ms=timestamps_to_ms)
+ names.append(name)
+ arrays.append(arr)
+
+ return names, arrays
+
+
+cdef class RecordBatch:
+ """
+ Batch of rows of columns of equal length
+
+ Warning
+ -------
+ Do not call this class's constructor directly, use one of the ``from_*``
+ methods instead.
+ """
+
+ def __cinit__(self):
+ self.batch = NULL
+ self._schema = None
+
+ cdef init(self, const shared_ptr[CRecordBatch]& batch):
+ self.sp_batch = batch
+ self.batch = batch.get()
+
+ cdef _check_nullptr(self):
+ if self.batch == NULL:
+ raise ReferenceError("Object not initialized")
+
+ def __len__(self):
+ self._check_nullptr()
+ return self.batch.num_rows()
+
+ @property
+ def num_columns(self):
+ """
+ Number of columns
+
+ Returns
+ -------
+ int
+ """
+ self._check_nullptr()
+ return self.batch.num_columns()
+
+ @property
+ def num_rows(self):
+ """
+ Number of rows
+
+ Due to the definition of a RecordBatch, all columns have the same
+ number of rows.
+
+ Returns
+ -------
+ int
+ """
+ return len(self)
+
+ @property
+ def schema(self):
+ """
+ Schema of the RecordBatch and its columns
+
+ Returns
+ -------
+ pyarrow.schema.Schema
+ """
+ cdef Schema schema
+ self._check_nullptr()
+ if self._schema is None:
+ schema = Schema()
+ schema.init_schema(self.batch.schema())
+ self._schema = schema
+
+ return self._schema
+
+ def __getitem__(self, i):
+ return box_array(self.batch.column(i))
+
+ def slice(self, offset=0, length=None):
+ """
+ Compute zero-copy slice of this RecordBatch
+
+ Parameters
+ ----------
+ offset : int, default 0
+ Offset from start of array to slice
+ length : int, default None
+ Length of slice (default is until end of batch starting from
+ offset)
+
+ Returns
+ -------
+ sliced : RecordBatch
+ """
+ cdef shared_ptr[CRecordBatch] result
+
+ if offset < 0:
+ raise IndexError('Offset must be non-negative')
+
+ if length is None:
+ result = self.batch.Slice(offset)
+ else:
+ result = self.batch.Slice(offset, length)
+
+ return batch_from_cbatch(result)
+
+ def equals(self, RecordBatch other):
+ cdef:
+ CRecordBatch* my_batch = self.batch
+ CRecordBatch* other_batch = other.batch
+ c_bool result
+
+ self._check_nullptr()
+ other._check_nullptr()
+
+ with nogil:
+ result = my_batch.Equals(deref(other_batch))
+
+ return result
+
+ def to_pydict(self):
+ """
+ Convert the arrow::RecordBatch to an OrderedDict
+
+ Returns
+ -------
+ OrderedDict
+ """
+ entries = []
+ for i in range(self.batch.num_columns()):
+ name = bytes(self.batch.column_name(i)).decode('utf8')
+ column = self[i].to_pylist()
+ entries.append((name, column))
+ return OrderedDict(entries)
+
+
+ def to_pandas(self, nthreads=None):
+ """
+ Convert the arrow::RecordBatch to a pandas DataFrame
+
+ Returns
+ -------
+ pandas.DataFrame
+ """
+ return Table.from_batches([self]).to_pandas(nthreads=nthreads)
+
+ @classmethod
+ def from_pandas(cls, df, schema=None):
+ """
+ Convert pandas.DataFrame to an Arrow RecordBatch
+
+ Parameters
+ ----------
+ df: pandas.DataFrame
+ schema: pyarrow.Schema (optional)
+ The expected schema of the RecordBatch. This can be used to
+ indicate the type of columns if we cannot infer it automatically.
+
+ Returns
+ -------
+ pyarrow.table.RecordBatch
+ """
+ names, arrays = _dataframe_to_arrays(df, False, schema)
+ return cls.from_arrays(arrays, names)
+
+ @staticmethod
+ def from_arrays(arrays, names):
+ """
+ Construct a RecordBatch from multiple pyarrow.Arrays
+
+ Parameters
+ ----------
+ arrays: list of pyarrow.Array
+ column-wise data vectors
+ names: list of str
+ Labels for the columns
+
+ Returns
+ -------
+ pyarrow.table.RecordBatch
+ """
+ cdef:
+ Array arr
+ c_string c_name
+ shared_ptr[CSchema] schema
+ shared_ptr[CRecordBatch] batch
+ vector[shared_ptr[CArray]] c_arrays
+ int64_t num_rows
+
+ if len(arrays) == 0:
+ raise ValueError('Record batch must contain at least one array '
+ '(for now)')
+
+ num_rows = len(arrays[0])
+ _schema_from_arrays(arrays, names, &schema)
+
+ for i in range(len(arrays)):
+ arr = arrays[i]
+ c_arrays.push_back(arr.sp_array)
+
+ batch.reset(new CRecordBatch(schema, num_rows, c_arrays))
+ return batch_from_cbatch(batch)
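+
+# Usage sketch (arr1, arr2 assumed to be equal-length pyarrow Arrays):
+#
+#   batch = RecordBatch.from_arrays([arr1, arr2], ['ints', 'strs'])
+#   batch.num_rows == len(arr1)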
+
+
+cdef table_to_blockmanager(const shared_ptr[CTable]& table, int nthreads):
+ cdef:
+ PyObject* result_obj
+ CColumn* col
+ int i
+
+ import pandas.core.internals as _int
+ from pandas import RangeIndex, Categorical
+ from pyarrow.compat import DatetimeTZDtype
+
+ with nogil:
+ check_status(pyarrow.ConvertTableToPandas(table, nthreads,
+ &result_obj))
+
+ result = PyObject_to_object(result_obj)
+
+ blocks = []
+ for item in result:
+ block_arr = item['block']
+ placement = item['placement']
+ if 'dictionary' in item:
+ cat = Categorical(block_arr,
+ categories=item['dictionary'],
+ ordered=False, fastpath=True)
+ block = _int.make_block(cat, placement=placement,
+ klass=_int.CategoricalBlock,
+ fastpath=True)
+ elif 'timezone' in item:
+ dtype = DatetimeTZDtype('ns', tz=item['timezone'])
+ block = _int.make_block(block_arr, placement=placement,
+ klass=_int.DatetimeTZBlock,
+ dtype=dtype, fastpath=True)
+ else:
+ block = _int.make_block(block_arr, placement=placement)
+ blocks.append(block)
+
+ names = []
+ for i in range(table.get().num_columns()):
+ col = table.get().column(i).get()
+ names.append(frombytes(col.name()))
+
+ axes = [names, RangeIndex(table.get().num_rows())]
+ return _int.BlockManager(blocks, axes)
+
+
+cdef class Table:
+ """
+ A collection of top-level named, equal length Arrow arrays.
+
+ Warning
+ -------
+ Do not call this class's constructor directly, use one of the ``from_*``
+ methods instead.
+ """
+
+ def __cinit__(self):
+ self.table = NULL
+
+ def __repr__(self):
+ return 'pyarrow.Table\n{0}'.format(str(self.schema))
+
+ cdef init(self, const shared_ptr[CTable]& table):
+ self.sp_table = table
+ self.table = table.get()
+
+ cdef _check_nullptr(self):
+ if self.table == NULL:
+ raise ReferenceError("Table object references a NULL pointer."
+ "Not initialized.")
+
+ def equals(self, Table other):
+ """
+ Check if contents of two tables are equal
+
+ Parameters
+ ----------
+ other : pyarrow.Table
+
+ Returns
+ -------
+ are_equal : boolean
+ """
+ cdef:
+ CTable* my_table = self.table
+ CTable* other_table = other.table
+ c_bool result
+
+ self._check_nullptr()
+ other._check_nullptr()
+
+ with nogil:
+ result = my_table.Equals(deref(other_table))
+
+ return result
+
+ @classmethod
+ def from_pandas(cls, df, timestamps_to_ms=False, schema=None):
+ """
+ Convert pandas.DataFrame to an Arrow Table
+
+ Parameters
+ ----------
+ df: pandas.DataFrame
+
+ timestamps_to_ms: bool
+ Convert datetime columns to ms resolution. This is needed for
+ compatibility with other functionality like Parquet I/O which
+ only supports milliseconds.
+
+ schema: pyarrow.Schema (optional)
+ The expected schema of the Arrow Table. This can be used to
+ indicate the type of columns if we cannot infer it automatically.
+
+ Returns
+ -------
+ pyarrow.table.Table
+
+ Examples
+ --------
+
+ >>> import pandas as pd
+ >>> import pyarrow as pa
+ >>> df = pd.DataFrame({
+ ... 'int': [1, 2],
+ ... 'str': ['a', 'b']
+ ... })
+ >>> pa.Table.from_pandas(df)
+ <pyarrow.table.Table object at 0x7f05d1fb1b40>
+ """
+ names, arrays = _dataframe_to_arrays(df,
+ timestamps_to_ms=timestamps_to_ms,
+ schema=schema)
+ return cls.from_arrays(arrays, names=names)
+
+ @staticmethod
+ def from_arrays(arrays, names=None):
+ """
+ Construct a Table from Arrow arrays or columns
+
+ Parameters
+ ----------
+ arrays: list of pyarrow.Array or pyarrow.Column
+ Equal-length arrays that should form the table.
+ names: list of str, optional
+ Names for the table columns. If Column objects are passed, the
+ names are inferred; if Array objects are passed, this argument is
+ required
+
+ Returns
+ -------
+ pyarrow.table.Table
+
+ """
+ cdef:
+ vector[shared_ptr[CField]] fields
+ vector[shared_ptr[CColumn]] columns
+ shared_ptr[CSchema] schema
+ shared_ptr[CTable] table
+
+ _schema_from_arrays(arrays, names, &schema)
+
+ cdef int K = len(arrays)
+ columns.resize(K)
+
+ for i in range(K):
+ if isinstance(arrays[i], Array):
+ columns[i].reset(new CColumn(schema.get().field(i),
+ (<Array> arrays[i]).sp_array))
+ elif isinstance(arrays[i], Column):
+ columns[i] = (<Column> arrays[i]).sp_column
+ else:
+ raise ValueError(type(arrays[i]))
+
+ table.reset(new CTable(schema, columns))
+ return table_from_ctable(table)
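+
+ # Usage sketch (arr1, arr2 assumed to be equal-length pyarrow Arrays):
+ #
+ # t = Table.from_arrays([arr1, arr2], names=['a', 'b'])
+ # t.shape # -> (len(arr1), 2)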
+
+ @staticmethod
+ def from_batches(batches):
+ """
+ Construct a Table from a list of Arrow RecordBatches
+
+ Parameters
+ ----------
+
+ batches: list of RecordBatch
+ RecordBatch list to be converted, schemas must be equal
+ """
+ cdef:
+ vector[shared_ptr[CRecordBatch]] c_batches
+ shared_ptr[CTable] c_table
+ RecordBatch batch
+
+ for batch in batches:
+ c_batches.push_back(batch.sp_batch)
+
+ with nogil:
+ check_status(CTable.FromRecordBatches(c_batches, &c_table))
+
+ return table_from_ctable(c_table)
+
+ def to_pandas(self, nthreads=None):
+ """
+ Convert the arrow::Table to a pandas DataFrame
+
+ Parameters
+ ----------
+ nthreads : int, default max(1, multiprocessing.cpu_count() / 2)
+ For the default, we divide the CPU count by 2 because most modern
+ computers have hyperthreading turned on, so doubling the CPU count
+ beyond the number of physical cores does not help
+
+ Returns
+ -------
+ pandas.DataFrame
+ """
+ if nthreads is None:
+ nthreads = pyarrow._config.cpu_count()
+
+ mgr = table_to_blockmanager(self.sp_table, nthreads)
+ return _pandas().DataFrame(mgr)
+
+ def to_pydict(self):
+ """
+ Convert the arrow::Table to an OrderedDict
+
+ Returns
+ -------
+ OrderedDict
+ """
+ entries = []
+ for i in range(self.table.num_columns()):
+ name = self.column(i).name
+ column = self.column(i).to_pylist()
+ entries.append((name, column))
+ return OrderedDict(entries)
+
+ @property
+ def schema(self):
+ """
+ Schema of the table and its columns
+
+ Returns
+ -------
+ pyarrow.schema.Schema
+ """
+ return box_schema(self.table.schema())
+
+ def column(self, index):
+ """
+ Select a column by its numeric index.
+
+ Parameters
+ ----------
+ index: int
+
+ Returns
+ -------
+ pyarrow.table.Column
+ """
+ self._check_nullptr()
+ cdef Column column = Column()
+ column.init(self.table.column(index))
+ return column
+
+ def __getitem__(self, i):
+ return self.column(i)
+
+ def itercolumns(self):
+ """
+ Iterator over all columns in their numerical order
+ """
+ for i in range(self.num_columns):
+ yield self.column(i)
+
+ @property
+ def num_columns(self):
+ """
+ Number of columns in this table
+
+ Returns
+ -------
+ int
+ """
+ self._check_nullptr()
+ return self.table.num_columns()
+
+ @property
+ def num_rows(self):
+ """
+ Number of rows in this table.
+
+ Due to the definition of a table, all columns have the same number of rows.
+
+ Returns
+ -------
+ int
+ """
+ self._check_nullptr()
+ return self.table.num_rows()
+
+ def __len__(self):
+ return self.num_rows
+
+ @property
+ def shape(self):
+ """
+ Dimensions of the table: (#rows, #columns)
+
+ Returns
+ -------
+ (int, int)
+ """
+ return (self.num_rows, self.num_columns)
+
+ def add_column(self, int i, Column column):
+ """
+ Add column to Table at position. Returns new table
+ """
+ cdef:
+ shared_ptr[CTable] c_table
+
+ with nogil:
+ check_status(self.table.AddColumn(i, column.sp_column, &c_table))
+
+ return table_from_ctable(c_table)
+
+ def append_column(self, Column column):
+ """
+ Append column at end of columns. Returns new table
+ """
+ return self.add_column(self.num_columns, column)
+
+ def remove_column(self, int i):
+ """
+ Create new Table with the indicated column removed
+ """
+ cdef shared_ptr[CTable] c_table
+
+ with nogil:
+ check_status(self.table.RemoveColumn(i, &c_table))
+
+ return table_from_ctable(c_table)
+
+
+def concat_tables(tables):
+ """
+ Perform zero-copy concatenation of pyarrow.Table objects. Raises an
+ exception if the Table schemas are not all the same
+
+ Parameters
+ ----------
+ tables : iterable of pyarrow.Table objects
+ """
+ cdef:
+ vector[shared_ptr[CTable]] c_tables
+ shared_ptr[CTable] c_result
+ Table table
+
+ for table in tables:
+ c_tables.push_back(table.sp_table)
+
+ with nogil:
+ check_status(ConcatenateTables(c_tables, &c_result))
+
+ return table_from_ctable(c_result)
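+
+# Usage sketch (t1, t2 assumed to be Tables sharing one schema):
+#
+#   combined = concat_tables([t1, t2])
+#   len(combined) == len(t1) + len(t2)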
+
+
+cdef object box_column(const shared_ptr[CColumn]& ccolumn):
+ cdef Column column = Column()
+ column.init(ccolumn)
+ return column
+
+
+cdef api object table_from_ctable(const shared_ptr[CTable]& ctable):
+ cdef Table table = Table()
+ table.init(ctable)
+ return table
+
+
+cdef api object batch_from_cbatch(const shared_ptr[CRecordBatch]& cbatch):
+ cdef RecordBatch batch = RecordBatch()
+ batch.init(cbatch)
+ return batch
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/array.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/array.pxd b/python/pyarrow/array.pxd
deleted file mode 100644
index 3ba4871..0000000
--- a/python/pyarrow/array.pxd
+++ /dev/null
@@ -1,141 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from pyarrow.includes.common cimport shared_ptr, int64_t
-from pyarrow.includes.libarrow cimport CArray, CTensor
-
-from pyarrow.scalar import NA
-
-from pyarrow.schema cimport DataType
-
-from cpython cimport PyObject
-
-
-cdef extern from "Python.h":
- int PySlice_Check(object)
-
-
-cdef class Array:
- cdef:
- shared_ptr[CArray] sp_array
- CArray* ap
-
- cdef readonly:
- DataType type
-
- cdef init(self, const shared_ptr[CArray]& sp_array)
- cdef getitem(self, int64_t i)
-
-
-cdef class Tensor:
- cdef:
- shared_ptr[CTensor] sp_tensor
- CTensor* tp
-
- cdef readonly:
- DataType type
-
- cdef init(self, const shared_ptr[CTensor]& sp_tensor)
-
-
-cdef object box_array(const shared_ptr[CArray]& sp_array)
-cdef object box_tensor(const shared_ptr[CTensor]& sp_tensor)
-
-
-cdef class BooleanArray(Array):
- pass
-
-
-cdef class NumericArray(Array):
- pass
-
-
-cdef class IntegerArray(NumericArray):
- pass
-
-
-cdef class FloatingPointArray(NumericArray):
- pass
-
-
-cdef class Int8Array(IntegerArray):
- pass
-
-
-cdef class UInt8Array(IntegerArray):
- pass
-
-
-cdef class Int16Array(IntegerArray):
- pass
-
-
-cdef class UInt16Array(IntegerArray):
- pass
-
-
-cdef class Int32Array(IntegerArray):
- pass
-
-
-cdef class UInt32Array(IntegerArray):
- pass
-
-
-cdef class Int64Array(IntegerArray):
- pass
-
-
-cdef class UInt64Array(IntegerArray):
- pass
-
-
-cdef class FloatArray(FloatingPointArray):
- pass
-
-
-cdef class DoubleArray(FloatingPointArray):
- pass
-
-
-cdef class FixedSizeBinaryArray(Array):
- pass
-
-
-cdef class DecimalArray(FixedSizeBinaryArray):
- pass
-
-
-cdef class ListArray(Array):
- pass
-
-
-cdef class StringArray(Array):
- pass
-
-
-cdef class BinaryArray(Array):
- pass
-
-
-cdef class DictionaryArray(Array):
- cdef:
- object _indices, _dictionary
-
-
-
-cdef wrap_array_output(PyObject* output)