Posted to commits@arrow.apache.org by uw...@apache.org on 2017/04/13 10:51:53 UTC

[3/4] arrow git commit: ARROW-751: [Python] Make all Cython modules private. Some code tidying

http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_io.pyx b/python/pyarrow/_io.pyx
new file mode 100644
index 0000000..9f067fb
--- /dev/null
+++ b/python/pyarrow/_io.pyx
@@ -0,0 +1,1273 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Cython wrappers for IO interfaces defined in arrow::io and messaging in
+# arrow::ipc
+
+# cython: profile=False
+# distutils: language = c++
+# cython: embedsignature = True
+
+from cython.operator cimport dereference as deref
+from libc.stdlib cimport malloc, free
+from pyarrow.includes.libarrow cimport *
+cimport pyarrow.includes.pyarrow as pyarrow
+from pyarrow._array cimport Array, Tensor, box_tensor, Schema
+from pyarrow._error cimport check_status
+from pyarrow._memory cimport MemoryPool, maybe_unbox_memory_pool
+from pyarrow._table cimport (Column, RecordBatch, batch_from_cbatch,
+                             table_from_ctable)
+cimport cpython as cp
+
+import pyarrow._config
+from pyarrow.compat import frombytes, tobytes, encode_file_path
+
+import re
+import six
+import sys
+import threading
+import time
+
+try:
+    # Python 3
+    from queue import Queue, Empty as QueueEmpty, Full as QueueFull
+except ImportError:
+    # Python 2
+    from Queue import Queue, Empty as QueueEmpty, Full as QueueFull
+
+
+# 64K
+DEFAULT_BUFFER_SIZE = 2 ** 16
+
+
+# To let us get a PyObject* and avoid Cython auto-ref-counting
+cdef extern from "Python.h":
+    PyObject* PyBytes_FromStringAndSizeNative" PyBytes_FromStringAndSize"(
+        char *v, Py_ssize_t len) except NULL
+
+cdef class NativeFile:
+
+    def __cinit__(self):
+        self.is_open = False
+        self.own_file = False
+
+    def __dealloc__(self):
+        if self.is_open and self.own_file:
+            self.close()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, tb):
+        self.close()
+
+    def close(self):
+        if self.is_open:
+            with nogil:
+                if self.is_readable:
+                    check_status(self.rd_file.get().Close())
+                else:
+                    check_status(self.wr_file.get().Close())
+        self.is_open = False
+
+    cdef read_handle(self, shared_ptr[RandomAccessFile]* file):
+        self._assert_readable()
+        file[0] = <shared_ptr[RandomAccessFile]> self.rd_file
+
+    cdef write_handle(self, shared_ptr[OutputStream]* file):
+        self._assert_writeable()
+        file[0] = <shared_ptr[OutputStream]> self.wr_file
+
+    def _assert_readable(self):
+        if not self.is_readable:
+            raise IOError("only valid on readonly files")
+
+        if not self.is_open:
+            raise IOError("file not open")
+
+    def _assert_writeable(self):
+        if not self.is_writeable:
+            raise IOError("only valid on writeable files")
+
+        if not self.is_open:
+            raise IOError("file not open")
+
+    def size(self):
+        cdef int64_t size
+        self._assert_readable()
+        with nogil:
+            check_status(self.rd_file.get().GetSize(&size))
+        return size
+
+    def tell(self):
+        cdef int64_t position
+        with nogil:
+            if self.is_readable:
+                check_status(self.rd_file.get().Tell(&position))
+            else:
+                check_status(self.wr_file.get().Tell(&position))
+        return position
+
+    def seek(self, int64_t position):
+        self._assert_readable()
+        with nogil:
+            check_status(self.rd_file.get().Seek(position))
+
+    def write(self, data):
+        """
+        Write data from any object implementing the buffer protocol (bytes,
+        bytearray, ndarray, pyarrow.Buffer)
+        """
+        self._assert_writeable()
+
+        if isinstance(data, six.string_types):
+            data = tobytes(data)
+
+        cdef Buffer arrow_buffer = frombuffer(data)
+
+        cdef const uint8_t* buf = arrow_buffer.buffer.get().data()
+        cdef int64_t bufsize = len(arrow_buffer)
+        with nogil:
+            check_status(self.wr_file.get().Write(buf, bufsize))
+
+    def read(self, nbytes=None):
+        cdef:
+            int64_t c_nbytes
+            int64_t bytes_read = 0
+            PyObject* obj
+
+        if nbytes is None:
+            c_nbytes = self.size() - self.tell()
+        else:
+            c_nbytes = nbytes
+
+        self._assert_readable()
+
+        # Allocate empty write space
+        obj = PyBytes_FromStringAndSizeNative(NULL, c_nbytes)
+
+        cdef uint8_t* buf = <uint8_t*> cp.PyBytes_AS_STRING(<object> obj)
+        with nogil:
+            check_status(self.rd_file.get().Read(c_nbytes, &bytes_read, buf))
+
+        if bytes_read < c_nbytes:
+            cp._PyBytes_Resize(&obj, <Py_ssize_t> bytes_read)
+
+        return PyObject_to_object(obj)
+
+    def read_buffer(self, nbytes=None):
+        cdef:
+            int64_t c_nbytes
+            int64_t bytes_read = 0
+            shared_ptr[CBuffer] output
+        self._assert_readable()
+
+        if nbytes is None:
+            c_nbytes = self.size() - self.tell()
+        else:
+            c_nbytes = nbytes
+
+        with nogil:
+            check_status(self.rd_file.get().ReadB(c_nbytes, &output))
+
+        return wrap_buffer(output)
+
+    def download(self, stream_or_path, buffer_size=None):
+        """
+        Read this file completely to a local path or destination stream
+        (rather than reading it completely into memory). Seeks to the
+        beginning of the file first.
+        """
+        cdef:
+            int64_t bytes_read = 0
+            uint8_t* buf
+        self._assert_readable()
+
+        buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
+
+        write_queue = Queue(50)
+
+        if not hasattr(stream_or_path, 'read'):
+            stream = open(stream_or_path, 'wb')
+            cleanup = lambda: stream.close()
+        else:
+            stream = stream_or_path
+            cleanup = lambda: None
+
+        done = False
+        exc_info = []
+
+        def bg_write():
+            try:
+                while not done or write_queue.qsize() > 0:
+                    try:
+                        buf = write_queue.get(timeout=0.01)
+                    except QueueEmpty:
+                        continue
+                    stream.write(buf)
+            except Exception:
+                # Assigning to a closed-over name would create a local in
+                # this nested function, so record the exception in a
+                # mutable list instead
+                exc_info.append(sys.exc_info())
+            finally:
+                cleanup()
+
+        self.seek(0)
+
+        writer_thread = threading.Thread(target=bg_write)
+
+        # This isn't ideal -- PyBytes_FromStringAndSize copies the data from
+        # the passed buffer, so it's hard for us to avoid doubling the memory
+        buf = <uint8_t*> malloc(buffer_size)
+        if buf == NULL:
+            raise MemoryError("Failed to allocate {0} bytes"
+                              .format(buffer_size))
+
+        writer_thread.start()
+
+        cdef int64_t total_bytes = 0
+        cdef int32_t c_buffer_size = buffer_size
+
+        try:
+            while True:
+                with nogil:
+                    check_status(self.rd_file.get()
+                                 .Read(c_buffer_size, &bytes_read, buf))
+
+                total_bytes += bytes_read
+
+                # EOF
+                if bytes_read == 0:
+                    break
+
+                pybuf = cp.PyBytes_FromStringAndSize(<const char*>buf,
+                                                     bytes_read)
+
+                write_queue.put_nowait(pybuf)
+        finally:
+            free(buf)
+            done = True
+
+        writer_thread.join()
+        if exc_info:
+            six.reraise(*exc_info[0])
+
+    def upload(self, stream, buffer_size=None):
+        """
+        Pipe file-like object to file
+        """
+        write_queue = Queue(50)
+        self._assert_writeable()
+
+        buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
+
+        done = False
+        exc_info = []
+
+        def bg_write():
+            try:
+                while not done or write_queue.qsize() > 0:
+                    try:
+                        buf = write_queue.get(timeout=0.01)
+                    except QueueEmpty:
+                        continue
+
+                    self.write(buf)
+            except Exception:
+                # As in download: record the exception in a mutable list so
+                # it survives the closure boundary
+                exc_info.append(sys.exc_info())
+
+        writer_thread = threading.Thread(target=bg_write)
+        writer_thread.start()
+
+        try:
+            while True:
+                buf = stream.read(buffer_size)
+                if not buf:
+                    break
+
+                if writer_thread.is_alive():
+                    while write_queue.full():
+                        time.sleep(0.01)
+                else:
+                    break
+
+                write_queue.put_nowait(buf)
+        finally:
+            done = True
+
+        writer_thread.join()
+        if exc_info:
+            six.reraise(*exc_info[0])
+
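A minimal usage sketch of the download machinery above, which streams chunks
through a bounded queue consumed by a background writer thread; the path and
the top-level re-export of memory_map are assumptions:

    import pyarrow as pa

    src = pa.memory_map('/tmp/big.dat')           # hypothetical path
    with open('/tmp/big-copy.dat', 'wb') as out:  # hypothetical path
        src.download(out)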
+
+# ----------------------------------------------------------------------
+# Python file-like objects
+
+
+cdef class PythonFileInterface(NativeFile):
+    cdef:
+        object handle
+
+    def __cinit__(self, handle, mode='w'):
+        self.handle = handle
+
+        if mode.startswith('w'):
+            self.wr_file.reset(new pyarrow.PyOutputStream(handle))
+            self.is_readable = 0
+            self.is_writeable = 1
+        elif mode.startswith('r'):
+            self.rd_file.reset(new pyarrow.PyReadableFile(handle))
+            self.is_readable = 1
+            self.is_writeable = 0
+        else:
+            raise ValueError('Invalid file mode: {0}'.format(mode))
+
+        self.is_open = True
+
+
+cdef class MemoryMappedFile(NativeFile):
+    """
+    Supports 'r', 'r+w', 'w' modes
+    """
+    cdef:
+        object path
+
+    def __cinit__(self):
+        self.is_open = False
+        self.is_readable = 0
+        self.is_writeable = 0
+
+    @staticmethod
+    def create(path, size):
+        cdef:
+            shared_ptr[CMemoryMappedFile] handle
+            c_string c_path = encode_file_path(path)
+            int64_t c_size = size
+
+        with nogil:
+            check_status(CMemoryMappedFile.Create(c_path, c_size, &handle))
+
+        cdef MemoryMappedFile result = MemoryMappedFile()
+        result.path = path
+        result.is_readable = 1
+        result.is_writeable = 1
+        result.wr_file = <shared_ptr[OutputStream]> handle
+        result.rd_file = <shared_ptr[RandomAccessFile]> handle
+        result.is_open = True
+
+        return result
+
+    def open(self, path, mode='r'):
+        self.path = path
+
+        cdef:
+            FileMode c_mode
+            shared_ptr[CMemoryMappedFile] handle
+            c_string c_path = encode_file_path(path)
+
+        if mode in ('r', 'rb'):
+            c_mode = FileMode_READ
+            self.is_readable = 1
+        elif mode in ('w', 'wb'):
+            c_mode = FileMode_WRITE
+            self.is_writeable = 1
+        elif mode == 'r+w':
+            c_mode = FileMode_READWRITE
+            self.is_readable = 1
+            self.is_writeable = 1
+        else:
+            raise ValueError('Invalid file mode: {0}'.format(mode))
+
+        with nogil:
+            check_status(CMemoryMappedFile.Open(c_path, c_mode, &handle))
+
+        self.wr_file = <shared_ptr[OutputStream]> handle
+        self.rd_file = <shared_ptr[RandomAccessFile]> handle
+        self.is_open = True
+
+
+def memory_map(path, mode='r'):
+    """
+    Open memory map at file path. Size of the memory map cannot change
+
+    Parameters
+    ----------
+    path : string
+    mode : {'r', 'w'}, default 'r'
+
+    Returns
+    -------
+    mmap : MemoryMappedFile
+    """
+    cdef MemoryMappedFile mmap = MemoryMappedFile()
+    mmap.open(path, mode)
+    return mmap
+
+
+def create_memory_map(path, size):
+    """
+    Create memory map at indicated path of the given size, return open
+    writeable file object
+
+    Parameters
+    ----------
+    path : string
+    size : int
+
+    Returns
+    -------
+    mmap : MemoryMappedFile
+    """
+    return MemoryMappedFile.create(path, size)
+
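A short sketch of the two helpers above (hypothetical path; top-level
re-exports assumed):

    import pyarrow as pa

    mm = pa.create_memory_map('/tmp/example.dat', 1024)  # readable+writeable
    mm.write(b'hello')
    mm.seek(0)
    mm.read(5)                  # -> b'hello'

    ro = pa.memory_map('/tmp/example.dat', mode='r')     # read-only view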
+
+cdef class OSFile(NativeFile):
+    """
+    Supports 'r', 'w' modes
+    """
+    cdef:
+        object path
+
+    def __cinit__(self, path, mode='r', MemoryPool memory_pool=None):
+        self.path = path
+
+        cdef c_string c_path = encode_file_path(path)
+
+        self.is_readable = self.is_writeable = 0
+
+        if mode in ('r', 'rb'):
+            self._open_readable(c_path, maybe_unbox_memory_pool(memory_pool))
+        elif mode in ('w', 'wb'):
+            self._open_writeable(c_path)
+        else:
+            raise ValueError('Invalid file mode: {0}'.format(mode))
+
+        self.is_open = True
+
+    cdef _open_readable(self, c_string path, CMemoryPool* pool):
+        cdef shared_ptr[ReadableFile] handle
+
+        with nogil:
+            check_status(ReadableFile.Open(path, pool, &handle))
+
+        self.is_readable = 1
+        self.rd_file = <shared_ptr[RandomAccessFile]> handle
+
+    cdef _open_writeable(self, c_string path):
+        cdef shared_ptr[FileOutputStream] handle
+
+        with nogil:
+            check_status(FileOutputStream.Open(path, &handle))
+        self.is_writeable = 1
+        self.wr_file = <shared_ptr[OutputStream]> handle
+
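Usage sketch for OSFile (hypothetical path; top-level re-export assumed):

    import pyarrow as pa

    with pa.OSFile('/tmp/data.bin', mode='w') as f:
        f.write(b'some bytes')

    with pa.OSFile('/tmp/data.bin') as f:   # mode='r' is the default
        f.read()                            # -> b'some bytes'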
+
+# ----------------------------------------------------------------------
+# Arrow buffers
+
+
+cdef class Buffer:
+
+    def __cinit__(self):
+        pass
+
+    cdef init(self, const shared_ptr[CBuffer]& buffer):
+        self.buffer = buffer
+        self.shape[0] = self.size
+        self.strides[0] = <Py_ssize_t>(1)
+
+    def __len__(self):
+        return self.size
+
+    property size:
+
+        def __get__(self):
+            return self.buffer.get().size()
+
+    property parent:
+
+        def __get__(self):
+            cdef shared_ptr[CBuffer] parent_buf = self.buffer.get().parent()
+
+            if parent_buf.get() == NULL:
+                return None
+            else:
+                return wrap_buffer(parent_buf)
+
+    def __getitem__(self, key):
+        # TODO(wesm): buffer slicing
+        raise NotImplementedError
+
+    def to_pybytes(self):
+        return cp.PyBytes_FromStringAndSize(
+            <const char*>self.buffer.get().data(),
+            self.buffer.get().size())
+
+    def __getbuffer__(self, cp.Py_buffer* buffer, int flags):
+        buffer.buf = <char *>self.buffer.get().data()
+        buffer.format = 'b'
+        buffer.internal = NULL
+        buffer.itemsize = 1
+        buffer.len = self.size
+        buffer.ndim = 1
+        buffer.obj = self
+        buffer.readonly = 1
+        buffer.shape = self.shape
+        buffer.strides = self.strides
+        buffer.suboffsets = NULL
+
+cdef shared_ptr[PoolBuffer] allocate_buffer(CMemoryPool* pool):
+    cdef shared_ptr[PoolBuffer] result
+    result.reset(new PoolBuffer(pool))
+    return result
+
+
+cdef class InMemoryOutputStream(NativeFile):
+
+    cdef:
+        shared_ptr[PoolBuffer] buffer
+
+    def __cinit__(self, MemoryPool memory_pool=None):
+        self.buffer = allocate_buffer(maybe_unbox_memory_pool(memory_pool))
+        self.wr_file.reset(new BufferOutputStream(
+            <shared_ptr[ResizableBuffer]> self.buffer))
+        self.is_readable = 0
+        self.is_writeable = 1
+        self.is_open = True
+
+    def get_result(self):
+        check_status(self.wr_file.get().Close())
+        self.is_open = False
+        return wrap_buffer(<shared_ptr[CBuffer]> self.buffer)
+
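Sketch of the in-memory sink above (top-level re-export assumed):

    import pyarrow as pa

    stream = pa.InMemoryOutputStream()
    stream.write(b'payload')
    buf = stream.get_result()   # closes the stream and returns a Buffer
    buf.to_pybytes()            # -> b'payload'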
+
+cdef class BufferReader(NativeFile):
+    """
+    Zero-copy reader from objects convertible to Arrow buffer
+
+    Parameters
+    ----------
+    obj : Python bytes or pyarrow.Buffer
+    """
+    cdef:
+        Buffer buffer
+
+    def __cinit__(self, object obj):
+
+        if isinstance(obj, Buffer):
+            self.buffer = obj
+        else:
+            self.buffer = frombuffer(obj)
+
+        self.rd_file.reset(new CBufferReader(self.buffer.buffer))
+        self.is_readable = 1
+        self.is_writeable = 0
+        self.is_open = True
+
+
+def frombuffer(object obj):
+    """
+    Construct an Arrow buffer from a Python bytes object
+    """
+    cdef shared_ptr[CBuffer] buf
+    try:
+        memoryview(obj)
+        buf.reset(new pyarrow.PyBuffer(obj))
+        return wrap_buffer(buf)
+    except TypeError:
+        raise ValueError('Must pass object that implements buffer protocol')
+
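The buffer primitives compose as follows (top-level re-exports assumed):

    import pyarrow as pa

    buf = pa.frombuffer(b'abcdef')   # zero-copy view onto the bytes object
    len(buf)                         # -> 6

    reader = pa.BufferReader(buf)
    reader.read_buffer(3)            # Buffer over b'abc', no copy
    reader.read(3)                   # -> b'def', copied into a bytes object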
+
+cdef Buffer wrap_buffer(const shared_ptr[CBuffer]& buf):
+    cdef Buffer result = Buffer()
+    result.init(buf)
+    return result
+
+
+cdef get_reader(object source, shared_ptr[RandomAccessFile]* reader):
+    cdef NativeFile nf
+
+    if isinstance(source, six.string_types):
+        source = memory_map(source, mode='r')
+    elif isinstance(source, Buffer):
+        source = BufferReader(source)
+    elif not isinstance(source, NativeFile) and hasattr(source, 'read'):
+        # Optimistically hope this is file-like
+        source = PythonFileInterface(source, mode='r')
+
+    if isinstance(source, NativeFile):
+        nf = source
+
+        # TODO: what about read-write sources (e.g. memory maps)
+        if not nf.is_readable:
+            raise IOError('Native file is not readable')
+
+        nf.read_handle(reader)
+    else:
+        raise TypeError('Unable to read from object of type: {0}'
+                        .format(type(source)))
+
+
+cdef get_writer(object source, shared_ptr[OutputStream]* writer):
+    cdef NativeFile nf
+
+    if isinstance(source, six.string_types):
+        source = OSFile(source, mode='w')
+    elif not isinstance(source, NativeFile) and hasattr(source, 'write'):
+        # Optimistically hope this is file-like
+        source = PythonFileInterface(source, mode='w')
+
+    if isinstance(source, NativeFile):
+        nf = source
+
+        if not nf.is_writeable:
+            raise IOError('Native file is not writeable')
+
+        nf.write_handle(writer)
+    else:
+        raise TypeError('Unable to write to object of type: {0}'
+                        .format(type(source)))
+
+# ----------------------------------------------------------------------
+# HDFS IO implementation
+
+_HDFS_PATH_RE = re.compile(r'hdfs://(.*):(\d+)(.*)')
+
+
+def have_libhdfs():
+    try:
+        check_status(HaveLibHdfs())
+        return True
+    except Exception:
+        return False
+
+
+def have_libhdfs3():
+    try:
+        check_status(HaveLibHdfs3())
+        return True
+    except Exception:
+        return False
+
+
+def strip_hdfs_abspath(path):
+    m = _HDFS_PATH_RE.match(path)
+    if m:
+        return m.group(3)
+    else:
+        return path
+
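For example, the helper above trims the scheme and authority prefix:

    strip_hdfs_abspath('hdfs://namenode:8020/user/data')  # -> '/user/data'
    strip_hdfs_abspath('/already/stripped')               # returned unchanged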
+
+cdef class _HdfsClient:
+    cdef:
+        shared_ptr[CHdfsClient] client
+
+    cdef readonly:
+        bint is_open
+
+    def __cinit__(self):
+        pass
+
+    def _connect(self, host, port, user, kerb_ticket, driver):
+        cdef HdfsConnectionConfig conf
+
+        if host is not None:
+            conf.host = tobytes(host)
+        conf.port = port
+        if user is not None:
+            conf.user = tobytes(user)
+        if kerb_ticket is not None:
+            conf.kerb_ticket = tobytes(kerb_ticket)
+
+        if driver == 'libhdfs':
+            check_status(HaveLibHdfs())
+            conf.driver = HdfsDriver_LIBHDFS
+        else:
+            check_status(HaveLibHdfs3())
+            conf.driver = HdfsDriver_LIBHDFS3
+
+        with nogil:
+            check_status(CHdfsClient.Connect(&conf, &self.client))
+        self.is_open = True
+
+    @classmethod
+    def connect(cls, *args, **kwargs):
+        return cls(*args, **kwargs)
+
+    def __dealloc__(self):
+        if self.is_open:
+            self.close()
+
+    def close(self):
+        """
+        Disconnect from the HDFS cluster
+        """
+        self._ensure_client()
+        with nogil:
+            check_status(self.client.get().Disconnect())
+        self.is_open = False
+
+    cdef _ensure_client(self):
+        if self.client.get() == NULL:
+            raise IOError('HDFS client improperly initialized')
+        elif not self.is_open:
+            raise IOError('HDFS client is closed')
+
+    def exists(self, path):
+        """
+        Returns True if the path is known to the cluster, False if it is not
+        (or if there is an RPC error)
+        """
+        self._ensure_client()
+
+        cdef c_string c_path = tobytes(path)
+        cdef c_bool result
+        with nogil:
+            result = self.client.get().Exists(c_path)
+        return result
+
+    def isdir(self, path):
+        cdef HdfsPathInfo info
+        self._path_info(path, &info)
+        return info.kind == ObjectType_DIRECTORY
+
+    def isfile(self, path):
+        cdef HdfsPathInfo info
+        self._path_info(path, &info)
+        return info.kind == ObjectType_FILE
+
+    cdef _path_info(self, path, HdfsPathInfo* info):
+        cdef c_string c_path = tobytes(path)
+
+        with nogil:
+            check_status(self.client.get()
+                         .GetPathInfo(c_path, info))
+
+    def ls(self, path, bint full_info):
+        cdef:
+            c_string c_path = tobytes(path)
+            vector[HdfsPathInfo] listing
+            list results = []
+            int i
+
+        self._ensure_client()
+
+        with nogil:
+            check_status(self.client.get()
+                         .ListDirectory(c_path, &listing))
+
+        cdef const HdfsPathInfo* info
+        for i in range(<int> listing.size()):
+            info = &listing[i]
+
+            # Try to trim off the hdfs://HOST:PORT piece
+            name = strip_hdfs_abspath(frombytes(info.name))
+
+            if full_info:
+                kind = ('file' if info.kind == ObjectType_FILE
+                        else 'directory')
+
+                results.append({
+                    'kind': kind,
+                    'name': name,
+                    'owner': frombytes(info.owner),
+                    'group': frombytes(info.group),
+                    'list_modified_time': info.last_modified_time,
+                    'list_access_time': info.last_access_time,
+                    'size': info.size,
+                    'replication': info.replication,
+                    'block_size': info.block_size,
+                    'permissions': info.permissions
+                })
+            else:
+                results.append(name)
+
+        return results
+
+    def mkdir(self, path):
+        """
+        Create indicated directory and any necessary parent directories
+        """
+        self._ensure_client()
+
+        cdef c_string c_path = tobytes(path)
+        with nogil:
+            check_status(self.client.get()
+                         .CreateDirectory(c_path))
+
+    def delete(self, path, bint recursive=False):
+        """
+        Delete the indicated file or directory
+
+        Parameters
+        ----------
+        path : string
+        recursive : boolean, default False
+            If True, also delete child paths for directories
+        """
+        self._ensure_client()
+
+        cdef c_string c_path = tobytes(path)
+        with nogil:
+            check_status(self.client.get()
+                         .Delete(c_path, recursive))
+
+    def open(self, path, mode='rb', buffer_size=None, replication=None,
+             default_block_size=None):
+        """
+        Parameters
+        ----------
+        mode : string, 'rb', 'wb', 'ab'
+        """
+        self._ensure_client()
+
+        cdef HdfsFile out = HdfsFile()
+
+        if mode not in ('rb', 'wb', 'ab'):
+            raise Exception("Mode must be 'rb' (read), "
+                            "'wb' (write, new file), or 'ab' (append)")
+
+        cdef c_string c_path = tobytes(path)
+        cdef c_bool append = False
+
+        # 0 in libhdfs means "use the default"
+        cdef int32_t c_buffer_size = buffer_size or 0
+        cdef int16_t c_replication = replication or 0
+        cdef int64_t c_default_block_size = default_block_size or 0
+
+        cdef shared_ptr[HdfsOutputStream] wr_handle
+        cdef shared_ptr[HdfsReadableFile] rd_handle
+
+        if mode in ('wb', 'ab'):
+            if mode == 'ab':
+                append = True
+
+            with nogil:
+                check_status(
+                    self.client.get()
+                    .OpenWriteable(c_path, append, c_buffer_size,
+                                   c_replication, c_default_block_size,
+                                   &wr_handle))
+
+            out.wr_file = <shared_ptr[OutputStream]> wr_handle
+
+            out.is_readable = 0
+            out.is_writeable = 1
+        else:
+            with nogil:
+                check_status(self.client.get()
+                             .OpenReadable(c_path, &rd_handle))
+
+            out.rd_file = <shared_ptr[RandomAccessFile]> rd_handle
+            out.is_readable = 1
+            out.is_writeable = 0
+
+        if c_buffer_size == 0:
+            c_buffer_size = DEFAULT_BUFFER_SIZE
+
+        out.mode = mode
+        out.buffer_size = c_buffer_size
+        out.parent = _HdfsFileNanny(self, out)
+        out.is_open = True
+        out.own_file = True
+
+        return out
+
+    def download(self, path, stream, buffer_size=None):
+        """
+        Download HDFS file to a file-like object or local path
+        """
+        with self.open(path, 'rb') as f:
+            f.download(stream, buffer_size=buffer_size)
+
+    def upload(self, path, stream, buffer_size=None):
+        """
+        Upload file-like object to HDFS path
+        """
+        with self.open(path, 'wb') as f:
+            f.upload(stream, buffer_size=buffer_size)
+
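A sketch of typical client use; the public HdfsClient wrapper around
_HdfsClient, its constructor signature, and the host/port are assumptions:

    import pyarrow as pa

    client = pa.HdfsClient('namenode', 8020, 'hdfsuser')
    client.mkdir('/tmp/pyarrow-test')
    with client.open('/tmp/pyarrow-test/data.bin', 'wb') as f:
        f.write(b'hello hdfs')
    client.ls('/tmp/pyarrow-test', False)  # -> ['/tmp/pyarrow-test/data.bin']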
+
+# ARROW-404: Helper class to ensure that files are closed before the
+# client. During deallocation of the extension class, the attributes are
+# decref'd which can cause the client to get closed first if the file has the
+# last remaining reference
+cdef class _HdfsFileNanny:
+    cdef:
+        object client
+        object file_handle_ref
+
+    def __cinit__(self, client, file_handle):
+        import weakref
+        self.client = client
+        self.file_handle_ref = weakref.ref(file_handle)
+
+    def __dealloc__(self):
+        fh = self.file_handle_ref()
+        if fh:
+            fh.close()
+        # avoid cyclic GC
+        self.file_handle_ref = None
+        self.client = None
+
+
+cdef class HdfsFile(NativeFile):
+    cdef readonly:
+        int32_t buffer_size
+        object mode
+        object parent
+
+    cdef object __weakref__
+
+    def __dealloc__(self):
+        self.parent = None
+
+# ----------------------------------------------------------------------
+# File and stream readers and writers
+
+cdef class _StreamWriter:
+    cdef:
+        shared_ptr[CStreamWriter] writer
+        shared_ptr[OutputStream] sink
+        bint closed
+
+    def __cinit__(self):
+        self.closed = True
+
+    def __dealloc__(self):
+        if not self.closed:
+            self.close()
+
+    def _open(self, sink, Schema schema):
+        get_writer(sink, &self.sink)
+
+        with nogil:
+            check_status(CStreamWriter.Open(self.sink.get(), schema.sp_schema,
+                                            &self.writer))
+
+        self.closed = False
+
+    def write_batch(self, RecordBatch batch):
+        with nogil:
+            check_status(self.writer.get()
+                         .WriteRecordBatch(deref(batch.batch)))
+
+    def close(self):
+        with nogil:
+            check_status(self.writer.get().Close())
+        self.closed = True
+
+
+cdef class _StreamReader:
+    cdef:
+        shared_ptr[CStreamReader] reader
+
+    cdef readonly:
+        Schema schema
+
+    def __cinit__(self):
+        pass
+
+    def _open(self, source):
+        cdef:
+            shared_ptr[RandomAccessFile] reader
+            shared_ptr[InputStream] in_stream
+
+        get_reader(source, &reader)
+        in_stream = <shared_ptr[InputStream]> reader
+
+        with nogil:
+            check_status(CStreamReader.Open(in_stream, &self.reader))
+
+        self.schema = Schema()
+        self.schema.init_schema(self.reader.get().schema())
+
+    def get_next_batch(self):
+        """
+        Read next RecordBatch from the stream. Raises StopIteration at end of
+        stream
+        """
+        cdef shared_ptr[CRecordBatch] batch
+
+        with nogil:
+            check_status(self.reader.get().GetNextRecordBatch(&batch))
+
+        if batch.get() == NULL:
+            raise StopIteration
+
+        return batch_from_cbatch(batch)
+
+    def read_all(self):
+        """
+        Read all record batches as a pyarrow.Table
+        """
+        cdef:
+            vector[shared_ptr[CRecordBatch]] batches
+            shared_ptr[CRecordBatch] batch
+            shared_ptr[CTable] table
+
+        with nogil:
+            while True:
+                check_status(self.reader.get().GetNextRecordBatch(&batch))
+                if batch.get() == NULL:
+                    break
+                batches.push_back(batch)
+
+            check_status(CTable.FromRecordBatches(batches, &table))
+
+        return table_from_ctable(table)
+
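Round-trip sketch for the stream format, assuming public StreamWriter and
StreamReader wrappers that call _open on construction:

    import pandas as pd
    import pyarrow as pa

    batch = pa.RecordBatch.from_pandas(pd.DataFrame({'a': [1, 2, 3]}))

    sink = pa.InMemoryOutputStream()
    writer = pa.StreamWriter(sink, batch.schema)
    writer.write_batch(batch)
    writer.close()

    reader = pa.StreamReader(pa.BufferReader(sink.get_result()))
    table = reader.read_all()   # all batches as a pyarrow.Table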
+
+cdef class _FileWriter(_StreamWriter):
+
+    def _open(self, sink, Schema schema):
+        cdef shared_ptr[CFileWriter] writer
+        get_writer(sink, &self.sink)
+
+        with nogil:
+            check_status(CFileWriter.Open(self.sink.get(), schema.sp_schema,
+                                          &writer))
+
+        # Cast to the base class, which has the same interface
+        self.writer = <shared_ptr[CStreamWriter]> writer
+        self.closed = False
+
+
+cdef class _FileReader:
+    cdef:
+        shared_ptr[CFileReader] reader
+
+    def __cinit__(self):
+        pass
+
+    def _open(self, source, footer_offset=None):
+        cdef shared_ptr[RandomAccessFile] reader
+        get_reader(source, &reader)
+
+        cdef int64_t offset = 0
+        if footer_offset is not None:
+            offset = footer_offset
+
+        with nogil:
+            if offset != 0:
+                check_status(CFileReader.Open2(reader, offset, &self.reader))
+            else:
+                check_status(CFileReader.Open(reader, &self.reader))
+
+    property num_record_batches:
+
+        def __get__(self):
+            return self.reader.get().num_record_batches()
+
+    def get_batch(self, int i):
+        cdef shared_ptr[CRecordBatch] batch
+
+        if i < 0 or i >= self.num_record_batches:
+            raise ValueError('Batch number {0} out of range'.format(i))
+
+        with nogil:
+            check_status(self.reader.get().GetRecordBatch(i, &batch))
+
+        return batch_from_cbatch(batch)
+
+    # TODO(wesm): ARROW-503: Function was renamed. Remove after a period of
+    # time has passed
+    get_record_batch = get_batch
+
+    def read_all(self):
+        """
+        Read all record batches as a pyarrow.Table
+        """
+        cdef:
+            vector[shared_ptr[CRecordBatch]] batches
+            shared_ptr[CTable] table
+            int i, nbatches
+
+        nbatches = self.num_record_batches
+
+        batches.resize(nbatches)
+        with nogil:
+            for i in range(nbatches):
+                check_status(self.reader.get().GetRecordBatch(i, &batches[i]))
+            check_status(CTable.FromRecordBatches(batches, &table))
+
+        return table_from_ctable(table)
+
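The random-access file format is analogous, assuming public FileWriter and
FileReader wrappers:

    import pandas as pd
    import pyarrow as pa

    batch = pa.RecordBatch.from_pandas(pd.DataFrame({'a': [1, 2, 3]}))

    sink = pa.InMemoryOutputStream()
    writer = pa.FileWriter(sink, batch.schema)
    writer.write_batch(batch)
    writer.close()

    reader = pa.FileReader(pa.BufferReader(sink.get_result()))
    reader.num_record_batches   # -> 1
    reader.get_batch(0)         # random access by batch index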
+
+# ----------------------------------------------------------------------
+# Implement legacy Feather file format
+
+
+class FeatherError(Exception):
+    pass
+
+
+cdef class FeatherWriter:
+    cdef:
+        unique_ptr[CFeatherWriter] writer
+
+    cdef public:
+        int64_t num_rows
+
+    def __cinit__(self):
+        self.num_rows = -1
+
+    def open(self, object dest):
+        cdef shared_ptr[OutputStream] sink
+        get_writer(dest, &sink)
+
+        with nogil:
+            check_status(CFeatherWriter.Open(sink, &self.writer))
+
+    def close(self):
+        if self.num_rows < 0:
+            self.num_rows = 0
+        self.writer.get().SetNumRows(self.num_rows)
+        check_status(self.writer.get().Finalize())
+
+    def write_array(self, object name, object col, object mask=None):
+        cdef Array arr
+
+        if self.num_rows >= 0:
+            if len(col) != self.num_rows:
+                raise ValueError('prior column had a different number of rows')
+        else:
+            self.num_rows = len(col)
+
+        if isinstance(col, Array):
+            arr = col
+        else:
+            arr = Array.from_numpy(col, mask=mask)
+
+        cdef c_string c_name = tobytes(name)
+
+        with nogil:
+            check_status(
+                self.writer.get().Append(c_name, deref(arr.sp_array)))
+
+
+cdef class FeatherReader:
+    cdef:
+        unique_ptr[CFeatherReader] reader
+
+    def __cinit__(self):
+        pass
+
+    def open(self, source):
+        cdef shared_ptr[RandomAccessFile] reader
+        get_reader(source, &reader)
+
+        with nogil:
+            check_status(CFeatherReader.Open(reader, &self.reader))
+
+    property num_rows:
+
+        def __get__(self):
+            return self.reader.get().num_rows()
+
+    property num_columns:
+
+        def __get__(self):
+            return self.reader.get().num_columns()
+
+    def get_column_name(self, int i):
+        cdef c_string name = self.reader.get().GetColumnName(i)
+        return frombytes(name)
+
+    def get_column(self, int i):
+        if i < 0 or i >= self.num_columns:
+            raise IndexError(i)
+
+        cdef shared_ptr[CColumn] sp_column
+        with nogil:
+            check_status(self.reader.get()
+                         .GetColumn(i, &sp_column))
+
+        cdef Column col = Column()
+        col.init(sp_column)
+        return col
+
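These classes back the user-facing Feather functions; a sketch assuming a
pyarrow.feather wrapper module (path hypothetical):

    import pandas as pd
    from pyarrow import feather

    df = pd.DataFrame({'a': [1.0, 2.0]})
    feather.write_feather(df, '/tmp/example.feather')
    feather.read_feather('/tmp/example.feather')   # -> pandas.DataFrame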
+
+def get_tensor_size(Tensor tensor):
+    """
+    Return total size of serialized Tensor including metadata and padding
+    """
+    cdef int64_t size
+    with nogil:
+        check_status(GetTensorSize(deref(tensor.tp), &size))
+    return size
+
+
+def get_record_batch_size(RecordBatch batch):
+    """
+    Return total size of serialized RecordBatch including metadata and padding
+    """
+    cdef int64_t size
+    with nogil:
+        check_status(GetRecordBatchSize(deref(batch.batch), &size))
+    return size
+
+
+def write_tensor(Tensor tensor, NativeFile dest):
+    """
+    Write pyarrow.Tensor to a pyarrow.NativeFile object at its current
+    position
+
+    Parameters
+    ----------
+    tensor : pyarrow.Tensor
+    dest : pyarrow.NativeFile
+
+    Returns
+    -------
+    bytes_written : int
+        Total number of bytes written to the file
+    """
+    cdef:
+        int32_t metadata_length
+        int64_t body_length
+
+    dest._assert_writeable()
+
+    with nogil:
+        check_status(
+            WriteTensor(deref(tensor.tp), dest.wr_file.get(),
+                        &metadata_length, &body_length))
+
+    return metadata_length + body_length
+
+
+def read_tensor(NativeFile source):
+    """
+    Read pyarrow.Tensor from a pyarrow.NativeFile object at its current
+    position. If the file source supports zero copy (e.g. a memory map),
+    then this operation does not allocate any memory
+
+    Parameters
+    ----------
+    source : pyarrow.NativeFile
+
+    Returns
+    -------
+    tensor : Tensor
+    """
+    cdef:
+        shared_ptr[CTensor] sp_tensor
+
+    source._assert_readable()
+
+    cdef int64_t offset = source.tell()
+    with nogil:
+        check_status(ReadTensor(offset, source.rd_file.get(), &sp_tensor))
+
+    return box_tensor(sp_tensor)
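Tensor round trip over a memory map; Tensor.from_numpy, the top-level
re-exports, and the path are assumptions:

    import numpy as np
    import pyarrow as pa

    tensor = pa.Tensor.from_numpy(np.arange(6).reshape(2, 3))
    size = pa.get_tensor_size(tensor)   # metadata + padding + body

    mm = pa.create_memory_map('/tmp/tensor.dat', size)
    pa.write_tensor(tensor, mm)
    mm.seek(0)
    pa.read_tensor(mm)   # zero-copy read when backed by a memory map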

http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_jemalloc.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_jemalloc.pyx b/python/pyarrow/_jemalloc.pyx
new file mode 100644
index 0000000..3b41964
--- /dev/null
+++ b/python/pyarrow/_jemalloc.pyx
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: profile=False
+# distutils: language = c++
+# cython: embedsignature = True
+
+from pyarrow.includes.libarrow_jemalloc cimport CJemallocMemoryPool
+from pyarrow._memory cimport MemoryPool
+
+def default_pool():
+    cdef MemoryPool pool = MemoryPool()
+    pool.init(CJemallocMemoryPool.default_pool())
+    return pool
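Sketch, assuming a public pyarrow.jemalloc wrapper module:

    from pyarrow import jemalloc

    pool = jemalloc.default_pool()
    pool.bytes_allocated()   # bytes currently allocated from jemalloc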

http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_memory.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/_memory.pxd b/python/pyarrow/_memory.pxd
new file mode 100644
index 0000000..bb1af85
--- /dev/null
+++ b/python/pyarrow/_memory.pxd
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pyarrow.includes.libarrow cimport CMemoryPool, CLoggingMemoryPool
+
+
+cdef class MemoryPool:
+    cdef:
+        CMemoryPool* pool
+
+    cdef init(self, CMemoryPool* pool)
+
+cdef class LoggingMemoryPool(MemoryPool):
+    pass
+
+cdef CMemoryPool* maybe_unbox_memory_pool(MemoryPool memory_pool)

http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_memory.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_memory.pyx b/python/pyarrow/_memory.pyx
new file mode 100644
index 0000000..98dbf66
--- /dev/null
+++ b/python/pyarrow/_memory.pyx
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: profile=False
+# distutils: language = c++
+# cython: embedsignature = True
+
+from pyarrow.includes.libarrow cimport CMemoryPool, CLoggingMemoryPool
+from pyarrow.includes.pyarrow cimport set_default_memory_pool, get_memory_pool
+
+cdef class MemoryPool:
+    cdef init(self, CMemoryPool* pool):
+        self.pool = pool
+
+    def bytes_allocated(self):
+        return self.pool.bytes_allocated()
+
+cdef CMemoryPool* maybe_unbox_memory_pool(MemoryPool memory_pool):
+    if memory_pool is None:
+        return get_memory_pool()
+    else:
+        return memory_pool.pool
+
+cdef class LoggingMemoryPool(MemoryPool):
+    pass
+
+def default_pool():
+    cdef MemoryPool pool = MemoryPool()
+    pool.init(get_memory_pool())
+    return pool
+
+def set_default_pool(MemoryPool pool):
+    set_default_memory_pool(pool.pool)
+
+def total_allocated_bytes():
+    cdef CMemoryPool* pool = get_memory_pool()
+    return pool.bytes_allocated()
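Usage sketch (top-level re-exports assumed):

    import pyarrow as pa

    pa.total_allocated_bytes()   # allocation from the global default pool
    pool = pa.default_pool()
    pool.bytes_allocated()       # same number, via the boxed MemoryPool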

http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 079bf5e..5418e1d 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -20,20 +20,18 @@
 # cython: embedsignature = True
 
 from cython.operator cimport dereference as deref
-
 from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport *
 cimport pyarrow.includes.pyarrow as pyarrow
+from pyarrow._array cimport Array
+from pyarrow._error cimport check_status
+from pyarrow._memory cimport MemoryPool, maybe_unbox_memory_pool
+from pyarrow._table cimport Table, table_from_ctable
+from pyarrow._io cimport NativeFile, get_reader, get_writer
 
-from pyarrow.array cimport Array
 from pyarrow.compat import tobytes, frombytes
-from pyarrow.error import ArrowException
-from pyarrow.error cimport check_status
-from pyarrow.io import NativeFile
-from pyarrow.memory cimport MemoryPool, maybe_unbox_memory_pool
-from pyarrow.table cimport Table, table_from_ctable
-
-from pyarrow.io cimport NativeFile, get_reader, get_writer
+from pyarrow._error import ArrowException
+from pyarrow._io import NativeFile
 
 import six
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_table.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/_table.pxd b/python/pyarrow/_table.pxd
new file mode 100644
index 0000000..e61e90d
--- /dev/null
+++ b/python/pyarrow/_table.pxd
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pyarrow.includes.common cimport shared_ptr
+from pyarrow.includes.libarrow cimport (CChunkedArray, CColumn, CTable,
+                                        CRecordBatch)
+from pyarrow._array cimport Schema
+
+
+cdef class ChunkedArray:
+    cdef:
+        shared_ptr[CChunkedArray] sp_chunked_array
+        CChunkedArray* chunked_array
+
+    cdef init(self, const shared_ptr[CChunkedArray]& chunked_array)
+    cdef _check_nullptr(self)
+
+
+cdef class Column:
+    cdef:
+        shared_ptr[CColumn] sp_column
+        CColumn* column
+
+    cdef init(self, const shared_ptr[CColumn]& column)
+    cdef _check_nullptr(self)
+
+
+cdef class Table:
+    cdef:
+        shared_ptr[CTable] sp_table
+        CTable* table
+
+    cdef init(self, const shared_ptr[CTable]& table)
+    cdef _check_nullptr(self)
+
+
+cdef class RecordBatch:
+    cdef:
+        shared_ptr[CRecordBatch] sp_batch
+        CRecordBatch* batch
+        Schema _schema
+
+    cdef init(self, const shared_ptr[CRecordBatch]& batch)
+    cdef _check_nullptr(self)
+
+cdef object box_column(const shared_ptr[CColumn]& ccolumn)
+cdef api object table_from_ctable(const shared_ptr[CTable]& ctable)
+cdef api object batch_from_cbatch(const shared_ptr[CRecordBatch]& cbatch)

http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_table.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_table.pyx b/python/pyarrow/_table.pyx
new file mode 100644
index 0000000..6558b2e
--- /dev/null
+++ b/python/pyarrow/_table.pyx
@@ -0,0 +1,913 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: profile=False
+# distutils: language = c++
+# cython: embedsignature = True
+
+from cython.operator cimport dereference as deref
+
+from pyarrow.includes.libarrow cimport *
+from pyarrow.includes.common cimport *
+cimport pyarrow.includes.pyarrow as pyarrow
+from pyarrow._array cimport (Array, box_array, wrap_array_output,
+                             box_data_type, box_schema, DataType, Field)
+from pyarrow._error cimport check_status
+cimport cpython
+
+import pyarrow._config
+from pyarrow._error import ArrowException
+from pyarrow._array import field
+from pyarrow.compat import frombytes, tobytes
+
+
+from collections import OrderedDict
+
+
+cdef _pandas():
+    import pandas as pd
+    return pd
+
+
+cdef class ChunkedArray:
+    """
+    Array backed via one or more memory chunks.
+
+    Warning
+    -------
+    Do not call this class's constructor directly.
+    """
+
+    def __cinit__(self):
+        self.chunked_array = NULL
+
+    cdef init(self, const shared_ptr[CChunkedArray]& chunked_array):
+        self.sp_chunked_array = chunked_array
+        self.chunked_array = chunked_array.get()
+
+    cdef _check_nullptr(self):
+        if self.chunked_array == NULL:
+            raise ReferenceError("ChunkedArray object references a NULL "
+                                 "pointer. Not initialized.")
+
+    def length(self):
+        self._check_nullptr()
+        return self.chunked_array.length()
+
+    def __len__(self):
+        return self.length()
+
+    @property
+    def null_count(self):
+        """
+        Number of null entries
+
+        Returns
+        -------
+        int
+        """
+        self._check_nullptr()
+        return self.chunked_array.null_count()
+
+    @property
+    def num_chunks(self):
+        """
+        Number of underlying chunks
+
+        Returns
+        -------
+        int
+        """
+        self._check_nullptr()
+        return self.chunked_array.num_chunks()
+
+    def chunk(self, i):
+        """
+        Select a chunk by its index
+
+        Parameters
+        ----------
+        i : int
+
+        Returns
+        -------
+        pyarrow.Array
+        """
+        self._check_nullptr()
+        return box_array(self.chunked_array.chunk(i))
+
+    def iterchunks(self):
+        for i in range(self.num_chunks):
+            yield self.chunk(i)
+
+    def to_pylist(self):
+        """
+        Convert to a list of native Python objects.
+        """
+        result = []
+        for i in range(self.num_chunks):
+            result += self.chunk(i).to_pylist()
+        return result
+
+
+cdef class Column:
+    """
+    Named vector of elements of equal type.
+
+    Warning
+    -------
+    Do not call this class's constructor directly.
+    """
+
+    def __cinit__(self):
+        self.column = NULL
+
+    cdef init(self, const shared_ptr[CColumn]& column):
+        self.sp_column = column
+        self.column = column.get()
+
+    @staticmethod
+    def from_array(object field_or_name, Array arr):
+        cdef Field boxed_field
+
+        if isinstance(field_or_name, Field):
+            boxed_field = field_or_name
+        else:
+            boxed_field = field(field_or_name, arr.type)
+
+        cdef shared_ptr[CColumn] sp_column
+        sp_column.reset(new CColumn(boxed_field.sp_field, arr.sp_array))
+        return box_column(sp_column)
+
+    def to_pandas(self):
+        """
+        Convert the arrow::Column to a pandas.Series
+
+        Returns
+        -------
+        pandas.Series
+        """
+        cdef:
+            PyObject* out
+
+        check_status(pyarrow.ConvertColumnToPandas(self.sp_column,
+                                                   <PyObject*> self, &out))
+
+        return _pandas().Series(wrap_array_output(out), name=self.name)
+
+    def equals(self, Column other):
+        """
+        Check if contents of two columns are equal
+
+        Parameters
+        ----------
+        other : pyarrow.Column
+
+        Returns
+        -------
+        are_equal : boolean
+        """
+        cdef:
+            CColumn* my_col = self.column
+            CColumn* other_col = other.column
+            c_bool result
+
+        self._check_nullptr()
+        other._check_nullptr()
+
+        with nogil:
+            result = my_col.Equals(deref(other_col))
+
+        return result
+
+    def to_pylist(self):
+        """
+        Convert to a list of native Python objects.
+        """
+        return self.data.to_pylist()
+
+    cdef _check_nullptr(self):
+        if self.column == NULL:
+            raise ReferenceError("Column object references a NULL pointer."
+                    "Not initialized.")
+
+    def __len__(self):
+        self._check_nullptr()
+        return self.column.length()
+
+    def length(self):
+        self._check_nullptr()
+        return self.column.length()
+
+    @property
+    def shape(self):
+        """
+        Dimensions of this column
+
+        Returns
+        -------
+        (int,)
+        """
+        self._check_nullptr()
+        return (self.length(),)
+
+    @property
+    def null_count(self):
+        """
+        Number of null entries
+
+        Returns
+        -------
+        int
+        """
+        self._check_nullptr()
+        return self.column.null_count()
+
+    @property
+    def name(self):
+        """
+        Label of the column
+
+        Returns
+        -------
+        str
+        """
+        return bytes(self.column.name()).decode('utf8')
+
+    @property
+    def type(self):
+        """
+        Type information for this column
+
+        Returns
+        -------
+        pyarrow.DataType
+        """
+        return box_data_type(self.column.type())
+
+    @property
+    def data(self):
+        """
+        The underlying data
+
+        Returns
+        -------
+        pyarrow.ChunkedArray
+        """
+        cdef ChunkedArray chunked_array = ChunkedArray()
+        chunked_array.init(self.column.data())
+        return chunked_array
+
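Sketch of the Column/ChunkedArray relationship above; Array.from_numpy and
the top-level re-exports are assumptions:

    import numpy as np
    import pyarrow as pa

    arr = pa.Array.from_numpy(np.array([1, 2, 3]))
    col = pa.Column.from_array('x', arr)

    col.name              # -> 'x'
    col.data.num_chunks   # -> 1; col.data is a ChunkedArray
    col.to_pylist()       # -> [1, 2, 3]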
+
+cdef _schema_from_arrays(arrays, names, shared_ptr[CSchema]* schema):
+    cdef:
+        Array arr
+        Column col
+        c_string c_name
+        vector[shared_ptr[CField]] fields
+        shared_ptr[CDataType] type_
+
+    cdef int K = len(arrays)
+
+    if K == 0:
+        raise ValueError('Must pass at least one array')
+
+    fields.resize(K)
+
+    if isinstance(arrays[0], Array):
+        if names is None:
+            raise ValueError('Must pass names when constructing '
+                             'from Array objects')
+        for i in range(K):
+            arr = arrays[i]
+            type_ = arr.type.sp_type
+            c_name = tobytes(names[i])
+            fields[i].reset(new CField(c_name, type_, True))
+    elif isinstance(arrays[0], Column):
+        for i in range(K):
+            col = arrays[i]
+            type_ = col.sp_column.get().type()
+            c_name = tobytes(col.name)
+            fields[i].reset(new CField(c_name, type_, True))
+    else:
+        raise TypeError(type(arrays[0]))
+
+    schema.reset(new CSchema(fields))
+
+
+
+cdef _dataframe_to_arrays(df, timestamps_to_ms, Schema schema):
+    cdef:
+        list names = []
+        list arrays = []
+        DataType type = None
+
+    for name in df.columns:
+        col = df[name]
+        if schema is not None:
+            type = schema.field_by_name(name).type
+
+        arr = Array.from_numpy(col, type=type,
+                               timestamps_to_ms=timestamps_to_ms)
+        names.append(name)
+        arrays.append(arr)
+
+    return names, arrays
+
+
+cdef class RecordBatch:
+    """
+    Batch of rows of columns of equal length
+
+    Warning
+    -------
+    Do not call this class's constructor directly, use one of the ``from_*``
+    methods instead.
+    """
+
+    def __cinit__(self):
+        self.batch = NULL
+        self._schema = None
+
+    cdef init(self, const shared_ptr[CRecordBatch]& batch):
+        self.sp_batch = batch
+        self.batch = batch.get()
+
+    cdef _check_nullptr(self):
+        if self.batch == NULL:
+            raise ReferenceError("Object not initialized")
+
+    def __len__(self):
+        self._check_nullptr()
+        return self.batch.num_rows()
+
+    @property
+    def num_columns(self):
+        """
+        Number of columns
+
+        Returns
+        -------
+        int
+        """
+        self._check_nullptr()
+        return self.batch.num_columns()
+
+    @property
+    def num_rows(self):
+        """
+        Number of rows
+
+        Due to the definition of a RecordBatch, all columns have the same
+        number of rows.
+
+        Returns
+        -------
+        int
+        """
+        return len(self)
+
+    @property
+    def schema(self):
+        """
+        Schema of the RecordBatch and its columns
+
+        Returns
+        -------
+        pyarrow.Schema
+        """
+        cdef Schema schema
+        self._check_nullptr()
+        if self._schema is None:
+            schema = Schema()
+            schema.init_schema(self.batch.schema())
+            self._schema = schema
+
+        return self._schema
+
+    def __getitem__(self, i):
+        return box_array(self.batch.column(i))
+
+    def slice(self, offset=0, length=None):
+        """
+        Compute zero-copy slice of this RecordBatch
+
+        Parameters
+        ----------
+        offset : int, default 0
+            Offset from start of array to slice
+        length : int, default None
+            Length of slice (default is until end of batch starting from
+            offset)
+
+        Returns
+        -------
+        sliced : RecordBatch
+        """
+        cdef shared_ptr[CRecordBatch] result
+
+        if offset < 0:
+            raise IndexError('Offset must be non-negative')
+
+        if length is None:
+            result = self.batch.Slice(offset)
+        else:
+            result = self.batch.Slice(offset, length)
+
+        return batch_from_cbatch(result)
+
+    def equals(self, RecordBatch other):
+        cdef:
+            CRecordBatch* my_batch = self.batch
+            CRecordBatch* other_batch = other.batch
+            c_bool result
+
+        self._check_nullptr()
+        other._check_nullptr()
+
+        with nogil:
+            result = my_batch.Equals(deref(other_batch))
+
+        return result
+
+    def to_pydict(self):
+        """
+        Convert the arrow::RecordBatch to an OrderedDict
+
+        Returns
+        -------
+        OrderedDict
+        """
+        entries = []
+        for i in range(self.batch.num_columns()):
+            name = bytes(self.batch.column_name(i)).decode('utf8')
+            column = self[i].to_pylist()
+            entries.append((name, column))
+        return OrderedDict(entries)
+
+    def to_pandas(self, nthreads=None):
+        """
+        Convert the arrow::RecordBatch to a pandas DataFrame
+
+        Parameters
+        ----------
+        nthreads : int, default None
+            Number of threads to use for the conversion; if None, the
+            pyarrow-configured CPU count is used (see Table.to_pandas)
+
+        Returns
+        -------
+        pandas.DataFrame
+        """
+        return Table.from_batches([self]).to_pandas(nthreads=nthreads)
+
+    @classmethod
+    def from_pandas(cls, df, schema=None):
+        """
+        Convert pandas.DataFrame to an Arrow RecordBatch
+
+        Parameters
+        ----------
+        df: pandas.DataFrame
+        schema: pyarrow.Schema (optional)
+            The expected schema of the RecordBatch. This can be used to
+            indicate the type of columns if we cannot infer it automatically.
+
+        Returns
+        -------
+        pyarrow.table.RecordBatch
+        """
+        names, arrays = _dataframe_to_arrays(df, False, schema)
+        return cls.from_arrays(arrays, names)
+
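A short sketch of the pandas round trip through a RecordBatch; no schema is
passed, so the column types are inferred (a top-level pa.RecordBatch
re-export is assumed):

    import pandas as pd
    import pyarrow as pa

    df = pd.DataFrame({'int': [1, 2], 'str': ['a', 'b']})
    batch = pa.RecordBatch.from_pandas(df)
    assert (batch.num_rows, batch.num_columns) == (2, 2)

    df_back = batch.to_pandas()   # routed through Table.from_batches
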
+    @staticmethod
+    def from_arrays(arrays, names):
+        """
+        Construct a RecordBatch from multiple pyarrow.Arrays
+
+        Parameters
+        ----------
+        arrays: list of pyarrow.Array
+            column-wise data vectors
+        names: list of str
+            Labels for the columns
+
+        Returns
+        -------
+        pyarrow.table.RecordBatch
+        """
+        cdef:
+            Array arr
+            c_string c_name
+            shared_ptr[CSchema] schema
+            shared_ptr[CRecordBatch] batch
+            vector[shared_ptr[CArray]] c_arrays
+            int64_t num_rows
+
+        if len(arrays) == 0:
+            raise ValueError('Record batch must contain at least one array '
+                             '(for now)')
+
+        num_rows = len(arrays[0])
+        _schema_from_arrays(arrays, names, &schema)
+
+        for i in range(len(arrays)):
+            arr = arrays[i]
+            c_arrays.push_back(arr.sp_array)
+
+        batch.reset(new CRecordBatch(schema, num_rows, c_arrays))
+        return batch_from_cbatch(batch)
+
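A sketch of building a batch directly from arrays; _schema_from_arrays pairs
the names with the arrays positionally and marks every field nullable
(handling of object-dtype NumPy arrays by Array.from_numpy is assumed):

    import numpy as np
    import pyarrow as pa

    arrays = [pa.Array.from_numpy(np.array([1, 2], dtype='int64')),
              pa.Array.from_numpy(np.array(['a', 'b'], dtype=object))]
    batch = pa.RecordBatch.from_arrays(arrays, ['ints', 'strs'])
    print(batch.schema)   # two nullable fields, paired positionally
    print(batch[0])       # column 0, boxed as a pyarrow Array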
+
+cdef table_to_blockmanager(const shared_ptr[CTable]& table, int nthreads):
+    cdef:
+        PyObject* result_obj
+        CColumn* col
+        int i
+
+    import pandas.core.internals as _int
+    from pandas import RangeIndex, Categorical
+    from pyarrow.compat import DatetimeTZDtype
+
+    with nogil:
+        check_status(pyarrow.ConvertTableToPandas(table, nthreads,
+                                                  &result_obj))
+
+    result = PyObject_to_object(result_obj)
+
+    blocks = []
+    for item in result:
+        block_arr = item['block']
+        placement = item['placement']
+        if 'dictionary' in item:
+            cat = Categorical(block_arr,
+                              categories=item['dictionary'],
+                              ordered=False, fastpath=True)
+            block = _int.make_block(cat, placement=placement,
+                                    klass=_int.CategoricalBlock,
+                                    fastpath=True)
+        elif 'timezone' in item:
+            dtype = DatetimeTZDtype('ns', tz=item['timezone'])
+            block = _int.make_block(block_arr, placement=placement,
+                                    klass=_int.DatetimeTZBlock,
+                                    dtype=dtype, fastpath=True)
+        else:
+            block = _int.make_block(block_arr, placement=placement)
+        blocks.append(block)
+
+    names = []
+    for i in range(table.get().num_columns()):
+        col = table.get().column(i).get()
+        names.append(frombytes(col.name()))
+
+    axes = [names, RangeIndex(table.get().num_rows())]
+    return _int.BlockManager(blocks, axes)
+
+
+cdef class Table:
+    """
+    A collection of top-level named, equal length Arrow arrays.
+
+    Warning
+    -------
+    Do not call this class's constructor directly, use one of the ``from_*``
+    methods instead.
+    """
+
+    def __cinit__(self):
+        self.table = NULL
+
+    def __repr__(self):
+        return 'pyarrow.Table\n{0}'.format(str(self.schema))
+
+    cdef init(self, const shared_ptr[CTable]& table):
+        self.sp_table = table
+        self.table = table.get()
+
+    cdef _check_nullptr(self):
+        if self.table == NULL:
+            raise ReferenceError("Table object references a NULL pointer."
+                    "Not initialized.")
+
+    def equals(self, Table other):
+        """
+        Check if contents of two tables are equal
+
+        Parameters
+        ----------
+        other : pyarrow.Table
+
+        Returns
+        -------
+        are_equal : boolean
+        """
+        cdef:
+            CTable* my_table = self.table
+            CTable* other_table = other.table
+            c_bool result
+
+        self._check_nullptr()
+        other._check_nullptr()
+
+        with nogil:
+            result = my_table.Equals(deref(other_table))
+
+        return result
+
+    @classmethod
+    def from_pandas(cls, df, timestamps_to_ms=False, schema=None):
+        """
+        Convert pandas.DataFrame to an Arrow Table
+
+        Parameters
+        ----------
+        df: pandas.DataFrame
+
+        timestamps_to_ms: bool
+            Convert datetime columns to ms resolution. This is needed for
+            compatibility with other functionality like Parquet I/O, which
+            only supports milliseconds.
+
+        schema: pyarrow.Schema (optional)
+            The expected schema of the Arrow Table. This can be used to
+            indicate the type of columns if we cannot infer it automatically.
+
+        Returns
+        -------
+        pyarrow.table.Table
+
+        Examples
+        --------
+
+        >>> import pandas as pd
+        >>> import pyarrow as pa
+        >>> df = pd.DataFrame({
+        ...     'int': [1, 2],
+        ...     'str': ['a', 'b']
+        ... })
+        >>> pa.Table.from_pandas(df)
+        <pyarrow.table.Table object at 0x7f05d1fb1b40>
+        """
+        names, arrays = _dataframe_to_arrays(df,
+                                             timestamps_to_ms=timestamps_to_ms,
+                                             schema=schema)
+        return cls.from_arrays(arrays, names=names)
+
+    @staticmethod
+    def from_arrays(arrays, names=None):
+        """
+        Construct a Table from Arrow arrays or columns
+
+        Parameters
+        ----------
+        arrays: list of pyarrow.Array or pyarrow.Column
+            Equal-length arrays that should form the table.
+        names: list of str, optional
+            Names for the table columns. If Column instances are passed,
+            the names are inferred from them; if plain Arrays are passed,
+            this argument is required.
+
+        Returns
+        -------
+        pyarrow.table.Table
+        """
+        cdef:
+            vector[shared_ptr[CField]] fields
+            vector[shared_ptr[CColumn]] columns
+            shared_ptr[CSchema] schema
+            shared_ptr[CTable] table
+
+        _schema_from_arrays(arrays, names, &schema)
+
+        cdef int K = len(arrays)
+        columns.resize(K)
+
+        for i in range(K):
+            if isinstance(arrays[i], Array):
+                columns[i].reset(new CColumn(schema.get().field(i),
+                                             (<Array> arrays[i]).sp_array))
+            elif isinstance(arrays[i], Column):
+                columns[i] = (<Column> arrays[i]).sp_column
+            else:
+                raise ValueError(type(arrays[i]))
+
+        table.reset(new CTable(schema, columns))
+        return table_from_ctable(table)
+
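A sketch of Table.from_arrays with plain Arrays, where names is required and
each array is wrapped in a column against the generated schema (top-level
re-exports assumed, as above):

    import numpy as np
    import pyarrow as pa

    data = [pa.Array.from_numpy(np.array([1, 2, 3], dtype='int64')),
            pa.Array.from_numpy(np.array([1.0, 2.0, 3.0]))]
    table = pa.Table.from_arrays(data, names=['x', 'y'])
    assert table.shape == (3, 2)
    assert table.column(0).name == 'x'
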
+    @staticmethod
+    def from_batches(batches):
+        """
+        Construct a Table from a list of Arrow RecordBatches
+
+        Parameters
+        ----------
+        batches: list of RecordBatch
+            RecordBatch list to be converted, schemas must be equal
+
+        Returns
+        -------
+        pyarrow.table.Table
+        """
+        cdef:
+            vector[shared_ptr[CRecordBatch]] c_batches
+            shared_ptr[CTable] c_table
+            RecordBatch batch
+
+        for batch in batches:
+            c_batches.push_back(batch.sp_batch)
+
+        with nogil:
+            check_status(CTable.FromRecordBatches(c_batches, &c_table))
+
+        return table_from_ctable(c_table)
+
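A sketch of stitching batches with equal schemas into a single Table via
CTable::FromRecordBatches:

    import numpy as np
    import pyarrow as pa

    a = pa.Array.from_numpy(np.array([1, 2], dtype='int64'))
    batch = pa.RecordBatch.from_arrays([a], ['x'])
    table = pa.Table.from_batches([batch] * 3)  # identical schemas
    assert table.num_rows == 6
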
+    def to_pandas(self, nthreads=None):
+        """
+        Convert the arrow::Table to a pandas DataFrame
+
+        Parameters
+        ----------
+        nthreads : int, default max(1, multiprocessing.cpu_count() / 2)
+            For the default, we divide the logical CPU count by 2 because
+            most modern computers have hyperthreading turned on, and running
+            more threads than there are physical cores does not help
+
+        Returns
+        -------
+        pandas.DataFrame
+        """
+        if nthreads is None:
+            nthreads = pyarrow._config.cpu_count()
+
+        mgr = table_to_blockmanager(self.sp_table, nthreads)
+        return _pandas().DataFrame(mgr)
+
+    def to_pydict(self):
+        """
+        Convert the arrow::Table to an OrderedDict
+
+        Returns
+        -------
+        OrderedDict
+        """
+        entries = []
+        for i in range(self.table.num_columns()):
+            name = self.column(i).name
+            column = self.column(i).to_pylist()
+            entries.append((name, column))
+        return OrderedDict(entries)
+
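The two conversion paths side by side, in a brief sketch (the nthreads
default comes from pyarrow._config.cpu_count(), as in the code above):

    import pandas as pd
    import pyarrow as pa

    table = pa.Table.from_pandas(pd.DataFrame({'x': [1, 2]}))
    print(table.to_pydict())            # OrderedDict([('x', [1, 2])])
    df = table.to_pandas()              # default: pyarrow's CPU count
    df_serial = table.to_pandas(nthreads=1)  # force single-threaded
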
+    @property
+    def schema(self):
+        """
+        Schema of the table and its columns
+
+        Returns
+        -------
+        pyarrow.schema.Schema
+        """
+        return box_schema(self.table.schema())
+
+    def column(self, index):
+        """
+        Select a column by its numeric index.
+
+        Parameters
+        ----------
+        index: int
+
+        Returns
+        -------
+        pyarrow.table.Column
+        """
+        self._check_nullptr()
+        cdef Column column = Column()
+        column.init(self.table.column(index))
+        return column
+
+    def __getitem__(self, i):
+        return self.column(i)
+
+    def itercolumns(self):
+        """
+        Iterator over all columns in their numerical order
+        """
+        for i in range(self.num_columns):
+            yield self.column(i)
+
+    @property
+    def num_columns(self):
+        """
+        Number of columns in this table
+
+        Returns
+        -------
+        int
+        """
+        self._check_nullptr()
+        return self.table.num_columns()
+
+    @property
+    def num_rows(self):
+        """
+        Number of rows in this table.
+
+        Due to the definition of a table, all columns have the same number
+        of rows.
+
+        Returns
+        -------
+        int
+        """
+        self._check_nullptr()
+        return self.table.num_rows()
+
+    def __len__(self):
+        return self.num_rows
+
+    @property
+    def shape(self):
+        """
+        Dimensions of the table: (#rows, #columns)
+
+        Returns
+        -------
+        (int, int)
+        """
+        return (self.num_rows, self.num_columns)
+
+    def add_column(self, int i, Column column):
+        """
+        Add a column to the Table at position i. Returns a new table.
+        """
+        cdef:
+            shared_ptr[CTable] c_table
+
+        with nogil:
+            check_status(self.table.AddColumn(i, column.sp_column, &c_table))
+
+        return table_from_ctable(c_table)
+
+    def append_column(self, Column column):
+        """
+        Append a column at the end of the columns. Returns a new table.
+        """
+        return self.add_column(self.num_columns, column)
+
+    def remove_column(self, int i):
+        """
+        Create new Table with the indicated column removed
+        """
+        cdef shared_ptr[CTable] c_table
+
+        with nogil:
+            check_status(self.table.RemoveColumn(i, &c_table))
+
+        return table_from_ctable(c_table)
+
+
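Because add_column, append_column and remove_column all return new tables
rather than mutating in place, usage looks like this sketch:

    import pandas as pd
    import pyarrow as pa

    t = pa.Table.from_pandas(pd.DataFrame({'a': [1, 2], 'b': [3, 4]}))
    col_b = t.column(1)

    t2 = t.remove_column(1)          # drops 'b'; t itself is unchanged
    t3 = t2.append_column(col_b)     # re-attach at the end
    assert t.num_columns == t3.num_columns == 2
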
+def concat_tables(tables):
+    """
+    Perform zero-copy concatenation of pyarrow.Table objects. Raises an
+    exception if the schemas of the tables are not all the same
+
+    Parameters
+    ----------
+    tables : iterable of pyarrow.Table objects
+    """
+    cdef:
+        vector[shared_ptr[CTable]] c_tables
+        shared_ptr[CTable] c_result
+        Table table
+
+    for table in tables:
+        c_tables.push_back(table.sp_table)
+
+    with nogil:
+        check_status(ConcatenateTables(c_tables, &c_result))
+
+    return table_from_ctable(c_result)
+
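A sketch of the zero-copy concatenation; the schemas must all match (a
top-level pa.concat_tables re-export is assumed):

    import pandas as pd
    import pyarrow as pa

    t1 = pa.Table.from_pandas(pd.DataFrame({'x': [1, 2]}))
    t2 = pa.Table.from_pandas(pd.DataFrame({'x': [3, 4]}))
    big = pa.concat_tables([t1, t2])
    assert big.num_rows == 4         # buffers are shared, not copied
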
+
+cdef object box_column(const shared_ptr[CColumn]& ccolumn):
+    cdef Column column = Column()
+    column.init(ccolumn)
+    return column
+
+
+cdef api object table_from_ctable(const shared_ptr[CTable]& ctable):
+    cdef Table table = Table()
+    table.init(ctable)
+    return table
+
+
+cdef api object batch_from_cbatch(const shared_ptr[CRecordBatch]& cbatch):
+    cdef RecordBatch batch = RecordBatch()
+    batch.init(cbatch)
+    return batch

http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/array.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/array.pxd b/python/pyarrow/array.pxd
deleted file mode 100644
index 3ba4871..0000000
--- a/python/pyarrow/array.pxd
+++ /dev/null
@@ -1,141 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from pyarrow.includes.common cimport shared_ptr, int64_t
-from pyarrow.includes.libarrow cimport CArray, CTensor
-
-from pyarrow.scalar import NA
-
-from pyarrow.schema cimport DataType
-
-from cpython cimport PyObject
-
-
-cdef extern from "Python.h":
-    int PySlice_Check(object)
-
-
-cdef class Array:
-    cdef:
-        shared_ptr[CArray] sp_array
-        CArray* ap
-
-    cdef readonly:
-        DataType type
-
-    cdef init(self, const shared_ptr[CArray]& sp_array)
-    cdef getitem(self, int64_t i)
-
-
-cdef class Tensor:
-    cdef:
-        shared_ptr[CTensor] sp_tensor
-        CTensor* tp
-
-    cdef readonly:
-        DataType type
-
-    cdef init(self, const shared_ptr[CTensor]& sp_tensor)
-
-
-cdef object box_array(const shared_ptr[CArray]& sp_array)
-cdef object box_tensor(const shared_ptr[CTensor]& sp_tensor)
-
-
-cdef class BooleanArray(Array):
-    pass
-
-
-cdef class NumericArray(Array):
-    pass
-
-
-cdef class IntegerArray(NumericArray):
-    pass
-
-
-cdef class FloatingPointArray(NumericArray):
-    pass
-
-
-cdef class Int8Array(IntegerArray):
-    pass
-
-
-cdef class UInt8Array(IntegerArray):
-    pass
-
-
-cdef class Int16Array(IntegerArray):
-    pass
-
-
-cdef class UInt16Array(IntegerArray):
-    pass
-
-
-cdef class Int32Array(IntegerArray):
-    pass
-
-
-cdef class UInt32Array(IntegerArray):
-    pass
-
-
-cdef class Int64Array(IntegerArray):
-    pass
-
-
-cdef class UInt64Array(IntegerArray):
-    pass
-
-
-cdef class FloatArray(FloatingPointArray):
-    pass
-
-
-cdef class DoubleArray(FloatingPointArray):
-    pass
-
-
-cdef class FixedSizeBinaryArray(Array):
-    pass
-
-
-cdef class DecimalArray(FixedSizeBinaryArray):
-    pass
-
-
-cdef class ListArray(Array):
-    pass
-
-
-cdef class StringArray(Array):
-    pass
-
-
-cdef class BinaryArray(Array):
-    pass
-
-
-cdef class DictionaryArray(Array):
-    cdef:
-        object _indices, _dictionary
-
-
-
-cdef wrap_array_output(PyObject* output)