Posted to commits@arrow.apache.org by we...@apache.org on 2017/05/13 19:44:54 UTC
[3/4] arrow git commit: ARROW-819: Public Cython and C++ API in the style of lxml, arrow::py::import_pyarrow method
http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/pyarrow/_io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_io.pyx b/python/pyarrow/_io.pyx
deleted file mode 100644
index e9e2ba0..0000000
--- a/python/pyarrow/_io.pyx
+++ /dev/null
@@ -1,1274 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Cython wrappers for IO interfaces defined in arrow::io and messaging in
-# arrow::ipc
-
-# cython: profile=False
-# distutils: language = c++
-# cython: embedsignature = True
-
-from cython.operator cimport dereference as deref
-from libc.stdlib cimport malloc, free
-from pyarrow.includes.libarrow cimport *
-cimport pyarrow.includes.pyarrow as pyarrow
-from pyarrow._array cimport Array, Tensor, box_tensor, Schema
-from pyarrow._error cimport check_status
-from pyarrow._memory cimport MemoryPool, maybe_unbox_memory_pool
-from pyarrow._table cimport (Column, RecordBatch, batch_from_cbatch,
- table_from_ctable)
-cimport cpython as cp
-
-import pyarrow._config
-from pyarrow.compat import frombytes, tobytes, encode_file_path
-
-import re
-import six
-import sys
-import threading
-import time
-
-
-# 64K
-DEFAULT_BUFFER_SIZE = 2 ** 16
-
-
-# To let us get a PyObject* and avoid Cython auto-ref-counting
-cdef extern from "Python.h":
- PyObject* PyBytes_FromStringAndSizeNative "PyBytes_FromStringAndSize"(
- char *v, Py_ssize_t len) except NULL
-
-cdef class NativeFile:
-
- def __cinit__(self):
- self.is_open = False
- self.own_file = False
-
- def __dealloc__(self):
- if self.is_open and self.own_file:
- self.close()
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_value, tb):
- self.close()
-
- def close(self):
- if self.is_open:
- with nogil:
- if self.is_readable:
- check_status(self.rd_file.get().Close())
- else:
- check_status(self.wr_file.get().Close())
- self.is_open = False
-
- cdef read_handle(self, shared_ptr[RandomAccessFile]* file):
- self._assert_readable()
- file[0] = <shared_ptr[RandomAccessFile]> self.rd_file
-
- cdef write_handle(self, shared_ptr[OutputStream]* file):
- self._assert_writeable()
- file[0] = <shared_ptr[OutputStream]> self.wr_file
-
- def _assert_readable(self):
- if not self.is_readable:
- raise IOError("only valid on readonly files")
-
- if not self.is_open:
- raise IOError("file not open")
-
- def _assert_writeable(self):
- if not self.is_writeable:
- raise IOError("only valid on writeable files")
-
- if not self.is_open:
- raise IOError("file not open")
-
- def size(self):
- cdef int64_t size
- self._assert_readable()
- with nogil:
- check_status(self.rd_file.get().GetSize(&size))
- return size
-
- def tell(self):
- cdef int64_t position
- with nogil:
- if self.is_readable:
- check_status(self.rd_file.get().Tell(&position))
- else:
- check_status(self.wr_file.get().Tell(&position))
- return position
-
- def seek(self, int64_t position):
- self._assert_readable()
- with nogil:
- check_status(self.rd_file.get().Seek(position))
-
- def write(self, data):
- """
- Write bytes from any object implementing the buffer protocol (bytes,
- bytearray, ndarray, pyarrow.Buffer)
- """
- self._assert_writeable()
-
- if isinstance(data, six.string_types):
- data = tobytes(data)
-
- cdef Buffer arrow_buffer = frombuffer(data)
-
- cdef const uint8_t* buf = arrow_buffer.buffer.get().data()
- cdef int64_t bufsize = len(arrow_buffer)
- with nogil:
- check_status(self.wr_file.get().Write(buf, bufsize))
-
- def read(self, nbytes=None):
- cdef:
- int64_t c_nbytes
- int64_t bytes_read = 0
- PyObject* obj
-
- if nbytes is None:
- c_nbytes = self.size() - self.tell()
- else:
- c_nbytes = nbytes
-
- self._assert_readable()
-
- # Allocate empty write space
- obj = PyBytes_FromStringAndSizeNative(NULL, c_nbytes)
-
- cdef uint8_t* buf = <uint8_t*> cp.PyBytes_AS_STRING(<object> obj)
- with nogil:
- check_status(self.rd_file.get().Read(c_nbytes, &bytes_read, buf))
-
- if bytes_read < c_nbytes:
- cp._PyBytes_Resize(&obj, <Py_ssize_t> bytes_read)
-
- return PyObject_to_object(obj)
-
- def read_buffer(self, nbytes=None):
- cdef:
- int64_t c_nbytes
- int64_t bytes_read = 0
- shared_ptr[CBuffer] output
- self._assert_readable()
-
- if nbytes is None:
- c_nbytes = self.size() - self.tell()
- else:
- c_nbytes = nbytes
-
- with nogil:
- check_status(self.rd_file.get().ReadB(c_nbytes, &output))
-
- return wrap_buffer(output)
-
- def download(self, stream_or_path, buffer_size=None):
- """
- Read file completely to local path (rather than reading completely into
- memory). First seeks to the beginning of the file.
- """
- cdef:
- int64_t bytes_read = 0
- uint8_t* buf
- self._assert_readable()
-
- buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
-
- write_queue = Queue(50)
-
- if not hasattr(stream_or_path, 'read'):
- stream = open(stream_or_path, 'wb')
- cleanup = lambda: stream.close()
- else:
- stream = stream_or_path
- cleanup = lambda: None
-
- done = False
- exc_info = []  # list: bg_write mutates it in place (rebinding would be local)
- def bg_write():
- try:
- while not done or write_queue.qsize() > 0:
- try:
- buf = write_queue.get(timeout=0.01)
- except QueueEmpty:
- continue
- stream.write(buf)
- except Exception:
- exc_info[:] = sys.exc_info()
- finally:
- cleanup()
-
- self.seek(0)
-
- writer_thread = threading.Thread(target=bg_write)
-
- # This isn't ideal -- PyBytes_FromStringAndSize copies the data from
- # the passed buffer, so it's hard for us to avoid doubling the memory
- buf = <uint8_t*> malloc(buffer_size)
- if buf == NULL:
- raise MemoryError("Failed to allocate {0} bytes"
- .format(buffer_size))
-
- writer_thread.start()
-
- cdef int64_t total_bytes = 0
- cdef int32_t c_buffer_size = buffer_size
-
- try:
- while True:
- with nogil:
- check_status(self.rd_file.get()
- .Read(c_buffer_size, &bytes_read, buf))
-
- total_bytes += bytes_read
-
- # EOF
- if bytes_read == 0:
- break
-
- pybuf = cp.PyBytes_FromStringAndSize(<const char*>buf,
- bytes_read)
-
- write_queue.put_nowait(pybuf)
- finally:
- free(buf)
- done = True
-
- writer_thread.join()
- if exc_info:
- raise exc_info[0], exc_info[1], exc_info[2]
-
- def upload(self, stream, buffer_size=None):
- """
- Pipe file-like object to file
- """
- write_queue = Queue(50)
- self._assert_writeable()
-
- buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
-
- done = False
- exc_info = []  # list: bg_write mutates it in place (rebinding would be local)
- def bg_write():
- try:
- while not done or write_queue.qsize() > 0:
- try:
- buf = write_queue.get(timeout=0.01)
- except QueueEmpty:
- continue
-
- self.write(buf)
-
- except Exception:
- exc_info[:] = sys.exc_info()
-
- writer_thread = threading.Thread(target=bg_write)
- writer_thread.start()
-
- try:
- while True:
- buf = stream.read(buffer_size)
- if not buf:
- break
-
- if writer_thread.is_alive():
- while write_queue.full():
- time.sleep(0.01)
- else:
- break
-
- write_queue.put_nowait(buf)
- finally:
- done = True
-
- writer_thread.join()
- if exc_info:
- raise exc_info[0], exc_info[1], exc_info[2]
-
-
-# ----------------------------------------------------------------------
-# Python file-like objects
-
-
-cdef class PythonFile(NativeFile):
- cdef:
- object handle
-
- def __cinit__(self, handle, mode='w'):
- self.handle = handle
-
- if mode.startswith('w'):
- self.wr_file.reset(new pyarrow.PyOutputStream(handle))
- self.is_readable = 0
- self.is_writeable = 1
- elif mode.startswith('r'):
- self.rd_file.reset(new pyarrow.PyReadableFile(handle))
- self.is_readable = 1
- self.is_writeable = 0
- else:
- raise ValueError('Invalid file mode: {0}'.format(mode))
-
- self.is_open = True
-
-
-cdef class MemoryMappedFile(NativeFile):
- """
- Supports 'r', 'r+w', 'w' modes
- """
- cdef:
- object path
-
- def __cinit__(self):
- self.is_open = False
- self.is_readable = 0
- self.is_writeable = 0
-
- @staticmethod
- def create(path, size):
- cdef:
- shared_ptr[CMemoryMappedFile] handle
- c_string c_path = encode_file_path(path)
- int64_t c_size = size
-
- with nogil:
- check_status(CMemoryMappedFile.Create(c_path, c_size, &handle))
-
- cdef MemoryMappedFile result = MemoryMappedFile()
- result.path = path
- result.is_readable = 1
- result.is_writeable = 1
- result.wr_file = <shared_ptr[OutputStream]> handle
- result.rd_file = <shared_ptr[RandomAccessFile]> handle
- result.is_open = True
-
- return result
-
- def open(self, path, mode='r'):
- self.path = path
-
- cdef:
- FileMode c_mode
- shared_ptr[CMemoryMappedFile] handle
- c_string c_path = encode_file_path(path)
-
- if mode in ('r', 'rb'):
- c_mode = FileMode_READ
- self.is_readable = 1
- elif mode in ('w', 'wb'):
- c_mode = FileMode_WRITE
- self.is_writeable = 1
- elif mode == 'r+w':
- c_mode = FileMode_READWRITE
- self.is_readable = 1
- self.is_writeable = 1
- else:
- raise ValueError('Invalid file mode: {0}'.format(mode))
-
- check_status(CMemoryMappedFile.Open(c_path, c_mode, &handle))
-
- self.wr_file = <shared_ptr[OutputStream]> handle
- self.rd_file = <shared_ptr[RandomAccessFile]> handle
- self.is_open = True
-
-
-def memory_map(path, mode='r'):
- """
- Open memory map at file path. Size of the memory map cannot change
-
- Parameters
- ----------
- path : string
- mode : {'r', 'w'}, default 'r'
-
- Returns
- -------
- mmap : MemoryMappedFile
- """
- cdef MemoryMappedFile mmap = MemoryMappedFile()
- mmap.open(path, mode)
- return mmap
-
-
-def create_memory_map(path, size):
- """
- Create memory map at indicated path of the given size, return open
- writeable file object
-
- Parameters
- ----------
- path : string
- size : int
-
- Returns
- -------
- mmap : MemoryMappedFile
- """
- return MemoryMappedFile.create(path, size)
-
-
-cdef class OSFile(NativeFile):
- """
- Supports 'r', 'w' modes
- """
- cdef:
- object path
-
- def __cinit__(self, path, mode='r', MemoryPool memory_pool=None):
- self.path = path
-
- cdef:
- FileMode c_mode
- shared_ptr[Readable] handle
- c_string c_path = encode_file_path(path)
-
- self.is_readable = self.is_writeable = 0
-
- if mode in ('r', 'rb'):
- self._open_readable(c_path, maybe_unbox_memory_pool(memory_pool))
- elif mode in ('w', 'wb'):
- self._open_writeable(c_path)
- else:
- raise ValueError('Invalid file mode: {0}'.format(mode))
-
- self.is_open = True
-
- cdef _open_readable(self, c_string path, CMemoryPool* pool):
- cdef shared_ptr[ReadableFile] handle
-
- with nogil:
- check_status(ReadableFile.Open(path, pool, &handle))
-
- self.is_readable = 1
- self.rd_file = <shared_ptr[RandomAccessFile]> handle
-
- cdef _open_writeable(self, c_string path):
- cdef shared_ptr[FileOutputStream] handle
-
- with nogil:
- check_status(FileOutputStream.Open(path, &handle))
- self.is_writeable = 1
- self.wr_file = <shared_ptr[OutputStream]> handle
-
-
-# ----------------------------------------------------------------------
-# Arrow buffers
-
-
-cdef class Buffer:
-
- def __cinit__(self):
- pass
-
- cdef init(self, const shared_ptr[CBuffer]& buffer):
- self.buffer = buffer
- self.shape[0] = self.size
- self.strides[0] = <Py_ssize_t>(1)
-
- def __len__(self):
- return self.size
-
- property size:
-
- def __get__(self):
- return self.buffer.get().size()
-
- property parent:
-
- def __get__(self):
- cdef shared_ptr[CBuffer] parent_buf = self.buffer.get().parent()
-
- if parent_buf.get() == NULL:
- return None
- else:
- return wrap_buffer(parent_buf)
-
- def __getitem__(self, key):
- # TODO(wesm): buffer slicing
- raise NotImplementedError
-
- def to_pybytes(self):
- return cp.PyBytes_FromStringAndSize(
- <const char*>self.buffer.get().data(),
- self.buffer.get().size())
-
- def __getbuffer__(self, cp.Py_buffer* buffer, int flags):
-
- buffer.buf = <char *>self.buffer.get().data()
- buffer.format = 'b'
- buffer.internal = NULL
- buffer.itemsize = 1
- buffer.len = self.size
- buffer.ndim = 1
- buffer.obj = self
- buffer.readonly = 1
- buffer.shape = self.shape
- buffer.strides = self.strides
- buffer.suboffsets = NULL
-
-
-cdef shared_ptr[PoolBuffer] allocate_buffer(CMemoryPool* pool):
- cdef shared_ptr[PoolBuffer] result
- result.reset(new PoolBuffer(pool))
- return result
-
-
-cdef class InMemoryOutputStream(NativeFile):
-
- cdef:
- shared_ptr[PoolBuffer] buffer
-
- def __cinit__(self, MemoryPool memory_pool=None):
- self.buffer = allocate_buffer(maybe_unbox_memory_pool(memory_pool))
- self.wr_file.reset(new BufferOutputStream(
- <shared_ptr[ResizableBuffer]> self.buffer))
- self.is_readable = 0
- self.is_writeable = 1
- self.is_open = True
-
- def get_result(self):
- check_status(self.wr_file.get().Close())
- self.is_open = False
- return wrap_buffer(<shared_ptr[CBuffer]> self.buffer)
-
-
-cdef class BufferReader(NativeFile):
- """
- Zero-copy reader from objects convertible to Arrow buffer
-
- Parameters
- ----------
- obj : Python bytes or pyarrow.io.Buffer
- """
- cdef:
- Buffer buffer
-
- def __cinit__(self, object obj):
-
- if isinstance(obj, Buffer):
- self.buffer = obj
- else:
- self.buffer = frombuffer(obj)
-
- self.rd_file.reset(new CBufferReader(self.buffer.buffer))
- self.is_readable = 1
- self.is_writeable = 0
- self.is_open = True
-
-
-def frombuffer(object obj):
- """
- Construct an Arrow buffer from a Python object implementing the
- buffer protocol
- """
- cdef shared_ptr[CBuffer] buf
- try:
- memoryview(obj)
- buf.reset(new pyarrow.PyBuffer(obj))
- return wrap_buffer(buf)
- except TypeError:
- raise ValueError('Must pass object that implements buffer protocol')
-
-
-cdef Buffer wrap_buffer(const shared_ptr[CBuffer]& buf):
- cdef Buffer result = Buffer()
- result.init(buf)
- return result
-
-
-cdef get_reader(object source, shared_ptr[RandomAccessFile]* reader):
- cdef NativeFile nf
-
- if isinstance(source, six.string_types):
- source = memory_map(source, mode='r')
- elif isinstance(source, Buffer):
- source = BufferReader(source)
- elif not isinstance(source, NativeFile) and hasattr(source, 'read'):
- # Optimistically hope this is file-like
- source = PythonFile(source, mode='r')
-
- if isinstance(source, NativeFile):
- nf = source
-
- # TODO: what about read-write sources (e.g. memory maps)
- if not nf.is_readable:
- raise IOError('Native file is not readable')
-
- nf.read_handle(reader)
- else:
- raise TypeError('Unable to read from object of type: {0}'
- .format(type(source)))
-
-
-cdef get_writer(object source, shared_ptr[OutputStream]* writer):
- cdef NativeFile nf
-
- if isinstance(source, six.string_types):
- source = OSFile(source, mode='w')
- elif not isinstance(source, NativeFile) and hasattr(source, 'write'):
- # Optimistically hope this is file-like
- source = PythonFile(source, mode='w')
-
- if isinstance(source, NativeFile):
- nf = source
-
- if not nf.is_writeable:
- raise IOError('Native file is not writeable')
-
- nf.write_handle(writer)
- else:
- raise TypeError('Unable to write to object of type: {0}'
- .format(type(source)))
-
-# ----------------------------------------------------------------------
-# HDFS IO implementation
-
-_HDFS_PATH_RE = re.compile(r'hdfs://(.*):(\d+)(.*)')
-
-try:
- # Python 3
- from queue import Queue, Empty as QueueEmpty, Full as QueueFull
-except ImportError:
- from Queue import Queue, Empty as QueueEmpty, Full as QueueFull
-
-
-def have_libhdfs():
- try:
- check_status(HaveLibHdfs())
- return True
- except Exception:
- return False
-
-
-def have_libhdfs3():
- try:
- check_status(HaveLibHdfs3())
- return True
- except Exception:
- return False
-
-
-def strip_hdfs_abspath(path):
- m = _HDFS_PATH_RE.match(path)
- if m:
- return m.group(3)
- else:
- return path
-
-
-cdef class _HdfsClient:
- cdef:
- shared_ptr[CHdfsClient] client
-
- cdef readonly:
- bint is_open
-
- def __cinit__(self):
- pass
-
- def _connect(self, host, port, user, kerb_ticket, driver):
- cdef HdfsConnectionConfig conf
-
- if host is not None:
- conf.host = tobytes(host)
- conf.port = port
- if user is not None:
- conf.user = tobytes(user)
- if kerb_ticket is not None:
- conf.kerb_ticket = tobytes(kerb_ticket)
-
- if driver == 'libhdfs':
- check_status(HaveLibHdfs())
- conf.driver = HdfsDriver_LIBHDFS
- else:
- check_status(HaveLibHdfs3())
- conf.driver = HdfsDriver_LIBHDFS3
-
- with nogil:
- check_status(CHdfsClient.Connect(&conf, &self.client))
- self.is_open = True
-
- @classmethod
- def connect(cls, *args, **kwargs):
- return cls(*args, **kwargs)
-
- def __dealloc__(self):
- if self.is_open:
- self.close()
-
- def close(self):
- """
- Disconnect from the HDFS cluster
- """
- self._ensure_client()
- with nogil:
- check_status(self.client.get().Disconnect())
- self.is_open = False
-
- cdef _ensure_client(self):
- if self.client.get() == NULL:
- raise IOError('HDFS client improperly initialized')
- elif not self.is_open:
- raise IOError('HDFS client is closed')
-
- def exists(self, path):
- """
- Returns True if the path is known to the cluster, False if it is not
- (or if there is an RPC error)
- """
- self._ensure_client()
-
- cdef c_string c_path = tobytes(path)
- cdef c_bool result
- with nogil:
- result = self.client.get().Exists(c_path)
- return result
-
- def isdir(self, path):
- cdef HdfsPathInfo info
- self._path_info(path, &info)
- return info.kind == ObjectType_DIRECTORY
-
- def isfile(self, path):
- cdef HdfsPathInfo info
- self._path_info(path, &info)
- return info.kind == ObjectType_FILE
-
- cdef _path_info(self, path, HdfsPathInfo* info):
- cdef c_string c_path = tobytes(path)
-
- with nogil:
- check_status(self.client.get()
- .GetPathInfo(c_path, info))
-
- def ls(self, path, bint full_info):
- cdef:
- c_string c_path = tobytes(path)
- vector[HdfsPathInfo] listing
- list results = []
- int i
-
- self._ensure_client()
-
- with nogil:
- check_status(self.client.get()
- .ListDirectory(c_path, &listing))
-
- cdef const HdfsPathInfo* info
- for i in range(<int> listing.size()):
- info = &listing[i]
-
- # Try to trim off the hdfs://HOST:PORT piece
- name = strip_hdfs_abspath(frombytes(info.name))
-
- if full_info:
- kind = ('file' if info.kind == ObjectType_FILE
- else 'directory')
-
- results.append({
- 'kind': kind,
- 'name': name,
- 'owner': frombytes(info.owner),
- 'group': frombytes(info.group),
- 'last_modified_time': info.last_modified_time,
- 'last_access_time': info.last_access_time,
- 'size': info.size,
- 'replication': info.replication,
- 'block_size': info.block_size,
- 'permissions': info.permissions
- })
- else:
- results.append(name)
-
- return results
-
- def mkdir(self, path):
- """
- Create indicated directory and any necessary parent directories
- """
- self._ensure_client()
-
- cdef c_string c_path = tobytes(path)
- with nogil:
- check_status(self.client.get()
- .MakeDirectory(c_path))
-
- def delete(self, path, bint recursive=False):
- """
- Delete the indicated file or directory
-
- Parameters
- ----------
- path : string
- recursive : boolean, default False
- If True, also delete child paths for directories
- """
- self._ensure_client()
-
- cdef c_string c_path = tobytes(path)
- with nogil:
- check_status(self.client.get()
- .Delete(c_path, recursive))
-
- def open(self, path, mode='rb', buffer_size=None, replication=None,
- default_block_size=None):
- """
- Parameters
- ----------
- mode : string, 'rb', 'wb', 'ab'
- """
- self._ensure_client()
-
- cdef HdfsFile out = HdfsFile()
-
- if mode not in ('rb', 'wb', 'ab'):
- raise Exception("Mode must be 'rb' (read), "
- "'wb' (write, new file), or 'ab' (append)")
-
- cdef c_string c_path = tobytes(path)
- cdef c_bool append = False
-
- # 0 in libhdfs means "use the default"
- cdef int32_t c_buffer_size = buffer_size or 0
- cdef int16_t c_replication = replication or 0
- cdef int64_t c_default_block_size = default_block_size or 0
-
- cdef shared_ptr[HdfsOutputStream] wr_handle
- cdef shared_ptr[HdfsReadableFile] rd_handle
-
- if mode in ('wb', 'ab'):
- if mode == 'ab':
- append = True
-
- with nogil:
- check_status(
- self.client.get()
- .OpenWriteable(c_path, append, c_buffer_size,
- c_replication, c_default_block_size,
- &wr_handle))
-
- out.wr_file = <shared_ptr[OutputStream]> wr_handle
-
- out.is_readable = 0
- out.is_writeable = 1
- else:
- with nogil:
- check_status(self.client.get()
- .OpenReadable(c_path, &rd_handle))
-
- out.rd_file = <shared_ptr[RandomAccessFile]> rd_handle
- out.is_readable = 1
- out.is_writeable = 0
-
- if c_buffer_size == 0:
- c_buffer_size = 2 ** 16
-
- out.mode = mode
- out.buffer_size = c_buffer_size
- out.parent = _HdfsFileNanny(self, out)
- out.is_open = True
- out.own_file = True
-
- return out
-
- def download(self, path, stream, buffer_size=None):
- with self.open(path, 'rb') as f:
- f.download(stream, buffer_size=buffer_size)
-
- def upload(self, path, stream, buffer_size=None):
- """
- Upload file-like object to HDFS path
- """
- with self.open(path, 'wb') as f:
- f.upload(stream, buffer_size=buffer_size)
-
-
-# ARROW-404: Helper class to ensure that files are closed before the
-# client. During deallocation of the extension class, the attributes are
-# decref'd which can cause the client to get closed first if the file has the
-# last remaining reference
-cdef class _HdfsFileNanny:
- cdef:
- object client
- object file_handle_ref
-
- def __cinit__(self, client, file_handle):
- import weakref
- self.client = client
- self.file_handle_ref = weakref.ref(file_handle)
-
- def __dealloc__(self):
- fh = self.file_handle_ref()
- if fh:
- fh.close()
- # avoid cyclic GC
- self.file_handle_ref = None
- self.client = None
-
-
-cdef class HdfsFile(NativeFile):
- cdef readonly:
- int32_t buffer_size
- object mode
- object parent
-
- cdef object __weakref__
-
- def __dealloc__(self):
- self.parent = None
-
-# ----------------------------------------------------------------------
-# File and stream readers and writers
-
-cdef class _StreamWriter:
- cdef:
- shared_ptr[CStreamWriter] writer
- shared_ptr[OutputStream] sink
- bint closed
-
- def __cinit__(self):
- self.closed = True
-
- def __dealloc__(self):
- if not self.closed:
- self.close()
-
- def _open(self, sink, Schema schema):
- get_writer(sink, &self.sink)
-
- with nogil:
- check_status(CStreamWriter.Open(self.sink.get(), schema.sp_schema,
- &self.writer))
-
- self.closed = False
-
- def write_batch(self, RecordBatch batch):
- with nogil:
- check_status(self.writer.get()
- .WriteRecordBatch(deref(batch.batch)))
-
- def close(self):
- with nogil:
- check_status(self.writer.get().Close())
- self.closed = True
-
-
-cdef class _StreamReader:
- cdef:
- shared_ptr[CStreamReader] reader
-
- cdef readonly:
- Schema schema
-
- def __cinit__(self):
- pass
-
- def _open(self, source):
- cdef:
- shared_ptr[RandomAccessFile] reader
- shared_ptr[InputStream] in_stream
-
- get_reader(source, &reader)
- in_stream = <shared_ptr[InputStream]> reader
-
- with nogil:
- check_status(CStreamReader.Open(in_stream, &self.reader))
-
- self.schema = Schema()
- self.schema.init_schema(self.reader.get().schema())
-
- def get_next_batch(self):
- """
- Read next RecordBatch from the stream. Raises StopIteration at end of
- stream
- """
- cdef shared_ptr[CRecordBatch] batch
-
- with nogil:
- check_status(self.reader.get().GetNextRecordBatch(&batch))
-
- if batch.get() == NULL:
- raise StopIteration
-
- return batch_from_cbatch(batch)
-
- def read_all(self):
- """
- Read all record batches as a pyarrow.Table
- """
- cdef:
- vector[shared_ptr[CRecordBatch]] batches
- shared_ptr[CRecordBatch] batch
- shared_ptr[CTable] table
-
- with nogil:
- while True:
- check_status(self.reader.get().GetNextRecordBatch(&batch))
- if batch.get() == NULL:
- break
- batches.push_back(batch)
-
- check_status(CTable.FromRecordBatches(batches, &table))
-
- return table_from_ctable(table)
-
-
-cdef class _FileWriter(_StreamWriter):
-
- def _open(self, sink, Schema schema):
- cdef shared_ptr[CFileWriter] writer
- get_writer(sink, &self.sink)
-
- with nogil:
- check_status(CFileWriter.Open(self.sink.get(), schema.sp_schema,
- &writer))
-
- # Cast to base class, because it has the same interface
- self.writer = <shared_ptr[CStreamWriter]> writer
- self.closed = False
-
-
-cdef class _FileReader:
- cdef:
- shared_ptr[CFileReader] reader
-
- def __cinit__(self):
- pass
-
- def _open(self, source, footer_offset=None):
- cdef shared_ptr[RandomAccessFile] reader
- get_reader(source, &reader)
-
- cdef int64_t offset = 0
- if footer_offset is not None:
- offset = footer_offset
-
- with nogil:
- if offset != 0:
- check_status(CFileReader.Open2(reader, offset, &self.reader))
- else:
- check_status(CFileReader.Open(reader, &self.reader))
-
- property num_record_batches:
-
- def __get__(self):
- return self.reader.get().num_record_batches()
-
- def get_batch(self, int i):
- cdef shared_ptr[CRecordBatch] batch
-
- if i < 0 or i >= self.num_record_batches:
- raise ValueError('Batch number {0} out of range'.format(i))
-
- with nogil:
- check_status(self.reader.get().GetRecordBatch(i, &batch))
-
- return batch_from_cbatch(batch)
-
- # TODO(wesm): ARROW-503: Function was renamed. Remove after a period of
- # time has passed
- get_record_batch = get_batch
-
- def read_all(self):
- """
- Read all record batches as a pyarrow.Table
- """
- cdef:
- vector[shared_ptr[CRecordBatch]] batches
- shared_ptr[CTable] table
- int i, nbatches
-
- nbatches = self.num_record_batches
-
- batches.resize(nbatches)
- with nogil:
- for i in range(nbatches):
- check_status(self.reader.get().GetRecordBatch(i, &batches[i]))
- check_status(CTable.FromRecordBatches(batches, &table))
-
- return table_from_ctable(table)
-
-
-# ----------------------------------------------------------------------
-# Implement legacy Feather file format
-
-
-class FeatherError(Exception):
- pass
-
-
-cdef class FeatherWriter:
- cdef:
- unique_ptr[CFeatherWriter] writer
-
- cdef public:
- int64_t num_rows
-
- def __cinit__(self):
- self.num_rows = -1
-
- def open(self, object dest):
- cdef shared_ptr[OutputStream] sink
- get_writer(dest, &sink)
-
- with nogil:
- check_status(CFeatherWriter.Open(sink, &self.writer))
-
- def close(self):
- if self.num_rows < 0:
- self.num_rows = 0
- self.writer.get().SetNumRows(self.num_rows)
- check_status(self.writer.get().Finalize())
-
- def write_array(self, object name, object col, object mask=None):
- cdef Array arr
-
- if self.num_rows >= 0:
- if len(col) != self.num_rows:
- raise ValueError('prior column had a different number of rows')
- else:
- self.num_rows = len(col)
-
- if isinstance(col, Array):
- arr = col
- else:
- arr = Array.from_pandas(col, mask=mask)
-
- cdef c_string c_name = tobytes(name)
-
- with nogil:
- check_status(
- self.writer.get().Append(c_name, deref(arr.sp_array)))
-
-
-cdef class FeatherReader:
- cdef:
- unique_ptr[CFeatherReader] reader
-
- def __cinit__(self):
- pass
-
- def open(self, source):
- cdef shared_ptr[RandomAccessFile] reader
- get_reader(source, &reader)
-
- with nogil:
- check_status(CFeatherReader.Open(reader, &self.reader))
-
- property num_rows:
-
- def __get__(self):
- return self.reader.get().num_rows()
-
- property num_columns:
-
- def __get__(self):
- return self.reader.get().num_columns()
-
- def get_column_name(self, int i):
- cdef c_string name = self.reader.get().GetColumnName(i)
- return frombytes(name)
-
- def get_column(self, int i):
- if i < 0 or i >= self.num_columns:
- raise IndexError(i)
-
- cdef shared_ptr[CColumn] sp_column
- with nogil:
- check_status(self.reader.get()
- .GetColumn(i, &sp_column))
-
- cdef Column col = Column()
- col.init(sp_column)
- return col
-
-
-def get_tensor_size(Tensor tensor):
- """
- Return total size of serialized Tensor including metadata and padding
- """
- cdef int64_t size
- with nogil:
- check_status(GetTensorSize(deref(tensor.tp), &size))
- return size
-
-
-def get_record_batch_size(RecordBatch batch):
- """
- Return total size of serialized RecordBatch including metadata and padding
- """
- cdef int64_t size
- with nogil:
- check_status(GetRecordBatchSize(deref(batch.batch), &size))
- return size
-
-
-def write_tensor(Tensor tensor, NativeFile dest):
- """
- Write pyarrow.Tensor to pyarrow.NativeFile object at its current position
-
- Parameters
- ----------
- tensor : pyarrow.Tensor
- dest : pyarrow.NativeFile
-
- Returns
- -------
- bytes_written : int
- Total number of bytes written to the file
- """
- cdef:
- int32_t metadata_length
- int64_t body_length
-
- dest._assert_writeable()
-
- with nogil:
- check_status(
- WriteTensor(deref(tensor.tp), dest.wr_file.get(),
- &metadata_length, &body_length))
-
- return metadata_length + body_length
-
-
-def read_tensor(NativeFile source):
- """
- Read pyarrow.Tensor from pyarrow.NativeFile object at its current
- position. If the file source supports zero copy (e.g. a memory map), then
- this operation does not allocate any memory
-
- Parameters
- ----------
- source : pyarrow.NativeFile
-
- Returns
- -------
- tensor : Tensor
- """
- cdef:
- shared_ptr[CTensor] sp_tensor
-
- source._assert_readable()
-
- cdef int64_t offset = source.tell()
- with nogil:
- check_status(ReadTensor(offset, source.rd_file.get(), &sp_tensor))
-
- return box_tensor(sp_tensor)
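
A quick usage sketch of the IO layer above, for orientation. This is a
minimal sketch, assuming frombuffer, BufferReader and create_memory_map stay
re-exported from the top-level pyarrow namespace after the consolidation into
pyarrow.lib (the file path below is illustrative):

    import pyarrow as pa

    # Zero-copy reads over an in-memory buffer (BufferReader above)
    buf = pa.frombuffer(b'some bytes')   # Arrow buffer over the bytes object
    reader = pa.BufferReader(buf)
    assert reader.read(4) == b'some'

    # Memory-mapped file IO; the map's size is fixed once created
    mm = pa.create_memory_map('/tmp/example.dat', 1024)
    mm.write(b'hello')
    mm.seek(0)
    print(mm.read(5))
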
http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/pyarrow/_jemalloc.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_jemalloc.pyx b/python/pyarrow/_jemalloc.pyx
index 3b41964..6f00c9d 100644
--- a/python/pyarrow/_jemalloc.pyx
+++ b/python/pyarrow/_jemalloc.pyx
@@ -20,7 +20,7 @@
# cython: embedsignature = True
from pyarrow.includes.libarrow_jemalloc cimport CJemallocMemoryPool
-from pyarrow._memory cimport MemoryPool
+from pyarrow.lib cimport MemoryPool
def default_pool():
cdef MemoryPool pool = MemoryPool()
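
A minimal usage sketch for the jemalloc pool, assuming pyarrow is built with
the jemalloc extension and that InMemoryOutputStream (shown in the deleted
_io.pyx above) remains re-exported at top level:

    import pyarrow as pa
    from pyarrow import _jemalloc

    pool = _jemalloc.default_pool()               # jemalloc-backed MemoryPool
    sink = pa.InMemoryOutputStream(memory_pool=pool)
    sink.write(b'data')                           # allocates from the pool
    buf = sink.get_result()
    print(pool.bytes_allocated())                 # bytes held by this pool
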
http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/pyarrow/_memory.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/_memory.pxd b/python/pyarrow/_memory.pxd
deleted file mode 100644
index bb1af85..0000000
--- a/python/pyarrow/_memory.pxd
+++ /dev/null
@@ -1,30 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from pyarrow.includes.libarrow cimport CMemoryPool, CLoggingMemoryPool
-
-
-cdef class MemoryPool:
- cdef:
- CMemoryPool* pool
-
- cdef init(self, CMemoryPool* pool)
-
-cdef class LoggingMemoryPool(MemoryPool):
- pass
-
-cdef CMemoryPool* maybe_unbox_memory_pool(MemoryPool memory_pool)
http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/pyarrow/_memory.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_memory.pyx b/python/pyarrow/_memory.pyx
deleted file mode 100644
index 8b73a17..0000000
--- a/python/pyarrow/_memory.pyx
+++ /dev/null
@@ -1,58 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# cython: profile=False
-# distutils: language = c++
-# cython: embedsignature = True
-
-from pyarrow.includes.libarrow cimport CMemoryPool, CLoggingMemoryPool
-from pyarrow.includes.pyarrow cimport set_default_memory_pool, get_memory_pool
-
-
-cdef class MemoryPool:
- cdef init(self, CMemoryPool* pool):
- self.pool = pool
-
- def bytes_allocated(self):
- return self.pool.bytes_allocated()
-
-
-cdef CMemoryPool* maybe_unbox_memory_pool(MemoryPool memory_pool):
- if memory_pool is None:
- return get_memory_pool()
- else:
- return memory_pool.pool
-
-
-cdef class LoggingMemoryPool(MemoryPool):
- pass
-
-
-def default_memory_pool():
- cdef:
- MemoryPool pool = MemoryPool()
- pool.init(get_memory_pool())
- return pool
-
-
-def set_memory_pool(MemoryPool pool):
- set_default_memory_pool(pool.pool)
-
-
-def total_allocated_bytes():
- cdef CMemoryPool* pool = get_memory_pool()
- return pool.bytes_allocated()
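
The pool helpers above move into the consolidated module as well; a short
sketch of their behaviour, assuming they remain re-exported at the top level:

    import pyarrow as pa

    pool = pa.default_memory_pool()      # wraps the process-wide default pool
    print(pool.bytes_allocated())        # bytes currently allocated from it
    print(pa.total_allocated_bytes())    # same counter via the free function

    # Swap in a different default allocator (e.g. the jemalloc pool above):
    # pa.set_memory_pool(jemalloc_pool)
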
http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/pyarrow/_parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index c06eab2..51bd938 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -22,16 +22,16 @@
from cython.operator cimport dereference as deref
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
-cimport pyarrow.includes.pyarrow as pyarrow
-from pyarrow._array cimport Array, Schema, box_schema
-from pyarrow._error cimport check_status
-from pyarrow._memory cimport MemoryPool, maybe_unbox_memory_pool
-from pyarrow._table cimport Table, table_from_ctable
-from pyarrow._io cimport NativeFile, get_reader, get_writer
+from pyarrow.lib cimport (Array, Schema,
+ check_status,
+ MemoryPool, maybe_unbox_memory_pool,
+ Table,
+ pyarrow_wrap_schema,
+ pyarrow_wrap_table,
+ NativeFile, get_reader, get_writer)
from pyarrow.compat import tobytes, frombytes
-from pyarrow._error import ArrowException
-from pyarrow._io import NativeFile
+from pyarrow.lib import ArrowException, NativeFile
import six
@@ -213,7 +213,7 @@ cdef class ParquetSchema:
with nogil:
check_status(FromParquetSchema(self.schema, &sp_arrow_schema))
- return box_schema(sp_arrow_schema)
+ return pyarrow_wrap_schema(sp_arrow_schema)
def equals(self, ParquetSchema other):
"""
@@ -426,7 +426,7 @@ cdef class ParquetReader:
with nogil:
check_status(self.reader.get()
.ReadRowGroup(i, &ctable))
- return table_from_ctable(ctable)
+ return pyarrow_wrap_table(ctable)
def read_all(self, column_indices=None):
cdef:
@@ -445,7 +445,7 @@ cdef class ParquetReader:
with nogil:
check_status(self.reader.get()
.ReadTable(&ctable))
- return table_from_ctable(ctable)
+ return pyarrow_wrap_table(ctable)
def column_name_idx(self, column_name):
"""
http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/pyarrow/_table.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/_table.pxd b/python/pyarrow/_table.pxd
deleted file mode 100644
index e61e90d..0000000
--- a/python/pyarrow/_table.pxd
+++ /dev/null
@@ -1,62 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from pyarrow.includes.common cimport shared_ptr
-from pyarrow.includes.libarrow cimport (CChunkedArray, CColumn, CTable,
- CRecordBatch)
-from pyarrow._array cimport Schema
-
-
-cdef class ChunkedArray:
- cdef:
- shared_ptr[CChunkedArray] sp_chunked_array
- CChunkedArray* chunked_array
-
- cdef init(self, const shared_ptr[CChunkedArray]& chunked_array)
- cdef _check_nullptr(self)
-
-
-cdef class Column:
- cdef:
- shared_ptr[CColumn] sp_column
- CColumn* column
-
- cdef init(self, const shared_ptr[CColumn]& column)
- cdef _check_nullptr(self)
-
-
-cdef class Table:
- cdef:
- shared_ptr[CTable] sp_table
- CTable* table
-
- cdef init(self, const shared_ptr[CTable]& table)
- cdef _check_nullptr(self)
-
-
-cdef class RecordBatch:
- cdef:
- shared_ptr[CRecordBatch] sp_batch
- CRecordBatch* batch
- Schema _schema
-
- cdef init(self, const shared_ptr[CRecordBatch]& table)
- cdef _check_nullptr(self)
-
-cdef object box_column(const shared_ptr[CColumn]& ccolumn)
-cdef api object table_from_ctable(const shared_ptr[CTable]& ctable)
-cdef api object batch_from_cbatch(const shared_ptr[CRecordBatch]& cbatch)
http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/pyarrow/_table.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_table.pyx b/python/pyarrow/_table.pyx
deleted file mode 100644
index 223fe27..0000000
--- a/python/pyarrow/_table.pyx
+++ /dev/null
@@ -1,926 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# cython: profile=False
-# distutils: language = c++
-# cython: embedsignature = True
-
-from cython.operator cimport dereference as deref
-
-from pyarrow.includes.libarrow cimport *
-from pyarrow.includes.common cimport *
-cimport pyarrow.includes.pyarrow as pyarrow
-from pyarrow._array cimport (Array, box_array, wrap_array_output,
- box_data_type, box_schema, DataType, Field)
-from pyarrow._error cimport check_status
-cimport cpython
-
-import pyarrow._config
-from pyarrow._error import ArrowException
-from pyarrow._array import field
-from pyarrow.compat import frombytes, tobytes
-
-from collections import OrderedDict
-
-
-cdef _pandas():
- import pandas as pd
- return pd
-
-
-cdef class ChunkedArray:
- """
- Array backed via one or more memory chunks.
-
- Warning
- -------
- Do not call this class's constructor directly.
- """
-
- def __cinit__(self):
- self.chunked_array = NULL
-
- cdef init(self, const shared_ptr[CChunkedArray]& chunked_array):
- self.sp_chunked_array = chunked_array
- self.chunked_array = chunked_array.get()
-
- cdef _check_nullptr(self):
- if self.chunked_array == NULL:
- raise ReferenceError("ChunkedArray object references a NULL "
- "pointer. Not initialized.")
-
- def length(self):
- self._check_nullptr()
- return self.chunked_array.length()
-
- def __len__(self):
- return self.length()
-
- @property
- def null_count(self):
- """
- Number of null entries
-
- Returns
- -------
- int
- """
- self._check_nullptr()
- return self.chunked_array.null_count()
-
- @property
- def num_chunks(self):
- """
- Number of underlying chunks
-
- Returns
- -------
- int
- """
- self._check_nullptr()
- return self.chunked_array.num_chunks()
-
- def chunk(self, i):
- """
- Select a chunk by its index
-
- Parameters
- ----------
- i : int
-
- Returns
- -------
- pyarrow.array.Array
- """
- self._check_nullptr()
- return box_array(self.chunked_array.chunk(i))
-
- def iterchunks(self):
- for i in range(self.num_chunks):
- yield self.chunk(i)
-
- def to_pylist(self):
- """
- Convert to a list of native Python objects.
- """
- result = []
- for i in range(self.num_chunks):
- result += self.chunk(i).to_pylist()
- return result
-
-
-cdef class Column:
- """
- Named vector of elements of equal type.
-
- Warning
- -------
- Do not call this class's constructor directly.
- """
-
- def __cinit__(self):
- self.column = NULL
-
- cdef init(self, const shared_ptr[CColumn]& column):
- self.sp_column = column
- self.column = column.get()
-
- @staticmethod
- def from_array(object field_or_name, Array arr):
- cdef Field boxed_field
-
- if isinstance(field_or_name, Field):
- boxed_field = field_or_name
- else:
- boxed_field = field(field_or_name, arr.type)
-
- cdef shared_ptr[CColumn] sp_column
- sp_column.reset(new CColumn(boxed_field.sp_field, arr.sp_array))
- return box_column(sp_column)
-
- def to_pandas(self):
- """
- Convert the arrow::Column to a pandas.Series
-
- Returns
- -------
- pandas.Series
- """
- cdef:
- PyObject* out
-
- check_status(pyarrow.ConvertColumnToPandas(self.sp_column,
- self, &out))
-
- return _pandas().Series(wrap_array_output(out), name=self.name)
-
- def equals(self, Column other):
- """
- Check if contents of two columns are equal
-
- Parameters
- ----------
- other : pyarrow.Column
-
- Returns
- -------
- are_equal : boolean
- """
- cdef:
- CColumn* my_col = self.column
- CColumn* other_col = other.column
- c_bool result
-
- self._check_nullptr()
- other._check_nullptr()
-
- with nogil:
- result = my_col.Equals(deref(other_col))
-
- return result
-
- def to_pylist(self):
- """
- Convert to a list of native Python objects.
- """
- return self.data.to_pylist()
-
- cdef _check_nullptr(self):
- if self.column == NULL:
- raise ReferenceError("Column object references a NULL pointer."
- "Not initialized.")
-
- def __len__(self):
- self._check_nullptr()
- return self.column.length()
-
- def length(self):
- self._check_nullptr()
- return self.column.length()
-
- @property
- def shape(self):
- """
- Dimensions of this column
-
- Returns
- -------
- (int,)
- """
- self._check_nullptr()
- return (self.length(),)
-
- @property
- def null_count(self):
- """
- Number of null entries
-
- Returns
- -------
- int
- """
- self._check_nullptr()
- return self.column.null_count()
-
- @property
- def name(self):
- """
- Label of the column
-
- Returns
- -------
- str
- """
- return bytes(self.column.name()).decode('utf8')
-
- @property
- def type(self):
- """
- Type information for this column
-
- Returns
- -------
- pyarrow.schema.DataType
- """
- return box_data_type(self.column.type())
-
- @property
- def data(self):
- """
- The underlying data
-
- Returns
- -------
- pyarrow.table.ChunkedArray
- """
- cdef ChunkedArray chunked_array = ChunkedArray()
- chunked_array.init(self.column.data())
- return chunked_array
-
-
-cdef shared_ptr[const CKeyValueMetadata] key_value_metadata_from_dict(
- dict metadata):
- cdef:
- unordered_map[c_string, c_string] unordered_metadata = metadata
- return (<shared_ptr[const CKeyValueMetadata]>
- make_shared[CKeyValueMetadata](unordered_metadata))
-
-
-cdef int _schema_from_arrays(
- arrays, names, dict metadata, shared_ptr[CSchema]* schema) except -1:
- cdef:
- Array arr
- Column col
- c_string c_name
- vector[shared_ptr[CField]] fields
- shared_ptr[CDataType] type_
- int K = len(arrays)
-
- fields.resize(K)
-
- if len(arrays) == 0:
- raise ValueError('Must pass at least one array')
-
- if isinstance(arrays[0], Array):
- if names is None:
- raise ValueError('Must pass names when constructing '
- 'from Array objects')
- for i in range(K):
- arr = arrays[i]
- type_ = arr.type.sp_type
- c_name = tobytes(names[i])
- fields[i].reset(new CField(c_name, type_, True))
- elif isinstance(arrays[0], Column):
- for i in range(K):
- col = arrays[i]
- type_ = col.sp_column.get().type()
- c_name = tobytes(col.name)
- fields[i].reset(new CField(c_name, type_, True))
- else:
- raise TypeError(type(arrays[0]))
-
- schema.reset(new CSchema(fields, key_value_metadata_from_dict(metadata)))
- return 0
-
-
-cdef tuple _dataframe_to_arrays(df, bint timestamps_to_ms, Schema schema):
- cdef:
- list names = []
- list arrays = []
- DataType type = None
- dict metadata = {}
-
- for name in df.columns:
- col = df[name]
- if schema is not None:
- type = schema.field_by_name(name).type
-
- arr = Array.from_pandas(col, type=type,
- timestamps_to_ms=timestamps_to_ms)
- names.append(name)
- arrays.append(arr)
-
- return names, arrays, metadata
-
-
-cdef class RecordBatch:
- """
- Batch of rows of columns of equal length
-
- Warning
- -------
- Do not call this class's constructor directly, use one of the ``from_*``
- methods instead.
- """
-
- def __cinit__(self):
- self.batch = NULL
- self._schema = None
-
- cdef init(self, const shared_ptr[CRecordBatch]& batch):
- self.sp_batch = batch
- self.batch = batch.get()
-
- cdef _check_nullptr(self):
- if self.batch == NULL:
- raise ReferenceError("Object not initialized")
-
- def __len__(self):
- self._check_nullptr()
- return self.batch.num_rows()
-
- @property
- def num_columns(self):
- """
- Number of columns
-
- Returns
- -------
- int
- """
- self._check_nullptr()
- return self.batch.num_columns()
-
- @property
- def num_rows(self):
- """
- Number of rows
-
- Due to the definition of a RecordBatch, all columns have the same
- number of rows.
-
- Returns
- -------
- int
- """
- return len(self)
-
- @property
- def schema(self):
- """
- Schema of the RecordBatch and its columns
-
- Returns
- -------
- pyarrow.schema.Schema
- """
- cdef Schema schema
- self._check_nullptr()
- if self._schema is None:
- schema = Schema()
- schema.init_schema(self.batch.schema())
- self._schema = schema
-
- return self._schema
-
- def __getitem__(self, i):
- return box_array(self.batch.column(i))
-
- def slice(self, offset=0, length=None):
- """
- Compute zero-copy slice of this RecordBatch
-
- Parameters
- ----------
- offset : int, default 0
- Offset from start of array to slice
- length : int, default None
- Length of slice (default is until end of batch starting from
- offset)
-
- Returns
- -------
- sliced : RecordBatch
- """
- cdef shared_ptr[CRecordBatch] result
-
- if offset < 0:
- raise IndexError('Offset must be non-negative')
-
- if length is None:
- result = self.batch.Slice(offset)
- else:
- result = self.batch.Slice(offset, length)
-
- return batch_from_cbatch(result)
-
- def equals(self, RecordBatch other):
- cdef:
- CRecordBatch* my_batch = self.batch
- CRecordBatch* other_batch = other.batch
- c_bool result
-
- self._check_nullptr()
- other._check_nullptr()
-
- with nogil:
- result = my_batch.Equals(deref(other_batch))
-
- return result
-
- def to_pydict(self):
- """
- Convert the arrow::RecordBatch to an OrderedDict
-
- Returns
- -------
- OrderedDict
- """
- entries = []
- for i in range(self.batch.num_columns()):
- name = bytes(self.batch.column_name(i)).decode('utf8')
- column = self[i].to_pylist()
- entries.append((name, column))
- return OrderedDict(entries)
-
- def to_pandas(self, nthreads=None):
- """
- Convert the arrow::RecordBatch to a pandas DataFrame
-
- Returns
- -------
- pandas.DataFrame
- """
- return Table.from_batches([self]).to_pandas(nthreads=nthreads)
-
- @classmethod
- def from_pandas(cls, df, schema=None):
- """
- Convert pandas.DataFrame to an Arrow RecordBatch
-
- Parameters
- ----------
- df: pandas.DataFrame
- schema: pyarrow.Schema (optional)
- The expected schema of the RecordBatch. This can be used to
- indicate the type of columns if we cannot infer it automatically.
-
- Returns
- -------
- pyarrow.table.RecordBatch
- """
- names, arrays, metadata = _dataframe_to_arrays(df, False, schema)
- return cls.from_arrays(arrays, names, metadata)
-
- @staticmethod
- def from_arrays(list arrays, list names, dict metadata=None):
- """
- Construct a RecordBatch from multiple pyarrow.Arrays
-
- Parameters
- ----------
- arrays: list of pyarrow.Array
- column-wise data vectors
- names: list of str
- Labels for the columns
-
- Returns
- -------
- pyarrow.table.RecordBatch
- """
- cdef:
- Array arr
- c_string c_name
- shared_ptr[CSchema] schema
- shared_ptr[CRecordBatch] batch
- vector[shared_ptr[CArray]] c_arrays
- int64_t num_rows
- int64_t i
- int64_t number_of_arrays = len(arrays)
-
- if not number_of_arrays:
- raise ValueError('Record batch must contain at least one array '
- '(for now)')
-
- num_rows = len(arrays[0])
- _schema_from_arrays(arrays, names, metadata or {}, &schema)
-
- c_arrays.reserve(len(arrays))
- for arr in arrays:
- c_arrays.push_back(arr.sp_array)
-
- batch.reset(new CRecordBatch(schema, num_rows, c_arrays))
- return batch_from_cbatch(batch)
-
-
-cdef table_to_blockmanager(const shared_ptr[CTable]& table, int nthreads):
- cdef:
- PyObject* result_obj
- CColumn* col
- int i
-
- import pandas.core.internals as _int
- from pandas import RangeIndex, Categorical
- from pyarrow.compat import DatetimeTZDtype
-
- with nogil:
- check_status(pyarrow.ConvertTableToPandas(table, nthreads,
- &result_obj))
-
- result = PyObject_to_object(result_obj)
-
- blocks = []
- for item in result:
- block_arr = item['block']
- placement = item['placement']
- if 'dictionary' in item:
- cat = Categorical(block_arr,
- categories=item['dictionary'],
- ordered=False, fastpath=True)
- block = _int.make_block(cat, placement=placement,
- klass=_int.CategoricalBlock,
- fastpath=True)
- elif 'timezone' in item:
- dtype = DatetimeTZDtype('ns', tz=item['timezone'])
- block = _int.make_block(block_arr, placement=placement,
- klass=_int.DatetimeTZBlock,
- dtype=dtype, fastpath=True)
- else:
- block = _int.make_block(block_arr, placement=placement)
- blocks.append(block)
-
- names = []
- for i in range(table.get().num_columns()):
- col = table.get().column(i).get()
- names.append(frombytes(col.name()))
-
- axes = [names, RangeIndex(table.get().num_rows())]
- return _int.BlockManager(blocks, axes)
-
-
-cdef class Table:
- """
- A collection of top-level named, equal length Arrow arrays.
-
- Warning
- -------
- Do not call this class's constructor directly, use one of the ``from_*``
- methods instead.
- """
-
- def __cinit__(self):
- self.table = NULL
-
- def __repr__(self):
- return 'pyarrow.Table\n{0}'.format(str(self.schema))
-
- cdef init(self, const shared_ptr[CTable]& table):
- self.sp_table = table
- self.table = table.get()
-
- cdef _check_nullptr(self):
- if self.table == NULL:
- raise ReferenceError("Table object references a NULL pointer."
- "Not initialized.")
-
- def equals(self, Table other):
- """
- Check if contents of two tables are equal
-
- Parameters
- ----------
- other : pyarrow.Table
-
- Returns
- -------
- are_equal : boolean
- """
- cdef:
- CTable* my_table = self.table
- CTable* other_table = other.table
- c_bool result
-
- self._check_nullptr()
- other._check_nullptr()
-
- with nogil:
- result = my_table.Equals(deref(other_table))
-
- return result
-
- @classmethod
- def from_pandas(cls, df, timestamps_to_ms=False, schema=None):
- """
- Convert pandas.DataFrame to an Arrow Table
-
- Parameters
- ----------
- df: pandas.DataFrame
-
- timestamps_to_ms: bool
- Convert datetime columns to ms resolution. This is needed for
- compatibility with other functionality like Parquet I/O which
- only supports milliseconds.
-
- schema: pyarrow.Schema (optional)
- The expected schema of the Arrow Table. This can be used to
- indicate the type of columns if we cannot infer it automatically.
-
- Returns
- -------
- pyarrow.table.Table
-
- Examples
- --------
-
- >>> import pandas as pd
- >>> import pyarrow as pa
- >>> df = pd.DataFrame({
- ... 'int': [1, 2],
- ... 'str': ['a', 'b']
- ... })
- >>> pa.Table.from_pandas(df)
- <pyarrow.table.Table object at 0x7f05d1fb1b40>
- """
- names, arrays, metadata = _dataframe_to_arrays(df,
- timestamps_to_ms=timestamps_to_ms,
- schema=schema)
- return cls.from_arrays(arrays, names=names, metadata=metadata)
-
- @staticmethod
- def from_arrays(arrays, names=None, dict metadata=None):
- """
- Construct a Table from Arrow arrays or columns
-
- Parameters
- ----------
- arrays: list of pyarrow.Array or pyarrow.Column
- Equal-length arrays that should form the table.
- names: list of str, optional
- Names for the table columns. If Columns are passed, names will be
- inferred. If Arrays are passed, this argument is required.
-
- Returns
- -------
- pyarrow.table.Table
-
- """
- cdef:
- vector[shared_ptr[CColumn]] columns
- shared_ptr[CSchema] schema
- shared_ptr[CTable] table
- size_t K = len(arrays)
-
- _schema_from_arrays(arrays, names, metadata or {}, &schema)
-
- columns.reserve(K)
-
- for i in range(K):
- if isinstance(arrays[i], Array):
- columns.push_back(
- make_shared[CColumn](
- schema.get().field(i),
- (<Array> arrays[i]).sp_array
- )
- )
- elif isinstance(arrays[i], Column):
- columns.push_back((<Column> arrays[i]).sp_column)
- else:
- raise ValueError(type(arrays[i]))
-
- table.reset(new CTable(schema, columns))
- return table_from_ctable(table)
-
- @staticmethod
- def from_batches(batches):
- """
- Construct a Table from a list of Arrow RecordBatches
-
- Parameters
- ----------
-
- batches: list of RecordBatch
- RecordBatch list to be converted, schemas must be equal
- """
- cdef:
- vector[shared_ptr[CRecordBatch]] c_batches
- shared_ptr[CTable] c_table
- RecordBatch batch
-
- for batch in batches:
- c_batches.push_back(batch.sp_batch)
-
- with nogil:
- check_status(CTable.FromRecordBatches(c_batches, &c_table))
-
- return table_from_ctable(c_table)
-
- def to_pandas(self, nthreads=None):
- """
- Convert the arrow::Table to a pandas DataFrame
-
- Parameters
- ----------
- nthreads : int, default max(1, multiprocessing.cpu_count() / 2)
- For the default, we divide the CPU count by 2 because most modern
- computers have hyperthreading turned on, so doubling the CPU count
- beyond the number of physical cores does not help
-
- Returns
- -------
- pandas.DataFrame
- """
- if nthreads is None:
- nthreads = pyarrow._config.cpu_count()
-
- mgr = table_to_blockmanager(self.sp_table, nthreads)
- return _pandas().DataFrame(mgr)
-
- def to_pydict(self):
- """
- Convert the arrow::Table to an OrderedDict
-
- Returns
- -------
- OrderedDict
- """
- entries = []
- for i in range(self.table.num_columns()):
- name = self.column(i).name
- column = self.column(i).to_pylist()
- entries.append((name, column))
- return OrderedDict(entries)
-
- @property
- def schema(self):
- """
- Schema of the table and its columns
-
- Returns
- -------
- pyarrow.schema.Schema
- """
- return box_schema(self.table.schema())
-
- def column(self, index):
- """
- Select a column by its numeric index.
-
- Parameters
- ----------
- index: int
-
- Returns
- -------
- pyarrow.table.Column
- """
- self._check_nullptr()
- cdef Column column = Column()
- column.init(self.table.column(index))
- return column
-
- def __getitem__(self, i):
- return self.column(i)
-
- def itercolumns(self):
- """
- Iterator over all columns in their numerical order
- """
- for i in range(self.num_columns):
- yield self.column(i)
-
- @property
- def num_columns(self):
- """
- Number of columns in this table
-
- Returns
- -------
- int
- """
- self._check_nullptr()
- return self.table.num_columns()
-
- @property
- def num_rows(self):
- """
- Number of rows in this table.
-
- Due to the definition of a table, all columns have the same number of rows.
-
- Returns
- -------
- int
- """
- self._check_nullptr()
- return self.table.num_rows()
-
- def __len__(self):
- return self.num_rows
-
- @property
- def shape(self):
- """
- Dimensions of the table: (#rows, #columns)
-
- Returns
- -------
- (int, int)
- """
- return (self.num_rows, self.num_columns)
-
- def add_column(self, int i, Column column):
- """
- Add column to Table at position. Returns new table
- """
- cdef:
- shared_ptr[CTable] c_table
-
- with nogil:
- check_status(self.table.AddColumn(i, column.sp_column, &c_table))
-
- return table_from_ctable(c_table)
-
- def append_column(self, Column column):
- """
- Append column at end of columns. Returns new table
- """
- return self.add_column(self.num_columns, column)
-
- def remove_column(self, int i):
- """
- Create new Table with the indicated column removed
- """
- cdef shared_ptr[CTable] c_table
-
- with nogil:
- check_status(self.table.RemoveColumn(i, &c_table))
-
- return table_from_ctable(c_table)
-
-
-def concat_tables(tables):
- """
- Perform zero-copy concatenation of pyarrow.Table objects. Raises an
- exception if the Table schemas are not all the same
-
- Parameters
- ----------
- tables : iterable of pyarrow.Table objects
- """
- cdef:
- vector[shared_ptr[CTable]] c_tables
- shared_ptr[CTable] c_result
- Table table
-
- for table in tables:
- c_tables.push_back(table.sp_table)
-
- with nogil:
- check_status(ConcatenateTables(c_tables, &c_result))
-
- return table_from_ctable(c_result)
-
-
-cdef object box_column(const shared_ptr[CColumn]& ccolumn):
- cdef Column column = Column()
- column.init(ccolumn)
- return column
-
-
-cdef api object table_from_ctable(const shared_ptr[CTable]& ctable):
- cdef Table table = Table()
- table.init(ctable)
- return table
-
-
-cdef api object batch_from_cbatch(const shared_ptr[CRecordBatch]& cbatch):
- cdef RecordBatch batch = RecordBatch()
- batch.init(cbatch)
- return batch
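
Finally, a short sketch tying the Table APIs documented above together
(pure Python; names as in the docstrings, and re-exports from the top-level
pyarrow namespace are assumed):

    import pandas as pd
    import pyarrow as pa

    df = pd.DataFrame({'int': [1, 2], 'str': ['a', 'b']})
    batch = pa.RecordBatch.from_pandas(df)           # one batch per frame
    table = pa.Table.from_batches([batch, batch])    # schemas must match
    assert table.num_rows == 4

    combined = pa.concat_tables([table, table])      # zero-copy concatenation
    print(combined.to_pydict())                      # OrderedDict of columns
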