Posted to commits@arrow.apache.org by uw...@apache.org on 2017/04/13 10:51:51 UTC
[1/4] arrow git commit: ARROW-751: [Python] Make all Cython modules private. Some code tidying
Repository: arrow
Updated Branches:
refs/heads/master e93436503 -> 8b64a4fb2
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/schema.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/schema.pyx b/python/pyarrow/schema.pyx
deleted file mode 100644
index 4749809..0000000
--- a/python/pyarrow/schema.pyx
+++ /dev/null
@@ -1,477 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-########################################
-# Data types, fields, schemas, and so forth
-
-# cython: profile=False
-# distutils: language = c++
-# cython: embedsignature = True
-
-from cython.operator cimport dereference as deref
-
-from pyarrow.compat import frombytes, tobytes
-from pyarrow.array cimport Array
-from pyarrow.error cimport check_status
-from pyarrow.includes.libarrow cimport (CDataType, CStructType, CListType,
- CFixedSizeBinaryType,
- CDecimalType,
- TimeUnit_SECOND, TimeUnit_MILLI,
- TimeUnit_MICRO, TimeUnit_NANO,
- Type, TimeUnit)
-cimport pyarrow.includes.pyarrow as pyarrow
-cimport pyarrow.includes.libarrow as la
-
-cimport cpython
-
-import six
-
-
-cdef class DataType:
-
- def __cinit__(self):
- pass
-
- cdef void init(self, const shared_ptr[CDataType]& type):
- self.sp_type = type
- self.type = type.get()
-
- def __str__(self):
- return frombytes(self.type.ToString())
-
- def __repr__(self):
- return '{0.__class__.__name__}({0})'.format(self)
-
- def __richcmp__(DataType self, DataType other, int op):
- if op == cpython.Py_EQ:
- return self.type.Equals(deref(other.type))
- elif op == cpython.Py_NE:
- return not self.type.Equals(deref(other.type))
- else:
- raise TypeError('Invalid comparison')
-
-
-cdef class DictionaryType(DataType):
-
- cdef void init(self, const shared_ptr[CDataType]& type):
- DataType.init(self, type)
- self.dict_type = <const CDictionaryType*> type.get()
-
-
-cdef class TimestampType(DataType):
-
- cdef void init(self, const shared_ptr[CDataType]& type):
- DataType.init(self, type)
- self.ts_type = <const CTimestampType*> type.get()
-
- property unit:
-
- def __get__(self):
- return timeunit_to_string(self.ts_type.unit())
-
- property tz:
-
- def __get__(self):
- if self.ts_type.timezone().size() > 0:
- return frombytes(self.ts_type.timezone())
- else:
- return None
-
-
-cdef class FixedSizeBinaryType(DataType):
-
- cdef void init(self, const shared_ptr[CDataType]& type):
- DataType.init(self, type)
- self.fixed_size_binary_type = <const CFixedSizeBinaryType*> type.get()
-
- property byte_width:
-
- def __get__(self):
- return self.fixed_size_binary_type.byte_width()
-
-
-cdef class DecimalType(FixedSizeBinaryType):
-
- cdef void init(self, const shared_ptr[CDataType]& type):
- DataType.init(self, type)
- self.decimal_type = <const CDecimalType*> type.get()
-
-
-cdef class Field:
-
- def __cinit__(self):
- pass
-
- cdef init(self, const shared_ptr[CField]& field):
- self.sp_field = field
- self.field = field.get()
- self.type = box_data_type(field.get().type())
-
- @classmethod
- def from_py(cls, object name, DataType type, bint nullable=True):
- cdef Field result = Field()
- result.type = type
- result.sp_field.reset(new CField(tobytes(name), type.sp_type,
- nullable))
- result.field = result.sp_field.get()
-
- return result
-
- def __repr__(self):
- return 'Field({0!r}, type={1})'.format(self.name, str(self.type))
-
- property nullable:
-
- def __get__(self):
- return self.field.nullable()
-
- property name:
-
- def __get__(self):
- if box_field(self.sp_field) is None:
- raise ReferenceError(
- 'Field not initialized (references NULL pointer)')
- return frombytes(self.field.name())
-
-
-cdef class Schema:
-
- def __cinit__(self):
- pass
-
- def __len__(self):
- return self.schema.num_fields()
-
- def __getitem__(self, i):
- if i < 0 or i >= len(self):
- raise IndexError("{0} is out of bounds".format(i))
-
- cdef Field result = Field()
- result.init(self.schema.field(i))
- result.type = box_data_type(result.field.type())
-
- return result
-
- cdef init(self, const vector[shared_ptr[CField]]& fields):
- self.schema = new CSchema(fields)
- self.sp_schema.reset(self.schema)
-
- cdef init_schema(self, const shared_ptr[CSchema]& schema):
- self.schema = schema.get()
- self.sp_schema = schema
-
- def equals(self, other):
- """
- Test if this schema is equal to the other
- """
- cdef Schema _other
- _other = other
-
- return self.sp_schema.get().Equals(deref(_other.schema))
-
- def field_by_name(self, name):
- """
- Access a field by its name rather than the column index.
-
- Parameters
- ----------
- name: str
-
- Returns
- -------
- field: pyarrow.Field
- """
- return box_field(self.schema.GetFieldByName(tobytes(name)))
-
- @classmethod
- def from_fields(cls, fields):
- cdef:
- Schema result
- Field field
- vector[shared_ptr[CField]] c_fields
-
- c_fields.resize(len(fields))
-
- for i in range(len(fields)):
- field = fields[i]
- c_fields[i] = field.sp_field
-
- result = Schema()
- result.init(c_fields)
-
- return result
-
- def __str__(self):
- return frombytes(self.schema.ToString())
-
- def __repr__(self):
- return self.__str__()
-
-
-cdef dict _type_cache = {}
-
-
-cdef DataType primitive_type(Type type):
- if type in _type_cache:
- return _type_cache[type]
-
- cdef DataType out = DataType()
- out.init(pyarrow.GetPrimitiveType(type))
-
- _type_cache[type] = out
- return out
-
-#------------------------------------------------------------
-# Type factory functions
-
-def field(name, type, bint nullable=True):
- return Field.from_py(name, type, nullable)
-
-
-cdef set PRIMITIVE_TYPES = set([
- la.Type_NA, la.Type_BOOL,
- la.Type_UINT8, la.Type_INT8,
- la.Type_UINT16, la.Type_INT16,
- la.Type_UINT32, la.Type_INT32,
- la.Type_UINT64, la.Type_INT64,
- la.Type_TIMESTAMP, la.Type_DATE32,
- la.Type_DATE64,
- la.Type_HALF_FLOAT,
- la.Type_FLOAT,
- la.Type_DOUBLE])
-
-
-def null():
- return primitive_type(la.Type_NA)
-
-
-def bool_():
- return primitive_type(la.Type_BOOL)
-
-
-def uint8():
- return primitive_type(la.Type_UINT8)
-
-
-def int8():
- return primitive_type(la.Type_INT8)
-
-
-def uint16():
- return primitive_type(la.Type_UINT16)
-
-
-def int16():
- return primitive_type(la.Type_INT16)
-
-
-def uint32():
- return primitive_type(la.Type_UINT32)
-
-
-def int32():
- return primitive_type(la.Type_INT32)
-
-
-def uint64():
- return primitive_type(la.Type_UINT64)
-
-
-def int64():
- return primitive_type(la.Type_INT64)
-
-
-cdef dict _timestamp_type_cache = {}
-
-
-cdef timeunit_to_string(TimeUnit unit):
- if unit == TimeUnit_SECOND:
- return 's'
- elif unit == TimeUnit_MILLI:
- return 'ms'
- elif unit == TimeUnit_MICRO:
- return 'us'
- elif unit == TimeUnit_NANO:
- return 'ns'
-
-
-def timestamp(unit_str, tz=None):
- cdef:
- TimeUnit unit
- c_string c_timezone
-
- if unit_str == "s":
- unit = TimeUnit_SECOND
- elif unit_str == 'ms':
- unit = TimeUnit_MILLI
- elif unit_str == 'us':
- unit = TimeUnit_MICRO
- elif unit_str == 'ns':
- unit = TimeUnit_NANO
- else:
- raise TypeError('Invalid TimeUnit string')
-
- cdef TimestampType out = TimestampType()
-
- if tz is None:
- out.init(la.timestamp(unit))
- if unit in _timestamp_type_cache:
- return _timestamp_type_cache[unit]
- _timestamp_type_cache[unit] = out
- else:
- if not isinstance(tz, six.string_types):
- tz = tz.zone
-
- c_timezone = tobytes(tz)
- out.init(la.timestamp(unit, c_timezone))
-
- return out
-
-
-def date32():
- return primitive_type(la.Type_DATE32)
-
-
-def date64():
- return primitive_type(la.Type_DATE64)
-
-
-def float16():
- return primitive_type(la.Type_HALF_FLOAT)
-
-
-def float32():
- return primitive_type(la.Type_FLOAT)
-
-
-def float64():
- return primitive_type(la.Type_DOUBLE)
-
-
-cpdef DataType decimal(int precision, int scale=0):
- cdef shared_ptr[CDataType] decimal_type
- decimal_type.reset(new CDecimalType(precision, scale))
- return box_data_type(decimal_type)
-
-
-def string():
- """
- UTF8 string
- """
- return primitive_type(la.Type_STRING)
-
-
-def binary(int length=-1):
- """Binary (PyBytes-like) type
-
- Parameters
- ----------
- length : int, optional, default -1
- If length == -1 then return a variable length binary type. If length is
- greater than or equal to 0 then return a fixed size binary type of
- width `length`.
- """
- if length == -1:
- return primitive_type(la.Type_BINARY)
-
- cdef shared_ptr[CDataType] fixed_size_binary_type
- fixed_size_binary_type.reset(new CFixedSizeBinaryType(length))
- return box_data_type(fixed_size_binary_type)
-
-
-def list_(DataType value_type):
- cdef DataType out = DataType()
- cdef shared_ptr[CDataType] list_type
- list_type.reset(new CListType(value_type.sp_type))
- out.init(list_type)
- return out
-
-
-def dictionary(DataType index_type, Array dictionary):
- """
- Dictionary (categorical, or simply encoded) type
- """
- cdef DictionaryType out = DictionaryType()
- cdef shared_ptr[CDataType] dict_type
- dict_type.reset(new CDictionaryType(index_type.sp_type,
- dictionary.sp_array))
- out.init(dict_type)
- return out
-
-
-def struct(fields):
- """
-
- """
- cdef:
- DataType out = DataType()
- Field field
- vector[shared_ptr[CField]] c_fields
- cdef shared_ptr[CDataType] struct_type
-
- for field in fields:
- c_fields.push_back(field.sp_field)
-
- struct_type.reset(new CStructType(c_fields))
- out.init(struct_type)
- return out
-
-
-def schema(fields):
- return Schema.from_fields(fields)
-
-
-cdef DataType box_data_type(const shared_ptr[CDataType]& type):
- cdef:
- DataType out
-
- if type.get() == NULL:
- return None
-
- if type.get().id() == la.Type_DICTIONARY:
- out = DictionaryType()
- elif type.get().id() == la.Type_TIMESTAMP:
- out = TimestampType()
- elif type.get().id() == la.Type_FIXED_SIZE_BINARY:
- out = FixedSizeBinaryType()
- elif type.get().id() == la.Type_DECIMAL:
- out = DecimalType()
- else:
- out = DataType()
-
- out.init(type)
- return out
-
-cdef Field box_field(const shared_ptr[CField]& field):
- if field.get() == NULL:
- return None
- cdef Field out = Field()
- out.init(field)
- return out
-
-cdef Schema box_schema(const shared_ptr[CSchema]& type):
- cdef Schema out = Schema()
- out.init_schema(type)
- return out
-
-
-def type_from_numpy_dtype(object dtype):
- cdef shared_ptr[CDataType] c_type
- with nogil:
- check_status(pyarrow.NumPyDtypeToArrow(dtype, &c_type))
-
- return box_data_type(c_type)
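
The type factory functions deleted above move into pyarrow/_array.pyx and stay
re-exported from the top-level package, so intended usage is unchanged. A
minimal sketch of those factories (assuming a build of this branch):

    import pyarrow as pa

    # field(), timestamp(), binary() and schema() formerly lived in
    # pyarrow.schema; after this commit they are reached via the package root
    fields = [
        pa.field('id', pa.int64(), nullable=False),
        pa.field('when', pa.timestamp('ms', tz='UTC')),
        pa.field('payload', pa.binary(16)),  # fixed-size binary, width 16
    ]
    sch = pa.schema(fields)
    print(sch)   # Schema.__str__, i.e. the C++ ToString() rendering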
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/table.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pxd b/python/pyarrow/table.pxd
deleted file mode 100644
index f564042..0000000
--- a/python/pyarrow/table.pxd
+++ /dev/null
@@ -1,63 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from pyarrow.includes.common cimport shared_ptr
-from pyarrow.includes.libarrow cimport (CChunkedArray, CColumn, CTable,
- CRecordBatch)
-
-from pyarrow.schema cimport Schema
-
-
-cdef class ChunkedArray:
- cdef:
- shared_ptr[CChunkedArray] sp_chunked_array
- CChunkedArray* chunked_array
-
- cdef init(self, const shared_ptr[CChunkedArray]& chunked_array)
- cdef _check_nullptr(self)
-
-
-cdef class Column:
- cdef:
- shared_ptr[CColumn] sp_column
- CColumn* column
-
- cdef init(self, const shared_ptr[CColumn]& column)
- cdef _check_nullptr(self)
-
-
-cdef class Table:
- cdef:
- shared_ptr[CTable] sp_table
- CTable* table
-
- cdef init(self, const shared_ptr[CTable]& table)
- cdef _check_nullptr(self)
-
-
-cdef class RecordBatch:
- cdef:
- shared_ptr[CRecordBatch] sp_batch
- CRecordBatch* batch
- Schema _schema
-
- cdef init(self, const shared_ptr[CRecordBatch]& table)
- cdef _check_nullptr(self)
-
-cdef object box_column(const shared_ptr[CColumn]& ccolumn)
-cdef api object table_from_ctable(const shared_ptr[CTable]& ctable)
-cdef api object batch_from_cbatch(const shared_ptr[CRecordBatch]& cbatch)
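
These declarations back the Python-visible wrappers defined in table.pyx
below. A small usage sketch of Column and ChunkedArray (assuming a build of
this branch, where from_pylist is still the top-level array constructor):

    import pyarrow as pa

    arr = pa.from_pylist([1, 2, 3])
    col = pa.Column.from_array('ints', arr)  # wraps a C++ arrow::Column
    chunked = col.data                       # ChunkedArray over the chunks
    assert chunked.num_chunks == 1
    assert col.to_pylist() == [1, 2, 3]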
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/table.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx
deleted file mode 100644
index 3972bda..0000000
--- a/python/pyarrow/table.pyx
+++ /dev/null
@@ -1,915 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# cython: profile=False
-# distutils: language = c++
-# cython: embedsignature = True
-
-from cython.operator cimport dereference as deref
-
-from pyarrow.includes.libarrow cimport *
-from pyarrow.includes.common cimport *
-cimport pyarrow.includes.pyarrow as pyarrow
-
-import pyarrow.config
-
-from pyarrow.array cimport Array, box_array, wrap_array_output
-from pyarrow.error import ArrowException
-from pyarrow.error cimport check_status
-from pyarrow.schema cimport box_data_type, box_schema, DataType, Field
-
-from pyarrow.schema import field
-from pyarrow.compat import frombytes, tobytes
-
-cimport cpython
-
-from collections import OrderedDict
-
-
-cdef _pandas():
- import pandas as pd
- return pd
-
-
-cdef class ChunkedArray:
- """
- Array backed by one or more memory chunks.
-
- Warning
- -------
- Do not call this class's constructor directly.
- """
-
- def __cinit__(self):
- self.chunked_array = NULL
-
- cdef init(self, const shared_ptr[CChunkedArray]& chunked_array):
- self.sp_chunked_array = chunked_array
- self.chunked_array = chunked_array.get()
-
- cdef _check_nullptr(self):
- if self.chunked_array == NULL:
- raise ReferenceError("ChunkedArray object references a NULL "
- "pointer. Not initialized.")
-
- def length(self):
- self._check_nullptr()
- return self.chunked_array.length()
-
- def __len__(self):
- return self.length()
-
- @property
- def null_count(self):
- """
- Number of null entries
-
- Returns
- -------
- int
- """
- self._check_nullptr()
- return self.chunked_array.null_count()
-
- @property
- def num_chunks(self):
- """
- Number of underlying chunks
-
- Returns
- -------
- int
- """
- self._check_nullptr()
- return self.chunked_array.num_chunks()
-
- def chunk(self, i):
- """
- Select a chunk by its index
-
- Parameters
- ----------
- i : int
-
- Returns
- -------
- pyarrow.array.Array
- """
- self._check_nullptr()
- return box_array(self.chunked_array.chunk(i))
-
- def iterchunks(self):
- for i in range(self.num_chunks):
- yield self.chunk(i)
-
- def to_pylist(self):
- """
- Convert to a list of native Python objects.
- """
- result = []
- for i in range(self.num_chunks):
- result += self.chunk(i).to_pylist()
- return result
-
-
-cdef class Column:
- """
- Named vector of elements of equal type.
-
- Warning
- -------
- Do not call this class's constructor directly.
- """
-
- def __cinit__(self):
- self.column = NULL
-
- cdef init(self, const shared_ptr[CColumn]& column):
- self.sp_column = column
- self.column = column.get()
-
- @staticmethod
- def from_array(object field_or_name, Array arr):
- cdef Field boxed_field
-
- if isinstance(field_or_name, Field):
- boxed_field = field_or_name
- else:
- boxed_field = field(field_or_name, arr.type)
-
- cdef shared_ptr[CColumn] sp_column
- sp_column.reset(new CColumn(boxed_field.sp_field, arr.sp_array))
- return box_column(sp_column)
-
- def to_pandas(self):
- """
- Convert the arrow::Column to a pandas.Series
-
- Returns
- -------
- pandas.Series
- """
- cdef:
- PyObject* out
-
- check_status(pyarrow.ConvertColumnToPandas(self.sp_column,
- <PyObject*> self, &out))
-
- return _pandas().Series(wrap_array_output(out), name=self.name)
-
- def equals(self, Column other):
- """
- Check if contents of two columns are equal
-
- Parameters
- ----------
- other : pyarrow.Column
-
- Returns
- -------
- are_equal : boolean
- """
- cdef:
- CColumn* my_col = self.column
- CColumn* other_col = other.column
- c_bool result
-
- self._check_nullptr()
- other._check_nullptr()
-
- with nogil:
- result = my_col.Equals(deref(other_col))
-
- return result
-
- def to_pylist(self):
- """
- Convert to a list of native Python objects.
- """
- return self.data.to_pylist()
-
- cdef _check_nullptr(self):
- if self.column == NULL:
- raise ReferenceError("Column object references a NULL pointer."
- "Not initialized.")
-
- def __len__(self):
- self._check_nullptr()
- return self.column.length()
-
- def length(self):
- self._check_nullptr()
- return self.column.length()
-
- @property
- def shape(self):
- """
- Dimensions of this column
-
- Returns
- -------
- (int,)
- """
- self._check_nullptr()
- return (self.length(),)
-
- @property
- def null_count(self):
- """
- Number of null entries
-
- Returns
- -------
- int
- """
- self._check_nullptr()
- return self.column.null_count()
-
- @property
- def name(self):
- """
- Label of the column
-
- Returns
- -------
- str
- """
- return bytes(self.column.name()).decode('utf8')
-
- @property
- def type(self):
- """
- Type information for this column
-
- Returns
- -------
- pyarrow.schema.DataType
- """
- return box_data_type(self.column.type())
-
- @property
- def data(self):
- """
- The underlying data
-
- Returns
- -------
- pyarrow.table.ChunkedArray
- """
- cdef ChunkedArray chunked_array = ChunkedArray()
- chunked_array.init(self.column.data())
- return chunked_array
-
-
-cdef _schema_from_arrays(arrays, names, shared_ptr[CSchema]* schema):
- cdef:
- Array arr
- Column col
- c_string c_name
- vector[shared_ptr[CField]] fields
- cdef shared_ptr[CDataType] type_
-
- cdef int K = len(arrays)
-
- fields.resize(K)
-
- if len(arrays) == 0:
- raise ValueError('Must pass at least one array')
-
- if isinstance(arrays[0], Array):
- if names is None:
- raise ValueError('Must pass names when constructing '
- 'from Array objects')
- for i in range(K):
- arr = arrays[i]
- type_ = arr.type.sp_type
- c_name = tobytes(names[i])
- fields[i].reset(new CField(c_name, type_, True))
- elif isinstance(arrays[0], Column):
- for i in range(K):
- col = arrays[i]
- type_ = col.sp_column.get().type()
- c_name = tobytes(col.name)
- fields[i].reset(new CField(c_name, type_, True))
- else:
- raise TypeError(type(arrays[0]))
-
- schema.reset(new CSchema(fields))
-
-
-
-cdef _dataframe_to_arrays(df, timestamps_to_ms, Schema schema):
- cdef:
- list names = []
- list arrays = []
- DataType type = None
-
- for name in df.columns:
- col = df[name]
- if schema is not None:
- type = schema.field_by_name(name).type
-
- arr = Array.from_numpy(col, type=type,
- timestamps_to_ms=timestamps_to_ms)
- names.append(name)
- arrays.append(arr)
-
- return names, arrays
-
-
-cdef class RecordBatch:
- """
- Batch of rows of columns of equal length
-
- Warning
- -------
- Do not call this class's constructor directly, use one of the ``from_*``
- methods instead.
- """
-
- def __cinit__(self):
- self.batch = NULL
- self._schema = None
-
- cdef init(self, const shared_ptr[CRecordBatch]& batch):
- self.sp_batch = batch
- self.batch = batch.get()
-
- cdef _check_nullptr(self):
- if self.batch == NULL:
- raise ReferenceError("Object not initialized")
-
- def __len__(self):
- self._check_nullptr()
- return self.batch.num_rows()
-
- @property
- def num_columns(self):
- """
- Number of columns
-
- Returns
- -------
- int
- """
- self._check_nullptr()
- return self.batch.num_columns()
-
- @property
- def num_rows(self):
- """
- Number of rows
-
- Due to the definition of a RecordBatch, all columns have the same
- number of rows.
-
- Returns
- -------
- int
- """
- return len(self)
-
- @property
- def schema(self):
- """
- Schema of the RecordBatch and its columns
-
- Returns
- -------
- pyarrow.schema.Schema
- """
- cdef Schema schema
- self._check_nullptr()
- if self._schema is None:
- schema = Schema()
- schema.init_schema(self.batch.schema())
- self._schema = schema
-
- return self._schema
-
- def __getitem__(self, i):
- return box_array(self.batch.column(i))
-
- def slice(self, offset=0, length=None):
- """
- Compute zero-copy slice of this RecordBatch
-
- Parameters
- ----------
- offset : int, default 0
- Offset from start of array to slice
- length : int, default None
- Length of slice (default is until end of batch starting from
- offset)
-
- Returns
- -------
- sliced : RecordBatch
- """
- cdef shared_ptr[CRecordBatch] result
-
- if offset < 0:
- raise IndexError('Offset must be non-negative')
-
- if length is None:
- result = self.batch.Slice(offset)
- else:
- result = self.batch.Slice(offset, length)
-
- return batch_from_cbatch(result)
-
- def equals(self, RecordBatch other):
- cdef:
- CRecordBatch* my_batch = self.batch
- CRecordBatch* other_batch = other.batch
- c_bool result
-
- self._check_nullptr()
- other._check_nullptr()
-
- with nogil:
- result = my_batch.Equals(deref(other_batch))
-
- return result
-
- def to_pydict(self):
- """
- Convert the arrow::RecordBatch to an OrderedDict
-
- Returns
- -------
- OrderedDict
- """
- entries = []
- for i in range(self.batch.num_columns()):
- name = bytes(self.batch.column_name(i)).decode('utf8')
- column = self[i].to_pylist()
- entries.append((name, column))
- return OrderedDict(entries)
-
-
- def to_pandas(self, nthreads=None):
- """
- Convert the arrow::RecordBatch to a pandas DataFrame
-
- Returns
- -------
- pandas.DataFrame
- """
- return Table.from_batches([self]).to_pandas(nthreads=nthreads)
-
- @classmethod
- def from_pandas(cls, df, schema=None):
- """
- Convert pandas.DataFrame to an Arrow RecordBatch
-
- Parameters
- ----------
- df: pandas.DataFrame
- schema: pyarrow.Schema (optional)
- The expected schema of the RecordBatch. This can be used to
- indicate the type of columns if we cannot infer it automatically.
-
- Returns
- -------
- pyarrow.table.RecordBatch
- """
- names, arrays = _dataframe_to_arrays(df, False, schema)
- return cls.from_arrays(arrays, names)
-
- @staticmethod
- def from_arrays(arrays, names):
- """
- Construct a RecordBatch from multiple pyarrow.Arrays
-
- Parameters
- ----------
- arrays: list of pyarrow.Array
- column-wise data vectors
- names: list of str
- Labels for the columns
-
- Returns
- -------
- pyarrow.table.RecordBatch
- """
- cdef:
- Array arr
- c_string c_name
- shared_ptr[CSchema] schema
- shared_ptr[CRecordBatch] batch
- vector[shared_ptr[CArray]] c_arrays
- int64_t num_rows
-
- if len(arrays) == 0:
- raise ValueError('Record batch must contain at least one array (for now)')
-
- num_rows = len(arrays[0])
- _schema_from_arrays(arrays, names, &schema)
-
- for i in range(len(arrays)):
- arr = arrays[i]
- c_arrays.push_back(arr.sp_array)
-
- batch.reset(new CRecordBatch(schema, num_rows, c_arrays))
- return batch_from_cbatch(batch)
-
-
-cdef table_to_blockmanager(const shared_ptr[CTable]& table, int nthreads):
- cdef:
- PyObject* result_obj
- CColumn* col
- int i
-
- import pandas.core.internals as _int
- from pandas import RangeIndex, Categorical
- from pyarrow.compat import DatetimeTZDtype
-
- with nogil:
- check_status(pyarrow.ConvertTableToPandas(table, nthreads,
- &result_obj))
-
- result = PyObject_to_object(result_obj)
-
- blocks = []
- for item in result:
- block_arr = item['block']
- placement = item['placement']
- if 'dictionary' in item:
- cat = Categorical(block_arr,
- categories=item['dictionary'],
- ordered=False, fastpath=True)
- block = _int.make_block(cat, placement=placement,
- klass=_int.CategoricalBlock,
- fastpath=True)
- elif 'timezone' in item:
- dtype = DatetimeTZDtype('ns', tz=item['timezone'])
- block = _int.make_block(block_arr, placement=placement,
- klass=_int.DatetimeTZBlock,
- dtype=dtype, fastpath=True)
- else:
- block = _int.make_block(block_arr, placement=placement)
- blocks.append(block)
-
- names = []
- for i in range(table.get().num_columns()):
- col = table.get().column(i).get()
- names.append(frombytes(col.name()))
-
- axes = [names, RangeIndex(table.get().num_rows())]
- return _int.BlockManager(blocks, axes)
-
-
-cdef class Table:
- """
- A collection of top-level named, equal-length Arrow arrays.
-
- Warning
- -------
- Do not call this class's constructor directly, use one of the ``from_*``
- methods instead.
- """
-
- def __cinit__(self):
- self.table = NULL
-
- def __repr__(self):
- return 'pyarrow.Table\n{0}'.format(str(self.schema))
-
- cdef init(self, const shared_ptr[CTable]& table):
- self.sp_table = table
- self.table = table.get()
-
- cdef _check_nullptr(self):
- if self.table == NULL:
- raise ReferenceError("Table object references a NULL pointer."
- "Not initialized.")
-
- def equals(self, Table other):
- """
- Check if contents of two tables are equal
-
- Parameters
- ----------
- other : pyarrow.Table
-
- Returns
- -------
- are_equal : boolean
- """
- cdef:
- CTable* my_table = self.table
- CTable* other_table = other.table
- c_bool result
-
- self._check_nullptr()
- other._check_nullptr()
-
- with nogil:
- result = my_table.Equals(deref(other_table))
-
- return result
-
- @classmethod
- def from_pandas(cls, df, timestamps_to_ms=False, schema=None):
- """
- Convert pandas.DataFrame to an Arrow Table
-
- Parameters
- ----------
- df: pandas.DataFrame
-
- timestamps_to_ms: bool
- Convert datetime columns to ms resolution. This is needed for
- compatibility with other functionality like Parquet I/O, which
- only supports milliseconds.
-
- schema: pyarrow.Schema (optional)
- The expected schema of the Arrow Table. This can be used to
- indicate the type of columns if we cannot infer it automatically.
-
- Returns
- -------
- pyarrow.table.Table
-
- Examples
- --------
-
- >>> import pandas as pd
- >>> import pyarrow as pa
- >>> df = pd.DataFrame({
- ... 'int': [1, 2],
- ... 'str': ['a', 'b']
- ... })
- >>> pa.Table.from_pandas(df)
- <pyarrow.table.Table object at 0x7f05d1fb1b40>
- """
- names, arrays = _dataframe_to_arrays(df,
- timestamps_to_ms=timestamps_to_ms,
- schema=schema)
- return cls.from_arrays(arrays, names=names)
-
- @staticmethod
- def from_arrays(arrays, names=None):
- """
- Construct a Table from Arrow arrays or columns
-
- Parameters
- ----------
- arrays: list of pyarrow.Array or pyarrow.Column
- Equal-length arrays that should form the table.
- names: list of str, optional
- Names for the table columns. If Columns are passed, the names will
- be inferred; if Arrays are passed, this argument is required.
-
- Returns
- -------
- pyarrow.table.Table
-
- """
- cdef:
- vector[shared_ptr[CField]] fields
- vector[shared_ptr[CColumn]] columns
- shared_ptr[CSchema] schema
- shared_ptr[CTable] table
-
- _schema_from_arrays(arrays, names, &schema)
-
- cdef int K = len(arrays)
- columns.resize(K)
-
- for i in range(K):
- if isinstance(arrays[i], Array):
- columns[i].reset(new CColumn(schema.get().field(i),
- (<Array> arrays[i]).sp_array))
- elif isinstance(arrays[i], Column):
- columns[i] = (<Column> arrays[i]).sp_column
- else:
- raise ValueError(type(arrays[i]))
-
- table.reset(new CTable(schema, columns))
- return table_from_ctable(table)
-
- @staticmethod
- def from_batches(batches):
- """
- Construct a Table from a list of Arrow RecordBatches
-
- Parameters
- ----------
-
- batches: list of RecordBatch
- RecordBatch objects to be converted; all schemas must be equal
- """
- cdef:
- vector[shared_ptr[CRecordBatch]] c_batches
- shared_ptr[CTable] c_table
- RecordBatch batch
-
- for batch in batches:
- c_batches.push_back(batch.sp_batch)
-
- with nogil:
- check_status(CTable.FromRecordBatches(c_batches, &c_table))
-
- return table_from_ctable(c_table)
-
- def to_pandas(self, nthreads=None):
- """
- Convert the arrow::Table to a pandas DataFrame
-
- Parameters
- ----------
- nthreads : int, default max(1, multiprocessing.cpu_count() / 2)
- For the default, we divide the CPU count by 2 because most modern
- computers have hyperthreading turned on, so doubling the CPU count
- beyond the number of physical cores does not help
-
- Returns
- -------
- pandas.DataFrame
- """
- if nthreads is None:
- nthreads = pyarrow.config.cpu_count()
-
- mgr = table_to_blockmanager(self.sp_table, nthreads)
- return _pandas().DataFrame(mgr)
-
- def to_pydict(self):
- """
- Convert the arrow::Table to an OrderedDict
-
- Returns
- -------
- OrderedDict
- """
- entries = []
- for i in range(self.table.num_columns()):
- name = self.column(i).name
- column = self.column(i).to_pylist()
- entries.append((name, column))
- return OrderedDict(entries)
-
- @property
- def schema(self):
- """
- Schema of the table and its columns
-
- Returns
- -------
- pyarrow.schema.Schema
- """
- return box_schema(self.table.schema())
-
- def column(self, index):
- """
- Select a column by its numeric index.
-
- Parameters
- ----------
- index: int
-
- Returns
- -------
- pyarrow.table.Column
- """
- self._check_nullptr()
- cdef Column column = Column()
- column.init(self.table.column(index))
- return column
-
- def __getitem__(self, i):
- return self.column(i)
-
- def itercolumns(self):
- """
- Iterator over all columns in their numerical order
- """
- for i in range(self.num_columns):
- yield self.column(i)
-
- @property
- def num_columns(self):
- """
- Number of columns in this table
-
- Returns
- -------
- int
- """
- self._check_nullptr()
- return self.table.num_columns()
-
- @property
- def num_rows(self):
- """
- Number of rows in this table.
-
- Due to the definition of a table, all columns have the same number of rows.
-
- Returns
- -------
- int
- """
- self._check_nullptr()
- return self.table.num_rows()
-
- def __len__(self):
- return self.num_rows
-
- @property
- def shape(self):
- """
- Dimensions of the table: (#rows, #columns)
-
- Returns
- -------
- (int, int)
- """
- return (self.num_rows, self.num_columns)
-
- def add_column(self, int i, Column column):
- """
- Add column to Table at position. Returns new table
- """
- cdef:
- shared_ptr[CTable] c_table
-
- with nogil:
- check_status(self.table.AddColumn(i, column.sp_column, &c_table))
-
- return table_from_ctable(c_table)
-
- def append_column(self, Column column):
- """
- Append column at end of columns. Returns new table
- """
- return self.add_column(self.num_columns, column)
-
- def remove_column(self, int i):
- """
- Create new Table with the indicated column removed
- """
- cdef shared_ptr[CTable] c_table
-
- with nogil:
- check_status(self.table.RemoveColumn(i, &c_table))
-
- return table_from_ctable(c_table)
-
-
-def concat_tables(tables):
- """
- Perform zero-copy concatenation of pyarrow.Table objects. Raises an
- exception if the Table schemas are not all the same.
-
- Parameters
- ----------
- tables : iterable of pyarrow.Table objects
- """
- cdef:
- vector[shared_ptr[CTable]] c_tables
- shared_ptr[CTable] c_result
- Table table
-
- for table in tables:
- c_tables.push_back(table.sp_table)
-
- with nogil:
- check_status(ConcatenateTables(c_tables, &c_result))
-
- return table_from_ctable(c_result)
-
-
-cdef object box_column(const shared_ptr[CColumn]& ccolumn):
- cdef Column column = Column()
- column.init(ccolumn)
- return column
-
-
-cdef api object table_from_ctable(const shared_ptr[CTable]& ctable):
- cdef Table table = Table()
- table.init(ctable)
- return table
-
-
-cdef api object batch_from_cbatch(const shared_ptr[CRecordBatch]& cbatch):
- cdef RecordBatch batch = RecordBatch()
- batch.init(cbatch)
- return batch
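
The public entry points defined above are Table, RecordBatch, Column and
concat_tables, all re-exported from the package root after this commit. A
short round-trip sketch (assuming a build of this branch):

    import pyarrow as pa

    a = pa.from_pylist([1, 2])
    b = pa.from_pylist(['x', 'y'])

    batch = pa.RecordBatch.from_arrays([a, b], ['ints', 'strs'])
    table = pa.Table.from_batches([batch, batch])  # schemas must match
    assert table.shape == (4, 2)                   # (num_rows, num_columns)
    assert table.to_pydict()['ints'] == [1, 2, 1, 2]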
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/tests/test_feather.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_feather.py b/python/pyarrow/tests/test_feather.py
index cba9464..6f8040f 100644
--- a/python/pyarrow/tests/test_feather.py
+++ b/python/pyarrow/tests/test_feather.py
@@ -25,7 +25,7 @@ import pyarrow as pa
from pyarrow.compat import guid
from pyarrow.feather import (read_feather, write_feather,
FeatherReader)
-from pyarrow.io import FeatherWriter
+from pyarrow._io import FeatherWriter
def random_path():
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/tests/test_hdfs.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_hdfs.py b/python/pyarrow/tests/test_hdfs.py
index b8f7e25..d2a5479 100644
--- a/python/pyarrow/tests/test_hdfs.py
+++ b/python/pyarrow/tests/test_hdfs.py
@@ -26,8 +26,6 @@ import pandas.util.testing as pdt
import pytest
from pyarrow.compat import guid
-from pyarrow.filesystem import HdfsClient
-import pyarrow.io as io
import pyarrow as pa
import pyarrow.tests.test_parquet as test_parquet
@@ -45,7 +43,7 @@ def hdfs_test_client(driver='libhdfs'):
raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not '
'an integer')
- return HdfsClient(host, port, user, driver=driver)
+ return pa.HdfsClient(host, port, user, driver=driver)
@pytest.mark.hdfs
@@ -190,7 +188,7 @@ class TestLibHdfs(HdfsTestCases, unittest.TestCase):
@classmethod
def check_driver(cls):
- if not io.have_libhdfs():
+ if not pa.have_libhdfs():
pytest.fail('No libhdfs available on system')
def test_hdfs_orphaned_file(self):
@@ -209,5 +207,5 @@ class TestLibHdfs3(HdfsTestCases, unittest.TestCase):
@classmethod
def check_driver(cls):
- if not io.have_libhdfs3():
+ if not pa.have_libhdfs3():
pytest.fail('No libhdfs3 available on system')
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/tests/test_io.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index beb6113..c5d3708 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -24,7 +24,6 @@ import numpy as np
from pyarrow.compat import u, guid
import pyarrow as pa
-import pyarrow.io as io
# ----------------------------------------------------------------------
# Python file-like objects
@@ -33,7 +32,7 @@ import pyarrow.io as io
def test_python_file_write():
buf = BytesIO()
- f = io.PythonFileInterface(buf)
+ f = pa.PythonFileInterface(buf)
assert f.tell() == 0
@@ -57,7 +56,7 @@ def test_python_file_read():
data = b'some sample data'
buf = BytesIO(data)
- f = io.PythonFileInterface(buf, mode='r')
+ f = pa.PythonFileInterface(buf, mode='r')
assert f.size() == len(data)
@@ -82,7 +81,7 @@ def test_python_file_read():
def test_bytes_reader():
# Like a BytesIO, but zero-copy underneath for C++ consumers
data = b'some sample data'
- f = io.BufferReader(data)
+ f = pa.BufferReader(data)
assert f.tell() == 0
assert f.size() == len(data)
@@ -103,7 +102,7 @@ def test_bytes_reader():
def test_bytes_reader_non_bytes():
with pytest.raises(ValueError):
- io.BufferReader(u('some sample data'))
+ pa.BufferReader(u('some sample data'))
def test_bytes_reader_retains_parent_reference():
@@ -112,7 +111,7 @@ def test_bytes_reader_retains_parent_reference():
# ARROW-421
def get_buffer():
data = b'some sample data' * 1000
- reader = io.BufferReader(data)
+ reader = pa.BufferReader(data)
reader.seek(5)
return reader.read_buffer(6)
@@ -129,7 +128,7 @@ def test_buffer_bytes():
val = b'some data'
buf = pa.frombuffer(val)
- assert isinstance(buf, io.Buffer)
+ assert isinstance(buf, pa.Buffer)
result = buf.to_pybytes()
@@ -140,7 +139,7 @@ def test_buffer_memoryview():
val = b'some data'
buf = pa.frombuffer(val)
- assert isinstance(buf, io.Buffer)
+ assert isinstance(buf, pa.Buffer)
result = memoryview(buf)
@@ -151,7 +150,7 @@ def test_buffer_bytearray():
val = bytearray(b'some data')
buf = pa.frombuffer(val)
- assert isinstance(buf, io.Buffer)
+ assert isinstance(buf, pa.Buffer)
result = bytearray(buf)
@@ -162,7 +161,7 @@ def test_buffer_memoryview_is_immutable():
val = b'some data'
buf = pa.frombuffer(val)
- assert isinstance(buf, io.Buffer)
+ assert isinstance(buf, pa.Buffer)
result = memoryview(buf)
@@ -180,7 +179,7 @@ def test_memory_output_stream():
# 10 bytes
val = b'dataabcdef'
- f = io.InMemoryOutputStream()
+ f = pa.InMemoryOutputStream()
K = 1000
for i in range(K):
@@ -193,7 +192,7 @@ def test_memory_output_stream():
def test_inmemory_write_after_closed():
- f = io.InMemoryOutputStream()
+ f = pa.InMemoryOutputStream()
f.write(b'ok')
f.get_result()
@@ -213,7 +212,7 @@ def test_buffer_protocol_ref_counting():
def test_nativefile_write_memoryview():
- f = io.InMemoryOutputStream()
+ f = pa.InMemoryOutputStream()
data = b'ok'
arr = np.frombuffer(data, dtype='S1')
@@ -289,7 +288,7 @@ def test_memory_map_retain_buffer_reference(sample_disk_data):
def test_os_file_reader(sample_disk_data):
- _check_native_file_reader(io.OSFile, sample_disk_data)
+ _check_native_file_reader(pa.OSFile, sample_disk_data)
def _try_delete(path):
@@ -354,10 +353,10 @@ def test_os_file_writer():
f.write(data)
# Truncates file
- f2 = io.OSFile(path, mode='w')
+ f2 = pa.OSFile(path, mode='w')
f2.write('foo')
- with io.OSFile(path) as f3:
+ with pa.OSFile(path) as f3:
assert f3.size() == 3
with pytest.raises(IOError):
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index de1b148..a5c70aa 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -24,7 +24,6 @@ import pytest
from pyarrow.compat import guid, u
from pyarrow.filesystem import LocalFilesystem
import pyarrow as pa
-import pyarrow.io as paio
from .pandas_examples import dataframe_with_arrays, dataframe_with_lists
import numpy as np
@@ -180,10 +179,10 @@ def _test_dataframe(size=10000, seed=0):
def test_pandas_parquet_native_file_roundtrip(tmpdir):
df = _test_dataframe(10000)
arrow_table = pa.Table.from_pandas(df)
- imos = paio.InMemoryOutputStream()
+ imos = pa.InMemoryOutputStream()
pq.write_table(arrow_table, imos, version="2.0")
buf = imos.get_result()
- reader = paio.BufferReader(buf)
+ reader = pa.BufferReader(buf)
df_read = pq.read_table(reader).to_pandas()
tm.assert_frame_equal(df, df_read)
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/tests/test_schema.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_schema.py b/python/pyarrow/tests/test_schema.py
index 5588840..53b6b68 100644
--- a/python/pyarrow/tests/test_schema.py
+++ b/python/pyarrow/tests/test_schema.py
@@ -16,13 +16,9 @@
# under the License.
import pytest
-
-import pyarrow as pa
-
import numpy as np
-# XXX: pyarrow.schema.schema masks the module on imports
-sch = pa._schema
+import pyarrow as pa
def test_type_integers():
@@ -62,7 +58,7 @@ def test_type_from_numpy_dtype_timestamps():
]
for dt, pt in cases:
- result = sch.type_from_numpy_dtype(dt)
+ result = pa.from_numpy_dtype(dt)
assert result == pt
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/setup.py
----------------------------------------------------------------------
diff --git a/python/setup.py b/python/setup.py
index 99bac15..3991856 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -99,16 +99,14 @@ class build_ext(_build_ext):
os.environ.get('PYARROW_BUNDLE_ARROW_CPP', '0'))
CYTHON_MODULE_NAMES = [
- 'array',
- 'config',
- 'error',
- 'io',
- 'jemalloc',
- 'memory',
+ '_array',
+ '_config',
+ '_error',
+ '_io',
+ '_jemalloc',
+ '_memory',
'_parquet',
- 'scalar',
- 'schema',
- 'table']
+ '_table']
def _run_cmake(self):
# The directory containing this setup.py
@@ -261,7 +259,7 @@ class build_ext(_build_ext):
def _failure_permitted(self, name):
if name == '_parquet' and not self.with_parquet:
return True
- if name == 'jemalloc' and not self.with_jemalloc:
+ if name == '_jemalloc' and not self.with_jemalloc:
return True
return False
[4/4] arrow git commit: ARROW-751: [Python] Make all Cython modules private. Some code tidying
Posted by uw...@apache.org.
ARROW-751: [Python] Make all Cython modules private. Some code tidying
I also combined schema/array/scalar, as they are all interrelated.
Author: Wes McKinney <we...@twosigma.com>
Closes #533 from wesm/ARROW-751 and squashes the following commits:
63b479b [Wes McKinney] jemalloc is now private
0f46116 [Wes McKinney] Fix APIs in Parquet
1074e7c [Wes McKinney] Make all Cython modules private. Code cleaning
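
The practical effect on user code: the Cython submodules (pyarrow.io,
pyarrow.schema, pyarrow.table, ...) are no longer imported directly, and their
public classes are reached through the package root. A before/after sketch
based on the test_io.py changes in this patch:

    # Before this commit:
    #   import pyarrow.io as io
    #   reader = io.BufferReader(b'some sample data')

    # After it, pyarrow.io becomes the private pyarrow._io:
    import pyarrow as pa

    reader = pa.BufferReader(b'some sample data')
    assert reader.size() == len(b'some sample data')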
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/8b64a4fb
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/8b64a4fb
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/8b64a4fb
Branch: refs/heads/master
Commit: 8b64a4fb2d3973813e2094e108021606034d27f4
Parents: e934365
Author: Wes McKinney <we...@twosigma.com>
Authored: Thu Apr 13 12:51:47 2017 +0200
Committer: Uwe L. Korn <uw...@xhochy.com>
Committed: Thu Apr 13 12:51:47 2017 +0200
----------------------------------------------------------------------
ci/travis_script_python.sh | 2 +-
python/CMakeLists.txt | 16 +-
python/pyarrow/__init__.py | 84 +-
python/pyarrow/_array.pxd | 233 +++++
python/pyarrow/_array.pyx | 1368 +++++++++++++++++++++++++++++
python/pyarrow/_config.pyx | 54 ++
python/pyarrow/_error.pxd | 20 +
python/pyarrow/_error.pyx | 70 ++
python/pyarrow/_io.pxd | 50 ++
python/pyarrow/_io.pyx | 1273 +++++++++++++++++++++++++++
python/pyarrow/_jemalloc.pyx | 28 +
python/pyarrow/_memory.pxd | 30 +
python/pyarrow/_memory.pyx | 52 ++
python/pyarrow/_parquet.pyx | 16 +-
python/pyarrow/_table.pxd | 62 ++
python/pyarrow/_table.pyx | 913 +++++++++++++++++++
python/pyarrow/array.pxd | 141 ---
python/pyarrow/array.pyx | 646 --------------
python/pyarrow/config.pyx | 54 --
python/pyarrow/error.pxd | 20 -
python/pyarrow/error.pyx | 70 --
python/pyarrow/feather.py | 6 +-
python/pyarrow/filesystem.py | 2 +-
python/pyarrow/formatting.py | 4 +-
python/pyarrow/includes/libarrow.pxd | 5 +-
python/pyarrow/io.pxd | 50 --
python/pyarrow/io.pyx | 1276 ---------------------------
python/pyarrow/ipc.py | 10 +-
python/pyarrow/jemalloc.pyx | 28 -
python/pyarrow/memory.pxd | 30 -
python/pyarrow/memory.pyx | 52 --
python/pyarrow/parquet.py | 4 +-
python/pyarrow/scalar.pxd | 72 --
python/pyarrow/scalar.pyx | 315 -------
python/pyarrow/schema.pxd | 76 --
python/pyarrow/schema.pyx | 477 ----------
python/pyarrow/table.pxd | 63 --
python/pyarrow/table.pyx | 915 -------------------
python/pyarrow/tests/test_feather.py | 2 +-
python/pyarrow/tests/test_hdfs.py | 8 +-
python/pyarrow/tests/test_io.py | 31 +-
python/pyarrow/tests/test_parquet.py | 5 +-
python/pyarrow/tests/test_schema.py | 8 +-
python/setup.py | 18 +-
44 files changed, 4255 insertions(+), 4404 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/ci/travis_script_python.sh
----------------------------------------------------------------------
diff --git a/ci/travis_script_python.sh b/ci/travis_script_python.sh
index 680eb01..549fe11 100755
--- a/ci/travis_script_python.sh
+++ b/ci/travis_script_python.sh
@@ -115,7 +115,7 @@ python_version_tests() {
python setup.py build_ext --inplace --with-parquet --with-jemalloc
python -c "import pyarrow.parquet"
- python -c "import pyarrow.jemalloc"
+ python -c "import pyarrow._jemalloc"
python -m pytest -vv -r sxX pyarrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index 3e86521..36052bc 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -261,14 +261,12 @@ if (UNIX)
endif()
set(CYTHON_EXTENSIONS
- array
- config
- error
- io
- memory
- scalar
- schema
- table
+ _array
+ _config
+ _error
+ _io
+ _memory
+ _table
)
set(LINK_LIBS
@@ -313,7 +311,7 @@ if (PYARROW_BUILD_JEMALLOC)
arrow_jemalloc_shared)
set(CYTHON_EXTENSIONS
${CYTHON_EXTENSIONS}
- jemalloc)
+ _jemalloc)
endif()
############################################################
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index df615b4..66bde49 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -25,49 +25,10 @@ except DistributionNotFound:
pass
-import pyarrow.config
-from pyarrow.config import cpu_count, set_cpu_count
+import pyarrow._config
+from pyarrow._config import cpu_count, set_cpu_count
-from pyarrow.array import (Array, Tensor, from_pylist,
- NumericArray, IntegerArray, FloatingPointArray,
- BooleanArray,
- Int8Array, UInt8Array,
- Int16Array, UInt16Array,
- Int32Array, UInt32Array,
- Int64Array, UInt64Array,
- ListArray, StringArray,
- DictionaryArray)
-
-from pyarrow.error import (ArrowException,
- ArrowKeyError,
- ArrowInvalid,
- ArrowIOError,
- ArrowMemoryError,
- ArrowNotImplementedError,
- ArrowTypeError)
-
-from pyarrow.filesystem import Filesystem, HdfsClient, LocalFilesystem
-from pyarrow.io import (HdfsFile, NativeFile, PythonFileInterface,
- Buffer, BufferReader, InMemoryOutputStream,
- MemoryMappedFile, memory_map,
- frombuffer, read_tensor, write_tensor,
- memory_map, create_memory_map,
- get_record_batch_size, get_tensor_size)
-
-from pyarrow.ipc import FileReader, FileWriter, StreamReader, StreamWriter
-
-from pyarrow.memory import MemoryPool, total_allocated_bytes
-
-from pyarrow.scalar import (ArrayValue, Scalar, NA, NAType,
- BooleanValue,
- Int8Value, Int16Value, Int32Value, Int64Value,
- UInt8Value, UInt16Value, UInt32Value, UInt64Value,
- FloatValue, DoubleValue, ListValue,
- BinaryValue, StringValue, FixedSizeBinaryValue)
-
-import pyarrow.schema as _schema
-
-from pyarrow.schema import (null, bool_,
+from pyarrow._array import (null, bool_,
int8, int16, int32, int64,
uint8, uint16, uint32, uint64,
timestamp, date32, date64,
@@ -75,10 +36,45 @@ from pyarrow.schema import (null, bool_,
binary, string, decimal,
list_, struct, dictionary, field,
DataType, FixedSizeBinaryType,
- Field, Schema, schema)
+ Field, Schema, schema,
+ Array, Tensor,
+ from_pylist,
+ from_numpy_dtype,
+ NumericArray, IntegerArray, FloatingPointArray,
+ BooleanArray,
+ Int8Array, UInt8Array,
+ Int16Array, UInt16Array,
+ Int32Array, UInt32Array,
+ Int64Array, UInt64Array,
+ ListArray, StringArray,
+ DictionaryArray,
+ ArrayValue, Scalar, NA, NAType,
+ BooleanValue,
+ Int8Value, Int16Value, Int32Value, Int64Value,
+ UInt8Value, UInt16Value, UInt32Value, UInt64Value,
+ FloatValue, DoubleValue, ListValue,
+ BinaryValue, StringValue, FixedSizeBinaryValue)
+from pyarrow._io import (HdfsFile, NativeFile, PythonFileInterface,
+ Buffer, BufferReader, InMemoryOutputStream,
+ OSFile, MemoryMappedFile, memory_map,
+ frombuffer, read_tensor, write_tensor,
+ memory_map, create_memory_map,
+ get_record_batch_size, get_tensor_size)
+
+from pyarrow._memory import MemoryPool, total_allocated_bytes
+from pyarrow._table import Column, RecordBatch, Table, concat_tables
+from pyarrow._error import (ArrowException,
+ ArrowKeyError,
+ ArrowInvalid,
+ ArrowIOError,
+ ArrowMemoryError,
+ ArrowNotImplementedError,
+ ArrowTypeError)
-from pyarrow.table import Column, RecordBatch, Table, concat_tables
+from pyarrow.filesystem import Filesystem, HdfsClient, LocalFilesystem
+
+from pyarrow.ipc import FileReader, FileWriter, StreamReader, StreamWriter
localfs = LocalFilesystem.get_instance()
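
One rename worth noting in the __init__.py block above: the old
schema.type_from_numpy_dtype is now exported as pyarrow.from_numpy_dtype,
matching the test_schema.py update earlier in this patch. A short sketch
(assuming a build of this branch):

    import numpy as np
    import pyarrow as pa

    # from test_schema.py: datetime64 units map to Arrow timestamp types
    assert pa.from_numpy_dtype(np.dtype('datetime64[ms]')) == pa.timestamp('ms')
    assert pa.from_numpy_dtype(np.dtype('datetime64[ns]')) == pa.timestamp('ns')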
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_array.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/_array.pxd b/python/pyarrow/_array.pxd
new file mode 100644
index 0000000..4041374
--- /dev/null
+++ b/python/pyarrow/_array.pxd
@@ -0,0 +1,233 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pyarrow.includes.common cimport *
+from pyarrow.includes.libarrow cimport *
+
+from cpython cimport PyObject
+
+cdef extern from "Python.h":
+ int PySlice_Check(object)
+
+
+cdef class DataType:
+ cdef:
+ shared_ptr[CDataType] sp_type
+ CDataType* type
+
+ cdef void init(self, const shared_ptr[CDataType]& type)
+
+
+cdef class DictionaryType(DataType):
+ cdef:
+ const CDictionaryType* dict_type
+
+
+cdef class TimestampType(DataType):
+ cdef:
+ const CTimestampType* ts_type
+
+
+cdef class FixedSizeBinaryType(DataType):
+ cdef:
+ const CFixedSizeBinaryType* fixed_size_binary_type
+
+
+cdef class DecimalType(FixedSizeBinaryType):
+ cdef:
+ const CDecimalType* decimal_type
+
+
+cdef class Field:
+ cdef:
+ shared_ptr[CField] sp_field
+ CField* field
+
+ cdef readonly:
+ DataType type
+
+ cdef init(self, const shared_ptr[CField]& field)
+
+
+cdef class Schema:
+ cdef:
+ shared_ptr[CSchema] sp_schema
+ CSchema* schema
+
+ cdef init(self, const vector[shared_ptr[CField]]& fields)
+ cdef init_schema(self, const shared_ptr[CSchema]& schema)
+
+
+cdef class Scalar:
+ cdef readonly:
+ DataType type
+
+
+cdef class NAType(Scalar):
+ pass
+
+
+cdef class ArrayValue(Scalar):
+ cdef:
+ shared_ptr[CArray] sp_array
+ int64_t index
+
+ cdef void init(self, DataType type,
+ const shared_ptr[CArray]& sp_array, int64_t index)
+
+ cdef void _set_array(self, const shared_ptr[CArray]& sp_array)
+
+
+cdef class Int8Value(ArrayValue):
+ pass
+
+
+cdef class Int64Value(ArrayValue):
+ pass
+
+
+cdef class ListValue(ArrayValue):
+ cdef readonly:
+ DataType value_type
+
+ cdef:
+ CListArray* ap
+
+ cdef getitem(self, int64_t i)
+
+
+cdef class StringValue(ArrayValue):
+ pass
+
+
+cdef class FixedSizeBinaryValue(ArrayValue):
+ pass
+
+
+cdef class Array:
+ cdef:
+ shared_ptr[CArray] sp_array
+ CArray* ap
+
+ cdef readonly:
+ DataType type
+
+ cdef init(self, const shared_ptr[CArray]& sp_array)
+ cdef getitem(self, int64_t i)
+
+
+cdef class Tensor:
+ cdef:
+ shared_ptr[CTensor] sp_tensor
+ CTensor* tp
+
+ cdef readonly:
+ DataType type
+
+ cdef init(self, const shared_ptr[CTensor]& sp_tensor)
+
+
+cdef class BooleanArray(Array):
+ pass
+
+
+cdef class NumericArray(Array):
+ pass
+
+
+cdef class IntegerArray(NumericArray):
+ pass
+
+
+cdef class FloatingPointArray(NumericArray):
+ pass
+
+
+cdef class Int8Array(IntegerArray):
+ pass
+
+
+cdef class UInt8Array(IntegerArray):
+ pass
+
+
+cdef class Int16Array(IntegerArray):
+ pass
+
+
+cdef class UInt16Array(IntegerArray):
+ pass
+
+
+cdef class Int32Array(IntegerArray):
+ pass
+
+
+cdef class UInt32Array(IntegerArray):
+ pass
+
+
+cdef class Int64Array(IntegerArray):
+ pass
+
+
+cdef class UInt64Array(IntegerArray):
+ pass
+
+
+cdef class FloatArray(FloatingPointArray):
+ pass
+
+
+cdef class DoubleArray(FloatingPointArray):
+ pass
+
+
+cdef class FixedSizeBinaryArray(Array):
+ pass
+
+
+cdef class DecimalArray(FixedSizeBinaryArray):
+ pass
+
+
+cdef class ListArray(Array):
+ pass
+
+
+cdef class StringArray(Array):
+ pass
+
+
+cdef class BinaryArray(Array):
+ pass
+
+
+cdef class DictionaryArray(Array):
+ cdef:
+ object _indices, _dictionary
+
+
+cdef wrap_array_output(PyObject* output)
+cdef DataType box_data_type(const shared_ptr[CDataType]& type)
+cdef Field box_field(const shared_ptr[CField]& field)
+cdef Schema box_schema(const shared_ptr[CSchema]& schema)
+cdef object box_array(const shared_ptr[CArray]& sp_array)
+cdef object box_tensor(const shared_ptr[CTensor]& sp_tensor)
+cdef object box_scalar(DataType type,
+ const shared_ptr[CArray]& sp_array,
+ int64_t index)
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_array.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_array.pyx b/python/pyarrow/_array.pyx
new file mode 100644
index 0000000..7ef8e58
--- /dev/null
+++ b/python/pyarrow/_array.pyx
@@ -0,0 +1,1368 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: profile=False
+# distutils: language = c++
+# cython: embedsignature = True
+
+from cython.operator cimport dereference as deref
+from pyarrow.includes.libarrow cimport *
+from pyarrow.includes.common cimport PyObject_to_object
+cimport pyarrow.includes.pyarrow as pyarrow
+from pyarrow._error cimport check_status
+from pyarrow._memory cimport MemoryPool, maybe_unbox_memory_pool
+cimport cpython as cp
+
+
+import datetime
+import decimal as _pydecimal
+import numpy as np
+import six
+import pyarrow._config
+from pyarrow.compat import frombytes, tobytes, PandasSeries, Categorical
+
+
+cdef _pandas():
+ import pandas as pd
+ return pd
+
+
+cdef class DataType:
+
+ def __cinit__(self):
+ pass
+
+ cdef void init(self, const shared_ptr[CDataType]& type):
+ self.sp_type = type
+ self.type = type.get()
+
+ def __str__(self):
+ return frombytes(self.type.ToString())
+
+ def __repr__(self):
+ return '{0.__class__.__name__}({0})'.format(self)
+
+ def __richcmp__(DataType self, DataType other, int op):
+ if op == cp.Py_EQ:
+ return self.type.Equals(deref(other.type))
+ elif op == cp.Py_NE:
+ return not self.type.Equals(deref(other.type))
+ else:
+ raise TypeError('Invalid comparison')
+
+
+cdef class DictionaryType(DataType):
+
+ cdef void init(self, const shared_ptr[CDataType]& type):
+ DataType.init(self, type)
+ self.dict_type = <const CDictionaryType*> type.get()
+
+
+cdef class TimestampType(DataType):
+
+ cdef void init(self, const shared_ptr[CDataType]& type):
+ DataType.init(self, type)
+ self.ts_type = <const CTimestampType*> type.get()
+
+ property unit:
+
+ def __get__(self):
+ return timeunit_to_string(self.ts_type.unit())
+
+ property tz:
+
+ def __get__(self):
+ if self.ts_type.timezone().size() > 0:
+ return frombytes(self.ts_type.timezone())
+ else:
+ return None
+
+
+cdef class FixedSizeBinaryType(DataType):
+
+ cdef void init(self, const shared_ptr[CDataType]& type):
+ DataType.init(self, type)
+ self.fixed_size_binary_type = (
+ <const CFixedSizeBinaryType*> type.get())
+
+ property byte_width:
+
+ def __get__(self):
+ return self.fixed_size_binary_type.byte_width()
+
+
+cdef class DecimalType(FixedSizeBinaryType):
+
+ cdef void init(self, const shared_ptr[CDataType]& type):
+ DataType.init(self, type)
+ self.decimal_type = <const CDecimalType*> type.get()
+
+
+cdef class Field:
+
+ def __cinit__(self):
+ pass
+
+ cdef init(self, const shared_ptr[CField]& field):
+ self.sp_field = field
+ self.field = field.get()
+ self.type = box_data_type(field.get().type())
+
+ @classmethod
+ def from_py(cls, object name, DataType type, bint nullable=True):
+ cdef Field result = Field()
+ result.type = type
+ result.sp_field.reset(new CField(tobytes(name), type.sp_type,
+ nullable))
+ result.field = result.sp_field.get()
+
+ return result
+
+ def __repr__(self):
+ return 'Field({0!r}, type={1})'.format(self.name, str(self.type))
+
+ property nullable:
+
+ def __get__(self):
+ return self.field.nullable()
+
+ property name:
+
+ def __get__(self):
+ if box_field(self.sp_field) is None:
+ raise ReferenceError(
+ 'Field not initialized (references NULL pointer)')
+ return frombytes(self.field.name())
+
+
+cdef class Schema:
+
+ def __cinit__(self):
+ pass
+
+ def __len__(self):
+ return self.schema.num_fields()
+
+ def __getitem__(self, i):
+ if i < 0 or i >= len(self):
+ raise IndexError("{0} is out of bounds".format(i))
+
+ cdef Field result = Field()
+ result.init(self.schema.field(i))
+ result.type = box_data_type(result.field.type())
+
+ return result
+
+ cdef init(self, const vector[shared_ptr[CField]]& fields):
+ self.schema = new CSchema(fields)
+ self.sp_schema.reset(self.schema)
+
+ cdef init_schema(self, const shared_ptr[CSchema]& schema):
+ self.schema = schema.get()
+ self.sp_schema = schema
+
+ def equals(self, other):
+ """
+ Test if this schema is equal to the other
+ """
+ cdef Schema _other
+ _other = other
+
+ return self.sp_schema.get().Equals(deref(_other.schema))
+
+ def field_by_name(self, name):
+ """
+ Access a field by its name rather than the column index.
+
+ Parameters
+ ----------
+ name: str
+
+ Returns
+ -------
+ field: pyarrow.Field
+ """
+ return box_field(self.schema.GetFieldByName(tobytes(name)))
+
+ @classmethod
+ def from_fields(cls, fields):
+ cdef:
+ Schema result
+ Field field
+ vector[shared_ptr[CField]] c_fields
+
+ c_fields.resize(len(fields))
+
+ for i in range(len(fields)):
+ field = fields[i]
+ c_fields[i] = field.sp_field
+
+ result = Schema()
+ result.init(c_fields)
+
+ return result
+
+ def __str__(self):
+ return frombytes(self.schema.ToString())
+
+ def __repr__(self):
+ return self.__str__()
+
+
+cdef dict _type_cache = {}
+
+
+cdef DataType primitive_type(Type type):
+ if type in _type_cache:
+ return _type_cache[type]
+
+ cdef DataType out = DataType()
+ out.init(pyarrow.GetPrimitiveType(type))
+
+ _type_cache[type] = out
+ return out
+
+#------------------------------------------------------------
+# Type factory functions
+
+def field(name, type, bint nullable=True):
+    """
+    Create a Field with the given name, type and nullability
+    """
+ return Field.from_py(name, type, nullable)
+
+
+cdef set PRIMITIVE_TYPES = set([
+ Type_NA, Type_BOOL,
+ Type_UINT8, Type_INT8,
+ Type_UINT16, Type_INT16,
+ Type_UINT32, Type_INT32,
+ Type_UINT64, Type_INT64,
+ Type_TIMESTAMP, Type_DATE32,
+ Type_DATE64,
+ Type_HALF_FLOAT,
+ Type_FLOAT,
+ Type_DOUBLE])
+
+
+def null():
+ return primitive_type(Type_NA)
+
+
+def bool_():
+ return primitive_type(Type_BOOL)
+
+
+def uint8():
+ return primitive_type(Type_UINT8)
+
+
+def int8():
+ return primitive_type(Type_INT8)
+
+
+def uint16():
+ return primitive_type(Type_UINT16)
+
+
+def int16():
+ return primitive_type(Type_INT16)
+
+
+def uint32():
+ return primitive_type(Type_UINT32)
+
+
+def int32():
+ return primitive_type(Type_INT32)
+
+
+def uint64():
+ return primitive_type(Type_UINT64)
+
+
+def int64():
+ return primitive_type(Type_INT64)
+
+
+cdef dict _timestamp_type_cache = {}
+
+
+cdef timeunit_to_string(TimeUnit unit):
+ if unit == TimeUnit_SECOND:
+ return 's'
+ elif unit == TimeUnit_MILLI:
+ return 'ms'
+ elif unit == TimeUnit_MICRO:
+ return 'us'
+ elif unit == TimeUnit_NANO:
+ return 'ns'
+
+
+def timestamp(unit_str, tz=None):
+ cdef:
+ TimeUnit unit
+ c_string c_timezone
+
+ if unit_str == "s":
+ unit = TimeUnit_SECOND
+ elif unit_str == 'ms':
+ unit = TimeUnit_MILLI
+ elif unit_str == 'us':
+ unit = TimeUnit_MICRO
+ elif unit_str == 'ns':
+ unit = TimeUnit_NANO
+ else:
+ raise TypeError('Invalid TimeUnit string')
+
+ cdef TimestampType out = TimestampType()
+
+ if tz is None:
+        if unit in _timestamp_type_cache:
+            return _timestamp_type_cache[unit]
+        out.init(ctimestamp(unit))
+        _timestamp_type_cache[unit] = out
+ else:
+ if not isinstance(tz, six.string_types):
+ tz = tz.zone
+
+ c_timezone = tobytes(tz)
+ out.init(ctimestamp(unit, c_timezone))
+
+ return out
+
+
+def date32():
+ return primitive_type(Type_DATE32)
+
+
+def date64():
+ return primitive_type(Type_DATE64)
+
+
+def float16():
+ return primitive_type(Type_HALF_FLOAT)
+
+
+def float32():
+ return primitive_type(Type_FLOAT)
+
+
+def float64():
+ return primitive_type(Type_DOUBLE)
+
+
+cpdef DataType decimal(int precision, int scale=0):
+ cdef shared_ptr[CDataType] decimal_type
+ decimal_type.reset(new CDecimalType(precision, scale))
+ return box_data_type(decimal_type)
+
+
+def string():
+ """
+    UTF-8 encoded string type
+ """
+ return primitive_type(Type_STRING)
+
+
+def binary(int length=-1):
+ """Binary (PyBytes-like) type
+
+ Parameters
+ ----------
+ length : int, optional, default -1
+ If length == -1 then return a variable length binary type. If length is
+ greater than or equal to 0 then return a fixed size binary type of
+ width `length`.
+ """
+ if length == -1:
+ return primitive_type(Type_BINARY)
+
+ cdef shared_ptr[CDataType] fixed_size_binary_type
+ fixed_size_binary_type.reset(new CFixedSizeBinaryType(length))
+ return box_data_type(fixed_size_binary_type)
+
+
+def list_(DataType value_type):
+    """
+    Create a list type with the given value (element) type
+    """
+ cdef DataType out = DataType()
+ cdef shared_ptr[CDataType] list_type
+ list_type.reset(new CListType(value_type.sp_type))
+ out.init(list_type)
+ return out
+
+
+def dictionary(DataType index_type, Array dictionary):
+ """
+ Dictionary (categorical, or simply encoded) type
+ """
+ cdef DictionaryType out = DictionaryType()
+ cdef shared_ptr[CDataType] dict_type
+ dict_type.reset(new CDictionaryType(index_type.sp_type,
+ dictionary.sp_array))
+ out.init(dict_type)
+ return out
+
+
+def struct(fields):
+ """
+
+ """
+ cdef:
+ DataType out = DataType()
+ Field field
+ vector[shared_ptr[CField]] c_fields
+ cdef shared_ptr[CDataType] struct_type
+
+ for field in fields:
+ c_fields.push_back(field.sp_field)
+
+ struct_type.reset(new CStructType(c_fields))
+ out.init(struct_type)
+ return out
+
+
+def schema(fields):
+    """
+    Create a Schema from a sequence of Fields
+    """
+    return Schema.from_fields(fields)
+
+
+cdef DataType box_data_type(const shared_ptr[CDataType]& type):
+ cdef:
+ DataType out
+
+ if type.get() == NULL:
+ return None
+
+ if type.get().id() == Type_DICTIONARY:
+ out = DictionaryType()
+ elif type.get().id() == Type_TIMESTAMP:
+ out = TimestampType()
+ elif type.get().id() == Type_FIXED_SIZE_BINARY:
+ out = FixedSizeBinaryType()
+ elif type.get().id() == Type_DECIMAL:
+ out = DecimalType()
+ else:
+ out = DataType()
+
+ out.init(type)
+ return out
+
+cdef Field box_field(const shared_ptr[CField]& field):
+ if field.get() == NULL:
+ return None
+ cdef Field out = Field()
+ out.init(field)
+ return out
+
+cdef Schema box_schema(const shared_ptr[CSchema]& schema):
+    cdef Schema out = Schema()
+    out.init_schema(schema)
+ return out
+
+
+def from_numpy_dtype(object dtype):
+ cdef shared_ptr[CDataType] c_type
+ with nogil:
+ check_status(pyarrow.NumPyDtypeToArrow(dtype, &c_type))
+
+ return box_data_type(c_type)
+
+
+NA = None
+
+
+cdef class NAType(Scalar):
+
+ def __cinit__(self):
+ global NA
+ if NA is not None:
+ raise Exception('Cannot create multiple NAType instances')
+
+ self.type = null()
+
+ def __repr__(self):
+ return 'NA'
+
+ def as_py(self):
+ return None
+
+
+NA = NAType()
+
+
+cdef class ArrayValue(Scalar):
+
+ cdef void init(self, DataType type, const shared_ptr[CArray]& sp_array,
+ int64_t index):
+ self.type = type
+ self.index = index
+ self._set_array(sp_array)
+
+ cdef void _set_array(self, const shared_ptr[CArray]& sp_array):
+ self.sp_array = sp_array
+
+ def __repr__(self):
+ if hasattr(self, 'as_py'):
+ return repr(self.as_py())
+ else:
+ return super(Scalar, self).__repr__()
+
+
+cdef class BooleanValue(ArrayValue):
+
+ def as_py(self):
+ cdef CBooleanArray* ap = <CBooleanArray*> self.sp_array.get()
+ return ap.Value(self.index)
+
+
+cdef class Int8Value(ArrayValue):
+
+ def as_py(self):
+ cdef CInt8Array* ap = <CInt8Array*> self.sp_array.get()
+ return ap.Value(self.index)
+
+
+cdef class UInt8Value(ArrayValue):
+
+ def as_py(self):
+ cdef CUInt8Array* ap = <CUInt8Array*> self.sp_array.get()
+ return ap.Value(self.index)
+
+
+cdef class Int16Value(ArrayValue):
+
+ def as_py(self):
+ cdef CInt16Array* ap = <CInt16Array*> self.sp_array.get()
+ return ap.Value(self.index)
+
+
+cdef class UInt16Value(ArrayValue):
+
+ def as_py(self):
+ cdef CUInt16Array* ap = <CUInt16Array*> self.sp_array.get()
+ return ap.Value(self.index)
+
+
+cdef class Int32Value(ArrayValue):
+
+ def as_py(self):
+ cdef CInt32Array* ap = <CInt32Array*> self.sp_array.get()
+ return ap.Value(self.index)
+
+
+cdef class UInt32Value(ArrayValue):
+
+ def as_py(self):
+ cdef CUInt32Array* ap = <CUInt32Array*> self.sp_array.get()
+ return ap.Value(self.index)
+
+
+cdef class Int64Value(ArrayValue):
+
+ def as_py(self):
+ cdef CInt64Array* ap = <CInt64Array*> self.sp_array.get()
+ return ap.Value(self.index)
+
+
+cdef class UInt64Value(ArrayValue):
+
+ def as_py(self):
+ cdef CUInt64Array* ap = <CUInt64Array*> self.sp_array.get()
+ return ap.Value(self.index)
+
+
+cdef class Date32Value(ArrayValue):
+
+ def as_py(self):
+ cdef CDate32Array* ap = <CDate32Array*> self.sp_array.get()
+
+ # Shift to seconds since epoch
+ return datetime.datetime.utcfromtimestamp(
+ int(ap.Value(self.index)) * 86400).date()
+
+
+cdef class Date64Value(ArrayValue):
+
+ def as_py(self):
+ cdef CDate64Array* ap = <CDate64Array*> self.sp_array.get()
+ return datetime.datetime.utcfromtimestamp(
+ ap.Value(self.index) / 1000).date()
+
+
+cdef class TimestampValue(ArrayValue):
+
+ def as_py(self):
+ cdef:
+ CTimestampArray* ap = <CTimestampArray*> self.sp_array.get()
+ CTimestampType* dtype = <CTimestampType*>ap.type().get()
+ int64_t val = ap.Value(self.index)
+
+ timezone = None
+ tzinfo = None
+ if dtype.timezone().size() > 0:
+ timezone = frombytes(dtype.timezone())
+ import pytz
+ tzinfo = pytz.timezone(timezone)
+
+ try:
+ pd = _pandas()
+ if dtype.unit() == TimeUnit_SECOND:
+ val = val * 1000000000
+ elif dtype.unit() == TimeUnit_MILLI:
+ val = val * 1000000
+ elif dtype.unit() == TimeUnit_MICRO:
+ val = val * 1000
+ return pd.Timestamp(val, tz=tzinfo)
+ except ImportError:
+ if dtype.unit() == TimeUnit_SECOND:
+ result = datetime.datetime.utcfromtimestamp(val)
+ elif dtype.unit() == TimeUnit_MILLI:
+ result = datetime.datetime.utcfromtimestamp(float(val) / 1000)
+ elif dtype.unit() == TimeUnit_MICRO:
+ result = datetime.datetime.utcfromtimestamp(
+ float(val) / 1000000)
+ else:
+ # TimeUnit_NANO
+ raise NotImplementedError("Cannot convert nanosecond "
+ "timestamps without pandas")
+ if timezone is not None:
+ result = result.replace(tzinfo=tzinfo)
+ return result
+
+
+cdef class FloatValue(ArrayValue):
+
+ def as_py(self):
+ cdef CFloatArray* ap = <CFloatArray*> self.sp_array.get()
+ return ap.Value(self.index)
+
+
+cdef class DoubleValue(ArrayValue):
+
+ def as_py(self):
+ cdef CDoubleArray* ap = <CDoubleArray*> self.sp_array.get()
+ return ap.Value(self.index)
+
+
+cdef class DecimalValue(ArrayValue):
+
+ def as_py(self):
+ cdef:
+ CDecimalArray* ap = <CDecimalArray*> self.sp_array.get()
+ c_string s = ap.FormatValue(self.index)
+ return _pydecimal.Decimal(s.decode('utf8'))
+
+
+cdef class StringValue(ArrayValue):
+
+ def as_py(self):
+ cdef CStringArray* ap = <CStringArray*> self.sp_array.get()
+ return ap.GetString(self.index).decode('utf-8')
+
+
+cdef class BinaryValue(ArrayValue):
+
+ def as_py(self):
+ cdef:
+ const uint8_t* ptr
+ int32_t length
+ CBinaryArray* ap = <CBinaryArray*> self.sp_array.get()
+
+ ptr = ap.GetValue(self.index, &length)
+ return cp.PyBytes_FromStringAndSize(<const char*>(ptr), length)
+
+
+cdef class ListValue(ArrayValue):
+
+ def __len__(self):
+ return self.ap.value_length(self.index)
+
+ def __getitem__(self, i):
+ return self.getitem(i)
+
+ def __iter__(self):
+ for i in range(len(self)):
+ yield self.getitem(i)
+
+ cdef void _set_array(self, const shared_ptr[CArray]& sp_array):
+ self.sp_array = sp_array
+ self.ap = <CListArray*> sp_array.get()
+ self.value_type = box_data_type(self.ap.value_type())
+
+ cdef getitem(self, int64_t i):
+ cdef int64_t j = self.ap.value_offset(self.index) + i
+ return box_scalar(self.value_type, self.ap.values(), j)
+
+ def as_py(self):
+ cdef:
+ int64_t j
+ list result = []
+
+ for j in range(len(self)):
+ result.append(self.getitem(j).as_py())
+
+ return result
+
+
+cdef class FixedSizeBinaryValue(ArrayValue):
+
+ def as_py(self):
+ cdef:
+ CFixedSizeBinaryArray* ap
+ CFixedSizeBinaryType* ap_type
+ int32_t length
+ const char* data
+ ap = <CFixedSizeBinaryArray*> self.sp_array.get()
+ ap_type = <CFixedSizeBinaryType*> ap.type().get()
+ length = ap_type.byte_width()
+ data = <const char*> ap.GetValue(self.index)
+ return cp.PyBytes_FromStringAndSize(data, length)
+
+
+cdef dict _scalar_classes = {
+ Type_BOOL: BooleanValue,
+    Type_UINT8: UInt8Value,
+    Type_UINT16: UInt16Value,
+    Type_UINT32: UInt32Value,
+    Type_UINT64: UInt64Value,
+ Type_INT8: Int8Value,
+ Type_INT16: Int16Value,
+ Type_INT32: Int32Value,
+ Type_INT64: Int64Value,
+ Type_DATE32: Date32Value,
+ Type_DATE64: Date64Value,
+ Type_TIMESTAMP: TimestampValue,
+ Type_FLOAT: FloatValue,
+ Type_DOUBLE: DoubleValue,
+ Type_LIST: ListValue,
+ Type_BINARY: BinaryValue,
+ Type_STRING: StringValue,
+ Type_FIXED_SIZE_BINARY: FixedSizeBinaryValue,
+ Type_DECIMAL: DecimalValue,
+}
+
+cdef object box_scalar(DataType type, const shared_ptr[CArray]& sp_array,
+ int64_t index):
+ cdef ArrayValue val
+ if type.type.id() == Type_NA:
+ return NA
+ elif sp_array.get().IsNull(index):
+ return NA
+ else:
+ val = _scalar_classes[type.type.id()]()
+ val.init(type, sp_array, index)
+ return val
+
+
+cdef maybe_coerce_datetime64(values, dtype, DataType type,
+ timestamps_to_ms=False):
+
+ from pyarrow.compat import DatetimeTZDtype
+
+ if values.dtype.type != np.datetime64:
+ return values, type
+
+ coerce_ms = timestamps_to_ms and values.dtype != 'datetime64[ms]'
+
+ if coerce_ms:
+ values = values.astype('datetime64[ms]')
+
+ if isinstance(dtype, DatetimeTZDtype):
+ tz = dtype.tz
+ unit = 'ms' if coerce_ms else dtype.unit
+ type = timestamp(unit, tz)
+ elif type is None:
+ # Trust the NumPy dtype
+ type = from_numpy_dtype(values.dtype)
+
+ return values, type
+
+
+cdef class Array:
+
+ cdef init(self, const shared_ptr[CArray]& sp_array):
+ self.sp_array = sp_array
+ self.ap = sp_array.get()
+ self.type = box_data_type(self.sp_array.get().type())
+
+ @staticmethod
+ def from_numpy(obj, mask=None, DataType type=None,
+ timestamps_to_ms=False,
+ MemoryPool memory_pool=None):
+ """
+        Convert a pandas.Series or NumPy array to an Arrow Array.
+
+ Parameters
+ ----------
+        obj : pandas.Series or numpy.ndarray
+
+        mask : pandas.Series or numpy.ndarray, optional
+            Boolean mask; True marks a value as null
+
+ type : pyarrow.DataType
+ Explicit type to attempt to coerce to
+
+ timestamps_to_ms : bool, optional
+ Convert datetime columns to ms resolution. This is needed for
+ compatibility with other functionality like Parquet I/O which
+ only supports milliseconds.
+
+        memory_pool : MemoryPool, optional
+ Specific memory pool to use to allocate the resulting Arrow array.
+
+ Notes
+ -----
+ Localized timestamps will currently be returned as UTC (pandas's native
+ representation). Timezone-naive data will be implicitly interpreted as
+ UTC.
+
+ Examples
+ --------
+
+ >>> import pandas as pd
+ >>> import pyarrow as pa
+ >>> pa.Array.from_numpy(pd.Series([1, 2]))
+    <pyarrow._array.Int64Array object at 0x7f674e4c0e10>
+ [
+ 1,
+ 2
+ ]
+
+ >>> import numpy as np
+ >>> pa.Array.from_numpy(pd.Series([1, 2]), np.array([0, 1],
+ ... dtype=bool))
+    <pyarrow._array.Int64Array object at 0x7f9019e11208>
+ [
+ 1,
+ NA
+ ]
+
+ Returns
+ -------
+        pyarrow._array.Array
+ """
+ cdef:
+ shared_ptr[CArray] out
+ shared_ptr[CDataType] c_type
+ CMemoryPool* pool
+
+ if mask is not None:
+ mask = get_series_values(mask)
+
+ values = get_series_values(obj)
+ pool = maybe_unbox_memory_pool(memory_pool)
+
+ if isinstance(values, Categorical):
+ return DictionaryArray.from_arrays(
+ values.codes, values.categories.values,
+ mask=mask, memory_pool=memory_pool)
+ elif values.dtype == object:
+ # Object dtype undergoes a different conversion path as more type
+ # inference may be needed
+ if type is not None:
+ c_type = type.sp_type
+ with nogil:
+ check_status(pyarrow.PandasObjectsToArrow(
+ pool, values, mask, c_type, &out))
+ else:
+ values, type = maybe_coerce_datetime64(
+ values, obj.dtype, type, timestamps_to_ms=timestamps_to_ms)
+
+ if type is None:
+ check_status(pyarrow.NumPyDtypeToArrow(values.dtype, &c_type))
+ else:
+ c_type = type.sp_type
+
+ with nogil:
+ check_status(pyarrow.PandasToArrow(
+ pool, values, mask, c_type, &out))
+
+ return box_array(out)
+
+ @staticmethod
+ def from_list(object list_obj, DataType type=None,
+ MemoryPool memory_pool=None):
+ """
+ Convert Python list to Arrow array
+
+ Parameters
+ ----------
+        list_obj : array_like
+        type : pyarrow.DataType, optional
+            Explicit element type for the result; inferred from the values
+            when omitted
+        memory_pool : MemoryPool, optional
+            Memory pool used to allocate the resulting Arrow array
+
+ Returns
+ -------
+        pyarrow._array.Array
+ """
+ cdef:
+ shared_ptr[CArray] sp_array
+ CMemoryPool* pool
+
+ pool = maybe_unbox_memory_pool(memory_pool)
+ if type is None:
+ check_status(pyarrow.ConvertPySequence(list_obj, pool, &sp_array))
+ else:
+ check_status(
+ pyarrow.ConvertPySequence(
+ list_obj, pool, &sp_array, type.sp_type
+ )
+ )
+
+ return box_array(sp_array)
+
+ property null_count:
+
+ def __get__(self):
+ return self.sp_array.get().null_count()
+
+ def __iter__(self):
+ for i in range(len(self)):
+ yield self.getitem(i)
+
+ def __repr__(self):
+ from pyarrow.formatting import array_format
+ type_format = object.__repr__(self)
+ values = array_format(self, window=10)
+ return '{0}\n{1}'.format(type_format, values)
+
+ def equals(Array self, Array other):
+ return self.ap.Equals(deref(other.ap))
+
+ def __len__(self):
+ if self.sp_array.get():
+ return self.sp_array.get().length()
+ else:
+ return 0
+
+ def isnull(self):
+        raise NotImplementedError
+
+ def __getitem__(self, key):
+ cdef:
+ Py_ssize_t n = len(self)
+
+ if PySlice_Check(key):
+ start = key.start or 0
+ while start < 0:
+ start += n
+
+ stop = key.stop if key.stop is not None else n
+ while stop < 0:
+ stop += n
+
+ step = key.step or 1
+ if step != 1:
+ raise IndexError('only slices with step 1 supported')
+ else:
+ return self.slice(start, stop - start)
+
+ while key < 0:
+ key += len(self)
+
+ return self.getitem(key)
+
+ cdef getitem(self, int64_t i):
+ return box_scalar(self.type, self.sp_array, i)
+
+ def slice(self, offset=0, length=None):
+ """
+ Compute zero-copy slice of this array
+
+ Parameters
+ ----------
+ offset : int, default 0
+ Offset from start of array to slice
+ length : int, default None
+ Length of slice (default is until end of Array starting from
+ offset)
+
+ Returns
+ -------
+        sliced : Array
+ """
+ cdef:
+ shared_ptr[CArray] result
+
+ if offset < 0:
+ raise IndexError('Offset must be non-negative')
+
+ if length is None:
+ result = self.ap.Slice(offset)
+ else:
+ result = self.ap.Slice(offset, length)
+
+ return box_array(result)
+
+ def to_pandas(self):
+ """
+ Convert to an array object suitable for use in pandas
+
+ See also
+ --------
+ Column.to_pandas
+ Table.to_pandas
+ RecordBatch.to_pandas
+ """
+ cdef:
+ PyObject* out
+
+ with nogil:
+ check_status(
+ pyarrow.ConvertArrayToPandas(self.sp_array, <PyObject*> self,
+ &out))
+ return wrap_array_output(out)
+
+ def to_pylist(self):
+ """
+        Convert to a list of native Python objects.
+ """
+ return [x.as_py() for x in self]
+
+
+cdef class Tensor:
+
+ cdef init(self, const shared_ptr[CTensor]& sp_tensor):
+ self.sp_tensor = sp_tensor
+ self.tp = sp_tensor.get()
+ self.type = box_data_type(self.tp.type())
+
+ def __repr__(self):
+ return """<pyarrow.Tensor>
+type: {0}
+shape: {1}
+strides: {2}""".format(self.type, self.shape, self.strides)
+
+ @staticmethod
+ def from_numpy(obj):
+ cdef shared_ptr[CTensor] ctensor
+ check_status(pyarrow.NdarrayToTensor(default_memory_pool(),
+ obj, &ctensor))
+ return box_tensor(ctensor)
+
+ def to_numpy(self):
+ """
+ Convert arrow::Tensor to numpy.ndarray with zero copy
+ """
+ cdef:
+ PyObject* out
+
+ check_status(pyarrow.TensorToNdarray(deref(self.tp), <PyObject*> self,
+ &out))
+ return PyObject_to_object(out)
+
+ def equals(self, Tensor other):
+ """
+        Return True if the tensors contain exactly equal data
+ """
+ return self.tp.Equals(deref(other.tp))
+
+ property is_mutable:
+
+ def __get__(self):
+ return self.tp.is_mutable()
+
+ property is_contiguous:
+
+ def __get__(self):
+ return self.tp.is_contiguous()
+
+ property ndim:
+
+ def __get__(self):
+ return self.tp.ndim()
+
+ property size:
+
+ def __get__(self):
+ return self.tp.size()
+
+ property shape:
+
+ def __get__(self):
+ cdef size_t i
+ py_shape = []
+ for i in range(self.tp.shape().size()):
+ py_shape.append(self.tp.shape()[i])
+ return py_shape
+
+ property strides:
+
+ def __get__(self):
+ cdef size_t i
+ py_strides = []
+ for i in range(self.tp.strides().size()):
+ py_strides.append(self.tp.strides()[i])
+ return py_strides
+
+
+cdef wrap_array_output(PyObject* output):
+ cdef object obj = PyObject_to_object(output)
+
+ if isinstance(obj, dict):
+ return Categorical(obj['indices'],
+ categories=obj['dictionary'],
+ fastpath=True)
+ else:
+ return obj
+
+
+cdef class NullArray(Array):
+ pass
+
+
+cdef class BooleanArray(Array):
+ pass
+
+
+cdef class NumericArray(Array):
+ pass
+
+
+cdef class IntegerArray(NumericArray):
+ pass
+
+
+cdef class FloatingPointArray(NumericArray):
+ pass
+
+
+cdef class Int8Array(IntegerArray):
+ pass
+
+
+cdef class UInt8Array(IntegerArray):
+ pass
+
+
+cdef class Int16Array(IntegerArray):
+ pass
+
+
+cdef class UInt16Array(IntegerArray):
+ pass
+
+
+cdef class Int32Array(IntegerArray):
+ pass
+
+
+cdef class UInt32Array(IntegerArray):
+ pass
+
+
+cdef class Int64Array(IntegerArray):
+ pass
+
+
+cdef class UInt64Array(IntegerArray):
+ pass
+
+
+cdef class Date32Array(NumericArray):
+ pass
+
+
+cdef class Date64Array(NumericArray):
+ pass
+
+
+cdef class TimestampArray(NumericArray):
+ pass
+
+
+cdef class Time32Array(NumericArray):
+ pass
+
+
+cdef class Time64Array(NumericArray):
+ pass
+
+
+cdef class FloatArray(FloatingPointArray):
+ pass
+
+
+cdef class DoubleArray(FloatingPointArray):
+ pass
+
+
+cdef class FixedSizeBinaryArray(Array):
+ pass
+
+
+cdef class DecimalArray(FixedSizeBinaryArray):
+ pass
+
+
+cdef class ListArray(Array):
+ pass
+
+
+cdef class StringArray(Array):
+ pass
+
+
+cdef class BinaryArray(Array):
+ pass
+
+
+cdef class DictionaryArray(Array):
+
+ cdef getitem(self, int64_t i):
+ cdef Array dictionary = self.dictionary
+ index = self.indices[i]
+ if index is NA:
+ return index
+ else:
+ return box_scalar(dictionary.type, dictionary.sp_array,
+ index.as_py())
+
+ property dictionary:
+
+ def __get__(self):
+ cdef CDictionaryArray* darr = <CDictionaryArray*>(self.ap)
+
+ if self._dictionary is None:
+ self._dictionary = box_array(darr.dictionary())
+
+ return self._dictionary
+
+ property indices:
+
+ def __get__(self):
+ cdef CDictionaryArray* darr = <CDictionaryArray*>(self.ap)
+
+ if self._indices is None:
+ self._indices = box_array(darr.indices())
+
+ return self._indices
+
+ @staticmethod
+ def from_arrays(indices, dictionary, mask=None,
+ MemoryPool memory_pool=None):
+ """
+ Construct Arrow DictionaryArray from array of indices (must be
+ non-negative integers) and corresponding array of dictionary values
+
+ Parameters
+ ----------
+ indices : ndarray or pandas.Series, integer type
+ dictionary : ndarray or pandas.Series
+ mask : ndarray or pandas.Series, boolean type
+ True values indicate that indices are actually null
+
+ Returns
+ -------
+ dict_array : DictionaryArray
+ """
+ cdef:
+ Array arrow_indices, arrow_dictionary
+ DictionaryArray result
+ shared_ptr[CDataType] c_type
+ shared_ptr[CArray] c_result
+
+ if isinstance(indices, Array):
+ if mask is not None:
+ raise NotImplementedError(
+ "mask not implemented with Arrow array inputs yet")
+ arrow_indices = indices
+ else:
+ if mask is None:
+ mask = indices == -1
+ else:
+ mask = mask | (indices == -1)
+ arrow_indices = Array.from_numpy(indices, mask=mask,
+ memory_pool=memory_pool)
+
+ if isinstance(dictionary, Array):
+ arrow_dictionary = dictionary
+ else:
+ arrow_dictionary = Array.from_numpy(dictionary,
+ memory_pool=memory_pool)
+
+ if not isinstance(arrow_indices, IntegerArray):
+ raise ValueError('Indices must be integer type')
+
+ c_type.reset(new CDictionaryType(arrow_indices.type.sp_type,
+ arrow_dictionary.sp_array))
+ c_result.reset(new CDictionaryArray(c_type, arrow_indices.sp_array))
+
+ result = DictionaryArray()
+ result.init(c_result)
+ return result
+
+
+cdef dict _array_classes = {
+ Type_NA: NullArray,
+ Type_BOOL: BooleanArray,
+ Type_UINT8: UInt8Array,
+ Type_UINT16: UInt16Array,
+ Type_UINT32: UInt32Array,
+ Type_UINT64: UInt64Array,
+ Type_INT8: Int8Array,
+ Type_INT16: Int16Array,
+ Type_INT32: Int32Array,
+ Type_INT64: Int64Array,
+ Type_DATE32: Date32Array,
+ Type_DATE64: Date64Array,
+ Type_TIMESTAMP: TimestampArray,
+ Type_TIME32: Time32Array,
+ Type_TIME64: Time64Array,
+ Type_FLOAT: FloatArray,
+ Type_DOUBLE: DoubleArray,
+ Type_LIST: ListArray,
+ Type_BINARY: BinaryArray,
+ Type_STRING: StringArray,
+ Type_DICTIONARY: DictionaryArray,
+ Type_FIXED_SIZE_BINARY: FixedSizeBinaryArray,
+ Type_DECIMAL: DecimalArray,
+}
+
+cdef object box_array(const shared_ptr[CArray]& sp_array):
+ if sp_array.get() == NULL:
+ raise ValueError('Array was NULL')
+
+ cdef CDataType* data_type = sp_array.get().type().get()
+
+ if data_type == NULL:
+ raise ValueError('Array data type was NULL')
+
+ cdef Array arr = _array_classes[data_type.id()]()
+ arr.init(sp_array)
+ return arr
+
+
+cdef object box_tensor(const shared_ptr[CTensor]& sp_tensor):
+ if sp_tensor.get() == NULL:
+ raise ValueError('Tensor was NULL')
+
+ cdef Tensor tensor = Tensor()
+ tensor.init(sp_tensor)
+ return tensor
+
+
+cdef object get_series_values(object obj):
+ if isinstance(obj, PandasSeries):
+ result = obj.values
+ elif isinstance(obj, np.ndarray):
+ result = obj
+ else:
+ result = PandasSeries(obj).values
+
+ return result
+
+
+from_pylist = Array.from_list
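That closes out the new _array.pyx. A short sketch exercising the type factories defined above, assuming pyarrow/__init__.py re-exports them from the private module:

    import pyarrow as pa

    t_ts = pa.timestamp('ms', tz='UTC')   # tz-parameterized, so it bypasses the unit cache
    t_list = pa.list_(pa.int32())
    t_fsb = pa.binary(16)                 # length >= 0 -> fixed-size binary type
    sch = pa.schema([pa.field('ts', t_ts, nullable=False),
                     pa.field('xs', t_list)])
    print(sch)

    # An explicit type routes through the typed ConvertPySequence overload:
    arr = pa.from_pylist([1, 2, 3], type=pa.int64())
    print(type(arr).__name__)             # -> 'Int64Array'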
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_config.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_config.pyx b/python/pyarrow/_config.pyx
new file mode 100644
index 0000000..536f278
--- /dev/null
+++ b/python/pyarrow/_config.pyx
@@ -0,0 +1,54 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License. See accompanying LICENSE file.
+
+# cython: profile=False
+# distutils: language = c++
+# cython: embedsignature = True
+
+cdef extern from 'arrow/python/do_import_numpy.h':
+ pass
+
+cdef extern from 'arrow/python/numpy_interop.h' namespace 'arrow::py':
+ int import_numpy()
+
+cdef extern from 'arrow/python/config.h' namespace 'arrow::py':
+ void Init()
+ void set_numpy_nan(object o)
+
+import_numpy()
+Init()
+
+import numpy as np
+set_numpy_nan(np.nan)
+
+import multiprocessing
+import os
+cdef int CPU_COUNT = int(
+ os.environ.get('OMP_NUM_THREADS',
+ max(multiprocessing.cpu_count() // 2, 1)))
+
+def cpu_count():
+ """
+ Returns
+ -------
+    count : int
+        Number of CPUs to use by default in parallel operations. Defaults to
+        max(1, multiprocessing.cpu_count() // 2), but can be overridden by
+        the OMP_NUM_THREADS environment variable. The default halves the CPU
+        count because most modern machines have hyperthreading enabled, so
+        scheduling beyond the number of physical cores does not help.
+ """
+ return CPU_COUNT
+
+def set_cpu_count(count):
+ global CPU_COUNT
+ CPU_COUNT = max(int(count), 1)
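Note that CPU_COUNT is computed once at module load, so OMP_NUM_THREADS must be set before pyarrow is first imported. A quick sketch, assuming the package root re-exports cpu_count and set_cpu_count:

    import os
    os.environ.setdefault('OMP_NUM_THREADS', '4')   # must precede the import

    import pyarrow as pa

    print(pa.cpu_count())   # -> 4 here; otherwise max(1, multiprocessing.cpu_count() // 2)
    pa.set_cpu_count(0)     # the floor of 1 is enforced
    print(pa.cpu_count())   # -> 1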
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_error.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/_error.pxd b/python/pyarrow/_error.pxd
new file mode 100644
index 0000000..4fb46c2
--- /dev/null
+++ b/python/pyarrow/_error.pxd
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pyarrow.includes.libarrow cimport CStatus
+
+cdef int check_status(const CStatus& status) nogil except -1
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_error.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_error.pyx b/python/pyarrow/_error.pyx
new file mode 100644
index 0000000..259aeb0
--- /dev/null
+++ b/python/pyarrow/_error.pyx
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pyarrow.includes.libarrow cimport CStatus
+from pyarrow.includes.common cimport c_string
+from pyarrow.compat import frombytes
+
+
+class ArrowException(Exception):
+ pass
+
+
+class ArrowInvalid(ValueError, ArrowException):
+ pass
+
+
+class ArrowMemoryError(MemoryError, ArrowException):
+ pass
+
+
+class ArrowIOError(IOError, ArrowException):
+ pass
+
+
+class ArrowKeyError(KeyError, ArrowException):
+ pass
+
+
+class ArrowTypeError(TypeError, ArrowException):
+ pass
+
+
+class ArrowNotImplementedError(NotImplementedError, ArrowException):
+ pass
+
+
+cdef int check_status(const CStatus& status) nogil except -1:
+ if status.ok():
+ return 0
+
+ with gil:
+ message = frombytes(status.ToString())
+ if status.IsInvalid():
+ raise ArrowInvalid(message)
+ elif status.IsIOError():
+ raise ArrowIOError(message)
+ elif status.IsOutOfMemory():
+ raise ArrowMemoryError(message)
+ elif status.IsKeyError():
+ raise ArrowKeyError(message)
+ elif status.IsNotImplemented():
+ raise ArrowNotImplementedError(message)
+ elif status.IsTypeError():
+ raise ArrowTypeError(message)
+ else:
+ raise ArrowException(message)
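Because each Arrow exception also derives from the closest built-in, callers written against plain Python exceptions keep working unchanged. A minimal check (importing from the private module purely for illustration):

    from pyarrow._error import (ArrowInvalid, ArrowIOError,
                                ArrowMemoryError, ArrowNotImplementedError)

    # check_status() raises the specific class; generic handlers still match:
    assert issubclass(ArrowInvalid, ValueError)
    assert issubclass(ArrowIOError, IOError)
    assert issubclass(ArrowMemoryError, MemoryError)
    assert issubclass(ArrowNotImplementedError, NotImplementedError)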
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_io.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/_io.pxd b/python/pyarrow/_io.pxd
new file mode 100644
index 0000000..0c37a09
--- /dev/null
+++ b/python/pyarrow/_io.pxd
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# distutils: language = c++
+
+from pyarrow.includes.common cimport *
+from pyarrow.includes.libarrow cimport *
+
+
+cdef class Buffer:
+ cdef:
+ shared_ptr[CBuffer] buffer
+ Py_ssize_t shape[1]
+ Py_ssize_t strides[1]
+
+ cdef init(self, const shared_ptr[CBuffer]& buffer)
+
+
+cdef class NativeFile:
+ cdef:
+ shared_ptr[RandomAccessFile] rd_file
+ shared_ptr[OutputStream] wr_file
+ bint is_readable
+ bint is_writeable
+ bint is_open
+ bint own_file
+
+ # By implementing these "virtual" functions (all functions in Cython
+ # extension classes are technically virtual in the C++ sense) we can expose
+ # the arrow::io abstract file interfaces to other components throughout the
+ # suite of Arrow C++ libraries
+ cdef read_handle(self, shared_ptr[RandomAccessFile]* file)
+ cdef write_handle(self, shared_ptr[OutputStream]* file)
+
+cdef get_reader(object source, shared_ptr[RandomAccessFile]* reader)
+cdef get_writer(object source, shared_ptr[OutputStream]* writer)
[2/4] arrow git commit: ARROW-751: [Python] Make all Cython modules
private. Some code tidying
Posted by uw...@apache.org.
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/array.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/array.pyx b/python/pyarrow/array.pyx
deleted file mode 100644
index 1c4253e..0000000
--- a/python/pyarrow/array.pyx
+++ /dev/null
@@ -1,646 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# cython: profile=False
-# distutils: language = c++
-# cython: embedsignature = True
-
-from cython.operator cimport dereference as deref
-
-import numpy as np
-
-from pyarrow.includes.libarrow cimport *
-from pyarrow.includes.common cimport PyObject_to_object
-cimport pyarrow.includes.pyarrow as pyarrow
-
-import pyarrow.config
-
-from pyarrow.compat import frombytes, tobytes, PandasSeries, Categorical
-from pyarrow.error cimport check_status
-from pyarrow.memory cimport MemoryPool, maybe_unbox_memory_pool
-
-cimport pyarrow.scalar as scalar
-from pyarrow.scalar import NA
-
-from pyarrow.schema cimport (DataType, Field, Schema, DictionaryType,
- FixedSizeBinaryType,
- box_data_type)
-import pyarrow.schema as schema
-
-cimport cpython
-
-
-cdef maybe_coerce_datetime64(values, dtype, DataType type,
- timestamps_to_ms=False):
-
- from pyarrow.compat import DatetimeTZDtype
-
- if values.dtype.type != np.datetime64:
- return values, type
-
- coerce_ms = timestamps_to_ms and values.dtype != 'datetime64[ms]'
-
- if coerce_ms:
- values = values.astype('datetime64[ms]')
-
- if isinstance(dtype, DatetimeTZDtype):
- tz = dtype.tz
- unit = 'ms' if coerce_ms else dtype.unit
- type = schema.timestamp(unit, tz)
- elif type is None:
- # Trust the NumPy dtype
- type = schema.type_from_numpy_dtype(values.dtype)
-
- return values, type
-
-
-cdef class Array:
-
- cdef init(self, const shared_ptr[CArray]& sp_array):
- self.sp_array = sp_array
- self.ap = sp_array.get()
- self.type = box_data_type(self.sp_array.get().type())
-
- @staticmethod
- def from_numpy(obj, mask=None, DataType type=None,
- timestamps_to_ms=False,
- MemoryPool memory_pool=None):
- """
- Convert pandas.Series to an Arrow Array.
-
- Parameters
- ----------
- series : pandas.Series or numpy.ndarray
-
- mask : pandas.Series or numpy.ndarray, optional
- boolean mask if the object is valid or null
-
- type : pyarrow.DataType
- Explicit type to attempt to coerce to
-
- timestamps_to_ms : bool, optional
- Convert datetime columns to ms resolution. This is needed for
- compatibility with other functionality like Parquet I/O which
- only supports milliseconds.
-
- memory_pool: MemoryPool, optional
- Specific memory pool to use to allocate the resulting Arrow array.
-
- Notes
- -----
- Localized timestamps will currently be returned as UTC (pandas's native
- representation). Timezone-naive data will be implicitly interpreted as
- UTC.
-
- Examples
- --------
-
- >>> import pandas as pd
- >>> import pyarrow as pa
- >>> pa.Array.from_numpy(pd.Series([1, 2]))
- <pyarrow.array.Int64Array object at 0x7f674e4c0e10>
- [
- 1,
- 2
- ]
-
- >>> import numpy as np
- >>> pa.Array.from_numpy(pd.Series([1, 2]), np.array([0, 1],
- ... dtype=bool))
- <pyarrow.array.Int64Array object at 0x7f9019e11208>
- [
- 1,
- NA
- ]
-
- Returns
- -------
- pyarrow.array.Array
- """
- cdef:
- shared_ptr[CArray] out
- shared_ptr[CDataType] c_type
- CMemoryPool* pool
-
- if mask is not None:
- mask = get_series_values(mask)
-
- values = get_series_values(obj)
- pool = maybe_unbox_memory_pool(memory_pool)
-
- if isinstance(values, Categorical):
- return DictionaryArray.from_arrays(
- values.codes, values.categories.values,
- mask=mask, memory_pool=memory_pool)
- elif values.dtype == object:
- # Object dtype undergoes a different conversion path as more type
- # inference may be needed
- if type is not None:
- c_type = type.sp_type
- with nogil:
- check_status(pyarrow.PandasObjectsToArrow(
- pool, values, mask, c_type, &out))
- else:
- values, type = maybe_coerce_datetime64(
- values, obj.dtype, type, timestamps_to_ms=timestamps_to_ms)
-
- if type is None:
- check_status(pyarrow.NumPyDtypeToArrow(values.dtype, &c_type))
- else:
- c_type = type.sp_type
-
- with nogil:
- check_status(pyarrow.PandasToArrow(
- pool, values, mask, c_type, &out))
-
- return box_array(out)
-
- @staticmethod
- def from_list(object list_obj, DataType type=None,
- MemoryPool memory_pool=None):
- """
- Convert Python list to Arrow array
-
- Parameters
- ----------
- list_obj : array_like
-
- Returns
- -------
- pyarrow.array.Array
- """
- cdef:
- shared_ptr[CArray] sp_array
- CMemoryPool* pool
-
- pool = maybe_unbox_memory_pool(memory_pool)
- if type is None:
- check_status(pyarrow.ConvertPySequence(list_obj, pool, &sp_array))
- else:
- check_status(
- pyarrow.ConvertPySequence(
- list_obj, pool, &sp_array, type.sp_type
- )
- )
-
- return box_array(sp_array)
-
- property null_count:
-
- def __get__(self):
- return self.sp_array.get().null_count()
-
- def __iter__(self):
- for i in range(len(self)):
- yield self.getitem(i)
- raise StopIteration
-
- def __repr__(self):
- from pyarrow.formatting import array_format
- type_format = object.__repr__(self)
- values = array_format(self, window=10)
- return '{0}\n{1}'.format(type_format, values)
-
- def equals(Array self, Array other):
- return self.ap.Equals(deref(other.ap))
-
- def __len__(self):
- if self.sp_array.get():
- return self.sp_array.get().length()
- else:
- return 0
-
- def isnull(self):
- raise NotImplemented
-
- def __getitem__(self, key):
- cdef:
- Py_ssize_t n = len(self)
-
- if PySlice_Check(key):
- start = key.start or 0
- while start < 0:
- start += n
-
- stop = key.stop if key.stop is not None else n
- while stop < 0:
- stop += n
-
- step = key.step or 1
- if step != 1:
- raise IndexError('only slices with step 1 supported')
- else:
- return self.slice(start, stop - start)
-
- while key < 0:
- key += len(self)
-
- return self.getitem(key)
-
- cdef getitem(self, int64_t i):
- return scalar.box_scalar(self.type, self.sp_array, i)
-
- def slice(self, offset=0, length=None):
- """
- Compute zero-copy slice of this array
-
- Parameters
- ----------
- offset : int, default 0
- Offset from start of array to slice
- length : int, default None
- Length of slice (default is until end of Array starting from
- offset)
-
- Returns
- -------
- sliced : RecordBatch
- """
- cdef:
- shared_ptr[CArray] result
-
- if offset < 0:
- raise IndexError('Offset must be non-negative')
-
- if length is None:
- result = self.ap.Slice(offset)
- else:
- result = self.ap.Slice(offset, length)
-
- return box_array(result)
-
- def to_pandas(self):
- """
- Convert to an array object suitable for use in pandas
-
- See also
- --------
- Column.to_pandas
- Table.to_pandas
- RecordBatch.to_pandas
- """
- cdef:
- PyObject* out
-
- with nogil:
- check_status(
- pyarrow.ConvertArrayToPandas(self.sp_array, <PyObject*> self,
- &out))
- return wrap_array_output(out)
-
- def to_pylist(self):
- """
- Convert to an list of native Python objects.
- """
- return [x.as_py() for x in self]
-
-
-cdef class Tensor:
-
- cdef init(self, const shared_ptr[CTensor]& sp_tensor):
- self.sp_tensor = sp_tensor
- self.tp = sp_tensor.get()
- self.type = box_data_type(self.tp.type())
-
- def __repr__(self):
- return """<pyarrow.Tensor>
-type: {0}
-shape: {1}
-strides: {2}""".format(self.type, self.shape, self.strides)
-
- @staticmethod
- def from_numpy(obj):
- cdef shared_ptr[CTensor] ctensor
- check_status(pyarrow.NdarrayToTensor(default_memory_pool(),
- obj, &ctensor))
- return box_tensor(ctensor)
-
- def to_numpy(self):
- """
- Convert arrow::Tensor to numpy.ndarray with zero copy
- """
- cdef:
- PyObject* out
-
- check_status(pyarrow.TensorToNdarray(deref(self.tp), <PyObject*> self,
- &out))
- return PyObject_to_object(out)
-
- def equals(self, Tensor other):
- """
- Return true if the tensors contains exactly equal data
- """
- return self.tp.Equals(deref(other.tp))
-
- property is_mutable:
-
- def __get__(self):
- return self.tp.is_mutable()
-
- property is_contiguous:
-
- def __get__(self):
- return self.tp.is_contiguous()
-
- property ndim:
-
- def __get__(self):
- return self.tp.ndim()
-
- property size:
-
- def __get__(self):
- return self.tp.size()
-
- property shape:
-
- def __get__(self):
- cdef size_t i
- py_shape = []
- for i in range(self.tp.shape().size()):
- py_shape.append(self.tp.shape()[i])
- return py_shape
-
- property strides:
-
- def __get__(self):
- cdef size_t i
- py_strides = []
- for i in range(self.tp.strides().size()):
- py_strides.append(self.tp.strides()[i])
- return py_strides
-
-
-
-cdef wrap_array_output(PyObject* output):
- cdef object obj = PyObject_to_object(output)
-
- if isinstance(obj, dict):
- return Categorical(obj['indices'],
- categories=obj['dictionary'],
- fastpath=True)
- else:
- return obj
-
-
-cdef class NullArray(Array):
- pass
-
-
-cdef class BooleanArray(Array):
- pass
-
-
-cdef class NumericArray(Array):
- pass
-
-
-cdef class IntegerArray(NumericArray):
- pass
-
-
-cdef class FloatingPointArray(NumericArray):
- pass
-
-
-cdef class Int8Array(IntegerArray):
- pass
-
-
-cdef class UInt8Array(IntegerArray):
- pass
-
-
-cdef class Int16Array(IntegerArray):
- pass
-
-
-cdef class UInt16Array(IntegerArray):
- pass
-
-
-cdef class Int32Array(IntegerArray):
- pass
-
-
-cdef class UInt32Array(IntegerArray):
- pass
-
-
-cdef class Int64Array(IntegerArray):
- pass
-
-
-cdef class UInt64Array(IntegerArray):
- pass
-
-
-cdef class Date32Array(NumericArray):
- pass
-
-
-cdef class Date64Array(NumericArray):
- pass
-
-
-cdef class TimestampArray(NumericArray):
- pass
-
-
-cdef class Time32Array(NumericArray):
- pass
-
-
-cdef class Time64Array(NumericArray):
- pass
-
-
-cdef class FloatArray(FloatingPointArray):
- pass
-
-
-cdef class DoubleArray(FloatingPointArray):
- pass
-
-
-cdef class FixedSizeBinaryArray(Array):
- pass
-
-
-cdef class DecimalArray(FixedSizeBinaryArray):
- pass
-
-
-cdef class ListArray(Array):
- pass
-
-
-cdef class StringArray(Array):
- pass
-
-
-cdef class BinaryArray(Array):
- pass
-
-
-cdef class DictionaryArray(Array):
-
- cdef getitem(self, int64_t i):
- cdef Array dictionary = self.dictionary
- index = self.indices[i]
- if index is NA:
- return index
- else:
- return scalar.box_scalar(dictionary.type, dictionary.sp_array,
- index.as_py())
-
- property dictionary:
-
- def __get__(self):
- cdef CDictionaryArray* darr = <CDictionaryArray*>(self.ap)
-
- if self._dictionary is None:
- self._dictionary = box_array(darr.dictionary())
-
- return self._dictionary
-
- property indices:
-
- def __get__(self):
- cdef CDictionaryArray* darr = <CDictionaryArray*>(self.ap)
-
- if self._indices is None:
- self._indices = box_array(darr.indices())
-
- return self._indices
-
- @staticmethod
- def from_arrays(indices, dictionary, mask=None,
- MemoryPool memory_pool=None):
- """
- Construct Arrow DictionaryArray from array of indices (must be
- non-negative integers) and corresponding array of dictionary values
-
- Parameters
- ----------
- indices : ndarray or pandas.Series, integer type
- dictionary : ndarray or pandas.Series
- mask : ndarray or pandas.Series, boolean type
- True values indicate that indices are actually null
-
- Returns
- -------
- dict_array : DictionaryArray
- """
- cdef:
- Array arrow_indices, arrow_dictionary
- DictionaryArray result
- shared_ptr[CDataType] c_type
- shared_ptr[CArray] c_result
-
- if isinstance(indices, Array):
- if mask is not None:
- raise NotImplementedError(
- "mask not implemented with Arrow array inputs yet")
- arrow_indices = indices
- else:
- if mask is None:
- mask = indices == -1
- else:
- mask = mask | (indices == -1)
- arrow_indices = Array.from_numpy(indices, mask=mask,
- memory_pool=memory_pool)
-
- if isinstance(dictionary, Array):
- arrow_dictionary = dictionary
- else:
- arrow_dictionary = Array.from_numpy(dictionary,
- memory_pool=memory_pool)
-
- if not isinstance(arrow_indices, IntegerArray):
- raise ValueError('Indices must be integer type')
-
- c_type.reset(new CDictionaryType(arrow_indices.type.sp_type,
- arrow_dictionary.sp_array))
- c_result.reset(new CDictionaryArray(c_type, arrow_indices.sp_array))
-
- result = DictionaryArray()
- result.init(c_result)
- return result
-
-
-cdef dict _array_classes = {
- Type_NA: NullArray,
- Type_BOOL: BooleanArray,
- Type_UINT8: UInt8Array,
- Type_UINT16: UInt16Array,
- Type_UINT32: UInt32Array,
- Type_UINT64: UInt64Array,
- Type_INT8: Int8Array,
- Type_INT16: Int16Array,
- Type_INT32: Int32Array,
- Type_INT64: Int64Array,
- Type_DATE32: Date32Array,
- Type_DATE64: Date64Array,
- Type_TIMESTAMP: TimestampArray,
- Type_TIME32: Time32Array,
- Type_TIME64: Time64Array,
- Type_FLOAT: FloatArray,
- Type_DOUBLE: DoubleArray,
- Type_LIST: ListArray,
- Type_BINARY: BinaryArray,
- Type_STRING: StringArray,
- Type_DICTIONARY: DictionaryArray,
- Type_FIXED_SIZE_BINARY: FixedSizeBinaryArray,
- Type_DECIMAL: DecimalArray,
-}
-
-cdef object box_array(const shared_ptr[CArray]& sp_array):
- if sp_array.get() == NULL:
- raise ValueError('Array was NULL')
-
- cdef CDataType* data_type = sp_array.get().type().get()
-
- if data_type == NULL:
- raise ValueError('Array data type was NULL')
-
- cdef Array arr = _array_classes[data_type.id()]()
- arr.init(sp_array)
- return arr
-
-
-cdef object box_tensor(const shared_ptr[CTensor]& sp_tensor):
- if sp_tensor.get() == NULL:
- raise ValueError('Tensor was NULL')
-
- cdef Tensor tensor = Tensor()
- tensor.init(sp_tensor)
- return tensor
-
-
-cdef object get_series_values(object obj):
- if isinstance(obj, PandasSeries):
- result = obj.values
- elif isinstance(obj, np.ndarray):
- result = obj
- else:
- result = PandasSeries(obj).values
-
- return result
-
-
-from_pylist = Array.from_list
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/config.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/config.pyx b/python/pyarrow/config.pyx
deleted file mode 100644
index 536f278..0000000
--- a/python/pyarrow/config.pyx
+++ /dev/null
@@ -1,54 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License. See accompanying LICENSE file.
-
-# cython: profile=False
-# distutils: language = c++
-# cython: embedsignature = True
-
-cdef extern from 'arrow/python/do_import_numpy.h':
- pass
-
-cdef extern from 'arrow/python/numpy_interop.h' namespace 'arrow::py':
- int import_numpy()
-
-cdef extern from 'arrow/python/config.h' namespace 'arrow::py':
- void Init()
- void set_numpy_nan(object o)
-
-import_numpy()
-Init()
-
-import numpy as np
-set_numpy_nan(np.nan)
-
-import multiprocessing
-import os
-cdef int CPU_COUNT = int(
- os.environ.get('OMP_NUM_THREADS',
- max(multiprocessing.cpu_count() // 2, 1)))
-
-def cpu_count():
- """
- Returns
- -------
- count : Number of CPUs to use by default in parallel operations. Default is
- max(1, multiprocessing.cpu_count() / 2), but can be overridden by the
- OMP_NUM_THREADS environment variable. For the default, we divide the CPU
- count by 2 because most modern computers have hyperthreading turned on,
- so doubling the CPU count beyond the number of physical cores does not
- help.
- """
- return CPU_COUNT
-
-def set_cpu_count(count):
- global CPU_COUNT
- CPU_COUNT = max(int(count), 1)
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/error.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/error.pxd b/python/pyarrow/error.pxd
deleted file mode 100644
index 4fb46c2..0000000
--- a/python/pyarrow/error.pxd
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from pyarrow.includes.libarrow cimport CStatus
-
-cdef int check_status(const CStatus& status) nogil except -1
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/error.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/error.pyx b/python/pyarrow/error.pyx
deleted file mode 100644
index 259aeb0..0000000
--- a/python/pyarrow/error.pyx
+++ /dev/null
@@ -1,70 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from pyarrow.includes.libarrow cimport CStatus
-from pyarrow.includes.common cimport c_string
-from pyarrow.compat import frombytes
-
-
-class ArrowException(Exception):
- pass
-
-
-class ArrowInvalid(ValueError, ArrowException):
- pass
-
-
-class ArrowMemoryError(MemoryError, ArrowException):
- pass
-
-
-class ArrowIOError(IOError, ArrowException):
- pass
-
-
-class ArrowKeyError(KeyError, ArrowException):
- pass
-
-
-class ArrowTypeError(TypeError, ArrowException):
- pass
-
-
-class ArrowNotImplementedError(NotImplementedError, ArrowException):
- pass
-
-
-cdef int check_status(const CStatus& status) nogil except -1:
- if status.ok():
- return 0
-
- with gil:
- message = frombytes(status.ToString())
- if status.IsInvalid():
- raise ArrowInvalid(message)
- elif status.IsIOError():
- raise ArrowIOError(message)
- elif status.IsOutOfMemory():
- raise ArrowMemoryError(message)
- elif status.IsKeyError():
- raise ArrowKeyError(message)
- elif status.IsNotImplemented():
- raise ArrowNotImplementedError(message)
- elif status.IsTypeError():
- raise ArrowTypeError(message)
- else:
- raise ArrowException(message)
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/feather.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/feather.py b/python/pyarrow/feather.py
index 3b5716e..c7b118e 100644
--- a/python/pyarrow/feather.py
+++ b/python/pyarrow/feather.py
@@ -22,9 +22,9 @@ import six
import pandas as pd
from pyarrow.compat import pdapi
-from pyarrow.io import FeatherError # noqa
-from pyarrow.table import Table
-import pyarrow.io as ext
+from pyarrow._io import FeatherError # noqa
+from pyarrow._table import Table
+import pyarrow._io as ext
if LooseVersion(pd.__version__) < '0.17.0':
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/filesystem.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/filesystem.py b/python/pyarrow/filesystem.py
index 269cf1c..92dd91c 100644
--- a/python/pyarrow/filesystem.py
+++ b/python/pyarrow/filesystem.py
@@ -19,7 +19,7 @@ from os.path import join as pjoin
import os
from pyarrow.util import implements
-import pyarrow.io as io
+import pyarrow._io as io
class Filesystem(object):
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/formatting.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/formatting.py b/python/pyarrow/formatting.py
index 5fe0611..c358344 100644
--- a/python/pyarrow/formatting.py
+++ b/python/pyarrow/formatting.py
@@ -17,7 +17,7 @@
# Pretty-printing and other formatting utilities for Arrow data structures
-import pyarrow.scalar as scalar
+import pyarrow._array as _array
def array_format(arr, window=None):
@@ -42,7 +42,7 @@ def array_format(arr, window=None):
def value_format(x, indent_level=0):
- if isinstance(x, scalar.ListValue):
+ if isinstance(x, _array.ListValue):
contents = ',\n'.join(value_format(item) for item in x)
return '[{0}]'.format(_indent(contents, 1).strip())
else:
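value_format recurses into list values so nested data prints structurally, and array_format's window parameter bounds how many elements get rendered. A small standalone sketch of that windowing idea (the exact cutoff logic here is illustrative, not lifted from array_format):

    def windowed_format(values, window=2):
        # show the first and last `window` items with an ellipsis between
        if window is None or len(values) <= 2 * window:
            parts = [repr(v) for v in values]
        else:
            parts = ([repr(v) for v in values[:window]] + ['...'] +
                     [repr(v) for v in values[-window:]])
        return '[{0}]'.format(',\n  '.join(parts))

    print(windowed_format(list(range(10))))   # [0,  1,  ...,  8,  9]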
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index ae2b45f..2444f3f 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -113,8 +113,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
shared_ptr[CDataType] index_type()
shared_ptr[CArray] dictionary()
- shared_ptr[CDataType] timestamp(TimeUnit unit)
- shared_ptr[CDataType] timestamp(TimeUnit unit, const c_string& timezone)
+ shared_ptr[CDataType] ctimestamp" arrow::timestamp"(TimeUnit unit)
+ shared_ptr[CDataType] ctimestamp" arrow::timestamp"(
+ TimeUnit unit, const c_string& timezone)
cdef cppclass CMemoryPool" arrow::MemoryPool":
int64_t bytes_allocated()
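The libarrow.pxd hunk uses Cython's trailing-string "C name" syntax: the declaration is visible to Cython code as ctimestamp, but the generated C++ still calls arrow::timestamp. That frees the plain name timestamp for a Python-level factory function in the same package. A hypothetical declaration showing the same trick (mylib is made up):

    cdef extern from "mylib.h" namespace "mylib":
        # Cython-level name is c_sqrt; the emitted C++ calls mylib::sqrt,
        # so a Python function named sqrt can coexist in the module.
        double c_sqrt" mylib::sqrt"(double x)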
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/io.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pxd b/python/pyarrow/io.pxd
deleted file mode 100644
index 0c37a09..0000000
--- a/python/pyarrow/io.pxd
+++ /dev/null
@@ -1,50 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# distutils: language = c++
-
-from pyarrow.includes.common cimport *
-from pyarrow.includes.libarrow cimport *
-
-
-cdef class Buffer:
- cdef:
- shared_ptr[CBuffer] buffer
- Py_ssize_t shape[1]
- Py_ssize_t strides[1]
-
- cdef init(self, const shared_ptr[CBuffer]& buffer)
-
-
-cdef class NativeFile:
- cdef:
- shared_ptr[RandomAccessFile] rd_file
- shared_ptr[OutputStream] wr_file
- bint is_readable
- bint is_writeable
- bint is_open
- bint own_file
-
- # By implementing these "virtual" functions (all functions in Cython
- # extension classes are technically virtual in the C++ sense) we can expose
- # the arrow::io abstract file interfaces to other components throughout the
- # suite of Arrow C++ libraries
- cdef read_handle(self, shared_ptr[RandomAccessFile]* file)
- cdef write_handle(self, shared_ptr[OutputStream]* file)
-
-cdef get_reader(object source, shared_ptr[RandomAccessFile]* reader)
-cdef get_writer(object source, shared_ptr[OutputStream]* writer)
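The shape and strides arrays declared on Buffer exist to service CPython's buffer protocol (the __getbuffer__ implementation appears further down in io.pyx), which is what lets consumers view Arrow memory without copying. The protocol itself is easy to demonstrate with builtins:

    # Any buffer-protocol object can be viewed zero-copy with memoryview;
    # pyarrow's Buffer participates the same way via __getbuffer__.
    data = b'arrow buffer protocol'
    view = memoryview(data)
    assert view.nbytes == len(data)
    assert view[0:5].tobytes() == b'arrow'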
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
deleted file mode 100644
index 4eb0816..0000000
--- a/python/pyarrow/io.pyx
+++ /dev/null
@@ -1,1276 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Cython wrappers for IO interfaces defined in arrow::io and messaging in
-# arrow::ipc
-
-# cython: profile=False
-# distutils: language = c++
-# cython: embedsignature = True
-
-from cython.operator cimport dereference as deref
-
-from libc.stdlib cimport malloc, free
-
-from pyarrow.includes.libarrow cimport *
-cimport pyarrow.includes.pyarrow as pyarrow
-
-from pyarrow.compat import frombytes, tobytes, encode_file_path
-from pyarrow.array cimport Array, Tensor, box_tensor
-from pyarrow.error cimport check_status
-from pyarrow.memory cimport MemoryPool, maybe_unbox_memory_pool
-from pyarrow.schema cimport Schema
-from pyarrow.table cimport (Column, RecordBatch, batch_from_cbatch,
- table_from_ctable)
-
-cimport cpython as cp
-
-import re
-import six
-import sys
-import threading
-import time
-
-
-# 64K
-DEFAULT_BUFFER_SIZE = 2 ** 16
-
-
-# To let us get a PyObject* and avoid Cython auto-ref-counting
-cdef extern from "Python.h":
- PyObject* PyBytes_FromStringAndSizeNative" PyBytes_FromStringAndSize"(
- char *v, Py_ssize_t len) except NULL
-
-cdef class NativeFile:
-
- def __cinit__(self):
- self.is_open = False
- self.own_file = False
-
- def __dealloc__(self):
- if self.is_open and self.own_file:
- self.close()
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_value, tb):
- self.close()
-
- def close(self):
- if self.is_open:
- with nogil:
- if self.is_readable:
- check_status(self.rd_file.get().Close())
- else:
- check_status(self.wr_file.get().Close())
- self.is_open = False
-
- cdef read_handle(self, shared_ptr[RandomAccessFile]* file):
- self._assert_readable()
- file[0] = <shared_ptr[RandomAccessFile]> self.rd_file
-
- cdef write_handle(self, shared_ptr[OutputStream]* file):
- self._assert_writeable()
- file[0] = <shared_ptr[OutputStream]> self.wr_file
-
- def _assert_readable(self):
- if not self.is_readable:
- raise IOError("only valid on readonly files")
-
- if not self.is_open:
- raise IOError("file not open")
-
- def _assert_writeable(self):
- if not self.is_writeable:
- raise IOError("only valid on writeable files")
-
- if not self.is_open:
- raise IOError("file not open")
-
- def size(self):
- cdef int64_t size
- self._assert_readable()
- with nogil:
- check_status(self.rd_file.get().GetSize(&size))
- return size
-
- def tell(self):
- cdef int64_t position
- with nogil:
- if self.is_readable:
- check_status(self.rd_file.get().Tell(&position))
- else:
- check_status(self.wr_file.get().Tell(&position))
- return position
-
- def seek(self, int64_t position):
- self._assert_readable()
- with nogil:
- check_status(self.rd_file.get().Seek(position))
-
- def write(self, data):
- """
- Write bytes from any object implementing the buffer protocol (bytes,
- bytearray, ndarray, pyarrow.Buffer)
- """
- self._assert_writeable()
-
- if isinstance(data, six.string_types):
- data = tobytes(data)
-
- cdef Buffer arrow_buffer = frombuffer(data)
-
- cdef const uint8_t* buf = arrow_buffer.buffer.get().data()
- cdef int64_t bufsize = len(arrow_buffer)
- with nogil:
- check_status(self.wr_file.get().Write(buf, bufsize))
-
- def read(self, nbytes=None):
- cdef:
- int64_t c_nbytes
- int64_t bytes_read = 0
- PyObject* obj
-
- if nbytes is None:
- c_nbytes = self.size() - self.tell()
- else:
- c_nbytes = nbytes
-
- self._assert_readable()
-
- # Allocate empty write space
- obj = PyBytes_FromStringAndSizeNative(NULL, c_nbytes)
-
- cdef uint8_t* buf = <uint8_t*> cp.PyBytes_AS_STRING(<object> obj)
- with nogil:
- check_status(self.rd_file.get().Read(c_nbytes, &bytes_read, buf))
-
- if bytes_read < c_nbytes:
- cp._PyBytes_Resize(&obj, <Py_ssize_t> bytes_read)
-
- return PyObject_to_object(obj)
-
- def read_buffer(self, nbytes=None):
- cdef:
- int64_t c_nbytes
- int64_t bytes_read = 0
- shared_ptr[CBuffer] output
- self._assert_readable()
-
- if nbytes is None:
- c_nbytes = self.size() - self.tell()
- else:
- c_nbytes = nbytes
-
- with nogil:
- check_status(self.rd_file.get().ReadB(c_nbytes, &output))
-
- return wrap_buffer(output)
-
- def download(self, stream_or_path, buffer_size=None):
- """
- Read file completely to local path (rather than reading completely into
- memory). First seeks to the beginning of the file.
- """
- cdef:
- int64_t bytes_read = 0
- uint8_t* buf
- self._assert_readable()
-
- buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
-
- write_queue = Queue(50)
-
- if not hasattr(stream_or_path, 'read'):
- stream = open(stream_or_path, 'wb')
- cleanup = lambda: stream.close()
- else:
- stream = stream_or_path
- cleanup = lambda: None
-
- done = False
- exc_info = None
- def bg_write():
- try:
- while not done or write_queue.qsize() > 0:
- try:
- buf = write_queue.get(timeout=0.01)
- except QueueEmpty:
- continue
- stream.write(buf)
- except Exception as e:
- exc_info = sys.exc_info()
- finally:
- cleanup()
-
- self.seek(0)
-
- writer_thread = threading.Thread(target=bg_write)
-
- # This isn't ideal -- PyBytes_FromStringAndSize copies the data from
- # the passed buffer, so it's hard for us to avoid doubling the memory
- buf = <uint8_t*> malloc(buffer_size)
- if buf == NULL:
- raise MemoryError("Failed to allocate {0} bytes"
- .format(buffer_size))
-
- writer_thread.start()
-
- cdef int64_t total_bytes = 0
- cdef int32_t c_buffer_size = buffer_size
-
- try:
- while True:
- with nogil:
- check_status(self.rd_file.get()
- .Read(c_buffer_size, &bytes_read, buf))
-
- total_bytes += bytes_read
-
- # EOF
- if bytes_read == 0:
- break
-
- pybuf = cp.PyBytes_FromStringAndSize(<const char*>buf,
- bytes_read)
-
- write_queue.put_nowait(pybuf)
- finally:
- free(buf)
- done = True
-
- writer_thread.join()
- if exc_info is not None:
- raise exc_info[0], exc_info[1], exc_info[2]
-
- def upload(self, stream, buffer_size=None):
- """
- Pipe file-like object to file
- """
- write_queue = Queue(50)
- self._assert_writeable()
-
- buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
-
- done = False
- exc_info = None
- def bg_write():
- try:
- while not done or write_queue.qsize() > 0:
- try:
- buf = write_queue.get(timeout=0.01)
- except QueueEmpty:
- continue
-
- self.write(buf)
-
- except Exception as e:
- exc_info = sys.exc_info()
-
- writer_thread = threading.Thread(target=bg_write)
- writer_thread.start()
-
- try:
- while True:
- buf = stream.read(buffer_size)
- if not buf:
- break
-
- if writer_thread.is_alive():
- while write_queue.full():
- time.sleep(0.01)
- else:
- break
-
- write_queue.put_nowait(buf)
- finally:
- done = True
-
- writer_thread.join()
- if exc_info is not None:
- raise exc_info[0], exc_info[1], exc_info[2]
-
-
-# ----------------------------------------------------------------------
-# Python file-like objects
-
-
-cdef class PythonFileInterface(NativeFile):
- cdef:
- object handle
-
- def __cinit__(self, handle, mode='w'):
- self.handle = handle
-
- if mode.startswith('w'):
- self.wr_file.reset(new pyarrow.PyOutputStream(handle))
- self.is_readable = 0
- self.is_writeable = 1
- elif mode.startswith('r'):
- self.rd_file.reset(new pyarrow.PyReadableFile(handle))
- self.is_readable = 1
- self.is_writeable = 0
- else:
- raise ValueError('Invalid file mode: {0}'.format(mode))
-
- self.is_open = True
-
-
-cdef class MemoryMappedFile(NativeFile):
- """
- Supports 'r', 'r+w', 'w' modes
- """
- cdef:
- object path
-
- def __cinit__(self):
- self.is_open = False
- self.is_readable = 0
- self.is_writeable = 0
-
- @staticmethod
- def create(path, size):
- cdef:
- shared_ptr[CMemoryMappedFile] handle
- c_string c_path = encode_file_path(path)
- int64_t c_size = size
-
- with nogil:
- check_status(CMemoryMappedFile.Create(c_path, c_size, &handle))
-
- cdef MemoryMappedFile result = MemoryMappedFile()
- result.path = path
- result.is_readable = 1
- result.is_writeable = 1
- result.wr_file = <shared_ptr[OutputStream]> handle
- result.rd_file = <shared_ptr[RandomAccessFile]> handle
- result.is_open = True
-
- return result
-
- def open(self, path, mode='r'):
- self.path = path
-
- cdef:
- FileMode c_mode
- shared_ptr[CMemoryMappedFile] handle
- c_string c_path = encode_file_path(path)
-
- if mode in ('r', 'rb'):
- c_mode = FileMode_READ
- self.is_readable = 1
- elif mode in ('w', 'wb'):
- c_mode = FileMode_WRITE
- self.is_writeable = 1
- elif mode == 'r+w':
- c_mode = FileMode_READWRITE
- self.is_readable = 1
- self.is_writeable = 1
- else:
- raise ValueError('Invalid file mode: {0}'.format(mode))
-
- check_status(CMemoryMappedFile.Open(c_path, c_mode, &handle))
-
- self.wr_file = <shared_ptr[OutputStream]> handle
- self.rd_file = <shared_ptr[RandomAccessFile]> handle
- self.is_open = True
-
-
-def memory_map(path, mode='r'):
- """
- Open memory map at file path. Size of the memory map cannot change
-
- Parameters
- ----------
- path : string
- mode : {'r', 'w'}, default 'r'
-
- Returns
- -------
- mmap : MemoryMappedFile
- """
- cdef MemoryMappedFile mmap = MemoryMappedFile()
- mmap.open(path, mode)
- return mmap
-
-
-def create_memory_map(path, size):
- """
- Create memory map at indicated path of the given size, return open
- writeable file object
-
- Parameters
- ----------
- path : string
- size : int
-
- Returns
- -------
- mmap : MemoryMappedFile
- """
- return MemoryMappedFile.create(path, size)
-
-
-cdef class OSFile(NativeFile):
- """
- Supports 'r', 'w' modes
- """
- cdef:
- object path
-
- def __cinit__(self, path, mode='r', MemoryPool memory_pool=None):
- self.path = path
-
- cdef:
- FileMode c_mode
- shared_ptr[Readable] handle
- c_string c_path = encode_file_path(path)
-
- self.is_readable = self.is_writeable = 0
-
- if mode in ('r', 'rb'):
- self._open_readable(c_path, maybe_unbox_memory_pool(memory_pool))
- elif mode in ('w', 'wb'):
- self._open_writeable(c_path)
- else:
- raise ValueError('Invalid file mode: {0}'.format(mode))
-
- self.is_open = True
-
- cdef _open_readable(self, c_string path, CMemoryPool* pool):
- cdef shared_ptr[ReadableFile] handle
-
- with nogil:
- check_status(ReadableFile.Open(path, pool, &handle))
-
- self.is_readable = 1
- self.rd_file = <shared_ptr[RandomAccessFile]> handle
-
- cdef _open_writeable(self, c_string path):
- cdef shared_ptr[FileOutputStream] handle
-
- with nogil:
- check_status(FileOutputStream.Open(path, &handle))
- self.is_writeable = 1
- self.wr_file = <shared_ptr[OutputStream]> handle
-
-
-# ----------------------------------------------------------------------
-# Arrow buffers
-
-
-cdef class Buffer:
-
- def __cinit__(self):
- pass
-
- cdef init(self, const shared_ptr[CBuffer]& buffer):
- self.buffer = buffer
- self.shape[0] = self.size
- self.strides[0] = <Py_ssize_t>(1)
-
- def __len__(self):
- return self.size
-
- property size:
-
- def __get__(self):
- return self.buffer.get().size()
-
- property parent:
-
- def __get__(self):
- cdef shared_ptr[CBuffer] parent_buf = self.buffer.get().parent()
-
- if parent_buf.get() == NULL:
- return None
- else:
- return wrap_buffer(parent_buf)
-
- def __getitem__(self, key):
- # TODO(wesm): buffer slicing
- raise NotImplementedError
-
- def to_pybytes(self):
- return cp.PyBytes_FromStringAndSize(
- <const char*>self.buffer.get().data(),
- self.buffer.get().size())
-
- def __getbuffer__(self, cp.Py_buffer* buffer, int flags):
-
- buffer.buf = <char *>self.buffer.get().data()
- buffer.format = 'b'
- buffer.internal = NULL
- buffer.itemsize = 1
- buffer.len = self.size
- buffer.ndim = 1
- buffer.obj = self
- buffer.readonly = 1
- buffer.shape = self.shape
- buffer.strides = self.strides
- buffer.suboffsets = NULL
-
-cdef shared_ptr[PoolBuffer] allocate_buffer(CMemoryPool* pool):
- cdef shared_ptr[PoolBuffer] result
- result.reset(new PoolBuffer(pool))
- return result
-
-
-cdef class InMemoryOutputStream(NativeFile):
-
- cdef:
- shared_ptr[PoolBuffer] buffer
-
- def __cinit__(self, MemoryPool memory_pool=None):
- self.buffer = allocate_buffer(maybe_unbox_memory_pool(memory_pool))
- self.wr_file.reset(new BufferOutputStream(
- <shared_ptr[ResizableBuffer]> self.buffer))
- self.is_readable = 0
- self.is_writeable = 1
- self.is_open = True
-
- def get_result(self):
- check_status(self.wr_file.get().Close())
- self.is_open = False
- return wrap_buffer(<shared_ptr[CBuffer]> self.buffer)
-
-
-cdef class BufferReader(NativeFile):
- """
- Zero-copy reader from objects convertible to Arrow buffer
-
- Parameters
- ----------
- obj : Python bytes or pyarrow.io.Buffer
- """
- cdef:
- Buffer buffer
-
- def __cinit__(self, object obj):
-
- if isinstance(obj, Buffer):
- self.buffer = obj
- else:
- self.buffer = frombuffer(obj)
-
- self.rd_file.reset(new CBufferReader(self.buffer.buffer))
- self.is_readable = 1
- self.is_writeable = 0
- self.is_open = True
-
-
-def frombuffer(object obj):
- """
- Construct an Arrow buffer from a Python bytes object
- """
- cdef shared_ptr[CBuffer] buf
- try:
- memoryview(obj)
- buf.reset(new pyarrow.PyBuffer(obj))
- return wrap_buffer(buf)
- except TypeError:
- raise ValueError('Must pass object that implements buffer protocol')
-
-
-
-cdef Buffer wrap_buffer(const shared_ptr[CBuffer]& buf):
- cdef Buffer result = Buffer()
- result.init(buf)
- return result
-
-
-cdef get_reader(object source, shared_ptr[RandomAccessFile]* reader):
- cdef NativeFile nf
-
- if isinstance(source, six.string_types):
- source = memory_map(source, mode='r')
- elif isinstance(source, Buffer):
- source = BufferReader(source)
- elif not isinstance(source, NativeFile) and hasattr(source, 'read'):
- # Optimistically hope this is file-like
- source = PythonFileInterface(source, mode='r')
-
- if isinstance(source, NativeFile):
- nf = source
-
- # TODO: what about read-write sources (e.g. memory maps)
- if not nf.is_readable:
- raise IOError('Native file is not readable')
-
- nf.read_handle(reader)
- else:
- raise TypeError('Unable to read from object of type: {0}'
- .format(type(source)))
-
-
-cdef get_writer(object source, shared_ptr[OutputStream]* writer):
- cdef NativeFile nf
-
- if isinstance(source, six.string_types):
- source = OSFile(source, mode='w')
- elif not isinstance(source, NativeFile) and hasattr(source, 'write'):
- # Optimistically hope this is file-like
- source = PythonFileInterface(source, mode='w')
-
- if isinstance(source, NativeFile):
- nf = source
-
- if nf.is_readable:
- raise IOError('Native file is not writeable')
-
- nf.write_handle(writer)
- else:
- raise TypeError('Unable to write to object of type: {0}'
- .format(type(source)))
-
-# ----------------------------------------------------------------------
-# HDFS IO implementation
-
-_HDFS_PATH_RE = re.compile('hdfs://(.*):(\d+)(.*)')
-
-try:
- # Python 3
- from queue import Queue, Empty as QueueEmpty, Full as QueueFull
-except ImportError:
- from Queue import Queue, Empty as QueueEmpty, Full as QueueFull
-
-
-def have_libhdfs():
- try:
- check_status(HaveLibHdfs())
- return True
- except:
- return False
-
-
-def have_libhdfs3():
- try:
- check_status(HaveLibHdfs3())
- return True
- except:
- return False
-
-
-def strip_hdfs_abspath(path):
- m = _HDFS_PATH_RE.match(path)
- if m:
- return m.group(3)
- else:
- return path
-
-
-cdef class _HdfsClient:
- cdef:
- shared_ptr[CHdfsClient] client
-
- cdef readonly:
- bint is_open
-
- def __cinit__(self):
- pass
-
- def _connect(self, host, port, user, kerb_ticket, driver):
- cdef HdfsConnectionConfig conf
-
- if host is not None:
- conf.host = tobytes(host)
- conf.port = port
- if user is not None:
- conf.user = tobytes(user)
- if kerb_ticket is not None:
- conf.kerb_ticket = tobytes(kerb_ticket)
-
- if driver == 'libhdfs':
- check_status(HaveLibHdfs())
- conf.driver = HdfsDriver_LIBHDFS
- else:
- check_status(HaveLibHdfs3())
- conf.driver = HdfsDriver_LIBHDFS3
-
- with nogil:
- check_status(CHdfsClient.Connect(&conf, &self.client))
- self.is_open = True
-
- @classmethod
- def connect(cls, *args, **kwargs):
- return cls(*args, **kwargs)
-
- def __dealloc__(self):
- if self.is_open:
- self.close()
-
- def close(self):
- """
- Disconnect from the HDFS cluster
- """
- self._ensure_client()
- with nogil:
- check_status(self.client.get().Disconnect())
- self.is_open = False
-
- cdef _ensure_client(self):
- if self.client.get() == NULL:
- raise IOError('HDFS client improperly initialized')
- elif not self.is_open:
- raise IOError('HDFS client is closed')
-
- def exists(self, path):
- """
- Returns True if the path is known to the cluster, False if it is not
- (or there is an RPC error)
- """
- self._ensure_client()
-
- cdef c_string c_path = tobytes(path)
- cdef c_bool result
- with nogil:
- result = self.client.get().Exists(c_path)
- return result
-
- def isdir(self, path):
- cdef HdfsPathInfo info
- self._path_info(path, &info)
- return info.kind == ObjectType_DIRECTORY
-
- def isfile(self, path):
- cdef HdfsPathInfo info
- self._path_info(path, &info)
- return info.kind == ObjectType_FILE
-
- cdef _path_info(self, path, HdfsPathInfo* info):
- cdef c_string c_path = tobytes(path)
-
- with nogil:
- check_status(self.client.get()
- .GetPathInfo(c_path, info))
-
-
- def ls(self, path, bint full_info):
- cdef:
- c_string c_path = tobytes(path)
- vector[HdfsPathInfo] listing
- list results = []
- int i
-
- self._ensure_client()
-
- with nogil:
- check_status(self.client.get()
- .ListDirectory(c_path, &listing))
-
- cdef const HdfsPathInfo* info
- for i in range(<int> listing.size()):
- info = &listing[i]
-
- # Try to trim off the hdfs://HOST:PORT piece
- name = strip_hdfs_abspath(frombytes(info.name))
-
- if full_info:
- kind = ('file' if info.kind == ObjectType_FILE
- else 'directory')
-
- results.append({
- 'kind': kind,
- 'name': name,
- 'owner': frombytes(info.owner),
- 'group': frombytes(info.group),
- 'list_modified_time': info.last_modified_time,
- 'list_access_time': info.last_access_time,
- 'size': info.size,
- 'replication': info.replication,
- 'block_size': info.block_size,
- 'permissions': info.permissions
- })
- else:
- results.append(name)
-
- return results
-
- def mkdir(self, path):
- """
- Create indicated directory and any necessary parent directories
- """
- self._ensure_client()
-
- cdef c_string c_path = tobytes(path)
- with nogil:
- check_status(self.client.get()
- .CreateDirectory(c_path))
-
- def delete(self, path, bint recursive=False):
- """
- Delete the indicated file or directory
-
- Parameters
- ----------
- path : string
- recursive : boolean, default False
- If True, also delete child paths for directories
- """
- self._ensure_client()
-
- cdef c_string c_path = tobytes(path)
- with nogil:
- check_status(self.client.get()
- .Delete(c_path, recursive))
-
- def open(self, path, mode='rb', buffer_size=None, replication=None,
- default_block_size=None):
- """
- Parameters
- ----------
- mode : string, 'rb', 'wb', 'ab'
- """
- self._ensure_client()
-
- cdef HdfsFile out = HdfsFile()
-
- if mode not in ('rb', 'wb', 'ab'):
- raise Exception("Mode must be 'rb' (read), "
- "'wb' (write, new file), or 'ab' (append)")
-
- cdef c_string c_path = tobytes(path)
- cdef c_bool append = False
-
- # 0 in libhdfs means "use the default"
- cdef int32_t c_buffer_size = buffer_size or 0
- cdef int16_t c_replication = replication or 0
- cdef int64_t c_default_block_size = default_block_size or 0
-
- cdef shared_ptr[HdfsOutputStream] wr_handle
- cdef shared_ptr[HdfsReadableFile] rd_handle
-
- if mode in ('wb', 'ab'):
- if mode == 'ab':
- append = True
-
- with nogil:
- check_status(
- self.client.get()
- .OpenWriteable(c_path, append, c_buffer_size,
- c_replication, c_default_block_size,
- &wr_handle))
-
- out.wr_file = <shared_ptr[OutputStream]> wr_handle
-
- out.is_readable = False
- out.is_writeable = 1
- else:
- with nogil:
- check_status(self.client.get()
- .OpenReadable(c_path, &rd_handle))
-
- out.rd_file = <shared_ptr[RandomAccessFile]> rd_handle
- out.is_readable = True
- out.is_writeable = 0
-
- if c_buffer_size == 0:
- c_buffer_size = 2 ** 16
-
- out.mode = mode
- out.buffer_size = c_buffer_size
- out.parent = _HdfsFileNanny(self, out)
- out.is_open = True
- out.own_file = True
-
- return out
-
- def download(self, path, stream, buffer_size=None):
- with self.open(path, 'rb') as f:
- f.download(stream, buffer_size=buffer_size)
-
- def upload(self, path, stream, buffer_size=None):
- """
- Upload file-like object to HDFS path
- """
- with self.open(path, 'wb') as f:
- f.upload(stream, buffer_size=buffer_size)
-
-
-# ARROW-404: Helper class to ensure that files are closed before the
-# client. During deallocation of the extension class, the attributes are
-# decref'd which can cause the client to get closed first if the file has the
-# last remaining reference
-cdef class _HdfsFileNanny:
- cdef:
- object client
- object file_handle_ref
-
- def __cinit__(self, client, file_handle):
- import weakref
- self.client = client
- self.file_handle_ref = weakref.ref(file_handle)
-
- def __dealloc__(self):
- fh = self.file_handle_ref()
- if fh:
- fh.close()
- # avoid cyclic GC
- self.file_handle_ref = None
- self.client = None
-
-
-cdef class HdfsFile(NativeFile):
- cdef readonly:
- int32_t buffer_size
- object mode
- object parent
-
- cdef object __weakref__
-
- def __dealloc__(self):
- self.parent = None
-
-# ----------------------------------------------------------------------
-# File and stream readers and writers
-
-cdef class _StreamWriter:
- cdef:
- shared_ptr[CStreamWriter] writer
- shared_ptr[OutputStream] sink
- bint closed
-
- def __cinit__(self):
- self.closed = True
-
- def __dealloc__(self):
- if not self.closed:
- self.close()
-
- def _open(self, sink, Schema schema):
- get_writer(sink, &self.sink)
-
- with nogil:
- check_status(CStreamWriter.Open(self.sink.get(), schema.sp_schema,
- &self.writer))
-
- self.closed = False
-
- def write_batch(self, RecordBatch batch):
- with nogil:
- check_status(self.writer.get()
- .WriteRecordBatch(deref(batch.batch)))
-
- def close(self):
- with nogil:
- check_status(self.writer.get().Close())
- self.closed = True
-
-
-cdef class _StreamReader:
- cdef:
- shared_ptr[CStreamReader] reader
-
- cdef readonly:
- Schema schema
-
- def __cinit__(self):
- pass
-
- def _open(self, source):
- cdef:
- shared_ptr[RandomAccessFile] reader
- shared_ptr[InputStream] in_stream
-
- get_reader(source, &reader)
- in_stream = <shared_ptr[InputStream]> reader
-
- with nogil:
- check_status(CStreamReader.Open(in_stream, &self.reader))
-
- self.schema = Schema()
- self.schema.init_schema(self.reader.get().schema())
-
- def get_next_batch(self):
- """
- Read next RecordBatch from the stream. Raises StopIteration at end of
- stream
- """
- cdef shared_ptr[CRecordBatch] batch
-
- with nogil:
- check_status(self.reader.get().GetNextRecordBatch(&batch))
-
- if batch.get() == NULL:
- raise StopIteration
-
- return batch_from_cbatch(batch)
-
- def read_all(self):
- """
- Read all record batches as a pyarrow.Table
- """
- cdef:
- vector[shared_ptr[CRecordBatch]] batches
- shared_ptr[CRecordBatch] batch
- shared_ptr[CTable] table
-
- with nogil:
- while True:
- check_status(self.reader.get().GetNextRecordBatch(&batch))
- if batch.get() == NULL:
- break
- batches.push_back(batch)
-
- check_status(CTable.FromRecordBatches(batches, &table))
-
- return table_from_ctable(table)
-
-
-cdef class _FileWriter(_StreamWriter):
-
- def _open(self, sink, Schema schema):
- cdef shared_ptr[CFileWriter] writer
- get_writer(sink, &self.sink)
-
- with nogil:
- check_status(CFileWriter.Open(self.sink.get(), schema.sp_schema,
- &writer))
-
- # Cast to base class, because it has the same interface
- self.writer = <shared_ptr[CStreamWriter]> writer
- self.closed = False
-
-
-cdef class _FileReader:
- cdef:
- shared_ptr[CFileReader] reader
-
- def __cinit__(self):
- pass
-
- def _open(self, source, footer_offset=None):
- cdef shared_ptr[RandomAccessFile] reader
- get_reader(source, &reader)
-
- cdef int64_t offset = 0
- if footer_offset is not None:
- offset = footer_offset
-
- with nogil:
- if offset != 0:
- check_status(CFileReader.Open2(reader, offset, &self.reader))
- else:
- check_status(CFileReader.Open(reader, &self.reader))
-
- property num_record_batches:
-
- def __get__(self):
- return self.reader.get().num_record_batches()
-
- def get_batch(self, int i):
- cdef shared_ptr[CRecordBatch] batch
-
- if i < 0 or i >= self.num_record_batches:
- raise ValueError('Batch number {0} out of range'.format(i))
-
- with nogil:
- check_status(self.reader.get().GetRecordBatch(i, &batch))
-
- return batch_from_cbatch(batch)
-
- # TODO(wesm): ARROW-503: Function was renamed. Remove after a period of
- # time has passed
- get_record_batch = get_batch
-
- def read_all(self):
- """
- Read all record batches as a pyarrow.Table
- """
- cdef:
- vector[shared_ptr[CRecordBatch]] batches
- shared_ptr[CTable] table
- int i, nbatches
-
- nbatches = self.num_record_batches
-
- batches.resize(nbatches)
- with nogil:
- for i in range(nbatches):
- check_status(self.reader.get().GetRecordBatch(i, &batches[i]))
- check_status(CTable.FromRecordBatches(batches, &table))
-
- return table_from_ctable(table)
-
-
-# ----------------------------------------------------------------------
-# Implement legacy Feather file format
-
-
-class FeatherError(Exception):
- pass
-
-
-cdef class FeatherWriter:
- cdef:
- unique_ptr[CFeatherWriter] writer
-
- cdef public:
- int64_t num_rows
-
- def __cinit__(self):
- self.num_rows = -1
-
- def open(self, object dest):
- cdef shared_ptr[OutputStream] sink
- get_writer(dest, &sink)
-
- with nogil:
- check_status(CFeatherWriter.Open(sink, &self.writer))
-
- def close(self):
- if self.num_rows < 0:
- self.num_rows = 0
- self.writer.get().SetNumRows(self.num_rows)
- check_status(self.writer.get().Finalize())
-
- def write_array(self, object name, object col, object mask=None):
- cdef Array arr
-
- if self.num_rows >= 0:
- if len(col) != self.num_rows:
- raise ValueError('prior column had a different number of rows')
- else:
- self.num_rows = len(col)
-
- if isinstance(col, Array):
- arr = col
- else:
- arr = Array.from_numpy(col, mask=mask)
-
- cdef c_string c_name = tobytes(name)
-
- with nogil:
- check_status(
- self.writer.get().Append(c_name, deref(arr.sp_array)))
-
-
-cdef class FeatherReader:
- cdef:
- unique_ptr[CFeatherReader] reader
-
- def __cinit__(self):
- pass
-
- def open(self, source):
- cdef shared_ptr[RandomAccessFile] reader
- get_reader(source, &reader)
-
- with nogil:
- check_status(CFeatherReader.Open(reader, &self.reader))
-
- property num_rows:
-
- def __get__(self):
- return self.reader.get().num_rows()
-
- property num_columns:
-
- def __get__(self):
- return self.reader.get().num_columns()
-
- def get_column_name(self, int i):
- cdef c_string name = self.reader.get().GetColumnName(i)
- return frombytes(name)
-
- def get_column(self, int i):
- if i < 0 or i >= self.num_columns:
- raise IndexError(i)
-
- cdef shared_ptr[CColumn] sp_column
- with nogil:
- check_status(self.reader.get()
- .GetColumn(i, &sp_column))
-
- cdef Column col = Column()
- col.init(sp_column)
- return col
-
-
-def get_tensor_size(Tensor tensor):
- """
- Return total size of serialized Tensor including metadata and padding
- """
- cdef int64_t size
- with nogil:
- check_status(GetTensorSize(deref(tensor.tp), &size))
- return size
-
-
-def get_record_batch_size(RecordBatch batch):
- """
- Return total size of serialized RecordBatch including metadata and padding
- """
- cdef int64_t size
- with nogil:
- check_status(GetRecordBatchSize(deref(batch.batch), &size))
- return size
-
-
-def write_tensor(Tensor tensor, NativeFile dest):
- """
- Write pyarrow.Tensor to pyarrow.NativeFile object at its current position
-
- Parameters
- ----------
- tensor : pyarrow.Tensor
- dest : pyarrow.NativeFile
-
- Returns
- -------
- bytes_written : int
- Total number of bytes written to the file
- """
- cdef:
- int32_t metadata_length
- int64_t body_length
-
- dest._assert_writeable()
-
- with nogil:
- check_status(
- WriteTensor(deref(tensor.tp), dest.wr_file.get(),
- &metadata_length, &body_length))
-
- return metadata_length + body_length
-
-
-def read_tensor(NativeFile source):
- """
- Read pyarrow.Tensor from pyarrow.NativeFile object from current
- position. If the file source supports zero copy (e.g. a memory map), then
- this operation does not allocate any memory
-
- Parameters
- ----------
- source : pyarrow.NativeFile
-
- Returns
- -------
- tensor : Tensor
- """
- cdef:
- shared_ptr[CTensor] sp_tensor
-
- source._assert_readable()
-
- cdef int64_t offset = source.tell()
- with nogil:
- check_status(ReadTensor(offset, source.rd_file.get(), &sp_tensor))
-
- return box_tensor(sp_tensor)
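Before moving on from io.pyx: download and upload share one concurrency shape, a bounded queue feeding a single background writer thread, so disk or network writes overlap with reads. A standalone sketch of that pattern (simplified: a blocking put replaces the put_nowait/sleep loop above, and error propagation is omitted):

    import threading
    from queue import Queue, Empty

    def copy_with_bg_writer(read_chunk, write_chunk, queue_size=50):
        q = Queue(queue_size)      # bounded, so the reader can't race ahead
        done = False

        def bg_write():
            while not done or q.qsize() > 0:
                try:
                    buf = q.get(timeout=0.01)
                except Empty:
                    continue
                write_chunk(buf)

        t = threading.Thread(target=bg_write)
        t.start()
        try:
            while True:
                buf = read_chunk()
                if not buf:        # EOF
                    break
                q.put(buf)         # blocks while the queue is full
        finally:
            done = True
            t.join()

    # usage sketch: copy between two in-memory "files" (stdlib io)
    import io
    src, dst = io.BytesIO(b'x' * 300000), io.BytesIO()
    copy_with_bg_writer(lambda: src.read(65536), dst.write)
    assert dst.getvalue() == b'x' * 300000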
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/ipc.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py
index 5a56165..f96ead3 100644
--- a/python/pyarrow/ipc.py
+++ b/python/pyarrow/ipc.py
@@ -17,10 +17,10 @@
# Arrow file and stream reader/writer classes, and other messaging tools
-import pyarrow.io as io
+import pyarrow._io as _io
-class StreamReader(io._StreamReader):
+class StreamReader(_io._StreamReader):
"""
Reader for the Arrow streaming binary format
@@ -37,7 +37,7 @@ class StreamReader(io._StreamReader):
yield self.get_next_batch()
-class StreamWriter(io._StreamWriter):
+class StreamWriter(_io._StreamWriter):
"""
Writer for the Arrow streaming binary format
@@ -52,7 +52,7 @@ class StreamWriter(io._StreamWriter):
self._open(sink, schema)
-class FileReader(io._FileReader):
+class FileReader(_io._FileReader):
"""
Class for reading Arrow record batch data from the Arrow binary file format
@@ -68,7 +68,7 @@ class FileReader(io._FileReader):
self._open(source, footer_offset=footer_offset)
-class FileWriter(io._FileWriter):
+class FileWriter(_io._FileWriter):
"""
Writer to create the Arrow binary file format
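ipc.py keeps the public reader/writer classes and only repoints their base classes at the private module, so usage is unchanged. A round-trip sketch against this revision's API (InMemoryOutputStream, BufferReader and Array.from_numpy all appear in the code above, but treat the exact constructor signatures and top-level exports here as assumptions):

    import numpy as np
    import pyarrow
    from pyarrow.ipc import StreamReader, StreamWriter

    arr = pyarrow.Array.from_numpy(np.array([1, 2, 3], dtype='int64'))
    batch = pyarrow.RecordBatch.from_arrays([arr], ['f0'])

    sink = pyarrow.InMemoryOutputStream()
    writer = StreamWriter(sink, batch.schema)
    writer.write_batch(batch)
    writer.close()

    reader = StreamReader(pyarrow.BufferReader(sink.get_result()))
    table = reader.read_all()          # -> pyarrow.Table with one column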
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/jemalloc.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/jemalloc.pyx b/python/pyarrow/jemalloc.pyx
deleted file mode 100644
index 97583f4..0000000
--- a/python/pyarrow/jemalloc.pyx
+++ /dev/null
@@ -1,28 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# cython: profile=False
-# distutils: language = c++
-# cython: embedsignature = True
-
-from pyarrow.includes.libarrow_jemalloc cimport CJemallocMemoryPool
-from pyarrow.memory cimport MemoryPool
-
-def default_pool():
- cdef MemoryPool pool = MemoryPool()
- pool.init(CJemallocMemoryPool.default_pool())
- return pool
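jemalloc.pyx is deliberately tiny: it wraps CJemallocMemoryPool.default_pool() in the generic MemoryPool class, and everything else works through the memory_pool keyword that APIs such as OSFile already take. A hedged usage sketch (shown with the pre-rename module path; this commit moves the module behind a private name):

    import pyarrow.jemalloc

    pool = pyarrow.jemalloc.default_pool()
    print(pool.bytes_allocated())      # per-pool allocation counter
    # route a reader's allocations through jemalloc:
    #   OSFile(path, mode='r', memory_pool=pool)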
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/memory.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/memory.pxd b/python/pyarrow/memory.pxd
deleted file mode 100644
index bb1af85..0000000
--- a/python/pyarrow/memory.pxd
+++ /dev/null
@@ -1,30 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from pyarrow.includes.libarrow cimport CMemoryPool, CLoggingMemoryPool
-
-
-cdef class MemoryPool:
- cdef:
- CMemoryPool* pool
-
- cdef init(self, CMemoryPool* pool)
-
-cdef class LoggingMemoryPool(MemoryPool):
- pass
-
-cdef CMemoryPool* maybe_unbox_memory_pool(MemoryPool memory_pool)
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/memory.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/memory.pyx b/python/pyarrow/memory.pyx
deleted file mode 100644
index 98dbf66..0000000
--- a/python/pyarrow/memory.pyx
+++ /dev/null
@@ -1,52 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# cython: profile=False
-# distutils: language = c++
-# cython: embedsignature = True
-
-from pyarrow.includes.libarrow cimport CMemoryPool, CLoggingMemoryPool
-from pyarrow.includes.pyarrow cimport set_default_memory_pool, get_memory_pool
-
-cdef class MemoryPool:
- cdef init(self, CMemoryPool* pool):
- self.pool = pool
-
- def bytes_allocated(self):
- return self.pool.bytes_allocated()
-
-cdef CMemoryPool* maybe_unbox_memory_pool(MemoryPool memory_pool):
- if memory_pool is None:
- return get_memory_pool()
- else:
- return memory_pool.pool
-
-cdef class LoggingMemoryPool(MemoryPool):
- pass
-
-def default_pool():
- cdef:
- MemoryPool pool = MemoryPool()
- pool.init(get_memory_pool())
- return pool
-
-def set_default_pool(MemoryPool pool):
- set_default_memory_pool(pool.pool)
-
-def total_allocated_bytes():
- cdef CMemoryPool* pool = get_memory_pool()
- return pool.bytes_allocated()
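memory.pyx is the pure-accounting counterpart: default_pool() and total_allocated_bytes() expose the process-wide default pool, which makes Arrow's share of memory observable around any call. A sketch, assuming these helpers stay re-exported at the package top level (otherwise import them from the memory module directly); note the delta can legitimately be zero when a conversion is zero-copy:

    import numpy as np
    import pyarrow

    before = pyarrow.total_allocated_bytes()
    arr = pyarrow.Array.from_numpy(np.arange(1 << 20))
    after = pyarrow.total_allocated_bytes()
    print('bytes held by the default pool:', after - before)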
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index f81b6c2..aaec43a 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -23,8 +23,8 @@ from pyarrow.filesystem import LocalFilesystem
from pyarrow._parquet import (ParquetReader, FileMetaData, # noqa
RowGroupMetaData, Schema, ParquetWriter)
import pyarrow._parquet as _parquet # noqa
-import pyarrow.array as _array
-import pyarrow.table as _table
+import pyarrow._array as _array
+import pyarrow._table as _table
# ----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/scalar.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/scalar.pxd b/python/pyarrow/scalar.pxd
deleted file mode 100644
index 62a5664..0000000
--- a/python/pyarrow/scalar.pxd
+++ /dev/null
@@ -1,72 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from pyarrow.includes.common cimport *
-from pyarrow.includes.libarrow cimport *
-
-from pyarrow.schema cimport DataType
-
-
-cdef class Scalar:
- cdef readonly:
- DataType type
-
-
-cdef class NAType(Scalar):
- pass
-
-
-cdef class ArrayValue(Scalar):
- cdef:
- shared_ptr[CArray] sp_array
- int64_t index
-
- cdef void init(self, DataType type,
- const shared_ptr[CArray]& sp_array, int64_t index)
-
- cdef void _set_array(self, const shared_ptr[CArray]& sp_array)
-
-
-cdef class Int8Value(ArrayValue):
- pass
-
-
-cdef class Int64Value(ArrayValue):
- pass
-
-
-cdef class ListValue(ArrayValue):
- cdef readonly:
- DataType value_type
-
- cdef:
- CListArray* ap
-
- cdef getitem(self, int64_t i)
-
-
-cdef class StringValue(ArrayValue):
- pass
-
-
-cdef class FixedSizeBinaryValue(ArrayValue):
- pass
-
-
-cdef object box_scalar(DataType type,
- const shared_ptr[CArray]& sp_array,
- int64_t index)
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/scalar.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/scalar.pyx b/python/pyarrow/scalar.pyx
deleted file mode 100644
index 2b6746a..0000000
--- a/python/pyarrow/scalar.pyx
+++ /dev/null
@@ -1,315 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from pyarrow.schema cimport DataType, box_data_type
-
-from pyarrow.compat import frombytes
-import pyarrow.schema as schema
-import decimal
-import datetime
-
-cimport cpython as cp
-
-NA = None
-
-
-cdef _pandas():
- import pandas as pd
- return pd
-
-
-cdef class NAType(Scalar):
-
- def __cinit__(self):
- global NA
- if NA is not None:
- raise Exception('Cannot create multiple NAType instances')
-
- self.type = schema.null()
-
- def __repr__(self):
- return 'NA'
-
- def as_py(self):
- return None
-
-NA = NAType()
-
-cdef class ArrayValue(Scalar):
-
- cdef void init(self, DataType type, const shared_ptr[CArray]& sp_array,
- int64_t index):
- self.type = type
- self.index = index
- self._set_array(sp_array)
-
- cdef void _set_array(self, const shared_ptr[CArray]& sp_array):
- self.sp_array = sp_array
-
- def __repr__(self):
- if hasattr(self, 'as_py'):
- return repr(self.as_py())
- else:
- return super(Scalar, self).__repr__()
-
-
-cdef class BooleanValue(ArrayValue):
-
- def as_py(self):
- cdef CBooleanArray* ap = <CBooleanArray*> self.sp_array.get()
- return ap.Value(self.index)
-
-
-cdef class Int8Value(ArrayValue):
-
- def as_py(self):
- cdef CInt8Array* ap = <CInt8Array*> self.sp_array.get()
- return ap.Value(self.index)
-
-
-cdef class UInt8Value(ArrayValue):
-
- def as_py(self):
- cdef CUInt8Array* ap = <CUInt8Array*> self.sp_array.get()
- return ap.Value(self.index)
-
-
-cdef class Int16Value(ArrayValue):
-
- def as_py(self):
- cdef CInt16Array* ap = <CInt16Array*> self.sp_array.get()
- return ap.Value(self.index)
-
-
-cdef class UInt16Value(ArrayValue):
-
- def as_py(self):
- cdef CUInt16Array* ap = <CUInt16Array*> self.sp_array.get()
- return ap.Value(self.index)
-
-
-cdef class Int32Value(ArrayValue):
-
- def as_py(self):
- cdef CInt32Array* ap = <CInt32Array*> self.sp_array.get()
- return ap.Value(self.index)
-
-
-cdef class UInt32Value(ArrayValue):
-
- def as_py(self):
- cdef CUInt32Array* ap = <CUInt32Array*> self.sp_array.get()
- return ap.Value(self.index)
-
-
-cdef class Int64Value(ArrayValue):
-
- def as_py(self):
- cdef CInt64Array* ap = <CInt64Array*> self.sp_array.get()
- return ap.Value(self.index)
-
-
-cdef class UInt64Value(ArrayValue):
-
- def as_py(self):
- cdef CUInt64Array* ap = <CUInt64Array*> self.sp_array.get()
- return ap.Value(self.index)
-
-
-cdef class Date32Value(ArrayValue):
-
- def as_py(self):
- cdef CDate32Array* ap = <CDate32Array*> self.sp_array.get()
-
- # Shift to seconds since epoch
- return datetime.datetime.utcfromtimestamp(
- int(ap.Value(self.index)) * 86400).date()
-
-
-cdef class Date64Value(ArrayValue):
-
- def as_py(self):
- cdef CDate64Array* ap = <CDate64Array*> self.sp_array.get()
- return datetime.datetime.utcfromtimestamp(
- ap.Value(self.index) / 1000).date()
-
-
-cdef class TimestampValue(ArrayValue):
-
- def as_py(self):
- cdef:
- CTimestampArray* ap = <CTimestampArray*> self.sp_array.get()
- CTimestampType* dtype = <CTimestampType*>ap.type().get()
- int64_t val = ap.Value(self.index)
-
- timezone = None
- tzinfo = None
- if dtype.timezone().size() > 0:
- timezone = frombytes(dtype.timezone())
- import pytz
- tzinfo = pytz.timezone(timezone)
-
- try:
- pd = _pandas()
- if dtype.unit() == TimeUnit_SECOND:
- val = val * 1000000000
- elif dtype.unit() == TimeUnit_MILLI:
- val = val * 1000000
- elif dtype.unit() == TimeUnit_MICRO:
- val = val * 1000
- return pd.Timestamp(val, tz=tzinfo)
- except ImportError:
- if dtype.unit() == TimeUnit_SECOND:
- result = datetime.datetime.utcfromtimestamp(val)
- elif dtype.unit() == TimeUnit_MILLI:
- result = datetime.datetime.utcfromtimestamp(float(val) / 1000)
- elif dtype.unit() == TimeUnit_MICRO:
- result = datetime.datetime.utcfromtimestamp(
- float(val) / 1000000)
- else:
- # TimeUnit_NANO
- raise NotImplementedError("Cannot convert nanosecond "
- "timestamps without pandas")
- if timezone is not None:
- result = result.replace(tzinfo=tzinfo)
- return result
-
-
-cdef class FloatValue(ArrayValue):
-
- def as_py(self):
- cdef CFloatArray* ap = <CFloatArray*> self.sp_array.get()
- return ap.Value(self.index)
-
-
-cdef class DoubleValue(ArrayValue):
-
- def as_py(self):
- cdef CDoubleArray* ap = <CDoubleArray*> self.sp_array.get()
- return ap.Value(self.index)
-
-
-cdef class DecimalValue(ArrayValue):
-
- def as_py(self):
- cdef:
- CDecimalArray* ap = <CDecimalArray*> self.sp_array.get()
- c_string s = ap.FormatValue(self.index)
- return decimal.Decimal(s.decode('utf8'))
-
-
-cdef class StringValue(ArrayValue):
-
- def as_py(self):
- cdef CStringArray* ap = <CStringArray*> self.sp_array.get()
- return ap.GetString(self.index).decode('utf-8')
-
-
-cdef class BinaryValue(ArrayValue):
-
- def as_py(self):
- cdef:
- const uint8_t* ptr
- int32_t length
- CBinaryArray* ap = <CBinaryArray*> self.sp_array.get()
-
- ptr = ap.GetValue(self.index, &length)
- return cp.PyBytes_FromStringAndSize(<const char*>(ptr), length)
-
-
-cdef class ListValue(ArrayValue):
-
- def __len__(self):
- return self.ap.value_length(self.index)
-
- def __getitem__(self, i):
- return self.getitem(i)
-
- def __iter__(self):
- for i in range(len(self)):
- yield self.getitem(i)
- raise StopIteration
-
- cdef void _set_array(self, const shared_ptr[CArray]& sp_array):
- self.sp_array = sp_array
- self.ap = <CListArray*> sp_array.get()
- self.value_type = box_data_type(self.ap.value_type())
-
- cdef getitem(self, int64_t i):
- cdef int64_t j = self.ap.value_offset(self.index) + i
- return box_scalar(self.value_type, self.ap.values(), j)
-
- def as_py(self):
- cdef:
- int64_t j
- list result = []
-
- for j in range(len(self)):
- result.append(self.getitem(j).as_py())
-
- return result
-
-
-cdef class FixedSizeBinaryValue(ArrayValue):
-
- def as_py(self):
- cdef:
- CFixedSizeBinaryArray* ap
- CFixedSizeBinaryType* ap_type
- int32_t length
- const char* data
- ap = <CFixedSizeBinaryArray*> self.sp_array.get()
- ap_type = <CFixedSizeBinaryType*> ap.type().get()
- length = ap_type.byte_width()
- data = <const char*> ap.GetValue(self.index)
- return cp.PyBytes_FromStringAndSize(data, length)
-
-
-
-cdef dict _scalar_classes = {
- Type_BOOL: BooleanValue,
- Type_UINT8: UInt8Value,
- Type_UINT16: UInt16Value,
- Type_UINT32: UInt32Value,
- Type_UINT64: UInt64Value,
- Type_INT8: Int8Value,
- Type_INT16: Int16Value,
- Type_INT32: Int32Value,
- Type_INT64: Int64Value,
- Type_DATE32: Date32Value,
- Type_DATE64: Date64Value,
- Type_TIMESTAMP: TimestampValue,
- Type_FLOAT: FloatValue,
- Type_DOUBLE: DoubleValue,
- Type_LIST: ListValue,
- Type_BINARY: BinaryValue,
- Type_STRING: StringValue,
- Type_FIXED_SIZE_BINARY: FixedSizeBinaryValue,
- Type_DECIMAL: DecimalValue,
-}
-
-cdef object box_scalar(DataType type, const shared_ptr[CArray]& sp_array,
- int64_t index):
- cdef ArrayValue val
- if type.type.id() == Type_NA:
- return NA
- elif sp_array.get().IsNull(index):
- return NA
- else:
- val = _scalar_classes[type.type.id()]()
- val.init(type, sp_array, index)
- return val
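The scalar machinery boils down to one dictionary dispatch: box_scalar looks up a wrapper class by type id, instantiates it, and points it at (type, array, index); nulls short-circuit to the NA singleton. A pure-Python analogue of that shape (string type ids and the None-for-NA stand-in are illustrative):

    class ArrayValue(object):
        def init(self, typ, values, index):
            self.typ, self.values, self.index = typ, values, index

    class Int64Value(ArrayValue):
        def as_py(self):
            return int(self.values[self.index])

    _scalar_classes = {'int64': Int64Value}

    def box_scalar(typ, values, index):
        if values[index] is None:      # nulls box to the NA singleton
            return None                # illustrative stand-in for NA
        val = _scalar_classes[typ]()
        val.init(typ, values, index)
        return val

    print(box_scalar('int64', [7, None, 9], 0).as_py())   # 7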
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/schema.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/schema.pxd b/python/pyarrow/schema.pxd
deleted file mode 100644
index eceedba..0000000
--- a/python/pyarrow/schema.pxd
+++ /dev/null
@@ -1,76 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from pyarrow.includes.common cimport *
-from pyarrow.includes.libarrow cimport (CDataType,
- CDictionaryType,
- CTimestampType,
- CFixedSizeBinaryType,
- CDecimalType,
- CField, CSchema)
-
-cdef class DataType:
- cdef:
- shared_ptr[CDataType] sp_type
- CDataType* type
-
- cdef void init(self, const shared_ptr[CDataType]& type)
-
-
-cdef class DictionaryType(DataType):
- cdef:
- const CDictionaryType* dict_type
-
-
-cdef class TimestampType(DataType):
- cdef:
- const CTimestampType* ts_type
-
-
-cdef class FixedSizeBinaryType(DataType):
- cdef:
- const CFixedSizeBinaryType* fixed_size_binary_type
-
-
-cdef class DecimalType(FixedSizeBinaryType):
- cdef:
- const CDecimalType* decimal_type
-
-
-cdef class Field:
- cdef:
- shared_ptr[CField] sp_field
- CField* field
-
- cdef readonly:
- DataType type
-
- cdef init(self, const shared_ptr[CField]& field)
-
-
-cdef class Schema:
- cdef:
- shared_ptr[CSchema] sp_schema
- CSchema* schema
-
- cdef init(self, const vector[shared_ptr[CField]]& fields)
- cdef init_schema(self, const shared_ptr[CSchema]& schema)
-
-
-cdef DataType box_data_type(const shared_ptr[CDataType]& type)
-cdef Field box_field(const shared_ptr[CField]& field)
-cdef Schema box_schema(const shared_ptr[CSchema]& schema)
[3/4] arrow git commit: ARROW-751: [Python] Make all Cython modules
private. Some code tidying
Posted by uw...@apache.org.
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_io.pyx b/python/pyarrow/_io.pyx
new file mode 100644
index 0000000..9f067fb
--- /dev/null
+++ b/python/pyarrow/_io.pyx
@@ -0,0 +1,1273 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Cython wrappers for IO interfaces defined in arrow::io and messaging in
+# arrow::ipc
+
+# cython: profile=False
+# distutils: language = c++
+# cython: embedsignature = True
+
+from cython.operator cimport dereference as deref
+from libc.stdlib cimport malloc, free
+from pyarrow.includes.libarrow cimport *
+cimport pyarrow.includes.pyarrow as pyarrow
+from pyarrow._array cimport Array, Tensor, box_tensor, Schema
+from pyarrow._error cimport check_status
+from pyarrow._memory cimport MemoryPool, maybe_unbox_memory_pool
+from pyarrow._table cimport (Column, RecordBatch, batch_from_cbatch,
+ table_from_ctable)
+cimport cpython as cp
+
+import pyarrow._config
+from pyarrow.compat import frombytes, tobytes, encode_file_path
+
+import re
+import six
+import sys
+import threading
+import time
+
+
+# 64K
+DEFAULT_BUFFER_SIZE = 2 ** 16
+
+
+# To let us get a PyObject* and avoid Cython auto-ref-counting
+cdef extern from "Python.h":
+ PyObject* PyBytes_FromStringAndSizeNative" PyBytes_FromStringAndSize"(
+ char *v, Py_ssize_t len) except NULL
+
+cdef class NativeFile:
+
+ def __cinit__(self):
+ self.is_open = False
+ self.own_file = False
+
+ def __dealloc__(self):
+ if self.is_open and self.own_file:
+ self.close()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, tb):
+ self.close()
+
+ def close(self):
+ if self.is_open:
+ with nogil:
+ if self.is_readable:
+ check_status(self.rd_file.get().Close())
+ else:
+ check_status(self.wr_file.get().Close())
+ self.is_open = False
+
+ cdef read_handle(self, shared_ptr[RandomAccessFile]* file):
+ self._assert_readable()
+ file[0] = <shared_ptr[RandomAccessFile]> self.rd_file
+
+ cdef write_handle(self, shared_ptr[OutputStream]* file):
+ self._assert_writeable()
+ file[0] = <shared_ptr[OutputStream]> self.wr_file
+
+ def _assert_readable(self):
+ if not self.is_readable:
+ raise IOError("only valid on readonly files")
+
+ if not self.is_open:
+ raise IOError("file not open")
+
+ def _assert_writeable(self):
+ if not self.is_writeable:
+ raise IOError("only valid on writeable files")
+
+ if not self.is_open:
+ raise IOError("file not open")
+
+ def size(self):
+ cdef int64_t size
+ self._assert_readable()
+ with nogil:
+ check_status(self.rd_file.get().GetSize(&size))
+ return size
+
+ def tell(self):
+ cdef int64_t position
+ with nogil:
+ if self.is_readable:
+ check_status(self.rd_file.get().Tell(&position))
+ else:
+ check_status(self.wr_file.get().Tell(&position))
+ return position
+
+ def seek(self, int64_t position):
+ self._assert_readable()
+ with nogil:
+ check_status(self.rd_file.get().Seek(position))
+
+ def write(self, data):
+ """
+ Write bytes from any object implementing the buffer protocol (bytes,
+ bytearray, ndarray, pyarrow.Buffer)
+ """
+ self._assert_writeable()
+
+ if isinstance(data, six.string_types):
+ data = tobytes(data)
+
+ cdef Buffer arrow_buffer = frombuffer(data)
+
+ cdef const uint8_t* buf = arrow_buffer.buffer.get().data()
+ cdef int64_t bufsize = len(arrow_buffer)
+ with nogil:
+ check_status(self.wr_file.get().Write(buf, bufsize))
+
+ def read(self, nbytes=None):
+ cdef:
+ int64_t c_nbytes
+ int64_t bytes_read = 0
+ PyObject* obj
+
+ if nbytes is None:
+ c_nbytes = self.size() - self.tell()
+ else:
+ c_nbytes = nbytes
+
+ self._assert_readable()
+
+ # Allocate empty write space
+ obj = PyBytes_FromStringAndSizeNative(NULL, c_nbytes)
+
+ cdef uint8_t* buf = <uint8_t*> cp.PyBytes_AS_STRING(<object> obj)
+ with nogil:
+ check_status(self.rd_file.get().Read(c_nbytes, &bytes_read, buf))
+
+ if bytes_read < c_nbytes:
+ cp._PyBytes_Resize(&obj, <Py_ssize_t> bytes_read)
+
+ return PyObject_to_object(obj)
+
+ def read_buffer(self, nbytes=None):
+ cdef:
+ int64_t c_nbytes
+ int64_t bytes_read = 0
+ shared_ptr[CBuffer] output
+ self._assert_readable()
+
+ if nbytes is None:
+ c_nbytes = self.size() - self.tell()
+ else:
+ c_nbytes = nbytes
+
+ with nogil:
+ check_status(self.rd_file.get().ReadB(c_nbytes, &output))
+
+ return wrap_buffer(output)
+
+ def download(self, stream_or_path, buffer_size=None):
+ """
+ Read file completely to local path (rather than reading completely into
+ memory). First seeks to the beginning of the file.
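+
+        For example, ``f.download('local_copy.bin')`` writes the file's
+        contents to the given local path (path illustrative).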
+ """
+ cdef:
+ int64_t bytes_read = 0
+ uint8_t* buf
+ self._assert_readable()
+
+ buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
+
+ write_queue = Queue(50)
+
+ if not hasattr(stream_or_path, 'read'):
+ stream = open(stream_or_path, 'wb')
+ cleanup = lambda: stream.close()
+ else:
+ stream = stream_or_path
+ cleanup = lambda: None
+
+        done = False
+        # Collect exceptions in a list so the background thread can hand
+        # them to the main thread (rebinding an outer name from inside
+        # the closure would not propagate here)
+        exc_info = []
+
+        def bg_write():
+            try:
+                while not done or write_queue.qsize() > 0:
+                    try:
+                        block = write_queue.get(timeout=0.01)
+                    except QueueEmpty:
+                        continue
+                    stream.write(block)
+            except Exception:
+                exc_info.append(sys.exc_info())
+            finally:
+                cleanup()
+
+ self.seek(0)
+
+ writer_thread = threading.Thread(target=bg_write)
+
+ # This isn't ideal -- PyBytes_FromStringAndSize copies the data from
+ # the passed buffer, so it's hard for us to avoid doubling the memory
+ buf = <uint8_t*> malloc(buffer_size)
+ if buf == NULL:
+ raise MemoryError("Failed to allocate {0} bytes"
+ .format(buffer_size))
+
+ writer_thread.start()
+
+ cdef int64_t total_bytes = 0
+ cdef int32_t c_buffer_size = buffer_size
+
+ try:
+ while True:
+ with nogil:
+ check_status(self.rd_file.get()
+ .Read(c_buffer_size, &bytes_read, buf))
+
+ total_bytes += bytes_read
+
+ # EOF
+ if bytes_read == 0:
+ break
+
+ pybuf = cp.PyBytes_FromStringAndSize(<const char*>buf,
+ bytes_read)
+
+ write_queue.put_nowait(pybuf)
+ finally:
+ free(buf)
+ done = True
+
+ writer_thread.join()
+        if exc_info:
+            six.reraise(*exc_info[0])
+
+ def upload(self, stream, buffer_size=None):
+ """
+ Pipe file-like object to file
+ """
+ write_queue = Queue(50)
+ self._assert_writeable()
+
+ buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
+
+        done = False
+        # As in download(), collect background-thread exceptions in a
+        # list so the caller can re-raise them
+        exc_info = []
+
+        def bg_write():
+            try:
+                while not done or write_queue.qsize() > 0:
+                    try:
+                        buf = write_queue.get(timeout=0.01)
+                    except QueueEmpty:
+                        continue
+
+                    self.write(buf)
+            except Exception:
+                exc_info.append(sys.exc_info())
+
+ writer_thread = threading.Thread(target=bg_write)
+ writer_thread.start()
+
+ try:
+ while True:
+ buf = stream.read(buffer_size)
+ if not buf:
+ break
+
+ if writer_thread.is_alive():
+ while write_queue.full():
+ time.sleep(0.01)
+ else:
+ break
+
+ write_queue.put_nowait(buf)
+ finally:
+ done = True
+
+ writer_thread.join()
+        if exc_info:
+            six.reraise(*exc_info[0])
+
+
+# ----------------------------------------------------------------------
+# Python file-like objects
+
+
+cdef class PythonFileInterface(NativeFile):
+ cdef:
+ object handle
+
+ def __cinit__(self, handle, mode='w'):
+ self.handle = handle
+
+ if mode.startswith('w'):
+ self.wr_file.reset(new pyarrow.PyOutputStream(handle))
+ self.is_readable = 0
+ self.is_writeable = 1
+ elif mode.startswith('r'):
+ self.rd_file.reset(new pyarrow.PyReadableFile(handle))
+ self.is_readable = 1
+ self.is_writeable = 0
+ else:
+ raise ValueError('Invalid file mode: {0}'.format(mode))
+
+ self.is_open = True
+
+
+cdef class MemoryMappedFile(NativeFile):
+ """
+ Supports 'r', 'r+w', 'w' modes
+ """
+ cdef:
+ object path
+
+ def __cinit__(self):
+ self.is_open = False
+ self.is_readable = 0
+ self.is_writeable = 0
+
+ @staticmethod
+ def create(path, size):
+ cdef:
+ shared_ptr[CMemoryMappedFile] handle
+ c_string c_path = encode_file_path(path)
+ int64_t c_size = size
+
+ with nogil:
+ check_status(CMemoryMappedFile.Create(c_path, c_size, &handle))
+
+ cdef MemoryMappedFile result = MemoryMappedFile()
+ result.path = path
+ result.is_readable = 1
+ result.is_writeable = 1
+ result.wr_file = <shared_ptr[OutputStream]> handle
+ result.rd_file = <shared_ptr[RandomAccessFile]> handle
+ result.is_open = True
+
+ return result
+
+ def open(self, path, mode='r'):
+ self.path = path
+
+ cdef:
+ FileMode c_mode
+ shared_ptr[CMemoryMappedFile] handle
+ c_string c_path = encode_file_path(path)
+
+ if mode in ('r', 'rb'):
+ c_mode = FileMode_READ
+ self.is_readable = 1
+ elif mode in ('w', 'wb'):
+ c_mode = FileMode_WRITE
+ self.is_writeable = 1
+ elif mode == 'r+w':
+ c_mode = FileMode_READWRITE
+ self.is_readable = 1
+ self.is_writeable = 1
+ else:
+ raise ValueError('Invalid file mode: {0}'.format(mode))
+
+ check_status(CMemoryMappedFile.Open(c_path, c_mode, &handle))
+
+ self.wr_file = <shared_ptr[OutputStream]> handle
+ self.rd_file = <shared_ptr[RandomAccessFile]> handle
+ self.is_open = True
+
+
+def memory_map(path, mode='r'):
+ """
+    Open memory map at file path. Size of the memory map cannot change.
+
+    Parameters
+    ----------
+    path : string
+    mode : {'r', 'r+w', 'w'}, default 'r'
+
+ Returns
+ -------
+ mmap : MemoryMappedFile
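+
+    Examples
+    --------
+    A minimal sketch; assumes 'data.arrow' already exists on disk:
+
+    >>> mmap = memory_map('data.arrow', mode='r')
+    >>> contents = mmap.read_buffer()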
+ """
+ cdef MemoryMappedFile mmap = MemoryMappedFile()
+ mmap.open(path, mode)
+ return mmap
+
+
+def create_memory_map(path, size):
+ """
+ Create memory map at indicated path of the given size, return open
+ writeable file object
+
+ Parameters
+ ----------
+ path : string
+ size : int
+
+ Returns
+ -------
+ mmap : MemoryMappedFile
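+
+    Examples
+    --------
+    A minimal sketch; the path and size are illustrative:
+
+    >>> f = create_memory_map('scratch.arrow', 4096)
+    >>> f.write(b'arrow')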
+ """
+ return MemoryMappedFile.create(path, size)
+
+
+cdef class OSFile(NativeFile):
+ """
+ Supports 'r', 'w' modes
+ """
+ cdef:
+ object path
+
+ def __cinit__(self, path, mode='r', MemoryPool memory_pool=None):
+ self.path = path
+
+        cdef:
+            c_string c_path = encode_file_path(path)
+
+ self.is_readable = self.is_writeable = 0
+
+ if mode in ('r', 'rb'):
+ self._open_readable(c_path, maybe_unbox_memory_pool(memory_pool))
+ elif mode in ('w', 'wb'):
+ self._open_writeable(c_path)
+ else:
+ raise ValueError('Invalid file mode: {0}'.format(mode))
+
+ self.is_open = True
+
+ cdef _open_readable(self, c_string path, CMemoryPool* pool):
+ cdef shared_ptr[ReadableFile] handle
+
+ with nogil:
+ check_status(ReadableFile.Open(path, pool, &handle))
+
+ self.is_readable = 1
+ self.rd_file = <shared_ptr[RandomAccessFile]> handle
+
+ cdef _open_writeable(self, c_string path):
+ cdef shared_ptr[FileOutputStream] handle
+
+ with nogil:
+ check_status(FileOutputStream.Open(path, &handle))
+ self.is_writeable = 1
+ self.wr_file = <shared_ptr[OutputStream]> handle
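+
+# Example (sketch): OSFile works as a context manager via NativeFile;
+# the path is illustrative:
+#
+#   with OSFile('out.bin', mode='w') as f:
+#       f.write(b'abc')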
+
+
+# ----------------------------------------------------------------------
+# Arrow buffers
+
+
+cdef class Buffer:
+
+ def __cinit__(self):
+ pass
+
+ cdef init(self, const shared_ptr[CBuffer]& buffer):
+ self.buffer = buffer
+ self.shape[0] = self.size
+ self.strides[0] = <Py_ssize_t>(1)
+
+ def __len__(self):
+ return self.size
+
+ property size:
+
+ def __get__(self):
+ return self.buffer.get().size()
+
+ property parent:
+
+ def __get__(self):
+ cdef shared_ptr[CBuffer] parent_buf = self.buffer.get().parent()
+
+ if parent_buf.get() == NULL:
+ return None
+ else:
+ return wrap_buffer(parent_buf)
+
+ def __getitem__(self, key):
+ # TODO(wesm): buffer slicing
+ raise NotImplementedError
+
+ def to_pybytes(self):
+ return cp.PyBytes_FromStringAndSize(
+ <const char*>self.buffer.get().data(),
+ self.buffer.get().size())
+
+ def __getbuffer__(self, cp.Py_buffer* buffer, int flags):
+ buffer.buf = <char *>self.buffer.get().data()
+ buffer.format = 'b'
+ buffer.internal = NULL
+ buffer.itemsize = 1
+ buffer.len = self.size
+ buffer.ndim = 1
+ buffer.obj = self
+ buffer.readonly = 1
+ buffer.shape = self.shape
+ buffer.strides = self.strides
+ buffer.suboffsets = NULL
+
+
+cdef shared_ptr[PoolBuffer] allocate_buffer(CMemoryPool* pool):
+ cdef shared_ptr[PoolBuffer] result
+ result.reset(new PoolBuffer(pool))
+ return result
+
+
+cdef class InMemoryOutputStream(NativeFile):
+
+ cdef:
+ shared_ptr[PoolBuffer] buffer
+
+ def __cinit__(self, MemoryPool memory_pool=None):
+ self.buffer = allocate_buffer(maybe_unbox_memory_pool(memory_pool))
+ self.wr_file.reset(new BufferOutputStream(
+ <shared_ptr[ResizableBuffer]> self.buffer))
+ self.is_readable = 0
+ self.is_writeable = 1
+ self.is_open = True
+
+ def get_result(self):
+ check_status(self.wr_file.get().Close())
+ self.is_open = False
+ return wrap_buffer(<shared_ptr[CBuffer]> self.buffer)
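+
+# Example (sketch): accumulate writes in memory, then retrieve the
+# result buffer:
+#
+#   stream = InMemoryOutputStream()
+#   stream.write(b'header')
+#   buf = stream.get_result()
+#   assert buf.to_pybytes() == b'header'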
+
+
+cdef class BufferReader(NativeFile):
+ """
+ Zero-copy reader from objects convertible to Arrow buffer
+
+ Parameters
+ ----------
+ obj : Python bytes or pyarrow.io.Buffer
+ """
+ cdef:
+ Buffer buffer
+
+ def __cinit__(self, object obj):
+ if isinstance(obj, Buffer):
+ self.buffer = obj
+ else:
+ self.buffer = frombuffer(obj)
+
+ self.rd_file.reset(new CBufferReader(self.buffer.buffer))
+ self.is_readable = 1
+ self.is_writeable = 0
+ self.is_open = True
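+
+# Example (sketch): zero-copy reads over an in-memory payload:
+#
+#   reader = BufferReader(b'0123456789')
+#   reader.seek(5)
+#   assert reader.read(3) == b'567'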
+
+
+def frombuffer(object obj):
+ """
+    Construct an Arrow buffer from any Python object implementing the
+    buffer protocol
+ """
+ cdef shared_ptr[CBuffer] buf
+ try:
+ memoryview(obj)
+ buf.reset(new pyarrow.PyBuffer(obj))
+ return wrap_buffer(buf)
+ except TypeError:
+ raise ValueError('Must pass object that implements buffer protocol')
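+
+# Example (sketch): bytes and bytearray both work:
+#
+#   buf = frombuffer(b'some bytes')
+#   assert len(buf) == 10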
+
+
+cdef Buffer wrap_buffer(const shared_ptr[CBuffer]& buf):
+ cdef Buffer result = Buffer()
+ result.init(buf)
+ return result
+
+
+cdef get_reader(object source, shared_ptr[RandomAccessFile]* reader):
+ cdef NativeFile nf
+
+ if isinstance(source, six.string_types):
+ source = memory_map(source, mode='r')
+ elif isinstance(source, Buffer):
+ source = BufferReader(source)
+ elif not isinstance(source, NativeFile) and hasattr(source, 'read'):
+ # Optimistically hope this is file-like
+ source = PythonFileInterface(source, mode='r')
+
+ if isinstance(source, NativeFile):
+ nf = source
+
+ # TODO: what about read-write sources (e.g. memory maps)
+ if not nf.is_readable:
+ raise IOError('Native file is not readable')
+
+ nf.read_handle(reader)
+ else:
+ raise TypeError('Unable to read from object of type: {0}'
+ .format(type(source)))
+
+
+cdef get_writer(object source, shared_ptr[OutputStream]* writer):
+ cdef NativeFile nf
+
+ if isinstance(source, six.string_types):
+ source = OSFile(source, mode='w')
+ elif not isinstance(source, NativeFile) and hasattr(source, 'write'):
+ # Optimistically hope this is file-like
+ source = PythonFileInterface(source, mode='w')
+
+ if isinstance(source, NativeFile):
+ nf = source
+
+        if not nf.is_writeable:
+            raise IOError('Native file is not writeable')
+
+ nf.write_handle(writer)
+ else:
+        raise TypeError('Unable to write to object of type: {0}'
+                        .format(type(source)))
+
+# ----------------------------------------------------------------------
+# HDFS IO implementation
+
+_HDFS_PATH_RE = re.compile(r'hdfs://(.*):(\d+)(.*)')
+
+try:
+ # Python 3
+ from queue import Queue, Empty as QueueEmpty, Full as QueueFull
+except ImportError:
+ from Queue import Queue, Empty as QueueEmpty, Full as QueueFull
+
+
+def have_libhdfs():
+ try:
+ check_status(HaveLibHdfs())
+ return True
+    except Exception:
+ return False
+
+
+def have_libhdfs3():
+ try:
+ check_status(HaveLibHdfs3())
+ return True
+    except Exception:
+ return False
+
+
+def strip_hdfs_abspath(path):
+ m = _HDFS_PATH_RE.match(path)
+ if m:
+ return m.group(3)
+ else:
+ return path
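+
+# Example (sketch):
+#
+#   strip_hdfs_abspath('hdfs://namenode:8020/tmp/x')  # -> '/tmp/x'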
+
+
+cdef class _HdfsClient:
+ cdef:
+ shared_ptr[CHdfsClient] client
+
+ cdef readonly:
+ bint is_open
+
+ def __cinit__(self):
+ pass
+
+ def _connect(self, host, port, user, kerb_ticket, driver):
+ cdef HdfsConnectionConfig conf
+
+ if host is not None:
+ conf.host = tobytes(host)
+ conf.port = port
+ if user is not None:
+ conf.user = tobytes(user)
+ if kerb_ticket is not None:
+ conf.kerb_ticket = tobytes(kerb_ticket)
+
+ if driver == 'libhdfs':
+ check_status(HaveLibHdfs())
+ conf.driver = HdfsDriver_LIBHDFS
+ else:
+ check_status(HaveLibHdfs3())
+ conf.driver = HdfsDriver_LIBHDFS3
+
+ with nogil:
+ check_status(CHdfsClient.Connect(&conf, &self.client))
+ self.is_open = True
+
+ @classmethod
+ def connect(cls, *args, **kwargs):
+ return cls(*args, **kwargs)
+
+ def __dealloc__(self):
+ if self.is_open:
+ self.close()
+
+ def close(self):
+ """
+ Disconnect from the HDFS cluster
+ """
+ self._ensure_client()
+ with nogil:
+ check_status(self.client.get().Disconnect())
+ self.is_open = False
+
+ cdef _ensure_client(self):
+ if self.client.get() == NULL:
+ raise IOError('HDFS client improperly initialized')
+ elif not self.is_open:
+ raise IOError('HDFS client is closed')
+
+ def exists(self, path):
+ """
+        Returns True if the path is known to the cluster, False if it is
+        not (or if there is an RPC error)
+ """
+ self._ensure_client()
+
+ cdef c_string c_path = tobytes(path)
+ cdef c_bool result
+ with nogil:
+ result = self.client.get().Exists(c_path)
+ return result
+
+ def isdir(self, path):
+ cdef HdfsPathInfo info
+ self._path_info(path, &info)
+ return info.kind == ObjectType_DIRECTORY
+
+ def isfile(self, path):
+ cdef HdfsPathInfo info
+ self._path_info(path, &info)
+ return info.kind == ObjectType_FILE
+
+ cdef _path_info(self, path, HdfsPathInfo* info):
+ cdef c_string c_path = tobytes(path)
+
+ with nogil:
+ check_status(self.client.get()
+ .GetPathInfo(c_path, info))
+
+ def ls(self, path, bint full_info):
+ cdef:
+ c_string c_path = tobytes(path)
+ vector[HdfsPathInfo] listing
+ list results = []
+ int i
+
+ self._ensure_client()
+
+ with nogil:
+ check_status(self.client.get()
+ .ListDirectory(c_path, &listing))
+
+ cdef const HdfsPathInfo* info
+ for i in range(<int> listing.size()):
+ info = &listing[i]
+
+ # Try to trim off the hdfs://HOST:PORT piece
+ name = strip_hdfs_abspath(frombytes(info.name))
+
+ if full_info:
+ kind = ('file' if info.kind == ObjectType_FILE
+ else 'directory')
+
+ results.append({
+ 'kind': kind,
+ 'name': name,
+ 'owner': frombytes(info.owner),
+ 'group': frombytes(info.group),
+                    'last_modified_time': info.last_modified_time,
+                    'last_access_time': info.last_access_time,
+ 'size': info.size,
+ 'replication': info.replication,
+ 'block_size': info.block_size,
+ 'permissions': info.permissions
+ })
+ else:
+ results.append(name)
+
+ return results
+
+ def mkdir(self, path):
+ """
+ Create indicated directory and any necessary parent directories
+ """
+ self._ensure_client()
+
+ cdef c_string c_path = tobytes(path)
+ with nogil:
+ check_status(self.client.get()
+ .CreateDirectory(c_path))
+
+ def delete(self, path, bint recursive=False):
+ """
+ Delete the indicated file or directory
+
+ Parameters
+ ----------
+ path : string
+ recursive : boolean, default False
+ If True, also delete child paths for directories
+ """
+ self._ensure_client()
+
+ cdef c_string c_path = tobytes(path)
+ with nogil:
+ check_status(self.client.get()
+ .Delete(c_path, recursive))
+
+ def open(self, path, mode='rb', buffer_size=None, replication=None,
+ default_block_size=None):
+ """
+ Parameters
+ ----------
+ mode : string, 'rb', 'wb', 'ab'
+ """
+ self._ensure_client()
+
+ cdef HdfsFile out = HdfsFile()
+
+ if mode not in ('rb', 'wb', 'ab'):
+ raise Exception("Mode must be 'rb' (read), "
+ "'wb' (write, new file), or 'ab' (append)")
+
+ cdef c_string c_path = tobytes(path)
+ cdef c_bool append = False
+
+ # 0 in libhdfs means "use the default"
+ cdef int32_t c_buffer_size = buffer_size or 0
+ cdef int16_t c_replication = replication or 0
+ cdef int64_t c_default_block_size = default_block_size or 0
+
+ cdef shared_ptr[HdfsOutputStream] wr_handle
+ cdef shared_ptr[HdfsReadableFile] rd_handle
+
+ if mode in ('wb', 'ab'):
+ if mode == 'ab':
+ append = True
+
+ with nogil:
+ check_status(
+ self.client.get()
+ .OpenWriteable(c_path, append, c_buffer_size,
+ c_replication, c_default_block_size,
+ &wr_handle))
+
+ out.wr_file = <shared_ptr[OutputStream]> wr_handle
+
+            out.is_readable = 0
+            out.is_writeable = 1
+ else:
+ with nogil:
+ check_status(self.client.get()
+ .OpenReadable(c_path, &rd_handle))
+
+ out.rd_file = <shared_ptr[RandomAccessFile]> rd_handle
+            out.is_readable = 1
+            out.is_writeable = 0
+
+ if c_buffer_size == 0:
+                c_buffer_size = DEFAULT_BUFFER_SIZE
+
+ out.mode = mode
+ out.buffer_size = c_buffer_size
+ out.parent = _HdfsFileNanny(self, out)
+ out.is_open = True
+ out.own_file = True
+
+ return out
+
+ def download(self, path, stream, buffer_size=None):
+ with self.open(path, 'rb') as f:
+ f.download(stream, buffer_size=buffer_size)
+
+ def upload(self, path, stream, buffer_size=None):
+ """
+ Upload file-like object to HDFS path
+ """
+ with self.open(path, 'wb') as f:
+ f.upload(stream, buffer_size=buffer_size)
+
+
+# ARROW-404: Helper class to ensure that files are closed before the
+# client. During deallocation of the extension class, the attributes are
+# decref'd which can cause the client to get closed first if the file has the
+# last remaining reference
+cdef class _HdfsFileNanny:
+ cdef:
+ object client
+ object file_handle_ref
+
+ def __cinit__(self, client, file_handle):
+ import weakref
+ self.client = client
+ self.file_handle_ref = weakref.ref(file_handle)
+
+ def __dealloc__(self):
+ fh = self.file_handle_ref()
+ if fh:
+ fh.close()
+ # avoid cyclic GC
+ self.file_handle_ref = None
+ self.client = None
+
+
+cdef class HdfsFile(NativeFile):
+ cdef readonly:
+ int32_t buffer_size
+ object mode
+ object parent
+
+ cdef object __weakref__
+
+ def __dealloc__(self):
+ self.parent = None
+
+# ----------------------------------------------------------------------
+# File and stream readers and writers
+
+cdef class _StreamWriter:
+ cdef:
+ shared_ptr[CStreamWriter] writer
+ shared_ptr[OutputStream] sink
+ bint closed
+
+ def __cinit__(self):
+ self.closed = True
+
+ def __dealloc__(self):
+ if not self.closed:
+ self.close()
+
+ def _open(self, sink, Schema schema):
+ get_writer(sink, &self.sink)
+
+ with nogil:
+ check_status(CStreamWriter.Open(self.sink.get(), schema.sp_schema,
+ &self.writer))
+
+ self.closed = False
+
+ def write_batch(self, RecordBatch batch):
+ with nogil:
+ check_status(self.writer.get()
+ .WriteRecordBatch(deref(batch.batch)))
+
+ def close(self):
+ with nogil:
+ check_status(self.writer.get().Close())
+ self.closed = True
+
+
+cdef class _StreamReader:
+ cdef:
+ shared_ptr[CStreamReader] reader
+
+ cdef readonly:
+ Schema schema
+
+ def __cinit__(self):
+ pass
+
+ def _open(self, source):
+ cdef:
+ shared_ptr[RandomAccessFile] reader
+ shared_ptr[InputStream] in_stream
+
+ get_reader(source, &reader)
+ in_stream = <shared_ptr[InputStream]> reader
+
+ with nogil:
+ check_status(CStreamReader.Open(in_stream, &self.reader))
+
+ self.schema = Schema()
+ self.schema.init_schema(self.reader.get().schema())
+
+ def get_next_batch(self):
+ """
+ Read next RecordBatch from the stream. Raises StopIteration at end of
+ stream
+ """
+ cdef shared_ptr[CRecordBatch] batch
+
+ with nogil:
+ check_status(self.reader.get().GetNextRecordBatch(&batch))
+
+ if batch.get() == NULL:
+ raise StopIteration
+
+ return batch_from_cbatch(batch)
+
+ def read_all(self):
+ """
+ Read all record batches as a pyarrow.Table
+ """
+ cdef:
+ vector[shared_ptr[CRecordBatch]] batches
+ shared_ptr[CRecordBatch] batch
+ shared_ptr[CTable] table
+
+ with nogil:
+ while True:
+ check_status(self.reader.get().GetNextRecordBatch(&batch))
+ if batch.get() == NULL:
+ break
+ batches.push_back(batch)
+
+ check_status(CTable.FromRecordBatches(batches, &table))
+
+ return table_from_ctable(table)
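+
+# Example (sketch): these private classes back the public stream API.
+# Assuming `sink` is a writeable NativeFile and `batch` a RecordBatch:
+#
+#   writer = _StreamWriter()
+#   writer._open(sink, batch.schema)
+#   writer.write_batch(batch)
+#   writer.close()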
+
+
+cdef class _FileWriter(_StreamWriter):
+
+ def _open(self, sink, Schema schema):
+ cdef shared_ptr[CFileWriter] writer
+ get_writer(sink, &self.sink)
+
+ with nogil:
+ check_status(CFileWriter.Open(self.sink.get(), schema.sp_schema,
+ &writer))
+
+ # Cast to base class, because has same interface
+ self.writer = <shared_ptr[CStreamWriter]> writer
+ self.closed = False
+
+
+cdef class _FileReader:
+ cdef:
+ shared_ptr[CFileReader] reader
+
+ def __cinit__(self):
+ pass
+
+ def _open(self, source, footer_offset=None):
+ cdef shared_ptr[RandomAccessFile] reader
+ get_reader(source, &reader)
+
+ cdef int64_t offset = 0
+ if footer_offset is not None:
+ offset = footer_offset
+
+ with nogil:
+ if offset != 0:
+ check_status(CFileReader.Open2(reader, offset, &self.reader))
+ else:
+ check_status(CFileReader.Open(reader, &self.reader))
+
+ property num_record_batches:
+
+ def __get__(self):
+ return self.reader.get().num_record_batches()
+
+ def get_batch(self, int i):
+ cdef shared_ptr[CRecordBatch] batch
+
+ if i < 0 or i >= self.num_record_batches:
+ raise ValueError('Batch number {0} out of range'.format(i))
+
+ with nogil:
+ check_status(self.reader.get().GetRecordBatch(i, &batch))
+
+ return batch_from_cbatch(batch)
+
+ # TODO(wesm): ARROW-503: Function was renamed. Remove after a period of
+ # time has passed
+ get_record_batch = get_batch
+
+ def read_all(self):
+ """
+ Read all record batches as a pyarrow.Table
+ """
+ cdef:
+ vector[shared_ptr[CRecordBatch]] batches
+ shared_ptr[CTable] table
+ int i, nbatches
+
+ nbatches = self.num_record_batches
+
+ batches.resize(nbatches)
+ with nogil:
+ for i in range(nbatches):
+ check_status(self.reader.get().GetRecordBatch(i, &batches[i]))
+ check_status(CTable.FromRecordBatches(batches, &table))
+
+ return table_from_ctable(table)
+
+
+#----------------------------------------------------------------------
+# Implement legacy Feather file format
+
+
+class FeatherError(Exception):
+ pass
+
+
+cdef class FeatherWriter:
+ cdef:
+ unique_ptr[CFeatherWriter] writer
+
+ cdef public:
+ int64_t num_rows
+
+ def __cinit__(self):
+ self.num_rows = -1
+
+ def open(self, object dest):
+ cdef shared_ptr[OutputStream] sink
+ get_writer(dest, &sink)
+
+ with nogil:
+ check_status(CFeatherWriter.Open(sink, &self.writer))
+
+ def close(self):
+ if self.num_rows < 0:
+ self.num_rows = 0
+ self.writer.get().SetNumRows(self.num_rows)
+ check_status(self.writer.get().Finalize())
+
+ def write_array(self, object name, object col, object mask=None):
+ cdef Array arr
+
+ if self.num_rows >= 0:
+ if len(col) != self.num_rows:
+ raise ValueError('prior column had a different number of rows')
+ else:
+ self.num_rows = len(col)
+
+ if isinstance(col, Array):
+ arr = col
+ else:
+ arr = Array.from_numpy(col, mask=mask)
+
+ cdef c_string c_name = tobytes(name)
+
+ with nogil:
+ check_status(
+ self.writer.get().Append(c_name, deref(arr.sp_array)))
+
+
+cdef class FeatherReader:
+ cdef:
+ unique_ptr[CFeatherReader] reader
+
+ def __cinit__(self):
+ pass
+
+ def open(self, source):
+ cdef shared_ptr[RandomAccessFile] reader
+ get_reader(source, &reader)
+
+ with nogil:
+ check_status(CFeatherReader.Open(reader, &self.reader))
+
+ property num_rows:
+
+ def __get__(self):
+ return self.reader.get().num_rows()
+
+ property num_columns:
+
+ def __get__(self):
+ return self.reader.get().num_columns()
+
+ def get_column_name(self, int i):
+ cdef c_string name = self.reader.get().GetColumnName(i)
+ return frombytes(name)
+
+ def get_column(self, int i):
+ if i < 0 or i >= self.num_columns:
+ raise IndexError(i)
+
+ cdef shared_ptr[CColumn] sp_column
+ with nogil:
+ check_status(self.reader.get()
+ .GetColumn(i, &sp_column))
+
+ cdef Column col = Column()
+ col.init(sp_column)
+ return col
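+
+# Example (sketch): write one column and read it back; `values` is
+# assumed to be a numpy array or pandas Series:
+#
+#   writer = FeatherWriter()
+#   writer.open('tmp.feather')
+#   writer.write_array('col', values)
+#   writer.close()
+#
+#   reader = FeatherReader()
+#   reader.open('tmp.feather')
+#   col = reader.get_column(0)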
+
+
+def get_tensor_size(Tensor tensor):
+ """
+ Return total size of serialized Tensor including metadata and padding
+ """
+ cdef int64_t size
+ with nogil:
+ check_status(GetTensorSize(deref(tensor.tp), &size))
+ return size
+
+
+def get_record_batch_size(RecordBatch batch):
+ """
+ Return total size of serialized RecordBatch including metadata and padding
+ """
+ cdef int64_t size
+ with nogil:
+ check_status(GetRecordBatchSize(deref(batch.batch), &size))
+ return size
+
+
+def write_tensor(Tensor tensor, NativeFile dest):
+ """
+    Write pyarrow.Tensor to a pyarrow.NativeFile object at its current
+    position
+
+ Parameters
+ ----------
+ tensor : pyarrow.Tensor
+ dest : pyarrow.NativeFile
+
+ Returns
+ -------
+ bytes_written : int
+ Total number of bytes written to the file
+ """
+ cdef:
+ int32_t metadata_length
+ int64_t body_length
+
+ dest._assert_writeable()
+
+ with nogil:
+ check_status(
+ WriteTensor(deref(tensor.tp), dest.wr_file.get(),
+ &metadata_length, &body_length))
+
+ return metadata_length + body_length
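+
+# Example (sketch): round-trip a Tensor through an in-memory stream,
+# assuming `tensor` is a pyarrow.Tensor:
+#
+#   sink = InMemoryOutputStream()
+#   write_tensor(tensor, sink)
+#   result = read_tensor(BufferReader(sink.get_result()))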
+
+
+def read_tensor(NativeFile source):
+ """
+    Read pyarrow.Tensor from a pyarrow.NativeFile object at its current
+    position. If the file source supports zero-copy reads (e.g. a memory
+    map), this operation does not allocate any memory
+
+ Parameters
+ ----------
+ source : pyarrow.NativeFile
+
+ Returns
+ -------
+ tensor : Tensor
+ """
+ cdef:
+ shared_ptr[CTensor] sp_tensor
+
+    source._assert_readable()
+
+ cdef int64_t offset = source.tell()
+ with nogil:
+ check_status(ReadTensor(offset, source.rd_file.get(), &sp_tensor))
+
+ return box_tensor(sp_tensor)
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_jemalloc.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_jemalloc.pyx b/python/pyarrow/_jemalloc.pyx
new file mode 100644
index 0000000..3b41964
--- /dev/null
+++ b/python/pyarrow/_jemalloc.pyx
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: profile=False
+# distutils: language = c++
+# cython: embedsignature = True
+
+from pyarrow.includes.libarrow_jemalloc cimport CJemallocMemoryPool
+from pyarrow._memory cimport MemoryPool
+
+def default_pool():
+ cdef MemoryPool pool = MemoryPool()
+ pool.init(CJemallocMemoryPool.default_pool())
+ return pool
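+
+# Example (sketch): obtain the jemalloc-backed pool and pass it to any
+# pyarrow API that accepts a memory_pool argument:
+#
+#   pool = default_pool()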
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_memory.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/_memory.pxd b/python/pyarrow/_memory.pxd
new file mode 100644
index 0000000..bb1af85
--- /dev/null
+++ b/python/pyarrow/_memory.pxd
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pyarrow.includes.libarrow cimport CMemoryPool, CLoggingMemoryPool
+
+
+cdef class MemoryPool:
+ cdef:
+ CMemoryPool* pool
+
+ cdef init(self, CMemoryPool* pool)
+
+cdef class LoggingMemoryPool(MemoryPool):
+ pass
+
+cdef CMemoryPool* maybe_unbox_memory_pool(MemoryPool memory_pool)
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_memory.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_memory.pyx b/python/pyarrow/_memory.pyx
new file mode 100644
index 0000000..98dbf66
--- /dev/null
+++ b/python/pyarrow/_memory.pyx
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: profile=False
+# distutils: language = c++
+# cython: embedsignature = True
+
+from pyarrow.includes.libarrow cimport CMemoryPool, CLoggingMemoryPool
+from pyarrow.includes.pyarrow cimport set_default_memory_pool, get_memory_pool
+
+cdef class MemoryPool:
+ cdef init(self, CMemoryPool* pool):
+ self.pool = pool
+
+ def bytes_allocated(self):
+ return self.pool.bytes_allocated()
+
+cdef CMemoryPool* maybe_unbox_memory_pool(MemoryPool memory_pool):
+ if memory_pool is None:
+ return get_memory_pool()
+ else:
+ return memory_pool.pool
+
+cdef class LoggingMemoryPool(MemoryPool):
+ pass
+
+def default_pool():
+ cdef:
+ MemoryPool pool = MemoryPool()
+ pool.init(get_memory_pool())
+ return pool
+
+def set_default_pool(MemoryPool pool):
+ set_default_memory_pool(pool.pool)
+
+def total_allocated_bytes():
+ cdef CMemoryPool* pool = get_memory_pool()
+ return pool.bytes_allocated()
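+
+# Example (sketch): inspect allocations on the process-wide default
+# pool:
+#
+#   pool = default_pool()
+#   assert pool.bytes_allocated() == total_allocated_bytes()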
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 079bf5e..5418e1d 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -20,20 +20,18 @@
# cython: embedsignature = True
from cython.operator cimport dereference as deref
-
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
cimport pyarrow.includes.pyarrow as pyarrow
+from pyarrow._array cimport Array
+from pyarrow._error cimport check_status
+from pyarrow._memory cimport MemoryPool, maybe_unbox_memory_pool
+from pyarrow._table cimport Table, table_from_ctable
+from pyarrow._io cimport NativeFile, get_reader, get_writer
-from pyarrow.array cimport Array
from pyarrow.compat import tobytes, frombytes
-from pyarrow.error import ArrowException
-from pyarrow.error cimport check_status
-from pyarrow.io import NativeFile
-from pyarrow.memory cimport MemoryPool, maybe_unbox_memory_pool
-from pyarrow.table cimport Table, table_from_ctable
-
-from pyarrow.io cimport NativeFile, get_reader, get_writer
+from pyarrow._error import ArrowException
+from pyarrow._io import NativeFile
import six
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_table.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/_table.pxd b/python/pyarrow/_table.pxd
new file mode 100644
index 0000000..e61e90d
--- /dev/null
+++ b/python/pyarrow/_table.pxd
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pyarrow.includes.common cimport shared_ptr
+from pyarrow.includes.libarrow cimport (CChunkedArray, CColumn, CTable,
+ CRecordBatch)
+from pyarrow._array cimport Schema
+
+
+cdef class ChunkedArray:
+ cdef:
+ shared_ptr[CChunkedArray] sp_chunked_array
+ CChunkedArray* chunked_array
+
+ cdef init(self, const shared_ptr[CChunkedArray]& chunked_array)
+ cdef _check_nullptr(self)
+
+
+cdef class Column:
+ cdef:
+ shared_ptr[CColumn] sp_column
+ CColumn* column
+
+ cdef init(self, const shared_ptr[CColumn]& column)
+ cdef _check_nullptr(self)
+
+
+cdef class Table:
+ cdef:
+ shared_ptr[CTable] sp_table
+ CTable* table
+
+ cdef init(self, const shared_ptr[CTable]& table)
+ cdef _check_nullptr(self)
+
+
+cdef class RecordBatch:
+ cdef:
+ shared_ptr[CRecordBatch] sp_batch
+ CRecordBatch* batch
+ Schema _schema
+
+ cdef init(self, const shared_ptr[CRecordBatch]& table)
+ cdef _check_nullptr(self)
+
+cdef object box_column(const shared_ptr[CColumn]& ccolumn)
+cdef api object table_from_ctable(const shared_ptr[CTable]& ctable)
+cdef api object batch_from_cbatch(const shared_ptr[CRecordBatch]& cbatch)
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/_table.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_table.pyx b/python/pyarrow/_table.pyx
new file mode 100644
index 0000000..6558b2e
--- /dev/null
+++ b/python/pyarrow/_table.pyx
@@ -0,0 +1,913 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: profile=False
+# distutils: language = c++
+# cython: embedsignature = True
+
+from cython.operator cimport dereference as deref
+
+from pyarrow.includes.libarrow cimport *
+from pyarrow.includes.common cimport *
+cimport pyarrow.includes.pyarrow as pyarrow
+from pyarrow._array cimport (Array, box_array, wrap_array_output,
+ box_data_type, box_schema, DataType, Field)
+from pyarrow._error cimport check_status
+cimport cpython
+
+import pyarrow._config
+from pyarrow._error import ArrowException
+from pyarrow._array import field
+from pyarrow.compat import frombytes, tobytes
+
+
+from collections import OrderedDict
+
+
+cdef _pandas():
+ import pandas as pd
+ return pd
+
+
+cdef class ChunkedArray:
+ """
+    An array backed by one or more memory chunks.
+
+ Warning
+ -------
+ Do not call this class's constructor directly.
+ """
+
+ def __cinit__(self):
+ self.chunked_array = NULL
+
+ cdef init(self, const shared_ptr[CChunkedArray]& chunked_array):
+ self.sp_chunked_array = chunked_array
+ self.chunked_array = chunked_array.get()
+
+ cdef _check_nullptr(self):
+ if self.chunked_array == NULL:
+ raise ReferenceError("ChunkedArray object references a NULL "
+ "pointer. Not initialized.")
+
+ def length(self):
+ self._check_nullptr()
+ return self.chunked_array.length()
+
+ def __len__(self):
+ return self.length()
+
+ @property
+ def null_count(self):
+ """
+        Number of null entries
+
+ Returns
+ -------
+ int
+ """
+ self._check_nullptr()
+ return self.chunked_array.null_count()
+
+ @property
+ def num_chunks(self):
+ """
+ Number of underlying chunks
+
+ Returns
+ -------
+ int
+ """
+ self._check_nullptr()
+ return self.chunked_array.num_chunks()
+
+ def chunk(self, i):
+ """
+ Select a chunk by its index
+
+ Parameters
+ ----------
+ i : int
+
+ Returns
+ -------
+ pyarrow.array.Array
+ """
+ self._check_nullptr()
+ return box_array(self.chunked_array.chunk(i))
+
+ def iterchunks(self):
+ for i in range(self.num_chunks):
+ yield self.chunk(i)
+
+ def to_pylist(self):
+ """
+ Convert to a list of native Python objects.
+ """
+ result = []
+ for i in range(self.num_chunks):
+ result += self.chunk(i).to_pylist()
+ return result
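+
+    # Example (sketch): chunks can be iterated lazily; assuming `col` is
+    # a pyarrow Column, `col.data` is a ChunkedArray:
+    #
+    #   for chunk in col.data.iterchunks():
+    #       print(len(chunk))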
+
+
+cdef class Column:
+ """
+ Named vector of elements of equal type.
+
+ Warning
+ -------
+ Do not call this class's constructor directly.
+ """
+
+ def __cinit__(self):
+ self.column = NULL
+
+ cdef init(self, const shared_ptr[CColumn]& column):
+ self.sp_column = column
+ self.column = column.get()
+
+ @staticmethod
+ def from_array(object field_or_name, Array arr):
+ cdef Field boxed_field
+
+ if isinstance(field_or_name, Field):
+ boxed_field = field_or_name
+ else:
+ boxed_field = field(field_or_name, arr.type)
+
+ cdef shared_ptr[CColumn] sp_column
+ sp_column.reset(new CColumn(boxed_field.sp_field, arr.sp_array))
+ return box_column(sp_column)
+
+ def to_pandas(self):
+ """
+ Convert the arrow::Column to a pandas.Series
+
+ Returns
+ -------
+ pandas.Series
+ """
+ cdef:
+ PyObject* out
+
+ check_status(pyarrow.ConvertColumnToPandas(self.sp_column,
+ <PyObject*> self, &out))
+
+ return _pandas().Series(wrap_array_output(out), name=self.name)
+
+ def equals(self, Column other):
+ """
+ Check if contents of two columns are equal
+
+ Parameters
+ ----------
+ other : pyarrow.Column
+
+ Returns
+ -------
+ are_equal : boolean
+ """
+ cdef:
+ CColumn* my_col = self.column
+ CColumn* other_col = other.column
+ c_bool result
+
+ self._check_nullptr()
+ other._check_nullptr()
+
+ with nogil:
+ result = my_col.Equals(deref(other_col))
+
+ return result
+
+ def to_pylist(self):
+ """
+ Convert to a list of native Python objects.
+ """
+ return self.data.to_pylist()
+
+ cdef _check_nullptr(self):
+ if self.column == NULL:
+ raise ReferenceError("Column object references a NULL pointer."
+ "Not initialized.")
+
+ def __len__(self):
+ self._check_nullptr()
+ return self.column.length()
+
+ def length(self):
+ self._check_nullptr()
+ return self.column.length()
+
+ @property
+ def shape(self):
+ """
+        Dimensions of this column
+
+ Returns
+ -------
+ (int,)
+ """
+ self._check_nullptr()
+ return (self.length(),)
+
+ @property
+ def null_count(self):
+ """
+        Number of null entries
+
+ Returns
+ -------
+ int
+ """
+ self._check_nullptr()
+ return self.column.null_count()
+
+ @property
+ def name(self):
+ """
+ Label of the column
+
+ Returns
+ -------
+ str
+ """
+ return bytes(self.column.name()).decode('utf8')
+
+ @property
+ def type(self):
+ """
+ Type information for this column
+
+ Returns
+ -------
+ pyarrow.schema.DataType
+ """
+ return box_data_type(self.column.type())
+
+ @property
+ def data(self):
+ """
+ The underlying data
+
+ Returns
+ -------
+ pyarrow.table.ChunkedArray
+ """
+ cdef ChunkedArray chunked_array = ChunkedArray()
+ chunked_array.init(self.column.data())
+ return chunked_array
+
+
+cdef _schema_from_arrays(arrays, names, shared_ptr[CSchema]* schema):
+ cdef:
+ Array arr
+ Column col
+ c_string c_name
+ vector[shared_ptr[CField]] fields
+ cdef shared_ptr[CDataType] type_
+
+ cdef int K = len(arrays)
+
+ fields.resize(K)
+
+ if len(arrays) == 0:
+ raise ValueError('Must pass at least one array')
+
+ if isinstance(arrays[0], Array):
+ if names is None:
+ raise ValueError('Must pass names when constructing '
+ 'from Array objects')
+ for i in range(K):
+ arr = arrays[i]
+ type_ = arr.type.sp_type
+ c_name = tobytes(names[i])
+ fields[i].reset(new CField(c_name, type_, True))
+ elif isinstance(arrays[0], Column):
+ for i in range(K):
+ col = arrays[i]
+ type_ = col.sp_column.get().type()
+ c_name = tobytes(col.name)
+ fields[i].reset(new CField(c_name, type_, True))
+ else:
+ raise TypeError(type(arrays[0]))
+
+ schema.reset(new CSchema(fields))
+
+
+
+cdef _dataframe_to_arrays(df, timestamps_to_ms, Schema schema):
+ cdef:
+ list names = []
+ list arrays = []
+ DataType type = None
+
+ for name in df.columns:
+ col = df[name]
+ if schema is not None:
+ type = schema.field_by_name(name).type
+
+ arr = Array.from_numpy(col, type=type,
+ timestamps_to_ms=timestamps_to_ms)
+ names.append(name)
+ arrays.append(arr)
+
+ return names, arrays
+
+
+cdef class RecordBatch:
+ """
+    A batch of rows, stored as a set of equal-length columns
+
+ Warning
+ -------
+ Do not call this class's constructor directly, use one of the ``from_*``
+ methods instead.
+ """
+
+ def __cinit__(self):
+ self.batch = NULL
+ self._schema = None
+
+ cdef init(self, const shared_ptr[CRecordBatch]& batch):
+ self.sp_batch = batch
+ self.batch = batch.get()
+
+ cdef _check_nullptr(self):
+ if self.batch == NULL:
+ raise ReferenceError("Object not initialized")
+
+ def __len__(self):
+ self._check_nullptr()
+ return self.batch.num_rows()
+
+ @property
+ def num_columns(self):
+ """
+ Number of columns
+
+ Returns
+ -------
+ int
+ """
+ self._check_nullptr()
+ return self.batch.num_columns()
+
+ @property
+ def num_rows(self):
+ """
+ Number of rows
+
+ Due to the definition of a RecordBatch, all columns have the same
+ number of rows.
+
+ Returns
+ -------
+ int
+ """
+ return len(self)
+
+ @property
+ def schema(self):
+ """
+ Schema of the RecordBatch and its columns
+
+ Returns
+ -------
+ pyarrow.schema.Schema
+ """
+ cdef Schema schema
+ self._check_nullptr()
+ if self._schema is None:
+ schema = Schema()
+ schema.init_schema(self.batch.schema())
+ self._schema = schema
+
+ return self._schema
+
+ def __getitem__(self, i):
+ return box_array(self.batch.column(i))
+
+ def slice(self, offset=0, length=None):
+ """
+ Compute zero-copy slice of this RecordBatch
+
+ Parameters
+ ----------
+ offset : int, default 0
+ Offset from start of array to slice
+ length : int, default None
+ Length of slice (default is until end of batch starting from
+ offset)
+
+ Returns
+ -------
+ sliced : RecordBatch
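+
+        Examples
+        --------
+        A sketch, assuming `batch` has at least four rows:
+
+        >>> middle = batch.slice(1, 3)
+        >>> len(middle)
+        3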
+ """
+ cdef shared_ptr[CRecordBatch] result
+
+ if offset < 0:
+ raise IndexError('Offset must be non-negative')
+
+ if length is None:
+ result = self.batch.Slice(offset)
+ else:
+ result = self.batch.Slice(offset, length)
+
+ return batch_from_cbatch(result)
+
+ def equals(self, RecordBatch other):
+ cdef:
+ CRecordBatch* my_batch = self.batch
+ CRecordBatch* other_batch = other.batch
+ c_bool result
+
+ self._check_nullptr()
+ other._check_nullptr()
+
+ with nogil:
+ result = my_batch.Equals(deref(other_batch))
+
+ return result
+
+ def to_pydict(self):
+ """
+        Convert the arrow::RecordBatch to an OrderedDict
+
+ Returns
+ -------
+ OrderedDict
+ """
+ entries = []
+ for i in range(self.batch.num_columns()):
+ name = bytes(self.batch.column_name(i)).decode('utf8')
+ column = self[i].to_pylist()
+ entries.append((name, column))
+ return OrderedDict(entries)
+
+ def to_pandas(self, nthreads=None):
+ """
+ Convert the arrow::RecordBatch to a pandas DataFrame
+
+ Returns
+ -------
+ pandas.DataFrame
+ """
+ return Table.from_batches([self]).to_pandas(nthreads=nthreads)
+
+ @classmethod
+ def from_pandas(cls, df, schema=None):
+ """
+ Convert pandas.DataFrame to an Arrow RecordBatch
+
+ Parameters
+ ----------
+ df: pandas.DataFrame
+ schema: pyarrow.Schema (optional)
+ The expected schema of the RecordBatch. This can be used to
+ indicate the type of columns if we cannot infer it automatically.
+
+ Returns
+ -------
+ pyarrow.table.RecordBatch
+ """
+ names, arrays = _dataframe_to_arrays(df, False, schema)
+ return cls.from_arrays(arrays, names)
+
+ @staticmethod
+ def from_arrays(arrays, names):
+ """
+ Construct a RecordBatch from multiple pyarrow.Arrays
+
+ Parameters
+ ----------
+ arrays: list of pyarrow.Array
+ column-wise data vectors
+ names: list of str
+ Labels for the columns
+
+ Returns
+ -------
+ pyarrow.table.RecordBatch
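+
+        Examples
+        --------
+        A sketch, assuming `arr0` and `arr1` are equal-length
+        pyarrow.Array objects:
+
+        >>> batch = RecordBatch.from_arrays([arr0, arr1], ['c0', 'c1'])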
+ """
+ cdef:
+ Array arr
+ c_string c_name
+ shared_ptr[CSchema] schema
+ shared_ptr[CRecordBatch] batch
+ vector[shared_ptr[CArray]] c_arrays
+ int64_t num_rows
+
+ if len(arrays) == 0:
+            raise ValueError('Record batch must contain at least one '
+                             'array (for now)')
+
+ num_rows = len(arrays[0])
+ _schema_from_arrays(arrays, names, &schema)
+
+ for i in range(len(arrays)):
+ arr = arrays[i]
+ c_arrays.push_back(arr.sp_array)
+
+ batch.reset(new CRecordBatch(schema, num_rows, c_arrays))
+ return batch_from_cbatch(batch)
+
+
+cdef table_to_blockmanager(const shared_ptr[CTable]& table, int nthreads):
+ cdef:
+ PyObject* result_obj
+ CColumn* col
+ int i
+
+ import pandas.core.internals as _int
+ from pandas import RangeIndex, Categorical
+ from pyarrow.compat import DatetimeTZDtype
+
+ with nogil:
+ check_status(pyarrow.ConvertTableToPandas(table, nthreads,
+ &result_obj))
+
+ result = PyObject_to_object(result_obj)
+
+ blocks = []
+ for item in result:
+ block_arr = item['block']
+ placement = item['placement']
+ if 'dictionary' in item:
+ cat = Categorical(block_arr,
+ categories=item['dictionary'],
+ ordered=False, fastpath=True)
+ block = _int.make_block(cat, placement=placement,
+ klass=_int.CategoricalBlock,
+ fastpath=True)
+ elif 'timezone' in item:
+ dtype = DatetimeTZDtype('ns', tz=item['timezone'])
+ block = _int.make_block(block_arr, placement=placement,
+ klass=_int.DatetimeTZBlock,
+ dtype=dtype, fastpath=True)
+ else:
+ block = _int.make_block(block_arr, placement=placement)
+ blocks.append(block)
+
+ names = []
+ for i in range(table.get().num_columns()):
+ col = table.get().column(i).get()
+ names.append(frombytes(col.name()))
+
+ axes = [names, RangeIndex(table.get().num_rows())]
+ return _int.BlockManager(blocks, axes)
+
+
+cdef class Table:
+ """
+    A collection of top-level named, equal-length Arrow arrays.
+
+ Warning
+ -------
+ Do not call this class's constructor directly, use one of the ``from_*``
+ methods instead.
+ """
+
+ def __cinit__(self):
+ self.table = NULL
+
+ def __repr__(self):
+ return 'pyarrow.Table\n{0}'.format(str(self.schema))
+
+ cdef init(self, const shared_ptr[CTable]& table):
+ self.sp_table = table
+ self.table = table.get()
+
+ cdef _check_nullptr(self):
+ if self.table == NULL:
+ raise ReferenceError("Table object references a NULL pointer."
+ "Not initialized.")
+
+ def equals(self, Table other):
+ """
+ Check if contents of two tables are equal
+
+ Parameters
+ ----------
+ other : pyarrow.Table
+
+ Returns
+ -------
+ are_equal : boolean
+ """
+ cdef:
+ CTable* my_table = self.table
+ CTable* other_table = other.table
+ c_bool result
+
+ self._check_nullptr()
+ other._check_nullptr()
+
+ with nogil:
+ result = my_table.Equals(deref(other_table))
+
+ return result
+
+ @classmethod
+ def from_pandas(cls, df, timestamps_to_ms=False, schema=None):
+ """
+ Convert pandas.DataFrame to an Arrow Table
+
+ Parameters
+ ----------
+ df: pandas.DataFrame
+
+ timestamps_to_ms: bool
+ Convert datetime columns to ms resolution. This is needed for
+            compatibility with other functionality like Parquet I/O which
+ only supports milliseconds.
+
+ schema: pyarrow.Schema (optional)
+ The expected schema of the Arrow Table. This can be used to
+ indicate the type of columns if we cannot infer it automatically.
+
+ Returns
+ -------
+ pyarrow.table.Table
+
+ Examples
+ --------
+
+ >>> import pandas as pd
+ >>> import pyarrow as pa
+ >>> df = pd.DataFrame({
+ ... 'int': [1, 2],
+ ... 'str': ['a', 'b']
+ ... })
+ >>> pa.Table.from_pandas(df)
+ <pyarrow.table.Table object at 0x7f05d1fb1b40>
+ """
+ names, arrays = _dataframe_to_arrays(df,
+ timestamps_to_ms=timestamps_to_ms,
+ schema=schema)
+ return cls.from_arrays(arrays, names=names)
+
+ @staticmethod
+ def from_arrays(arrays, names=None):
+ """
+ Construct a Table from Arrow arrays or columns
+
+ Parameters
+ ----------
+ arrays: list of pyarrow.Array or pyarrow.Column
+ Equal-length arrays that should form the table.
+ names: list of str, optional
+            Names for the table columns. If Column objects are passed,
+            the names will be inferred from them. If Array objects are
+            passed, this argument is required.
+
+ Returns
+ -------
+ pyarrow.table.Table
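+
+        Examples
+        --------
+        A sketch, assuming `arr0` and `arr1` are equal-length
+        pyarrow.Array objects:
+
+        >>> t = Table.from_arrays([arr0, arr1], names=['c0', 'c1'])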
+
+ """
+ cdef:
+ vector[shared_ptr[CField]] fields
+ vector[shared_ptr[CColumn]] columns
+ shared_ptr[CSchema] schema
+ shared_ptr[CTable] table
+
+ _schema_from_arrays(arrays, names, &schema)
+
+ cdef int K = len(arrays)
+ columns.resize(K)
+
+ for i in range(K):
+ if isinstance(arrays[i], Array):
+ columns[i].reset(new CColumn(schema.get().field(i),
+ (<Array> arrays[i]).sp_array))
+ elif isinstance(arrays[i], Column):
+ columns[i] = (<Column> arrays[i]).sp_column
+ else:
+ raise ValueError(type(arrays[i]))
+
+ table.reset(new CTable(schema, columns))
+ return table_from_ctable(table)
+
+ @staticmethod
+ def from_batches(batches):
+ """
+ Construct a Table from a list of Arrow RecordBatches
+
+ Parameters
+ ----------
+        batches: list of RecordBatch
+            List of RecordBatch objects to convert; all schemas must be
+            equal
+ """
+ cdef:
+ vector[shared_ptr[CRecordBatch]] c_batches
+ shared_ptr[CTable] c_table
+ RecordBatch batch
+
+ for batch in batches:
+ c_batches.push_back(batch.sp_batch)
+
+ with nogil:
+ check_status(CTable.FromRecordBatches(c_batches, &c_table))
+
+ return table_from_ctable(c_table)
+
+ def to_pandas(self, nthreads=None):
+ """
+ Convert the arrow::Table to a pandas DataFrame
+
+ Parameters
+ ----------
+ nthreads : int, default max(1, multiprocessing.cpu_count() / 2)
+ For the default, we divide the CPU count by 2 because most modern
+ computers have hyperthreading turned on, so doubling the CPU count
+ beyond the number of physical cores does not help
+
+ Returns
+ -------
+ pandas.DataFrame
+ """
+ if nthreads is None:
+ nthreads = pyarrow._config.cpu_count()
+
+ mgr = table_to_blockmanager(self.sp_table, nthreads)
+ return _pandas().DataFrame(mgr)
+
+ def to_pydict(self):
+ """
+        Convert the arrow::Table to an OrderedDict
+
+ Returns
+ -------
+ OrderedDict
+ """
+ entries = []
+ for i in range(self.table.num_columns()):
+ name = self.column(i).name
+ column = self.column(i).to_pylist()
+ entries.append((name, column))
+ return OrderedDict(entries)
+
+ @property
+ def schema(self):
+ """
+ Schema of the table and its columns
+
+ Returns
+ -------
+ pyarrow.schema.Schema
+ """
+ return box_schema(self.table.schema())
+
+ def column(self, index):
+ """
+ Select a column by its numeric index.
+
+ Parameters
+ ----------
+ index: int
+
+ Returns
+ -------
+ pyarrow.table.Column
+ """
+ self._check_nullptr()
+ cdef Column column = Column()
+ column.init(self.table.column(index))
+ return column
+
+ def __getitem__(self, i):
+ return self.column(i)
+
+ def itercolumns(self):
+ """
+ Iterator over all columns in their numerical order
+ """
+ for i in range(self.num_columns):
+ yield self.column(i)
+
+ @property
+ def num_columns(self):
+ """
+ Number of columns in this table
+
+ Returns
+ -------
+ int
+ """
+ self._check_nullptr()
+ return self.table.num_columns()
+
+ @property
+ def num_rows(self):
+ """
+ Number of rows in this table.
+
+ Due to the definition of a table, all columns have the same number of rows.
+
+ Returns
+ -------
+ int
+ """
+ self._check_nullptr()
+ return self.table.num_rows()
+
+ def __len__(self):
+ return self.num_rows
+
+ @property
+ def shape(self):
+ """
+ Dimensions of the table: (#rows, #columns)
+
+ Returns
+ -------
+ (int, int)
+ """
+ return (self.num_rows, self.num_columns)
+
+ def add_column(self, int i, Column column):
+ """
+        Add column to Table at position i. Returns a new table
+ """
+ cdef:
+ shared_ptr[CTable] c_table
+
+ with nogil:
+ check_status(self.table.AddColumn(i, column.sp_column, &c_table))
+
+ return table_from_ctable(c_table)
+
+ def append_column(self, Column column):
+ """
+ Append column at end of columns. Returns new table
+ """
+ return self.add_column(self.num_columns, column)
+
+ def remove_column(self, int i):
+ """
+ Create new Table with the indicated column removed
+ """
+ cdef shared_ptr[CTable] c_table
+
+ with nogil:
+ check_status(self.table.RemoveColumn(i, &c_table))
+
+ return table_from_ctable(c_table)
+
+
+def concat_tables(tables):
+ """
+    Perform zero-copy concatenation of pyarrow.Table objects. Raises an
+    exception if the Table schemas are not all the same
+
+ Parameters
+ ----------
+ tables : iterable of pyarrow.Table objects
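+
+    Examples
+    --------
+    A sketch, assuming `t1` and `t2` are pyarrow.Table objects with
+    equal schemas:
+
+    >>> combined = concat_tables([t1, t2])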
+ """
+ cdef:
+ vector[shared_ptr[CTable]] c_tables
+ shared_ptr[CTable] c_result
+ Table table
+
+ for table in tables:
+ c_tables.push_back(table.sp_table)
+
+ with nogil:
+ check_status(ConcatenateTables(c_tables, &c_result))
+
+ return table_from_ctable(c_result)
+
+
+cdef object box_column(const shared_ptr[CColumn]& ccolumn):
+ cdef Column column = Column()
+ column.init(ccolumn)
+ return column
+
+
+cdef api object table_from_ctable(const shared_ptr[CTable]& ctable):
+ cdef Table table = Table()
+ table.init(ctable)
+ return table
+
+
+cdef api object batch_from_cbatch(const shared_ptr[CRecordBatch]& cbatch):
+ cdef RecordBatch batch = RecordBatch()
+ batch.init(cbatch)
+ return batch
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/array.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/array.pxd b/python/pyarrow/array.pxd
deleted file mode 100644
index 3ba4871..0000000
--- a/python/pyarrow/array.pxd
+++ /dev/null
@@ -1,141 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from pyarrow.includes.common cimport shared_ptr, int64_t
-from pyarrow.includes.libarrow cimport CArray, CTensor
-
-from pyarrow.scalar import NA
-
-from pyarrow.schema cimport DataType
-
-from cpython cimport PyObject
-
-
-cdef extern from "Python.h":
- int PySlice_Check(object)
-
-
-cdef class Array:
- cdef:
- shared_ptr[CArray] sp_array
- CArray* ap
-
- cdef readonly:
- DataType type
-
- cdef init(self, const shared_ptr[CArray]& sp_array)
- cdef getitem(self, int64_t i)
-
-
-cdef class Tensor:
- cdef:
- shared_ptr[CTensor] sp_tensor
- CTensor* tp
-
- cdef readonly:
- DataType type
-
- cdef init(self, const shared_ptr[CTensor]& sp_tensor)
-
-
-cdef object box_array(const shared_ptr[CArray]& sp_array)
-cdef object box_tensor(const shared_ptr[CTensor]& sp_tensor)
-
-
-cdef class BooleanArray(Array):
- pass
-
-
-cdef class NumericArray(Array):
- pass
-
-
-cdef class IntegerArray(NumericArray):
- pass
-
-
-cdef class FloatingPointArray(NumericArray):
- pass
-
-
-cdef class Int8Array(IntegerArray):
- pass
-
-
-cdef class UInt8Array(IntegerArray):
- pass
-
-
-cdef class Int16Array(IntegerArray):
- pass
-
-
-cdef class UInt16Array(IntegerArray):
- pass
-
-
-cdef class Int32Array(IntegerArray):
- pass
-
-
-cdef class UInt32Array(IntegerArray):
- pass
-
-
-cdef class Int64Array(IntegerArray):
- pass
-
-
-cdef class UInt64Array(IntegerArray):
- pass
-
-
-cdef class FloatArray(FloatingPointArray):
- pass
-
-
-cdef class DoubleArray(FloatingPointArray):
- pass
-
-
-cdef class FixedSizeBinaryArray(Array):
- pass
-
-
-cdef class DecimalArray(FixedSizeBinaryArray):
- pass
-
-
-cdef class ListArray(Array):
- pass
-
-
-cdef class StringArray(Array):
- pass
-
-
-cdef class BinaryArray(Array):
- pass
-
-
-cdef class DictionaryArray(Array):
- cdef:
- object _indices, _dictionary
-
-
-
-cdef wrap_array_output(PyObject* output)