You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by uw...@apache.org on 2017/04/13 10:51:52 UTC
[2/4] arrow git commit: ARROW-751: [Python] Make all Cython modules
private. Some code tidying
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/array.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/array.pyx b/python/pyarrow/array.pyx
deleted file mode 100644
index 1c4253e..0000000
--- a/python/pyarrow/array.pyx
+++ /dev/null
@@ -1,646 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# cython: profile=False
-# distutils: language = c++
-# cython: embedsignature = True
-
-from cython.operator cimport dereference as deref
-
-import numpy as np
-
-from pyarrow.includes.libarrow cimport *
-from pyarrow.includes.common cimport PyObject_to_object
-cimport pyarrow.includes.pyarrow as pyarrow
-
-import pyarrow.config
-
-from pyarrow.compat import frombytes, tobytes, PandasSeries, Categorical
-from pyarrow.error cimport check_status
-from pyarrow.memory cimport MemoryPool, maybe_unbox_memory_pool
-
-cimport pyarrow.scalar as scalar
-from pyarrow.scalar import NA
-
-from pyarrow.schema cimport (DataType, Field, Schema, DictionaryType,
- FixedSizeBinaryType,
- box_data_type)
-import pyarrow.schema as schema
-
-cimport cpython
-
-
-cdef maybe_coerce_datetime64(values, dtype, DataType type,
- timestamps_to_ms=False):
-
- from pyarrow.compat import DatetimeTZDtype
-
- if values.dtype.type != np.datetime64:
- return values, type
-
- coerce_ms = timestamps_to_ms and values.dtype != 'datetime64[ms]'
-
- if coerce_ms:
- values = values.astype('datetime64[ms]')
-
- if isinstance(dtype, DatetimeTZDtype):
- tz = dtype.tz
- unit = 'ms' if coerce_ms else dtype.unit
- type = schema.timestamp(unit, tz)
- elif type is None:
- # Trust the NumPy dtype
- type = schema.type_from_numpy_dtype(values.dtype)
-
- return values, type
-
-
-cdef class Array:
-
- cdef init(self, const shared_ptr[CArray]& sp_array):
- self.sp_array = sp_array
- self.ap = sp_array.get()
- self.type = box_data_type(self.sp_array.get().type())
-
- @staticmethod
- def from_numpy(obj, mask=None, DataType type=None,
- timestamps_to_ms=False,
- MemoryPool memory_pool=None):
- """
- Convert pandas.Series to an Arrow Array.
-
- Parameters
- ----------
- series : pandas.Series or numpy.ndarray
-
- mask : pandas.Series or numpy.ndarray, optional
- boolean mask if the object is valid or null
-
- type : pyarrow.DataType
- Explicit type to attempt to coerce to
-
- timestamps_to_ms : bool, optional
- Convert datetime columns to ms resolution. This is needed for
- compatibility with other functionality like Parquet I/O which
- only supports milliseconds.
-
- memory_pool: MemoryPool, optional
- Specific memory pool to use to allocate the resulting Arrow array.
-
- Notes
- -----
- Localized timestamps will currently be returned as UTC (pandas's native
- representation). Timezone-naive data will be implicitly interpreted as
- UTC.
-
- Examples
- --------
-
- >>> import pandas as pd
- >>> import pyarrow as pa
- >>> pa.Array.from_numpy(pd.Series([1, 2]))
- <pyarrow.array.Int64Array object at 0x7f674e4c0e10>
- [
- 1,
- 2
- ]
-
- >>> import numpy as np
- >>> pa.Array.from_numpy(pd.Series([1, 2]), np.array([0, 1],
- ... dtype=bool))
- <pyarrow.array.Int64Array object at 0x7f9019e11208>
- [
- 1,
- NA
- ]
-
- Returns
- -------
- pyarrow.array.Array
- """
- cdef:
- shared_ptr[CArray] out
- shared_ptr[CDataType] c_type
- CMemoryPool* pool
-
- if mask is not None:
- mask = get_series_values(mask)
-
- values = get_series_values(obj)
- pool = maybe_unbox_memory_pool(memory_pool)
-
- if isinstance(values, Categorical):
- return DictionaryArray.from_arrays(
- values.codes, values.categories.values,
- mask=mask, memory_pool=memory_pool)
- elif values.dtype == object:
- # Object dtype undergoes a different conversion path as more type
- # inference may be needed
- if type is not None:
- c_type = type.sp_type
- with nogil:
- check_status(pyarrow.PandasObjectsToArrow(
- pool, values, mask, c_type, &out))
- else:
- values, type = maybe_coerce_datetime64(
- values, obj.dtype, type, timestamps_to_ms=timestamps_to_ms)
-
- if type is None:
- check_status(pyarrow.NumPyDtypeToArrow(values.dtype, &c_type))
- else:
- c_type = type.sp_type
-
- with nogil:
- check_status(pyarrow.PandasToArrow(
- pool, values, mask, c_type, &out))
-
- return box_array(out)
-
- @staticmethod
- def from_list(object list_obj, DataType type=None,
- MemoryPool memory_pool=None):
- """
- Convert Python list to Arrow array
-
- Parameters
- ----------
- list_obj : array_like
-
- Returns
- -------
- pyarrow.array.Array
- """
- cdef:
- shared_ptr[CArray] sp_array
- CMemoryPool* pool
-
- pool = maybe_unbox_memory_pool(memory_pool)
- if type is None:
- check_status(pyarrow.ConvertPySequence(list_obj, pool, &sp_array))
- else:
- check_status(
- pyarrow.ConvertPySequence(
- list_obj, pool, &sp_array, type.sp_type
- )
- )
-
- return box_array(sp_array)
-
- property null_count:
-
- def __get__(self):
- return self.sp_array.get().null_count()
-
- def __iter__(self):
- for i in range(len(self)):
- yield self.getitem(i)
- raise StopIteration
-
- def __repr__(self):
- from pyarrow.formatting import array_format
- type_format = object.__repr__(self)
- values = array_format(self, window=10)
- return '{0}\n{1}'.format(type_format, values)
-
- def equals(Array self, Array other):
- return self.ap.Equals(deref(other.ap))
-
- def __len__(self):
- if self.sp_array.get():
- return self.sp_array.get().length()
- else:
- return 0
-
- def isnull(self):
- raise NotImplemented
-
- def __getitem__(self, key):
- cdef:
- Py_ssize_t n = len(self)
-
- if PySlice_Check(key):
- start = key.start or 0
- while start < 0:
- start += n
-
- stop = key.stop if key.stop is not None else n
- while stop < 0:
- stop += n
-
- step = key.step or 1
- if step != 1:
- raise IndexError('only slices with step 1 supported')
- else:
- return self.slice(start, stop - start)
-
- while key < 0:
- key += len(self)
-
- return self.getitem(key)
-
- cdef getitem(self, int64_t i):
- return scalar.box_scalar(self.type, self.sp_array, i)
-
- def slice(self, offset=0, length=None):
- """
- Compute zero-copy slice of this array
-
- Parameters
- ----------
- offset : int, default 0
- Offset from start of array to slice
- length : int, default None
- Length of slice (default is until end of Array starting from
- offset)
-
- Returns
- -------
- sliced : RecordBatch
- """
- cdef:
- shared_ptr[CArray] result
-
- if offset < 0:
- raise IndexError('Offset must be non-negative')
-
- if length is None:
- result = self.ap.Slice(offset)
- else:
- result = self.ap.Slice(offset, length)
-
- return box_array(result)
-
- def to_pandas(self):
- """
- Convert to an array object suitable for use in pandas
-
- See also
- --------
- Column.to_pandas
- Table.to_pandas
- RecordBatch.to_pandas
- """
- cdef:
- PyObject* out
-
- with nogil:
- check_status(
- pyarrow.ConvertArrayToPandas(self.sp_array, <PyObject*> self,
- &out))
- return wrap_array_output(out)
-
- def to_pylist(self):
- """
- Convert to an list of native Python objects.
- """
- return [x.as_py() for x in self]
-
-
-cdef class Tensor:
-
- cdef init(self, const shared_ptr[CTensor]& sp_tensor):
- self.sp_tensor = sp_tensor
- self.tp = sp_tensor.get()
- self.type = box_data_type(self.tp.type())
-
- def __repr__(self):
- return """<pyarrow.Tensor>
-type: {0}
-shape: {1}
-strides: {2}""".format(self.type, self.shape, self.strides)
-
- @staticmethod
- def from_numpy(obj):
- cdef shared_ptr[CTensor] ctensor
- check_status(pyarrow.NdarrayToTensor(default_memory_pool(),
- obj, &ctensor))
- return box_tensor(ctensor)
-
- def to_numpy(self):
- """
- Convert arrow::Tensor to numpy.ndarray with zero copy
- """
- cdef:
- PyObject* out
-
- check_status(pyarrow.TensorToNdarray(deref(self.tp), <PyObject*> self,
- &out))
- return PyObject_to_object(out)
-
- def equals(self, Tensor other):
- """
- Return true if the tensors contains exactly equal data
- """
- return self.tp.Equals(deref(other.tp))
-
- property is_mutable:
-
- def __get__(self):
- return self.tp.is_mutable()
-
- property is_contiguous:
-
- def __get__(self):
- return self.tp.is_contiguous()
-
- property ndim:
-
- def __get__(self):
- return self.tp.ndim()
-
- property size:
-
- def __get__(self):
- return self.tp.size()
-
- property shape:
-
- def __get__(self):
- cdef size_t i
- py_shape = []
- for i in range(self.tp.shape().size()):
- py_shape.append(self.tp.shape()[i])
- return py_shape
-
- property strides:
-
- def __get__(self):
- cdef size_t i
- py_strides = []
- for i in range(self.tp.strides().size()):
- py_strides.append(self.tp.strides()[i])
- return py_strides
-
-
-
-cdef wrap_array_output(PyObject* output):
- cdef object obj = PyObject_to_object(output)
-
- if isinstance(obj, dict):
- return Categorical(obj['indices'],
- categories=obj['dictionary'],
- fastpath=True)
- else:
- return obj
-
-
-cdef class NullArray(Array):
- pass
-
-
-cdef class BooleanArray(Array):
- pass
-
-
-cdef class NumericArray(Array):
- pass
-
-
-cdef class IntegerArray(NumericArray):
- pass
-
-
-cdef class FloatingPointArray(NumericArray):
- pass
-
-
-cdef class Int8Array(IntegerArray):
- pass
-
-
-cdef class UInt8Array(IntegerArray):
- pass
-
-
-cdef class Int16Array(IntegerArray):
- pass
-
-
-cdef class UInt16Array(IntegerArray):
- pass
-
-
-cdef class Int32Array(IntegerArray):
- pass
-
-
-cdef class UInt32Array(IntegerArray):
- pass
-
-
-cdef class Int64Array(IntegerArray):
- pass
-
-
-cdef class UInt64Array(IntegerArray):
- pass
-
-
-cdef class Date32Array(NumericArray):
- pass
-
-
-cdef class Date64Array(NumericArray):
- pass
-
-
-cdef class TimestampArray(NumericArray):
- pass
-
-
-cdef class Time32Array(NumericArray):
- pass
-
-
-cdef class Time64Array(NumericArray):
- pass
-
-
-cdef class FloatArray(FloatingPointArray):
- pass
-
-
-cdef class DoubleArray(FloatingPointArray):
- pass
-
-
-cdef class FixedSizeBinaryArray(Array):
- pass
-
-
-cdef class DecimalArray(FixedSizeBinaryArray):
- pass
-
-
-cdef class ListArray(Array):
- pass
-
-
-cdef class StringArray(Array):
- pass
-
-
-cdef class BinaryArray(Array):
- pass
-
-
-cdef class DictionaryArray(Array):
-
- cdef getitem(self, int64_t i):
- cdef Array dictionary = self.dictionary
- index = self.indices[i]
- if index is NA:
- return index
- else:
- return scalar.box_scalar(dictionary.type, dictionary.sp_array,
- index.as_py())
-
- property dictionary:
-
- def __get__(self):
- cdef CDictionaryArray* darr = <CDictionaryArray*>(self.ap)
-
- if self._dictionary is None:
- self._dictionary = box_array(darr.dictionary())
-
- return self._dictionary
-
- property indices:
-
- def __get__(self):
- cdef CDictionaryArray* darr = <CDictionaryArray*>(self.ap)
-
- if self._indices is None:
- self._indices = box_array(darr.indices())
-
- return self._indices
-
- @staticmethod
- def from_arrays(indices, dictionary, mask=None,
- MemoryPool memory_pool=None):
- """
- Construct Arrow DictionaryArray from array of indices (must be
- non-negative integers) and corresponding array of dictionary values
-
- Parameters
- ----------
- indices : ndarray or pandas.Series, integer type
- dictionary : ndarray or pandas.Series
- mask : ndarray or pandas.Series, boolean type
- True values indicate that indices are actually null
-
- Returns
- -------
- dict_array : DictionaryArray
- """
- cdef:
- Array arrow_indices, arrow_dictionary
- DictionaryArray result
- shared_ptr[CDataType] c_type
- shared_ptr[CArray] c_result
-
- if isinstance(indices, Array):
- if mask is not None:
- raise NotImplementedError(
- "mask not implemented with Arrow array inputs yet")
- arrow_indices = indices
- else:
- if mask is None:
- mask = indices == -1
- else:
- mask = mask | (indices == -1)
- arrow_indices = Array.from_numpy(indices, mask=mask,
- memory_pool=memory_pool)
-
- if isinstance(dictionary, Array):
- arrow_dictionary = dictionary
- else:
- arrow_dictionary = Array.from_numpy(dictionary,
- memory_pool=memory_pool)
-
- if not isinstance(arrow_indices, IntegerArray):
- raise ValueError('Indices must be integer type')
-
- c_type.reset(new CDictionaryType(arrow_indices.type.sp_type,
- arrow_dictionary.sp_array))
- c_result.reset(new CDictionaryArray(c_type, arrow_indices.sp_array))
-
- result = DictionaryArray()
- result.init(c_result)
- return result
-
-
-cdef dict _array_classes = {
- Type_NA: NullArray,
- Type_BOOL: BooleanArray,
- Type_UINT8: UInt8Array,
- Type_UINT16: UInt16Array,
- Type_UINT32: UInt32Array,
- Type_UINT64: UInt64Array,
- Type_INT8: Int8Array,
- Type_INT16: Int16Array,
- Type_INT32: Int32Array,
- Type_INT64: Int64Array,
- Type_DATE32: Date32Array,
- Type_DATE64: Date64Array,
- Type_TIMESTAMP: TimestampArray,
- Type_TIME32: Time32Array,
- Type_TIME64: Time64Array,
- Type_FLOAT: FloatArray,
- Type_DOUBLE: DoubleArray,
- Type_LIST: ListArray,
- Type_BINARY: BinaryArray,
- Type_STRING: StringArray,
- Type_DICTIONARY: DictionaryArray,
- Type_FIXED_SIZE_BINARY: FixedSizeBinaryArray,
- Type_DECIMAL: DecimalArray,
-}
-
-cdef object box_array(const shared_ptr[CArray]& sp_array):
- if sp_array.get() == NULL:
- raise ValueError('Array was NULL')
-
- cdef CDataType* data_type = sp_array.get().type().get()
-
- if data_type == NULL:
- raise ValueError('Array data type was NULL')
-
- cdef Array arr = _array_classes[data_type.id()]()
- arr.init(sp_array)
- return arr
-
-
-cdef object box_tensor(const shared_ptr[CTensor]& sp_tensor):
- if sp_tensor.get() == NULL:
- raise ValueError('Tensor was NULL')
-
- cdef Tensor tensor = Tensor()
- tensor.init(sp_tensor)
- return tensor
-
-
-cdef object get_series_values(object obj):
- if isinstance(obj, PandasSeries):
- result = obj.values
- elif isinstance(obj, np.ndarray):
- result = obj
- else:
- result = PandasSeries(obj).values
-
- return result
-
-
-from_pylist = Array.from_list
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/config.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/config.pyx b/python/pyarrow/config.pyx
deleted file mode 100644
index 536f278..0000000
--- a/python/pyarrow/config.pyx
+++ /dev/null
@@ -1,54 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License. See accompanying LICENSE file.
-
-# cython: profile=False
-# distutils: language = c++
-# cython: embedsignature = True
-
-cdef extern from 'arrow/python/do_import_numpy.h':
- pass
-
-cdef extern from 'arrow/python/numpy_interop.h' namespace 'arrow::py':
- int import_numpy()
-
-cdef extern from 'arrow/python/config.h' namespace 'arrow::py':
- void Init()
- void set_numpy_nan(object o)
-
-import_numpy()
-Init()
-
-import numpy as np
-set_numpy_nan(np.nan)
-
-import multiprocessing
-import os
-cdef int CPU_COUNT = int(
- os.environ.get('OMP_NUM_THREADS',
- max(multiprocessing.cpu_count() // 2, 1)))
-
-def cpu_count():
- """
- Returns
- -------
- count : Number of CPUs to use by default in parallel operations. Default is
- max(1, multiprocessing.cpu_count() / 2), but can be overridden by the
- OMP_NUM_THREADS environment variable. For the default, we divide the CPU
- count by 2 because most modern computers have hyperthreading turned on,
- so doubling the CPU count beyond the number of physical cores does not
- help.
- """
- return CPU_COUNT
-
-def set_cpu_count(count):
- global CPU_COUNT
- CPU_COUNT = max(int(count), 1)
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/error.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/error.pxd b/python/pyarrow/error.pxd
deleted file mode 100644
index 4fb46c2..0000000
--- a/python/pyarrow/error.pxd
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from pyarrow.includes.libarrow cimport CStatus
-
-cdef int check_status(const CStatus& status) nogil except -1
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/error.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/error.pyx b/python/pyarrow/error.pyx
deleted file mode 100644
index 259aeb0..0000000
--- a/python/pyarrow/error.pyx
+++ /dev/null
@@ -1,70 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from pyarrow.includes.libarrow cimport CStatus
-from pyarrow.includes.common cimport c_string
-from pyarrow.compat import frombytes
-
-
-class ArrowException(Exception):
- pass
-
-
-class ArrowInvalid(ValueError, ArrowException):
- pass
-
-
-class ArrowMemoryError(MemoryError, ArrowException):
- pass
-
-
-class ArrowIOError(IOError, ArrowException):
- pass
-
-
-class ArrowKeyError(KeyError, ArrowException):
- pass
-
-
-class ArrowTypeError(TypeError, ArrowException):
- pass
-
-
-class ArrowNotImplementedError(NotImplementedError, ArrowException):
- pass
-
-
-cdef int check_status(const CStatus& status) nogil except -1:
- if status.ok():
- return 0
-
- with gil:
- message = frombytes(status.ToString())
- if status.IsInvalid():
- raise ArrowInvalid(message)
- elif status.IsIOError():
- raise ArrowIOError(message)
- elif status.IsOutOfMemory():
- raise ArrowMemoryError(message)
- elif status.IsKeyError():
- raise ArrowKeyError(message)
- elif status.IsNotImplemented():
- raise ArrowNotImplementedError(message)
- elif status.IsTypeError():
- raise ArrowTypeError(message)
- else:
- raise ArrowException(message)
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/feather.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/feather.py b/python/pyarrow/feather.py
index 3b5716e..c7b118e 100644
--- a/python/pyarrow/feather.py
+++ b/python/pyarrow/feather.py
@@ -22,9 +22,9 @@ import six
import pandas as pd
from pyarrow.compat import pdapi
-from pyarrow.io import FeatherError # noqa
-from pyarrow.table import Table
-import pyarrow.io as ext
+from pyarrow._io import FeatherError # noqa
+from pyarrow._table import Table
+import pyarrow._io as ext
if LooseVersion(pd.__version__) < '0.17.0':
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/filesystem.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/filesystem.py b/python/pyarrow/filesystem.py
index 269cf1c..92dd91c 100644
--- a/python/pyarrow/filesystem.py
+++ b/python/pyarrow/filesystem.py
@@ -19,7 +19,7 @@ from os.path import join as pjoin
import os
from pyarrow.util import implements
-import pyarrow.io as io
+import pyarrow._io as io
class Filesystem(object):
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/formatting.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/formatting.py b/python/pyarrow/formatting.py
index 5fe0611..c358344 100644
--- a/python/pyarrow/formatting.py
+++ b/python/pyarrow/formatting.py
@@ -17,7 +17,7 @@
# Pretty-printing and other formatting utilities for Arrow data structures
-import pyarrow.scalar as scalar
+import pyarrow._array as _array
def array_format(arr, window=None):
@@ -42,7 +42,7 @@ def array_format(arr, window=None):
def value_format(x, indent_level=0):
- if isinstance(x, scalar.ListValue):
+ if isinstance(x, _array.ListValue):
contents = ',\n'.join(value_format(item) for item in x)
return '[{0}]'.format(_indent(contents, 1).strip())
else:
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index ae2b45f..2444f3f 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -113,8 +113,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
shared_ptr[CDataType] index_type()
shared_ptr[CArray] dictionary()
- shared_ptr[CDataType] timestamp(TimeUnit unit)
- shared_ptr[CDataType] timestamp(TimeUnit unit, const c_string& timezone)
+ shared_ptr[CDataType] ctimestamp" arrow::timestamp"(TimeUnit unit)
+ shared_ptr[CDataType] ctimestamp" arrow::timestamp"(
+ TimeUnit unit, const c_string& timezone)
cdef cppclass CMemoryPool" arrow::MemoryPool":
int64_t bytes_allocated()
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/io.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pxd b/python/pyarrow/io.pxd
deleted file mode 100644
index 0c37a09..0000000
--- a/python/pyarrow/io.pxd
+++ /dev/null
@@ -1,50 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# distutils: language = c++
-
-from pyarrow.includes.common cimport *
-from pyarrow.includes.libarrow cimport *
-
-
-cdef class Buffer:
- cdef:
- shared_ptr[CBuffer] buffer
- Py_ssize_t shape[1]
- Py_ssize_t strides[1]
-
- cdef init(self, const shared_ptr[CBuffer]& buffer)
-
-
-cdef class NativeFile:
- cdef:
- shared_ptr[RandomAccessFile] rd_file
- shared_ptr[OutputStream] wr_file
- bint is_readable
- bint is_writeable
- bint is_open
- bint own_file
-
- # By implementing these "virtual" functions (all functions in Cython
- # extension classes are technically virtual in the C++ sense) we can expose
- # the arrow::io abstract file interfaces to other components throughout the
- # suite of Arrow C++ libraries
- cdef read_handle(self, shared_ptr[RandomAccessFile]* file)
- cdef write_handle(self, shared_ptr[OutputStream]* file)
-
-cdef get_reader(object source, shared_ptr[RandomAccessFile]* reader)
-cdef get_writer(object source, shared_ptr[OutputStream]* writer)
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
deleted file mode 100644
index 4eb0816..0000000
--- a/python/pyarrow/io.pyx
+++ /dev/null
@@ -1,1276 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Cython wrappers for IO interfaces defined in arrow::io and messaging in
-# arrow::ipc
-
-# cython: profile=False
-# distutils: language = c++
-# cython: embedsignature = True
-
-from cython.operator cimport dereference as deref
-
-from libc.stdlib cimport malloc, free
-
-from pyarrow.includes.libarrow cimport *
-cimport pyarrow.includes.pyarrow as pyarrow
-
-from pyarrow.compat import frombytes, tobytes, encode_file_path
-from pyarrow.array cimport Array, Tensor, box_tensor
-from pyarrow.error cimport check_status
-from pyarrow.memory cimport MemoryPool, maybe_unbox_memory_pool
-from pyarrow.schema cimport Schema
-from pyarrow.table cimport (Column, RecordBatch, batch_from_cbatch,
- table_from_ctable)
-
-cimport cpython as cp
-
-import re
-import six
-import sys
-import threading
-import time
-
-
-# 64K
-DEFAULT_BUFFER_SIZE = 2 ** 16
-
-
-# To let us get a PyObject* and avoid Cython auto-ref-counting
-cdef extern from "Python.h":
- PyObject* PyBytes_FromStringAndSizeNative" PyBytes_FromStringAndSize"(
- char *v, Py_ssize_t len) except NULL
-
-cdef class NativeFile:
-
- def __cinit__(self):
- self.is_open = False
- self.own_file = False
-
- def __dealloc__(self):
- if self.is_open and self.own_file:
- self.close()
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_value, tb):
- self.close()
-
- def close(self):
- if self.is_open:
- with nogil:
- if self.is_readable:
- check_status(self.rd_file.get().Close())
- else:
- check_status(self.wr_file.get().Close())
- self.is_open = False
-
- cdef read_handle(self, shared_ptr[RandomAccessFile]* file):
- self._assert_readable()
- file[0] = <shared_ptr[RandomAccessFile]> self.rd_file
-
- cdef write_handle(self, shared_ptr[OutputStream]* file):
- self._assert_writeable()
- file[0] = <shared_ptr[OutputStream]> self.wr_file
-
- def _assert_readable(self):
- if not self.is_readable:
- raise IOError("only valid on readonly files")
-
- if not self.is_open:
- raise IOError("file not open")
-
- def _assert_writeable(self):
- if not self.is_writeable:
- raise IOError("only valid on writeable files")
-
- if not self.is_open:
- raise IOError("file not open")
-
- def size(self):
- cdef int64_t size
- self._assert_readable()
- with nogil:
- check_status(self.rd_file.get().GetSize(&size))
- return size
-
- def tell(self):
- cdef int64_t position
- with nogil:
- if self.is_readable:
- check_status(self.rd_file.get().Tell(&position))
- else:
- check_status(self.wr_file.get().Tell(&position))
- return position
-
- def seek(self, int64_t position):
- self._assert_readable()
- with nogil:
- check_status(self.rd_file.get().Seek(position))
-
- def write(self, data):
- """
- Write byte from any object implementing buffer protocol (bytes,
- bytearray, ndarray, pyarrow.Buffer)
- """
- self._assert_writeable()
-
- if isinstance(data, six.string_types):
- data = tobytes(data)
-
- cdef Buffer arrow_buffer = frombuffer(data)
-
- cdef const uint8_t* buf = arrow_buffer.buffer.get().data()
- cdef int64_t bufsize = len(arrow_buffer)
- with nogil:
- check_status(self.wr_file.get().Write(buf, bufsize))
-
- def read(self, nbytes=None):
- cdef:
- int64_t c_nbytes
- int64_t bytes_read = 0
- PyObject* obj
-
- if nbytes is None:
- c_nbytes = self.size() - self.tell()
- else:
- c_nbytes = nbytes
-
- self._assert_readable()
-
- # Allocate empty write space
- obj = PyBytes_FromStringAndSizeNative(NULL, c_nbytes)
-
- cdef uint8_t* buf = <uint8_t*> cp.PyBytes_AS_STRING(<object> obj)
- with nogil:
- check_status(self.rd_file.get().Read(c_nbytes, &bytes_read, buf))
-
- if bytes_read < c_nbytes:
- cp._PyBytes_Resize(&obj, <Py_ssize_t> bytes_read)
-
- return PyObject_to_object(obj)
-
- def read_buffer(self, nbytes=None):
- cdef:
- int64_t c_nbytes
- int64_t bytes_read = 0
- shared_ptr[CBuffer] output
- self._assert_readable()
-
- if nbytes is None:
- c_nbytes = self.size() - self.tell()
- else:
- c_nbytes = nbytes
-
- with nogil:
- check_status(self.rd_file.get().ReadB(c_nbytes, &output))
-
- return wrap_buffer(output)
-
- def download(self, stream_or_path, buffer_size=None):
- """
- Read file completely to local path (rather than reading completely into
- memory). First seeks to the beginning of the file.
- """
- cdef:
- int64_t bytes_read = 0
- uint8_t* buf
- self._assert_readable()
-
- buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
-
- write_queue = Queue(50)
-
- if not hasattr(stream_or_path, 'read'):
- stream = open(stream_or_path, 'wb')
- cleanup = lambda: stream.close()
- else:
- stream = stream_or_path
- cleanup = lambda: None
-
- done = False
- exc_info = None
- def bg_write():
- try:
- while not done or write_queue.qsize() > 0:
- try:
- buf = write_queue.get(timeout=0.01)
- except QueueEmpty:
- continue
- stream.write(buf)
- except Exception as e:
- exc_info = sys.exc_info()
- finally:
- cleanup()
-
- self.seek(0)
-
- writer_thread = threading.Thread(target=bg_write)
-
- # This isn't ideal -- PyBytes_FromStringAndSize copies the data from
- # the passed buffer, so it's hard for us to avoid doubling the memory
- buf = <uint8_t*> malloc(buffer_size)
- if buf == NULL:
- raise MemoryError("Failed to allocate {0} bytes"
- .format(buffer_size))
-
- writer_thread.start()
-
- cdef int64_t total_bytes = 0
- cdef int32_t c_buffer_size = buffer_size
-
- try:
- while True:
- with nogil:
- check_status(self.rd_file.get()
- .Read(c_buffer_size, &bytes_read, buf))
-
- total_bytes += bytes_read
-
- # EOF
- if bytes_read == 0:
- break
-
- pybuf = cp.PyBytes_FromStringAndSize(<const char*>buf,
- bytes_read)
-
- write_queue.put_nowait(pybuf)
- finally:
- free(buf)
- done = True
-
- writer_thread.join()
- if exc_info is not None:
- raise exc_info[0], exc_info[1], exc_info[2]
-
- def upload(self, stream, buffer_size=None):
- """
- Pipe file-like object to file
- """
- write_queue = Queue(50)
- self._assert_writeable()
-
- buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
-
- done = False
- exc_info = None
- def bg_write():
- try:
- while not done or write_queue.qsize() > 0:
- try:
- buf = write_queue.get(timeout=0.01)
- except QueueEmpty:
- continue
-
- self.write(buf)
-
- except Exception as e:
- exc_info = sys.exc_info()
-
- writer_thread = threading.Thread(target=bg_write)
- writer_thread.start()
-
- try:
- while True:
- buf = stream.read(buffer_size)
- if not buf:
- break
-
- if writer_thread.is_alive():
- while write_queue.full():
- time.sleep(0.01)
- else:
- break
-
- write_queue.put_nowait(buf)
- finally:
- done = True
-
- writer_thread.join()
- if exc_info is not None:
- raise exc_info[0], exc_info[1], exc_info[2]
-
-
-# ----------------------------------------------------------------------
-# Python file-like objects
-
-
-cdef class PythonFileInterface(NativeFile):
- cdef:
- object handle
-
- def __cinit__(self, handle, mode='w'):
- self.handle = handle
-
- if mode.startswith('w'):
- self.wr_file.reset(new pyarrow.PyOutputStream(handle))
- self.is_readable = 0
- self.is_writeable = 1
- elif mode.startswith('r'):
- self.rd_file.reset(new pyarrow.PyReadableFile(handle))
- self.is_readable = 1
- self.is_writeable = 0
- else:
- raise ValueError('Invalid file mode: {0}'.format(mode))
-
- self.is_open = True
-
-
-cdef class MemoryMappedFile(NativeFile):
- """
- Supports 'r', 'r+w', 'w' modes
- """
- cdef:
- object path
-
- def __cinit__(self):
- self.is_open = False
- self.is_readable = 0
- self.is_writeable = 0
-
- @staticmethod
- def create(path, size):
- cdef:
- shared_ptr[CMemoryMappedFile] handle
- c_string c_path = encode_file_path(path)
- int64_t c_size = size
-
- with nogil:
- check_status(CMemoryMappedFile.Create(c_path, c_size, &handle))
-
- cdef MemoryMappedFile result = MemoryMappedFile()
- result.path = path
- result.is_readable = 1
- result.is_writeable = 1
- result.wr_file = <shared_ptr[OutputStream]> handle
- result.rd_file = <shared_ptr[RandomAccessFile]> handle
- result.is_open = True
-
- return result
-
- def open(self, path, mode='r'):
- self.path = path
-
- cdef:
- FileMode c_mode
- shared_ptr[CMemoryMappedFile] handle
- c_string c_path = encode_file_path(path)
-
- if mode in ('r', 'rb'):
- c_mode = FileMode_READ
- self.is_readable = 1
- elif mode in ('w', 'wb'):
- c_mode = FileMode_WRITE
- self.is_writeable = 1
- elif mode == 'r+w':
- c_mode = FileMode_READWRITE
- self.is_readable = 1
- self.is_writeable = 1
- else:
- raise ValueError('Invalid file mode: {0}'.format(mode))
-
- check_status(CMemoryMappedFile.Open(c_path, c_mode, &handle))
-
- self.wr_file = <shared_ptr[OutputStream]> handle
- self.rd_file = <shared_ptr[RandomAccessFile]> handle
- self.is_open = True
-
-
-def memory_map(path, mode='r'):
- """
- Open memory map at file path. Size of the memory map cannot change
-
- Parameters
- ----------
- path : string
- mode : {'r', 'w'}, default 'r'
-
- Returns
- -------
- mmap : MemoryMappedFile
- """
- cdef MemoryMappedFile mmap = MemoryMappedFile()
- mmap.open(path, mode)
- return mmap
-
-
-def create_memory_map(path, size):
- """
- Create memory map at indicated path of the given size, return open
- writeable file object
-
- Parameters
- ----------
- path : string
- size : int
-
- Returns
- -------
- mmap : MemoryMappedFile
- """
- return MemoryMappedFile.create(path, size)
-
-
-cdef class OSFile(NativeFile):
- """
- Supports 'r', 'w' modes
- """
- cdef:
- object path
-
- def __cinit__(self, path, mode='r', MemoryPool memory_pool=None):
- self.path = path
-
- cdef:
- FileMode c_mode
- shared_ptr[Readable] handle
- c_string c_path = encode_file_path(path)
-
- self.is_readable = self.is_writeable = 0
-
- if mode in ('r', 'rb'):
- self._open_readable(c_path, maybe_unbox_memory_pool(memory_pool))
- elif mode in ('w', 'wb'):
- self._open_writeable(c_path)
- else:
- raise ValueError('Invalid file mode: {0}'.format(mode))
-
- self.is_open = True
-
- cdef _open_readable(self, c_string path, CMemoryPool* pool):
- cdef shared_ptr[ReadableFile] handle
-
- with nogil:
- check_status(ReadableFile.Open(path, pool, &handle))
-
- self.is_readable = 1
- self.rd_file = <shared_ptr[RandomAccessFile]> handle
-
- cdef _open_writeable(self, c_string path):
- cdef shared_ptr[FileOutputStream] handle
-
- with nogil:
- check_status(FileOutputStream.Open(path, &handle))
- self.is_writeable = 1
- self.wr_file = <shared_ptr[OutputStream]> handle
-
-
-# ----------------------------------------------------------------------
-# Arrow buffers
-
-
-cdef class Buffer:
-
- def __cinit__(self):
- pass
-
- cdef init(self, const shared_ptr[CBuffer]& buffer):
- self.buffer = buffer
- self.shape[0] = self.size
- self.strides[0] = <Py_ssize_t>(1)
-
- def __len__(self):
- return self.size
-
- property size:
-
- def __get__(self):
- return self.buffer.get().size()
-
- property parent:
-
- def __get__(self):
- cdef shared_ptr[CBuffer] parent_buf = self.buffer.get().parent()
-
- if parent_buf.get() == NULL:
- return None
- else:
- return wrap_buffer(parent_buf)
-
- def __getitem__(self, key):
- # TODO(wesm): buffer slicing
- raise NotImplementedError
-
- def to_pybytes(self):
- return cp.PyBytes_FromStringAndSize(
- <const char*>self.buffer.get().data(),
- self.buffer.get().size())
-
- def __getbuffer__(self, cp.Py_buffer* buffer, int flags):
-
- buffer.buf = <char *>self.buffer.get().data()
- buffer.format = 'b'
- buffer.internal = NULL
- buffer.itemsize = 1
- buffer.len = self.size
- buffer.ndim = 1
- buffer.obj = self
- buffer.readonly = 1
- buffer.shape = self.shape
- buffer.strides = self.strides
- buffer.suboffsets = NULL
-
-cdef shared_ptr[PoolBuffer] allocate_buffer(CMemoryPool* pool):
- cdef shared_ptr[PoolBuffer] result
- result.reset(new PoolBuffer(pool))
- return result
-
-
-cdef class InMemoryOutputStream(NativeFile):
-
- cdef:
- shared_ptr[PoolBuffer] buffer
-
- def __cinit__(self, MemoryPool memory_pool=None):
- self.buffer = allocate_buffer(maybe_unbox_memory_pool(memory_pool))
- self.wr_file.reset(new BufferOutputStream(
- <shared_ptr[ResizableBuffer]> self.buffer))
- self.is_readable = 0
- self.is_writeable = 1
- self.is_open = True
-
- def get_result(self):
- check_status(self.wr_file.get().Close())
- self.is_open = False
- return wrap_buffer(<shared_ptr[CBuffer]> self.buffer)
-
-
-cdef class BufferReader(NativeFile):
- """
- Zero-copy reader from objects convertible to Arrow buffer
-
- Parameters
- ----------
- obj : Python bytes or pyarrow.io.Buffer
- """
- cdef:
- Buffer buffer
-
- def __cinit__(self, object obj):
-
- if isinstance(obj, Buffer):
- self.buffer = obj
- else:
- self.buffer = frombuffer(obj)
-
- self.rd_file.reset(new CBufferReader(self.buffer.buffer))
- self.is_readable = 1
- self.is_writeable = 0
- self.is_open = True
-
-
-def frombuffer(object obj):
- """
- Construct an Arrow buffer from a Python bytes object
- """
- cdef shared_ptr[CBuffer] buf
- try:
- memoryview(obj)
- buf.reset(new pyarrow.PyBuffer(obj))
- return wrap_buffer(buf)
- except TypeError:
- raise ValueError('Must pass object that implements buffer protocol')
-
-
-
-cdef Buffer wrap_buffer(const shared_ptr[CBuffer]& buf):
- cdef Buffer result = Buffer()
- result.init(buf)
- return result
-
-
-cdef get_reader(object source, shared_ptr[RandomAccessFile]* reader):
- cdef NativeFile nf
-
- if isinstance(source, six.string_types):
- source = memory_map(source, mode='r')
- elif isinstance(source, Buffer):
- source = BufferReader(source)
- elif not isinstance(source, NativeFile) and hasattr(source, 'read'):
- # Optimistically hope this is file-like
- source = PythonFileInterface(source, mode='r')
-
- if isinstance(source, NativeFile):
- nf = source
-
- # TODO: what about read-write sources (e.g. memory maps)
- if not nf.is_readable:
- raise IOError('Native file is not readable')
-
- nf.read_handle(reader)
- else:
- raise TypeError('Unable to read from object of type: {0}'
- .format(type(source)))
-
-
-cdef get_writer(object source, shared_ptr[OutputStream]* writer):
- cdef NativeFile nf
-
- if isinstance(source, six.string_types):
- source = OSFile(source, mode='w')
- elif not isinstance(source, NativeFile) and hasattr(source, 'write'):
- # Optimistically hope this is file-like
- source = PythonFileInterface(source, mode='w')
-
- if isinstance(source, NativeFile):
- nf = source
-
- if nf.is_readable:
- raise IOError('Native file is not writeable')
-
- nf.write_handle(writer)
- else:
- raise TypeError('Unable to read from object of type: {0}'
- .format(type(source)))
-
-# ----------------------------------------------------------------------
-# HDFS IO implementation
-
-_HDFS_PATH_RE = re.compile('hdfs://(.*):(\d+)(.*)')
-
-try:
- # Python 3
- from queue import Queue, Empty as QueueEmpty, Full as QueueFull
-except ImportError:
- from Queue import Queue, Empty as QueueEmpty, Full as QueueFull
-
-
-def have_libhdfs():
- try:
- check_status(HaveLibHdfs())
- return True
- except:
- return False
-
-
-def have_libhdfs3():
- try:
- check_status(HaveLibHdfs3())
- return True
- except:
- return False
-
-
-def strip_hdfs_abspath(path):
- m = _HDFS_PATH_RE.match(path)
- if m:
- return m.group(3)
- else:
- return path
-
-
-cdef class _HdfsClient:
- cdef:
- shared_ptr[CHdfsClient] client
-
- cdef readonly:
- bint is_open
-
- def __cinit__(self):
- pass
-
- def _connect(self, host, port, user, kerb_ticket, driver):
- cdef HdfsConnectionConfig conf
-
- if host is not None:
- conf.host = tobytes(host)
- conf.port = port
- if user is not None:
- conf.user = tobytes(user)
- if kerb_ticket is not None:
- conf.kerb_ticket = tobytes(kerb_ticket)
-
- if driver == 'libhdfs':
- check_status(HaveLibHdfs())
- conf.driver = HdfsDriver_LIBHDFS
- else:
- check_status(HaveLibHdfs3())
- conf.driver = HdfsDriver_LIBHDFS3
-
- with nogil:
- check_status(CHdfsClient.Connect(&conf, &self.client))
- self.is_open = True
-
- @classmethod
- def connect(cls, *args, **kwargs):
- return cls(*args, **kwargs)
-
- def __dealloc__(self):
- if self.is_open:
- self.close()
-
- def close(self):
- """
- Disconnect from the HDFS cluster
- """
- self._ensure_client()
- with nogil:
- check_status(self.client.get().Disconnect())
- self.is_open = False
-
- cdef _ensure_client(self):
- if self.client.get() == NULL:
- raise IOError('HDFS client improperly initialized')
- elif not self.is_open:
- raise IOError('HDFS client is closed')
-
- def exists(self, path):
- """
- Returns True if the path is known to the cluster, False if it does not
- (or there is an RPC error)
- """
- self._ensure_client()
-
- cdef c_string c_path = tobytes(path)
- cdef c_bool result
- with nogil:
- result = self.client.get().Exists(c_path)
- return result
-
- def isdir(self, path):
- cdef HdfsPathInfo info
- self._path_info(path, &info)
- return info.kind == ObjectType_DIRECTORY
-
- def isfile(self, path):
- cdef HdfsPathInfo info
- self._path_info(path, &info)
- return info.kind == ObjectType_FILE
-
- cdef _path_info(self, path, HdfsPathInfo* info):
- cdef c_string c_path = tobytes(path)
-
- with nogil:
- check_status(self.client.get()
- .GetPathInfo(c_path, info))
-
-
- def ls(self, path, bint full_info):
- cdef:
- c_string c_path = tobytes(path)
- vector[HdfsPathInfo] listing
- list results = []
- int i
-
- self._ensure_client()
-
- with nogil:
- check_status(self.client.get()
- .ListDirectory(c_path, &listing))
-
- cdef const HdfsPathInfo* info
- for i in range(<int> listing.size()):
- info = &listing[i]
-
- # Try to trim off the hdfs://HOST:PORT piece
- name = strip_hdfs_abspath(frombytes(info.name))
-
- if full_info:
- kind = ('file' if info.kind == ObjectType_FILE
- else 'directory')
-
- results.append({
- 'kind': kind,
- 'name': name,
- 'owner': frombytes(info.owner),
- 'group': frombytes(info.group),
- 'list_modified_time': info.last_modified_time,
- 'list_access_time': info.last_access_time,
- 'size': info.size,
- 'replication': info.replication,
- 'block_size': info.block_size,
- 'permissions': info.permissions
- })
- else:
- results.append(name)
-
- return results
-
- def mkdir(self, path):
- """
- Create indicated directory and any necessary parent directories
- """
- self._ensure_client()
-
- cdef c_string c_path = tobytes(path)
- with nogil:
- check_status(self.client.get()
- .CreateDirectory(c_path))
-
- def delete(self, path, bint recursive=False):
- """
- Delete the indicated file or directory
-
- Parameters
- ----------
- path : string
- recursive : boolean, default False
- If True, also delete child paths for directories
- """
- self._ensure_client()
-
- cdef c_string c_path = tobytes(path)
- with nogil:
- check_status(self.client.get()
- .Delete(c_path, recursive))
-
- def open(self, path, mode='rb', buffer_size=None, replication=None,
- default_block_size=None):
- """
- Parameters
- ----------
- mode : string, 'rb', 'wb', 'ab'
- """
- self._ensure_client()
-
- cdef HdfsFile out = HdfsFile()
-
- if mode not in ('rb', 'wb', 'ab'):
- raise Exception("Mode must be 'rb' (read), "
- "'wb' (write, new file), or 'ab' (append)")
-
- cdef c_string c_path = tobytes(path)
- cdef c_bool append = False
-
- # 0 in libhdfs means "use the default"
- cdef int32_t c_buffer_size = buffer_size or 0
- cdef int16_t c_replication = replication or 0
- cdef int64_t c_default_block_size = default_block_size or 0
-
- cdef shared_ptr[HdfsOutputStream] wr_handle
- cdef shared_ptr[HdfsReadableFile] rd_handle
-
- if mode in ('wb', 'ab'):
- if mode == 'ab':
- append = True
-
- with nogil:
- check_status(
- self.client.get()
- .OpenWriteable(c_path, append, c_buffer_size,
- c_replication, c_default_block_size,
- &wr_handle))
-
- out.wr_file = <shared_ptr[OutputStream]> wr_handle
-
- out.is_readable = False
- out.is_writeable = 1
- else:
- with nogil:
- check_status(self.client.get()
- .OpenReadable(c_path, &rd_handle))
-
- out.rd_file = <shared_ptr[RandomAccessFile]> rd_handle
- out.is_readable = True
- out.is_writeable = 0
-
- if c_buffer_size == 0:
- c_buffer_size = 2 ** 16
-
- out.mode = mode
- out.buffer_size = c_buffer_size
- out.parent = _HdfsFileNanny(self, out)
- out.is_open = True
- out.own_file = True
-
- return out
-
- def download(self, path, stream, buffer_size=None):
- with self.open(path, 'rb') as f:
- f.download(stream, buffer_size=buffer_size)
-
- def upload(self, path, stream, buffer_size=None):
- """
- Upload file-like object to HDFS path
- """
- with self.open(path, 'wb') as f:
- f.upload(stream, buffer_size=buffer_size)
-
-
-# ARROW-404: Helper class to ensure that files are closed before the
-# client. During deallocation of the extension class, the attributes are
-# decref'd which can cause the client to get closed first if the file has the
-# last remaining reference
-cdef class _HdfsFileNanny:
- cdef:
- object client
- object file_handle_ref
-
- def __cinit__(self, client, file_handle):
- import weakref
- self.client = client
- self.file_handle_ref = weakref.ref(file_handle)
-
- def __dealloc__(self):
- fh = self.file_handle_ref()
- if fh:
- fh.close()
- # avoid cyclic GC
- self.file_handle_ref = None
- self.client = None
-
-
-cdef class HdfsFile(NativeFile):
- cdef readonly:
- int32_t buffer_size
- object mode
- object parent
-
- cdef object __weakref__
-
- def __dealloc__(self):
- self.parent = None
-
-# ----------------------------------------------------------------------
-# File and stream readers and writers
-
-cdef class _StreamWriter:
- cdef:
- shared_ptr[CStreamWriter] writer
- shared_ptr[OutputStream] sink
- bint closed
-
- def __cinit__(self):
- self.closed = True
-
- def __dealloc__(self):
- if not self.closed:
- self.close()
-
- def _open(self, sink, Schema schema):
- get_writer(sink, &self.sink)
-
- with nogil:
- check_status(CStreamWriter.Open(self.sink.get(), schema.sp_schema,
- &self.writer))
-
- self.closed = False
-
- def write_batch(self, RecordBatch batch):
- with nogil:
- check_status(self.writer.get()
- .WriteRecordBatch(deref(batch.batch)))
-
- def close(self):
- with nogil:
- check_status(self.writer.get().Close())
- self.closed = True
-
-
-cdef class _StreamReader:
- cdef:
- shared_ptr[CStreamReader] reader
-
- cdef readonly:
- Schema schema
-
- def __cinit__(self):
- pass
-
- def _open(self, source):
- cdef:
- shared_ptr[RandomAccessFile] reader
- shared_ptr[InputStream] in_stream
-
- get_reader(source, &reader)
- in_stream = <shared_ptr[InputStream]> reader
-
- with nogil:
- check_status(CStreamReader.Open(in_stream, &self.reader))
-
- self.schema = Schema()
- self.schema.init_schema(self.reader.get().schema())
-
- def get_next_batch(self):
- """
- Read next RecordBatch from the stream. Raises StopIteration at end of
- stream
- """
- cdef shared_ptr[CRecordBatch] batch
-
- with nogil:
- check_status(self.reader.get().GetNextRecordBatch(&batch))
-
- if batch.get() == NULL:
- raise StopIteration
-
- return batch_from_cbatch(batch)
-
- def read_all(self):
- """
- Read all record batches as a pyarrow.Table
- """
- cdef:
- vector[shared_ptr[CRecordBatch]] batches
- shared_ptr[CRecordBatch] batch
- shared_ptr[CTable] table
-
- with nogil:
- while True:
- check_status(self.reader.get().GetNextRecordBatch(&batch))
- if batch.get() == NULL:
- break
- batches.push_back(batch)
-
- check_status(CTable.FromRecordBatches(batches, &table))
-
- return table_from_ctable(table)
-
-
-cdef class _FileWriter(_StreamWriter):
-
- def _open(self, sink, Schema schema):
- cdef shared_ptr[CFileWriter] writer
- get_writer(sink, &self.sink)
-
- with nogil:
- check_status(CFileWriter.Open(self.sink.get(), schema.sp_schema,
- &writer))
-
- # Cast to base class, because has same interface
- self.writer = <shared_ptr[CStreamWriter]> writer
- self.closed = False
-
-
-cdef class _FileReader:
- cdef:
- shared_ptr[CFileReader] reader
-
- def __cinit__(self):
- pass
-
- def _open(self, source, footer_offset=None):
- cdef shared_ptr[RandomAccessFile] reader
- get_reader(source, &reader)
-
- cdef int64_t offset = 0
- if footer_offset is not None:
- offset = footer_offset
-
- with nogil:
- if offset != 0:
- check_status(CFileReader.Open2(reader, offset, &self.reader))
- else:
- check_status(CFileReader.Open(reader, &self.reader))
-
- property num_record_batches:
-
- def __get__(self):
- return self.reader.get().num_record_batches()
-
- def get_batch(self, int i):
- cdef shared_ptr[CRecordBatch] batch
-
- if i < 0 or i >= self.num_record_batches:
- raise ValueError('Batch number {0} out of range'.format(i))
-
- with nogil:
- check_status(self.reader.get().GetRecordBatch(i, &batch))
-
- return batch_from_cbatch(batch)
-
- # TODO(wesm): ARROW-503: Function was renamed. Remove after a period of
- # time has passed
- get_record_batch = get_batch
-
- def read_all(self):
- """
- Read all record batches as a pyarrow.Table
- """
- cdef:
- vector[shared_ptr[CRecordBatch]] batches
- shared_ptr[CTable] table
- int i, nbatches
-
- nbatches = self.num_record_batches
-
- batches.resize(nbatches)
- with nogil:
- for i in range(nbatches):
- check_status(self.reader.get().GetRecordBatch(i, &batches[i]))
- check_status(CTable.FromRecordBatches(batches, &table))
-
- return table_from_ctable(table)
-
-
-#----------------------------------------------------------------------
-# Implement legacy Feather file format
-
-
-class FeatherError(Exception):
- pass
-
-
-cdef class FeatherWriter:
- cdef:
- unique_ptr[CFeatherWriter] writer
-
- cdef public:
- int64_t num_rows
-
- def __cinit__(self):
- self.num_rows = -1
-
- def open(self, object dest):
- cdef shared_ptr[OutputStream] sink
- get_writer(dest, &sink)
-
- with nogil:
- check_status(CFeatherWriter.Open(sink, &self.writer))
-
- def close(self):
- if self.num_rows < 0:
- self.num_rows = 0
- self.writer.get().SetNumRows(self.num_rows)
- check_status(self.writer.get().Finalize())
-
- def write_array(self, object name, object col, object mask=None):
- cdef Array arr
-
- if self.num_rows >= 0:
- if len(col) != self.num_rows:
- raise ValueError('prior column had a different number of rows')
- else:
- self.num_rows = len(col)
-
- if isinstance(col, Array):
- arr = col
- else:
- arr = Array.from_numpy(col, mask=mask)
-
- cdef c_string c_name = tobytes(name)
-
- with nogil:
- check_status(
- self.writer.get().Append(c_name, deref(arr.sp_array)))
-
-
-cdef class FeatherReader:
- cdef:
- unique_ptr[CFeatherReader] reader
-
- def __cinit__(self):
- pass
-
- def open(self, source):
- cdef shared_ptr[RandomAccessFile] reader
- get_reader(source, &reader)
-
- with nogil:
- check_status(CFeatherReader.Open(reader, &self.reader))
-
- property num_rows:
-
- def __get__(self):
- return self.reader.get().num_rows()
-
- property num_columns:
-
- def __get__(self):
- return self.reader.get().num_columns()
-
- def get_column_name(self, int i):
- cdef c_string name = self.reader.get().GetColumnName(i)
- return frombytes(name)
-
- def get_column(self, int i):
- if i < 0 or i >= self.num_columns:
- raise IndexError(i)
-
- cdef shared_ptr[CColumn] sp_column
- with nogil:
- check_status(self.reader.get()
- .GetColumn(i, &sp_column))
-
- cdef Column col = Column()
- col.init(sp_column)
- return col
-
-
-def get_tensor_size(Tensor tensor):
- """
- Return total size of serialized Tensor including metadata and padding
- """
- cdef int64_t size
- with nogil:
- check_status(GetTensorSize(deref(tensor.tp), &size))
- return size
-
-
-def get_record_batch_size(RecordBatch batch):
- """
- Return total size of serialized RecordBatch including metadata and padding
- """
- cdef int64_t size
- with nogil:
- check_status(GetRecordBatchSize(deref(batch.batch), &size))
- return size
-
-
-def write_tensor(Tensor tensor, NativeFile dest):
- """
- Write pyarrow.Tensor to pyarrow.NativeFile object its current position
-
- Parameters
- ----------
- tensor : pyarrow.Tensor
- dest : pyarrow.NativeFile
-
- Returns
- -------
- bytes_written : int
- Total number of bytes written to the file
- """
- cdef:
- int32_t metadata_length
- int64_t body_length
-
- dest._assert_writeable()
-
- with nogil:
- check_status(
- WriteTensor(deref(tensor.tp), dest.wr_file.get(),
- &metadata_length, &body_length))
-
- return metadata_length + body_length
-
-
-def read_tensor(NativeFile source):
- """
- Read pyarrow.Tensor from pyarrow.NativeFile object from current
- position. If the file source supports zero copy (e.g. a memory map), then
- this operation does not allocate any memory
-
- Parameters
- ----------
- source : pyarrow.NativeFile
-
- Returns
- -------
- tensor : Tensor
- """
- cdef:
- shared_ptr[CTensor] sp_tensor
-
- source._assert_writeable()
-
- cdef int64_t offset = source.tell()
- with nogil:
- check_status(ReadTensor(offset, source.rd_file.get(), &sp_tensor))
-
- return box_tensor(sp_tensor)
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/ipc.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py
index 5a56165..f96ead3 100644
--- a/python/pyarrow/ipc.py
+++ b/python/pyarrow/ipc.py
@@ -17,10 +17,10 @@
# Arrow file and stream reader/writer classes, and other messaging tools
-import pyarrow.io as io
+import pyarrow._io as _io
-class StreamReader(io._StreamReader):
+class StreamReader(_io._StreamReader):
"""
Reader for the Arrow streaming binary format
@@ -37,7 +37,7 @@ class StreamReader(io._StreamReader):
yield self.get_next_batch()
-class StreamWriter(io._StreamWriter):
+class StreamWriter(_io._StreamWriter):
"""
Writer for the Arrow streaming binary format
@@ -52,7 +52,7 @@ class StreamWriter(io._StreamWriter):
self._open(sink, schema)
-class FileReader(io._FileReader):
+class FileReader(_io._FileReader):
"""
Class for reading Arrow record batch data from the Arrow binary file format
@@ -68,7 +68,7 @@ class FileReader(io._FileReader):
self._open(source, footer_offset=footer_offset)
-class FileWriter(io._FileWriter):
+class FileWriter(_io._FileWriter):
"""
Writer to create the Arrow binary file format
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/jemalloc.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/jemalloc.pyx b/python/pyarrow/jemalloc.pyx
deleted file mode 100644
index 97583f4..0000000
--- a/python/pyarrow/jemalloc.pyx
+++ /dev/null
@@ -1,28 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# cython: profile=False
-# distutils: language = c++
-# cython: embedsignature = True
-
-from pyarrow.includes.libarrow_jemalloc cimport CJemallocMemoryPool
-from pyarrow.memory cimport MemoryPool
-
-def default_pool():
- cdef MemoryPool pool = MemoryPool()
- pool.init(CJemallocMemoryPool.default_pool())
- return pool
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/memory.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/memory.pxd b/python/pyarrow/memory.pxd
deleted file mode 100644
index bb1af85..0000000
--- a/python/pyarrow/memory.pxd
+++ /dev/null
@@ -1,30 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from pyarrow.includes.libarrow cimport CMemoryPool, CLoggingMemoryPool
-
-
-cdef class MemoryPool:
- cdef:
- CMemoryPool* pool
-
- cdef init(self, CMemoryPool* pool)
-
-cdef class LoggingMemoryPool(MemoryPool):
- pass
-
-cdef CMemoryPool* maybe_unbox_memory_pool(MemoryPool memory_pool)
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/memory.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/memory.pyx b/python/pyarrow/memory.pyx
deleted file mode 100644
index 98dbf66..0000000
--- a/python/pyarrow/memory.pyx
+++ /dev/null
@@ -1,52 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# cython: profile=False
-# distutils: language = c++
-# cython: embedsignature = True
-
-from pyarrow.includes.libarrow cimport CMemoryPool, CLoggingMemoryPool
-from pyarrow.includes.pyarrow cimport set_default_memory_pool, get_memory_pool
-
-cdef class MemoryPool:
- cdef init(self, CMemoryPool* pool):
- self.pool = pool
-
- def bytes_allocated(self):
- return self.pool.bytes_allocated()
-
-cdef CMemoryPool* maybe_unbox_memory_pool(MemoryPool memory_pool):
- if memory_pool is None:
- return get_memory_pool()
- else:
- return memory_pool.pool
-
-cdef class LoggingMemoryPool(MemoryPool):
- pass
-
-def default_pool():
- cdef:
- MemoryPool pool = MemoryPool()
- pool.init(get_memory_pool())
- return pool
-
-def set_default_pool(MemoryPool pool):
- set_default_memory_pool(pool.pool)
-
-def total_allocated_bytes():
- cdef CMemoryPool* pool = get_memory_pool()
- return pool.bytes_allocated()
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index f81b6c2..aaec43a 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -23,8 +23,8 @@ from pyarrow.filesystem import LocalFilesystem
from pyarrow._parquet import (ParquetReader, FileMetaData, # noqa
RowGroupMetaData, Schema, ParquetWriter)
import pyarrow._parquet as _parquet # noqa
-import pyarrow.array as _array
-import pyarrow.table as _table
+import pyarrow._array as _array
+import pyarrow._table as _table
# ----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/scalar.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/scalar.pxd b/python/pyarrow/scalar.pxd
deleted file mode 100644
index 62a5664..0000000
--- a/python/pyarrow/scalar.pxd
+++ /dev/null
@@ -1,72 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from pyarrow.includes.common cimport *
-from pyarrow.includes.libarrow cimport *
-
-from pyarrow.schema cimport DataType
-
-
-cdef class Scalar:
- cdef readonly:
- DataType type
-
-
-cdef class NAType(Scalar):
- pass
-
-
-cdef class ArrayValue(Scalar):
- cdef:
- shared_ptr[CArray] sp_array
- int64_t index
-
- cdef void init(self, DataType type,
- const shared_ptr[CArray]& sp_array, int64_t index)
-
- cdef void _set_array(self, const shared_ptr[CArray]& sp_array)
-
-
-cdef class Int8Value(ArrayValue):
- pass
-
-
-cdef class Int64Value(ArrayValue):
- pass
-
-
-cdef class ListValue(ArrayValue):
- cdef readonly:
- DataType value_type
-
- cdef:
- CListArray* ap
-
- cdef getitem(self, int64_t i)
-
-
-cdef class StringValue(ArrayValue):
- pass
-
-
-cdef class FixedSizeBinaryValue(ArrayValue):
- pass
-
-
-cdef object box_scalar(DataType type,
- const shared_ptr[CArray]& sp_array,
- int64_t index)
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/scalar.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/scalar.pyx b/python/pyarrow/scalar.pyx
deleted file mode 100644
index 2b6746a..0000000
--- a/python/pyarrow/scalar.pyx
+++ /dev/null
@@ -1,315 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from pyarrow.schema cimport DataType, box_data_type
-
-from pyarrow.compat import frombytes
-import pyarrow.schema as schema
-import decimal
-import datetime
-
-cimport cpython as cp
-
-# Start unset so that NAType.__cinit__ can tell no instance exists yet.
-NA = None
-
-
-cdef _pandas():
-    # Import pandas only when called, so that importing this module does
-    # not itself require pandas; an ImportError propagates to the caller.
-    import pandas
-    return pandas
-
-
-cdef class NAType(Scalar):
-    """Type of the module-level NA object."""
-
-    def __cinit__(self):
-        global NA
-        # Construction is only allowed while the module-level NA is unset.
-        if NA is None:
-            self.type = schema.null()
-        else:
-            raise Exception('Cannot create multiple NAType instances')
-
-    def as_py(self):
-        """Return None, the Python counterpart of NA."""
-        return None
-
-    def __repr__(self):
-        return 'NA'
-
-# Create the single NAType instance; NAType.__cinit__ raises if NA is
-# already set when another one is constructed.
-NA = NAType()
-
-cdef class ArrayValue(Scalar):
-    """Scalar that refers to the element at `index` of an array."""
-
-    cdef void init(self, DataType type, const shared_ptr[CArray]& sp_array,
-                   int64_t index):
-        # Store the type and position, then hand the array to _set_array
-        # so that subclasses can cache extra state from it.
-        self.type = type
-        self.index = index
-        self._set_array(sp_array)
-
-    cdef void _set_array(self, const shared_ptr[CArray]& sp_array):
-        self.sp_array = sp_array
-
-    def __repr__(self):
-        """Return repr() of the converted Python value when available."""
-        if hasattr(self, 'as_py'):
-            return repr(self.as_py())
-        else:
-            # Start the lookup after ArrayValue itself; naming Scalar here
-            # would skip any __repr__ defined on Scalar.
-            return super(ArrayValue, self).__repr__()
-
-
-cdef class BooleanValue(ArrayValue):
-    """Scalar for one element of a boolean array."""
-
-    def as_py(self):
-        """Return the element converted to a Python object."""
-        cdef CBooleanArray* arr
-        arr = <CBooleanArray*> self.sp_array.get()
-        return arr.Value(self.index)
-
-
-cdef class Int8Value(ArrayValue):
-    """Scalar for one element of an int8 array."""
-
-    def as_py(self):
-        """Return the element as a Python int."""
-        cdef CInt8Array* arr
-        arr = <CInt8Array*> self.sp_array.get()
-        return arr.Value(self.index)
-
-
-cdef class UInt8Value(ArrayValue):
-    """Scalar for one element of a uint8 array."""
-
-    def as_py(self):
-        """Return the element as a Python int."""
-        cdef CUInt8Array* arr
-        arr = <CUInt8Array*> self.sp_array.get()
-        return arr.Value(self.index)
-
-
-cdef class Int16Value(ArrayValue):
-    """Scalar for one element of an int16 array."""
-
-    def as_py(self):
-        """Return the element as a Python int."""
-        cdef CInt16Array* arr
-        arr = <CInt16Array*> self.sp_array.get()
-        return arr.Value(self.index)
-
-
-cdef class UInt16Value(ArrayValue):
-    """Scalar for one element of a uint16 array."""
-
-    def as_py(self):
-        """Return the element as a Python int."""
-        cdef CUInt16Array* arr
-        arr = <CUInt16Array*> self.sp_array.get()
-        return arr.Value(self.index)
-
-
-cdef class Int32Value(ArrayValue):
-    """Scalar for one element of an int32 array."""
-
-    def as_py(self):
-        """Return the element as a Python int."""
-        cdef CInt32Array* arr
-        arr = <CInt32Array*> self.sp_array.get()
-        return arr.Value(self.index)
-
-
-cdef class UInt32Value(ArrayValue):
-    """Scalar for one element of a uint32 array."""
-
-    def as_py(self):
-        """Return the element as a Python int."""
-        cdef CUInt32Array* arr
-        arr = <CUInt32Array*> self.sp_array.get()
-        return arr.Value(self.index)
-
-
-cdef class Int64Value(ArrayValue):
-    """Scalar for one element of an int64 array."""
-
-    def as_py(self):
-        """Return the element as a Python int."""
-        cdef CInt64Array* arr
-        arr = <CInt64Array*> self.sp_array.get()
-        return arr.Value(self.index)
-
-
-cdef class UInt64Value(ArrayValue):
-    """Scalar for one element of a uint64 array."""
-
-    def as_py(self):
-        """Return the element as a Python int."""
-        cdef CUInt64Array* arr
-        arr = <CUInt64Array*> self.sp_array.get()
-        return arr.Value(self.index)
-
-
-cdef class Date32Value(ArrayValue):
-    """Scalar for one element of a date32 array."""
-
-    def as_py(self):
-        """
-        Return the element as a datetime.date.
-
-        The stored integer is read as a number of days relative to
-        1970-01-01.
-        """
-        cdef CDate32Array* ap = <CDate32Array*> self.sp_array.get()
-
-        # Plain date arithmetic instead of utcfromtimestamp(): the latter
-        # goes through the platform's time functions, which reject
-        # negative (pre-1970) values on some systems.
-        return (datetime.date(1970, 1, 1) +
-                datetime.timedelta(days=int(ap.Value(self.index))))
-
-
-cdef class Date64Value(ArrayValue):
-    """Scalar for one element of a date64 array."""
-
-    def as_py(self):
-        """
-        Return the element as a datetime.date.
-
-        The stored integer is read as a number of milliseconds relative to
-        1970-01-01; any part of a day is discarded.
-        """
-        cdef CDate64Array* ap = <CDate64Array*> self.sp_array.get()
-
-        # Plain date arithmetic instead of utcfromtimestamp(): the latter
-        # goes through the platform's time functions, which reject
-        # negative (pre-1970) values on some systems. Adding a timedelta
-        # to a date only uses its whole-day component, which floors the
-        # same way the timestamp-based conversion did.
-        return (datetime.date(1970, 1, 1) +
-                datetime.timedelta(milliseconds=int(ap.Value(self.index))))
-
-
-cdef class TimestampValue(ArrayValue):
-    """Scalar for one element of a timestamp array."""
-
-    def as_py(self):
-        """
-        Return the element as a pandas.Timestamp, or as a
-        datetime.datetime when pandas cannot be imported.
-
-        The stored integer is read as an offset from the UNIX epoch in
-        UTC, expressed in the unit of the timestamp type. When the type
-        carries a timezone, the result is converted to that zone (looked
-        up with pytz).
-
-        Raises
-        ------
-        NotImplementedError
-            For nanosecond data when pandas is unavailable.
-        """
-        cdef:
-            CTimestampArray* ap = <CTimestampArray*> self.sp_array.get()
-            CTimestampType* dtype = <CTimestampType*>ap.type().get()
-            int64_t val = ap.Value(self.index)
-
-        tzinfo = None
-        if dtype.timezone().size() > 0:
-            import pytz
-            tzinfo = pytz.timezone(frombytes(dtype.timezone()))
-
-        # Continue with a Python int: scaling to nanoseconds in C int64
-        # arithmetic would wrap around silently for large values.
-        value = int(val)
-
-        # Guard only the import, so that errors raised while building the
-        # Timestamp are not mistaken for "pandas is missing".
-        try:
-            pd = _pandas()
-        except ImportError:
-            pd = None
-
-        if pd is not None:
-            # pandas expects nanoseconds for integer input.
-            if dtype.unit() == TimeUnit_SECOND:
-                value = value * 1000000000
-            elif dtype.unit() == TimeUnit_MILLI:
-                value = value * 1000000
-            elif dtype.unit() == TimeUnit_MICRO:
-                value = value * 1000
-            return pd.Timestamp(value, tz=tzinfo)
-
-        # Integer timedelta arithmetic is exact, unlike dividing a float,
-        # and does not depend on the platform accepting negative
-        # timestamps.
-        if dtype.unit() == TimeUnit_SECOND:
-            delta = datetime.timedelta(seconds=value)
-        elif dtype.unit() == TimeUnit_MILLI:
-            delta = datetime.timedelta(milliseconds=value)
-        elif dtype.unit() == TimeUnit_MICRO:
-            delta = datetime.timedelta(microseconds=value)
-        else:
-            # TimeUnit_NANO
-            raise NotImplementedError("Cannot convert nanosecond "
-                                      "timestamps without pandas")
-
-        result = datetime.datetime(1970, 1, 1) + delta
-        if tzinfo is not None:
-            # `result` is UTC wall time: convert it to the target zone
-            # rather than stamping the zone onto it with replace(), which
-            # would label the UTC clock reading as local time. pytz was
-            # imported above under the same condition.
-            result = pytz.utc.localize(result).astimezone(tzinfo)
-        return result
-
-
-cdef class FloatValue(ArrayValue):
-    """Scalar for one element of a single-precision float array."""
-
-    def as_py(self):
-        """Return the element as a Python float."""
-        cdef CFloatArray* arr
-        arr = <CFloatArray*> self.sp_array.get()
-        return arr.Value(self.index)
-
-
-cdef class DoubleValue(ArrayValue):
-    """Scalar for one element of a double-precision float array."""
-
-    def as_py(self):
-        """Return the element as a Python float."""
-        cdef CDoubleArray* arr
-        arr = <CDoubleArray*> self.sp_array.get()
-        return arr.Value(self.index)
-
-
-cdef class DecimalValue(ArrayValue):
-    """Scalar for one element of a decimal array."""
-
-    def as_py(self):
-        """Return the element as a decimal.Decimal."""
-        cdef CDecimalArray* arr = <CDecimalArray*> self.sp_array.get()
-        # The array formats the value as text, which Decimal then parses.
-        cdef c_string text = arr.FormatValue(self.index)
-        return decimal.Decimal(text.decode('utf8'))
-
-
-cdef class StringValue(ArrayValue):
-    """Scalar for one element of a string array."""
-
-    def as_py(self):
-        """Return the element decoded from UTF-8 as a Python string."""
-        cdef CStringArray* arr = <CStringArray*> self.sp_array.get()
-        cdef c_string raw = arr.GetString(self.index)
-        return raw.decode('utf-8')
-
-
-cdef class BinaryValue(ArrayValue):
-    """Scalar for one element of a binary array."""
-
-    def as_py(self):
-        """Return a copy of the element's bytes as a Python bytes object."""
-        cdef CBinaryArray* arr = <CBinaryArray*> self.sp_array.get()
-        cdef int32_t nbytes
-        # GetValue returns the start of the value and writes its length
-        # into `nbytes`.
-        cdef const uint8_t* start = arr.GetValue(self.index, &nbytes)
-        return cp.PyBytes_FromStringAndSize(<const char*>(start), nbytes)
-
-
-cdef class ListValue(ArrayValue):
-    """Scalar for one element (itself a list) of a list array."""
-
-    def __len__(self):
-        """Return the number of child values in this list element."""
-        return self.ap.value_length(self.index)
-
-    def __getitem__(self, i):
-        """
-        Return the i-th child value, boxed as a scalar.
-
-        Negative indices count from the end. IndexError is raised when the
-        index is out of range; without this check the lookup would read
-        values belonging to neighbouring list elements.
-        """
-        cdef int64_t n = len(self)
-        cdef int64_t k = i
-        if k < 0:
-            k += n
-        if k < 0 or k >= n:
-            raise IndexError('list index out of range')
-        return self.getitem(k)
-
-    def __iter__(self):
-        """Yield each child value in order, boxed as a scalar."""
-        # The generator finishes by falling off the end; an explicit
-        # `raise StopIteration` inside a generator is turned into a
-        # RuntimeError under PEP 479.
-        for i in range(len(self)):
-            yield self.getitem(i)
-
-    cdef void _set_array(self, const shared_ptr[CArray]& sp_array):
-        # Besides storing the array, cache the typed pointer and the boxed
-        # child value type.
-        self.sp_array = sp_array
-        self.ap = <CListArray*> sp_array.get()
-        self.value_type = box_data_type(self.ap.value_type())
-
-    cdef getitem(self, int64_t i):
-        # Unchecked access: `i` is relative to this element's start offset
-        # in the child values array.
-        cdef int64_t j = self.ap.value_offset(self.index) + i
-        return box_scalar(self.value_type, self.ap.values(), j)
-
-    def as_py(self):
-        """Return the child values converted with as_py(), as a list."""
-        return [self.getitem(j).as_py() for j in range(len(self))]
-
-
-cdef class FixedSizeBinaryValue(ArrayValue):
-    """Scalar for one element of a fixed-size binary array."""
-
-    def as_py(self):
-        """Return a copy of the element's bytes as a Python bytes object."""
-        cdef CFixedSizeBinaryArray* arr = \
-            <CFixedSizeBinaryArray*> self.sp_array.get()
-        cdef CFixedSizeBinaryType* arr_type = \
-            <CFixedSizeBinaryType*> arr.type().get()
-        # Every value has the byte width declared on the type.
-        cdef int32_t width = arr_type.byte_width()
-        cdef const char* start = <const char*> arr.GetValue(self.index)
-        return cp.PyBytes_FromStringAndSize(start, width)
-
-
-
-# Maps an Arrow type id to the scalar class used by box_scalar.
-# Unsigned types must use the UInt*Value classes: reading them through the
-# signed Int*Value classes casts the array to the signed C++ array type
-# and returns large values as negative numbers.
-cdef dict _scalar_classes = {
-    Type_BOOL: BooleanValue,
-    Type_UINT8: UInt8Value,
-    Type_UINT16: UInt16Value,
-    Type_UINT32: UInt32Value,
-    Type_UINT64: UInt64Value,
-    Type_INT8: Int8Value,
-    Type_INT16: Int16Value,
-    Type_INT32: Int32Value,
-    Type_INT64: Int64Value,
-    Type_DATE32: Date32Value,
-    Type_DATE64: Date64Value,
-    Type_TIMESTAMP: TimestampValue,
-    Type_FLOAT: FloatValue,
-    Type_DOUBLE: DoubleValue,
-    Type_LIST: ListValue,
-    Type_BINARY: BinaryValue,
-    Type_STRING: StringValue,
-    Type_FIXED_SIZE_BINARY: FixedSizeBinaryValue,
-    Type_DECIMAL: DecimalValue,
-}
-
-cdef object box_scalar(DataType type, const shared_ptr[CArray]& sp_array,
-                       int64_t index):
-    # Wrap element `index` of `sp_array` in the scalar class registered for
-    # `type` in _scalar_classes. A type id missing from that table raises
-    # KeyError.
-    cdef ArrayValue boxed
-
-    # Null-typed arrays and null slots both map to the NA object.
-    if type.type.id() == Type_NA or sp_array.get().IsNull(index):
-        return NA
-
-    boxed = _scalar_classes[type.type.id()]()
-    boxed.init(type, sp_array, index)
-    return boxed
http://git-wip-us.apache.org/repos/asf/arrow/blob/8b64a4fb/python/pyarrow/schema.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/schema.pxd b/python/pyarrow/schema.pxd
deleted file mode 100644
index eceedba..0000000
--- a/python/pyarrow/schema.pxd
+++ /dev/null
@@ -1,76 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from pyarrow.includes.common cimport *
-from pyarrow.includes.libarrow cimport (CDataType,
- CDictionaryType,
- CTimestampType,
- CFixedSizeBinaryType,
- CDecimalType,
- CField, CSchema)
-
-# Python-visible wrapper around a C++ data type.
-cdef class DataType:
-    # Shared pointer to the wrapped type, plus a raw CDataType pointer
-    # (presumably aliasing sp_type -- confirm in schema.pyx).
-    cdef shared_ptr[CDataType] sp_type
-    cdef CDataType* type
-
-    cdef void init(self, const shared_ptr[CDataType]& type)
-
-
-# DataType subclass that also keeps a pointer typed as CDictionaryType.
-cdef class DictionaryType(DataType):
-    cdef const CDictionaryType* dict_type
-
-
-# DataType subclass that also keeps a pointer typed as CTimestampType.
-cdef class TimestampType(DataType):
-    cdef const CTimestampType* ts_type
-
-
-# DataType subclass that also keeps a pointer typed as
-# CFixedSizeBinaryType.
-cdef class FixedSizeBinaryType(DataType):
-    cdef const CFixedSizeBinaryType* fixed_size_binary_type
-
-
-# FixedSizeBinaryType subclass that also keeps a pointer typed as
-# CDecimalType.
-cdef class DecimalType(FixedSizeBinaryType):
-    cdef const CDecimalType* decimal_type
-
-
-# Python-visible wrapper around a C++ field.
-cdef class Field:
-    # Shared pointer to the wrapped field, plus a raw CField pointer
-    # (presumably aliasing sp_field -- confirm in schema.pyx).
-    cdef shared_ptr[CField] sp_field
-    cdef CField* field
-
-    # Data type of the field; readable (but not writable) from Python.
-    cdef readonly DataType type
-
-    cdef init(self, const shared_ptr[CField]& field)
-
-
-# Python-visible wrapper around a C++ schema.
-cdef class Schema:
-    # Shared pointer to the wrapped schema, plus a raw CSchema pointer
-    # (presumably aliasing sp_schema -- confirm in schema.pyx).
-    cdef shared_ptr[CSchema] sp_schema
-    cdef CSchema* schema
-
-    # Two initializers: one takes a vector of fields, the other an
-    # existing schema pointer.
-    cdef init(self, const vector[shared_ptr[CField]]& fields)
-    cdef init_schema(self, const shared_ptr[CSchema]& schema)
-
-
-# Module-level helpers that take a shared pointer to a C++ object and
-# return the corresponding wrapper class declared above.
-cdef DataType box_data_type(const shared_ptr[CDataType]& type)
-cdef Field box_field(const shared_ptr[CField]& field)
-cdef Schema box_schema(const shared_ptr[CSchema]& schema)