You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ue...@apache.org on 2021/07/28 18:31:15 UTC
[spark] branch branch-3.2 updated: [SPARK-36143][PYTHON] Adjust `astype` of fractional Series with missing values to follow pandas
This is an automated email from the ASF dual-hosted git repository.
ueshin pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new bdf1570 [SPARK-36143][PYTHON] Adjust `astype` of fractional Series with missing values to follow pandas
bdf1570 is described below
commit bdf1570911c29f02a5993a079092accafb80f6c9
Author: Xinrong Meng <xi...@databricks.com>
AuthorDate: Wed Jul 28 11:26:48 2021 -0700
[SPARK-36143][PYTHON] Adjust `astype` of fractional Series with missing values to follow pandas
### What changes were proposed in this pull request?
Adjust `astype` of fractional Series with missing values to follow pandas.
Non-goal: making `astype` of Decimal Series with missing values follow pandas; that check is deferred until SPARK-36230 is fixed.
### Why are the changes needed?
`astype` of fractional Series with missing values doesn't behave the same as in pandas. For example, calling `astype(int)` on a float Series that contains NaN silently returns the float values unchanged (dtype float64, NaN preserved), whereas pandas raises a ValueError.
We ought to follow pandas.
### Does this PR introduce _any_ user-facing change?
Yes.
From:
```py
>>> import numpy as np
>>> import pyspark.pandas as ps
>>> psser = ps.Series([1, 2, np.nan])
>>> psser.astype(int)
0    1.0
1    2.0
2    NaN
dtype: float64
```
To:
```py
>>> import numpy as np
>>> import pyspark.pandas as ps
>>> psser = ps.Series([1, 2, np.nan])
>>> psser.astype(int)
Traceback (most recent call last):
...
ValueError: Cannot convert fractions with missing values to integer
```
### How was this patch tested?
Unit tests.
Closes #33466 from xinrong-databricks/extension_astype.
Authored-by: Xinrong Meng <xi...@databricks.com>
Signed-off-by: Takuya UESHIN <ue...@databricks.com>
(cherry picked from commit 01213095e2c26a4b0940735daf290c90d3267f51)
Signed-off-by: Takuya UESHIN <ue...@databricks.com>
---
python/pyspark/pandas/data_type_ops/num_ops.py | 87 +++++++++++++++++-----
python/pyspark/pandas/series.py | 3 +-
.../pandas/tests/data_type_ops/test_num_ops.py | 51 +++++++++++++
3 files changed, 120 insertions(+), 21 deletions(-)
diff --git a/python/pyspark/pandas/data_type_ops/num_ops.py b/python/pyspark/pandas/data_type_ops/num_ops.py
index 43c3b01..a7987bc 100644
--- a/python/pyspark/pandas/data_type_ops/num_ops.py
+++ b/python/pyspark/pandas/data_type_ops/num_ops.py
@@ -20,7 +20,7 @@ from typing import Any, Union
import numpy as np
import pandas as pd
-from pandas.api.types import CategoricalDtype
+from pandas.api.types import is_bool_dtype, is_integer_dtype, CategoricalDtype
from pyspark.pandas._typing import Dtype, IndexOpsLike, SeriesOrIndex
from pyspark.pandas.base import column_op, IndexOpsMixin, numpy_column_op
@@ -39,10 +39,24 @@ from pyspark.sql import functions as F
from pyspark.sql.column import Column
from pyspark.sql.types import (
BooleanType,
+ DataType,
StringType,
)
+def _non_fractional_astype(
+ index_ops: IndexOpsLike, dtype: Dtype, spark_type: DataType
+) -> IndexOpsLike:
+ if isinstance(dtype, CategoricalDtype):
+ return _as_categorical_type(index_ops, dtype, spark_type)
+ elif isinstance(spark_type, BooleanType):
+ return _as_bool_type(index_ops, dtype)
+ elif isinstance(spark_type, StringType):
+ return _as_string_type(index_ops, dtype, null_str=str(np.nan))
+ else:
+ return _as_other_type(index_ops, dtype, spark_type)
+
+
class NumericOps(DataTypeOps):
"""The class for binary operations of numeric pandas-on-Spark objects."""
@@ -224,15 +238,7 @@ class IntegralOps(NumericOps):
def astype(self, index_ops: IndexOpsLike, dtype: Union[str, type, Dtype]) -> IndexOpsLike:
dtype, spark_type = pandas_on_spark_type(dtype)
-
- if isinstance(dtype, CategoricalDtype):
- return _as_categorical_type(index_ops, dtype, spark_type)
- elif isinstance(spark_type, BooleanType):
- return _as_bool_type(index_ops, dtype)
- elif isinstance(spark_type, StringType):
- return _as_string_type(index_ops, dtype, null_str=str(np.nan))
- else:
- return _as_other_type(index_ops, dtype, spark_type)
+ return _non_fractional_astype(index_ops, dtype, spark_type)
class FractionalOps(NumericOps):
@@ -323,6 +329,12 @@ class FractionalOps(NumericOps):
def astype(self, index_ops: IndexOpsLike, dtype: Union[str, type, Dtype]) -> IndexOpsLike:
dtype, spark_type = pandas_on_spark_type(dtype)
+ if is_integer_dtype(dtype) and not isinstance(dtype, extension_dtypes):
+ if index_ops.hasnans:
+ raise ValueError(
+ "Cannot convert %s with missing values to integer" % self.pretty_name
+ )
+
if isinstance(dtype, CategoricalDtype):
return _as_categorical_type(index_ops, dtype, spark_type)
elif isinstance(spark_type, BooleanType):
@@ -374,16 +386,9 @@ class DecimalOps(FractionalOps):
)
def astype(self, index_ops: IndexOpsLike, dtype: Union[str, type, Dtype]) -> IndexOpsLike:
+ # TODO(SPARK-36230): check index_ops.hasnans after fixing SPARK-36230
dtype, spark_type = pandas_on_spark_type(dtype)
-
- if isinstance(dtype, CategoricalDtype):
- return _as_categorical_type(index_ops, dtype, spark_type)
- elif isinstance(spark_type, BooleanType):
- return _as_bool_type(index_ops, dtype)
- elif isinstance(spark_type, StringType):
- return _as_string_type(index_ops, dtype, null_str=str(np.nan))
- else:
- return _as_other_type(index_ops, dtype, spark_type)
+ return _non_fractional_astype(index_ops, dtype, spark_type)
class IntegralExtensionOps(IntegralOps):
@@ -399,6 +404,19 @@ class IntegralExtensionOps(IntegralOps):
"""Restore column when to_pandas."""
return col.astype(self.dtype)
+ def astype(self, index_ops: IndexOpsLike, dtype: Union[str, type, Dtype]) -> IndexOpsLike:
+ dtype, spark_type = pandas_on_spark_type(dtype)
+
+ if is_integer_dtype(dtype) and not isinstance(dtype, extension_dtypes):
+ if index_ops.hasnans:
+ raise ValueError(
+ "Cannot convert %s with missing values to integer" % self.pretty_name
+ )
+ elif is_bool_dtype(dtype) and not isinstance(dtype, extension_dtypes):
+ if index_ops.hasnans:
+ raise ValueError("Cannot convert %s with missing values to bool" % self.pretty_name)
+ return _non_fractional_astype(index_ops, dtype, spark_type)
+
class FractionalExtensionOps(FractionalOps):
"""
@@ -412,3 +430,34 @@ class FractionalExtensionOps(FractionalOps):
def restore(self, col: pd.Series) -> pd.Series:
"""Restore column when to_pandas."""
return col.astype(self.dtype)
+
+ def astype(self, index_ops: IndexOpsLike, dtype: Union[str, type, Dtype]) -> IndexOpsLike:
+ dtype, spark_type = pandas_on_spark_type(dtype)
+
+ if is_integer_dtype(dtype) and not isinstance(dtype, extension_dtypes):
+ if index_ops.hasnans:
+ raise ValueError(
+ "Cannot convert %s with missing values to integer" % self.pretty_name
+ )
+ elif is_bool_dtype(dtype) and not isinstance(dtype, extension_dtypes):
+ if index_ops.hasnans:
+ raise ValueError("Cannot convert %s with missing values to bool" % self.pretty_name)
+
+ if isinstance(dtype, CategoricalDtype):
+ return _as_categorical_type(index_ops, dtype, spark_type)
+ elif isinstance(spark_type, BooleanType):
+ if isinstance(dtype, extension_dtypes):
+ scol = index_ops.spark.column.cast(spark_type)
+ else:
+ scol = F.when(
+ index_ops.spark.column.isNull() | F.isnan(index_ops.spark.column),
+ SF.lit(True),
+ ).otherwise(index_ops.spark.column.cast(spark_type))
+ return index_ops._with_new_scol(
+ scol.alias(index_ops._internal.data_spark_column_names[0]),
+ field=index_ops._internal.data_fields[0].copy(dtype=dtype, spark_type=spark_type),
+ )
+ elif isinstance(spark_type, StringType):
+ return _as_string_type(index_ops, dtype, null_str=str(np.nan))
+ else:
+ return _as_other_type(index_ops, dtype, spark_type)
diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py
index 3eb3a2c..442fbae 100644
--- a/python/pyspark/pandas/series.py
+++ b/python/pyspark/pandas/series.py
@@ -3598,8 +3598,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
Window.unboundedPreceding, Window.unboundedFollowing
)
scol = stat_func(F.row_number().over(window1)).over(window2)
- psser = self._with_new_scol(scol)
- return psser.astype(np.float64)
+ return self._with_new_scol(scol.cast(DoubleType()))
def filter(
self,
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_num_ops.py b/python/pyspark/pandas/tests/data_type_ops/test_num_ops.py
index 0dd3501..29a21b9 100644
--- a/python/pyspark/pandas/tests/data_type_ops/test_num_ops.py
+++ b/python/pyspark/pandas/tests/data_type_ops/test_num_ops.py
@@ -335,6 +335,21 @@ class NumOpsTest(PandasOnSparkTestCase, TestCasesUtils):
self.assert_eq(pser.astype("category"), psser.astype("category"))
cat_type = CategoricalDtype(categories=[2, 1, 3])
self.assert_eq(pser.astype(cat_type), psser.astype(cat_type))
+ self.assertRaisesRegex(
+ ValueError,
+ "Cannot convert fractions with missing values to integer",
+ lambda: self.float_withnan_psser.astype(int),
+ )
+ self.assertRaisesRegex(
+ ValueError,
+ "Cannot convert fractions with missing values to integer",
+ lambda: self.float_withnan_psser.astype(np.int32),
+ )
+ self.assert_eq(self.float_withnan_psser.astype(str), self.float_withnan_psser.astype(str))
+ self.assert_eq(self.float_withnan_psser.astype(bool), self.float_withnan_psser.astype(bool))
+ self.assert_eq(
+ self.float_withnan_psser.astype("category"), self.float_withnan_psser.astype("category")
+ )
def test_neg(self):
pdf, psdf = self.pdf, self.psdf
@@ -439,6 +454,24 @@ class IntegralExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils):
self.check_extension(pser.astype(dtype), psser.astype(dtype))
else:
self.check_extension(pser.astype(dtype), psser.astype(dtype))
+ for pser, psser in self.intergral_extension_pser_psser_pairs:
+ self.assert_eq(pser.astype(float), psser.astype(float))
+ self.assert_eq(pser.astype(np.float32), psser.astype(np.float32))
+ self.assertRaisesRegex(
+ ValueError,
+ "Cannot convert integrals with missing values to bool",
+ lambda: psser.astype(bool),
+ )
+ self.assertRaisesRegex(
+ ValueError,
+ "Cannot convert integrals with missing values to integer",
+ lambda: psser.astype(int),
+ )
+ self.assertRaisesRegex(
+ ValueError,
+ "Cannot convert integrals with missing values to integer",
+ lambda: psser.astype(np.int32),
+ )
def test_neg(self):
for pser, psser in self.intergral_extension_pser_psser_pairs:
@@ -522,6 +555,24 @@ class FractionalExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils):
for pser, psser in self.fractional_extension_pser_psser_pairs:
for dtype in self.extension_dtypes:
self.check_extension(pser.astype(dtype), psser.astype(dtype))
+ for pser, psser in self.fractional_extension_pser_psser_pairs:
+ self.assert_eq(pser.astype(float), psser.astype(float))
+ self.assert_eq(pser.astype(np.float32), psser.astype(np.float32))
+ self.assertRaisesRegex(
+ ValueError,
+ "Cannot convert fractions with missing values to bool",
+ lambda: psser.astype(bool),
+ )
+ self.assertRaisesRegex(
+ ValueError,
+ "Cannot convert fractions with missing values to integer",
+ lambda: psser.astype(int),
+ )
+ self.assertRaisesRegex(
+ ValueError,
+ "Cannot convert fractions with missing values to integer",
+ lambda: psser.astype(np.int32),
+ )
def test_neg(self):
# pandas raises "TypeError: bad operand type for unary -: 'FloatingArray'"
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org