You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/10/05 23:42:26 UTC

[spark] branch master updated: [SPARK-36711][PYTHON][FOLLOW-UP] Refactor typing logic for multi-index support

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a2452f  [SPARK-36711][PYTHON][FOLLOW-UP] Refactor typing logic for multi-index support
6a2452f is described below

commit 6a2452fb5cd776dc1f292704e6b86bbec0ff24e7
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Wed Oct 6 08:41:10 2021 +0900

    [SPARK-36711][PYTHON][FOLLOW-UP] Refactor typing logic for multi-index support
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to refactor typing logic for multi-index support that was mostly introduced in https://github.com/apache/spark/pull/34176
    
    At a high level, the below functions were introduced
    
    ```bash
    _extract_types # renamed from `extract_types`
    ```
    
    ```
    _is_named_params
    _address_named_type_hoders
    _to_tuple_of_params
    _convert_tuples_to_zip
    _address_unnamed_type_holders
    ```
    
    In this PR, they become as below with simplification:
    
    ```bash
    _to_type_holders # renamed from `_extract_types`
    ```
    
    ```bash
    _new_type_holders # merged from `_is_named_params`, etc.
    ```
    
    ### Why are the changes needed?
    
    To make the codes easier to read.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, dev-only.
    
    ### How was this patch tested?
    
    Existing tests should cover them.
    
    Closes #34181 from HyukjinKwon/SPARK-36711.
    
    Authored-by: Hyukjin Kwon <gu...@apache.org>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 python/pyspark/pandas/accessors.py         |  12 +-
 python/pyspark/pandas/frame.py             |   2 +-
 python/pyspark/pandas/groupby.py           |   2 +-
 python/pyspark/pandas/typedef/typehints.py | 190 +++++++++++++----------------
 4 files changed, 92 insertions(+), 114 deletions(-)

diff --git a/python/pyspark/pandas/accessors.py b/python/pyspark/pandas/accessors.py
index afb3424..e69a86e 100644
--- a/python/pyspark/pandas/accessors.py
+++ b/python/pyspark/pandas/accessors.py
@@ -385,7 +385,7 @@ class PandasOnSparkFrameMethods(object):
                     "hints; however, the return type was %s." % return_sig
                 )
             index_fields = cast(DataFrameType, return_type).index_fields
-            should_retain_index = index_fields is not None
+            should_retain_index = len(index_fields) > 0
             return_schema = cast(DataFrameType, return_type).spark_type
 
             output_func = GroupBy._make_pandas_df_builder_func(
@@ -688,18 +688,14 @@ class PandasOnSparkFrameMethods(object):
                 return first_series(DataFrame(internal))
             else:
                 index_fields = cast(DataFrameType, return_type).index_fields
-                index_fields = (
-                    [index_field.normalize_spark_type() for index_field in index_fields]
-                    if index_fields is not None
-                    else None
-                )
+                index_fields = [index_field.normalize_spark_type() for index_field in index_fields]
                 data_fields = [
                     field.normalize_spark_type()
                     for field in cast(DataFrameType, return_type).data_fields
                 ]
-                normalized_fields = (index_fields if index_fields is not None else []) + data_fields
+                normalized_fields = index_fields + data_fields
                 return_schema = StructType([field.struct_field for field in normalized_fields])
-                should_retain_index = index_fields is not None
+                should_retain_index = len(index_fields) > 0
 
                 self_applied = DataFrame(self._psdf._internal.resolved_copy)
 
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index 9c0f857..7a4817d 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -2538,7 +2538,7 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
                         "was %s" % return_sig
                     )
                 index_fields = cast(DataFrameType, return_type).index_fields
-                should_retain_index = index_fields is not None
+                should_retain_index = len(index_fields) > 0
                 data_fields = cast(DataFrameType, return_type).data_fields
                 return_schema = cast(DataFrameType, return_type).spark_type
             else:
diff --git a/python/pyspark/pandas/groupby.py b/python/pyspark/pandas/groupby.py
index 097afb6..57c2281 100644
--- a/python/pyspark/pandas/groupby.py
+++ b/python/pyspark/pandas/groupby.py
@@ -1253,7 +1253,7 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
                 data_fields = cast(DataFrameType, return_type).data_fields
                 return_schema = cast(DataFrameType, return_type).spark_type
                 index_fields = cast(DataFrameType, return_type).index_fields
-                should_retain_index = index_fields is not None
+                should_retain_index = len(index_fields) > 0
                 psdf_from_pandas = None
             else:
                 should_return_series = True
diff --git a/python/pyspark/pandas/typedef/typehints.py b/python/pyspark/pandas/typedef/typehints.py
index 9fe6e3e..f77d17f 100644
--- a/python/pyspark/pandas/typedef/typehints.py
+++ b/python/pyspark/pandas/typedef/typehints.py
@@ -94,12 +94,12 @@ class SeriesType(Generic[T]):
 class DataFrameType(object):
     def __init__(
         self,
-        index_fields: Optional[List["InternalField"]],
+        index_fields: List["InternalField"],
         data_fields: List["InternalField"],
     ):
         self.index_fields = index_fields
         self.data_fields = data_fields
-        self.fields = index_fields + data_fields if isinstance(index_fields, List) else data_fields
+        self.fields = index_fields + data_fields
 
     @property
     def dtypes(self) -> List[Dtype]:
@@ -135,11 +135,13 @@ class UnknownType(object):
 class IndexNameTypeHolder(object):
     name = None
     tpe = None
+    short_name = "IndexNameType"
 
 
 class NameTypeHolder(object):
     name = None
     tpe = None
+    short_name = "NameType"
 
 
 def as_spark_type(
@@ -612,9 +614,8 @@ def infer_return_type(f: Callable) -> Union[SeriesType, DataFrameType, ScalarTyp
                     )
                 )
         else:
-            assert len(index_parameters) == 0
             # No type hint for index.
-            index_fields = None
+            assert len(index_parameters) == 0
 
         data_dtypes, data_spark_types = zip(
             *(
@@ -665,7 +666,9 @@ def create_type_for_series_type(param: Any) -> Type[SeriesType]:
     from pyspark.pandas.typedef import NameTypeHolder
 
     if isinstance(param, ExtensionDtype):
-        new_class = type("NameType", (NameTypeHolder,), {})  # type: Type[NameTypeHolder]
+        new_class = type(
+            NameTypeHolder.short_name, (NameTypeHolder,), {}
+        )  # type: Type[NameTypeHolder]
         new_class.tpe = param
     else:
         new_class = param.type if isinstance(param, np.dtype) else param
@@ -726,131 +729,110 @@ def create_tuple_for_frame_type(params: Any) -> object:
         ... # doctest: +ELLIPSIS
         typing.Tuple[...IndexNameType, ...NameType]
     """
-    return Tuple[_extract_types(params)]
+    return Tuple[_to_type_holders(params)]
 
 
-def _extract_types(params: Any) -> Tuple:
-    origin = params
+def _to_type_holders(params: Any) -> Tuple:
+    from pyspark.pandas.typedef import NameTypeHolder, IndexNameTypeHolder
 
-    params = _to_tuple_of_params(params)
+    is_with_index = (
+        isinstance(params, tuple)
+        and len(params) == 2
+        and isinstance(params[1], (zip, list, pd.Series))
+    )
 
-    if _is_named_params(params):
-        # Example:
-        #   DataFrame["id": int, "A": int]
-        new_params = _address_named_type_hoders(params, is_index=False)
-        return tuple(new_params)
-    elif len(params) == 2 and isinstance(params[1], (zip, list, pd.Series)):
-        # Example:
-        #   DataFrame[int, [int, int]]
-        #   DataFrame[pdf.index.dtype, pdf.dtypes]
-        #   DataFrame[("index", int), [("id", int), ("A", int)]]
-        #   DataFrame[(pdf.index.name, pdf.index.dtype), zip(pdf.columns, pdf.dtypes)]
-        #
-        #   DataFrame[[int, int], [int, int]]
-        #   DataFrame[pdf.index.dtypes, pdf.dtypes]
-        #   DataFrame[[("index", int), ("index-2", int)], [("id", int), ("A", int)]]
-        #   DataFrame[zip(pdf.index.names, pdf.index.dtypes), zip(pdf.columns, pdf.dtypes)]
+    if is_with_index:
+        # With index
+        #   DataFrame[index_type, [type, ...]]
+        #   DataFrame[dtype instance, dtypes instance]
+        #   DataFrame[[index_type, ...], [type, ...]]
+        #   DataFrame[dtypes instance, dtypes instance]
+        #   DataFrame[(index_name, index_type), [(name, type), ...]]
+        #   DataFrame[(index_name, index_type), zip(names, types)]
+        #   DataFrame[[(index_name, index_type), ...], [(name, type), ...]]
+        #   DataFrame[zip(index_names, index_types), zip(names, types)]
+        def is_list_of_pairs(p: Any) -> bool:
+            return (
+                isinstance(p, list)
+                and len(p) >= 1
+                and all(isinstance(param, tuple) and (len(param) == 2) for param in p)
+            )
 
         index_params = params[0]
-
         if isinstance(index_params, tuple) and len(index_params) == 2:
-            index_params = tuple([slice(*index_params)])
+            # DataFrame[("index", int), ...]
+            index_params = [index_params]
 
-        index_params = _convert_tuples_to_zip(index_params)
-        index_params = _to_tuple_of_params(index_params)
+        if is_list_of_pairs(index_params):
+            # DataFrame[[("index", int), ("index-2", int)], ...]
+            index_params = tuple(slice(name, tpe) for name, tpe in index_params)
 
-        if _is_named_params(index_params):
-            # Example:
-            #   DataFrame[[("id", int), ("A", int)], [int, int]]
-            new_index_params = _address_named_type_hoders(index_params, is_index=True)
-            index_types = tuple(new_index_params)
-        else:
-            # Exaxmples:
-            #   DataFrame[[float, float], [int, int]]
-            #   DataFrame[pdf.dtypes, [int, int]]
-            index_types = _address_unnamed_type_holders(index_params, origin, is_index=True)
+        index_types = _new_type_holders(index_params, IndexNameTypeHolder)
 
         data_types = params[1]
-        data_types = _convert_tuples_to_zip(data_types)
+        if is_list_of_pairs(data_types):
+            # DataFrame[..., [("id", int), ("A", int)]]
+            data_types = tuple(slice(*data_type) for data_type in data_types)
 
-        return index_types + _extract_types(data_types)
+        data_types = _new_type_holders(data_types, NameTypeHolder)
 
+        return index_types + data_types
     else:
-        # Exaxmples:
-        #   DataFrame[float, float]
-        #   DataFrame[pdf.dtypes]
-        return _address_unnamed_type_holders(params, origin, is_index=False)
-
-
-def _is_named_params(params: Any) -> Any:
-    return all(
-        isinstance(param, slice) and param.step is None and param.stop is not None
-        for param in params
-    )
-
-
-def _address_named_type_hoders(params: Any, is_index: bool) -> Any:
-    # Example:
-    #   params = (slice("id", int, None), slice("A", int, None))
-    new_params = []
-    for param in params:
-        new_param = (
-            type("IndexNameType", (IndexNameTypeHolder,), {})
-            if is_index
-            else type("NameType", (NameTypeHolder,), {})
-        )  # type: Union[Type[IndexNameTypeHolder], Type[NameTypeHolder]]
-        new_param.name = param.start
-        if isinstance(param.stop, ExtensionDtype):
-            new_param.tpe = param.stop
-        else:
-            # When the given argument is a numpy's dtype instance.
-            new_param.tpe = param.stop.type if isinstance(param.stop, np.dtype) else param.stop
-        new_params.append(new_param)
-    return new_params
+        # Without index
+        #   DataFrame[type, type, ...]
+        #   DataFrame[name: type, name: type, ...]
+        #   DataFrame[dtypes instance]
+        #   DataFrame[zip(names, types)]
+        return _new_type_holders(params, NameTypeHolder)
 
 
-def _to_tuple_of_params(params: Any) -> Any:
-    """
-    >>> _to_tuple_of_params(int)
-    (<class 'int'>,)
-
-    >>> _to_tuple_of_params([int, int, int])
-    (<class 'int'>, <class 'int'>, <class 'int'>)
-
-    >>> arrays = [[1, 1, 2], ['red', 'blue', 'red']]
-    >>> idx = pd.MultiIndex.from_arrays(arrays, names=('number', 'color'))
-    >>> pdf = pd.DataFrame([[1, 2], [2, 3], [4, 5]], index=idx, columns=["a", "b"])
-
-    >>> _to_tuple_of_params(zip(pdf.columns, pdf.dtypes))
-    (slice('a', dtype('int64'), None), slice('b', dtype('int64'), None))
-    >>> _to_tuple_of_params(zip(pdf.index.names, pdf.index.dtypes))
-    (slice('number', dtype('int64'), None), slice('color', dtype('O'), None))
-    """
+def _new_type_holders(
+    params: Any, holder_clazz: Type[Union[NameTypeHolder, IndexNameTypeHolder]]
+) -> Tuple:
     if isinstance(params, zip):
+        #   DataFrame[zip(names, types)]
         params = tuple(slice(name, tpe) for name, tpe in params)  # type: ignore[misc, has-type]
 
     if isinstance(params, Iterable):
+        #   DataFrame[type, type, ...]
+        #   DataFrame[name: type, name: type, ...]
+        #   DataFrame[dtypes instance]
         params = tuple(params)
     else:
+        #   DataFrame[type, type]
+        #   DataFrame[name: type]
         params = (params,)
-    return params
-
-
-def _convert_tuples_to_zip(params: Any) -> Any:
-    if isinstance(params, list) and len(params) >= 1 and isinstance(params[0], tuple):
-        return zip((name for name, _ in params), (tpe for _, tpe in params))
-    return params
 
+    is_named_params = all(
+        isinstance(param, slice) and param.step is None and param.stop is not None
+        for param in params
+    )
+    is_unnamed_params = all(
+        not isinstance(param, slice) and not isinstance(param, Iterable) for param in params
+    )
 
-def _address_unnamed_type_holders(params: Any, origin: Any, is_index: bool) -> Any:
-    if all(not isinstance(param, slice) and not isinstance(param, Iterable) for param in params):
+    if is_named_params:
+        # DataFrame["id": int, "A": int]
+        new_params = []
+        for param in params:
+            new_param = type(
+                holder_clazz.short_name, (holder_clazz,), {}
+            )  # type: Type[Union[NameTypeHolder, IndexNameTypeHolder]]
+            new_param.name = param.start
+            if isinstance(param.stop, ExtensionDtype):
+                new_param.tpe = param.stop
+            else:
+                # When the given argument is a numpy's dtype instance.
+                new_param.tpe = param.stop.type if isinstance(param.stop, np.dtype) else param.stop
+            new_params.append(new_param)
+        return tuple(new_params)
+    elif is_unnamed_params:
+        # DataFrame[float, float]
         new_types = []
         for param in params:
-            new_type = (
-                type("IndexNameType", (IndexNameTypeHolder,), {})
-                if is_index
-                else type("NameType", (NameTypeHolder,), {})
-            )  # type: Union[Type[IndexNameTypeHolder], Type[NameTypeHolder]]
+            new_type = type(
+                holder_clazz.short_name, (holder_clazz,), {}
+            )  # type: Type[Union[NameTypeHolder, IndexNameTypeHolder]]
             if isinstance(param, ExtensionDtype):
                 new_type.tpe = param
             else:
@@ -872,7 +854,7 @@ def _address_unnamed_type_holders(params: Any, origin: Any, is_index: bool) -> A
   - DataFrame[[(index_name, index_type), ...], [(name, type), ...]]
   - DataFrame[dtypes instance, dtypes instance]
   - DataFrame[zip(index_names, index_types), zip(names, types)]\n"""
-            + "However, got %s." % str(origin)
+            + "However, got %s." % str(params)
         )
 
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org