Posted to commits@arrow.apache.org by we...@apache.org on 2017/04/17 13:22:21 UTC

arrow git commit: ARROW-827: [Python] Miscellaneous improvements to help with Dask support

Repository: arrow
Updated Branches:
  refs/heads/master 09e6eade1 -> f51259068


ARROW-827: [Python] Miscellaneous improvements to help with Dask support

Author: Wes McKinney <we...@twosigma.com>

Closes #543 from wesm/dask-improvements and squashes the following commits:

1f587e2 [Wes McKinney] Store the input Parquet paths on the dataset object
3504281 [Wes McKinney] Add some more cases
edc9b59 [Wes McKinney] Unit tests
88f4380 [Wes McKinney] Use dict for type mapping for now
7e69cab [Wes McKinney] Miscellaneous improvements to help with Dask support


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/f5125906
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/f5125906
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/f5125906

Branch: refs/heads/master
Commit: f51259068640af92490c0832d5d55885a510776d
Parents: 09e6ead
Author: Wes McKinney <we...@twosigma.com>
Authored: Mon Apr 17 09:22:15 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Mon Apr 17 09:22:15 2017 -0400

----------------------------------------------------------------------
 python/pyarrow/_array.pyx            | 193 ++++++++++++++++++------------
 python/pyarrow/_parquet.pyx          |  23 +++-
 python/pyarrow/includes/libarrow.pxd |  64 +++++-----
 python/pyarrow/parquet.py            |  22 ++--
 python/pyarrow/tests/test_parquet.py |   4 +
 python/pyarrow/tests/test_schema.py  |  30 +++++
 6 files changed, 222 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/f5125906/python/pyarrow/_array.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_array.pyx b/python/pyarrow/_array.pyx
index 7ef8e58..c5a595c 100644
--- a/python/pyarrow/_array.pyx
+++ b/python/pyarrow/_array.pyx
@@ -41,6 +41,31 @@ cdef _pandas():
     return pd
 
 
+# These are imprecise because the type (in pandas 0.x) depends on the presence
+# of nulls
+_pandas_type_map = {
+    _Type_NA: np.float64,  # NaNs
+    _Type_BOOL: np.bool_,
+    _Type_INT8: np.int8,
+    _Type_INT16: np.int16,
+    _Type_INT32: np.int32,
+    _Type_INT64: np.int64,
+    _Type_UINT8: np.uint8,
+    _Type_UINT16: np.uint16,
+    _Type_UINT32: np.uint32,
+    _Type_UINT64: np.uint64,
+    _Type_HALF_FLOAT: np.float16,
+    _Type_FLOAT: np.float32,
+    _Type_DOUBLE: np.float64,
+    _Type_DATE32: np.dtype('datetime64[ns]'),
+    _Type_DATE64: np.dtype('datetime64[ns]'),
+    _Type_TIMESTAMP: np.dtype('datetime64[ns]'),
+    _Type_BINARY: np.object_,
+    _Type_FIXED_SIZE_BINARY: np.object_,
+    _Type_STRING: np.object_,
+    _Type_LIST: np.object_
+}
+
 cdef class DataType:
 
     def __cinit__(self):
@@ -64,6 +89,16 @@ cdef class DataType:
         else:
             raise TypeError('Invalid comparison')
 
+    def to_pandas_dtype(self):
+        """
+        Return the NumPy dtype that would be used for storing values of this type in pandas
+        """
+        cdef Type type_id = self.type.id()
+        if type_id in _pandas_type_map:
+            return _pandas_type_map[type_id]
+        else:
+            raise NotImplementedError(str(self))
+
 
 cdef class DictionaryType(DataType):
 
@@ -167,6 +202,16 @@ cdef class Schema:
 
         return result
 
+    property names:
+
+        def __get__(self):
+            cdef int i
+            result = []
+            for i in range(self.schema.num_fields()):
+                name = frombytes(self.schema.field(i).get().name())
+                result.append(name)
+            return result
+
     cdef init(self, const vector[shared_ptr[CField]]& fields):
         self.schema = new CSchema(fields)
         self.sp_schema.reset(self.schema)
@@ -244,56 +289,56 @@ def field(name, type, bint nullable=True):
 
 
 cdef set PRIMITIVE_TYPES = set([
-    Type_NA, Type_BOOL,
-    Type_UINT8, Type_INT8,
-    Type_UINT16, Type_INT16,
-    Type_UINT32, Type_INT32,
-    Type_UINT64, Type_INT64,
-    Type_TIMESTAMP, Type_DATE32,
-    Type_DATE64,
-    Type_HALF_FLOAT,
-    Type_FLOAT,
-    Type_DOUBLE])
+    _Type_NA, _Type_BOOL,
+    _Type_UINT8, _Type_INT8,
+    _Type_UINT16, _Type_INT16,
+    _Type_UINT32, _Type_INT32,
+    _Type_UINT64, _Type_INT64,
+    _Type_TIMESTAMP, _Type_DATE32,
+    _Type_DATE64,
+    _Type_HALF_FLOAT,
+    _Type_FLOAT,
+    _Type_DOUBLE])
 
 
 def null():
-    return primitive_type(Type_NA)
+    return primitive_type(_Type_NA)
 
 
 def bool_():
-    return primitive_type(Type_BOOL)
+    return primitive_type(_Type_BOOL)
 
 
 def uint8():
-    return primitive_type(Type_UINT8)
+    return primitive_type(_Type_UINT8)
 
 
 def int8():
-    return primitive_type(Type_INT8)
+    return primitive_type(_Type_INT8)
 
 
 def uint16():
-    return primitive_type(Type_UINT16)
+    return primitive_type(_Type_UINT16)
 
 
 def int16():
-    return primitive_type(Type_INT16)
+    return primitive_type(_Type_INT16)
 
 
 def uint32():
-    return primitive_type(Type_UINT32)
+    return primitive_type(_Type_UINT32)
 
 
 def int32():
-    return primitive_type(Type_INT32)
+    return primitive_type(_Type_INT32)
 
 
 def uint64():
-    return primitive_type(Type_UINT64)
+    return primitive_type(_Type_UINT64)
 
 
 def int64():
-    return primitive_type(Type_INT64)
+    return primitive_type(_Type_INT64)
 
 
 cdef dict _timestamp_type_cache = {}
@@ -344,23 +389,23 @@ def timestamp(unit_str, tz=None):
 
 
 def date32():
-    return primitive_type(Type_DATE32)
+    return primitive_type(_Type_DATE32)
 
 
 def date64():
-    return primitive_type(Type_DATE64)
+    return primitive_type(_Type_DATE64)
 
 
 def float16():
-    return primitive_type(Type_HALF_FLOAT)
+    return primitive_type(_Type_HALF_FLOAT)
 
 
 def float32():
-    return primitive_type(Type_FLOAT)
+    return primitive_type(_Type_FLOAT)
 
 
 def float64():
-    return primitive_type(Type_DOUBLE)
+    return primitive_type(_Type_DOUBLE)
 
 
 cpdef DataType decimal(int precision, int scale=0):
@@ -373,7 +418,7 @@ def string():
     """
     UTF8 string
     """
-    return primitive_type(Type_STRING)
+    return primitive_type(_Type_STRING)
 
 
 def binary(int length=-1):
@@ -387,7 +432,7 @@ def binary(int length=-1):
         width `length`.
     """
     if length == -1:
-        return primitive_type(Type_BINARY)
+        return primitive_type(_Type_BINARY)
 
     cdef shared_ptr[CDataType] fixed_size_binary_type
     fixed_size_binary_type.reset(new CFixedSizeBinaryType(length))
@@ -443,13 +488,13 @@ cdef DataType box_data_type(const shared_ptr[CDataType]& type):
     if type.get() == NULL:
         return None
 
-    if type.get().id() == Type_DICTIONARY:
+    if type.get().id() == _Type_DICTIONARY:
         out = DictionaryType()
-    elif type.get().id() == Type_TIMESTAMP:
+    elif type.get().id() == _Type_TIMESTAMP:
         out = TimestampType()
-    elif type.get().id() == Type_FIXED_SIZE_BINARY:
+    elif type.get().id() == _Type_FIXED_SIZE_BINARY:
         out = FixedSizeBinaryType()
-    elif type.get().id() == Type_DECIMAL:
+    elif type.get().id() == _Type_DECIMAL:
         out = DecimalType()
     else:
         out = DataType()
@@ -732,31 +777,31 @@ cdef class FixedSizeBinaryValue(ArrayValue):
 
 
 cdef dict _scalar_classes = {
-    Type_BOOL: BooleanValue,
-    Type_UINT8: Int8Value,
-    Type_UINT16: Int16Value,
-    Type_UINT32: Int32Value,
-    Type_UINT64: Int64Value,
-    Type_INT8: Int8Value,
-    Type_INT16: Int16Value,
-    Type_INT32: Int32Value,
-    Type_INT64: Int64Value,
-    Type_DATE32: Date32Value,
-    Type_DATE64: Date64Value,
-    Type_TIMESTAMP: TimestampValue,
-    Type_FLOAT: FloatValue,
-    Type_DOUBLE: DoubleValue,
-    Type_LIST: ListValue,
-    Type_BINARY: BinaryValue,
-    Type_STRING: StringValue,
-    Type_FIXED_SIZE_BINARY: FixedSizeBinaryValue,
-    Type_DECIMAL: DecimalValue,
+    _Type_BOOL: BooleanValue,
+    _Type_UINT8: Int8Value,
+    _Type_UINT16: Int16Value,
+    _Type_UINT32: Int32Value,
+    _Type_UINT64: Int64Value,
+    _Type_INT8: Int8Value,
+    _Type_INT16: Int16Value,
+    _Type_INT32: Int32Value,
+    _Type_INT64: Int64Value,
+    _Type_DATE32: Date32Value,
+    _Type_DATE64: Date64Value,
+    _Type_TIMESTAMP: TimestampValue,
+    _Type_FLOAT: FloatValue,
+    _Type_DOUBLE: DoubleValue,
+    _Type_LIST: ListValue,
+    _Type_BINARY: BinaryValue,
+    _Type_STRING: StringValue,
+    _Type_FIXED_SIZE_BINARY: FixedSizeBinaryValue,
+    _Type_DECIMAL: DecimalValue,
 }
 
 cdef object box_scalar(DataType type, const shared_ptr[CArray]& sp_array,
                        int64_t index):
     cdef ArrayValue val
-    if type.type.id() == Type_NA:
+    if type.type.id() == _Type_NA:
         return NA
     elif sp_array.get().IsNull(index):
         return NA
@@ -1306,29 +1351,29 @@ cdef class DictionaryArray(Array):
 
 
 cdef dict _array_classes = {
-    Type_NA: NullArray,
-    Type_BOOL: BooleanArray,
-    Type_UINT8: UInt8Array,
-    Type_UINT16: UInt16Array,
-    Type_UINT32: UInt32Array,
-    Type_UINT64: UInt64Array,
-    Type_INT8: Int8Array,
-    Type_INT16: Int16Array,
-    Type_INT32: Int32Array,
-    Type_INT64: Int64Array,
-    Type_DATE32: Date32Array,
-    Type_DATE64: Date64Array,
-    Type_TIMESTAMP: TimestampArray,
-    Type_TIME32: Time32Array,
-    Type_TIME64: Time64Array,
-    Type_FLOAT: FloatArray,
-    Type_DOUBLE: DoubleArray,
-    Type_LIST: ListArray,
-    Type_BINARY: BinaryArray,
-    Type_STRING: StringArray,
-    Type_DICTIONARY: DictionaryArray,
-    Type_FIXED_SIZE_BINARY: FixedSizeBinaryArray,
-    Type_DECIMAL: DecimalArray,
+    _Type_NA: NullArray,
+    _Type_BOOL: BooleanArray,
+    _Type_UINT8: UInt8Array,
+    _Type_UINT16: UInt16Array,
+    _Type_UINT32: UInt32Array,
+    _Type_UINT64: UInt64Array,
+    _Type_INT8: Int8Array,
+    _Type_INT16: Int16Array,
+    _Type_INT32: Int32Array,
+    _Type_INT64: Int64Array,
+    _Type_DATE32: Date32Array,
+    _Type_DATE64: Date64Array,
+    _Type_TIMESTAMP: TimestampArray,
+    _Type_TIME32: Time32Array,
+    _Type_TIME64: Time64Array,
+    _Type_FLOAT: FloatArray,
+    _Type_DOUBLE: DoubleArray,
+    _Type_LIST: ListArray,
+    _Type_BINARY: BinaryArray,
+    _Type_STRING: StringArray,
+    _Type_DICTIONARY: DictionaryArray,
+    _Type_FIXED_SIZE_BINARY: FixedSizeBinaryArray,
+    _Type_DECIMAL: DecimalArray,
 }
 
 cdef object box_array(const shared_ptr[CArray]& sp_array):

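In use, the new DataType.to_pandas_dtype() method and Schema.names property added above behave roughly as follows (a minimal sketch based on the unit tests further down in this commit; it assumes pyarrow built from this revision):

    import numpy as np
    import pyarrow as pa

    # Arrow type -> NumPy dtype that pandas would use for the column
    assert pa.int64().to_pandas_dtype() == np.int64
    assert pa.timestamp('ms').to_pandas_dtype() == np.dtype('datetime64[ns]')
    assert pa.string().to_pandas_dtype() == np.object_

    # Schema.names returns the field names in order
    sch = pa.schema([pa.field('foo', pa.int32()),
                     pa.field('bar', pa.string())])
    assert sch.names == ['foo', 'bar']
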
http://git-wip-us.apache.org/repos/asf/arrow/blob/f5125906/python/pyarrow/_parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index dafcdaf..c06eab2 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -23,7 +23,7 @@ from cython.operator cimport dereference as deref
 from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport *
 cimport pyarrow.includes.pyarrow as pyarrow
-from pyarrow._array cimport Array, Schema
+from pyarrow._array cimport Array, Schema, box_schema
 from pyarrow._error cimport check_status
 from pyarrow._memory cimport MemoryPool, maybe_unbox_memory_pool
 from pyarrow._table cimport Table, table_from_ctable
@@ -194,6 +194,27 @@ cdef class ParquetSchema:
     def __getitem__(self, i):
         return self.column(i)
 
+    property names:
+
+        def __get__(self):
+            return [self[i].name for i in range(len(self))]
+
+    def to_arrow_schema(self):
+        """
+        Convert Parquet schema to effective Arrow schema
+
+        Returns
+        -------
+        schema : pyarrow.Schema
+        """
+        cdef:
+            shared_ptr[CSchema] sp_arrow_schema
+
+        with nogil:
+            check_status(FromParquetSchema(self.schema, &sp_arrow_schema))
+
+        return box_schema(sp_arrow_schema)
+
     def equals(self, ParquetSchema other):
         """
         Returns True if the Parquet schemas are equal

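The new ParquetSchema.names property and to_arrow_schema() method can be used roughly like this (a sketch; 'example.parquet' is a hypothetical local file):

    import pyarrow.parquet as pq

    pf = pq.ParquetFile('example.parquet')   # hypothetical file
    parquet_schema = pf.schema               # ParquetSchema instance

    print(parquet_schema.names)              # column names from the Parquet metadata

    arrow_schema = parquet_schema.to_arrow_schema()   # equivalent pyarrow.Schema
    print(arrow_schema.names)                # same names, as an Arrow schema
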
http://git-wip-us.apache.org/repos/asf/arrow/blob/f5125906/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 2444f3f..b8aa24c 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -1,4 +1,4 @@
-# Licensed to the Apache Software Foundation (ASF) under one
+# Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
 # regarding copyright ownership.  The ASF licenses this file
@@ -22,37 +22,37 @@ from pyarrow.includes.common cimport *
 cdef extern from "arrow/api.h" namespace "arrow" nogil:
 
     enum Type" arrow::Type::type":
-        Type_NA" arrow::Type::NA"
-
-        Type_BOOL" arrow::Type::BOOL"
-
-        Type_UINT8" arrow::Type::UINT8"
-        Type_INT8" arrow::Type::INT8"
-        Type_UINT16" arrow::Type::UINT16"
-        Type_INT16" arrow::Type::INT16"
-        Type_UINT32" arrow::Type::UINT32"
-        Type_INT32" arrow::Type::INT32"
-        Type_UINT64" arrow::Type::UINT64"
-        Type_INT64" arrow::Type::INT64"
-
-        Type_HALF_FLOAT" arrow::Type::HALF_FLOAT"
-        Type_FLOAT" arrow::Type::FLOAT"
-        Type_DOUBLE" arrow::Type::DOUBLE"
-
-        Type_DECIMAL" arrow::Type::DECIMAL"
-
-        Type_DATE32" arrow::Type::DATE32"
-        Type_DATE64" arrow::Type::DATE64"
-        Type_TIMESTAMP" arrow::Type::TIMESTAMP"
-        Type_TIME32" arrow::Type::TIME32"
-        Type_TIME64" arrow::Type::TIME64"
-        Type_BINARY" arrow::Type::BINARY"
-        Type_STRING" arrow::Type::STRING"
-        Type_FIXED_SIZE_BINARY" arrow::Type::FIXED_SIZE_BINARY"
-
-        Type_LIST" arrow::Type::LIST"
-        Type_STRUCT" arrow::Type::STRUCT"
-        Type_DICTIONARY" arrow::Type::DICTIONARY"
+        _Type_NA" arrow::Type::NA"
+
+        _Type_BOOL" arrow::Type::BOOL"
+
+        _Type_UINT8" arrow::Type::UINT8"
+        _Type_INT8" arrow::Type::INT8"
+        _Type_UINT16" arrow::Type::UINT16"
+        _Type_INT16" arrow::Type::INT16"
+        _Type_UINT32" arrow::Type::UINT32"
+        _Type_INT32" arrow::Type::INT32"
+        _Type_UINT64" arrow::Type::UINT64"
+        _Type_INT64" arrow::Type::INT64"
+
+        _Type_HALF_FLOAT" arrow::Type::HALF_FLOAT"
+        _Type_FLOAT" arrow::Type::FLOAT"
+        _Type_DOUBLE" arrow::Type::DOUBLE"
+
+        _Type_DECIMAL" arrow::Type::DECIMAL"
+
+        _Type_DATE32" arrow::Type::DATE32"
+        _Type_DATE64" arrow::Type::DATE64"
+        _Type_TIMESTAMP" arrow::Type::TIMESTAMP"
+        _Type_TIME32" arrow::Type::TIME32"
+        _Type_TIME64" arrow::Type::TIME64"
+        _Type_BINARY" arrow::Type::BINARY"
+        _Type_STRING" arrow::Type::STRING"
+        _Type_FIXED_SIZE_BINARY" arrow::Type::FIXED_SIZE_BINARY"
+
+        _Type_LIST" arrow::Type::LIST"
+        _Type_STRUCT" arrow::Type::STRUCT"
+        _Type_DICTIONARY" arrow::Type::DICTIONARY"
 
     enum TimeUnit" arrow::TimeUnit":
         TimeUnit_SECOND" arrow::TimeUnit::SECOND"

http://git-wip-us.apache.org/repos/asf/arrow/blob/f5125906/python/pyarrow/parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 4ff7e03..fef99d5 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -180,14 +180,13 @@ class ParquetDatasetPiece(object):
         """
         Returns instance of ParquetFile
         """
-        if open_file_func is None:
-            def simple_opener(path):
-                return ParquetFile(path)
-            open_file_func = simple_opener
-        return open_file_func(self.path)
+        reader = open_file_func(self.path)
+        if not isinstance(reader, ParquetFile):
+            reader = ParquetFile(reader)
+        return reader
 
     def read(self, columns=None, nthreads=1, partitions=None,
-             open_file_func=None):
+             open_file_func=None, file=None):
         """
         Read this piece as a pyarrow.Table
 
@@ -205,7 +204,10 @@ class ParquetDatasetPiece(object):
         -------
         table : pyarrow.Table
         """
-        reader = self._open(open_file_func)
+        if open_file_func is not None:
+            reader = self._open(open_file_func)
+        elif file is not None:
+            reader = ParquetFile(file)
 
         if self.row_group is not None:
             table = reader.read_row_group(self.row_group, columns=columns,
@@ -472,6 +474,8 @@ class ParquetDataset(object):
         else:
             self.fs = filesystem
 
+        self.paths = path_or_paths
+
         (self.pieces, self.partitions,
          self.metadata_path) = _make_manifest(path_or_paths, self.fs)
 
@@ -550,6 +554,10 @@ def _make_manifest(path_or_paths, fs, pathsep='/'):
     partitions = None
     metadata_path = None
 
+    if len(path_or_paths) == 1:
+        # Dask passes a directory as a list of length 1
+        path_or_paths = path_or_paths[0]
+
     if is_string(path_or_paths) and fs.isdir(path_or_paths):
         manifest = ParquetManifest(path_or_paths, filesystem=fs,
                                    pathsep=pathsep)

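Taken together, the parquet.py changes above support a Dask-style call pattern roughly like the following (a sketch; the dataset directory path and the opener function are hypothetical):

    import pyarrow.parquet as pq

    # A directory passed as a one-element list is now accepted
    dataset = pq.ParquetDataset(['/data/my_dataset'])
    print(dataset.paths)        # the original input path(s), now stored on the object

    # Pieces can be read with a caller-supplied opener, e.g. for remote filesystems
    def open_parquet(path):
        # may return either a ParquetFile or a file-like object
        return pq.ParquetFile(path)

    table = dataset.pieces[0].read(open_file_func=open_parquet)
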
http://git-wip-us.apache.org/repos/asf/arrow/blob/f5125906/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index ca6ae2d..fc35781 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -552,6 +552,10 @@ def test_read_common_metadata_files(tmpdir):
     pf = pq.ParquetFile(data_path)
     assert dataset.schema.equals(pf.schema)
 
+    # handle list of one directory
+    dataset2 = pq.ParquetDataset([base_path])
+    assert dataset2.schema.equals(dataset.schema)
+
 
 def _filter_partition(df, part_keys):
     predicate = np.ones(len(df), dtype=bool)

http://git-wip-us.apache.org/repos/asf/arrow/blob/f5125906/python/pyarrow/tests/test_schema.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_schema.py b/python/pyarrow/tests/test_schema.py
index 53b6b68..d1107fb 100644
--- a/python/pyarrow/tests/test_schema.py
+++ b/python/pyarrow/tests/test_schema.py
@@ -31,6 +31,34 @@ def test_type_integers():
         assert str(t) == name
 
 
+def test_type_to_pandas_dtype():
+    M8_ns = np.dtype('datetime64[ns]')
+    cases = [
+        (pa.null(), np.float64),
+        (pa.bool_(), np.bool_),
+        (pa.int8(), np.int8),
+        (pa.int16(), np.int16),
+        (pa.int32(), np.int32),
+        (pa.int64(), np.int64),
+        (pa.uint8(), np.uint8),
+        (pa.uint16(), np.uint16),
+        (pa.uint32(), np.uint32),
+        (pa.uint64(), np.uint64),
+        (pa.float16(), np.float16),
+        (pa.float32(), np.float32),
+        (pa.float64(), np.float64),
+        (pa.date32(), M8_ns),
+        (pa.date64(), M8_ns),
+        (pa.timestamp('ms'), M8_ns),
+        (pa.binary(), np.object_),
+        (pa.binary(12), np.object_),
+        (pa.string(), np.object_),
+        (pa.list_(pa.int8()), np.object_),
+    ]
+    for arrow_type, numpy_type in cases:
+        assert arrow_type.to_pandas_dtype() == numpy_type
+
+
 def test_type_list():
     value_type = pa.int32()
     list_type = pa.list_(value_type)
@@ -83,6 +111,8 @@ def test_schema():
     ]
     sch = pa.schema(fields)
 
+    assert sch.names == ['foo', 'bar', 'baz']
+
     assert len(sch) == 3
     assert sch[0].name == 'foo'
     assert sch[0].type == fields[0].type