Posted to commits@arrow.apache.org by we...@apache.org on 2017/05/13 19:44:53 UTC

[2/4] arrow git commit: ARROW-819: Public Cython and C++ API in the style of lxml, arrow::py::import_pyarrow method
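
This patch consolidates pyarrow's Cython extension modules into a single
pyarrow.lib module and exposes wrap/unwrap helpers so that third-party
extensions can hand Arrow structures across module boundaries, following the
pattern lxml uses for its public C API; on the C++ side, callers are expected
to invoke arrow::py::import_pyarrow() once before using the API. A rough
sketch of the intended downstream Cython usage (the unwrap helper name and
exact cimport surface here are illustrative, not quoted from this patch):

    # myext.pyx -- a hypothetical third-party consumer of the public API
    from pyarrow.lib cimport *   # CArray, pyarrow_wrap_array, and friends

    import pyarrow               # import the Python module before first use

    def null_count(arr):
        # Unwrap a pyarrow.Array to the underlying shared_ptr[CArray],
        # query Arrow C++ directly, and return a plain Python int
        cdef shared_ptr[CArray] c_arr = pyarrow_unwrap_array(arr)
        return c_arr.get().null_count()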

http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/pyarrow/array.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi
new file mode 100644
index 0000000..46e94b4
--- /dev/null
+++ b/python/pyarrow/array.pxi
@@ -0,0 +1,1549 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pyarrow.includes.libarrow cimport *
+
+# These are imprecise because the type (in pandas 0.x) depends on the presence
+# of nulls
+_pandas_type_map = {
+    _Type_NA: np.float64,  # NaNs
+    _Type_BOOL: np.bool_,
+    _Type_INT8: np.int8,
+    _Type_INT16: np.int16,
+    _Type_INT32: np.int32,
+    _Type_INT64: np.int64,
+    _Type_UINT8: np.uint8,
+    _Type_UINT16: np.uint16,
+    _Type_UINT32: np.uint32,
+    _Type_UINT64: np.uint64,
+    _Type_HALF_FLOAT: np.float16,
+    _Type_FLOAT: np.float32,
+    _Type_DOUBLE: np.float64,
+    _Type_DATE32: np.dtype('datetime64[ns]'),
+    _Type_DATE64: np.dtype('datetime64[ns]'),
+    _Type_TIMESTAMP: np.dtype('datetime64[ns]'),
+    _Type_BINARY: np.object_,
+    _Type_FIXED_SIZE_BINARY: np.object_,
+    _Type_STRING: np.object_,
+    _Type_LIST: np.object_
+}
+
+cdef class DataType:
+
+    def __cinit__(self):
+        pass
+
+    cdef void init(self, const shared_ptr[CDataType]& type):
+        self.sp_type = type
+        self.type = type.get()
+
+    def __str__(self):
+        return frombytes(self.type.ToString())
+
+    def __repr__(self):
+        return '{0.__class__.__name__}({0})'.format(self)
+
+    def __richcmp__(DataType self, DataType other, int op):
+        if op == cp.Py_EQ:
+            return self.type.Equals(deref(other.type))
+        elif op == cp.Py_NE:
+            return not self.type.Equals(deref(other.type))
+        else:
+            raise TypeError('Invalid comparison')
+
+    def to_pandas_dtype(self):
+        """
+        Return the NumPy dtype that would be used for storing the
+        values of this type in pandas
+        """
+        cdef Type type_id = self.type.id()
+        if type_id in _pandas_type_map:
+            return _pandas_type_map[type_id]
+        else:
+            raise NotImplementedError(str(self))
+
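+# For illustration, a hypothetical session (exact reprs vary by platform and
+# Python version):
+#
+#   >>> import pyarrow as pa
+#   >>> pa.int32().to_pandas_dtype()
+#   <class 'numpy.int32'>
+#   >>> pa.list_(pa.int64()).to_pandas_dtype()
+#   <class 'numpy.object_'>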
+
+cdef class DictionaryType(DataType):
+
+    cdef void init(self, const shared_ptr[CDataType]& type):
+        DataType.init(self, type)
+        self.dict_type = <const CDictionaryType*> type.get()
+
+
+cdef class TimestampType(DataType):
+
+    cdef void init(self, const shared_ptr[CDataType]& type):
+        DataType.init(self, type)
+        self.ts_type = <const CTimestampType*> type.get()
+
+    property unit:
+
+        def __get__(self):
+            return timeunit_to_string(self.ts_type.unit())
+
+    property tz:
+
+        def __get__(self):
+            if self.ts_type.timezone().size() > 0:
+                return frombytes(self.ts_type.timezone())
+            else:
+                return None
+
+
+cdef class Time32Type(DataType):
+
+    cdef void init(self, const shared_ptr[CDataType]& type):
+        DataType.init(self, type)
+        self.time_type = <const CTime32Type*> type.get()
+
+    property unit:
+
+        def __get__(self):
+            return timeunit_to_string(self.time_type.unit())
+
+
+cdef class Time64Type(DataType):
+
+    cdef void init(self, const shared_ptr[CDataType]& type):
+        DataType.init(self, type)
+        self.time_type = <const CTime64Type*> type.get()
+
+    property unit:
+
+        def __get__(self):
+            return timeunit_to_string(self.time_type.unit())
+
+
+cdef class FixedSizeBinaryType(DataType):
+
+    cdef void init(self, const shared_ptr[CDataType]& type):
+        DataType.init(self, type)
+        self.fixed_size_binary_type = (
+            <const CFixedSizeBinaryType*> type.get())
+
+    property byte_width:
+
+        def __get__(self):
+            return self.fixed_size_binary_type.byte_width()
+
+
+cdef class DecimalType(FixedSizeBinaryType):
+
+    cdef void init(self, const shared_ptr[CDataType]& type):
+        DataType.init(self, type)
+        self.decimal_type = <const CDecimalType*> type.get()
+
+
+cdef class Field:
+    """
+    Represents a named field, with a data type, nullability, and optional
+    metadata
+
+    Notes
+    -----
+    Do not use this class's constructor directly; use pyarrow.field
+    """
+    def __cinit__(self):
+        pass
+
+    cdef init(self, const shared_ptr[CField]& field):
+        self.sp_field = field
+        self.field = field.get()
+        self.type = pyarrow_wrap_data_type(field.get().type())
+
+    def equals(self, Field other):
+        """
+        Test if this field is equal to the other
+        """
+        return self.field.Equals(deref(other.field))
+
+    def __str__(self):
+        self._check_null()
+        return 'pyarrow.Field<{0}>'.format(frombytes(self.field.ToString()))
+
+    def __repr__(self):
+        return self.__str__()
+
+    property nullable:
+
+        def __get__(self):
+            self._check_null()
+            return self.field.nullable()
+
+    property name:
+
+        def __get__(self):
+            self._check_null()
+            return frombytes(self.field.name())
+
+    property metadata:
+
+        def __get__(self):
+            self._check_null()
+            return box_metadata(self.field.metadata().get())
+
+    def _check_null(self):
+        if self.field == NULL:
+            raise ReferenceError(
+                'Field not initialized (references NULL pointer)')
+
+    def add_metadata(self, dict metadata):
+        """
+        Add metadata as dict of string keys and values to Field
+
+        Parameters
+        ----------
+        metadata : dict
+            Keys and values must be string-like / coercible to bytes
+
+        Returns
+        -------
+        field : pyarrow.Field
+        """
+        cdef shared_ptr[CKeyValueMetadata] c_meta
+        convert_metadata(metadata, &c_meta)
+
+        cdef shared_ptr[CField] new_field
+        with nogil:
+            check_status(self.field.AddMetadata(c_meta, &new_field))
+
+        return pyarrow_wrap_field(new_field)
+
+    def remove_metadata(self):
+        """
+        Create new field without metadata, if any
+
+        Returns
+        -------
+        field : pyarrow.Field
+        """
+        cdef shared_ptr[CField] new_field
+        with nogil:
+            new_field = self.field.RemoveMetadata()
+        return pyarrow_wrap_field(new_field)
+
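+# Metadata round-trip, sketched as a hypothetical session (on Python 3 the
+# keys and values come back as bytes, per box_metadata below):
+#
+#   >>> f = field('x', int32()).add_metadata({'origin': 'sensor'})
+#   >>> f.metadata
+#   {b'origin': b'sensor'}
+#   >>> f.remove_metadata().metadata is None
+#   True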
+
+cdef class Schema:
+
+    def __cinit__(self):
+        pass
+
+    def __len__(self):
+        return self.schema.num_fields()
+
+    def __getitem__(self, i):
+        if i < 0 or i >= len(self):
+            raise IndexError("{0} is out of bounds".format(i))
+
+        cdef Field result = Field()
+        result.init(self.schema.field(i))
+
+        return result
+
+    cdef init(self, const vector[shared_ptr[CField]]& fields):
+        self.schema = new CSchema(fields)
+        self.sp_schema.reset(self.schema)
+
+    cdef init_schema(self, const shared_ptr[CSchema]& schema):
+        self.schema = schema.get()
+        self.sp_schema = schema
+
+    property names:
+
+        def __get__(self):
+            cdef int i
+            result = []
+            for i in range(self.schema.num_fields()):
+                name = frombytes(self.schema.field(i).get().name())
+                result.append(name)
+            return result
+
+    property metadata:
+
+        def __get__(self):
+            return box_metadata(self.schema.metadata().get())
+
+    def equals(self, other):
+        """
+        Test if this schema is equal to the other
+        """
+        cdef Schema _other
+        _other = other
+
+        return self.sp_schema.get().Equals(deref(_other.schema))
+
+    def field_by_name(self, name):
+        """
+        Access a field by its name rather than the column index.
+
+        Parameters
+        ----------
+    name : str
+
+    Returns
+    -------
+    field : pyarrow.Field
+        """
+        return pyarrow_wrap_field(self.schema.GetFieldByName(tobytes(name)))
+
+    def add_metadata(self, dict metadata):
+        """
+        Add metadata as dict of string keys and values to Schema
+
+        Parameters
+        ----------
+        metadata : dict
+            Keys and values must be string-like / coercible to bytes
+
+        Returns
+        -------
+        schema : pyarrow.Schema
+        """
+        cdef shared_ptr[CKeyValueMetadata] c_meta
+        convert_metadata(metadata, &c_meta)
+
+        cdef shared_ptr[CSchema] new_schema
+        with nogil:
+            check_status(self.schema.AddMetadata(c_meta, &new_schema))
+
+        return pyarrow_wrap_schema(new_schema)
+
+    def remove_metadata(self):
+        """
+        Create new schema without metadata, if any
+
+        Returns
+        -------
+        schema : pyarrow.Schema
+        """
+        cdef shared_ptr[CSchema] new_schema
+        with nogil:
+            new_schema = self.schema.RemoveMetadata()
+        return pyarrow_wrap_schema(new_schema)
+
+    def __str__(self):
+        return frombytes(self.schema.ToString())
+
+    def __repr__(self):
+        return self.__str__()
+
+
+cdef box_metadata(const CKeyValueMetadata* metadata):
+    cdef unordered_map[c_string, c_string] result
+    if metadata != NULL:
+        metadata.ToUnorderedMap(&result)
+        return result
+    else:
+        return None
+
+
+cdef dict _type_cache = {}
+
+
+cdef DataType primitive_type(Type type):
+    if type in _type_cache:
+        return _type_cache[type]
+
+    cdef DataType out = DataType()
+    out.init(GetPrimitiveType(type))
+
+    _type_cache[type] = out
+    return out
+
+#------------------------------------------------------------
+# Type factory functions
+
+cdef int convert_metadata(dict metadata,
+                          shared_ptr[CKeyValueMetadata]* out) except -1:
+    cdef:
+        shared_ptr[CKeyValueMetadata] meta = (
+            make_shared[CKeyValueMetadata]())
+        c_string key, value
+
+    for py_key, py_value in metadata.items():
+        key = tobytes(py_key)
+        value = tobytes(py_value)
+        meta.get().Append(key, value)
+    out[0] = meta
+    return 0
+
+
+def field(name, DataType type, bint nullable=True, dict metadata=None):
+    """
+    Create a pyarrow.Field instance
+
+    Parameters
+    ----------
+    name : string or bytes
+    type : pyarrow.DataType
+    nullable : boolean, default True
+    metadata : dict, default None
+        Keys and values must be coercible to bytes
+
+    Returns
+    -------
+    field : pyarrow.Field
+    """
+    cdef:
+        shared_ptr[CKeyValueMetadata] c_meta
+        Field result = Field()
+
+    if metadata is not None:
+        convert_metadata(metadata, &c_meta)
+
+    result.sp_field.reset(new CField(tobytes(name), type.sp_type,
+                                     nullable, c_meta))
+    result.field = result.sp_field.get()
+    result.type = type
+    return result
+
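+# A typical call, for illustration:
+#
+#   field('x', int32(), nullable=False, metadata={'unit': 'm'})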
+
+cdef set PRIMITIVE_TYPES = set([
+    _Type_NA, _Type_BOOL,
+    _Type_UINT8, _Type_INT8,
+    _Type_UINT16, _Type_INT16,
+    _Type_UINT32, _Type_INT32,
+    _Type_UINT64, _Type_INT64,
+    _Type_TIMESTAMP, _Type_DATE32,
+    _Type_DATE64,
+    _Type_HALF_FLOAT,
+    _Type_FLOAT,
+    _Type_DOUBLE])
+
+
+def null():
+    return primitive_type(_Type_NA)
+
+
+def bool_():
+    return primitive_type(_Type_BOOL)
+
+
+def uint8():
+    return primitive_type(_Type_UINT8)
+
+
+def int8():
+    return primitive_type(_Type_INT8)
+
+
+def uint16():
+    return primitive_type(_Type_UINT16)
+
+
+def int16():
+    return primitive_type(_Type_INT16)
+
+
+def uint32():
+    return primitive_type(_Type_UINT32)
+
+
+def int32():
+    return primitive_type(_Type_INT32)
+
+
+def uint64():
+    return primitive_type(_Type_UINT64)
+
+
+def int64():
+    return primitive_type(_Type_INT64)
+
+
+cdef dict _timestamp_type_cache = {}
+cdef dict _time_type_cache = {}
+
+
+cdef timeunit_to_string(TimeUnit unit):
+    if unit == TimeUnit_SECOND:
+        return 's'
+    elif unit == TimeUnit_MILLI:
+        return 'ms'
+    elif unit == TimeUnit_MICRO:
+        return 'us'
+    elif unit == TimeUnit_NANO:
+        return 'ns'
+
+
+def timestamp(unit_str, tz=None):
+    """
+    Create a timestamp type with unit 's', 'ms', 'us', or 'ns', and an
+    optional time zone (a string or a pytz time zone object)
+    """
+    cdef:
+        TimeUnit unit
+        c_string c_timezone
+
+    if unit_str == "s":
+        unit = TimeUnit_SECOND
+    elif unit_str == 'ms':
+        unit = TimeUnit_MILLI
+    elif unit_str == 'us':
+        unit = TimeUnit_MICRO
+    elif unit_str == 'ns':
+        unit = TimeUnit_NANO
+    else:
+        raise ValueError('Invalid TimeUnit string')
+
+    cdef TimestampType out = TimestampType()
+
+    if tz is None:
+        out.init(ctimestamp(unit))
+        if unit in _timestamp_type_cache:
+            return _timestamp_type_cache[unit]
+        _timestamp_type_cache[unit] = out
+    else:
+        if not isinstance(tz, six.string_types):
+            tz = tz.zone
+
+        c_timezone = tobytes(tz)
+        out.init(ctimestamp(unit, c_timezone))
+
+    return out
+
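+# For illustration (hypothetical session):
+#
+#   >>> timestamp('ms')                          # tz-naive; cached per unit
+#   >>> timestamp('ns', tz='America/New_York')   # zoned; built fresh each call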
+
+def time32(unit_str):
+    """
+    Create a 32-bit time-of-day type with unit 's' or 'ms'
+    """
+    cdef TimeUnit unit
+
+    if unit_str == "s":
+        unit = TimeUnit_SECOND
+    elif unit_str == 'ms':
+        unit = TimeUnit_MILLI
+    else:
+        raise ValueError('Invalid TimeUnit for time32: {}'.format(unit_str))
+
+    cdef Time32Type out
+    if unit in _time_type_cache:
+        return _time_type_cache[unit]
+    else:
+        out = Time32Type()
+        out.init(ctime32(unit))
+        _time_type_cache[unit] = out
+        return out
+
+
+def time64(unit_str):
+    """
+    Create a 64-bit time-of-day type with unit 'us' or 'ns'
+    """
+    cdef TimeUnit unit
+
+    if unit_str == "us":
+        unit = TimeUnit_MICRO
+    elif unit_str == 'ns':
+        unit = TimeUnit_NANO
+    else:
+        raise ValueError('Invalid TimeUnit for time64: {}'.format(unit_str))
+
+    cdef Time64Type out
+    if unit in _time_type_cache:
+        return _time_type_cache[unit]
+    else:
+        out = Time64Type()
+        out.init(ctime64(unit))
+        _time_type_cache[unit] = out
+        return out
+
+
+def date32():
+    return primitive_type(_Type_DATE32)
+
+
+def date64():
+    return primitive_type(_Type_DATE64)
+
+
+def float16():
+    return primitive_type(_Type_HALF_FLOAT)
+
+
+def float32():
+    return primitive_type(_Type_FLOAT)
+
+
+def float64():
+    return primitive_type(_Type_DOUBLE)
+
+
+cpdef DataType decimal(int precision, int scale=0):
+    """
+    Create a decimal type with the given precision and scale
+    """
+    cdef shared_ptr[CDataType] decimal_type
+    decimal_type.reset(new CDecimalType(precision, scale))
+    return pyarrow_wrap_data_type(decimal_type)
+
+
+def string():
+    """
+    UTF8 variable-length string type
+    """
+    return primitive_type(_Type_STRING)
+
+
+def binary(int length=-1):
+    """Binary (PyBytes-like) type
+
+    Parameters
+    ----------
+    length : int, optional, default -1
+        If length == -1 then return a variable length binary type. If length is
+        greater than or equal to 0 then return a fixed size binary type of
+        width `length`.
+    """
+    if length == -1:
+        return primitive_type(_Type_BINARY)
+
+    cdef shared_ptr[CDataType] fixed_size_binary_type
+    fixed_size_binary_type.reset(new CFixedSizeBinaryType(length))
+    return pyarrow_wrap_data_type(fixed_size_binary_type)
+
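+# For illustration:
+#
+#   binary()      # variable-length binary
+#   binary(16)    # fixed-size binary, 16 bytes per value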
+
+def list_(DataType value_type):
+    """
+    Create a list type with the given value type
+    """
+    cdef DataType out = DataType()
+    cdef shared_ptr[CDataType] list_type
+    list_type.reset(new CListType(value_type.sp_type))
+    out.init(list_type)
+    return out
+
+
+def dictionary(DataType index_type, Array dictionary):
+    """
+    Dictionary-encoded type (e.g. for pandas categorical data)
+    """
+    cdef DictionaryType out = DictionaryType()
+    cdef shared_ptr[CDataType] dict_type
+    dict_type.reset(new CDictionaryType(index_type.sp_type,
+                                        dictionary.sp_array))
+    out.init(dict_type)
+    return out
+
+
+def struct(fields):
+    """
+
+    """
+    cdef:
+        DataType out = DataType()
+        Field field
+        vector[shared_ptr[CField]] c_fields
+        shared_ptr[CDataType] struct_type
+
+    for field in fields:
+        c_fields.push_back(field.sp_field)
+
+    struct_type.reset(new CStructType(c_fields))
+    out.init(struct_type)
+    return out
+
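+# For illustration:
+#
+#   struct([field('x', int32()), field('y', string())])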
+
+def schema(fields):
+    """
+    Construct pyarrow.Schema from collection of fields
+
+    Parameters
+    ----------
+    fields : list or iterable of pyarrow.Field
+
+    Returns
+    -------
+    schema : pyarrow.Schema
+    """
+    cdef:
+        Schema result
+        Field field
+        vector[shared_ptr[CField]] c_fields
+
+    for field in fields:
+        c_fields.push_back(field.sp_field)
+
+    result = Schema()
+    result.init(c_fields)
+    return result
+
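+# For illustration:
+#
+#   schema([field('id', int64()), field('name', string())])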
+
+def from_numpy_dtype(object dtype):
+    """
+    Convert NumPy dtype to pyarrow.DataType
+    """
+    cdef shared_ptr[CDataType] c_type
+    with nogil:
+        check_status(NumPyDtypeToArrow(dtype, &c_type))
+
+    return pyarrow_wrap_data_type(c_type)
+
+
+NA = None
+
+
+cdef class NAType(Scalar):
+
+    def __cinit__(self):
+        global NA
+        if NA is not None:
+            raise Exception('Cannot create multiple NAType instances')
+
+        self.type = null()
+
+    def __repr__(self):
+        return 'NA'
+
+    def as_py(self):
+        return None
+
+
+NA = NAType()
+
+
+cdef class ArrayValue(Scalar):
+
+    cdef void init(self, DataType type, const shared_ptr[CArray]& sp_array,
+                   int64_t index):
+        self.type = type
+        self.index = index
+        self._set_array(sp_array)
+
+    cdef void _set_array(self, const shared_ptr[CArray]& sp_array):
+        self.sp_array = sp_array
+
+    def __repr__(self):
+        if hasattr(self, 'as_py'):
+            return repr(self.as_py())
+        else:
+            return super(Scalar, self).__repr__()
+
+
+cdef class BooleanValue(ArrayValue):
+
+    def as_py(self):
+        cdef CBooleanArray* ap = <CBooleanArray*> self.sp_array.get()
+        return ap.Value(self.index)
+
+
+cdef class Int8Value(ArrayValue):
+
+    def as_py(self):
+        cdef CInt8Array* ap = <CInt8Array*> self.sp_array.get()
+        return ap.Value(self.index)
+
+
+cdef class UInt8Value(ArrayValue):
+
+    def as_py(self):
+        cdef CUInt8Array* ap = <CUInt8Array*> self.sp_array.get()
+        return ap.Value(self.index)
+
+
+cdef class Int16Value(ArrayValue):
+
+    def as_py(self):
+        cdef CInt16Array* ap = <CInt16Array*> self.sp_array.get()
+        return ap.Value(self.index)
+
+
+cdef class UInt16Value(ArrayValue):
+
+    def as_py(self):
+        cdef CUInt16Array* ap = <CUInt16Array*> self.sp_array.get()
+        return ap.Value(self.index)
+
+
+cdef class Int32Value(ArrayValue):
+
+    def as_py(self):
+        cdef CInt32Array* ap = <CInt32Array*> self.sp_array.get()
+        return ap.Value(self.index)
+
+
+cdef class UInt32Value(ArrayValue):
+
+    def as_py(self):
+        cdef CUInt32Array* ap = <CUInt32Array*> self.sp_array.get()
+        return ap.Value(self.index)
+
+
+cdef class Int64Value(ArrayValue):
+
+    def as_py(self):
+        cdef CInt64Array* ap = <CInt64Array*> self.sp_array.get()
+        return ap.Value(self.index)
+
+
+cdef class UInt64Value(ArrayValue):
+
+    def as_py(self):
+        cdef CUInt64Array* ap = <CUInt64Array*> self.sp_array.get()
+        return ap.Value(self.index)
+
+
+cdef class Date32Value(ArrayValue):
+
+    def as_py(self):
+        cdef CDate32Array* ap = <CDate32Array*> self.sp_array.get()
+
+        # Value is days since the UNIX epoch; convert to seconds, then date
+        return datetime.datetime.utcfromtimestamp(
+            int(ap.Value(self.index)) * 86400).date()
+
+
+cdef class Date64Value(ArrayValue):
+
+    def as_py(self):
+        cdef CDate64Array* ap = <CDate64Array*> self.sp_array.get()
+        return datetime.datetime.utcfromtimestamp(
+            ap.Value(self.index) / 1000).date()
+
+
+cdef class TimestampValue(ArrayValue):
+
+    def as_py(self):
+        cdef:
+            CTimestampArray* ap = <CTimestampArray*> self.sp_array.get()
+            CTimestampType* dtype = <CTimestampType*>ap.type().get()
+            int64_t val = ap.Value(self.index)
+
+        timezone = None
+        tzinfo = None
+        if dtype.timezone().size() > 0:
+            timezone = frombytes(dtype.timezone())
+            import pytz
+            tzinfo = pytz.timezone(timezone)
+
+        try:
+            pd = _pandas()
+            if dtype.unit() == TimeUnit_SECOND:
+                val = val * 1000000000
+            elif dtype.unit() == TimeUnit_MILLI:
+                val = val * 1000000
+            elif dtype.unit() == TimeUnit_MICRO:
+                val = val * 1000
+            return pd.Timestamp(val, tz=tzinfo)
+        except ImportError:
+            if dtype.unit() == TimeUnit_SECOND:
+                result = datetime.datetime.utcfromtimestamp(val)
+            elif dtype.unit() == TimeUnit_MILLI:
+                result = datetime.datetime.utcfromtimestamp(float(val) / 1000)
+            elif dtype.unit() == TimeUnit_MICRO:
+                result = datetime.datetime.utcfromtimestamp(
+                    float(val) / 1000000)
+            else:
+                # TimeUnit_NANO
+                raise NotImplementedError("Cannot convert nanosecond "
+                                          "timestamps without pandas")
+            if timezone is not None:
+                result = result.replace(tzinfo=tzinfo)
+            return result
+
+
+cdef class FloatValue(ArrayValue):
+
+    def as_py(self):
+        cdef CFloatArray* ap = <CFloatArray*> self.sp_array.get()
+        return ap.Value(self.index)
+
+
+cdef class DoubleValue(ArrayValue):
+
+    def as_py(self):
+        cdef CDoubleArray* ap = <CDoubleArray*> self.sp_array.get()
+        return ap.Value(self.index)
+
+
+cdef class DecimalValue(ArrayValue):
+
+    def as_py(self):
+        cdef:
+            CDecimalArray* ap = <CDecimalArray*> self.sp_array.get()
+            c_string s = ap.FormatValue(self.index)
+        return _pydecimal.Decimal(s.decode('utf8'))
+
+
+cdef class StringValue(ArrayValue):
+
+    def as_py(self):
+        cdef CStringArray* ap = <CStringArray*> self.sp_array.get()
+        return ap.GetString(self.index).decode('utf-8')
+
+
+cdef class BinaryValue(ArrayValue):
+
+    def as_py(self):
+        cdef:
+            const uint8_t* ptr
+            int32_t length
+            CBinaryArray* ap = <CBinaryArray*> self.sp_array.get()
+
+        ptr = ap.GetValue(self.index, &length)
+        return cp.PyBytes_FromStringAndSize(<const char*>(ptr), length)
+
+
+cdef class ListValue(ArrayValue):
+
+    def __len__(self):
+        return self.ap.value_length(self.index)
+
+    def __getitem__(self, i):
+        return self.getitem(i)
+
+    def __iter__(self):
+        for i in range(len(self)):
+            yield self.getitem(i)
+
+    cdef void _set_array(self, const shared_ptr[CArray]& sp_array):
+        self.sp_array = sp_array
+        self.ap = <CListArray*> sp_array.get()
+        self.value_type = pyarrow_wrap_data_type(self.ap.value_type())
+
+    cdef getitem(self, int64_t i):
+        cdef int64_t j = self.ap.value_offset(self.index) + i
+        return box_scalar(self.value_type, self.ap.values(), j)
+
+    def as_py(self):
+        cdef:
+            int64_t j
+            list result = []
+
+        for j in range(len(self)):
+            result.append(self.getitem(j).as_py())
+
+        return result
+
+
+cdef class FixedSizeBinaryValue(ArrayValue):
+
+    def as_py(self):
+        cdef:
+            CFixedSizeBinaryArray* ap
+            CFixedSizeBinaryType* ap_type
+            int32_t length
+            const char* data
+        ap = <CFixedSizeBinaryArray*> self.sp_array.get()
+        ap_type = <CFixedSizeBinaryType*> ap.type().get()
+        length = ap_type.byte_width()
+        data = <const char*> ap.GetValue(self.index)
+        return cp.PyBytes_FromStringAndSize(data, length)
+
+
+
+cdef dict _scalar_classes = {
+    _Type_BOOL: BooleanValue,
+    _Type_UINT8: UInt8Value,
+    _Type_UINT16: UInt16Value,
+    _Type_UINT32: UInt32Value,
+    _Type_UINT64: UInt64Value,
+    _Type_INT8: Int8Value,
+    _Type_INT16: Int16Value,
+    _Type_INT32: Int32Value,
+    _Type_INT64: Int64Value,
+    _Type_DATE32: Date32Value,
+    _Type_DATE64: Date64Value,
+    _Type_TIMESTAMP: TimestampValue,
+    _Type_FLOAT: FloatValue,
+    _Type_DOUBLE: DoubleValue,
+    _Type_LIST: ListValue,
+    _Type_BINARY: BinaryValue,
+    _Type_STRING: StringValue,
+    _Type_FIXED_SIZE_BINARY: FixedSizeBinaryValue,
+    _Type_DECIMAL: DecimalValue,
+}
+
+cdef object box_scalar(DataType type, const shared_ptr[CArray]& sp_array,
+                       int64_t index):
+    cdef ArrayValue val
+    if type.type.id() == _Type_NA:
+        return NA
+    elif sp_array.get().IsNull(index):
+        return NA
+    else:
+        val = _scalar_classes[type.type.id()]()
+        val.init(type, sp_array, index)
+        return val
+
+
+cdef maybe_coerce_datetime64(values, dtype, DataType type,
+                             timestamps_to_ms=False):
+
+    from pyarrow.compat import DatetimeTZDtype
+
+    if values.dtype.type != np.datetime64:
+        return values, type
+
+    coerce_ms = timestamps_to_ms and values.dtype != 'datetime64[ms]'
+
+    if coerce_ms:
+        values = values.astype('datetime64[ms]')
+
+    if isinstance(dtype, DatetimeTZDtype):
+        tz = dtype.tz
+        unit = 'ms' if coerce_ms else dtype.unit
+        type = timestamp(unit, tz)
+    elif type is None:
+        # Trust the NumPy dtype
+        type = from_numpy_dtype(values.dtype)
+
+    return values, type
+
+
+
+def array(object sequence, DataType type=None, MemoryPool memory_pool=None):
+    """
+    Create pyarrow.Array instance from a Python sequence
+
+    Parameters
+    ----------
+    sequence : sequence-like object of Python objects
+    type : pyarrow.DataType, optional
+        If not passed, will be inferred from the data
+    memory_pool : pyarrow.MemoryPool, optional
+        If not passed, will allocate memory from the currently-set default
+        memory pool
+
+    Returns
+    -------
+    array : pyarrow.Array
+    """
+    cdef:
+        shared_ptr[CArray] sp_array
+        CMemoryPool* pool
+
+    pool = maybe_unbox_memory_pool(memory_pool)
+    if type is None:
+        check_status(ConvertPySequence(sequence, pool, &sp_array))
+    else:
+        check_status(
+            ConvertPySequence(
+                sequence, pool, &sp_array, type.sp_type
+            )
+        )
+
+    return pyarrow_wrap_array(sp_array)
+
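+# For illustration, a hypothetical session (None becomes a null):
+#
+#   >>> import pyarrow as pa
+#   >>> pa.array([1, 2, None], type=pa.int64())
+#   <pyarrow.array.Int64Array object at 0x...>
+#   [
+#     1,
+#     2,
+#     NA
+#   ]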
+
+
+cdef class Array:
+
+    cdef init(self, const shared_ptr[CArray]& sp_array):
+        self.sp_array = sp_array
+        self.ap = sp_array.get()
+        self.type = pyarrow_wrap_data_type(self.sp_array.get().type())
+
+    @staticmethod
+    def from_pandas(obj, mask=None, DataType type=None,
+                    timestamps_to_ms=False,
+                    MemoryPool memory_pool=None):
+        """
+        Convert pandas.Series to an Arrow Array.
+
+        Parameters
+        ----------
+        obj : pandas.Series or numpy.ndarray
+
+        mask : pandas.Series or numpy.ndarray, optional
+            boolean mask if the object is valid or null
+
+        type : pyarrow.DataType
+            Explicit type to attempt to coerce to
+
+        timestamps_to_ms : bool, optional
+            Convert datetime columns to ms resolution. This is needed for
+            compatibility with other functionality like Parquet I/O which
+            only supports milliseconds.
+
+        memory_pool : MemoryPool, optional
+            Specific memory pool to use to allocate the resulting Arrow array.
+
+        Notes
+        -----
+        Localized timestamps will currently be returned as UTC (pandas's native
+        representation).  Timezone-naive data will be implicitly interpreted as
+        UTC.
+
+        Examples
+        --------
+
+        >>> import pandas as pd
+        >>> import pyarrow as pa
+        >>> pa.Array.from_pandas(pd.Series([1, 2]))
+        <pyarrow.array.Int64Array object at 0x7f674e4c0e10>
+        [
+          1,
+          2
+        ]
+
+        >>> import numpy as np
+        >>> pa.Array.from_pandas(pd.Series([1, 2]), np.array([0, 1],
+        ... dtype=bool))
+        <pyarrow.array.Int64Array object at 0x7f9019e11208>
+        [
+          1,
+          NA
+        ]
+
+        Returns
+        -------
+        pyarrow.array.Array
+        """
+        cdef:
+            shared_ptr[CArray] out
+            shared_ptr[CDataType] c_type
+            CMemoryPool* pool
+
+        if mask is not None:
+            mask = get_series_values(mask)
+
+        values = get_series_values(obj)
+        pool = maybe_unbox_memory_pool(memory_pool)
+
+        if isinstance(values, Categorical):
+            return DictionaryArray.from_arrays(
+                values.codes, values.categories.values,
+                mask=mask, memory_pool=memory_pool)
+        elif values.dtype == object:
+            # Object dtype undergoes a different conversion path as more type
+            # inference may be needed
+            if type is not None:
+                c_type = type.sp_type
+            with nogil:
+                check_status(PandasObjectsToArrow(
+                    pool, values, mask, c_type, &out))
+        else:
+            values, type = maybe_coerce_datetime64(
+                values, obj.dtype, type, timestamps_to_ms=timestamps_to_ms)
+
+            if type is None:
+                check_status(NumPyDtypeToArrow(values.dtype, &c_type))
+            else:
+                c_type = type.sp_type
+
+            with nogil:
+                check_status(PandasToArrow(
+                    pool, values, mask, c_type, &out))
+
+        return pyarrow_wrap_array(out)
+
+    property null_count:
+
+        def __get__(self):
+            return self.sp_array.get().null_count()
+
+    def __iter__(self):
+        for i in range(len(self)):
+            yield self.getitem(i)
+
+    def __repr__(self):
+        from pyarrow.formatting import array_format
+        type_format = object.__repr__(self)
+        values = array_format(self, window=10)
+        return '{0}\n{1}'.format(type_format, values)
+
+    def equals(Array self, Array other):
+        return self.ap.Equals(deref(other.ap))
+
+    def __len__(self):
+        if self.sp_array.get():
+            return self.sp_array.get().length()
+        else:
+            return 0
+
+    def isnull(self):
+        raise NotImplementedError
+
+    def __getitem__(self, key):
+        cdef:
+            Py_ssize_t n = len(self)
+
+        if PySlice_Check(key):
+            start = key.start or 0
+            while start < 0:
+                start += n
+
+            stop = key.stop if key.stop is not None else n
+            while stop < 0:
+                stop += n
+
+            step = key.step or 1
+            if step != 1:
+                raise IndexError('only slices with step 1 supported')
+            else:
+                return self.slice(start, stop - start)
+
+        while key < 0:
+            key += len(self)
+
+        return self.getitem(key)
+
+    cdef getitem(self, int64_t i):
+        return box_scalar(self.type, self.sp_array, i)
+
+    def slice(self, offset=0, length=None):
+        """
+        Compute zero-copy slice of this array
+
+        Parameters
+        ----------
+        offset : int, default 0
+            Offset from start of array to slice
+        length : int, default None
+            Length of slice (default is until end of Array starting from
+            offset)
+
+        Returns
+        -------
+        sliced : Array
+        """
+        cdef:
+            shared_ptr[CArray] result
+
+        if offset < 0:
+            raise IndexError('Offset must be non-negative')
+
+        if length is None:
+            result = self.ap.Slice(offset)
+        else:
+            result = self.ap.Slice(offset, length)
+
+        return pyarrow_wrap_array(result)
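+
+    # For illustration: arr.slice(2, 10) is a zero-copy view that shares the
+    # parent's buffers; only the offset and length bookkeeping change.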
+
+    def to_pandas(self):
+        """
+        Convert to an array object suitable for use in pandas
+
+        See also
+        --------
+        Column.to_pandas
+        Table.to_pandas
+        RecordBatch.to_pandas
+        """
+        cdef:
+            PyObject* out
+
+        with nogil:
+            check_status(ConvertArrayToPandas(self.sp_array, self, &out))
+        return wrap_array_output(out)
+
+    def to_pylist(self):
+        """
+        Convert to a list of native Python objects.
+        """
+        return [x.as_py() for x in self]
+
+
+cdef class Tensor:
+
+    cdef init(self, const shared_ptr[CTensor]& sp_tensor):
+        self.sp_tensor = sp_tensor
+        self.tp = sp_tensor.get()
+        self.type = pyarrow_wrap_data_type(self.tp.type())
+
+    def __repr__(self):
+        return """<pyarrow.Tensor>
+type: {0}
+shape: {1}
+strides: {2}""".format(self.type, self.shape, self.strides)
+
+    @staticmethod
+    def from_numpy(obj):
+        cdef shared_ptr[CTensor] ctensor
+        check_status(NdarrayToTensor(c_default_memory_pool(), obj, &ctensor))
+        return pyarrow_wrap_tensor(ctensor)
+
+    def to_numpy(self):
+        """
+        Convert arrow::Tensor to numpy.ndarray with zero copy
+        """
+        cdef:
+            PyObject* out
+
+        check_status(TensorToNdarray(deref(self.tp), self, &out))
+        return PyObject_to_object(out)
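+
+    # Round-trip sketch (hypothetical session; the returned ndarray keeps a
+    # reference to this Tensor, so the memory stays alive):
+    #
+    #   >>> t = Tensor.from_numpy(np.arange(6).reshape(2, 3))
+    #   >>> t.to_numpy()
+    #   array([[0, 1, 2],
+    #          [3, 4, 5]])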
+
+    def equals(self, Tensor other):
+        """
+        Return true if the tensors contain exactly equal data
+        """
+        return self.tp.Equals(deref(other.tp))
+
+    property is_mutable:
+
+        def __get__(self):
+            return self.tp.is_mutable()
+
+    property is_contiguous:
+
+        def __get__(self):
+            return self.tp.is_contiguous()
+
+    property ndim:
+
+        def __get__(self):
+            return self.tp.ndim()
+
+    property size:
+
+        def __get__(self):
+            return self.tp.size()
+
+    property shape:
+
+        def __get__(self):
+            cdef size_t i
+            py_shape = []
+            for i in range(self.tp.shape().size()):
+                py_shape.append(self.tp.shape()[i])
+            return py_shape
+
+    property strides:
+
+        def __get__(self):
+            cdef size_t i
+            py_strides = []
+            for i in range(self.tp.strides().size()):
+                py_strides.append(self.tp.strides()[i])
+            return py_strides
+
+
+
+cdef wrap_array_output(PyObject* output):
+    cdef object obj = PyObject_to_object(output)
+
+    if isinstance(obj, dict):
+        return Categorical(obj['indices'],
+                           categories=obj['dictionary'],
+                           fastpath=True)
+    else:
+        return obj
+
+
+cdef class NullArray(Array):
+    pass
+
+
+cdef class BooleanArray(Array):
+    pass
+
+
+cdef class NumericArray(Array):
+    pass
+
+
+cdef class IntegerArray(NumericArray):
+    pass
+
+
+cdef class FloatingPointArray(NumericArray):
+    pass
+
+
+cdef class Int8Array(IntegerArray):
+    pass
+
+
+cdef class UInt8Array(IntegerArray):
+    pass
+
+
+cdef class Int16Array(IntegerArray):
+    pass
+
+
+cdef class UInt16Array(IntegerArray):
+    pass
+
+
+cdef class Int32Array(IntegerArray):
+    pass
+
+
+cdef class UInt32Array(IntegerArray):
+    pass
+
+
+cdef class Int64Array(IntegerArray):
+    pass
+
+
+cdef class UInt64Array(IntegerArray):
+    pass
+
+
+cdef class Date32Array(NumericArray):
+    pass
+
+
+cdef class Date64Array(NumericArray):
+    pass
+
+
+cdef class TimestampArray(NumericArray):
+    pass
+
+
+cdef class Time32Array(NumericArray):
+    pass
+
+
+cdef class Time64Array(NumericArray):
+    pass
+
+
+cdef class FloatArray(FloatingPointArray):
+    pass
+
+
+cdef class DoubleArray(FloatingPointArray):
+    pass
+
+
+cdef class FixedSizeBinaryArray(Array):
+    pass
+
+
+cdef class DecimalArray(FixedSizeBinaryArray):
+    pass
+
+
+cdef class ListArray(Array):
+    pass
+
+
+cdef class StringArray(Array):
+    pass
+
+
+cdef class BinaryArray(Array):
+    pass
+
+
+cdef class DictionaryArray(Array):
+
+    cdef getitem(self, int64_t i):
+        cdef Array dictionary = self.dictionary
+        index = self.indices[i]
+        if index is NA:
+            return index
+        else:
+            return box_scalar(dictionary.type, dictionary.sp_array,
+                              index.as_py())
+
+    property dictionary:
+
+        def __get__(self):
+            cdef CDictionaryArray* darr = <CDictionaryArray*>(self.ap)
+
+            if self._dictionary is None:
+                self._dictionary = pyarrow_wrap_array(darr.dictionary())
+
+            return self._dictionary
+
+    property indices:
+
+        def __get__(self):
+            cdef CDictionaryArray* darr = <CDictionaryArray*>(self.ap)
+
+            if self._indices is None:
+                self._indices = pyarrow_wrap_array(darr.indices())
+
+            return self._indices
+
+    @staticmethod
+    def from_arrays(indices, dictionary, mask=None,
+                    MemoryPool memory_pool=None):
+        """
+        Construct Arrow DictionaryArray from array of indices (must be
+        non-negative integers) and corresponding array of dictionary values
+
+        Parameters
+        ----------
+        indices : ndarray or pandas.Series, integer type
+        dictionary : ndarray or pandas.Series
+        mask : ndarray or pandas.Series, boolean type
+            True values indicate that indices are actually null
+
+        Returns
+        -------
+        dict_array : DictionaryArray
+        """
+        cdef:
+            Array arrow_indices, arrow_dictionary
+            DictionaryArray result
+            shared_ptr[CDataType] c_type
+            shared_ptr[CArray] c_result
+
+        if isinstance(indices, Array):
+            if mask is not None:
+                raise NotImplementedError(
+                    "mask not implemented with Arrow array inputs yet")
+            arrow_indices = indices
+        else:
+            if mask is None:
+                mask = indices == -1
+            else:
+                mask = mask | (indices == -1)
+            arrow_indices = Array.from_pandas(indices, mask=mask,
+                                              memory_pool=memory_pool)
+
+        if isinstance(dictionary, Array):
+            arrow_dictionary = dictionary
+        else:
+            arrow_dictionary = Array.from_pandas(dictionary,
+                                                 memory_pool=memory_pool)
+
+        if not isinstance(arrow_indices, IntegerArray):
+            raise ValueError('Indices must be integer type')
+
+        c_type.reset(new CDictionaryType(arrow_indices.type.sp_type,
+                                         arrow_dictionary.sp_array))
+        c_result.reset(new CDictionaryArray(c_type, arrow_indices.sp_array))
+
+        result = DictionaryArray()
+        result.init(c_result)
+        return result
+
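+# For illustration, a hypothetical call: with the default mask, index -1
+# marks a null entry:
+#
+#   DictionaryArray.from_arrays(np.array([0, 1, 0, -1]),
+#                               np.array(['a', 'b']))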
+
+cdef dict _array_classes = {
+    _Type_NA: NullArray,
+    _Type_BOOL: BooleanArray,
+    _Type_UINT8: UInt8Array,
+    _Type_UINT16: UInt16Array,
+    _Type_UINT32: UInt32Array,
+    _Type_UINT64: UInt64Array,
+    _Type_INT8: Int8Array,
+    _Type_INT16: Int16Array,
+    _Type_INT32: Int32Array,
+    _Type_INT64: Int64Array,
+    _Type_DATE32: Date32Array,
+    _Type_DATE64: Date64Array,
+    _Type_TIMESTAMP: TimestampArray,
+    _Type_TIME32: Time32Array,
+    _Type_TIME64: Time64Array,
+    _Type_FLOAT: FloatArray,
+    _Type_DOUBLE: DoubleArray,
+    _Type_LIST: ListArray,
+    _Type_BINARY: BinaryArray,
+    _Type_STRING: StringArray,
+    _Type_DICTIONARY: DictionaryArray,
+    _Type_FIXED_SIZE_BINARY: FixedSizeBinaryArray,
+    _Type_DECIMAL: DecimalArray,
+}
+
+
+cdef object get_series_values(object obj):
+    if isinstance(obj, PandasSeries):
+        result = obj.values
+    elif isinstance(obj, np.ndarray):
+        result = obj
+    else:
+        result = PandasSeries(obj).values
+
+    return result

http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/pyarrow/error.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/error.pxi b/python/pyarrow/error.pxi
new file mode 100644
index 0000000..259aeb0
--- /dev/null
+++ b/python/pyarrow/error.pxi
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pyarrow.includes.libarrow cimport CStatus
+from pyarrow.includes.common cimport c_string
+from pyarrow.compat import frombytes
+
+
+class ArrowException(Exception):
+    pass
+
+
+class ArrowInvalid(ValueError, ArrowException):
+    pass
+
+
+class ArrowMemoryError(MemoryError, ArrowException):
+    pass
+
+
+class ArrowIOError(IOError, ArrowException):
+    pass
+
+
+class ArrowKeyError(KeyError, ArrowException):
+    pass
+
+
+class ArrowTypeError(TypeError, ArrowException):
+    pass
+
+
+class ArrowNotImplementedError(NotImplementedError, ArrowException):
+    pass
+
+
+cdef int check_status(const CStatus& status) nogil except -1:
+    if status.ok():
+        return 0
+
+    with gil:
+        message = frombytes(status.ToString())
+        if status.IsInvalid():
+            raise ArrowInvalid(message)
+        elif status.IsIOError():
+            raise ArrowIOError(message)
+        elif status.IsOutOfMemory():
+            raise ArrowMemoryError(message)
+        elif status.IsKeyError():
+            raise ArrowKeyError(message)
+        elif status.IsNotImplemented():
+            raise ArrowNotImplementedError(message)
+        elif status.IsTypeError():
+            raise ArrowTypeError(message)
+        else:
+            raise ArrowException(message)
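+
+# Because these classes multiply inherit from the builtin exceptions, callers
+# may catch either flavor; for illustration (some_status is hypothetical):
+#
+#   try:
+#       check_status(some_status)
+#   except ArrowInvalid:   # equally catchable as plain ValueError
+#       ...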

http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/pyarrow/feather.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/feather.py b/python/pyarrow/feather.py
index c7b118e..3754aec 100644
--- a/python/pyarrow/feather.py
+++ b/python/pyarrow/feather.py
@@ -22,9 +22,9 @@ import six
 import pandas as pd
 
 from pyarrow.compat import pdapi
-from pyarrow._io import FeatherError  # noqa
-from pyarrow._table import Table
-import pyarrow._io as ext
+from pyarrow.lib import FeatherError  # noqa
+from pyarrow.lib import Table
+import pyarrow.lib as ext
 
 
 if LooseVersion(pd.__version__) < '0.17.0':

http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/pyarrow/filesystem.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/filesystem.py b/python/pyarrow/filesystem.py
index 92dd91c..ac37fd8 100644
--- a/python/pyarrow/filesystem.py
+++ b/python/pyarrow/filesystem.py
@@ -19,7 +19,7 @@ from os.path import join as pjoin
 import os
 
 from pyarrow.util import implements
-import pyarrow._io as io
+import pyarrow.lib as lib
 
 
 class Filesystem(object):
@@ -133,7 +133,7 @@ class LocalFilesystem(Filesystem):
         return open(path, mode=mode)
 
 
-class HdfsClient(io._HdfsClient, Filesystem):
+class HdfsClient(lib._HdfsClient, Filesystem):
     """
     Connect to an HDFS cluster. All parameters are optional and should
     only be set if the defaults need to be overridden.
@@ -168,19 +168,19 @@ class HdfsClient(io._HdfsClient, Filesystem):
 
     @implements(Filesystem.isdir)
     def isdir(self, path):
-        return io._HdfsClient.isdir(self, path)
+        return lib._HdfsClient.isdir(self, path)
 
     @implements(Filesystem.isfile)
     def isfile(self, path):
-        return io._HdfsClient.isfile(self, path)
+        return lib._HdfsClient.isfile(self, path)
 
     @implements(Filesystem.delete)
     def delete(self, path, recursive=False):
-        return io._HdfsClient.delete(self, path, recursive)
+        return lib._HdfsClient.delete(self, path, recursive)
 
     @implements(Filesystem.mkdir)
     def mkdir(self, path, create_parents=True):
-        return io._HdfsClient.mkdir(self, path)
+        return lib._HdfsClient.mkdir(self, path)
 
     def ls(self, path, full_info=False):
         """
@@ -196,4 +196,4 @@ class HdfsClient(io._HdfsClient, Filesystem):
         -------
         result : list of dicts (full_info=True) or strings (full_info=False)
         """
-        return io._HdfsClient.ls(self, path, full_info)
+        return lib._HdfsClient.ls(self, path, full_info)

http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/pyarrow/formatting.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/formatting.py b/python/pyarrow/formatting.py
index c358344..0af2873 100644
--- a/python/pyarrow/formatting.py
+++ b/python/pyarrow/formatting.py
@@ -17,7 +17,7 @@
 
 # Pretty-printing and other formatting utilities for Arrow data structures
 
-import pyarrow._array as _array
+import pyarrow.lib as lib
 
 
 def array_format(arr, window=None):
@@ -42,7 +42,7 @@ def array_format(arr, window=None):
 
 
 def value_format(x, indent_level=0):
-    if isinstance(x, _array.ListValue):
+    if isinstance(x, lib.ListValue):
         contents = ',\n'.join(value_format(item) for item in x)
         return '[{0}]'.format(_indent(contents, 1).strip())
     else:

http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 8a730b3..3d56c14 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -149,7 +149,7 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         PoolBuffer()
         PoolBuffer(CMemoryPool*)
 
-    cdef CMemoryPool* default_memory_pool()
+    cdef CMemoryPool* c_default_memory_pool" arrow::default_memory_pool"()
 
     cdef cppclass CListType" arrow::ListType"(CDataType):
         CListType(const shared_ptr[CDataType]& value_type)
@@ -625,3 +625,62 @@ cdef extern from "arrow/ipc/feather.h" namespace "arrow::ipc::feather" nogil:
 
         CStatus GetColumn(int i, shared_ptr[CColumn]* out)
         c_string GetColumnName(int i)
+
+
+cdef extern from "arrow/python/api.h" namespace "arrow::py" nogil:
+    shared_ptr[CDataType] GetPrimitiveType(Type type)
+    shared_ptr[CDataType] GetTimestampType(TimeUnit unit)
+    CStatus ConvertPySequence(object obj, CMemoryPool* pool,
+                              shared_ptr[CArray]* out)
+    CStatus ConvertPySequence(object obj, CMemoryPool* pool,
+                              shared_ptr[CArray]* out,
+                              const shared_ptr[CDataType]& type)
+
+    CStatus NumPyDtypeToArrow(object dtype, shared_ptr[CDataType]* type)
+
+    CStatus PandasToArrow(CMemoryPool* pool, object ao, object mo,
+                          const shared_ptr[CDataType]& type,
+                          shared_ptr[CArray]* out)
+
+    CStatus PandasObjectsToArrow(CMemoryPool* pool, object ao, object mo,
+                                 const shared_ptr[CDataType]& type,
+                                 shared_ptr[CArray]* out)
+
+    CStatus NdarrayToTensor(CMemoryPool* pool, object ao,
+                            shared_ptr[CTensor]* out)
+
+    CStatus TensorToNdarray(const CTensor& tensor, object base,
+                            PyObject** out)
+
+    CStatus ConvertArrayToPandas(const shared_ptr[CArray]& arr,
+                                 object py_ref, PyObject** out)
+
+    CStatus ConvertColumnToPandas(const shared_ptr[CColumn]& arr,
+                                  object py_ref, PyObject** out)
+
+    CStatus ConvertTableToPandas(const shared_ptr[CTable]& table,
+                                 int nthreads, PyObject** out)
+
+    void c_set_default_memory_pool \
+        " arrow::py::set_default_memory_pool"(CMemoryPool* pool)\
+
+    CMemoryPool* c_get_memory_pool \
+        " arrow::py::get_memory_pool"()
+
+    cdef cppclass PyBuffer(CBuffer):
+        PyBuffer(object o)
+
+    cdef cppclass PyReadableFile(RandomAccessFile):
+        PyReadableFile(object fo)
+
+    cdef cppclass PyOutputStream(OutputStream):
+        PyOutputStream(object fo)
+
+    cdef cppclass PyBytesReader(CBufferReader):
+        PyBytesReader(object fo)
+
+cdef extern from 'arrow/python/init.h':
+    int arrow_init_numpy() except -1
+
+cdef extern from 'arrow/python/config.h' namespace 'arrow::py':
+    void set_numpy_nan(object o)

http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/pyarrow/includes/pyarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/pyarrow.pxd b/python/pyarrow/includes/pyarrow.pxd
deleted file mode 100644
index 35c7110..0000000
--- a/python/pyarrow/includes/pyarrow.pxd
+++ /dev/null
@@ -1,75 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# distutils: language = c++
-
-from pyarrow.includes.common cimport *
-from pyarrow.includes.libarrow cimport (CArray, CBuffer, CColumn, CDataType,
-                                        CTable, CTensor, CStatus, Type,
-                                        CMemoryPool, TimeUnit,
-                                        RandomAccessFile, OutputStream,
-                                        CBufferReader)
-
-
-cdef extern from "arrow/python/api.h" namespace "arrow::py" nogil:
-    shared_ptr[CDataType] GetPrimitiveType(Type type)
-    shared_ptr[CDataType] GetTimestampType(TimeUnit unit)
-    CStatus ConvertPySequence(object obj, CMemoryPool* pool,
-                              shared_ptr[CArray]* out)
-    CStatus ConvertPySequence(object obj, CMemoryPool* pool,
-                              shared_ptr[CArray]* out,
-                              const shared_ptr[CDataType]& type)
-
-    CStatus NumPyDtypeToArrow(object dtype, shared_ptr[CDataType]* type)
-
-    CStatus PandasToArrow(CMemoryPool* pool, object ao, object mo,
-                          const shared_ptr[CDataType]& type,
-                          shared_ptr[CArray]* out)
-
-    CStatus PandasObjectsToArrow(CMemoryPool* pool, object ao, object mo,
-                                 const shared_ptr[CDataType]& type,
-                                 shared_ptr[CArray]* out)
-
-    CStatus NdarrayToTensor(CMemoryPool* pool, object ao,
-                            shared_ptr[CTensor]* out);
-
-    CStatus TensorToNdarray(const CTensor& tensor, object base,
-                            PyObject** out)
-
-    CStatus ConvertArrayToPandas(const shared_ptr[CArray]& arr,
-                                 object py_ref, PyObject** out)
-
-    CStatus ConvertColumnToPandas(const shared_ptr[CColumn]& arr,
-                                  object py_ref, PyObject** out)
-
-    CStatus ConvertTableToPandas(const shared_ptr[CTable]& table,
-                                 int nthreads, PyObject** out)
-
-    void set_default_memory_pool(CMemoryPool* pool)
-    CMemoryPool* get_memory_pool()
-
-    cdef cppclass PyBuffer(CBuffer):
-        PyBuffer(object o)
-
-    cdef cppclass PyReadableFile(RandomAccessFile):
-        PyReadableFile(object fo)
-
-    cdef cppclass PyOutputStream(OutputStream):
-        PyOutputStream(object fo)
-
-    cdef cppclass PyBytesReader(CBufferReader):
-        PyBytesReader(object fo)

http://git-wip-us.apache.org/repos/asf/arrow/blob/9e875a68/python/pyarrow/io.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
new file mode 100644
index 0000000..a0a96e7
--- /dev/null
+++ b/python/pyarrow/io.pxi
@@ -0,0 +1,1253 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Cython wrappers for IO interfaces defined in arrow::io and messaging in
+# arrow::ipc
+
+from libc.stdlib cimport malloc, free
+from pyarrow.compat import frombytes, tobytes, encode_file_path
+
+import re
+import six
+import sys
+import threading
+import time
+
+
+# 64K
+DEFAULT_BUFFER_SIZE = 2 ** 16
+
+
+# To let us get a PyObject* and avoid Cython auto-ref-counting
+cdef extern from "Python.h":
+    PyObject* PyBytes_FromStringAndSizeNative" PyBytes_FromStringAndSize"(
+        char *v, Py_ssize_t len) except NULL
+
+
+cdef class NativeFile:
+
+    def __cinit__(self):
+        self.is_open = False
+        self.own_file = False
+
+    def __dealloc__(self):
+        if self.is_open and self.own_file:
+            self.close()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, tb):
+        self.close()
+
+    def close(self):
+        if self.is_open:
+            with nogil:
+                if self.is_readable:
+                    check_status(self.rd_file.get().Close())
+                else:
+                    check_status(self.wr_file.get().Close())
+        self.is_open = False
+
+    cdef read_handle(self, shared_ptr[RandomAccessFile]* file):
+        self._assert_readable()
+        file[0] = <shared_ptr[RandomAccessFile]> self.rd_file
+
+    cdef write_handle(self, shared_ptr[OutputStream]* file):
+        self._assert_writeable()
+        file[0] = <shared_ptr[OutputStream]> self.wr_file
+
+    def _assert_readable(self):
+        if not self.is_readable:
+            raise IOError("only valid on readonly files")
+
+        if not self.is_open:
+            raise IOError("file not open")
+
+    def _assert_writeable(self):
+        if not self.is_writeable:
+            raise IOError("only valid on writeable files")
+
+        if not self.is_open:
+            raise IOError("file not open")
+
+    def size(self):
+        cdef int64_t size
+        self._assert_readable()
+        with nogil:
+            check_status(self.rd_file.get().GetSize(&size))
+        return size
+
+    def tell(self):
+        cdef int64_t position
+        with nogil:
+            if self.is_readable:
+                check_status(self.rd_file.get().Tell(&position))
+            else:
+                check_status(self.wr_file.get().Tell(&position))
+        return position
+
+    def seek(self, int64_t position):
+        self._assert_readable()
+        with nogil:
+            check_status(self.rd_file.get().Seek(position))
+
+    def write(self, data):
+        """
+        Write bytes from any object implementing the buffer protocol (bytes,
+        bytearray, ndarray, pyarrow.Buffer)
+        """
+        self._assert_writeable()
+
+        if isinstance(data, six.string_types):
+            data = tobytes(data)
+
+        cdef Buffer arrow_buffer = frombuffer(data)
+
+        cdef const uint8_t* buf = arrow_buffer.buffer.get().data()
+        cdef int64_t bufsize = len(arrow_buffer)
+        with nogil:
+            check_status(self.wr_file.get().Write(buf, bufsize))
+
+    def read(self, nbytes=None):
+        cdef:
+            int64_t c_nbytes
+            int64_t bytes_read = 0
+            PyObject* obj
+
+        self._assert_readable()
+
+        if nbytes is None:
+            c_nbytes = self.size() - self.tell()
+        else:
+            c_nbytes = nbytes
+
+        # Allocate empty write space
+        obj = PyBytes_FromStringAndSizeNative(NULL, c_nbytes)
+
+        cdef uint8_t* buf = <uint8_t*> cp.PyBytes_AS_STRING(<object> obj)
+        with nogil:
+            check_status(self.rd_file.get().Read(c_nbytes, &bytes_read, buf))
+
+        if bytes_read < c_nbytes:
+            cp._PyBytes_Resize(&obj, <Py_ssize_t> bytes_read)
+
+        return PyObject_to_object(obj)
+
+    def read_buffer(self, nbytes=None):
+        cdef:
+            int64_t c_nbytes
+            int64_t bytes_read = 0
+            shared_ptr[CBuffer] output
+        self._assert_readable()
+
+        if nbytes is None:
+            c_nbytes = self.size() - self.tell()
+        else:
+            c_nbytes = nbytes
+
+        with nogil:
+            check_status(self.rd_file.get().ReadB(c_nbytes, &output))
+
+        return pyarrow_wrap_buffer(output)
+
+    def download(self, stream_or_path, buffer_size=None):
+        """
+        Write the entire file to the given local path or writeable stream,
+        rather than reading it completely into memory. Seeks to the beginning
+        of the file first.
+        """
+        cdef:
+            int64_t bytes_read = 0
+            uint8_t* buf
+        self._assert_readable()
+
+        buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
+
+        write_queue = Queue(50)
+
+        if not hasattr(stream_or_path, 'write'):
+            stream = open(stream_or_path, 'wb')
+            cleanup = lambda: stream.close()
+        else:
+            stream = stream_or_path
+            cleanup = lambda: None
+
+        done = False
+        # Use a mutable container so the worker thread can report an
+        # exception; a plain assignment inside bg_write would only bind a
+        # local variable and the error would be silently dropped
+        exc_info = []
+
+        def bg_write():
+            try:
+                while not done or write_queue.qsize() > 0:
+                    try:
+                        buf = write_queue.get(timeout=0.01)
+                    except QueueEmpty:
+                        continue
+                    stream.write(buf)
+            except Exception:
+                exc_info[:] = sys.exc_info()
+            finally:
+                cleanup()
+
+        self.seek(0)
+
+        writer_thread = threading.Thread(target=bg_write)
+
+        # This isn't ideal -- PyBytes_FromStringAndSize copies the data from
+        # the passed buffer, so it's hard for us to avoid doubling the memory
+        buf = <uint8_t*> malloc(buffer_size)
+        if buf == NULL:
+            raise MemoryError("Failed to allocate {0} bytes"
+                              .format(buffer_size))
+
+        writer_thread.start()
+
+        cdef int64_t total_bytes = 0
+        cdef int32_t c_buffer_size = buffer_size
+
+        try:
+            while True:
+                with nogil:
+                    check_status(self.rd_file.get()
+                                 .Read(c_buffer_size, &bytes_read, buf))
+
+                total_bytes += bytes_read
+
+                # EOF
+                if bytes_read == 0:
+                    break
+
+                pybuf = cp.PyBytes_FromStringAndSize(<const char*>buf,
+                                                     bytes_read)
+
+                write_queue.put_nowait(pybuf)
+        finally:
+            free(buf)
+            done = True
+
+        writer_thread.join()
+        if exc_info:
+            six.reraise(*exc_info)
+
+    def upload(self, stream, buffer_size=None):
+        """
+        Pipe a file-like object to this file
+        """
+        write_queue = Queue(50)
+        self._assert_writeable()
+
+        buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
+
+        done = False
+        # As in download(), a mutable container lets the worker thread
+        # report exceptions back to the caller
+        exc_info = []
+
+        def bg_write():
+            try:
+                while not done or write_queue.qsize() > 0:
+                    try:
+                        buf = write_queue.get(timeout=0.01)
+                    except QueueEmpty:
+                        continue
+
+                    self.write(buf)
+            except Exception:
+                exc_info[:] = sys.exc_info()
+
+        writer_thread = threading.Thread(target=bg_write)
+        writer_thread.start()
+
+        try:
+            while True:
+                buf = stream.read(buffer_size)
+                if not buf:
+                    break
+
+                if writer_thread.is_alive():
+                    while write_queue.full():
+                        time.sleep(0.01)
+                else:
+                    break
+
+                write_queue.put_nowait(buf)
+        finally:
+            done = True
+
+        writer_thread.join()
+        if exc_info:
+            six.reraise(*exc_info)
+
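+# A usage sketch for the blocking download/upload helpers above
+# (hypothetical paths; OSFile is defined later in this file):
+#
+#   source = OSFile('/tmp/input.bin', mode='r')
+#   with open('/tmp/copy.bin', 'wb') as sink:
+#       source.download(sink)   # a background thread drains the write queue
+#   source.close()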
+
+# ----------------------------------------------------------------------
+# Python file-like objects
+
+
+cdef class PythonFile(NativeFile):
+    cdef:
+        object handle
+
+    def __cinit__(self, handle, mode='w'):
+        self.handle = handle
+
+        if mode.startswith('w'):
+            self.wr_file.reset(new PyOutputStream(handle))
+            self.is_readable = 0
+            self.is_writeable = 1
+        elif mode.startswith('r'):
+            self.rd_file.reset(new PyReadableFile(handle))
+            self.is_readable = 1
+            self.is_writeable = 0
+        else:
+            raise ValueError('Invalid file mode: {0}'.format(mode))
+
+        self.is_open = True
+
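+# A small usage sketch: wrap an in-memory Python file object
+# (io.BytesIO here is only an example sink):
+#
+#   import io
+#   buf = io.BytesIO()
+#   f = PythonFile(buf, mode='w')
+#   f.write(b'data')
+#   buf.getvalue()  # -> b'data'
+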
+
+cdef class MemoryMappedFile(NativeFile):
+    """
+    Supports 'r', 'r+w', 'w' modes
+    """
+    cdef:
+        object path
+
+    def __cinit__(self):
+        self.is_open = False
+        self.is_readable = 0
+        self.is_writeable = 0
+
+    @staticmethod
+    def create(path, size):
+        cdef:
+            shared_ptr[CMemoryMappedFile] handle
+            c_string c_path = encode_file_path(path)
+            int64_t c_size = size
+
+        with nogil:
+            check_status(CMemoryMappedFile.Create(c_path, c_size, &handle))
+
+        cdef MemoryMappedFile result = MemoryMappedFile()
+        result.path = path
+        result.is_readable = 1
+        result.is_writeable = 1
+        result.wr_file = <shared_ptr[OutputStream]> handle
+        result.rd_file = <shared_ptr[RandomAccessFile]> handle
+        result.is_open = True
+
+        return result
+
+    def open(self, path, mode='r'):
+        self.path = path
+
+        cdef:
+            FileMode c_mode
+            shared_ptr[CMemoryMappedFile] handle
+            c_string c_path = encode_file_path(path)
+
+        if mode in ('r', 'rb'):
+            c_mode = FileMode_READ
+            self.is_readable = 1
+        elif mode in ('w', 'wb'):
+            c_mode = FileMode_WRITE
+            self.is_writeable = 1
+        elif mode == 'r+w':
+            c_mode = FileMode_READWRITE
+            self.is_readable = 1
+            self.is_writeable = 1
+        else:
+            raise ValueError('Invalid file mode: {0}'.format(mode))
+
+        with nogil:
+            check_status(CMemoryMappedFile.Open(c_path, c_mode, &handle))
+
+        self.wr_file = <shared_ptr[OutputStream]> handle
+        self.rd_file = <shared_ptr[RandomAccessFile]> handle
+        self.is_open = True
+
+
+def memory_map(path, mode='r'):
+    """
+    Open a memory-mapped file at the given path. The size of the mapping
+    cannot change once opened
+
+    Parameters
+    ----------
+    path : string
+    mode : {'r', 'r+w', 'w'}, default 'r'
+
+    Returns
+    -------
+    mmap : MemoryMappedFile
+    """
+    cdef MemoryMappedFile mmap = MemoryMappedFile()
+    mmap.open(path, mode)
+    return mmap
+
+
+def create_memory_map(path, size):
+    """
+    Create a memory-mapped file of the given size at the indicated path and
+    return an open, writeable file object
+
+    Parameters
+    ----------
+    path : string
+    size : int
+
+    Returns
+    -------
+    mmap : MemoryMappedFile
+    """
+    return MemoryMappedFile.create(path, size)
+
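+# A combined sketch for create_memory_map / memory_map (the path is a
+# placeholder; the mapping size is fixed at creation):
+#
+#   f = create_memory_map('/tmp/example.dat', 4)
+#   f.write(b'abcd')
+#   f.close()
+#   g = memory_map('/tmp/example.dat', mode='r')
+#   g.read()  # -> b'abcd'
+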
+
+cdef class OSFile(NativeFile):
+    """
+    Supports 'r', 'w' modes
+    """
+    cdef:
+        object path
+
+    def __cinit__(self, path, mode='r', MemoryPool memory_pool=None):
+        self.path = path
+
+        cdef c_string c_path = encode_file_path(path)
+
+        self.is_readable = self.is_writeable = 0
+
+        if mode in ('r', 'rb'):
+            self._open_readable(c_path, maybe_unbox_memory_pool(memory_pool))
+        elif mode in ('w', 'wb'):
+            self._open_writeable(c_path)
+        else:
+            raise ValueError('Invalid file mode: {0}'.format(mode))
+
+        self.is_open = True
+
+    cdef _open_readable(self, c_string path, CMemoryPool* pool):
+        cdef shared_ptr[ReadableFile] handle
+
+        with nogil:
+            check_status(ReadableFile.Open(path, pool, &handle))
+
+        self.is_readable = 1
+        self.rd_file = <shared_ptr[RandomAccessFile]> handle
+
+    cdef _open_writeable(self, c_string path):
+        cdef shared_ptr[FileOutputStream] handle
+
+        with nogil:
+            check_status(FileOutputStream.Open(path, &handle))
+        self.is_writeable = 1
+        self.wr_file = <shared_ptr[OutputStream]> handle
+
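+# Usage sketch (hypothetical path):
+#
+#   with OSFile('/tmp/data.bin', mode='w') as f:
+#       f.write(b'payload')
+#   with OSFile('/tmp/data.bin') as f:
+#       f.read()  # -> b'payload'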
+
+# ----------------------------------------------------------------------
+# Arrow buffers
+
+
+cdef class Buffer:
+
+    def __cinit__(self):
+        pass
+
+    cdef init(self, const shared_ptr[CBuffer]& buffer):
+        self.buffer = buffer
+        self.shape[0] = self.size
+        self.strides[0] = <Py_ssize_t>(1)
+
+    def __len__(self):
+        return self.size
+
+    property size:
+
+        def __get__(self):
+            return self.buffer.get().size()
+
+    property parent:
+
+        def __get__(self):
+            cdef shared_ptr[CBuffer] parent_buf = self.buffer.get().parent()
+
+            if parent_buf.get() == NULL:
+                return None
+            else:
+                return pyarrow_wrap_buffer(parent_buf)
+
+    def __getitem__(self, key):
+        # TODO(wesm): buffer slicing
+        raise NotImplementedError
+
+    def to_pybytes(self):
+        return cp.PyBytes_FromStringAndSize(
+            <const char*>self.buffer.get().data(),
+            self.buffer.get().size())
+
+    def __getbuffer__(self, cp.Py_buffer* buffer, int flags):
+
+        buffer.buf = <char *>self.buffer.get().data()
+        buffer.format = 'b'
+        buffer.internal = NULL
+        buffer.itemsize = 1
+        buffer.len = self.size
+        buffer.ndim = 1
+        buffer.obj = self
+        buffer.readonly = 1
+        buffer.shape = self.shape
+        buffer.strides = self.strides
+        buffer.suboffsets = NULL
+
+
+cdef shared_ptr[PoolBuffer] allocate_buffer(CMemoryPool* pool):
+    cdef shared_ptr[PoolBuffer] result
+    result.reset(new PoolBuffer(pool))
+    return result
+
+
+cdef class InMemoryOutputStream(NativeFile):
+
+    cdef:
+        shared_ptr[PoolBuffer] buffer
+
+    def __cinit__(self, MemoryPool memory_pool=None):
+        self.buffer = allocate_buffer(maybe_unbox_memory_pool(memory_pool))
+        self.wr_file.reset(new BufferOutputStream(
+            <shared_ptr[ResizableBuffer]> self.buffer))
+        self.is_readable = 0
+        self.is_writeable = 1
+        self.is_open = True
+
+    def get_result(self):
+        check_status(self.wr_file.get().Close())
+        self.is_open = False
+        return pyarrow_wrap_buffer(<shared_ptr[CBuffer]> self.buffer)
+
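+# Usage sketch: accumulate writes in memory, then retrieve the result.
+#
+#   stream = InMemoryOutputStream()
+#   stream.write(b'abc')
+#   buf = stream.get_result()  # closes the stream, returns a Buffer
+#   buf.to_pybytes()           # -> b'abc'
+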
+
+cdef class BufferReader(NativeFile):
+    """
+    Zero-copy reader from objects convertible to Arrow buffer
+
+    Parameters
+    ----------
+    obj : Python bytes or pyarrow.Buffer
+    """
+    cdef:
+        Buffer buffer
+
+    def __cinit__(self, object obj):
+
+        if isinstance(obj, Buffer):
+            self.buffer = obj
+        else:
+            self.buffer = frombuffer(obj)
+
+        self.rd_file.reset(new CBufferReader(self.buffer.buffer))
+        self.is_readable = 1
+        self.is_writeable = 0
+        self.is_open = True
+
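+# Usage sketch: read from existing bytes without copying the source.
+#
+#   reader = BufferReader(b'hello world')
+#   reader.read(5)  # -> b'hello'
+#   reader.seek(0)
+#   reader.read_buffer()  # whole payload as a Buffer
+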
+
+def frombuffer(object obj):
+    """
+    Construct an Arrow buffer from any Python object implementing the
+    buffer protocol
+    """
+    cdef shared_ptr[CBuffer] buf
+    try:
+        memoryview(obj)
+        buf.reset(new PyBuffer(obj))
+        return pyarrow_wrap_buffer(buf)
+    except TypeError:
+        raise ValueError('Must pass object that implements buffer protocol')
+
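+# Example: any buffer-protocol object works, e.g. bytes or bytearray.
+#
+#   buf = frombuffer(b'foo')
+#   len(buf)          # -> 3
+#   buf.to_pybytes()  # -> b'foo'
+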
+
+cdef get_reader(object source, shared_ptr[RandomAccessFile]* reader):
+    cdef NativeFile nf
+
+    if isinstance(source, six.string_types):
+        source = memory_map(source, mode='r')
+    elif isinstance(source, Buffer):
+        source = BufferReader(source)
+    elif not isinstance(source, NativeFile) and hasattr(source, 'read'):
+        # Optimistically hope this is file-like
+        source = PythonFile(source, mode='r')
+
+    if isinstance(source, NativeFile):
+        nf = source
+
+        # TODO: what about read-write sources (e.g. memory maps)
+        if not nf.is_readable:
+            raise IOError('Native file is not readable')
+
+        nf.read_handle(reader)
+    else:
+        raise TypeError('Unable to read from object of type: {0}'
+                        .format(type(source)))
+
+
+cdef get_writer(object source, shared_ptr[OutputStream]* writer):
+    cdef NativeFile nf
+
+    if isinstance(source, six.string_types):
+        source = OSFile(source, mode='w')
+    elif not isinstance(source, NativeFile) and hasattr(source, 'write'):
+        # Optimistically hope this is file-like
+        source = PythonFile(source, mode='w')
+
+    if isinstance(source, NativeFile):
+        nf = source
+
+        if not nf.is_writeable:
+            raise IOError('Native file is not writeable')
+
+        nf.write_handle(writer)
+    else:
+        raise TypeError('Unable to write to object of type: {0}'
+                        .format(type(source)))
+
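+# A sketch of how these helpers normalize heterogeneous sources at the
+# Cython level (the paths are placeholders):
+#
+#   cdef shared_ptr[RandomAccessFile] rd
+#   get_reader('/tmp/data.arrow', &rd)    # str -> memory_map(...)
+#   get_reader(frombuffer(b'abc'), &rd)   # Buffer -> BufferReader
+#
+#   cdef shared_ptr[OutputStream] wr
+#   get_writer('/tmp/out.arrow', &wr)     # str -> OSFile(mode='w')
+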
+# ----------------------------------------------------------------------
+# HDFS IO implementation
+
+_HDFS_PATH_RE = re.compile(r'hdfs://(.*):(\d+)(.*)')
+
+
+def have_libhdfs():
+    try:
+        check_status(HaveLibHdfs())
+        return True
+    except Exception:
+        return False
+
+
+def have_libhdfs3():
+    try:
+        check_status(HaveLibHdfs3())
+        return True
+    except Exception:
+        return False
+
+
+def strip_hdfs_abspath(path):
+    m = _HDFS_PATH_RE.match(path)
+    if m:
+        return m.group(3)
+    else:
+        return path
+
+
+cdef class _HdfsClient:
+    cdef:
+        shared_ptr[CHdfsClient] client
+
+    cdef readonly:
+        bint is_open
+
+    def __cinit__(self):
+        pass
+
+    def _connect(self, host, port, user, kerb_ticket, driver):
+        cdef HdfsConnectionConfig conf
+
+        if host is not None:
+            conf.host = tobytes(host)
+        conf.port = port
+        if user is not None:
+            conf.user = tobytes(user)
+        if kerb_ticket is not None:
+            conf.kerb_ticket = tobytes(kerb_ticket)
+
+        if driver == 'libhdfs':
+            check_status(HaveLibHdfs())
+            conf.driver = HdfsDriver_LIBHDFS
+        else:
+            check_status(HaveLibHdfs3())
+            conf.driver = HdfsDriver_LIBHDFS3
+
+        with nogil:
+            check_status(CHdfsClient.Connect(&conf, &self.client))
+        self.is_open = True
+
+    @classmethod
+    def connect(cls, *args, **kwargs):
+        return cls(*args, **kwargs)
+
+    def __dealloc__(self):
+        if self.is_open:
+            self.close()
+
+    def close(self):
+        """
+        Disconnect from the HDFS cluster
+        """
+        self._ensure_client()
+        with nogil:
+            check_status(self.client.get().Disconnect())
+        self.is_open = False
+
+    cdef _ensure_client(self):
+        if self.client.get() == NULL:
+            raise IOError('HDFS client improperly initialized')
+        elif not self.is_open:
+            raise IOError('HDFS client is closed')
+
+    def exists(self, path):
+        """
+        Return True if the path is known to the cluster, False if it is not
+        (or if there is an RPC error)
+        """
+        self._ensure_client()
+
+        cdef c_string c_path = tobytes(path)
+        cdef c_bool result
+        with nogil:
+            result = self.client.get().Exists(c_path)
+        return result
+
+    def isdir(self, path):
+        cdef HdfsPathInfo info
+        self._path_info(path, &info)
+        return info.kind == ObjectType_DIRECTORY
+
+    def isfile(self, path):
+        cdef HdfsPathInfo info
+        self._path_info(path, &info)
+        return info.kind == ObjectType_FILE
+
+    cdef _path_info(self, path, HdfsPathInfo* info):
+        cdef c_string c_path = tobytes(path)
+
+        with nogil:
+            check_status(self.client.get()
+                         .GetPathInfo(c_path, info))
+
+    def ls(self, path, bint full_info):
+        """
+        List directory contents. If full_info is True, return a dict of
+        metadata for each entry; otherwise return only the path names
+        """
+        cdef:
+            c_string c_path = tobytes(path)
+            vector[HdfsPathInfo] listing
+            list results = []
+            int i
+
+        self._ensure_client()
+
+        with nogil:
+            check_status(self.client.get()
+                         .ListDirectory(c_path, &listing))
+
+        cdef const HdfsPathInfo* info
+        for i in range(<int> listing.size()):
+            info = &listing[i]
+
+            # Try to trim off the hdfs://HOST:PORT piece
+            name = strip_hdfs_abspath(frombytes(info.name))
+
+            if full_info:
+                kind = ('file' if info.kind == ObjectType_FILE
+                        else 'directory')
+
+                results.append({
+                    'kind': kind,
+                    'name': name,
+                    'owner': frombytes(info.owner),
+                    'group': frombytes(info.group),
+                    'list_modified_time': info.last_modified_time,
+                    'list_access_time': info.last_access_time,
+                    'size': info.size,
+                    'replication': info.replication,
+                    'block_size': info.block_size,
+                    'permissions': info.permissions
+                })
+            else:
+                results.append(name)
+
+        return results
+
+    def mkdir(self, path):
+        """
+        Create indicated directory and any necessary parent directories
+        """
+        self._ensure_client()
+
+        cdef c_string c_path = tobytes(path)
+        with nogil:
+            check_status(self.client.get()
+                         .MakeDirectory(c_path))
+
+    def delete(self, path, bint recursive=False):
+        """
+        Delete the indicated file or directory
+
+        Parameters
+        ----------
+        path : string
+        recursive : boolean, default False
+            If True, also delete child paths for directories
+        """
+        self._ensure_client()
+
+        cdef c_string c_path = tobytes(path)
+        with nogil:
+            check_status(self.client.get()
+                         .Delete(c_path, recursive))
+
+    def open(self, path, mode='rb', buffer_size=None, replication=None,
+             default_block_size=None):
+        """
+        Parameters
+        ----------
+        mode : string, 'rb', 'wb', 'ab'
+        """
+        self._ensure_client()
+
+        cdef HdfsFile out = HdfsFile()
+
+        if mode not in ('rb', 'wb', 'ab'):
+            raise Exception("Mode must be 'rb' (read), "
+                            "'wb' (write, new file), or 'ab' (append)")
+
+        cdef c_string c_path = tobytes(path)
+        cdef c_bool append = False
+
+        # 0 in libhdfs means "use the default"
+        cdef int32_t c_buffer_size = buffer_size or 0
+        cdef int16_t c_replication = replication or 0
+        cdef int64_t c_default_block_size = default_block_size or 0
+
+        cdef shared_ptr[HdfsOutputStream] wr_handle
+        cdef shared_ptr[HdfsReadableFile] rd_handle
+
+        if mode in ('wb', 'ab'):
+            if mode == 'ab':
+                append = True
+
+            with nogil:
+                check_status(
+                    self.client.get()
+                    .OpenWriteable(c_path, append, c_buffer_size,
+                                   c_replication, c_default_block_size,
+                                   &wr_handle))
+
+            out.wr_file = <shared_ptr[OutputStream]> wr_handle
+
+            out.is_readable = 0
+            out.is_writeable = 1
+        else:
+            with nogil:
+                check_status(self.client.get()
+                             .OpenReadable(c_path, &rd_handle))
+
+            out.rd_file = <shared_ptr[RandomAccessFile]> rd_handle
+            out.is_readable = 1
+            out.is_writeable = 0
+
+        if c_buffer_size == 0:
+            c_buffer_size = DEFAULT_BUFFER_SIZE
+
+        out.mode = mode
+        out.buffer_size = c_buffer_size
+        out.parent = _HdfsFileNanny(self, out)
+        out.is_open = True
+        out.own_file = True
+
+        return out
+
+    def download(self, path, stream, buffer_size=None):
+        """
+        Download the indicated HDFS file to a local stream or path
+        """
+        with self.open(path, 'rb') as f:
+            f.download(stream, buffer_size=buffer_size)
+
+    def upload(self, path, stream, buffer_size=None):
+        """
+        Upload file-like object to HDFS path
+        """
+        with self.open(path, 'wb') as f:
+            f.upload(stream, buffer_size=buffer_size)
+
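+# Hypothetical usage sketch; in practice _HdfsClient is reached through a
+# public wrapper class that forwards connection parameters to _connect.
+# All host/port/user values below are placeholders:
+#
+#   client._connect('namenode-host', 8020, 'alice', None, 'libhdfs')
+#   client.mkdir('/tmp/dir')
+#   with client.open('/tmp/dir/part-0', 'wb') as f:
+#       f.write(b'contents')
+#   client.ls('/tmp/dir', False)   # -> list of child path names
+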
+
+# ARROW-404: Helper class to ensure that files are closed before the
+# client. During deallocation of the extension class, the attributes are
+# decref'd which can cause the client to get closed first if the file has the
+# last remaining reference
+cdef class _HdfsFileNanny:
+    cdef:
+        object client
+        object file_handle_ref
+
+    def __cinit__(self, client, file_handle):
+        self.client = client
+        self.file_handle_ref = weakref.ref(file_handle)
+
+    def __dealloc__(self):
+        fh = self.file_handle_ref()
+        if fh:
+            fh.close()
+        # avoid cyclic GC
+        self.file_handle_ref = None
+        self.client = None
+
+
+cdef class HdfsFile(NativeFile):
+    cdef readonly:
+        int32_t buffer_size
+        object mode
+        object parent
+
+    cdef object __weakref__
+
+    def __dealloc__(self):
+        self.parent = None
+
+# ----------------------------------------------------------------------
+# File and stream readers and writers
+
+cdef class _StreamWriter:
+    cdef:
+        shared_ptr[CStreamWriter] writer
+        shared_ptr[OutputStream] sink
+        bint closed
+
+    def __cinit__(self):
+        self.closed = True
+
+    def __dealloc__(self):
+        if not self.closed:
+            self.close()
+
+    def _open(self, sink, Schema schema):
+        get_writer(sink, &self.sink)
+
+        with nogil:
+            check_status(CStreamWriter.Open(self.sink.get(), schema.sp_schema,
+                                            &self.writer))
+
+        self.closed = False
+
+    def write_batch(self, RecordBatch batch):
+        with nogil:
+            check_status(self.writer.get()
+                         .WriteRecordBatch(deref(batch.batch)))
+
+    def close(self):
+        with nogil:
+            check_status(self.writer.get().Close())
+        self.closed = True
+
+
+cdef class _StreamReader:
+    cdef:
+        shared_ptr[CStreamReader] reader
+
+    cdef readonly:
+        Schema schema
+
+    def __cinit__(self):
+        pass
+
+    def _open(self, source):
+        cdef:
+            shared_ptr[RandomAccessFile] reader
+            shared_ptr[InputStream] in_stream
+
+        get_reader(source, &reader)
+        in_stream = <shared_ptr[InputStream]> reader
+
+        with nogil:
+            check_status(CStreamReader.Open(in_stream, &self.reader))
+
+        self.schema = Schema()
+        self.schema.init_schema(self.reader.get().schema())
+
+    def get_next_batch(self):
+        """
+        Read the next RecordBatch from the stream. Raises StopIteration at
+        end of stream
+        """
+        cdef shared_ptr[CRecordBatch] batch
+
+        with nogil:
+            check_status(self.reader.get().GetNextRecordBatch(&batch))
+
+        if batch.get() == NULL:
+            raise StopIteration
+
+        return pyarrow_wrap_batch(batch)
+
+    def read_all(self):
+        """
+        Read all record batches as a pyarrow.Table
+        """
+        cdef:
+            vector[shared_ptr[CRecordBatch]] batches
+            shared_ptr[CRecordBatch] batch
+            shared_ptr[CTable] table
+
+        with nogil:
+            while True:
+                check_status(self.reader.get().GetNextRecordBatch(&batch))
+                if batch.get() == NULL:
+                    break
+                batches.push_back(batch)
+
+            check_status(CTable.FromRecordBatches(batches, &table))
+
+        return pyarrow_wrap_table(table)
+
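+# Round-trip sketch for the streaming format (assumes an existing
+# RecordBatch 'batch'; these classes are wrapped by the public API):
+#
+#   sink = InMemoryOutputStream()
+#   writer = _StreamWriter()
+#   writer._open(sink, batch.schema)
+#   writer.write_batch(batch)
+#   writer.close()
+#
+#   reader = _StreamReader()
+#   reader._open(sink.get_result())
+#   table = reader.read_all()
+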
+
+cdef class _FileWriter(_StreamWriter):
+
+    def _open(self, sink, Schema schema):
+        cdef shared_ptr[CFileWriter] writer
+        get_writer(sink, &self.sink)
+
+        with nogil:
+            check_status(CFileWriter.Open(self.sink.get(), schema.sp_schema,
+                                          &writer))
+
+        # Cast to the base class; it exposes the same interface
+        self.writer = <shared_ptr[CStreamWriter]> writer
+        self.closed = False
+
+
+cdef class _FileReader:
+    cdef:
+        shared_ptr[CFileReader] reader
+
+    def __cinit__(self):
+        pass
+
+    def _open(self, source, footer_offset=None):
+        cdef shared_ptr[RandomAccessFile] reader
+        get_reader(source, &reader)
+
+        cdef int64_t offset = 0
+        if footer_offset is not None:
+            offset = footer_offset
+
+        with nogil:
+            if offset != 0:
+                check_status(CFileReader.Open2(reader, offset, &self.reader))
+            else:
+                check_status(CFileReader.Open(reader, &self.reader))
+
+    property num_record_batches:
+
+        def __get__(self):
+            return self.reader.get().num_record_batches()
+
+    def get_batch(self, int i):
+        cdef shared_ptr[CRecordBatch] batch
+
+        if i < 0 or i >= self.num_record_batches:
+            raise ValueError('Batch number {0} out of range'.format(i))
+
+        with nogil:
+            check_status(self.reader.get().GetRecordBatch(i, &batch))
+
+        return pyarrow_wrap_batch(batch)
+
+    # TODO(wesm): ARROW-503: Function was renamed. Remove after a period of
+    # time has passed
+    get_record_batch = get_batch
+
+    def read_all(self):
+        """
+        Read all record batches as a pyarrow.Table
+        """
+        cdef:
+            vector[shared_ptr[CRecordBatch]] batches
+            shared_ptr[CTable] table
+            int i, nbatches
+
+        nbatches = self.num_record_batches
+
+        batches.resize(nbatches)
+        with nogil:
+            for i in range(nbatches):
+                check_status(self.reader.get().GetRecordBatch(i, &batches[i]))
+            check_status(CTable.FromRecordBatches(batches, &table))
+
+        return pyarrow_wrap_table(table)
+
+
+# ----------------------------------------------------------------------
+# Implement legacy Feather file format
+
+
+class FeatherError(Exception):
+    pass
+
+
+cdef class FeatherWriter:
+    cdef:
+        unique_ptr[CFeatherWriter] writer
+
+    cdef public:
+        int64_t num_rows
+
+    def __cinit__(self):
+        self.num_rows = -1
+
+    def open(self, object dest):
+        cdef shared_ptr[OutputStream] sink
+        get_writer(dest, &sink)
+
+        with nogil:
+            check_status(CFeatherWriter.Open(sink, &self.writer))
+
+    def close(self):
+        if self.num_rows < 0:
+            self.num_rows = 0
+        self.writer.get().SetNumRows(self.num_rows)
+        check_status(self.writer.get().Finalize())
+
+    def write_array(self, object name, object col, object mask=None):
+        cdef Array arr
+
+        if self.num_rows >= 0:
+            if len(col) != self.num_rows:
+                raise ValueError('prior column had a different number of rows')
+        else:
+            self.num_rows = len(col)
+
+        if isinstance(col, Array):
+            arr = col
+        else:
+            arr = Array.from_pandas(col, mask=mask)
+
+        cdef c_string c_name = tobytes(name)
+
+        with nogil:
+            check_status(
+                self.writer.get().Append(c_name, deref(arr.sp_array)))
+
+
+cdef class FeatherReader:
+    cdef:
+        unique_ptr[CFeatherReader] reader
+
+    def __cinit__(self):
+        pass
+
+    def open(self, source):
+        cdef shared_ptr[RandomAccessFile] reader
+        get_reader(source, &reader)
+
+        with nogil:
+            check_status(CFeatherReader.Open(reader, &self.reader))
+
+    property num_rows:
+
+        def __get__(self):
+            return self.reader.get().num_rows()
+
+    property num_columns:
+
+        def __get__(self):
+            return self.reader.get().num_columns()
+
+    def get_column_name(self, int i):
+        cdef c_string name = self.reader.get().GetColumnName(i)
+        return frombytes(name)
+
+    def get_column(self, int i):
+        if i < 0 or i >= self.num_columns:
+            raise IndexError(i)
+
+        cdef shared_ptr[CColumn] sp_column
+        with nogil:
+            check_status(self.reader.get()
+                         .GetColumn(i, &sp_column))
+
+        cdef Column col = Column()
+        col.init(sp_column)
+        return col
+
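+# Round-trip sketch for the Feather classes (hypothetical path; columns may
+# be pyarrow Arrays or anything Array.from_pandas accepts, e.g. a NumPy
+# array):
+#
+#   writer = FeatherWriter()
+#   writer.open('/tmp/example.feather')
+#   writer.write_array('col', np.array([1, 2, 3]))
+#   writer.close()
+#
+#   reader = FeatherReader()
+#   reader.open('/tmp/example.feather')
+#   reader.num_rows        # -> 3
+#   col = reader.get_column(0)
+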
+
+def get_tensor_size(Tensor tensor):
+    """
+    Return total size of serialized Tensor including metadata and padding
+    """
+    cdef int64_t size
+    with nogil:
+        check_status(GetTensorSize(deref(tensor.tp), &size))
+    return size
+
+
+def get_record_batch_size(RecordBatch batch):
+    """
+    Return total size of serialized RecordBatch including metadata and padding
+    """
+    cdef int64_t size
+    with nogil:
+        check_status(GetRecordBatchSize(deref(batch.batch), &size))
+    return size
+
+
+def write_tensor(Tensor tensor, NativeFile dest):
+    """
+    Write pyarrow.Tensor to a pyarrow.NativeFile object at its current
+    position
+
+    Parameters
+    ----------
+    tensor : pyarrow.Tensor
+    dest : pyarrow.NativeFile
+
+    Returns
+    -------
+    bytes_written : int
+        Total number of bytes written to the file
+    """
+    cdef:
+        int32_t metadata_length
+        int64_t body_length
+
+    dest._assert_writeable()
+
+    with nogil:
+        check_status(
+            WriteTensor(deref(tensor.tp), dest.wr_file.get(),
+                        &metadata_length, &body_length))
+
+    return metadata_length + body_length
+
+
+def read_tensor(NativeFile source):
+    """
+    Read a pyarrow.Tensor from a pyarrow.NativeFile object at its current
+    position. If the file source supports zero-copy reads (e.g. a memory
+    map), then this operation does not allocate any memory
+
+    Parameters
+    ----------
+    source : pyarrow.NativeFile
+
+    Returns
+    -------
+    tensor : Tensor
+    """
+    cdef:
+        shared_ptr[CTensor] sp_tensor
+
+    source._assert_readable()
+
+    cdef int64_t offset = source.tell()
+    with nogil:
+        check_status(ReadTensor(offset, source.rd_file.get(), &sp_tensor))
+
+    return pyarrow_wrap_tensor(sp_tensor)
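+
+
+# Round-trip sketch for tensor IPC (assumes an existing pyarrow Tensor
+# 'tensor'; the path is a placeholder). With a memory-mapped source the
+# read is zero-copy:
+#
+#   size = get_tensor_size(tensor)
+#   mm = create_memory_map('/tmp/tensor.arrow', size)
+#   write_tensor(tensor, mm)
+#   mm.seek(0)
+#   result = read_tensor(mm)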