You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/04/17 13:22:21 UTC
arrow git commit: ARROW-827: [Python] Miscellaneous improvements to
help with Dask support
Repository: arrow
Updated Branches:
refs/heads/master 09e6eade1 -> f51259068
ARROW-827: [Python] Miscellaneous improvements to help with Dask support
Author: Wes McKinney <we...@twosigma.com>
Closes #543 from wesm/dask-improvements and squashes the following commits:
1f587e2 [Wes McKinney] Store the input Parquet paths on the dataset object
3504281 [Wes McKinney] Add some more cases
edc9b59 [Wes McKinney] Unit tests
88f4380 [Wes McKinney] Use dict for type mapping for now
7e69cab [Wes McKinney] Miscellaneous improvements to help with Dask support
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/f5125906
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/f5125906
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/f5125906
Branch: refs/heads/master
Commit: f51259068640af92490c0832d5d55885a510776d
Parents: 09e6ead
Author: Wes McKinney <we...@twosigma.com>
Authored: Mon Apr 17 09:22:15 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Mon Apr 17 09:22:15 2017 -0400
----------------------------------------------------------------------
python/pyarrow/_array.pyx | 193 ++++++++++++++++++------------
python/pyarrow/_parquet.pyx | 23 +++-
python/pyarrow/includes/libarrow.pxd | 64 +++++-----
python/pyarrow/parquet.py | 22 ++--
python/pyarrow/tests/test_parquet.py | 4 +
python/pyarrow/tests/test_schema.py | 30 +++++
6 files changed, 222 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/f5125906/python/pyarrow/_array.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_array.pyx b/python/pyarrow/_array.pyx
index 7ef8e58..c5a595c 100644
--- a/python/pyarrow/_array.pyx
+++ b/python/pyarrow/_array.pyx
@@ -41,6 +41,31 @@ cdef _pandas():
return pd
+# These are imprecise because the type (in pandas 0.x) depends on the presence
+# of nulls
+_pandas_type_map = {
+ _Type_NA: np.float64, # NaNs
+ _Type_BOOL: np.bool_,
+ _Type_INT8: np.int8,
+ _Type_INT16: np.int16,
+ _Type_INT32: np.int32,
+ _Type_INT64: np.int64,
+ _Type_UINT8: np.uint8,
+ _Type_UINT16: np.uint16,
+ _Type_UINT32: np.uint32,
+ _Type_UINT64: np.uint64,
+ _Type_HALF_FLOAT: np.float16,
+ _Type_FLOAT: np.float32,
+ _Type_DOUBLE: np.float64,
+ _Type_DATE32: np.dtype('datetime64[ns]'),
+ _Type_DATE64: np.dtype('datetime64[ns]'),
+ _Type_TIMESTAMP: np.dtype('datetime64[ns]'),
+ _Type_BINARY: np.object_,
+ _Type_FIXED_SIZE_BINARY: np.object_,
+ _Type_STRING: np.object_,
+ _Type_LIST: np.object_
+}
+
cdef class DataType:
def __cinit__(self):
@@ -64,6 +89,16 @@ cdef class DataType:
else:
raise TypeError('Invalid comparison')
+ def to_pandas_dtype(self):
+ """
+ Return the NumPy dtype that would be used for storing this
+ """
+ cdef Type type_id = self.type.id()
+ if type_id in _pandas_type_map:
+ return _pandas_type_map[type_id]
+ else:
+ raise NotImplementedError(str(self))
+
cdef class DictionaryType(DataType):
@@ -167,6 +202,16 @@ cdef class Schema:
return result
+ property names:
+
+ def __get__(self):
+ cdef int i
+ result = []
+ for i in range(self.schema.num_fields()):
+ name = frombytes(self.schema.field(i).get().name())
+ result.append(name)
+ return result
+
cdef init(self, const vector[shared_ptr[CField]]& fields):
self.schema = new CSchema(fields)
self.sp_schema.reset(self.schema)
@@ -244,56 +289,56 @@ def field(name, type, bint nullable=True):
cdef set PRIMITIVE_TYPES = set([
- Type_NA, Type_BOOL,
- Type_UINT8, Type_INT8,
- Type_UINT16, Type_INT16,
- Type_UINT32, Type_INT32,
- Type_UINT64, Type_INT64,
- Type_TIMESTAMP, Type_DATE32,
- Type_DATE64,
- Type_HALF_FLOAT,
- Type_FLOAT,
- Type_DOUBLE])
+ _Type_NA, _Type_BOOL,
+ _Type_UINT8, _Type_INT8,
+ _Type_UINT16, _Type_INT16,
+ _Type_UINT32, _Type_INT32,
+ _Type_UINT64, _Type_INT64,
+ _Type_TIMESTAMP, _Type_DATE32,
+ _Type_DATE64,
+ _Type_HALF_FLOAT,
+ _Type_FLOAT,
+ _Type_DOUBLE])
def null():
- return primitive_type(Type_NA)
+ return primitive_type(_Type_NA)
def bool_():
- return primitive_type(Type_BOOL)
+ return primitive_type(_Type_BOOL)
def uint8():
- return primitive_type(Type_UINT8)
+ return primitive_type(_Type_UINT8)
def int8():
- return primitive_type(Type_INT8)
+ return primitive_type(_Type_INT8)
def uint16():
- return primitive_type(Type_UINT16)
+ return primitive_type(_Type_UINT16)
def int16():
- return primitive_type(Type_INT16)
+ return primitive_type(_Type_INT16)
def uint32():
- return primitive_type(Type_UINT32)
+ return primitive_type(_Type_UINT32)
def int32():
- return primitive_type(Type_INT32)
+ return primitive_type(_Type_INT32)
def uint64():
- return primitive_type(Type_UINT64)
+ return primitive_type(_Type_UINT64)
def int64():
- return primitive_type(Type_INT64)
+ return primitive_type(_Type_INT64)
cdef dict _timestamp_type_cache = {}
@@ -344,23 +389,23 @@ def timestamp(unit_str, tz=None):
def date32():
- return primitive_type(Type_DATE32)
+ return primitive_type(_Type_DATE32)
def date64():
- return primitive_type(Type_DATE64)
+ return primitive_type(_Type_DATE64)
def float16():
- return primitive_type(Type_HALF_FLOAT)
+ return primitive_type(_Type_HALF_FLOAT)
def float32():
- return primitive_type(Type_FLOAT)
+ return primitive_type(_Type_FLOAT)
def float64():
- return primitive_type(Type_DOUBLE)
+ return primitive_type(_Type_DOUBLE)
cpdef DataType decimal(int precision, int scale=0):
@@ -373,7 +418,7 @@ def string():
"""
UTF8 string
"""
- return primitive_type(Type_STRING)
+ return primitive_type(_Type_STRING)
def binary(int length=-1):
@@ -387,7 +432,7 @@ def binary(int length=-1):
width `length`.
"""
if length == -1:
- return primitive_type(Type_BINARY)
+ return primitive_type(_Type_BINARY)
cdef shared_ptr[CDataType] fixed_size_binary_type
fixed_size_binary_type.reset(new CFixedSizeBinaryType(length))
@@ -443,13 +488,13 @@ cdef DataType box_data_type(const shared_ptr[CDataType]& type):
if type.get() == NULL:
return None
- if type.get().id() == Type_DICTIONARY:
+ if type.get().id() == _Type_DICTIONARY:
out = DictionaryType()
- elif type.get().id() == Type_TIMESTAMP:
+ elif type.get().id() == _Type_TIMESTAMP:
out = TimestampType()
- elif type.get().id() == Type_FIXED_SIZE_BINARY:
+ elif type.get().id() == _Type_FIXED_SIZE_BINARY:
out = FixedSizeBinaryType()
- elif type.get().id() == Type_DECIMAL:
+ elif type.get().id() == _Type_DECIMAL:
out = DecimalType()
else:
out = DataType()
@@ -732,31 +777,31 @@ cdef class FixedSizeBinaryValue(ArrayValue):
cdef dict _scalar_classes = {
- Type_BOOL: BooleanValue,
- Type_UINT8: Int8Value,
- Type_UINT16: Int16Value,
- Type_UINT32: Int32Value,
- Type_UINT64: Int64Value,
- Type_INT8: Int8Value,
- Type_INT16: Int16Value,
- Type_INT32: Int32Value,
- Type_INT64: Int64Value,
- Type_DATE32: Date32Value,
- Type_DATE64: Date64Value,
- Type_TIMESTAMP: TimestampValue,
- Type_FLOAT: FloatValue,
- Type_DOUBLE: DoubleValue,
- Type_LIST: ListValue,
- Type_BINARY: BinaryValue,
- Type_STRING: StringValue,
- Type_FIXED_SIZE_BINARY: FixedSizeBinaryValue,
- Type_DECIMAL: DecimalValue,
+ _Type_BOOL: BooleanValue,
+ _Type_UINT8: Int8Value,
+ _Type_UINT16: Int16Value,
+ _Type_UINT32: Int32Value,
+ _Type_UINT64: Int64Value,
+ _Type_INT8: Int8Value,
+ _Type_INT16: Int16Value,
+ _Type_INT32: Int32Value,
+ _Type_INT64: Int64Value,
+ _Type_DATE32: Date32Value,
+ _Type_DATE64: Date64Value,
+ _Type_TIMESTAMP: TimestampValue,
+ _Type_FLOAT: FloatValue,
+ _Type_DOUBLE: DoubleValue,
+ _Type_LIST: ListValue,
+ _Type_BINARY: BinaryValue,
+ _Type_STRING: StringValue,
+ _Type_FIXED_SIZE_BINARY: FixedSizeBinaryValue,
+ _Type_DECIMAL: DecimalValue,
}
cdef object box_scalar(DataType type, const shared_ptr[CArray]& sp_array,
int64_t index):
cdef ArrayValue val
- if type.type.id() == Type_NA:
+ if type.type.id() == _Type_NA:
return NA
elif sp_array.get().IsNull(index):
return NA
@@ -1306,29 +1351,29 @@ cdef class DictionaryArray(Array):
cdef dict _array_classes = {
- Type_NA: NullArray,
- Type_BOOL: BooleanArray,
- Type_UINT8: UInt8Array,
- Type_UINT16: UInt16Array,
- Type_UINT32: UInt32Array,
- Type_UINT64: UInt64Array,
- Type_INT8: Int8Array,
- Type_INT16: Int16Array,
- Type_INT32: Int32Array,
- Type_INT64: Int64Array,
- Type_DATE32: Date32Array,
- Type_DATE64: Date64Array,
- Type_TIMESTAMP: TimestampArray,
- Type_TIME32: Time32Array,
- Type_TIME64: Time64Array,
- Type_FLOAT: FloatArray,
- Type_DOUBLE: DoubleArray,
- Type_LIST: ListArray,
- Type_BINARY: BinaryArray,
- Type_STRING: StringArray,
- Type_DICTIONARY: DictionaryArray,
- Type_FIXED_SIZE_BINARY: FixedSizeBinaryArray,
- Type_DECIMAL: DecimalArray,
+ _Type_NA: NullArray,
+ _Type_BOOL: BooleanArray,
+ _Type_UINT8: UInt8Array,
+ _Type_UINT16: UInt16Array,
+ _Type_UINT32: UInt32Array,
+ _Type_UINT64: UInt64Array,
+ _Type_INT8: Int8Array,
+ _Type_INT16: Int16Array,
+ _Type_INT32: Int32Array,
+ _Type_INT64: Int64Array,
+ _Type_DATE32: Date32Array,
+ _Type_DATE64: Date64Array,
+ _Type_TIMESTAMP: TimestampArray,
+ _Type_TIME32: Time32Array,
+ _Type_TIME64: Time64Array,
+ _Type_FLOAT: FloatArray,
+ _Type_DOUBLE: DoubleArray,
+ _Type_LIST: ListArray,
+ _Type_BINARY: BinaryArray,
+ _Type_STRING: StringArray,
+ _Type_DICTIONARY: DictionaryArray,
+ _Type_FIXED_SIZE_BINARY: FixedSizeBinaryArray,
+ _Type_DECIMAL: DecimalArray,
}
cdef object box_array(const shared_ptr[CArray]& sp_array):
http://git-wip-us.apache.org/repos/asf/arrow/blob/f5125906/python/pyarrow/_parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index dafcdaf..c06eab2 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -23,7 +23,7 @@ from cython.operator cimport dereference as deref
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
cimport pyarrow.includes.pyarrow as pyarrow
-from pyarrow._array cimport Array, Schema
+from pyarrow._array cimport Array, Schema, box_schema
from pyarrow._error cimport check_status
from pyarrow._memory cimport MemoryPool, maybe_unbox_memory_pool
from pyarrow._table cimport Table, table_from_ctable
@@ -194,6 +194,27 @@ cdef class ParquetSchema:
def __getitem__(self, i):
return self.column(i)
+ property names:
+
+ def __get__(self):
+ return [self[i].name for i in range(len(self))]
+
+ def to_arrow_schema(self):
+ """
+ Convert Parquet schema to effective Arrow schema
+
+ Returns
+ -------
+ schema : pyarrow.Schema
+ """
+ cdef:
+ shared_ptr[CSchema] sp_arrow_schema
+
+ with nogil:
+ check_status(FromParquetSchema(self.schema, &sp_arrow_schema))
+
+ return box_schema(sp_arrow_schema)
+
def equals(self, ParquetSchema other):
"""
Returns True if the Parquet schemas are equal
http://git-wip-us.apache.org/repos/asf/arrow/blob/f5125906/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 2444f3f..b8aa24c 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -1,4 +1,4 @@
-# Licensed to the Apache Software Foundation (ASF) under one
+# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
@@ -22,37 +22,37 @@ from pyarrow.includes.common cimport *
cdef extern from "arrow/api.h" namespace "arrow" nogil:
enum Type" arrow::Type::type":
- Type_NA" arrow::Type::NA"
-
- Type_BOOL" arrow::Type::BOOL"
-
- Type_UINT8" arrow::Type::UINT8"
- Type_INT8" arrow::Type::INT8"
- Type_UINT16" arrow::Type::UINT16"
- Type_INT16" arrow::Type::INT16"
- Type_UINT32" arrow::Type::UINT32"
- Type_INT32" arrow::Type::INT32"
- Type_UINT64" arrow::Type::UINT64"
- Type_INT64" arrow::Type::INT64"
-
- Type_HALF_FLOAT" arrow::Type::HALF_FLOAT"
- Type_FLOAT" arrow::Type::FLOAT"
- Type_DOUBLE" arrow::Type::DOUBLE"
-
- Type_DECIMAL" arrow::Type::DECIMAL"
-
- Type_DATE32" arrow::Type::DATE32"
- Type_DATE64" arrow::Type::DATE64"
- Type_TIMESTAMP" arrow::Type::TIMESTAMP"
- Type_TIME32" arrow::Type::TIME32"
- Type_TIME64" arrow::Type::TIME64"
- Type_BINARY" arrow::Type::BINARY"
- Type_STRING" arrow::Type::STRING"
- Type_FIXED_SIZE_BINARY" arrow::Type::FIXED_SIZE_BINARY"
-
- Type_LIST" arrow::Type::LIST"
- Type_STRUCT" arrow::Type::STRUCT"
- Type_DICTIONARY" arrow::Type::DICTIONARY"
+ _Type_NA" arrow::Type::NA"
+
+ _Type_BOOL" arrow::Type::BOOL"
+
+ _Type_UINT8" arrow::Type::UINT8"
+ _Type_INT8" arrow::Type::INT8"
+ _Type_UINT16" arrow::Type::UINT16"
+ _Type_INT16" arrow::Type::INT16"
+ _Type_UINT32" arrow::Type::UINT32"
+ _Type_INT32" arrow::Type::INT32"
+ _Type_UINT64" arrow::Type::UINT64"
+ _Type_INT64" arrow::Type::INT64"
+
+ _Type_HALF_FLOAT" arrow::Type::HALF_FLOAT"
+ _Type_FLOAT" arrow::Type::FLOAT"
+ _Type_DOUBLE" arrow::Type::DOUBLE"
+
+ _Type_DECIMAL" arrow::Type::DECIMAL"
+
+ _Type_DATE32" arrow::Type::DATE32"
+ _Type_DATE64" arrow::Type::DATE64"
+ _Type_TIMESTAMP" arrow::Type::TIMESTAMP"
+ _Type_TIME32" arrow::Type::TIME32"
+ _Type_TIME64" arrow::Type::TIME64"
+ _Type_BINARY" arrow::Type::BINARY"
+ _Type_STRING" arrow::Type::STRING"
+ _Type_FIXED_SIZE_BINARY" arrow::Type::FIXED_SIZE_BINARY"
+
+ _Type_LIST" arrow::Type::LIST"
+ _Type_STRUCT" arrow::Type::STRUCT"
+ _Type_DICTIONARY" arrow::Type::DICTIONARY"
enum TimeUnit" arrow::TimeUnit":
TimeUnit_SECOND" arrow::TimeUnit::SECOND"
http://git-wip-us.apache.org/repos/asf/arrow/blob/f5125906/python/pyarrow/parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 4ff7e03..fef99d5 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -180,14 +180,13 @@ class ParquetDatasetPiece(object):
"""
Returns instance of ParquetFile
"""
- if open_file_func is None:
- def simple_opener(path):
- return ParquetFile(path)
- open_file_func = simple_opener
- return open_file_func(self.path)
+ reader = open_file_func(self.path)
+ if not isinstance(reader, ParquetFile):
+ reader = ParquetFile(reader)
+ return reader
def read(self, columns=None, nthreads=1, partitions=None,
- open_file_func=None):
+ open_file_func=None, file=None):
"""
Read this piece as a pyarrow.Table
@@ -205,7 +204,10 @@ class ParquetDatasetPiece(object):
-------
table : pyarrow.Table
"""
- reader = self._open(open_file_func)
+ if open_file_func is not None:
+ reader = self._open(open_file_func)
+ elif file is not None:
+ reader = ParquetFile(file)
if self.row_group is not None:
table = reader.read_row_group(self.row_group, columns=columns,
@@ -472,6 +474,8 @@ class ParquetDataset(object):
else:
self.fs = filesystem
+ self.paths = path_or_paths
+
(self.pieces, self.partitions,
self.metadata_path) = _make_manifest(path_or_paths, self.fs)
@@ -550,6 +554,10 @@ def _make_manifest(path_or_paths, fs, pathsep='/'):
partitions = None
metadata_path = None
+ if len(path_or_paths) == 1:
+ # Dask passes a directory as a list of length 1
+ path_or_paths = path_or_paths[0]
+
if is_string(path_or_paths) and fs.isdir(path_or_paths):
manifest = ParquetManifest(path_or_paths, filesystem=fs,
pathsep=pathsep)
http://git-wip-us.apache.org/repos/asf/arrow/blob/f5125906/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index ca6ae2d..fc35781 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -552,6 +552,10 @@ def test_read_common_metadata_files(tmpdir):
pf = pq.ParquetFile(data_path)
assert dataset.schema.equals(pf.schema)
+ # handle list of one directory
+ dataset2 = pq.ParquetDataset([base_path])
+ assert dataset2.schema.equals(dataset.schema)
+
def _filter_partition(df, part_keys):
predicate = np.ones(len(df), dtype=bool)
http://git-wip-us.apache.org/repos/asf/arrow/blob/f5125906/python/pyarrow/tests/test_schema.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_schema.py b/python/pyarrow/tests/test_schema.py
index 53b6b68..d1107fb 100644
--- a/python/pyarrow/tests/test_schema.py
+++ b/python/pyarrow/tests/test_schema.py
@@ -31,6 +31,34 @@ def test_type_integers():
assert str(t) == name
+def test_type_to_pandas_dtype():
+ M8_ns = np.dtype('datetime64[ns]')
+ cases = [
+ (pa.null(), np.float64),
+ (pa.bool_(), np.bool_),
+ (pa.int8(), np.int8),
+ (pa.int16(), np.int16),
+ (pa.int32(), np.int32),
+ (pa.int64(), np.int64),
+ (pa.uint8(), np.uint8),
+ (pa.uint16(), np.uint16),
+ (pa.uint32(), np.uint32),
+ (pa.uint64(), np.uint64),
+ (pa.float16(), np.float16),
+ (pa.float32(), np.float32),
+ (pa.float64(), np.float64),
+ (pa.date32(), M8_ns),
+ (pa.date64(), M8_ns),
+ (pa.timestamp('ms'), M8_ns),
+ (pa.binary(), np.object_),
+ (pa.binary(12), np.object_),
+ (pa.string(), np.object_),
+ (pa.list_(pa.int8()), np.object_),
+ ]
+ for arrow_type, numpy_type in cases:
+ assert arrow_type.to_pandas_dtype() == numpy_type
+
+
def test_type_list():
value_type = pa.int32()
list_type = pa.list_(value_type)
@@ -83,6 +111,8 @@ def test_schema():
]
sch = pa.schema(fields)
+ assert sch.names == ['foo', 'bar', 'baz']
+
assert len(sch) == 3
assert sch[0].name == 'foo'
assert sch[0].type == fields[0].type