You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/05/13 19:44:54 UTC

[3/4] arrow git commit: ARROW-819: Public Cython and C++ API in the style of lxml, arrow::py::import_pyarrow method

http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/pyarrow/_io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_io.pyx b/python/pyarrow/_io.pyx
deleted file mode 100644
index e9e2ba0..0000000
--- a/python/pyarrow/_io.pyx
+++ /dev/null
@@ -1,1274 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Cython wrappers for IO interfaces defined in arrow::io and messaging in
-# arrow::ipc
-
-# cython: profile=False
-# distutils: language = c++
-# cython: embedsignature = True
-
-from cython.operator cimport dereference as deref
-from libc.stdlib cimport malloc, free
-from pyarrow.includes.libarrow cimport *
-cimport pyarrow.includes.pyarrow as pyarrow
-from pyarrow._array cimport Array, Tensor, box_tensor, Schema
-from pyarrow._error cimport check_status
-from pyarrow._memory cimport MemoryPool, maybe_unbox_memory_pool
-from pyarrow._table cimport (Column, RecordBatch, batch_from_cbatch,
-                             table_from_ctable)
-cimport cpython as cp
-
-import pyarrow._config
-from pyarrow.compat import frombytes, tobytes, encode_file_path
-
-import re
-import six
-import sys
-import threading
-import time
-
-
-# 64K: chunk size in bytes used by NativeFile.download / NativeFile.upload
-# when the caller does not supply a buffer_size
-DEFAULT_BUFFER_SIZE = 2 ** 16
-
-
-# To let us get a PyObject* and avoid Cython auto-ref-counting.
-# The quoted string is the C name, so this binds the C-API function
-# PyBytes_FromStringAndSize to the Cython-level name
-# PyBytes_FromStringAndSizeNative with a raw PyObject* return type.
-cdef extern from "Python.h":
-    PyObject* PyBytes_FromStringAndSizeNative" PyBytes_FromStringAndSize"(
-        char *v, Py_ssize_t len) except NULL
-
-cdef class NativeFile:
-    """
-    Common base for the file-like wrappers in this module.
-
-    Reads go through the ``rd_file`` handle and writes through ``wr_file``.
-    The ``is_readable`` / ``is_writeable`` / ``is_open`` / ``own_file`` flags
-    select the handle to use and whether the file is closed on deallocation.
-    Those attributes are declared outside this section of the file.
-    """
-
-    def __cinit__(self):
-        self.is_open = False
-        self.own_file = False
-
-    def __dealloc__(self):
-        # Only close handles that this object is responsible for
-        if self.is_open and self.own_file:
-            self.close()
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, exc_type, exc_value, tb):
-        self.close()
-
-    def close(self):
-        """
-        Close the underlying handle if the file is open, then mark the file
-        as closed. Calling it on an already closed file does nothing.
-        """
-        if self.is_open:
-            with nogil:
-                if self.is_readable:
-                    check_status(self.rd_file.get().Close())
-                else:
-                    check_status(self.wr_file.get().Close())
-        self.is_open = False
-
-    cdef read_handle(self, shared_ptr[RandomAccessFile]* file):
-        # Hand the readable handle to C-level callers; raises IOError when the
-        # file is not readable or not open
-        self._assert_readable()
-        file[0] = <shared_ptr[RandomAccessFile]> self.rd_file
-
-    cdef write_handle(self, shared_ptr[OutputStream]* file):
-        # Hand the writeable handle to C-level callers; raises IOError when
-        # the file is not writeable or not open
-        self._assert_writeable()
-        file[0] = <shared_ptr[OutputStream]> self.wr_file
-
-    def _assert_readable(self):
-        """
-        Raise IOError unless the file is readable and open
-        """
-        if not self.is_readable:
-            raise IOError("only valid on readonly files")
-
-        if not self.is_open:
-            raise IOError("file not open")
-
-    def _assert_writeable(self):
-        """
-        Raise IOError unless the file is writeable and open
-        """
-        if not self.is_writeable:
-            raise IOError("only valid on writeable files")
-
-        if not self.is_open:
-            raise IOError("file not open")
-
-    def size(self):
-        """
-        Return the size reported by the readable handle
-        """
-        cdef int64_t size
-        self._assert_readable()
-        with nogil:
-            check_status(self.rd_file.get().GetSize(&size))
-        return size
-
-    def tell(self):
-        """
-        Return the current position of the readable handle if the file is
-        readable, otherwise of the writeable handle
-        """
-        cdef int64_t position
-        with nogil:
-            if self.is_readable:
-                check_status(self.rd_file.get().Tell(&position))
-            else:
-                check_status(self.wr_file.get().Tell(&position))
-        return position
-
-    def seek(self, int64_t position):
-        """
-        Move the readable handle to the given position
-        """
-        self._assert_readable()
-        with nogil:
-            check_status(self.rd_file.get().Seek(position))
-
-    def write(self, data):
-        """
-        Write bytes from any object implementing buffer protocol (bytes,
-        bytearray, ndarray, pyarrow.Buffer)
-        """
-        self._assert_writeable()
-
-        # Text is converted to bytes first
-        if isinstance(data, six.string_types):
-            data = tobytes(data)
-
-        cdef Buffer arrow_buffer = frombuffer(data)
-
-        cdef const uint8_t* buf = arrow_buffer.buffer.get().data()
-        cdef int64_t bufsize = len(arrow_buffer)
-        with nogil:
-            check_status(self.wr_file.get().Write(buf, bufsize))
-
-    def read(self, nbytes=None):
-        """
-        Read up to nbytes and return them as a bytes object. With
-        nbytes=None, reads from the current position to the end of the file.
-        """
-        cdef:
-            int64_t c_nbytes
-            int64_t bytes_read = 0
-            PyObject* obj
-
-        if nbytes is None:
-            c_nbytes = self.size() - self.tell()
-        else:
-            c_nbytes = nbytes
-
-        self._assert_readable()
-
-        # Allocate empty write space
-        obj = PyBytes_FromStringAndSizeNative(NULL, c_nbytes)
-
-        cdef uint8_t* buf = <uint8_t*> cp.PyBytes_AS_STRING(<object> obj)
-        with nogil:
-            check_status(self.rd_file.get().Read(c_nbytes, &bytes_read, buf))
-
-        # Short read: shrink the bytes object to what was actually read
-        if bytes_read < c_nbytes:
-            cp._PyBytes_Resize(&obj, <Py_ssize_t> bytes_read)
-
-        return PyObject_to_object(obj)
-
-    def read_buffer(self, nbytes=None):
-        """
-        Read up to nbytes and return them wrapped in a Buffer. With
-        nbytes=None, reads from the current position to the end of the file.
-        """
-        cdef:
-            int64_t c_nbytes
-            shared_ptr[CBuffer] output
-        self._assert_readable()
-
-        if nbytes is None:
-            c_nbytes = self.size() - self.tell()
-        else:
-            c_nbytes = nbytes
-
-        with nogil:
-            check_status(self.rd_file.get().ReadB(c_nbytes, &output))
-
-        return wrap_buffer(output)
-
-    def download(self, stream_or_path, buffer_size=None):
-        """
-        Read file completely to local path (rather than reading completely into
-        memory). First seeks to the beginning of the file.
-
-        Parameters
-        ----------
-        stream_or_path : object
-            An object without a ``read`` attribute is treated as a path: it is
-            opened with mode 'wb' and closed when the download ends. Any
-            other object is written to through its ``write`` method and is
-            left open.
-        buffer_size : int, default None
-            Chunk size in bytes, DEFAULT_BUFFER_SIZE if None or 0
-
-        Raises
-        ------
-        Any exception raised while writing to the destination is re-raised
-        here after the writer thread has finished.
-        """
-        cdef:
-            int64_t bytes_read = 0
-            int32_t c_buffer_size
-            uint8_t* buf
-        self._assert_readable()
-
-        buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
-
-        write_queue = Queue(50)
-
-        # NOTE(review): the destination is only written to, yet the check is
-        # for a 'read' attribute; kept as-is so existing callers behave the
-        # same
-        if not hasattr(stream_or_path, 'read'):
-            stream = open(stream_or_path, 'wb')
-            cleanup = lambda: stream.close()
-        else:
-            stream = stream_or_path
-            cleanup = lambda: None
-
-        done = False
-        # Filled in by the writer thread. A list is used because a plain
-        # assignment inside bg_write would only bind a name local to that
-        # function, and the failure would never reach this frame.
-        errors = []
-
-        def bg_write():
-            try:
-                while not done or write_queue.qsize() > 0:
-                    try:
-                        chunk = write_queue.get(timeout=0.01)
-                    except QueueEmpty:
-                        continue
-                    stream.write(chunk)
-            except Exception:
-                errors.append(sys.exc_info())
-            finally:
-                cleanup()
-
-        # Everything that can fail before the writer thread exists must
-        # release the destination itself, since bg_write's cleanup never runs
-        try:
-            c_buffer_size = buffer_size
-            self.seek(0)
-
-            # This isn't ideal -- PyBytes_FromStringAndSize copies the data
-            # from the passed buffer, so it's hard for us to avoid doubling
-            # the memory
-            buf = <uint8_t*> malloc(buffer_size)
-            if buf == NULL:
-                raise MemoryError("Failed to allocate {0} bytes"
-                                  .format(buffer_size))
-        except:
-            cleanup()
-            raise
-
-        writer_thread = threading.Thread(target=bg_write)
-        writer_thread.start()
-
-        try:
-            while True:
-                with nogil:
-                    check_status(self.rd_file.get()
-                                 .Read(c_buffer_size, &bytes_read, buf))
-
-                # EOF
-                if bytes_read == 0:
-                    break
-
-                pybuf = cp.PyBytes_FromStringAndSize(<const char*>buf,
-                                                     bytes_read)
-
-                # Wait for a free slot instead of overflowing the bounded
-                # queue, but stop as soon as the writer thread has died
-                while write_queue.full() and writer_thread.is_alive():
-                    time.sleep(0.01)
-                if not writer_thread.is_alive():
-                    break
-
-                write_queue.put_nowait(pybuf)
-        finally:
-            free(buf)
-            done = True
-
-        writer_thread.join()
-        if errors:
-            exc_info = errors[0]
-            raise exc_info[0], exc_info[1], exc_info[2]
-
-    def upload(self, stream, buffer_size=None):
-        """
-        Pipe file-like object to file
-
-        Parameters
-        ----------
-        stream : file-like
-            Read in chunks through ``stream.read(buffer_size)`` until it
-            returns an empty value
-        buffer_size : int, default None
-            Chunk size in bytes, DEFAULT_BUFFER_SIZE if None or 0
-
-        Raises
-        ------
-        Any exception raised by ``self.write`` in the writer thread is
-        re-raised here after the writer thread has finished.
-        """
-        write_queue = Queue(50)
-        self._assert_writeable()
-
-        buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
-
-        done = False
-        # See download(): a shared list is needed for the writer thread's
-        # failure to be visible in this frame
-        errors = []
-
-        def bg_write():
-            try:
-                while not done or write_queue.qsize() > 0:
-                    try:
-                        chunk = write_queue.get(timeout=0.01)
-                    except QueueEmpty:
-                        continue
-
-                    self.write(chunk)
-
-            except Exception:
-                errors.append(sys.exc_info())
-
-        writer_thread = threading.Thread(target=bg_write)
-        writer_thread.start()
-
-        try:
-            while True:
-                buf = stream.read(buffer_size)
-                if not buf:
-                    break
-
-                # Wait for a free slot; re-check the writer on every pass so
-                # a writer that dies with a full queue cannot hang this loop
-                while write_queue.full() and writer_thread.is_alive():
-                    time.sleep(0.01)
-                if not writer_thread.is_alive():
-                    break
-
-                write_queue.put_nowait(buf)
-        finally:
-            done = True
-
-        writer_thread.join()
-        if errors:
-            exc_info = errors[0]
-            raise exc_info[0], exc_info[1], exc_info[2]
-
-
-# ----------------------------------------------------------------------
-# Python file-like objects
-
-
-cdef class PythonFile(NativeFile):
-    """
-    NativeFile backed by a Python file-like object.
-
-    A mode starting with 'w' wraps the handle for writing, a mode starting
-    with 'r' wraps it for reading; anything else raises ValueError.
-    """
-    cdef:
-        object handle
-
-    def __cinit__(self, handle, mode='w'):
-        self.handle = handle
-
-        for_writing = mode.startswith('w')
-        if not (for_writing or mode.startswith('r')):
-            raise ValueError('Invalid file mode: {0}'.format(mode))
-
-        if for_writing:
-            self.wr_file.reset(new pyarrow.PyOutputStream(handle))
-            self.is_readable = 0
-            self.is_writeable = 1
-        else:
-            self.rd_file.reset(new pyarrow.PyReadableFile(handle))
-            self.is_readable = 1
-            self.is_writeable = 0
-
-        self.is_open = True
-
-
-cdef class MemoryMappedFile(NativeFile):
-    """
-    Supports 'r', 'r+w', 'w' modes
-    """
-    cdef:
-        # Path passed to create() / open(), kept as given by the caller
-        object path
-
-    def __cinit__(self):
-        # A new instance is not usable until create() or open() is called
-        self.is_open = False
-        self.is_readable = 0
-        self.is_writeable = 0
-
-    @staticmethod
-    def create(path, size):
-        """
-        Create a memory map of the given size at path and return it as an
-        open MemoryMappedFile flagged both readable and writeable
-        """
-        cdef:
-            shared_ptr[CMemoryMappedFile] handle
-            c_string c_path = encode_file_path(path)
-            int64_t c_size = size
-
-        with nogil:
-            check_status(CMemoryMappedFile.Create(c_path, c_size, &handle))
-
-        cdef MemoryMappedFile result = MemoryMappedFile()
-        result.path = path
-        result.is_readable = 1
-        result.is_writeable = 1
-        # The same handle serves both the write and the read interface
-        result.wr_file = <shared_ptr[OutputStream]> handle
-        result.rd_file = <shared_ptr[RandomAccessFile]> handle
-        result.is_open = True
-
-        return result
-
-    def open(self, path, mode='r'):
-        """
-        Open the memory map at path.
-
-        mode 'r' / 'rb' marks the file readable, 'w' / 'wb' writeable and
-        'r+w' both; any other mode raises ValueError.
-        """
-        self.path = path
-
-        cdef:
-            FileMode c_mode
-            shared_ptr[CMemoryMappedFile] handle
-            c_string c_path = encode_file_path(path)
-
-        if mode in ('r', 'rb'):
-            c_mode = FileMode_READ
-            self.is_readable = 1
-        elif mode in ('w', 'wb'):
-            c_mode = FileMode_WRITE
-            self.is_writeable = 1
-        elif mode == 'r+w':
-            c_mode = FileMode_READWRITE
-            self.is_readable = 1
-            self.is_writeable = 1
-        else:
-            raise ValueError('Invalid file mode: {0}'.format(mode))
-
-        check_status(CMemoryMappedFile.Open(c_path, c_mode, &handle))
-
-        # Both handles are set regardless of mode; the is_readable /
-        # is_writeable flags set above are what gate their use
-        self.wr_file = <shared_ptr[OutputStream]> handle
-        self.rd_file = <shared_ptr[RandomAccessFile]> handle
-        self.is_open = True
-
-
-def memory_map(path, mode='r'):
-    """
-    Open memory map at file path. Size of the memory map cannot change
-
-    Parameters
-    ----------
-    path : string
-    mode : {'r', 'rb', 'w', 'wb', 'r+w'}, default 'r'
-        Passed to MemoryMappedFile.open, which raises ValueError for any
-        other value
-
-    Returns
-    -------
-    mmap : MemoryMappedFile
-    """
-    cdef MemoryMappedFile mapped_file = MemoryMappedFile()
-    mapped_file.open(path, mode)
-    return mapped_file
-
-
-def create_memory_map(path, size):
-    """
-    Create memory map at indicated path of the given size, return open
-    writeable file object
-
-    Thin module-level alias for MemoryMappedFile.create.
-
-    Parameters
-    ----------
-    path : string
-    size : int
-
-    Returns
-    -------
-    mmap : MemoryMappedFile
-    """
-    mapped_file = MemoryMappedFile.create(path, size)
-    return mapped_file
-
-
-cdef class OSFile(NativeFile):
-    """
-    Supports 'r', 'w' modes
-
-    Parameters
-    ----------
-    path : string
-    mode : {'r', 'rb', 'w', 'wb'}, default 'r'
-        Any other value raises ValueError
-    memory_pool : MemoryPool, default None
-        Only used when opening for reading
-    """
-    cdef:
-        # Path as given by the caller
-        object path
-
-    def __cinit__(self, path, mode='r', MemoryPool memory_pool=None):
-        self.path = path
-
-        cdef c_string c_path = encode_file_path(path)
-
-        self.is_readable = self.is_writeable = 0
-
-        if mode in ('r', 'rb'):
-            self._open_readable(c_path, maybe_unbox_memory_pool(memory_pool))
-        elif mode in ('w', 'wb'):
-            self._open_writeable(c_path)
-        else:
-            raise ValueError('Invalid file mode: {0}'.format(mode))
-
-        self.is_open = True
-
-    cdef _open_readable(self, c_string path, CMemoryPool* pool):
-        # Open path for reading and install the handle as rd_file
-        cdef shared_ptr[ReadableFile] handle
-
-        with nogil:
-            check_status(ReadableFile.Open(path, pool, &handle))
-
-        self.is_readable = 1
-        self.rd_file = <shared_ptr[RandomAccessFile]> handle
-
-    cdef _open_writeable(self, c_string path):
-        # Open path for writing and install the handle as wr_file
-        cdef shared_ptr[FileOutputStream] handle
-
-        with nogil:
-            check_status(FileOutputStream.Open(path, &handle))
-        self.is_writeable = 1
-        self.wr_file = <shared_ptr[OutputStream]> handle
-
-
-# ----------------------------------------------------------------------
-# Arrow buffers
-
-
-cdef class Buffer:
-    # Python wrapper around a shared_ptr[CBuffer]; exposes the bytes through
-    # the buffer protocol (see __getbuffer__) as a read-only 1-D view
-
-    def __cinit__(self):
-        pass
-
-    cdef init(self, const shared_ptr[CBuffer]& buffer):
-        # Attach the C++ buffer and precompute the shape/strides arrays that
-        # __getbuffer__ hands out
-        self.buffer = buffer
-        self.shape[0] = self.size
-        self.strides[0] = <Py_ssize_t>(1)
-
-    def __len__(self):
-        return self.size
-
-    # Size of the wrapped buffer as reported by CBuffer.size()
-    property size:
-
-        def __get__(self):
-            return self.buffer.get().size()
-
-    # Wrapped parent buffer, or None when the C++ parent pointer is NULL
-    property parent:
-
-        def __get__(self):
-            cdef shared_ptr[CBuffer] parent_buf = self.buffer.get().parent()
-
-            if parent_buf.get() == NULL:
-                return None
-            else:
-                return wrap_buffer(parent_buf)
-
-    def __getitem__(self, key):
-        # TODO(wesm): buffer slicing
-        raise NotImplementedError
-
-    def to_pybytes(self):
-        """
-        Return the buffer contents as a new Python bytes object
-        """
-        return cp.PyBytes_FromStringAndSize(
-            <const char*>self.buffer.get().data(),
-            self.buffer.get().size())
-
-    def __getbuffer__(self, cp.Py_buffer* buffer, int flags):
-        # Buffer protocol export: one dimension of 1-byte items ('b' format),
-        # always flagged read-only. The flags argument is not consulted.
-
-        buffer.buf = <char *>self.buffer.get().data()
-        buffer.format = 'b'
-        buffer.internal = NULL
-        buffer.itemsize = 1
-        buffer.len = self.size
-        buffer.ndim = 1
-        buffer.obj = self
-        buffer.readonly = 1
-        # shape/strides point at the arrays filled in by init()
-        buffer.shape = self.shape
-        buffer.strides = self.strides
-        buffer.suboffsets = NULL
-
-
-cdef shared_ptr[PoolBuffer] allocate_buffer(CMemoryPool* pool):
-    # Return a shared_ptr owning a freshly constructed PoolBuffer that is
-    # bound to the given memory pool
-    cdef shared_ptr[PoolBuffer] pool_buffer
-    pool_buffer.reset(new PoolBuffer(pool))
-    return pool_buffer
-
-
-cdef class InMemoryOutputStream(NativeFile):
-    # Write-only NativeFile whose output accumulates in a PoolBuffer;
-    # get_result() returns what was written as a Buffer
-
-    cdef:
-        # Backing storage shared with the BufferOutputStream in wr_file
-        shared_ptr[PoolBuffer] buffer
-
-    def __cinit__(self, MemoryPool memory_pool=None):
-        self.buffer = allocate_buffer(maybe_unbox_memory_pool(memory_pool))
-        self.wr_file.reset(new BufferOutputStream(
-            <shared_ptr[ResizableBuffer]> self.buffer))
-        self.is_readable = 0
-        self.is_writeable = 1
-        self.is_open = True
-
-    def get_result(self):
-        """
-        Close the stream and return the backing buffer wrapped as a Buffer
-        """
-        check_status(self.wr_file.get().Close())
-        self.is_open = False
-        return wrap_buffer(<shared_ptr[CBuffer]> self.buffer)
-
-
-cdef class BufferReader(NativeFile):
-    """
-    Zero-copy reader from objects convertible to Arrow buffer
-
-    Parameters
-    ----------
-    obj : Python bytes or pyarrow.io.Buffer
-        Anything that is not already a Buffer is converted with frombuffer
-    """
-    cdef:
-        Buffer buffer
-
-    def __cinit__(self, object obj):
-
-        # Normalize the input to a Buffer before keeping a reference to it
-        if not isinstance(obj, Buffer):
-            obj = frombuffer(obj)
-        self.buffer = obj
-
-        self.rd_file.reset(new CBufferReader(self.buffer.buffer))
-        self.is_writeable = 0
-        self.is_readable = 1
-        self.is_open = True
-
-
-def frombuffer(object obj):
-    """
-    Construct an Arrow buffer from a Python object implementing the buffer
-    protocol (anything accepted by memoryview)
-
-    Raises ValueError when a TypeError occurs during the conversion.
-    """
-    cdef shared_ptr[CBuffer] c_buffer
-    try:
-        # memoryview() raises TypeError for objects without buffer support
-        memoryview(obj)
-        c_buffer.reset(new pyarrow.PyBuffer(obj))
-        wrapped = wrap_buffer(c_buffer)
-    except TypeError:
-        raise ValueError('Must pass object that implements buffer protocol')
-    return wrapped
-
-
-
-cdef Buffer wrap_buffer(const shared_ptr[CBuffer]& buf):
-    # Box a C++ buffer pointer in a new Python-level Buffer object
-    cdef Buffer wrapped = Buffer()
-    wrapped.init(buf)
-    return wrapped
-
-
-cdef get_reader(object source, shared_ptr[RandomAccessFile]* reader):
-    # Resolve source to a readable NativeFile and store its read handle in
-    # reader[0]. Accepted sources: a path string (memory mapped), a Buffer,
-    # a NativeFile, or any other object with a 'read' attribute.
-    cdef NativeFile native_source
-
-    if isinstance(source, six.string_types):
-        source = memory_map(source, mode='r')
-    elif isinstance(source, Buffer):
-        source = BufferReader(source)
-    elif not isinstance(source, NativeFile) and hasattr(source, 'read'):
-        # Optimistically hope this is file-like
-        source = PythonFile(source, mode='r')
-
-    if not isinstance(source, NativeFile):
-        raise TypeError('Unable to read from object of type: {0}'
-                        .format(type(source)))
-
-    native_source = source
-
-    # TODO: what about read-write sources (e.g. memory maps)
-    if not native_source.is_readable:
-        raise IOError('Native file is not readable')
-
-    native_source.read_handle(reader)
-
-
-cdef get_writer(object source, shared_ptr[OutputStream]* writer):
-    # Resolve source to a writeable NativeFile and store its write handle in
-    # writer[0]. Accepted sources: a path string (opened as OSFile in 'w'
-    # mode), a NativeFile, or any other object with a 'write' attribute.
-    #
-    # Raises IOError when the NativeFile is not writeable and TypeError for
-    # unsupported source types.
-    cdef NativeFile nf
-
-    if isinstance(source, six.string_types):
-        source = OSFile(source, mode='w')
-    elif not isinstance(source, NativeFile) and hasattr(source, 'write'):
-        # Optimistically hope this is file-like
-        source = PythonFile(source, mode='w')
-
-    if isinstance(source, NativeFile):
-        nf = source
-
-        # Test the writeable flag itself: a file can be both readable and
-        # writeable (e.g. a memory map created with MemoryMappedFile.create)
-        if not nf.is_writeable:
-            raise IOError('Native file is not writeable')
-
-        nf.write_handle(writer)
-    else:
-        raise TypeError('Unable to write to object of type: {0}'
-                        .format(type(source)))
-
-# ----------------------------------------------------------------------
-# HDFS IO implementation
-
-# Matches hdfs://HOST:PORT followed by the remainder of the path (group 3).
-# Raw string so that \d reaches the regex engine as written.
-_HDFS_PATH_RE = re.compile(r'hdfs://(.*):(\d+)(.*)')
-
-try:
-    # Python 3
-    from queue import Queue, Empty as QueueEmpty, Full as QueueFull
-except ImportError:
-    # Python 2
-    from Queue import Queue, Empty as QueueEmpty, Full as QueueFull
-
-
-def have_libhdfs():
-    """
-    Return True if the HaveLibHdfs() status check passes, False if it
-    raises
-    """
-    try:
-        check_status(HaveLibHdfs())
-        return True
-    except Exception:
-        # Not a bare except: KeyboardInterrupt / SystemExit must propagate
-        return False
-
-
-def have_libhdfs3():
-    """
-    Return True if the HaveLibHdfs3() status check passes, False if it
-    raises
-    """
-    try:
-        check_status(HaveLibHdfs3())
-        return True
-    except Exception:
-        # Not a bare except: KeyboardInterrupt / SystemExit must propagate
-        return False
-
-
-def strip_hdfs_abspath(path):
-    """
-    Return path without its leading hdfs://HOST:PORT part; paths that do not
-    match that pattern are returned unchanged
-    """
-    matched = _HDFS_PATH_RE.match(path)
-    return matched.group(3) if matched else path
-
-
-cdef class _HdfsClient:
-    """
-    Wrapper around the C++ HDFS client: connection handling plus path
-    queries, directory operations and file opening
-    """
-    cdef:
-        shared_ptr[CHdfsClient] client
-
-    cdef readonly:
-        bint is_open
-
-    def __cinit__(self):
-        pass
-
-    def _connect(self, host, port, user, kerb_ticket, driver):
-        """
-        Connect to the cluster.
-
-        host, user and kerb_ticket are only set on the connection config
-        when they are not None. driver 'libhdfs' selects libhdfs; any other
-        value selects libhdfs3. The matching library check is run first and
-        raises if it fails.
-        """
-        cdef HdfsConnectionConfig conf
-
-        if host is not None:
-            conf.host = tobytes(host)
-        conf.port = port
-        if user is not None:
-            conf.user = tobytes(user)
-        if kerb_ticket is not None:
-            conf.kerb_ticket = tobytes(kerb_ticket)
-
-        if driver == 'libhdfs':
-            check_status(HaveLibHdfs())
-            conf.driver = HdfsDriver_LIBHDFS
-        else:
-            check_status(HaveLibHdfs3())
-            conf.driver = HdfsDriver_LIBHDFS3
-
-        with nogil:
-            check_status(CHdfsClient.Connect(&conf, &self.client))
-        self.is_open = True
-
-    @classmethod
-    def connect(cls, *args, **kwargs):
-        """
-        Alternate constructor: equivalent to calling the class directly
-        """
-        return cls(*args, **kwargs)
-
-    def __dealloc__(self):
-        if self.is_open:
-            self.close()
-
-    def close(self):
-        """
-        Disconnect from the HDFS cluster
-        """
-        self._ensure_client()
-        with nogil:
-            check_status(self.client.get().Disconnect())
-        self.is_open = False
-
-    cdef _ensure_client(self):
-        # Raise IOError unless a client exists and is still open
-        if self.client.get() == NULL:
-            raise IOError('HDFS client improperly initialized')
-        elif not self.is_open:
-            raise IOError('HDFS client is closed')
-
-    def exists(self, path):
-        """
-        Returns True if the path is known to the cluster, False if it does not
-        (or there is an RPC error)
-        """
-        self._ensure_client()
-
-        cdef c_string c_path = tobytes(path)
-        cdef c_bool result
-        with nogil:
-            result = self.client.get().Exists(c_path)
-        return result
-
-    def isdir(self, path):
-        """
-        Return True if the path info reports a directory
-        """
-        cdef HdfsPathInfo info
-        self._path_info(path, &info)
-        return info.kind == ObjectType_DIRECTORY
-
-    def isfile(self, path):
-        """
-        Return True if the path info reports a file
-        """
-        cdef HdfsPathInfo info
-        self._path_info(path, &info)
-        return info.kind == ObjectType_FILE
-
-    cdef _path_info(self, path, HdfsPathInfo* info):
-        # Fill info for path. Checks the client first, like every other
-        # method that dereferences it, so isdir / isfile on a closed or
-        # unconnected client raise IOError instead of touching a NULL client
-        self._ensure_client()
-
-        cdef c_string c_path = tobytes(path)
-
-        with nogil:
-            check_status(self.client.get()
-                         .GetPathInfo(c_path, info))
-
-    def ls(self, path, bint full_info):
-        """
-        List the directory at path.
-
-        Returns a list of names (with any hdfs://HOST:PORT prefix removed),
-        or, when full_info is true, a list of dicts describing each entry.
-        The timestamps are available under 'last_modified_time' and
-        'last_access_time'; the historical misspelled keys
-        'list_modified_time' and 'list_access_time' carry the same values
-        and are kept for existing callers.
-        """
-        cdef:
-            c_string c_path = tobytes(path)
-            vector[HdfsPathInfo] listing
-            list results = []
-            int i
-
-        self._ensure_client()
-
-        with nogil:
-            check_status(self.client.get()
-                         .ListDirectory(c_path, &listing))
-
-        cdef const HdfsPathInfo* info
-        for i in range(<int> listing.size()):
-            info = &listing[i]
-
-            # Try to trim off the hdfs://HOST:PORT piece
-            name = strip_hdfs_abspath(frombytes(info.name))
-
-            if full_info:
-                kind = ('file' if info.kind == ObjectType_FILE
-                        else 'directory')
-
-                results.append({
-                    'kind': kind,
-                    'name': name,
-                    'owner': frombytes(info.owner),
-                    'group': frombytes(info.group),
-                    'last_modified_time': info.last_modified_time,
-                    'last_access_time': info.last_access_time,
-                    # Legacy misspelled keys, kept for backward compatibility
-                    'list_modified_time': info.last_modified_time,
-                    'list_access_time': info.last_access_time,
-                    'size': info.size,
-                    'replication': info.replication,
-                    'block_size': info.block_size,
-                    'permissions': info.permissions
-                })
-            else:
-                results.append(name)
-
-        return results
-
-    def mkdir(self, path):
-        """
-        Create indicated directory and any necessary parent directories
-        """
-        self._ensure_client()
-
-        cdef c_string c_path = tobytes(path)
-        with nogil:
-            check_status(self.client.get()
-                         .MakeDirectory(c_path))
-
-    def delete(self, path, bint recursive=False):
-        """
-        Delete the indicated file or directory
-
-        Parameters
-        ----------
-        path : string
-        recursive : boolean, default False
-            If True, also delete child paths for directories
-        """
-        self._ensure_client()
-
-        cdef c_string c_path = tobytes(path)
-        with nogil:
-            check_status(self.client.get()
-                         .Delete(c_path, recursive))
-
-    def open(self, path, mode='rb', buffer_size=None, replication=None,
-             default_block_size=None):
-        """
-        Open the file at path and return it as an HdfsFile
-
-        Parameters
-        ----------
-        path : string
-        mode : string, 'rb', 'wb', 'ab'
-        buffer_size : int, default None
-        replication : int, default None
-        default_block_size : int, default None
-            None or 0 for any of the three means "use the default"
-
-        Raises
-        ------
-        ValueError
-            If mode is not one of 'rb', 'wb', 'ab'
-        """
-        self._ensure_client()
-
-        cdef HdfsFile out = HdfsFile()
-
-        if mode not in ('rb', 'wb', 'ab'):
-            raise ValueError("Mode must be 'rb' (read), "
-                             "'wb' (write, new file), or 'ab' (append)")
-
-        cdef c_string c_path = tobytes(path)
-        cdef c_bool append = False
-
-        # 0 in libhdfs means "use the default"
-        cdef int32_t c_buffer_size = buffer_size or 0
-        cdef int16_t c_replication = replication or 0
-        cdef int64_t c_default_block_size = default_block_size or 0
-
-        cdef shared_ptr[HdfsOutputStream] wr_handle
-        cdef shared_ptr[HdfsReadableFile] rd_handle
-
-        if mode in ('wb', 'ab'):
-            if mode == 'ab':
-                append = True
-
-            with nogil:
-                check_status(
-                    self.client.get()
-                    .OpenWriteable(c_path, append, c_buffer_size,
-                                   c_replication, c_default_block_size,
-                                   &wr_handle))
-
-            out.wr_file = <shared_ptr[OutputStream]> wr_handle
-
-            out.is_readable = False
-            out.is_writeable = 1
-        else:
-            with nogil:
-                check_status(self.client.get()
-                             .OpenReadable(c_path, &rd_handle))
-
-            out.rd_file = <shared_ptr[RandomAccessFile]> rd_handle
-            out.is_readable = True
-            out.is_writeable = 0
-
-        # The HdfsFile records a concrete buffer size even when the default
-        # was requested from libhdfs
-        if c_buffer_size == 0:
-            c_buffer_size = 2 ** 16
-
-        out.mode = mode
-        out.buffer_size = c_buffer_size
-        # The nanny keeps this client referenced for as long as the file
-        # lives (see ARROW-404)
-        out.parent = _HdfsFileNanny(self, out)
-        out.is_open = True
-        out.own_file = True
-
-        return out
-
-    def download(self, path, stream, buffer_size=None):
-        """
-        Download the HDFS file at path into stream (see NativeFile.download)
-        """
-        with self.open(path, 'rb') as f:
-            f.download(stream, buffer_size=buffer_size)
-
-    def upload(self, path, stream, buffer_size=None):
-        """
-        Upload file-like object to HDFS path
-        """
-        with self.open(path, 'wb') as f:
-            f.upload(stream, buffer_size=buffer_size)
-
-
-# ARROW-404: Helper class to ensure that files are closed before the
-# client. During deallocation of the extension class, the attributes are
-# decref'd which can cause the client to get closed first if the file has the
-# last remaining reference
-cdef class _HdfsFileNanny:
-    cdef:
-        # Strong reference to the client
-        object client
-        # Weak reference to the file, so the nanny does not keep it alive
-        object file_handle_ref
-
-    def __cinit__(self, client, file_handle):
-        import weakref
-        self.client = client
-        self.file_handle_ref = weakref.ref(file_handle)
-
-    def __dealloc__(self):
-        # Close the file (if the weak reference still resolves) before the
-        # client reference is released below
-        fh = self.file_handle_ref()
-        if fh:
-            fh.close()
-        # avoid cyclic GC
-        self.file_handle_ref = None
-        self.client = None
-
-
-cdef class HdfsFile(NativeFile):
-    # NativeFile returned by _HdfsClient.open, which fills in the attributes
-    # below after opening the handle
-    cdef readonly:
-        # Buffer size recorded at open time
-        int32_t buffer_size
-        # Mode string the file was opened with
-        object mode
-        # _HdfsFileNanny instance set by _HdfsClient.open
-        object parent
-
-    # Required so that weakref.ref() can be taken on instances
-    cdef object __weakref__
-
-    def __dealloc__(self):
-        # Drop the reference to the nanny
-        self.parent = None
-
-# ----------------------------------------------------------------------
-# File and stream readers and writers
-
-cdef class _StreamWriter:
-    """
-    Writes record batches to a sink through a C++ stream writer.
-
-    A new instance starts out closed; _open() attaches the sink and writer.
-    """
-    cdef:
-        shared_ptr[CStreamWriter] writer
-        shared_ptr[OutputStream] sink
-        bint closed
-
-    def __cinit__(self):
-        # No writer exists until _open() succeeds
-        self.closed = True
-
-    def __dealloc__(self):
-        if not self.closed:
-            self.close()
-
-    def _open(self, sink, Schema schema):
-        """
-        Resolve sink with get_writer and open a stream writer for schema
-        """
-        get_writer(sink, &self.sink)
-
-        with nogil:
-            check_status(CStreamWriter.Open(self.sink.get(), schema.sp_schema,
-                                            &self.writer))
-
-        self.closed = False
-
-    def write_batch(self, RecordBatch batch):
-        """
-        Write a RecordBatch to the stream
-        """
-        with nogil:
-            check_status(self.writer.get()
-                         .WriteRecordBatch(deref(batch.batch)))
-
-    def close(self):
-        """
-        Close the writer. Does nothing if the writer was never opened or has
-        already been closed.
-        """
-        # Guard like NativeFile.close does: without it, close() on a writer
-        # whose _open() never succeeded would dereference a NULL writer
-        if self.closed:
-            return
-        with nogil:
-            check_status(self.writer.get().Close())
-        self.closed = True
-
-
-cdef class _StreamReader:
-    # Reads record batches from a source through a C++ stream reader
-    cdef:
-        shared_ptr[CStreamReader] reader
-
-    cdef readonly:
-        # Schema obtained from the C++ reader in _open()
-        Schema schema
-
-    def __cinit__(self):
-        pass
-
-    def _open(self, source):
-        """
-        Resolve source with get_reader, open a stream reader on it and
-        populate the schema attribute
-        """
-        cdef:
-            shared_ptr[RandomAccessFile] reader
-            shared_ptr[InputStream] in_stream
-
-        get_reader(source, &reader)
-        in_stream = <shared_ptr[InputStream]> reader
-
-        with nogil:
-            check_status(CStreamReader.Open(in_stream, &self.reader))
-
-        self.schema = Schema()
-        self.schema.init_schema(self.reader.get().schema())
-
-    def get_next_batch(self):
-        """
-        Read next RecordBatch from the stream. Raises StopIteration at end of
-        stream
-        """
-        cdef shared_ptr[CRecordBatch] batch
-
-        with nogil:
-            check_status(self.reader.get().GetNextRecordBatch(&batch))
-
-        # A NULL batch is how the C++ reader signals the end of the stream
-        if batch.get() == NULL:
-            raise StopIteration
-
-        return batch_from_cbatch(batch)
-
-    def read_all(self):
-        """
-        Read all record batches as a pyarrow.Table
-        """
-        cdef:
-            vector[shared_ptr[CRecordBatch]] batches
-            shared_ptr[CRecordBatch] batch
-            shared_ptr[CTable] table
-
-        with nogil:
-            # Collect batches until the reader returns a NULL batch
-            while True:
-                check_status(self.reader.get().GetNextRecordBatch(&batch))
-                if batch.get() == NULL:
-                    break
-                batches.push_back(batch)
-
-            check_status(CTable.FromRecordBatches(batches, &table))
-
-        return table_from_ctable(table)
-
-
-cdef class _FileWriter(_StreamWriter):
-    # _StreamWriter variant that opens a CFileWriter instead of a
-    # CStreamWriter; write_batch / close are inherited unchanged
-
-    def _open(self, sink, Schema schema):
-        """
-        Resolve sink with get_writer and open a file writer for schema
-        """
-        cdef shared_ptr[CFileWriter] writer
-        get_writer(sink, &self.sink)
-
-        with nogil:
-            check_status(CFileWriter.Open(self.sink.get(), schema.sp_schema,
-                                          &writer))
-
-        # Cast to base class, because has same interface
-        self.writer = <shared_ptr[CStreamWriter]> writer
-        self.closed = False
-
-
-cdef class _FileReader:
-    # Random-access reader of record batches through a C++ file reader
-    cdef:
-        shared_ptr[CFileReader] reader
-
-    def __cinit__(self):
-        pass
-
-    def _open(self, source, footer_offset=None):
-        """
-        Resolve source with get_reader and open a file reader on it.
-
-        A footer_offset of None or 0 uses CFileReader.Open; any other value
-        is passed to CFileReader.Open2.
-        """
-        cdef shared_ptr[RandomAccessFile] reader
-        get_reader(source, &reader)
-
-        cdef int64_t offset = 0
-        if footer_offset is not None:
-            offset = footer_offset
-
-        with nogil:
-            if offset != 0:
-                check_status(CFileReader.Open2(reader, offset, &self.reader))
-            else:
-                check_status(CFileReader.Open(reader, &self.reader))
-
-    # Number of record batches reported by the C++ reader
-    property num_record_batches:
-
-        def __get__(self):
-            return self.reader.get().num_record_batches()
-
-    def get_batch(self, int i):
-        """
-        Return record batch number i. Raises ValueError when i is negative
-        or not less than num_record_batches.
-        """
-        cdef shared_ptr[CRecordBatch] batch
-
-        if i < 0 or i >= self.num_record_batches:
-            raise ValueError('Batch number {0} out of range'.format(i))
-
-        with nogil:
-            check_status(self.reader.get().GetRecordBatch(i, &batch))
-
-        return batch_from_cbatch(batch)
-
-    # TODO(wesm): ARROW-503: Function was renamed. Remove after a period of
-    # time has passed
-    get_record_batch = get_batch
-
-    def read_all(self):
-        """
-        Read all record batches as a pyarrow.Table
-        """
-        cdef:
-            vector[shared_ptr[CRecordBatch]] batches
-            shared_ptr[CTable] table
-            int i, nbatches
-
-        nbatches = self.num_record_batches
-
-        # Pre-size the vector so each batch is read straight into its slot
-        batches.resize(nbatches)
-        with nogil:
-            for i in range(nbatches):
-                check_status(self.reader.get().GetRecordBatch(i, &batches[i]))
-            check_status(CTable.FromRecordBatches(batches, &table))
-
-        return table_from_ctable(table)
-
-
-#----------------------------------------------------------------------
-# Implement legacy Feather file format
-
-
-class FeatherError(Exception):
-    # Exception type declared for the legacy Feather file format section
-    pass
-
-
-cdef class FeatherWriter:
-    """
-    Column-at-a-time writer for the legacy Feather format (CFeatherWriter)
-    """
-    cdef:
-        unique_ptr[CFeatherWriter] writer
-
-    cdef public:
-        # Row count shared by all columns; -1 until the first column is
-        # written
-        int64_t num_rows
-
-    def __cinit__(self):
-        self.num_rows = -1
-
-    def open(self, object dest):
-        """
-        Open the writer on `dest`, which is resolved through get_writer
-        """
-        cdef shared_ptr[OutputStream] sink
-        get_writer(dest, &sink)
-
-        with nogil:
-            check_status(CFeatherWriter.Open(sink, &self.writer))
-
-    def close(self):
-        """
-        Record the number of rows and finalize the file
-        """
-        # No column was written: record zero rows rather than the -1 sentinel
-        if self.num_rows < 0:
-            self.num_rows = 0
-        self.writer.get().SetNumRows(self.num_rows)
-        check_status(self.writer.get().Finalize())
-
-    def write_array(self, object name, object col, object mask=None):
-        """
-        Append one named column
-
-        Parameters
-        ----------
-        name : column name, converted with tobytes
-        col : pyarrow Array, or data accepted by Array.from_pandas
-        mask : optional
-            Forwarded to Array.from_pandas; not used when `col` is already
-            an Array
-
-        Raises ValueError when len(col) differs from the length of the
-        columns written before it.
-        """
-        cdef Array arr
-
-        # The first column fixes the row count; later ones must match it
-        if self.num_rows >= 0:
-            if len(col) != self.num_rows:
-                raise ValueError('prior column had a different number of rows')
-        else:
-            self.num_rows = len(col)
-
-        if isinstance(col, Array):
-            arr = col
-        else:
-            arr = Array.from_pandas(col, mask=mask)
-
-        cdef c_string c_name = tobytes(name)
-
-        with nogil:
-            check_status(
-                self.writer.get().Append(c_name, deref(arr.sp_array)))
-
-
-cdef class FeatherReader:
-    """
-    Reader for the legacy Feather format (CFeatherReader)
-    """
-    cdef:
-        unique_ptr[CFeatherReader] reader
-
-    def __cinit__(self):
-        pass
-
-    def open(self, source):
-        """
-        Open the reader on `source`, which is resolved through get_reader
-        """
-        cdef shared_ptr[RandomAccessFile] reader
-        get_reader(source, &reader)
-
-        with nogil:
-            check_status(CFeatherReader.Open(reader, &self.reader))
-
-    # Number of rows reported by the underlying reader
-    property num_rows:
-
-        def __get__(self):
-            return self.reader.get().num_rows()
-
-    # Number of columns reported by the underlying reader
-    property num_columns:
-
-        def __get__(self):
-            return self.reader.get().num_columns()
-
-    def get_column_name(self, int i):
-        """
-        Return the name of column i
-
-        Raises IndexError when i is negative or not less than num_columns.
-        """
-        # Same bounds check as get_column, so an invalid index is reported
-        # as an exception instead of being handed to the C++ reader
-        if i < 0 or i >= self.num_columns:
-            raise IndexError(i)
-
-        cdef c_string name = self.reader.get().GetColumnName(i)
-        return frombytes(name)
-
-    def get_column(self, int i):
-        """
-        Return column i as a Column
-
-        Raises IndexError when i is negative or not less than num_columns.
-        """
-        if i < 0 or i >= self.num_columns:
-            raise IndexError(i)
-
-        cdef shared_ptr[CColumn] sp_column
-        with nogil:
-            check_status(self.reader.get()
-                         .GetColumn(i, &sp_column))
-
-        cdef Column col = Column()
-        col.init(sp_column)
-        return col
-
-
-def get_tensor_size(Tensor tensor):
-    """
-    Return total size of serialized Tensor including metadata and padding
-
-    Parameters
-    ----------
-    tensor : pyarrow.Tensor
-
-    Returns
-    -------
-    size : int
-    """
-    cdef int64_t size
-    with nogil:
-        check_status(GetTensorSize(deref(tensor.tp), &size))
-    return size
-
-
-def get_record_batch_size(RecordBatch batch):
-    """
-    Return total size of serialized RecordBatch including metadata and padding
-
-    Parameters
-    ----------
-    batch : pyarrow.RecordBatch
-
-    Returns
-    -------
-    size : int
-    """
-    cdef int64_t size
-    with nogil:
-        check_status(GetRecordBatchSize(deref(batch.batch), &size))
-    return size
-
-
-def write_tensor(Tensor tensor, NativeFile dest):
-    """
-    Write pyarrow.Tensor to pyarrow.NativeFile object at its current position
-
-    Parameters
-    ----------
-    tensor : pyarrow.Tensor
-    dest : pyarrow.NativeFile
-
-    Returns
-    -------
-    bytes_written : int
-        Total number of bytes written to the file
-    """
-    cdef:
-        int32_t metadata_length
-        int64_t body_length
-
-    dest._assert_writeable()
-
-    with nogil:
-        check_status(
-            WriteTensor(deref(tensor.tp), dest.wr_file.get(),
-                        &metadata_length, &body_length))
-
-    # The total is the sum of the two lengths reported by WriteTensor
-    return metadata_length + body_length
-
-
-def read_tensor(NativeFile source):
-    """
-    Read pyarrow.Tensor from pyarrow.NativeFile object from current
-    position. If the file source supports zero copy (e.g. a memory map), then
-    this operation does not allocate any memory
-
-    Parameters
-    ----------
-    source : pyarrow.NativeFile
-
-    Returns
-    -------
-    tensor : Tensor
-    """
-    cdef:
-        shared_ptr[CTensor] sp_tensor
-
-    # Reading goes through source.rd_file, so the file must be readable;
-    # requiring a writeable file here rejected valid read-only sources
-    source._assert_readable()
-
-    # ReadTensor takes an explicit offset: start at the current position
-    cdef int64_t offset = source.tell()
-    with nogil:
-        check_status(ReadTensor(offset, source.rd_file.get(), &sp_tensor))
-
-    return box_tensor(sp_tensor)

http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/pyarrow/_jemalloc.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_jemalloc.pyx b/python/pyarrow/_jemalloc.pyx
index 3b41964..6f00c9d 100644
--- a/python/pyarrow/_jemalloc.pyx
+++ b/python/pyarrow/_jemalloc.pyx
@@ -20,7 +20,7 @@
 # cython: embedsignature = True
 
 from pyarrow.includes.libarrow_jemalloc cimport CJemallocMemoryPool
-from pyarrow._memory cimport MemoryPool
+from pyarrow.lib cimport MemoryPool
 
 def default_pool():
     cdef MemoryPool pool = MemoryPool()

http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/pyarrow/_memory.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/_memory.pxd b/python/pyarrow/_memory.pxd
deleted file mode 100644
index bb1af85..0000000
--- a/python/pyarrow/_memory.pxd
+++ /dev/null
@@ -1,30 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from pyarrow.includes.libarrow cimport CMemoryPool, CLoggingMemoryPool
-
-
-cdef class MemoryPool:
-    cdef:
-        # Raw pointer to the wrapped C++ pool; assigned through init()
-        CMemoryPool* pool
-
-    cdef init(self, CMemoryPool* pool)
-
-# Subclass of MemoryPool that declares no additional attributes
-cdef class LoggingMemoryPool(MemoryPool):
-    pass
-
-# Returns the CMemoryPool* to use for an optional MemoryPool argument
-cdef CMemoryPool* maybe_unbox_memory_pool(MemoryPool memory_pool)

http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/pyarrow/_memory.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_memory.pyx b/python/pyarrow/_memory.pyx
deleted file mode 100644
index 8b73a17..0000000
--- a/python/pyarrow/_memory.pyx
+++ /dev/null
@@ -1,58 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# cython: profile=False
-# distutils: language = c++
-# cython: embedsignature = True
-
-from pyarrow.includes.libarrow cimport CMemoryPool, CLoggingMemoryPool
-from pyarrow.includes.pyarrow cimport set_default_memory_pool, get_memory_pool
-
-
-cdef class MemoryPool:
-    """
-    Python-level handle on a CMemoryPool pointer
-    """
-    cdef init(self, CMemoryPool* pool):
-        # Stores the raw pointer as given
-        self.pool = pool
-
-    def bytes_allocated(self):
-        """
-        Return the value of bytes_allocated() on the wrapped pool
-        """
-        return self.pool.bytes_allocated()
-
-
-cdef CMemoryPool* maybe_unbox_memory_pool(MemoryPool memory_pool):
-    # An explicit pool wins; None falls back to get_memory_pool()
-    if memory_pool is not None:
-        return memory_pool.pool
-    return get_memory_pool()
-
-
-cdef class LoggingMemoryPool(MemoryPool):
-    # Adds no behaviour of its own in this module
-    pass
-
-
-def default_memory_pool():
-    """
-    Return a new MemoryPool wrapping the pool given by get_memory_pool()
-    """
-    cdef MemoryPool wrapper = MemoryPool()
-    wrapper.init(get_memory_pool())
-    return wrapper
-
-
-def set_memory_pool(MemoryPool pool):
-    """
-    Pass the C++ pool wrapped by `pool` to set_default_memory_pool
-    """
-    set_default_memory_pool(pool.pool)
-
-
-def total_allocated_bytes():
-    """
-    Return bytes_allocated() of the pool given by get_memory_pool()
-    """
-    return get_memory_pool().bytes_allocated()

http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/pyarrow/_parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index c06eab2..51bd938 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -22,16 +22,16 @@
 from cython.operator cimport dereference as deref
 from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport *
-cimport pyarrow.includes.pyarrow as pyarrow
-from pyarrow._array cimport Array, Schema, box_schema
-from pyarrow._error cimport check_status
-from pyarrow._memory cimport MemoryPool, maybe_unbox_memory_pool
-from pyarrow._table cimport Table, table_from_ctable
-from pyarrow._io cimport NativeFile, get_reader, get_writer
+from pyarrow.lib cimport (Array, Schema,
+                          check_status,
+                          MemoryPool, maybe_unbox_memory_pool,
+                          Table,
+                          pyarrow_wrap_schema,
+                          pyarrow_wrap_table,
+                          NativeFile, get_reader, get_writer)
 
 from pyarrow.compat import tobytes, frombytes
-from pyarrow._error import ArrowException
-from pyarrow._io import NativeFile
+from pyarrow.lib import ArrowException, NativeFile
 
 import six
 
@@ -213,7 +213,7 @@ cdef class ParquetSchema:
         with nogil:
             check_status(FromParquetSchema(self.schema, &sp_arrow_schema))
 
-        return box_schema(sp_arrow_schema)
+        return pyarrow_wrap_schema(sp_arrow_schema)
 
     def equals(self, ParquetSchema other):
         """
@@ -426,7 +426,7 @@ cdef class ParquetReader:
             with nogil:
                 check_status(self.reader.get()
                              .ReadRowGroup(i, &ctable))
-        return table_from_ctable(ctable)
+        return pyarrow_wrap_table(ctable)
 
     def read_all(self, column_indices=None):
         cdef:
@@ -445,7 +445,7 @@ cdef class ParquetReader:
             with nogil:
                 check_status(self.reader.get()
                              .ReadTable(&ctable))
-        return table_from_ctable(ctable)
+        return pyarrow_wrap_table(ctable)
 
     def column_name_idx(self, column_name):
         """

http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/pyarrow/_table.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/_table.pxd b/python/pyarrow/_table.pxd
deleted file mode 100644
index e61e90d..0000000
--- a/python/pyarrow/_table.pxd
+++ /dev/null
@@ -1,62 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from pyarrow.includes.common cimport shared_ptr
-from pyarrow.includes.libarrow cimport (CChunkedArray, CColumn, CTable,
-                                        CRecordBatch)
-from pyarrow._array cimport Schema
-
-
-cdef class ChunkedArray:
-    cdef:
-        # Shared pointer to the C++ object, plus a raw pointer attribute
-        # of the matching type
-        shared_ptr[CChunkedArray] sp_chunked_array
-        CChunkedArray* chunked_array
-
-    cdef init(self, const shared_ptr[CChunkedArray]& chunked_array)
-    cdef _check_nullptr(self)
-
-
-cdef class Column:
-    cdef:
-        # Shared pointer to the C++ object, plus a raw pointer attribute
-        # of the matching type
-        shared_ptr[CColumn] sp_column
-        CColumn* column
-
-    cdef init(self, const shared_ptr[CColumn]& column)
-    cdef _check_nullptr(self)
-
-
-cdef class Table:
-    cdef:
-        # Shared pointer to the C++ object, plus a raw pointer attribute
-        # of the matching type
-        shared_ptr[CTable] sp_table
-        CTable* table
-
-    cdef init(self, const shared_ptr[CTable]& table)
-    cdef _check_nullptr(self)
-
-
-cdef class RecordBatch:
-    cdef:
-        # Shared pointer to the C++ object, plus a raw pointer attribute
-        # of the matching type
-        shared_ptr[CRecordBatch] sp_batch
-        CRecordBatch* batch
-        # Python-level Schema object held by the batch
-        Schema _schema
-
-    # Parameter is named `batch` here: it is a record batch, not a table
-    cdef init(self, const shared_ptr[CRecordBatch]& batch)
-    cdef _check_nullptr(self)
-
-# Helpers returning Python objects for C++ shared pointers; the two marked
-# `api` are additionally exported through Cython's generated C API
-cdef object box_column(const shared_ptr[CColumn]& ccolumn)
-cdef api object table_from_ctable(const shared_ptr[CTable]& ctable)
-cdef api object batch_from_cbatch(const shared_ptr[CRecordBatch]& cbatch)

http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/pyarrow/_table.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_table.pyx b/python/pyarrow/_table.pyx
deleted file mode 100644
index 223fe27..0000000
--- a/python/pyarrow/_table.pyx
+++ /dev/null
@@ -1,926 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# cython: profile=False
-# distutils: language = c++
-# cython: embedsignature = True
-
-from cython.operator cimport dereference as deref
-
-from pyarrow.includes.libarrow cimport *
-from pyarrow.includes.common cimport *
-cimport pyarrow.includes.pyarrow as pyarrow
-from pyarrow._array cimport (Array, box_array, wrap_array_output,
-                             box_data_type, box_schema, DataType, Field)
-from pyarrow._error cimport check_status
-cimport cpython
-
-import pyarrow._config
-from pyarrow._error import ArrowException
-from pyarrow._array import field
-from pyarrow.compat import frombytes, tobytes
-
-from collections import OrderedDict
-
-
-cdef _pandas():
-    # pandas is imported only when this helper is called
-    import pandas
-    return pandas
-
-
-cdef class ChunkedArray:
-    """
-    Array backed via one or more memory chunks.
-
-    Warning
-    -------
-    Do not call this class's constructor directly.
-    """
-
-    def __cinit__(self):
-        self.chunked_array = NULL
-
-    cdef init(self, const shared_ptr[CChunkedArray]& chunked_array):
-        # Keep the shared pointer alongside the raw pointer taken from it
-        self.sp_chunked_array = chunked_array
-        self.chunked_array = chunked_array.get()
-
-    cdef _check_nullptr(self):
-        if self.chunked_array == NULL:
-            raise ReferenceError("ChunkedArray object references a NULL "
-                                 "pointer. Not initialized.")
-
-    def length(self):
-        """
-        Return the length reported by the underlying chunked array
-        """
-        self._check_nullptr()
-        return self.chunked_array.length()
-
-    def __len__(self):
-        return self.length()
-
-    @property
-    def null_count(self):
-        """
-        Number of null entries
-
-        Returns
-        -------
-        int
-        """
-        self._check_nullptr()
-        return self.chunked_array.null_count()
-
-    @property
-    def num_chunks(self):
-        """
-        Number of underlying chunks
-
-        Returns
-        -------
-        int
-        """
-        self._check_nullptr()
-        return self.chunked_array.num_chunks()
-
-    def chunk(self, i):
-        """
-        Select a chunk by its index
-
-        Parameters
-        ----------
-        i : int
-
-        Returns
-        -------
-        pyarrow.array.Array
-
-        Raises
-        ------
-        IndexError
-            If i is negative or not less than num_chunks
-        """
-        self._check_nullptr()
-        # Validate here: the index is otherwise passed unchecked to C++
-        if i < 0 or i >= self.num_chunks:
-            raise IndexError('Chunk index {0} out of range'.format(i))
-        return box_array(self.chunked_array.chunk(i))
-
-    def iterchunks(self):
-        """
-        Yield each chunk in index order
-        """
-        for i in range(self.num_chunks):
-            yield self.chunk(i)
-
-    def to_pylist(self):
-        """
-        Convert to a list of native Python objects.
-        """
-        result = []
-        for i in range(self.num_chunks):
-            result += self.chunk(i).to_pylist()
-        return result
-
-
-cdef class Column:
-    """
-    Named vector of elements of equal type.
-
-    Warning
-    -------
-    Do not call this class's constructor directly.
-    """
-
-    def __cinit__(self):
-        self.column = NULL
-
-    cdef init(self, const shared_ptr[CColumn]& column):
-        # Keep the shared pointer alongside the raw pointer taken from it
-        self.sp_column = column
-        self.column = column.get()
-
-    @staticmethod
-    def from_array(object field_or_name, Array arr):
-        """
-        Construct a Column from a single Array
-
-        Parameters
-        ----------
-        field_or_name : Field or name
-            A Field is used as given; any other value is passed to
-            field() together with the array's type
-        arr : pyarrow.Array
-
-        Returns
-        -------
-        pyarrow.Column
-        """
-        cdef Field boxed_field
-
-        if isinstance(field_or_name, Field):
-            boxed_field = field_or_name
-        else:
-            boxed_field = field(field_or_name, arr.type)
-
-        cdef shared_ptr[CColumn] sp_column
-        sp_column.reset(new CColumn(boxed_field.sp_field, arr.sp_array))
-        return box_column(sp_column)
-
-    def to_pandas(self):
-        """
-        Convert the arrow::Column to a pandas.Series
-
-        Returns
-        -------
-        pandas.Series
-        """
-        cdef:
-            PyObject* out
-
-        self._check_nullptr()
-        check_status(pyarrow.ConvertColumnToPandas(self.sp_column,
-                                                   self, &out))
-
-        return _pandas().Series(wrap_array_output(out), name=self.name)
-
-    def equals(self, Column other):
-        """
-        Check if contents of two columns are equal
-
-        Parameters
-        ----------
-        other : pyarrow.Column
-
-        Returns
-        -------
-        are_equal : boolean
-        """
-        cdef:
-            CColumn* my_col = self.column
-            CColumn* other_col = other.column
-            c_bool result
-
-        self._check_nullptr()
-        other._check_nullptr()
-
-        with nogil:
-            result = my_col.Equals(deref(other_col))
-
-        return result
-
-    def to_pylist(self):
-        """
-        Convert to a list of native Python objects.
-        """
-        return self.data.to_pylist()
-
-    cdef _check_nullptr(self):
-        if self.column == NULL:
-            # The space after the period keeps the two literals from
-            # running together in the message
-            raise ReferenceError("Column object references a NULL pointer. "
-                                 "Not initialized.")
-
-    def __len__(self):
-        self._check_nullptr()
-        return self.column.length()
-
-    def length(self):
-        """
-        Return the length reported by the underlying column
-        """
-        self._check_nullptr()
-        return self.column.length()
-
-    @property
-    def shape(self):
-        """
-        Dimensions of this column
-
-        Returns
-        -------
-        (int,)
-        """
-        self._check_nullptr()
-        return (self.length(),)
-
-    @property
-    def null_count(self):
-        """
-        Number of null entries
-
-        Returns
-        -------
-        int
-        """
-        self._check_nullptr()
-        return self.column.null_count()
-
-    @property
-    def name(self):
-        """
-        Label of the column
-
-        Returns
-        -------
-        str
-        """
-        self._check_nullptr()
-        return bytes(self.column.name()).decode('utf8')
-
-    @property
-    def type(self):
-        """
-        Type information for this column
-
-        Returns
-        -------
-        pyarrow.schema.DataType
-        """
-        self._check_nullptr()
-        return box_data_type(self.column.type())
-
-    @property
-    def data(self):
-        """
-        The underlying data
-
-        Returns
-        -------
-        pyarrow.table.ChunkedArray
-        """
-        self._check_nullptr()
-        cdef ChunkedArray chunked_array = ChunkedArray()
-        chunked_array.init(self.column.data())
-        return chunked_array
-
-
-# Build a shared CKeyValueMetadata from a Python dict. The dict is first
-# assigned to an unordered_map[c_string, c_string], relying on Cython's
-# automatic dict-to-map conversion.
-cdef shared_ptr[const CKeyValueMetadata] key_value_metadata_from_dict(
-    dict metadata):
-    cdef:
-        unordered_map[c_string, c_string] unordered_metadata = metadata
-    return (<shared_ptr[const CKeyValueMetadata]>
-            make_shared[CKeyValueMetadata](unordered_metadata))
-
-
-cdef int _schema_from_arrays(
-        arrays, names, dict metadata, shared_ptr[CSchema]* schema) except -1:
-    # Build a CSchema describing `arrays` and store it in `schema`.
-    #
-    # The type of arrays[0] selects the mode: for Array objects the field
-    # names come from `names` (required); for Column objects each column
-    # supplies its own name. Returns 0 on success.
-    #
-    # Raises ValueError for an empty `arrays`, or for Arrays without
-    # `names`; raises TypeError when arrays[0] is neither Array nor Column.
-    cdef:
-        Array arr
-        Column col
-        c_string c_name
-        vector[shared_ptr[CField]] fields
-        shared_ptr[CDataType] type_
-        int K = len(arrays)
-
-    fields.resize(K)
-
-    if len(arrays) == 0:
-        raise ValueError('Must pass at least one array')
-
-    if isinstance(arrays[0], Array):
-        if names is None:
-            raise ValueError('Must pass names when constructing '
-                             'from Array objects')
-        for i in range(K):
-            arr = arrays[i]
-            type_ = arr.type.sp_type
-            c_name = tobytes(names[i])
-            # NOTE(review): third CField argument is presumably the
-            # nullable flag - confirm against the CField declaration
-            fields[i].reset(new CField(c_name, type_, True))
-    elif isinstance(arrays[0], Column):
-        for i in range(K):
-            col = arrays[i]
-            type_ = col.sp_column.get().type()
-            c_name = tobytes(col.name)
-            fields[i].reset(new CField(c_name, type_, True))
-    else:
-        raise TypeError(type(arrays[0]))
-
-    schema.reset(new CSchema(fields, key_value_metadata_from_dict(metadata)))
-    return 0
-
-
-cdef tuple _dataframe_to_arrays(df, bint timestamps_to_ms, Schema schema):
-    # Convert every column of `df` with Array.from_pandas.
-    #
-    # Returns (names, arrays, metadata); `metadata` is always an empty dict
-    # here. When `schema` is given, each column's type is looked up in it
-    # by name and passed to the conversion.
-    cdef:
-        list names = []
-        list arrays = []
-        DataType type = None
-        dict metadata = {}
-
-    for name in df.columns:
-        col = df[name]
-        if schema is not None:
-            type = schema.field_by_name(name).type
-
-        arr = Array.from_pandas(col, type=type,
-                                timestamps_to_ms=timestamps_to_ms)
-        names.append(name)
-        arrays.append(arr)
-
-    return names, arrays, metadata
-
-
-cdef class RecordBatch:
-    """
-    Batch of rows of columns of equal length
-
-    Warning
-    -------
-    Do not call this class's constructor directly, use one of the ``from_*``
-    methods instead.
-    """
-
-    def __cinit__(self):
-        self.batch = NULL
-        self._schema = None
-
-    cdef init(self, const shared_ptr[CRecordBatch]& batch):
-        # Keep the shared pointer alongside the raw pointer taken from it
-        self.sp_batch = batch
-        self.batch = batch.get()
-
-    cdef _check_nullptr(self):
-        if self.batch == NULL:
-            raise ReferenceError("Object not initialized")
-
-    def __len__(self):
-        self._check_nullptr()
-        return self.batch.num_rows()
-
-    @property
-    def num_columns(self):
-        """
-        Number of columns
-
-        Returns
-        -------
-        int
-        """
-        self._check_nullptr()
-        return self.batch.num_columns()
-
-    @property
-    def num_rows(self):
-        """
-        Number of rows
-
-        Due to the definition of a RecordBatch, all columns have the same
-        number of rows.
-
-        Returns
-        -------
-        int
-        """
-        return len(self)
-
-    @property
-    def schema(self):
-        """
-        Schema of the RecordBatch and its columns
-
-        Returns
-        -------
-        pyarrow.schema.Schema
-        """
-        cdef Schema schema
-        self._check_nullptr()
-        # Built on first access and cached for later calls
-        if self._schema is None:
-            schema = Schema()
-            schema.init_schema(self.batch.schema())
-            self._schema = schema
-
-        return self._schema
-
-    def __getitem__(self, i):
-        self._check_nullptr()
-        return box_array(self.batch.column(i))
-
-    def slice(self, offset=0, length=None):
-        """
-        Compute zero-copy slice of this RecordBatch
-
-        Parameters
-        ----------
-        offset : int, default 0
-            Offset from start of array to slice
-        length : int, default None
-            Length of slice (default is until end of batch starting from
-            offset)
-
-        Returns
-        -------
-        sliced : RecordBatch
-        """
-        cdef shared_ptr[CRecordBatch] result
-
-        self._check_nullptr()
-
-        if offset < 0:
-            raise IndexError('Offset must be non-negative')
-
-        if length is None:
-            result = self.batch.Slice(offset)
-        else:
-            result = self.batch.Slice(offset, length)
-
-        return batch_from_cbatch(result)
-
-    def equals(self, RecordBatch other):
-        """
-        Check if contents of two record batches are equal
-
-        Parameters
-        ----------
-        other : pyarrow.RecordBatch
-
-        Returns
-        -------
-        are_equal : boolean
-        """
-        cdef:
-            CRecordBatch* my_batch = self.batch
-            CRecordBatch* other_batch = other.batch
-            c_bool result
-
-        self._check_nullptr()
-        other._check_nullptr()
-
-        with nogil:
-            result = my_batch.Equals(deref(other_batch))
-
-        return result
-
-    def to_pydict(self):
-        """
-        Convert the arrow::RecordBatch to an OrderedDict
-
-        Returns
-        -------
-        OrderedDict
-        """
-        self._check_nullptr()
-        entries = []
-        for i in range(self.batch.num_columns()):
-            name = bytes(self.batch.column_name(i)).decode('utf8')
-            column = self[i].to_pylist()
-            entries.append((name, column))
-        return OrderedDict(entries)
-
-    def to_pandas(self, nthreads=None):
-        """
-        Convert the arrow::RecordBatch to a pandas DataFrame
-
-        Parameters
-        ----------
-        nthreads : optional
-            Passed through to Table.to_pandas
-
-        Returns
-        -------
-        pandas.DataFrame
-        """
-        return Table.from_batches([self]).to_pandas(nthreads=nthreads)
-
-    @classmethod
-    def from_pandas(cls, df, schema=None):
-        """
-        Convert pandas.DataFrame to an Arrow RecordBatch
-
-        Parameters
-        ----------
-        df: pandas.DataFrame
-        schema: pyarrow.Schema (optional)
-            The expected schema of the RecordBatch. This can be used to
-            indicate the type of columns if we cannot infer it automatically.
-
-        Returns
-        -------
-        pyarrow.table.RecordBatch
-        """
-        names, arrays, metadata = _dataframe_to_arrays(df, False, schema)
-        return cls.from_arrays(arrays, names, metadata)
-
-    @staticmethod
-    def from_arrays(list arrays, list names, dict metadata=None):
-        """
-        Construct a RecordBatch from multiple pyarrow.Arrays
-
-        Parameters
-        ----------
-        arrays: list of pyarrow.Array
-            column-wise data vectors
-        names: list of str
-            Labels for the columns
-        metadata: dict, default None
-            Key/value metadata for the schema; None is treated as empty
-
-        Returns
-        -------
-        pyarrow.table.RecordBatch
-        """
-        cdef:
-            Array arr
-            shared_ptr[CSchema] schema
-            shared_ptr[CRecordBatch] batch
-            vector[shared_ptr[CArray]] c_arrays
-            int64_t num_rows
-            int64_t number_of_arrays = len(arrays)
-
-        if not number_of_arrays:
-            raise ValueError('Record batch cannot contain no arrays (for now)')
-
-        # The first array determines the row count given to CRecordBatch
-        num_rows = len(arrays[0])
-        _schema_from_arrays(arrays, names, metadata or {}, &schema)
-
-        c_arrays.reserve(len(arrays))
-        for arr in arrays:
-            c_arrays.push_back(arr.sp_array)
-
-        batch.reset(new CRecordBatch(schema, num_rows, c_arrays))
-        return batch_from_cbatch(batch)
-
-
-cdef table_to_blockmanager(const shared_ptr[CTable]& table, int nthreads):
-    # Convert `table` into a pandas BlockManager.
-    #
-    # ConvertTableToPandas produces a sequence of dict-like items, each with
-    # 'block' and 'placement' entries and optionally 'dictionary' or
-    # 'timezone'; every item becomes one pandas block.
-    cdef:
-        PyObject* result_obj
-        CColumn* col
-        int i
-
-    import pandas.core.internals as _int
-    from pandas import RangeIndex, Categorical
-    from pyarrow.compat import DatetimeTZDtype
-
-    with nogil:
-        check_status(pyarrow.ConvertTableToPandas(table, nthreads,
-                                                  &result_obj))
-
-    result = PyObject_to_object(result_obj)
-
-    blocks = []
-    for item in result:
-        block_arr = item['block']
-        placement = item['placement']
-        if 'dictionary' in item:
-            # Dictionary-encoded data becomes a categorical block
-            cat = Categorical(block_arr,
-                              categories=item['dictionary'],
-                              ordered=False, fastpath=True)
-            block = _int.make_block(cat, placement=placement,
-                                    klass=_int.CategoricalBlock,
-                                    fastpath=True)
-        elif 'timezone' in item:
-            # Timezone-carrying data becomes a tz-aware datetime block
-            dtype = DatetimeTZDtype('ns', tz=item['timezone'])
-            block = _int.make_block(block_arr, placement=placement,
-                                    klass=_int.DatetimeTZBlock,
-                                    dtype=dtype, fastpath=True)
-        else:
-            block = _int.make_block(block_arr, placement=placement)
-        blocks.append(block)
-
-    # Column names, in table order, form the first axis
-    names = []
-    for i in range(table.get().num_columns()):
-        col = table.get().column(i).get()
-        names.append(frombytes(col.name()))
-
-    axes = [names, RangeIndex(table.get().num_rows())]
-    return _int.BlockManager(blocks, axes)
-
-
-cdef class Table:
-    """
-    A collection of top-level named, equal length Arrow arrays.
-
-    Warning
-    -------
-    Do not call this class's constructor directly, use one of the ``from_*``
-    methods instead.
-    """
-
-    def __cinit__(self):
-        self.table = NULL
-
-    def __repr__(self):
-        return 'pyarrow.Table\n{0}'.format(str(self.schema))
-
-    cdef init(self, const shared_ptr[CTable]& table):
-        self.sp_table = table
-        self.table = table.get()
-
-    cdef _check_nullptr(self):
-        if self.table == NULL:
-            raise ReferenceError("Table object references a NULL pointer."
-                    "Not initialized.")
-
-    def equals(self, Table other):
-        """
-        Check if contents of two tables are equal
-
-        Parameters
-        ----------
-        other : pyarrow.Table
-
-        Returns
-        -------
-        are_equal : boolean
-        """
-        cdef:
-            CTable* my_table = self.table
-            CTable* other_table = other.table
-            c_bool result
-
-        self._check_nullptr()
-        other._check_nullptr()
-
-        with nogil:
-            result = my_table.Equals(deref(other_table))
-
-        return result
-
-    @classmethod
-    def from_pandas(cls, df, timestamps_to_ms=False, schema=None):
-        """
-        Build an Arrow Table from a pandas.DataFrame
-
-        Parameters
-        ----------
-        df : pandas.DataFrame
-            The DataFrame to convert.
-        timestamps_to_ms : bool, default False
-            Convert datetime columns to ms resolution. This is needed for
-            compatibility with other functionality like Parquet I/O which
-            only supports milliseconds.
-        schema : pyarrow.Schema, optional
-            The expected schema of the Arrow Table. This can be used to
-            indicate the type of columns if we cannot infer it automatically.
-
-        Returns
-        -------
-        pyarrow.table.Table
-
-        Examples
-        --------
-
-        >>> import pandas as pd
-        >>> import pyarrow as pa
-        >>> df = pd.DataFrame({
-        ...     'int': [1, 2],
-        ...     'str': ['a', 'b']
-        ... })
-        >>> pa.Table.from_pandas(df)
-        <pyarrow.table.Table object at 0x7f05d1fb1b40>
-        """
-        # The helper returns column names, converted arrays and metadata,
-        # which from_arrays then assembles into the Table.
-        converted = _dataframe_to_arrays(df,
-                                         timestamps_to_ms=timestamps_to_ms,
-                                         schema=schema)
-        col_names, col_arrays, col_metadata = converted
-        return cls.from_arrays(col_arrays, names=col_names,
-                               metadata=col_metadata)
-
-    @staticmethod
-    def from_arrays(arrays, names=None, dict metadata=None):
-        """
-        Construct a Table from Arrow arrays or columns
-
-        Parameters
-        ----------
-        arrays: list of pyarrow.Array or pyarrow.Column
-            Equal-length arrays that should form the table.
-        names: list of str, optional
-            Names for the table columns. If Columns passed, will be
-            inferred. If Arrays passed, this argument is required
-        metadata: dict, optional
-            Forwarded to the schema construction helper; an empty dict is
-            passed when this is None or empty.
-
-        Returns
-        -------
-        pyarrow.table.Table
-
-        Raises
-        ------
-        ValueError
-            If an element of ``arrays`` is neither an Array nor a Column.
-
-        """
-        cdef:
-            vector[shared_ptr[CColumn]] columns
-            shared_ptr[CSchema] schema
-            shared_ptr[CTable] table
-            size_t K = len(arrays)
-
-        # The helper receives the address of `schema` and fills it in; the
-        # loop below reads the schema's fields, so this must run first.
-        _schema_from_arrays(arrays, names, metadata or {}, &schema)
-
-        columns.reserve(K)
-
-        for i in range(K):
-            if isinstance(arrays[i], Array):
-                # A bare Array is wrapped in a new CColumn that pairs it
-                # with the i-th field of the schema.
-                columns.push_back(
-                    make_shared[CColumn](
-                        schema.get().field(i),
-                        (<Array> arrays[i]).sp_array
-                    )
-                )
-            elif isinstance(arrays[i], Column):
-                # An existing Column contributes its C++ column as-is.
-                columns.push_back((<Column> arrays[i]).sp_column)
-            else:
-                raise ValueError(type(arrays[i]))
-
-        table.reset(new CTable(schema, columns))
-        return table_from_ctable(table)
-
-    @staticmethod
-    def from_batches(batches):
-        """
-        Construct a Table from a list of Arrow RecordBatches
-
-        Parameters
-        ----------
-
-        batches: list of RecordBatch
-            RecordBatch list to be converted, schemas must be equal
-
-        Returns
-        -------
-        pyarrow.table.Table
-        """
-        cdef:
-            vector[shared_ptr[CRecordBatch]] c_batches
-            shared_ptr[CTable] c_table
-            RecordBatch batch
-
-        # Collect the C++ batch handles while the GIL is still held.
-        for batch in batches:
-            c_batches.push_back(batch.sp_batch)
-
-        # check_status receives the Status returned by the C++ call.
-        with nogil:
-            check_status(CTable.FromRecordBatches(c_batches, &c_table))
-
-        return table_from_ctable(c_table)
-
-    def to_pandas(self, nthreads=None):
-        """
-        Convert the arrow::Table to a pandas DataFrame
-
-        Parameters
-        ----------
-        nthreads : int, default max(1, multiprocessing.cpu_count() / 2)
-            For the default, we divide the CPU count by 2 because most modern
-            computers have hyperthreading turned on, so doubling the CPU count
-            beyond the number of physical cores does not help. When None,
-            the value is taken from ``pyarrow._config.cpu_count()``.
-
-        Returns
-        -------
-        pandas.DataFrame
-
-        Raises
-        ------
-        ReferenceError
-            If the table has not been initialized.
-        """
-        # Refuse to convert an uninitialized table, matching the guard used
-        # by column(), num_columns and num_rows.
-        self._check_nullptr()
-
-        if nthreads is None:
-            nthreads = pyarrow._config.cpu_count()
-
-        mgr = table_to_blockmanager(self.sp_table, nthreads)
-        return _pandas().DataFrame(mgr)
-
-    def to_pydict(self):
-        """
-        Convert the arrow::Table to an OrderedDict
-
-        Each column contributes one entry, in column order: the key is the
-        column's name and the value is the result of its ``to_pylist()``.
-
-        Returns
-        -------
-        OrderedDict
-
-        Raises
-        ------
-        ReferenceError
-            If the table has not been initialized.
-        """
-        entries = []
-        # self.num_columns performs the NULL check before the C++ table is
-        # touched; each Column wrapper is built once and reused for both the
-        # name and the values.
-        for i in range(self.num_columns):
-            column = self.column(i)
-            entries.append((column.name, column.to_pylist()))
-        return OrderedDict(entries)
-
-    @property
-    def schema(self):
-        """
-        Schema of the table and its columns
-
-        Returns
-        -------
-        pyarrow.schema.Schema
-
-        Raises
-        ------
-        ReferenceError
-            If the table has not been initialized.
-        """
-        # __repr__ goes through this property, so an uninitialized Table
-        # must raise here rather than dereference a NULL pointer.
-        self._check_nullptr()
-        return box_schema(self.table.schema())
-
-    def column(self, index):
-        """
-        Select a column by its numeric index.
-
-        Parameters
-        ----------
-        index: int
-            Position of the column. Negative values count from the end.
-
-        Returns
-        -------
-        pyarrow.table.Column
-
-        Raises
-        ------
-        IndexError
-            If ``index`` is outside the range of available columns.
-        ReferenceError
-            If the table has not been initialized.
-        """
-        cdef:
-            Column column
-            int num_columns
-            int c_index = index
-
-        self._check_nullptr()
-
-        # Validate the index here rather than pass an out-of-range value
-        # down to the C++ column accessor.
-        num_columns = self.table.num_columns()
-        if not -num_columns <= c_index < num_columns:
-            raise IndexError(
-                'Table column index {0} is out of range'.format(index))
-        if c_index < 0:
-            c_index += num_columns
-
-        column = Column()
-        column.init(self.table.column(c_index))
-        return column
-
-    def __getitem__(self, i):
-        # Subscript access, table[i], delegates to column(i).
-        return self.column(i)
-
-    def itercolumns(self):
-        """
-        Yield every column of the table, from the first index to the last
-        """
-        column_count = self.num_columns
-        for position in range(column_count):
-            yield self.column(position)
-
-    @property
-    def num_columns(self):
-        """
-        Number of columns in this table
-
-        Returns
-        -------
-        int
-
-        Raises
-        ------
-        ReferenceError
-            If the table has not been initialized.
-        """
-        self._check_nullptr()
-        return self.table.num_columns()
-
-    @property
-    def num_rows(self):
-        """
-        Number of rows in this table.
-
-        Due to the definition of a table, all columns have the same number
-        of rows.
-
-        Returns
-        -------
-        int
-
-        Raises
-        ------
-        ReferenceError
-            If the table has not been initialized.
-        """
-        self._check_nullptr()
-        return self.table.num_rows()
-
-    def __len__(self):
-        # len(table) reports the number of rows, not the number of columns.
-        return self.num_rows
-
-    @property
-    def shape(self):
-        """
-        Table dimensions as a (row count, column count) pair
-
-        Returns
-        -------
-        (int, int)
-        """
-        row_count = self.num_rows
-        column_count = self.num_columns
-        return (row_count, column_count)
-
-    def add_column(self, int i, Column column not None):
-        """
-        Add column to Table at position. Returns new table
-
-        Parameters
-        ----------
-        i : int
-            Position at which the column is inserted.
-        column : pyarrow.Column
-            Column to insert. Must not be None.
-
-        Returns
-        -------
-        pyarrow.table.Table
-            A new table; this table is not modified.
-
-        Raises
-        ------
-        TypeError
-            If ``column`` is None or is not a pyarrow.Column.
-        ReferenceError
-            If the table has not been initialized.
-        """
-        cdef:
-            shared_ptr[CTable] c_table
-
-        # Guard the raw pointer before it is dereferenced without the GIL.
-        self._check_nullptr()
-
-        with nogil:
-            check_status(self.table.AddColumn(i, column.sp_column, &c_table))
-
-        return table_from_ctable(c_table)
-
-    def append_column(self, Column column):
-        """
-        Return a new table with the given column placed after the last one
-        """
-        end_position = self.num_columns
-        return self.add_column(end_position, column)
-
-    def remove_column(self, int i):
-        """
-        Create new Table with the indicated column removed
-
-        Parameters
-        ----------
-        i : int
-            Position of the column to remove.
-
-        Returns
-        -------
-        pyarrow.table.Table
-            A new table; this table is not modified.
-
-        Raises
-        ------
-        ReferenceError
-            If the table has not been initialized.
-        """
-        cdef shared_ptr[CTable] c_table
-
-        # Guard the raw pointer before it is dereferenced without the GIL.
-        self._check_nullptr()
-
-        with nogil:
-            check_status(self.table.RemoveColumn(i, &c_table))
-
-        return table_from_ctable(c_table)
-
-
-def concat_tables(tables):
-    """
-    Perform zero-copy concatenation of pyarrow.Table objects. Raises exception
-    if all of the Table schemas are not the same
-
-    Parameters
-    ----------
-    tables : iterable of pyarrow.Table objects
-
-    Returns
-    -------
-    pyarrow.table.Table
-    """
-    cdef:
-        vector[shared_ptr[CTable]] c_tables
-        shared_ptr[CTable] c_result
-        Table table
-
-    # Collect the C++ table handles while the GIL is still held.
-    for table in tables:
-        c_tables.push_back(table.sp_table)
-
-    # check_status receives the Status returned by the C++ call.
-    with nogil:
-        check_status(ConcatenateTables(c_tables, &c_result))
-
-    return table_from_ctable(c_result)
-
-
-cdef object box_column(const shared_ptr[CColumn]& ccolumn):
-    # Wrap a C++ column handle in a new Python-level Column object.
-    cdef Column column = Column()
-    column.init(ccolumn)
-    return column
-
-
-cdef api object table_from_ctable(const shared_ptr[CTable]& ctable):
-    # Wrap a C++ table handle in a new Python-level Table object.
-    # Declared `api`, so Cython also exposes it to external C/C++ code.
-    cdef Table table = Table()
-    table.init(ctable)
-    return table
-
-
-cdef api object batch_from_cbatch(const shared_ptr[CRecordBatch]& cbatch):
-    # Wrap a C++ record batch handle in a new Python-level RecordBatch.
-    # Declared `api`, so Cython also exposes it to external C/C++ code.
-    cdef RecordBatch batch = RecordBatch()
-    batch.init(cbatch)
-    return batch