You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ue...@apache.org on 2021/07/28 18:31:15 UTC

[spark] branch branch-3.2 updated: [SPARK-36143][PYTHON] Adjust `astype` of fractional Series with missing values to follow pandas

This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new bdf1570  [SPARK-36143][PYTHON] Adjust `astype` of fractional Series with missing values to follow pandas
bdf1570 is described below

commit bdf1570911c29f02a5993a079092accafb80f6c9
Author: Xinrong Meng <xi...@databricks.com>
AuthorDate: Wed Jul 28 11:26:48 2021 -0700

    [SPARK-36143][PYTHON] Adjust `astype` of fractional Series with missing values to follow pandas
    
    ### What changes were proposed in this pull request?
    Adjust `astype` of fractional Series with missing values to follow pandas.
    
    Non-goal: adjusting `astype` of Decimal Series with missing values to follow pandas; `DecimalOps.astype` only gains a TODO that references SPARK-36230.
    
    ### Why are the changes needed?
    `astype` of a fractional Series with missing values doesn't behave the same as in pandas. For example, calling `astype(int)` on a float Series with missing values returns the Series unchanged (still `float64`), whereas pandas raises a ValueError.
    
    We ought to follow pandas.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes.
    
    From:
    ```py
    >>> import numpy as np
    >>> import pyspark.pandas as ps
    >>> psser = ps.Series([1, 2, np.nan])
    >>> psser.astype(int)
    0    1.0
    1    2.0
    2    NaN
    dtype: float64
    
    ```
    
    To:
    ```py
    >>> import numpy as np
    >>> import pyspark.pandas as ps
    >>> psser = ps.Series([1, 2, np.nan])
    >>> psser.astype(int)
    Traceback (most recent call last):
    ...
    ValueError: Cannot convert fractions with missing values to integer
    
    ```
    
    ### How was this patch tested?
    Unit tests.
    
    Closes #33466 from xinrong-databricks/extension_astype.
    
    Authored-by: Xinrong Meng <xi...@databricks.com>
    Signed-off-by: Takuya UESHIN <ue...@databricks.com>
    (cherry picked from commit 01213095e2c26a4b0940735daf290c90d3267f51)
    Signed-off-by: Takuya UESHIN <ue...@databricks.com>
---
 python/pyspark/pandas/data_type_ops/num_ops.py     | 87 +++++++++++++++++-----
 python/pyspark/pandas/series.py                    |  3 +-
 .../pandas/tests/data_type_ops/test_num_ops.py     | 51 +++++++++++++
 3 files changed, 120 insertions(+), 21 deletions(-)

diff --git a/python/pyspark/pandas/data_type_ops/num_ops.py b/python/pyspark/pandas/data_type_ops/num_ops.py
index 43c3b01..a7987bc 100644
--- a/python/pyspark/pandas/data_type_ops/num_ops.py
+++ b/python/pyspark/pandas/data_type_ops/num_ops.py
@@ -20,7 +20,7 @@ from typing import Any, Union
 
 import numpy as np
 import pandas as pd
-from pandas.api.types import CategoricalDtype
+from pandas.api.types import is_bool_dtype, is_integer_dtype, CategoricalDtype
 
 from pyspark.pandas._typing import Dtype, IndexOpsLike, SeriesOrIndex
 from pyspark.pandas.base import column_op, IndexOpsMixin, numpy_column_op
@@ -39,10 +39,24 @@ from pyspark.sql import functions as F
 from pyspark.sql.column import Column
 from pyspark.sql.types import (
     BooleanType,
+    DataType,
     StringType,
 )
 
 
+def _non_fractional_astype(
+    index_ops: IndexOpsLike, dtype: Dtype, spark_type: DataType
+) -> IndexOpsLike:
+    if isinstance(dtype, CategoricalDtype):
+        return _as_categorical_type(index_ops, dtype, spark_type)
+    elif isinstance(spark_type, BooleanType):
+        return _as_bool_type(index_ops, dtype)
+    elif isinstance(spark_type, StringType):
+        return _as_string_type(index_ops, dtype, null_str=str(np.nan))
+    else:
+        return _as_other_type(index_ops, dtype, spark_type)
+
+
 class NumericOps(DataTypeOps):
     """The class for binary operations of numeric pandas-on-Spark objects."""
 
@@ -224,15 +238,7 @@ class IntegralOps(NumericOps):
 
     def astype(self, index_ops: IndexOpsLike, dtype: Union[str, type, Dtype]) -> IndexOpsLike:
         dtype, spark_type = pandas_on_spark_type(dtype)
-
-        if isinstance(dtype, CategoricalDtype):
-            return _as_categorical_type(index_ops, dtype, spark_type)
-        elif isinstance(spark_type, BooleanType):
-            return _as_bool_type(index_ops, dtype)
-        elif isinstance(spark_type, StringType):
-            return _as_string_type(index_ops, dtype, null_str=str(np.nan))
-        else:
-            return _as_other_type(index_ops, dtype, spark_type)
+        return _non_fractional_astype(index_ops, dtype, spark_type)
 
 
 class FractionalOps(NumericOps):
@@ -323,6 +329,12 @@ class FractionalOps(NumericOps):
     def astype(self, index_ops: IndexOpsLike, dtype: Union[str, type, Dtype]) -> IndexOpsLike:
         dtype, spark_type = pandas_on_spark_type(dtype)
 
+        if is_integer_dtype(dtype) and not isinstance(dtype, extension_dtypes):
+            if index_ops.hasnans:
+                raise ValueError(
+                    "Cannot convert %s with missing values to integer" % self.pretty_name
+                )
+
         if isinstance(dtype, CategoricalDtype):
             return _as_categorical_type(index_ops, dtype, spark_type)
         elif isinstance(spark_type, BooleanType):
@@ -374,16 +386,9 @@ class DecimalOps(FractionalOps):
         )
 
     def astype(self, index_ops: IndexOpsLike, dtype: Union[str, type, Dtype]) -> IndexOpsLike:
+        # TODO(SPARK-36230): check index_ops.hasnans after fixing SPARK-36230
         dtype, spark_type = pandas_on_spark_type(dtype)
-
-        if isinstance(dtype, CategoricalDtype):
-            return _as_categorical_type(index_ops, dtype, spark_type)
-        elif isinstance(spark_type, BooleanType):
-            return _as_bool_type(index_ops, dtype)
-        elif isinstance(spark_type, StringType):
-            return _as_string_type(index_ops, dtype, null_str=str(np.nan))
-        else:
-            return _as_other_type(index_ops, dtype, spark_type)
+        return _non_fractional_astype(index_ops, dtype, spark_type)
 
 
 class IntegralExtensionOps(IntegralOps):
@@ -399,6 +404,19 @@ class IntegralExtensionOps(IntegralOps):
         """Restore column when to_pandas."""
         return col.astype(self.dtype)
 
+    def astype(self, index_ops: IndexOpsLike, dtype: Union[str, type, Dtype]) -> IndexOpsLike:
+        dtype, spark_type = pandas_on_spark_type(dtype)
+
+        if is_integer_dtype(dtype) and not isinstance(dtype, extension_dtypes):
+            if index_ops.hasnans:
+                raise ValueError(
+                    "Cannot convert %s with missing values to integer" % self.pretty_name
+                )
+        elif is_bool_dtype(dtype) and not isinstance(dtype, extension_dtypes):
+            if index_ops.hasnans:
+                raise ValueError("Cannot convert %s with missing values to bool" % self.pretty_name)
+        return _non_fractional_astype(index_ops, dtype, spark_type)
+
 
 class FractionalExtensionOps(FractionalOps):
     """
@@ -412,3 +430,34 @@ class FractionalExtensionOps(FractionalOps):
     def restore(self, col: pd.Series) -> pd.Series:
         """Restore column when to_pandas."""
         return col.astype(self.dtype)
+
+    def astype(self, index_ops: IndexOpsLike, dtype: Union[str, type, Dtype]) -> IndexOpsLike:
+        dtype, spark_type = pandas_on_spark_type(dtype)
+
+        if is_integer_dtype(dtype) and not isinstance(dtype, extension_dtypes):
+            if index_ops.hasnans:
+                raise ValueError(
+                    "Cannot convert %s with missing values to integer" % self.pretty_name
+                )
+        elif is_bool_dtype(dtype) and not isinstance(dtype, extension_dtypes):
+            if index_ops.hasnans:
+                raise ValueError("Cannot convert %s with missing values to bool" % self.pretty_name)
+
+        if isinstance(dtype, CategoricalDtype):
+            return _as_categorical_type(index_ops, dtype, spark_type)
+        elif isinstance(spark_type, BooleanType):
+            if isinstance(dtype, extension_dtypes):
+                scol = index_ops.spark.column.cast(spark_type)
+            else:
+                scol = F.when(
+                    index_ops.spark.column.isNull() | F.isnan(index_ops.spark.column),
+                    SF.lit(True),
+                ).otherwise(index_ops.spark.column.cast(spark_type))
+            return index_ops._with_new_scol(
+                scol.alias(index_ops._internal.data_spark_column_names[0]),
+                field=index_ops._internal.data_fields[0].copy(dtype=dtype, spark_type=spark_type),
+            )
+        elif isinstance(spark_type, StringType):
+            return _as_string_type(index_ops, dtype, null_str=str(np.nan))
+        else:
+            return _as_other_type(index_ops, dtype, spark_type)
diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py
index 3eb3a2c..442fbae 100644
--- a/python/pyspark/pandas/series.py
+++ b/python/pyspark/pandas/series.py
@@ -3598,8 +3598,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
                 Window.unboundedPreceding, Window.unboundedFollowing
             )
             scol = stat_func(F.row_number().over(window1)).over(window2)
-        psser = self._with_new_scol(scol)
-        return psser.astype(np.float64)
+        return self._with_new_scol(scol.cast(DoubleType()))
 
     def filter(
         self,
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_num_ops.py b/python/pyspark/pandas/tests/data_type_ops/test_num_ops.py
index 0dd3501..29a21b9 100644
--- a/python/pyspark/pandas/tests/data_type_ops/test_num_ops.py
+++ b/python/pyspark/pandas/tests/data_type_ops/test_num_ops.py
@@ -335,6 +335,21 @@ class NumOpsTest(PandasOnSparkTestCase, TestCasesUtils):
             self.assert_eq(pser.astype("category"), psser.astype("category"))
             cat_type = CategoricalDtype(categories=[2, 1, 3])
             self.assert_eq(pser.astype(cat_type), psser.astype(cat_type))
+        self.assertRaisesRegex(
+            ValueError,
+            "Cannot convert fractions with missing values to integer",
+            lambda: self.float_withnan_psser.astype(int),
+        )
+        self.assertRaisesRegex(
+            ValueError,
+            "Cannot convert fractions with missing values to integer",
+            lambda: self.float_withnan_psser.astype(np.int32),
+        )
+        self.assert_eq(self.float_withnan_psser.astype(str), self.float_withnan_psser.astype(str))
+        self.assert_eq(self.float_withnan_psser.astype(bool), self.float_withnan_psser.astype(bool))
+        self.assert_eq(
+            self.float_withnan_psser.astype("category"), self.float_withnan_psser.astype("category")
+        )
 
     def test_neg(self):
         pdf, psdf = self.pdf, self.psdf
@@ -439,6 +454,24 @@ class IntegralExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils):
                         self.check_extension(pser.astype(dtype), psser.astype(dtype))
                 else:
                     self.check_extension(pser.astype(dtype), psser.astype(dtype))
+        for pser, psser in self.intergral_extension_pser_psser_pairs:
+            self.assert_eq(pser.astype(float), psser.astype(float))
+            self.assert_eq(pser.astype(np.float32), psser.astype(np.float32))
+            self.assertRaisesRegex(
+                ValueError,
+                "Cannot convert integrals with missing values to bool",
+                lambda: psser.astype(bool),
+            )
+            self.assertRaisesRegex(
+                ValueError,
+                "Cannot convert integrals with missing values to integer",
+                lambda: psser.astype(int),
+            )
+            self.assertRaisesRegex(
+                ValueError,
+                "Cannot convert integrals with missing values to integer",
+                lambda: psser.astype(np.int32),
+            )
 
     def test_neg(self):
         for pser, psser in self.intergral_extension_pser_psser_pairs:
@@ -522,6 +555,24 @@ class FractionalExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils):
         for pser, psser in self.fractional_extension_pser_psser_pairs:
             for dtype in self.extension_dtypes:
                 self.check_extension(pser.astype(dtype), psser.astype(dtype))
+        for pser, psser in self.fractional_extension_pser_psser_pairs:
+            self.assert_eq(pser.astype(float), psser.astype(float))
+            self.assert_eq(pser.astype(np.float32), psser.astype(np.float32))
+            self.assertRaisesRegex(
+                ValueError,
+                "Cannot convert fractions with missing values to bool",
+                lambda: psser.astype(bool),
+            )
+            self.assertRaisesRegex(
+                ValueError,
+                "Cannot convert fractions with missing values to integer",
+                lambda: psser.astype(int),
+            )
+            self.assertRaisesRegex(
+                ValueError,
+                "Cannot convert fractions with missing values to integer",
+                lambda: psser.astype(np.int32),
+            )
 
     def test_neg(self):
         # pandas raises "TypeError: bad operand type for unary -: 'FloatingArray'"

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org