You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/10/05 23:42:26 UTC
[spark] branch master updated: [SPARK-36711][PYTHON][FOLLOW-UP]
Refactor typing logic for multi-index support
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6a2452f [SPARK-36711][PYTHON][FOLLOW-UP] Refactor typing logic for multi-index support
6a2452f is described below
commit 6a2452fb5cd776dc1f292704e6b86bbec0ff24e7
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Wed Oct 6 08:41:10 2021 +0900
[SPARK-36711][PYTHON][FOLLOW-UP] Refactor typing logic for multi-index support
### What changes were proposed in this pull request?
This PR proposes to refactor the typing logic for multi-index support, most of which was introduced in https://github.com/apache/spark/pull/34176
At a high level, the following functions had been introduced:
```bash
_extract_types # renamed from `extract_types`
```
```
_is_named_params
_address_named_type_hoders
_to_tuple_of_params
_convert_tuples_to_zip
_address_unnamed_type_holders
```
In this PR, they are simplified as follows:
```bash
_to_type_holders # renamed from `_extract_types`
```
```bash
_new_type_holders # merged from `_is_named_params`, etc.
```
### Why are the changes needed?
To make the code easier to read.
### Does this PR introduce _any_ user-facing change?
No, dev-only.
### How was this patch tested?
Existing tests should cover these changes.
Closes #34181 from HyukjinKwon/SPARK-36711.
Authored-by: Hyukjin Kwon <gu...@apache.org>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
python/pyspark/pandas/accessors.py | 12 +-
python/pyspark/pandas/frame.py | 2 +-
python/pyspark/pandas/groupby.py | 2 +-
python/pyspark/pandas/typedef/typehints.py | 190 +++++++++++++----------------
4 files changed, 92 insertions(+), 114 deletions(-)
diff --git a/python/pyspark/pandas/accessors.py b/python/pyspark/pandas/accessors.py
index afb3424..e69a86e 100644
--- a/python/pyspark/pandas/accessors.py
+++ b/python/pyspark/pandas/accessors.py
@@ -385,7 +385,7 @@ class PandasOnSparkFrameMethods(object):
"hints; however, the return type was %s." % return_sig
)
index_fields = cast(DataFrameType, return_type).index_fields
- should_retain_index = index_fields is not None
+ should_retain_index = len(index_fields) > 0
return_schema = cast(DataFrameType, return_type).spark_type
output_func = GroupBy._make_pandas_df_builder_func(
@@ -688,18 +688,14 @@ class PandasOnSparkFrameMethods(object):
return first_series(DataFrame(internal))
else:
index_fields = cast(DataFrameType, return_type).index_fields
- index_fields = (
- [index_field.normalize_spark_type() for index_field in index_fields]
- if index_fields is not None
- else None
- )
+ index_fields = [index_field.normalize_spark_type() for index_field in index_fields]
data_fields = [
field.normalize_spark_type()
for field in cast(DataFrameType, return_type).data_fields
]
- normalized_fields = (index_fields if index_fields is not None else []) + data_fields
+ normalized_fields = index_fields + data_fields
return_schema = StructType([field.struct_field for field in normalized_fields])
- should_retain_index = index_fields is not None
+ should_retain_index = len(index_fields) > 0
self_applied = DataFrame(self._psdf._internal.resolved_copy)
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index 9c0f857..7a4817d 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -2538,7 +2538,7 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
"was %s" % return_sig
)
index_fields = cast(DataFrameType, return_type).index_fields
- should_retain_index = index_fields is not None
+ should_retain_index = len(index_fields) > 0
data_fields = cast(DataFrameType, return_type).data_fields
return_schema = cast(DataFrameType, return_type).spark_type
else:
diff --git a/python/pyspark/pandas/groupby.py b/python/pyspark/pandas/groupby.py
index 097afb6..57c2281 100644
--- a/python/pyspark/pandas/groupby.py
+++ b/python/pyspark/pandas/groupby.py
@@ -1253,7 +1253,7 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
data_fields = cast(DataFrameType, return_type).data_fields
return_schema = cast(DataFrameType, return_type).spark_type
index_fields = cast(DataFrameType, return_type).index_fields
- should_retain_index = index_fields is not None
+ should_retain_index = len(index_fields) > 0
psdf_from_pandas = None
else:
should_return_series = True
diff --git a/python/pyspark/pandas/typedef/typehints.py b/python/pyspark/pandas/typedef/typehints.py
index 9fe6e3e..f77d17f 100644
--- a/python/pyspark/pandas/typedef/typehints.py
+++ b/python/pyspark/pandas/typedef/typehints.py
@@ -94,12 +94,12 @@ class SeriesType(Generic[T]):
class DataFrameType(object):
def __init__(
self,
- index_fields: Optional[List["InternalField"]],
+ index_fields: List["InternalField"],
data_fields: List["InternalField"],
):
self.index_fields = index_fields
self.data_fields = data_fields
- self.fields = index_fields + data_fields if isinstance(index_fields, List) else data_fields
+ self.fields = index_fields + data_fields
@property
def dtypes(self) -> List[Dtype]:
@@ -135,11 +135,13 @@ class UnknownType(object):
class IndexNameTypeHolder(object):
name = None
tpe = None
+ short_name = "IndexNameType"
class NameTypeHolder(object):
name = None
tpe = None
+ short_name = "NameType"
def as_spark_type(
@@ -612,9 +614,8 @@ def infer_return_type(f: Callable) -> Union[SeriesType, DataFrameType, ScalarTyp
)
)
else:
- assert len(index_parameters) == 0
# No type hint for index.
- index_fields = None
+ assert len(index_parameters) == 0
data_dtypes, data_spark_types = zip(
*(
@@ -665,7 +666,9 @@ def create_type_for_series_type(param: Any) -> Type[SeriesType]:
from pyspark.pandas.typedef import NameTypeHolder
if isinstance(param, ExtensionDtype):
- new_class = type("NameType", (NameTypeHolder,), {}) # type: Type[NameTypeHolder]
+ new_class = type(
+ NameTypeHolder.short_name, (NameTypeHolder,), {}
+ ) # type: Type[NameTypeHolder]
new_class.tpe = param
else:
new_class = param.type if isinstance(param, np.dtype) else param
@@ -726,131 +729,110 @@ def create_tuple_for_frame_type(params: Any) -> object:
... # doctest: +ELLIPSIS
typing.Tuple[...IndexNameType, ...NameType]
"""
- return Tuple[_extract_types(params)]
+ return Tuple[_to_type_holders(params)]
-def _extract_types(params: Any) -> Tuple:
- origin = params
+def _to_type_holders(params: Any) -> Tuple:
+ from pyspark.pandas.typedef import NameTypeHolder, IndexNameTypeHolder
- params = _to_tuple_of_params(params)
+ is_with_index = (
+ isinstance(params, tuple)
+ and len(params) == 2
+ and isinstance(params[1], (zip, list, pd.Series))
+ )
- if _is_named_params(params):
- # Example:
- # DataFrame["id": int, "A": int]
- new_params = _address_named_type_hoders(params, is_index=False)
- return tuple(new_params)
- elif len(params) == 2 and isinstance(params[1], (zip, list, pd.Series)):
- # Example:
- # DataFrame[int, [int, int]]
- # DataFrame[pdf.index.dtype, pdf.dtypes]
- # DataFrame[("index", int), [("id", int), ("A", int)]]
- # DataFrame[(pdf.index.name, pdf.index.dtype), zip(pdf.columns, pdf.dtypes)]
- #
- # DataFrame[[int, int], [int, int]]
- # DataFrame[pdf.index.dtypes, pdf.dtypes]
- # DataFrame[[("index", int), ("index-2", int)], [("id", int), ("A", int)]]
- # DataFrame[zip(pdf.index.names, pdf.index.dtypes), zip(pdf.columns, pdf.dtypes)]
+ if is_with_index:
+ # With index
+ # DataFrame[index_type, [type, ...]]
+ # DataFrame[dtype instance, dtypes instance]
+ # DataFrame[[index_type, ...], [type, ...]]
+ # DataFrame[dtypes instance, dtypes instance]
+ # DataFrame[(index_name, index_type), [(name, type), ...]]
+ # DataFrame[(index_name, index_type), zip(names, types)]
+ # DataFrame[[(index_name, index_type), ...], [(name, type), ...]]
+ # DataFrame[zip(index_names, index_types), zip(names, types)]
+ def is_list_of_pairs(p: Any) -> bool:
+ return (
+ isinstance(p, list)
+ and len(p) >= 1
+ and all(isinstance(param, tuple) and (len(param) == 2) for param in p)
+ )
index_params = params[0]
-
if isinstance(index_params, tuple) and len(index_params) == 2:
- index_params = tuple([slice(*index_params)])
+ # DataFrame[("index", int), ...]
+ index_params = [index_params]
- index_params = _convert_tuples_to_zip(index_params)
- index_params = _to_tuple_of_params(index_params)
+ if is_list_of_pairs(index_params):
+ # DataFrame[[("index", int), ("index-2", int)], ...]
+ index_params = tuple(slice(name, tpe) for name, tpe in index_params)
- if _is_named_params(index_params):
- # Example:
- # DataFrame[[("id", int), ("A", int)], [int, int]]
- new_index_params = _address_named_type_hoders(index_params, is_index=True)
- index_types = tuple(new_index_params)
- else:
- # Exaxmples:
- # DataFrame[[float, float], [int, int]]
- # DataFrame[pdf.dtypes, [int, int]]
- index_types = _address_unnamed_type_holders(index_params, origin, is_index=True)
+ index_types = _new_type_holders(index_params, IndexNameTypeHolder)
data_types = params[1]
- data_types = _convert_tuples_to_zip(data_types)
+ if is_list_of_pairs(data_types):
+ # DataFrame[..., [("id", int), ("A", int)]]
+ data_types = tuple(slice(*data_type) for data_type in data_types)
- return index_types + _extract_types(data_types)
+ data_types = _new_type_holders(data_types, NameTypeHolder)
+ return index_types + data_types
else:
- # Exaxmples:
- # DataFrame[float, float]
- # DataFrame[pdf.dtypes]
- return _address_unnamed_type_holders(params, origin, is_index=False)
-
-
-def _is_named_params(params: Any) -> Any:
- return all(
- isinstance(param, slice) and param.step is None and param.stop is not None
- for param in params
- )
-
-
-def _address_named_type_hoders(params: Any, is_index: bool) -> Any:
- # Example:
- # params = (slice("id", int, None), slice("A", int, None))
- new_params = []
- for param in params:
- new_param = (
- type("IndexNameType", (IndexNameTypeHolder,), {})
- if is_index
- else type("NameType", (NameTypeHolder,), {})
- ) # type: Union[Type[IndexNameTypeHolder], Type[NameTypeHolder]]
- new_param.name = param.start
- if isinstance(param.stop, ExtensionDtype):
- new_param.tpe = param.stop
- else:
- # When the given argument is a numpy's dtype instance.
- new_param.tpe = param.stop.type if isinstance(param.stop, np.dtype) else param.stop
- new_params.append(new_param)
- return new_params
+ # Without index
+ # DataFrame[type, type, ...]
+ # DataFrame[name: type, name: type, ...]
+ # DataFrame[dtypes instance]
+ # DataFrame[zip(names, types)]
+ return _new_type_holders(params, NameTypeHolder)
-def _to_tuple_of_params(params: Any) -> Any:
- """
- >>> _to_tuple_of_params(int)
- (<class 'int'>,)
-
- >>> _to_tuple_of_params([int, int, int])
- (<class 'int'>, <class 'int'>, <class 'int'>)
-
- >>> arrays = [[1, 1, 2], ['red', 'blue', 'red']]
- >>> idx = pd.MultiIndex.from_arrays(arrays, names=('number', 'color'))
- >>> pdf = pd.DataFrame([[1, 2], [2, 3], [4, 5]], index=idx, columns=["a", "b"])
-
- >>> _to_tuple_of_params(zip(pdf.columns, pdf.dtypes))
- (slice('a', dtype('int64'), None), slice('b', dtype('int64'), None))
- >>> _to_tuple_of_params(zip(pdf.index.names, pdf.index.dtypes))
- (slice('number', dtype('int64'), None), slice('color', dtype('O'), None))
- """
+def _new_type_holders(
+ params: Any, holder_clazz: Type[Union[NameTypeHolder, IndexNameTypeHolder]]
+) -> Tuple:
if isinstance(params, zip):
+ # DataFrame[zip(names, types)]
params = tuple(slice(name, tpe) for name, tpe in params) # type: ignore[misc, has-type]
if isinstance(params, Iterable):
+ # DataFrame[type, type, ...]
+ # DataFrame[name: type, name: type, ...]
+ # DataFrame[dtypes instance]
params = tuple(params)
else:
+ # DataFrame[type, type]
+ # DataFrame[name: type]
params = (params,)
- return params
-
-
-def _convert_tuples_to_zip(params: Any) -> Any:
- if isinstance(params, list) and len(params) >= 1 and isinstance(params[0], tuple):
- return zip((name for name, _ in params), (tpe for _, tpe in params))
- return params
+ is_named_params = all(
+ isinstance(param, slice) and param.step is None and param.stop is not None
+ for param in params
+ )
+ is_unnamed_params = all(
+ not isinstance(param, slice) and not isinstance(param, Iterable) for param in params
+ )
-def _address_unnamed_type_holders(params: Any, origin: Any, is_index: bool) -> Any:
- if all(not isinstance(param, slice) and not isinstance(param, Iterable) for param in params):
+ if is_named_params:
+ # DataFrame["id": int, "A": int]
+ new_params = []
+ for param in params:
+ new_param = type(
+ holder_clazz.short_name, (holder_clazz,), {}
+ ) # type: Type[Union[NameTypeHolder, IndexNameTypeHolder]]
+ new_param.name = param.start
+ if isinstance(param.stop, ExtensionDtype):
+ new_param.tpe = param.stop
+ else:
+ # When the given argument is a numpy's dtype instance.
+ new_param.tpe = param.stop.type if isinstance(param.stop, np.dtype) else param.stop
+ new_params.append(new_param)
+ return tuple(new_params)
+ elif is_unnamed_params:
+ # DataFrame[float, float]
new_types = []
for param in params:
- new_type = (
- type("IndexNameType", (IndexNameTypeHolder,), {})
- if is_index
- else type("NameType", (NameTypeHolder,), {})
- ) # type: Union[Type[IndexNameTypeHolder], Type[NameTypeHolder]]
+ new_type = type(
+ holder_clazz.short_name, (holder_clazz,), {}
+ ) # type: Type[Union[NameTypeHolder, IndexNameTypeHolder]]
if isinstance(param, ExtensionDtype):
new_type.tpe = param
else:
@@ -872,7 +854,7 @@ def _address_unnamed_type_holders(params: Any, origin: Any, is_index: bool) -> A
- DataFrame[[(index_name, index_type), ...], [(name, type), ...]]
- DataFrame[dtypes instance, dtypes instance]
- DataFrame[zip(index_names, index_types), zip(names, types)]\n"""
- + "However, got %s." % str(origin)
+ + "However, got %s." % str(params)
)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org