You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/11/23 09:53:40 UTC

[spark] branch master updated: [SPARK-36231][PYTHON] Support arithmetic operations of decimal(nan) series

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3235edb  [SPARK-36231][PYTHON] Support arithmetic operations of decimal(nan) series
3235edb is described below

commit 3235edbc31c9a4c487c5fe18f646191b91df83b8
Author: Yikun Jiang <yi...@gmail.com>
AuthorDate: Tue Nov 23 18:52:45 2021 +0900

    [SPARK-36231][PYTHON] Support arithmetic operations of decimal(nan) series
    
    ### What changes were proposed in this pull request?
    This patch has changes as below to follow the pandas behavior:
    - **Add nan value process in _non_fractional_astype**: Follow the pandas [to_string](https://github.com/pandas-dev/pandas/blob/0a9f9eed3e3eb7d5fa23cbc588e78b9bef915a89/pandas/core/series.py#L1486) convert method, it should be `"NaN"` rather than `str(np.nan)` (`"nan"`), which is covered by `self.assert_eq(pser.astype(str), psser.astype(str))`.
    - **Add null value process in rpow**, which is covered by `def test_rpow(self)`
    - **Add index_ops.hasnans in `astype`**, which is covered by `test_astype`.
    
    This patch also moves `numeric_w_nan_pdf` into `numeric_pdf`, meaning all separated float_nan/decimal_nan test cases have been cleaned up and merged into the numeric tests.
    
    ### Why are the changes needed?
    Follow the pandas behavior
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, corrects the null value results to follow the pandas behavior
    
    ### How was this patch tested?
    1. ut to cover all changes
    2. Passed all Python test cases with pandas v1.1.x
    3. Passed all Python test cases with pandas v1.2.x
    
    Closes #34687 from Yikun/SPARK-36337-skip.
    
    Authored-by: Yikun Jiang <yi...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 python/pyspark/pandas/data_type_ops/num_ops.py     | 23 ++++++++-
 .../pandas/tests/data_type_ops/test_num_ops.py     | 56 ++++++++-------------
 .../pandas/tests/data_type_ops/testing_utils.py    | 57 ++++++----------------
 3 files changed, 56 insertions(+), 80 deletions(-)

diff --git a/python/pyspark/pandas/data_type_ops/num_ops.py b/python/pyspark/pandas/data_type_ops/num_ops.py
index 3e74664..e08d6e9 100644
--- a/python/pyspark/pandas/data_type_ops/num_ops.py
+++ b/python/pyspark/pandas/data_type_ops/num_ops.py
@@ -55,7 +55,7 @@ def _non_fractional_astype(
     elif isinstance(spark_type, BooleanType):
         return _as_bool_type(index_ops, dtype)
     elif isinstance(spark_type, StringType):
-        return _as_string_type(index_ops, dtype, null_str=str(np.nan))
+        return _as_string_type(index_ops, dtype, null_str="NaN")
     else:
         return _as_other_type(index_ops, dtype, spark_type)
 
@@ -447,10 +447,29 @@ class DecimalOps(FractionalOps):
         return index_ops.copy()
 
     def astype(self, index_ops: IndexOpsLike, dtype: Union[str, type, Dtype]) -> IndexOpsLike:
-        # TODO(SPARK-36230): check index_ops.hasnans after fixing SPARK-36230
         dtype, spark_type = pandas_on_spark_type(dtype)
+        if is_integer_dtype(dtype) and not isinstance(dtype, extension_dtypes):
+            if index_ops.hasnans:
+                raise ValueError(
+                    "Cannot convert %s with missing values to integer" % self.pretty_name
+                )
         return _non_fractional_astype(index_ops, dtype, spark_type)
 
+    def rpow(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+        _sanitize_list_like(right)
+        if not isinstance(right, numbers.Number):
+            raise TypeError("Exponentiation can not be applied to given types.")
+
+        def rpow_func(left: Column, right: Any) -> Column:
+            return (
+                F.when(left.isNull(), np.nan)
+                .when(SF.lit(right == 1), right)
+                .otherwise(Column.__rpow__(left, right))
+            )
+
+        right = transform_boolean_operand_to_numeric(right)
+        return column_op(rpow_func)(left, right)
+
 
 class IntegralExtensionOps(IntegralOps):
     """
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_num_ops.py b/python/pyspark/pandas/tests/data_type_ops/test_num_ops.py
index 4d1fb23..f4b36f9 100644
--- a/python/pyspark/pandas/tests/data_type_ops/test_num_ops.py
+++ b/python/pyspark/pandas/tests/data_type_ops/test_num_ops.py
@@ -172,11 +172,13 @@ class NumOpsTest(PandasOnSparkTestCase, TestCasesUtils):
         pdf, psdf = self.pdf, self.psdf
         for col in self.numeric_df_cols:
             pser, psser = pdf[col], psdf[col]
-            if col == "float":
+            if col in ["float", "float_w_nan"]:
                 self.assert_eq(pser ** pser, psser ** psser)
                 self.assert_eq(pser ** pser.astype(bool), psser ** psser.astype(bool))
                 self.assert_eq(pser ** True, psser ** True)
                 self.assert_eq(pser ** False, psser ** False)
+                self.assert_eq(pser ** 1, psser ** 1)
+                self.assert_eq(pser ** 0, psser ** 0)
 
             for n_col in self.non_numeric_df_cols:
                 if n_col == "bool":
@@ -184,18 +186,6 @@ class NumOpsTest(PandasOnSparkTestCase, TestCasesUtils):
                 else:
                     self.assertRaises(TypeError, lambda: psser ** psdf[n_col])
 
-    # TODO(SPARK-36031): Merge test_pow_with_nan into test_pow
-    def test_pow_with_float_nan(self):
-        for col in self.numeric_w_nan_df_cols:
-            if col == "float_w_nan":
-                pser, psser = self.numeric_w_nan_pdf[col], self.numeric_w_nan_psdf[col]
-                self.assert_eq(pser ** pser, psser ** psser)
-                self.assert_eq(pser ** pser.astype(bool), psser ** psser.astype(bool))
-                self.assert_eq(pser ** True, psser ** True)
-                self.assert_eq(pser ** False, psser ** False)
-                self.assert_eq(pser ** 1, psser ** 1)
-                self.assert_eq(pser ** 0, psser ** 0)
-
     def test_radd(self):
         pdf, psdf = self.pdf, self.psdf
         for col in self.numeric_df_cols:
@@ -344,40 +334,36 @@ class NumOpsTest(PandasOnSparkTestCase, TestCasesUtils):
             self.assert_eq(ps.from_pandas(pser), psser)
 
     def test_isnull(self):
-        pdf, psdf = self.numeric_w_nan_pdf, self.numeric_w_nan_psdf
-        for col in self.numeric_w_nan_df_cols:
+        pdf, psdf = self.pdf, self.psdf
+        for col in self.numeric_df_cols:
             self.assert_eq(pdf[col].isnull(), psdf[col].isnull())
 
     def test_astype(self):
         pdf, psdf = self.pdf, self.psdf
         for col in self.numeric_df_cols:
             pser, psser = pdf[col], psdf[col]
-            self.assert_eq(pser.astype(int), psser.astype(int))
+
+            for int_type in [int, np.int32, np.int16, np.int8]:
+                if not pser.hasnans:
+                    self.assert_eq(pser.astype(int_type), psser.astype(int_type))
+                else:
+                    self.assertRaisesRegex(
+                        ValueError,
+                        "Cannot convert %s with missing "
+                        "values to integer" % psser._dtype_op.pretty_name,
+                        lambda: psser.astype(int_type),
+                    )
+
+            # TODO(SPARK-37039): the np.nan series.astype(bool) should be True
+            if not pser.hasnans:
+                self.assert_eq(pser.astype(bool), psser.astype(bool))
+
             self.assert_eq(pser.astype(float), psser.astype(float))
             self.assert_eq(pser.astype(np.float32), psser.astype(np.float32))
-            self.assert_eq(pser.astype(np.int32), psser.astype(np.int32))
-            self.assert_eq(pser.astype(np.int16), psser.astype(np.int16))
-            self.assert_eq(pser.astype(np.int8), psser.astype(np.int8))
             self.assert_eq(pser.astype(str), psser.astype(str))
-            self.assert_eq(pser.astype(bool), psser.astype(bool))
             self.assert_eq(pser.astype("category"), psser.astype("category"))
             cat_type = CategoricalDtype(categories=[2, 1, 3])
             self.assert_eq(pser.astype(cat_type), psser.astype(cat_type))
-        self.assertRaisesRegex(
-            ValueError,
-            "Cannot convert fractions with missing values to integer",
-            lambda: self.float_withnan_psser.astype(int),
-        )
-        self.assertRaisesRegex(
-            ValueError,
-            "Cannot convert fractions with missing values to integer",
-            lambda: self.float_withnan_psser.astype(np.int32),
-        )
-        self.assert_eq(self.float_withnan_psser.astype(str), self.float_withnan_psser.astype(str))
-        self.assert_eq(self.float_withnan_psser.astype(bool), self.float_withnan_psser.astype(bool))
-        self.assert_eq(
-            self.float_withnan_psser.astype("category"), self.float_withnan_psser.astype("category")
-        )
         if extension_object_dtypes_available and extension_float_dtypes_available:
             pser = pd.Series(pd.Categorical([1.0, 2.0, 3.0]), dtype=pd.Float64Dtype())
             psser = ps.from_pandas(pser)
diff --git a/python/pyspark/pandas/tests/data_type_ops/testing_utils.py b/python/pyspark/pandas/tests/data_type_ops/testing_utils.py
index b7c50d2..37eff6d 100644
--- a/python/pyspark/pandas/tests/data_type_ops/testing_utils.py
+++ b/python/pyspark/pandas/tests/data_type_ops/testing_utils.py
@@ -49,8 +49,21 @@ class TestCasesUtils(object):
         dtypes = [np.int32, int, np.float32, float]
         sers = [pd.Series([1, 2, 3], dtype=dtype) for dtype in dtypes]
         sers.append(pd.Series([decimal.Decimal(1), decimal.Decimal(2), decimal.Decimal(3)]))
+        sers.append(pd.Series([1, 2, np.nan], dtype=float))
+        # Skip decimal_nan test before v1.3.0, it not supported by pandas on spark yet.
+        if LooseVersion(pd.__version__) >= LooseVersion("1.3.0"):
+            sers.append(
+                pd.Series([decimal.Decimal(1), decimal.Decimal(2), decimal.Decimal(np.nan)])
+            )
         pdf = pd.concat(sers, axis=1)
-        pdf.columns = [dtype.__name__ for dtype in dtypes] + ["decimal"]
+        if LooseVersion(pd.__version__) >= LooseVersion("1.3.0"):
+            pdf.columns = [dtype.__name__ for dtype in dtypes] + [
+                "decimal",
+                "float_nan",
+                "decimal_nan",
+            ]
+        else:
+            pdf.columns = [dtype.__name__ for dtype in dtypes] + ["decimal", "float_nan"]
         return pdf
 
     @property
@@ -69,25 +82,6 @@ class TestCasesUtils(object):
     def integral_psdf(self):
         return ps.from_pandas(self.integral_pdf)
 
-    # TODO(SPARK-36031): Merge self.numeric_w_nan_p(s)df into self.numeric_p(s)df
-    @property
-    def numeric_w_nan_pdf(self):
-        psers = {
-            "float_w_nan": pd.Series([1, 2, np.nan]),
-            "decimal_w_nan": pd.Series(
-                [decimal.Decimal(1), decimal.Decimal(2), decimal.Decimal(np.nan)]
-            ),
-        }
-        return pd.concat(psers, axis=1)
-
-    @property
-    def numeric_w_nan_psdf(self):
-        return ps.from_pandas(self.numeric_w_nan_pdf)
-
-    @property
-    def numeric_w_nan_df_cols(self):
-        return self.numeric_w_nan_pdf.columns
-
     @property
     def non_numeric_pdf(self):
         psers = {
@@ -133,33 +127,10 @@ class TestCasesUtils(object):
         return [ps.from_pandas(pser) for pser in self.numeric_psers]
 
     @property
-    def decimal_withnan_pser(self):
-        return pd.Series([decimal.Decimal(1.0), decimal.Decimal(2.0), decimal.Decimal(np.nan)])
-
-    @property
-    def decimal_withnan_psser(self):
-        return ps.from_pandas(self.decimal_withnan_pser)
-
-    @property
-    def float_withnan_pser(self):
-        return pd.Series([1, 2, np.nan])
-
-    @property
-    def float_withnan_psser(self):
-        return ps.from_pandas(self.float_withnan_pser)
-
-    @property
     def numeric_pser_psser_pairs(self):
         return zip(self.numeric_psers, self.numeric_pssers)
 
     @property
-    def numeric_withnan_pser_psser_pairs(self):
-        return zip(
-            self.numeric_psers + [self.decimal_withnan_pser, self.float_withnan_pser],
-            self.numeric_pssers + [self.decimal_withnan_psser, self.float_withnan_psser],
-        )
-
-    @property
     def non_numeric_psers(self):
         psers = {
             "string": pd.Series(["x", "y", "z"]),

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org